RecordMap.java
package org.microspace.replicator.record;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.microspace.replicator.FileReplicator;
import org.microspace.space.AccessorGenerator;
import org.microspace.space.SimpleSpace;
import org.microspace.space.SpaceConfig;
import org.microspace.table.Entry;
import org.microspace.table.column.Accessor;
import org.microspace.table.query.MatchAllQuery;
import org.microspace.thread.ContextId;
import org.microspace.util.AccessorCache;
import org.microspace.util.MicroLogger;
/**
* This Record is used when data is replicated. It is used
* by {@link FileReplicator FileReplicator} to re-read changed classes.
*
* @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
* @version 2016-06-26
*/
public class RecordMap {
private static final MicroLogger log = new MicroLogger(RecordMap.class);
final SimpleSpace simpleSpace;
final AccessorCache accessorCache;
final HashMap<String, HeaderRecord> headers = new HashMap<String, HeaderRecord>();
final AccessorGenerator generator;
public RecordMap (AccessorGenerator generator){
this.generator = generator;
this.accessorCache = new AccessorCache(generator);
SpaceConfig spaceConfig = new SpaceConfig();
spaceConfig.setAccessorGenerator(generator);
spaceConfig.setSerialized(false);
this.simpleSpace = new SimpleSpace(spaceConfig);
}
public Accessor<?> getAccessor (String clazz) {
return accessorCache.get(clazz);
}
public Map<String, Accessor<?>> getAccessors () {
return accessorCache.list();
}
/**
* The record stream starts with a HEADER, followed by blocks of
* BEGIN ADD REMOVE END
* We merged all updates into one so there will be only one block of BEGIN ADD END.
* @return list of merged add records.
*/
public List<Record> getMergedRecords () {
ContextId contextId = new ContextId();
List<Record> ret = new LinkedList<Record> ();
List<Class<?>> classes = simpleSpace.getTableClasses();
for (Class<?> clazz : classes) {
if (simpleSpace.size(clazz) == 0) continue;
Accessor<?> newAccessor = accessorCache.get(clazz.getName());
HeaderRecord header = new HeaderRecord (newAccessor);
ret.add(new Record (header));
}
if (ret.size() == 0) return ret;
ret.add(new Record(contextId, Record.Type.BEGIN));
for (Class<?> clazz : classes) {
if (simpleSpace.size(clazz) == 0) continue;
@SuppressWarnings("unchecked")
Accessor<Object> newAccessor = (Accessor<Object>) (accessorCache.get(clazz.getName()));
@SuppressWarnings({ "unchecked", "rawtypes" })
MatchAllQuery<Object> query = new MatchAllQuery(clazz);
List<Object> recs = simpleSpace.readMultiple(query);
for (Object o : recs) {
AddRecord addRecord = new AddRecord(newAccessor, o);
ret.add(new Record(contextId, addRecord));
}
}
ret.add(new Record(contextId, Record.Type.END));
return ret;
}
Map<ContextId, LinkedList<Record>> pendingRecords = new HashMap<>();
public void add (Record record) {
LinkedList<Record> records = pendingRecords.get(record.getContextId());
if (records == null) {
records = new LinkedList<Record>();
pendingRecords.put(record.getContextId(), records);
}
if (record.getType() == Record.Type.HEADER) {
addOneThread(record);
return;
}
records.add(record);
if (record.getType() == Record.Type.END) {
simpleSpace.rollback();
for (Record rec : records) {
addOneThread(rec);
}
pendingRecords.remove(record.getContextId());
}
}
public void addOneThread (Record record) {
switch (record.getType()) {
case HEADER:
if (accessorCache.get(record.getHeaderRecord().getClassName()) == null) {
log.error("HEADER class not found $*", null, record.getHeaderRecord().getClassName());
} else {
headers.put(record.getHeaderRecord().getClassName(), record.getHeaderRecord());
}
break;
case BEGIN:
simpleSpace.rollback();
break;
case ADD:
{
AddRecord add = record.getAddRecord();
/* convert the fields. */
Accessor<?> newAccessor = accessorCache.get(add.getClassName());
if (newAccessor == null) {
log.error("ADD class not found $*", null, add.getClassName());
break;
}
HeaderRecord oldHeader = headers.get(add.getClassName());
if (oldHeader == null) {
log.error("ADD header not found for $*", null, add.getClassName());
break;
}
AddRecord newAdd = oldHeader.convert(add, newAccessor);
Entry<?> entry = newAdd.convert(newAccessor);
simpleSpace.write(entry.getSpaceEntry());
}
break;
case REMOVE:
Accessor<?> newAccessor = accessorCache.get(record.getRemoveRecord().getClassName());
if (newAccessor != null) {
Object o = simpleSpace.takeById(newAccessor.getTargetClass(), record.getRemoveRecord().getKey());
if (o == null) {
log.error("Object not found: $* key=$*", null, newAccessor.getTargetClass(), record.getRemoveRecord().getKey());
}
}
break;
case END:
simpleSpace.commit();
break;
default:
break;
}
}
}