// FileReplicator.java
package org.microspace.replicator;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
import org.microspace.exception.InitializationException;
import org.microspace.replicator.record.AddRecord;
import org.microspace.replicator.record.DataModelRecord;
import org.microspace.replicator.record.HeaderRecord;
import org.microspace.replicator.record.Record;
import org.microspace.replicator.record.RecordMap;
import org.microspace.space.BackedSpace;
import org.microspace.space.SimpleSpace;
import org.microspace.space.SpaceConfig;
import org.microspace.table.Entry;
import org.microspace.table.column.Accessor;
import org.microspace.util.AccessorCache;
import org.microspace.util.MicroLogger;
import org.microspace.util.PojoUtil;
/**
 * Saves updates into a file, and uses that file for recovery.
 *
 * Unlike ClusterReplicator, this replicator does handle class conversions.
 *
 * @author gsinai
 */
public class FileReplicator {
final SpaceConfig config;
final File recoverData;
final File liveData;
ObjectOutputStream output;
HashMap<String, Accessor<Object>> accessors = new HashMap<String, Accessor<Object>>();
final AccessorCache accessorCache;
HashMap<String, Boolean> skipFromBackup = new HashMap<String, Boolean>();
LinkedList<Record> pendingRecords = new LinkedList<Record> ();
private final MicroLogger log = new MicroLogger (FileReplicator.class);
public FileReplicator (SpaceConfig config) {
this.config = PojoUtil.copy(config);
this.accessorCache = new AccessorCache(config);
this.recoverData = new File (config.getBackupFileName() + ".recover");
this.liveData = new File (config.getBackupFileName() + ".live");
}
public void noRecover () {
log.info ("FileReplicator no recover.");
}
public boolean recoverData (SimpleSpace space) {
if (!liveData.canRead()) {
log.info ("FileReplicator no recover - cant read $*", liveData);
return false;
}
log.info ("FileReplicator recover $*", liveData);
ObjectInputStream in = null;
RecordMap map = new RecordMap (config.getAccessorGenerator());
try {
in = new ObjectInputStream (new FileInputStream (liveData));
while (true) {
Record replica = (Record) in.readObject();
// System.out.println ("Added " + replica.getType());
map.add (replica);
}
} catch (EOFException eof) {
} catch (FileNotFoundException fileNotFound) {
log.warn("Recovery file not found", fileNotFound);
} catch (ClassNotFoundException classNotFound){
log.warn("Recovery class not found", classNotFound);
} catch (IOException ioex) {
log.warn("Recovery io error not found", ioex);
//throw new InitializationException(ioex);
} finally {
try {
if (in!=null) in.close();
} catch (IOException ex) {
}
}
// Write Data To Map
space.rollback();
List<Record> records = map.getMergedRecords();
log.trace ("Got $* records from $*", records.size(), liveData);
for (Record record : records) {
if (record.getType() == Record.Type.ADD) {
AddRecord addRecord = record.getAddRecord();
if (isOkToSkip (addRecord.getClassName())) continue;
Entry<?> entry = addRecord.convert(accessorCache.get(addRecord
.getClassName()));
//System.err.println ("Recover " + entry.getPrimaryKey());
space.write(entry.getSpaceEntry());
}
}
log.info ("Committing $* entries recovered from $*", records.size(), liveData);
space.commit();
return true;
}
private boolean isOkToSkip (String className) {
Boolean ret = skipFromBackup.get(className);
if (ret != null) return ret;
Matcher matcher = config.getSkipFileBackup().matcher(className);
Boolean b = Boolean.valueOf(matcher.matches());
skipFromBackup.put (className, b);
return b;
}
boolean dataModelDumpExpected = false;
/**
* Write out current data from space to file, and resume operation.
* The space in return will call our write method.
* @param space is the space to write to.
*/
public void startReplication (BackedSpace space) {
dataModelDumpExpected = true;
openOut (recoverData);
space.sendDataModelRecord();
sync();
// Windows, non atomic renbame version.
File recoverDataCopy = recoverData.getAbsoluteFile();
if (!recoverDataCopy.renameTo(liveData)) {
log.warn("WINDOWS user: don't kill the process now", null);
try {
output.close();
System.out.println(recoverDataCopy.renameTo(liveData));
openOut(recoverData);
} catch (IOException ex) {
log.error("Can not rename file $* -> $*", ex, recoverData.toPath(), liveData.toPath());
throw new InitializationException("Can not move recovery file.");
}
openOutAppend (liveData);
log.warn("WINDOWS user you can kill the process now", null);
}
dataModelDumpExpected = false;
}
private void openOutAppend (File file) {
try {
if(output!=null){
log.error("Output should have been null", null);
}
output = new AppendingObjectOutputStream (new FileOutputStream (file, true));
} catch (FileNotFoundException fnf) {
throw new InitializationException ("File live data error", fnf);
} catch (IOException io) {
throw new InitializationException ("File live data io error", io);
}
}
private void openOut (File file) {
try {
if(output!=null){
log.error("Output should have been null", null);
}
output = new ObjectOutputStream (new FileOutputStream (file, false));
} catch (FileNotFoundException fnf) {
throw new InitializationException ("File live data error", fnf);
} catch (IOException io) {
throw new InitializationException ("File live data io error", io);
}
}
public void stopReplication () {
if (output == null) return;
try {
output.close();
output = null;
} catch (IOException io) {
io.printStackTrace();
}
}
/**
* Write the following types of records:
* BEGIN, ADD, REMOVE, END. The HEADER record is automatically generated.
* @param record is the record to write
*/
public void write (Record record) {
if (output == null) return;
String className = null;
switch (record.getType()) {
case BEGIN:
pendingRecords.add (record);
return;
case ADD:
className = record.getAddRecord().getClassName();
if (isOkToSkip(className)) return;
break;
case REMOVE:
className = record.getRemoveRecord().getClassName();
if (isOkToSkip(className)) return;
break;
case END:
pendingRecords.add (record);
writePendingRecords();
return;
case HEADER:
// FIXME:
pendingRecords.add (record);
writePendingRecords();
return;
case DATAMODEL:
// Same thread as startReplication
if (dataModelDumpExpected) {
int count=0;
log.info("Start new DataModel");
DataModelRecord drec = record.getDataModelRecord();
Record begin = new Record(record.getContextId(), Record.Type.BEGIN);
write (begin);
for (AddRecord add : drec.getRecords()){
Record rec = new Record(record.getContextId(), add);
write (rec);
count++;
}
Record end = new Record(record.getContextId(), Record.Type.END);
write(end);
log.info("Wrote DataModelRecord size=$*", count);
dataModelDumpExpected = false;
}
return;
}
if (className != null && accessors.get(className) == null) {
@SuppressWarnings("rawtypes")
Class clazz =null;
try {
clazz = Class.forName(className);
} catch (ClassNotFoundException e) {
log.error("Class not found: $*", e, className);
return;
}
@SuppressWarnings("unchecked")
Accessor<Object> accessor = config.getAccessGenerator().newAccessor(clazz);
accessors.put(className, accessor);
HeaderRecord header = new HeaderRecord (accessor);
pendingRecords.add(new Record (header));
// System.out.println ("Wrote header for " + className);
}
pendingRecords.add(record);
}
private void writePendingRecords () {
// Only begin and end.
if (pendingRecords.size() == 2) {
pendingRecords.clear();
return;
}
if (output == null) return;
try {
// Headers first.
for (Record record : pendingRecords) {
if (record.getType() != Record.Type.HEADER) continue;
output.writeObject(record);
}
// Everything else.
for (Record record : pendingRecords) {
if (record.getType() == Record.Type.HEADER) continue;
output.writeObject(record);
}
pendingRecords.clear();
} catch (IOException ex) {
ex.printStackTrace();
}
}
public void sync () {
log.trace("sync");
if (output == null) return;
try {
output.flush();
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
}