// FileReplicator.java

package org.microspace.replicator;

import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;

import org.microspace.exception.InitializationException;
import org.microspace.replicator.record.AddRecord;
import org.microspace.replicator.record.DataModelRecord;
import org.microspace.replicator.record.HeaderRecord;
import org.microspace.replicator.record.Record;
import org.microspace.replicator.record.RecordMap;
import org.microspace.space.BackedSpace;
import org.microspace.space.SimpleSpace;
import org.microspace.space.SpaceConfig;
import org.microspace.table.Entry;
import org.microspace.table.column.Accessor;
import org.microspace.util.AccessorCache;
import org.microspace.util.MicroLogger;
import org.microspace.util.PojoUtil;

/**
 * Save updates into file, and use that file for recovery.
 * 
 * Unlike ClusterReplicator, this replicator does handle class conversions.
 * <p>
 * Two files derived from {@code config.getBackupFileName()} are used:
 * {@code <name>.recover} receives a fresh dump of the data model when
 * replication starts and is then moved over {@code <name>.live}, which is the
 * file subsequent records are written to and the only file read by
 * {@link #recoverData(SimpleSpace)}.
 * <p>
 * Records are queued per transaction and written when the END record arrives
 * (see {@link #write(Record)}). This class performs no synchronization of its
 * own.
 * 
 * @author gsinai
 *
 */
public class FileReplicator {
	final SpaceConfig config;
	/** Scratch file the data model dump is written to before it replaces the live file. */
	final File recoverData;
	/** File read on recovery and written to while replicating. */
	final File liveData;
	/** Currently open record stream, or null while replication is stopped. */
	ObjectOutputStream output;
	/** Accessors, by class name, for which a HEADER record has already been queued. */
	HashMap<String, Accessor<Object>> accessors = new HashMap<String, Accessor<Object>>();

	final AccessorCache accessorCache;

	/** Cache of the skip decision per class name, see {@link #isOkToSkip(String)}. */
	HashMap<String, Boolean> skipFromBackup = new HashMap<String, Boolean>();
	/** Records of the transaction being assembled, in arrival order. */
	LinkedList<Record> pendingRecords = new LinkedList<Record> ();

	/** True only while startReplication waits for the DATAMODEL record. */
	boolean dataModelDumpExpected = false;

	private final MicroLogger log = new MicroLogger (FileReplicator.class);

	/**
	 * Create a replicator for the backup file named in the configuration.
	 * @param config is the space configuration; a copy of it is kept.
	 */
	public FileReplicator (SpaceConfig config) {
		this.config = PojoUtil.copy(config);
		this.accessorCache = new AccessorCache(config);
		this.recoverData = new File (config.getBackupFileName() + ".recover");
		this.liveData = new File (config.getBackupFileName() + ".live");
	}

	/**
	 * Called when no recovery is performed; only logs the fact.
	 */
	public void noRecover () {
		log.info ("FileReplicator no recover.");
	}

	/**
	 * Read all records from the live file and write the merged ADD records
	 * into the space in one transaction.
	 * <p>
	 * Reading is best effort: a missing class or an I/O error stops the read
	 * with a warning, and whatever was read up to that point is still
	 * recovered.
	 * 
	 * @param space is the space to fill with the recovered entries.
	 * @return false if the live file is not readable, true otherwise.
	 */
	public boolean recoverData (SimpleSpace space) {
		if (!liveData.canRead()) {
			log.info ("FileReplicator no recover - cant read $*", liveData);
			return false;
		}
		log.info ("FileReplicator recover $*", liveData);
		RecordMap map = new RecordMap (config.getAccessorGenerator());
		// Both streams are declared as resources so the file handle is released
		// even when the ObjectInputStream constructor rejects the stream header.
		try (FileInputStream fileIn = new FileInputStream (liveData);
				ObjectInputStream in = new ObjectInputStream (fileIn)) {
			while (true) {
				map.add ((Record) in.readObject());
			}
		} catch (EOFException endOfFile) {
			// Expected: the stream has no terminator, EOF ends the read loop.
		} catch (FileNotFoundException fileNotFound) {
			log.warn("Recovery file not found", fileNotFound);
		} catch (ClassNotFoundException classNotFound){
			log.warn("Recovery class not found", classNotFound);
		} catch (IOException ioex) {
			log.warn("Recovery io error", ioex);
		}
		// Write Data To Map
		space.rollback();
		List<Record> records = map.getMergedRecords();
		log.trace ("Got $* records from $*", records.size(), liveData);
		int recovered = 0;
		for (Record record : records) {
			if (record.getType() != Record.Type.ADD) continue;
			AddRecord addRecord = record.getAddRecord();
			String className = addRecord.getClassName();
			if (isOkToSkip (className)) continue;
			Entry<?> entry = addRecord.convert(accessorCache.get(className));
			space.write(entry.getSpaceEntry());
			recovered++;
		}
		// Report what was actually written, not the number of merged records.
		log.info ("Committing $* entries recovered from $*", recovered, liveData);
		space.commit();
		return true;
	}

	/**
	 * Tell whether a class is excluded from the file backup.
	 * The class name is matched against {@code config.getSkipFileBackup()}
	 * and the result is cached per class name.
	 * @param className is the name of the class to check.
	 * @return true if records of this class must not be written or recovered.
	 */
	private boolean isOkToSkip (String className) {
		Boolean cached = skipFromBackup.get(className);
		if (cached != null) return cached;
		Matcher matcher = config.getSkipFileBackup().matcher(className);
		Boolean skip = Boolean.valueOf(matcher.matches());
		skipFromBackup.put (className, skip);
		return skip;
	}

	/**
	 * Write out current data from space to file, and resume operation.
	 * The space in return will call our write method.
	 * <p>
	 * The dump goes to the recover file, which is then renamed to the live
	 * file while the stream stays open. If that rename fails the stream is
	 * closed, the file is moved replacing the live file, and the live file is
	 * reopened for appending.
	 * 
	 * @param space is the space to write to.
	 * @throws InitializationException if a file can not be opened or the
	 *         recover file can not be moved over the live file.
	 */
	public void startReplication (BackedSpace space) {
		dataModelDumpExpected = true;
		try {
			openOut (recoverData);
			space.sendDataModelRecord();
			sync();
			// Windows, non atomic rename version.
			File recoverDataCopy = recoverData.getAbsoluteFile();
			if (!recoverDataCopy.renameTo(liveData)) {
				log.warn("WINDOWS user: don't kill the process now", null);
				moveClosedDumpToLive (recoverDataCopy);
				openOutAppend (liveData);
				log.warn("WINDOWS user you can kill the process now", null);
			}
		} finally {
			// Never leave the flag set, not even when opening or moving failed.
			dataModelDumpExpected = false;
		}
	}

	/**
	 * Close the dump stream and move the dump over the live file.
	 * On return {@link #output} is null, so the caller can reopen the live file.
	 * @param dump is the absolute recover file holding the finished dump.
	 * @throws InitializationException if closing or moving fails.
	 */
	private void moveClosedDumpToLive (File dump) {
		ObjectOutputStream dumpStream = output;
		output = null;
		try {
			if (dumpStream != null) dumpStream.close();
			// renameTo already failed once; Files.move replaces an existing
			// target and reports the reason when it can not.
			Files.move(dump.toPath(), liveData.toPath(), StandardCopyOption.REPLACE_EXISTING);
		} catch (IOException ex) {
			log.error("Can not rename file $* -> $*", ex, recoverData.toPath(), liveData.toPath());
			throw new InitializationException("Can not move recovery file.", ex);
		}
	}

	/**
	 * Open the file for appending with an AppendingObjectOutputStream.
	 * @param file is the file to append to.
	 */
	private void openOutAppend (File file) {
		open (file, true);
	}

	/**
	 * Open the file with a new ObjectOutputStream, truncating it.
	 * @param file is the file to (re)create.
	 */
	private void openOut (File file) {
		open (file, false);
	}

	/**
	 * Open {@link #output} on the file. A stream that is unexpectedly still
	 * open is reported and closed first so its file handle is not leaked.
	 * @param file is the file to open.
	 * @param append selects appending to the file instead of truncating it.
	 * @throws InitializationException if the file can not be opened.
	 */
	private void open (File file, boolean append) {
		if(output!=null){
			log.error("Output should have been null", null);
			closeOutput ();
		}
		FileOutputStream fileOut = null;
		try {
			fileOut = new FileOutputStream (file, append);
			if (append) {
				output = new AppendingObjectOutputStream (fileOut);
			} else {
				output = new ObjectOutputStream (fileOut);
			}
		} catch (FileNotFoundException fnf) {
			throw new InitializationException ("File live data error", fnf);
		} catch (IOException io) {
			// The object stream was not created, release the raw file handle.
			if (fileOut != null) {
				try {
					fileOut.close();
				} catch (IOException ignored) {
					// Nothing more to do, the original error is reported below.
				}
			}
			throw new InitializationException ("File live data io error", io);
		}
	}

	/**
	 * Close {@link #output} if it is open and always leave it null.
	 * A failure to close is logged and not propagated.
	 */
	private void closeOutput () {
		ObjectOutputStream stream = output;
		output = null;
		if (stream == null) return;
		try {
			stream.close();
		} catch (IOException io) {
			log.warn("Can not close replication file", io);
		}
	}

	/**
	 * Close the replication file. Records passed to {@link #write(Record)}
	 * afterwards are ignored until replication is started again.
	 */
	public void stopReplication () {
		closeOutput ();
	}

	/**
	 * Write the following types of records:
	 * BEGIN, ADD, REMOVE, END. The HEADER record is automatically generated.
	 * <p>
	 * Records are queued and written to the file when END arrives. A DATAMODEL
	 * record is expanded into BEGIN, one ADD per contained record, and END,
	 * but only while startReplication expects it. Nothing is done while no
	 * file is open.
	 * 
	 * @param record is the record to write
	 */
	public void write (Record record) {
		if (output == null) return;
		String className = null;
		switch (record.getType()) {
		case BEGIN:
			pendingRecords.add (record);
			return;
		case ADD:
			className = record.getAddRecord().getClassName();
			if (isOkToSkip(className)) return;
			break;
		case REMOVE:
			className = record.getRemoveRecord().getClassName();
			if (isOkToSkip(className)) return;
			break;
		case END:
			pendingRecords.add (record);
			writePendingRecords();
			return;
		case HEADER:
			// FIXME:
			// NOTE(review): this flushes whatever is queued, including an
			// unfinished transaction, and is dropped by the "only begin and
			// end" check if exactly one other record is queued - confirm intent.
			pendingRecords.add (record);
			writePendingRecords();
			return;
		case DATAMODEL:
			// Same thread as startReplication
			if (dataModelDumpExpected) {
				int count=0;
				log.info("Start new DataModel");
				DataModelRecord drec =  record.getDataModelRecord();
				write (new Record(record.getContextId(), Record.Type.BEGIN));
				for (AddRecord add : drec.getRecords()){
					write (new Record(record.getContextId(), add));
					count++;
				}
				write (new Record(record.getContextId(), Record.Type.END));
				log.info("Wrote DataModelRecord size=$*", count);
				dataModelDumpExpected = false;
			}
			return;
		}
		// A record whose class can not be loaded is dropped.
		if (className != null && !queueHeaderIfNew(className)) return;
		pendingRecords.add(record);
	}

	/**
	 * Queue a HEADER record the first time a class is seen by this replicator.
	 * @param className is the class of the record about to be queued.
	 * @return false if the class can not be loaded, true otherwise.
	 */
	private boolean queueHeaderIfNew (String className) {
		if (accessors.get(className) != null) return true;
		@SuppressWarnings("rawtypes")
		Class clazz;
		try {
			clazz = Class.forName(className);
		} catch (ClassNotFoundException e) {
			log.error("Class not found: $*", e, className);
			return false;
		}
		// NOTE(review): recoverData uses config.getAccessorGenerator() while
		// this call uses getAccessGenerator() - confirm both are intended.
		@SuppressWarnings("unchecked")
		Accessor<Object> accessor = config.getAccessGenerator().newAccessor(clazz);
		accessors.put(className, accessor);
		pendingRecords.add(new Record (new HeaderRecord (accessor)));
		return true;
	}

	/**
	 * Write the queued records to the file, HEADER records first, and empty
	 * the queue. A queue of exactly two records is taken to be an empty
	 * transaction and discarded. If writing fails the error is logged and the
	 * queue is kept, so the same records are written again with the next flush.
	 */
	private void writePendingRecords () {
		// Only begin and end.
		if (pendingRecords.size() == 2) {
			pendingRecords.clear();
			return;
		}
		if (output == null) return;
		try {
			// Headers first.
			for (Record record : pendingRecords) {
				if (record.getType() == Record.Type.HEADER) output.writeObject(record);
			}
			// Everything else.
			for (Record record : pendingRecords) {
				if (record.getType() != Record.Type.HEADER) output.writeObject(record);
			}
			// NOTE(review): the stream is never reset(), so it keeps a
			// reference to every record written - verify this is acceptable
			// for long running processes.
			pendingRecords.clear();
		} catch (IOException ex) {
			log.error("Can not write replication records", ex);
		}
	}

	/**
	 * Flush the open record stream. Does nothing while no file is open; a
	 * failure is logged and not propagated.
	 */
	public void sync () {
		log.trace("sync");
		if (output == null) return;
		try {
			output.flush();
		} catch (IOException ioe) {
			log.warn("Can not flush replication file", ioe);
		}
	}
}