ClusterReplicator.java

package org.microspace.replicator;

import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;

import org.microspace.exception.InitializationException;
import org.microspace.failover.FailoverState;
import org.microspace.failover.FailoverStateListener;
import org.microspace.replicator.record.AddRecord;
import org.microspace.replicator.record.DataModelRecord;
import org.microspace.replicator.record.Record;
import org.microspace.replicator.record.RecoveryRequest;
import org.microspace.replicator.record.RemoveRecord;
import org.microspace.space.BackedSpace;
import org.microspace.space.SimpleSpace;
import org.microspace.space.SpaceConfig;
import org.microspace.table.Entry;
import org.microspace.table.column.Accessor;
import org.microspace.thread.ContextId;
import org.microspace.transport.ClusterMessageReceiver;
import org.microspace.transport.ClusterTransport;
import org.microspace.transport.NodeAddress;
import org.microspace.util.AccessorCache;
import org.microspace.util.MicroLogger;
import org.microspace.util.UniqueId;


/**
 * Replicate messages across the cluster.
 * 
 * This class does not do class conversion; use {@link org.microspace.replicator.FileReplicator FileReplicator} for that purpose.
 * 
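 * <p>A minimal usage sketch; the {@code config}, {@code space} and
 * {@code listener} instances are assumed to be created by the caller:
 * <pre>
 * ClusterReplicator replicator = new ClusterReplicator(config, space, listener);
 * // records then flow through write(...) while this node is PRIMARY
 * replicator.stopReplication();
 * </pre>
 * 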
 * @author gaspar.sinai at microspace.org
 * @version 2016-06-23
 */
public class ClusterReplicator implements ClusterMessageReceiver {
	final SpaceConfig config;
	final ClusterTransport channel;
	final HashMap<String, Boolean> skipFromBackup = new HashMap<>();
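	// Records buffered between BEGIN and END; flushed to the cluster on END.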
	final LinkedList<Record> pendingRecords = new LinkedList<>();
	final FailoverStateListener failoverStateListener;
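	// Backup nodes waiting for a data model dump; served when the next DATAMODEL record arrives.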
	final List<NodeAddress> pendingRecoverList = new LinkedList<>();
	
	final AccessorCache accessorCache;
	
	// The currently applied failover state; INITIALIZING until the first transition.
	FailoverState oldState = FailoverState.INITIALIZING;
	// True while a recovery request is outstanding and no data model has arrived yet.
	boolean isRecovering = false;
	
	BackedSpace space;

	// Local copy of the data model, maintained one update at a time.
	private SimpleSpace dataModelCopy;
	
	static final MicroLogger log = new MicroLogger (ClusterReplicator.class);
	
	public ClusterReplicator (SpaceConfig config, BackedSpace space, FailoverStateListener failoverStateListener) throws Exception {
		this.config = config;
		this.space = space;
		this.dataModelCopy = new SimpleSpace(config);
		this.accessorCache = new AccessorCache (config);
		this.failoverStateListener = failoverStateListener;
		channel = config.getClusterTransportGenerator().newClusterTransport(config.getClusterName());
		channel.setReceiver(this);
		channel.start();
	}
	
	public FailoverState getState () {
		return oldState;
	}
	

	/**
	 * Apply one replication record from the PRIMARY to the local space and
	 * to the data model copy.
	 */
	private void receive (Record record) {
		
		if (record.getType() == Record.Type.BEGIN) {
			log.trace("Received BEGIN Record from PRIMARY");
			space.rollback();
		}
		if (record.getType() == Record.Type.ADD) {
			log.trace("Received ADD Record from PRIMARY");
			Entry<?> entry = record.getAddRecord().convert(accessorCache.get(record.getAddRecord().getClassName()));
			// No class conversion here; FileReplicator handles that concern.
			space.write(entry.getSpaceEntry());
			dataModelCopy.write(entry.getSpaceEntry());
		}
		if (record.getType() == Record.Type.REMOVE) {
			log.trace("Received REMOVE Record from PRIMARY");
			RemoveRecord removeRecord = record.getRemoveRecord();
			Accessor<?> accessor = accessorCache.get(removeRecord.getClassName());
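			// A blank instance is used only to obtain the runtime class for takeById.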
			Object object = accessor.getBlankObject();
			space.takeById(object.getClass(), removeRecord.getKey());
			dataModelCopy.takeById(object.getClass(), removeRecord.getKey());
		}
		if (record.getType() == Record.Type.END) {
			log.trace("Received END Record from PRIMARY");
			space.commit();
			dataModelCopy.commit();
		}
		
	}
	
	private void setState (FailoverState state) {
		if (isRecovering) {
			throw new InitializationException ("SET STATE while recovering.");
		}
		log.trace ("SET STATE from $* to $* isRemote-$*", oldState, state, config.isRemoteSpace());
		FailoverState before = oldState;
		failoverStateListener.failoverStateChangeBefore(before, state);
		oldState = state;
		failoverStateListener.failoverStateChangeAfter(before, state);
	}
	
	private void startRecovering () {
		isRecovering = true;
		if (getState() != FailoverState.INITIALIZING) {
			return;
		}
		log.info("Sending recovery request in state $*", getState());
		try {
			channel.send(new RecoveryRequest());
		} catch (Exception e) {
			log.error("Can not send recovery request", e);
			throw new InitializationException(e);
		}
	}
	
	public void stopReplication () {
		if (channel != null) {
			channel.shutdown();
		}
	}
	
	/**
	 * Write one of the following record types:
	 * BEGIN, ADD, REMOVE, END. The HEADER record is generated automatically.
	 * Calls arrive on a single thread at a time.
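	 *
	 * A typical batch, sketched with hypothetical record variables:
	 * <pre>
	 * replicator.write(beginRecord); // buffered; copy rolled back
	 * replicator.write(addRecord);   // buffered; applied to the copy
	 * replicator.write(endRecord);   // copy committed; batch flushed
	 * </pre>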
	 * @param record is the record to write
	 */
	public void write (Record record) {
		switch (record.getType()) {
		case BEGIN:
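			// Start of a batch: buffer the record and discard uncommitted state in the copy.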
			//log.trace("Record.BEGIN");
			pendingRecords.add (record);
			dataModelCopy.rollback();
			break;
		case ADD:
			pendingRecords.add (record);
			{
				Entry<?> entry = record.getAddRecord().convert(accessorCache.get(record.getAddRecord().getClassName()));
				//log.trace("Record.ADD $*", record.getAddRecord().getKey());
				dataModelCopy.write(entry.getSpaceEntry());
			}
			break;
		case REMOVE:
			pendingRecords.add (record);
			{
				RemoveRecord removeRecord = record.getRemoveRecord();
				Class<?> clazz = null;
				try {
					clazz = Class.forName(removeRecord.getClassName());
				} catch (ClassNotFoundException e) {
					log.error ("Can not find class $*", e, removeRecord.getClassName());
					return;
				}
				//log.trace("Record.REMOVE $*", record.getRemoveRecord().getKey());
				dataModelCopy.takeById(clazz, removeRecord.getKey());
			}
			break;
		case END:
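			// End of a batch: commit the copy, then flush the buffered records to the cluster.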
			//log.trace("Record.END");
			pendingRecords.add (record);
			dataModelCopy.commit();
			writePendingRecords();
			break;
		case HEADER:
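			// HEADER records are generated automatically; nothing to replicate here.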
			break;
		case DATAMODEL:
			// Runs on the same thread as receive(NodeAddress, Serializable).
			//log.trace("Datamodel received. Size of requestors=$*", pendingRecoverList.size());
			// Serve every backup that asked for a recovery dump.
			while (!pendingRecoverList.isEmpty()) {
				NodeAddress backup = pendingRecoverList.remove(0);
				sendDataModel (backup, record.getDataModelRecord());
			}
			
			break;

		}
	}
	
	// Guards channel sends so record batches and data model dumps do not interleave.
	final Object sendLock = new Object();
	
	private void writePendingRecords () {
		if (getState() != FailoverState.PRIMARY && !config.isRemoteSpace()) {
			pendingRecords.clear();
			return;
		}
		// A batch holding only BEGIN and END carries no data changes; drop it.
		if (pendingRecords.size() == 2) {
			pendingRecords.clear();
			return;
		}
		synchronized (sendLock) {
			try {
				log.trace("Writing $* pending records.", pendingRecords.size());
				for (Record record : pendingRecords) {
					channel.send(record);
				}
				sync();
				pendingRecords.clear();
			} catch (Exception ex) {
				log.error("writePendingRecords error", ex);
				pendingRecords.clear();
			}
		}
	}
	
	public void sync () {
		// When the cluster is configured as synchronous, no explicit flush is needed.
		if (config.isSyncCluster()) {
			log.trace("Skipping explicit flush: Config.syncCluster is set.");
			return;
		}
		channel.sync();
	}
	
	/**
	 * Send a full data model dump to a recovering backup node.
	 */
	private void sendDataModel (NodeAddress address, DataModelRecord record) {
		ContextId contextId = new ContextId(UniqueId.getSeedPart());
		Record rec = new Record (contextId, record);
		log.info("Sending data model to $*", address);
		
		synchronized (sendLock) {
			try {
				channel.send(address, rec);
				sync();
			} catch (Exception e) {
				log.error("Can not send record.", e);
			}
		}
	}

	/**
	 * Receive a cluster message: replication records on a BACKUP, a data
	 * model dump while recovering, and recovery requests on the PRIMARY.
	 */
	@Override
	public void receive(NodeAddress from, Serializable message) {
		log.trace ("Received message from $* state=$* class=$*",
				from, getState(), message.getClass().getName());
		if (message instanceof Record) {
			Record rec = (Record) message;
			if (rec.getType() == Record.Type.DATAMODEL) {
				log.info("Received DATAMODEL in state=$* isRecovering=$*", getState(), isRecovering);
				if (getState() == FailoverState.INITIALIZING && isRecovering) {
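					// Replace local contents wholesale with the primary's snapshot.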
					space.rollback();
					space.clear();
					space.commit();
					DataModelRecord dm = rec.getDataModelRecord();
					log.info("Setting new datamodel with size=$*", dm.size());
					for (AddRecord add : dm.getRecords()) {
						//log.info("Converting $*", add.getClassName());
						Entry<?> entry = add.convert(accessorCache.get(add.getClassName()));
						space.write(entry.getSpaceEntry());
						dataModelCopy.write(entry.getSpaceEntry());
					}
					space.commit();
					dataModelCopy.commit();
					isRecovering = false;
					setState (FailoverState.BACKUP);

				} else {
					log.warn("Throwing away recovery datamodel state=$* isrecovering=$*", null, getState(), isRecovering);
				}
			} else if (getState() == FailoverState.BACKUP) {
				receive (rec);
			}
		}
		if (message instanceof RecoveryRequest) {
			if (getState() == FailoverState.PRIMARY) {
				// A backup asked for a full dump; queue it and trigger a snapshot.
				pendingRecoverList.add(from);
				log.info("Dumping model to $*", from);
				space.sendDataModelRecord();
			}
		}
	}
	
	/**
	 * Apply a failover state transition. From INITIALIZING, a change to
	 * BACKUP first triggers recovery; the new state is only applied once
	 * recovery completes.
	 */
	@Override
	public void changeState (FailoverState state) {
		if (oldState == state) {
			return;
		}
		log.trace ("Called changing state from $* to $* isRecovering=$*", oldState, state, isRecovering);
		
		switch (oldState) {
		case BACKUP:
			setState (state);
			break;
		case INITIALIZING:
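			// Becoming PRIMARY while still recovering is fatal; becoming BACKUP first starts recovery.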
			if (isRecovering && state == FailoverState.PRIMARY) {
				log.error ("Changed to primary while recovering.", null);
				isRecovering = false;
				throw new InitializationException ("Activate while recovering.");
			} else if (state == FailoverState.BACKUP && !isRecovering) {
				startRecovering ();
			}
			if (!isRecovering) {
				setState (state);
			}
			break;
		case PRIMARY:
			setState (state);
			break;
		default:
			break;
		}
	}
}