// RecordMap.java

package org.microspace.replicator.record;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.microspace.replicator.FileReplicator;
import org.microspace.space.AccessorGenerator;
import org.microspace.space.SimpleSpace;
import org.microspace.space.SpaceConfig;
import org.microspace.table.Entry;
import org.microspace.table.column.Accessor;
import org.microspace.table.query.MatchAllQuery;
import org.microspace.thread.ContextId;
import org.microspace.util.AccessorCache;
import org.microspace.util.MicroLogger;


/**
 * Collects replication {@link Record records} and replays them into a private
 * {@link SimpleSpace}, so that the accumulated content can be emitted again
 * as a single merged transaction. It is used by
 * {@link FileReplicator FileReplicator} to re-read changed classes.
 * <p>
 * Records are buffered per {@link ContextId} until the matching END arrives;
 * only then is the whole transaction applied to the space. HEADER records are
 * not buffered, they are applied as soon as they are received.
 * <p>
 * This class uses plain {@link HashMap}s and performs no synchronization of
 * its own.
 * 
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2016-06-26
 */
public class RecordMap {
	
	private static final MicroLogger log = new MicroLogger(RecordMap.class);
	
	/** Space that holds the merged state of all committed transactions. */
	final SimpleSpace simpleSpace;
	/** Resolves class names to the accessors of the current class versions. */
	final AccessorCache accessorCache;
	
	/** Most recently accepted HEADER per class name, used to convert ADD records. */
	final HashMap<String, HeaderRecord> headers = new HashMap<String, HeaderRecord>();
	
	final AccessorGenerator generator;

	/** Records of transactions that have not yet seen their END, keyed by context. */
	Map<ContextId, LinkedList<Record>> pendingRecords = new HashMap<>(); 

	/**
	 * Create an empty map whose backing space has serialization switched off.
	 * 
	 * @param generator generator used for both the accessor cache and the backing space.
	 */
	public RecordMap (AccessorGenerator generator){
		this.generator = generator;
		this.accessorCache = new AccessorCache(generator);
		SpaceConfig spaceConfig = new SpaceConfig();
		spaceConfig.setAccessorGenerator(generator);
		spaceConfig.setSerialized(false);
		this.simpleSpace = new SimpleSpace(spaceConfig);
	}
	
	/**
	 * Look up the accessor for a class name in the accessor cache.
	 * 
	 * @param clazz class name.
	 * @return whatever the accessor cache returns for that name; callers in this
	 *         class treat null as "class not found".
	 */
	public Accessor<?> getAccessor (String clazz) {
		return accessorCache.get(clazz);
	}
	
	/**
	 * @return the accessors listed by the accessor cache, keyed by name.
	 */
	public Map<String, Accessor<?>> getAccessors () {
		return accessorCache.list();
	}

	/**
	 * The record stream starts with a HEADER, followed by blocks of
	 * BEGIN ADD REMOVE END
	 * We merged all updates into one so there will be only one block of BEGIN ADD END.
	 * Classes whose table is empty contribute neither a HEADER nor ADD records.
	 * 
	 * @return list of merged add records; empty when no table holds any entry.
	 */
	public List<Record> getMergedRecords () {
		List<Record> ret = new LinkedList<Record> ();

		/* Determine the non-empty tables once, both passes below use the same set. */
		List<Class<?>> nonEmpty = new LinkedList<Class<?>>();
		for (Class<?> clazz : simpleSpace.getTableClasses()) {
			if (simpleSpace.size(clazz) != 0) {
				nonEmpty.add(clazz);
			}
		}
		if (nonEmpty.isEmpty()) return ret;

		/* All headers come first ... */
		for (Class<?> clazz : nonEmpty) {
			Accessor<?> newAccessor = accessorCache.get(clazz.getName());
			ret.add(new Record (new HeaderRecord (newAccessor)));
		}

		/* ... followed by one single transaction that adds every stored object. */
		ContextId contextId = new ContextId();
		ret.add(new Record(contextId, Record.Type.BEGIN));
		for (Class<?> clazz : nonEmpty) {
			@SuppressWarnings("unchecked")
			Accessor<Object> newAccessor = (Accessor<Object>) (accessorCache.get(clazz.getName()));
			@SuppressWarnings({ "unchecked", "rawtypes" })
			MatchAllQuery<Object> query = new  MatchAllQuery(clazz);
			List<Object> recs = simpleSpace.readMultiple(query);
			for (Object o : recs) {
				ret.add(new Record(contextId, new AddRecord(newAccessor, o)));
			}
		}
		ret.add(new Record(contextId, Record.Type.END));
		return ret;
	}
	
	/**
	 * Accept one record of a possibly interleaved stream.
	 * <p>
	 * HEADER records are applied immediately and never buffered. Every other
	 * record is appended to the pending list of its context; when the END of
	 * a context arrives the buffered records are replayed in order through
	 * {@link #addOneThread(Record)} and the pending list is discarded.
	 * 
	 * @param record the record to process.
	 */
	public void add (Record record) {
		/*
		 * Headers bypass the buffering, so they must be handled before a
		 * pending list is registered - such a list would never see an END
		 * and would therefore never be removed.
		 */
		if (record.getType() == Record.Type.HEADER) {
			addOneThread(record);
			return;
		}
		ContextId contextId = record.getContextId();
		LinkedList<Record> records = pendingRecords.get(contextId);
		if (records == null) {
			records = new LinkedList<Record>();
			pendingRecords.put(contextId, records);
		}
		records.add(record);
		if (record.getType() != Record.Type.END) {
			return;
		}
		try {
			/* Discard whatever an earlier replay may have left uncommitted. */
			simpleSpace.rollback();
			for (Record rec : records) {
				addOneThread(rec);
			}
		} finally {
			/*
			 * Drop the transaction even if the replay failed, otherwise the
			 * stale records would be replayed again together with the next
			 * transaction of the same context.
			 */
			pendingRecords.remove(contextId);
		}
	}
	
	/**
	 * Apply a single record directly to the backing space, without buffering.
	 * BEGIN rolls the space back, END commits it; HEADER, ADD and REMOVE are
	 * applied as described on the corresponding helper. Records of any other
	 * type are ignored.
	 * 
	 * @param record the record to apply.
	 */
	public void addOneThread (Record record) {
		switch (record.getType()) {
		case HEADER:
			applyHeader(record.getHeaderRecord());
			break;
		case BEGIN:
			simpleSpace.rollback();
			break;
		case ADD:
			applyAdd(record.getAddRecord());
			break;
		case REMOVE:
			applyRemove(record.getRemoveRecord());
			break;
		case END:
			simpleSpace.commit();
			break;
		default:
			break;
		}
	}

	/** Remember the header if its class is known to the accessor cache, log an error otherwise. */
	private void applyHeader (HeaderRecord header) {
		String className = header.getClassName();
		if (accessorCache.get(className) == null) {
			log.error("HEADER class not found $*", null, className);
			return;
		}
		headers.put(className, header);
	}

	/** Convert an ADD record with the stored header of its class and write the result to the space. */
	private void applyAdd (AddRecord add) {
		String className = add.getClassName();
		Accessor<?> newAccessor = accessorCache.get(className);
		if (newAccessor == null) {
			log.error("ADD class not found $*", null, className);
			return;
		}
		HeaderRecord oldHeader = headers.get(className);
		if (oldHeader == null) {
			log.error("ADD header not found for $*", null, className);
			return;
		}
		/*
		 * convert the fields.
		 * NOTE(review): presumably this maps the layout described by the stored
		 * header onto the current accessor - confirm in HeaderRecord.convert.
		 */
		AddRecord newAdd = oldHeader.convert(add, newAccessor);
		Entry<?> entry = newAdd.convert(newAccessor);
		simpleSpace.write(entry.getSpaceEntry());
	}

	/** Take the object named by a REMOVE record out of the space, logging unknown classes and missing objects. */
	private void applyRemove (RemoveRecord remove) {
		Accessor<?> newAccessor = accessorCache.get(remove.getClassName());
		if (newAccessor == null) {
			/* Report unknown classes the same way HEADER and ADD do. */
			log.error("REMOVE class not found $*", null, remove.getClassName());
			return;
		}
		Object o = simpleSpace.takeById(newAccessor.getTargetClass(), remove.getKey());
		if (o == null) {
			log.error("Object not found: $* key=$*", null, newAccessor.getTargetClass(), remove.getKey());
		}
	}
}