// BackedSpace.java

package org.microspace.space;

import java.util.ArrayList;
import java.util.List;

import org.microspace.exception.InitializationException;
import org.microspace.failover.FailoverState;
import org.microspace.failover.FailoverStateListener;
import org.microspace.replicator.ClusterReplicator;
import org.microspace.replicator.FileReplicator;
import org.microspace.replicator.record.AddRecord;
import org.microspace.replicator.record.DataModelRecord;
import org.microspace.replicator.record.Record;
import org.microspace.replicator.record.RemoveRecord;
import org.microspace.specific.CurrentTimeProvider;
import org.microspace.table.Entry;
import org.microspace.table.SimpleTable;
import org.microspace.table.column.IndexedMap;
import org.microspace.table.query.MatchAllQuery;
import org.microspace.thread.ContextId;
import org.microspace.util.MicroLogger;

/**
 * A cluster or file backed MicroSpace.
 * <p>
 * When isClusterBackup is set in {@link org.microspace.space.SpaceConfig SpaceConfig}
 * it joins the cluster and, if a primary is present, recovers from it.
 * <p>
 * If it can not find a primary, it becomes the primary, and if fileRecover and 
 * fileBackup is set it recovers from file.
 * <p>
 * If fileBackup is set, the state will be saved in the backupFileName + ".live", regardless
 * of PRIMARY or BACKUP states.
 * <p>
 * backupFileName + ".recover" is used only during recovery internally, so that there is no data loss
 * in case of recovery exit.
 * <p>
 * The space registers itself as its own {@link CommitListener}: every commit is turned into
 * a BEGIN record, one add/remove record per changed entry, and an END record, each of which
 * is written to whichever replicators are configured.
 * 
 * @author gaspar.sinai at microspace.org
 * @version 2016-06-23
 */
public class BackedSpace extends SimpleSpace implements FailoverStateListener, CommitListener {
	
	/** File replicator; stays <code>null</code> unless file backup is enabled and {@link #start()} ran. */
	FileReplicator fileReplicator;
	/** Cluster replicator; stays <code>null</code> unless cluster backup is enabled and {@link #start()} ran. */
	ClusterReplicator clusterReplicator;
	
	static MicroLogger log = new MicroLogger(BackedSpace.class);
	
	/** Last failover state recorded by {@link #failoverStateChangeAfter(FailoverState, FailoverState)}. */
	FailoverState failoverState = FailoverState.INITIALIZING;
	
	/** Set as soon as {@link #start()} is entered; never reset by this class. */
	boolean started  = false;

	/** Optional user listener that failover notifications are forwarded to. */
	FailoverStateListener failoverListener = null;
	
	
	/**
	 * Create a space through the superclass default constructor.
	 * Unlike the other constructors, this one does not register a commit listener.
	 */
	public BackedSpace() {
	}
	
	/**
	 * Create a space with the given configuration and no explicit time provider.
	 * 
	 * @param spaceConfig the space configuration.
	 */
	public BackedSpace(SpaceConfig spaceConfig) {
		this(spaceConfig, null);
	}

	/**
	 * Create a space and register it as its own commit listener.
	 * Replication does not begin until {@link #start()} is called.
	 * 
	 * @param spaceConfig the space configuration.
	 * @param currentTimeProvider passed through to the superclass; may be <code>null</code>.
	 */
	public BackedSpace(SpaceConfig spaceConfig, CurrentTimeProvider currentTimeProvider) {
		super(spaceConfig, currentTimeProvider);
		setCommitListener(this);
		log.trace("Creating BackedSpace. Config=$*", spaceConfig);
	}
	
	/**
	 * Set the listener that failover state changes are forwarded to.
	 * 
	 * @param failoverListener the listener, or <code>null</code> to stop forwarding.
	 */
	public void setFailoverStateListener (FailoverStateListener failoverListener) {
		this.failoverListener = failoverListener;
	}
	
	/**
	 * @return true once {@link #start()} has been invoked, even if it failed
	 * or {@link #shutdown()} was called afterwards.
	 */
	public boolean isStarted () {
		return started;
	}
	
	/**
	 * Start the space: join the cluster if cluster backup is configured, set up
	 * file replication if file backup is configured, and announce the PRIMARY
	 * state when applicable.
	 * <p>
	 * If this method fails, replicators that were already created are left in
	 * place; {@link #shutdown()} can be used to stop them.
	 * 
	 * @throws InitializationException if the space was already started, the cluster
	 * replicator can not be created, the cluster does not leave the INITIALIZING state
	 * within the configured cluster recovery timeout, or the calling thread is
	 * interrupted while waiting for the cluster.
	 */
	public void start () throws InitializationException {
		if (started) {
			throw new InitializationException("Space Already started");
		}
		
		started = true;

		// Without a cluster there is nobody else to be primary.
		boolean primary = true;
		if (getSpaceConfig().isClusterBackup()) {
			failoverState = FailoverState.INITIALIZING;
			try {
				clusterReplicator = new ClusterReplicator (getSpaceConfig(), this, this);
			} catch (Exception e) {
				throw new InitializationException(e);
			}
			awaitClusterInitialization();
			log.info("ClusterReplicator is initialized with state $*", clusterReplicator.getState());
			// FIXME: if PRIMARY use file recovery, if BACKUP use network recovery.
			primary = clusterReplicator.getState() == FailoverState.PRIMARY;
		}
		if (getSpaceConfig().isFileBackup()) {
			fileReplicator = new FileReplicator (getSpaceConfig());
			log.info("FileBackup primary=$* isFileRecover=$*", primary, getSpaceConfig().isFileRecover ());
			if (primary && getSpaceConfig().isFileRecover ()) {
				fileReplicator.recoverData(this);
			} else {
				fileReplicator.noRecover ();
			}
			// FIXME: add current data and start replication. 
			fileReplicator.startReplication(this);
		}
		if (primary || getSpaceConfig().isRemoteSpace()) {
			// The state is read again before the second callback on purpose:
			// failoverStateChangeAfter is skipped if the state became PRIMARY in the meantime.
			if (failoverState != FailoverState.PRIMARY) {
				failoverStateChangeBefore(failoverState, FailoverState.PRIMARY);
			}
			if (failoverState != FailoverState.PRIMARY) {
				failoverStateChangeAfter(failoverState, FailoverState.PRIMARY);
			}
		}
		log.info("Space Initial size: $*", size()); 
	}
	
	/**
	 * Poll the cluster replicator once per second until it leaves the INITIALIZING state.
	 * 
	 * @throws InitializationException if the wait exceeds the configured cluster recovery
	 * timeout (compared against elapsed milliseconds), or if the thread is interrupted.
	 */
	private void awaitClusterInitialization () throws InitializationException {
		long start = System.currentTimeMillis();
		while (clusterReplicator.getState() == FailoverState.INITIALIZING) {
			log.info("Waiting for clusterReplicator to initialize");
			try {
				Thread.sleep(1000L);
			} catch (InterruptedException e) {
				// Treat interruption as a request to abandon startup; keep the
				// interrupt status visible to the caller instead of swallowing it.
				Thread.currentThread().interrupt();
				throw new InitializationException(e);
			}
			if (clusterReplicator.getState() != FailoverState.INITIALIZING) break;
			long now = System.currentTimeMillis();
			if (now - start > getSpaceConfig().getClusterRecoveryTimeout()) {
				throw new InitializationException("RecoveryTimeout");
			}
		}
	}
	
	/**
	 * Stop file and cluster replication, if they were started.
	 * The cluster replicator is stopped even when stopping the file replicator fails.
	 */
	public void shutdown () {
		try {
			if (fileReplicator != null) {
				fileReplicator.stopReplication();
			}
		} finally {
			if (clusterReplicator != null) {
				clusterReplicator.stopReplication();
			}
		}
	}

	/**
	 * Write a BEGIN record to the configured replicators.
	 */
	@Override
	public void commitBegins() {
		writeToReplicators(new Record(new ContextId(), Record.Type.BEGIN), false);
	}

	/**
	 * Write the changed entry to the configured replicators: a remove record
	 * when the entry is flagged as removed, an add record (carrying the entry's
	 * fields and update count) otherwise.
	 * 
	 * @param newEntry the entry that was committed.
	 */
	@Override
	public void entryAdded(Entry<Object> newEntry) {
		Record record;
		String className = newEntry.getSpaceEntry().getClass().getName();
		if (newEntry.isRemoved()) {
			RemoveRecord remove = new RemoveRecord (className, newEntry.getPrimaryKey());
			record = new Record (new ContextId(), remove);
		} else {
			AddRecord add = new AddRecord (className, newEntry.getPrimaryKey(), newEntry.getFields());
			add.setUpdateCount(newEntry.getUpdateCount());
			record = new Record (new ContextId(), add);
		}
		writeToReplicators(record, false);
	}

	/**
	 * Write an END record to the configured replicators and sync them.
	 */
	@Override
	public void commitEnds() {
		writeToReplicators(new Record(new ContextId(), Record.Type.END), true);
	}

	/**
	 * Write a record to the file replicator and then to the cluster replicator,
	 * skipping whichever is not configured.
	 * 
	 * @param record the record to write.
	 * @param sync when true, each replicator is synced right after its own write.
	 */
	private void writeToReplicators (Record record, boolean sync) {
		if (fileReplicator != null) {
			fileReplicator.write(record);
			if (sync) {
				fileReplicator.sync();
			}
		}
		if (clusterReplicator != null) {
			clusterReplicator.write(record);
			if (sync) {
				clusterReplicator.sync();
			}
		}
	}

	/**
	 * Forward an upcoming failover state change to the user listener.
	 * Ignored (only traced) when the space is configured as a remote space.
	 * 
	 * @param before the current state.
	 * @param after the state about to be entered.
	 */
	@Override
	public void failoverStateChangeBefore(FailoverState before,
			FailoverState after) {
		if (getSpaceConfig().isRemoteSpace()) {
			log.trace("failoverStateChangeBefore IGNORED  $* $*", before, after);
			return;
		}
		
		log.trace("failoverStateChangeBefore $* $*", before, after);
		if (failoverListener != null) {
			failoverListener.failoverStateChangeBefore(before, after);
		}
	}

	/**
	 * Record the new failover state and forward the change to the user listener.
	 * Ignored (only traced) when the space is configured as a remote space, in
	 * which case the recorded state is left untouched.
	 * 
	 * @param before the previous state.
	 * @param after the state that has been entered.
	 */
	@Override
	public void failoverStateChangeAfter(FailoverState before,
			FailoverState after) {
		if (getSpaceConfig().isRemoteSpace()) {
			log.trace("failoverStateChangeAfter IGNORED  $* $*", before, after);
			return;
		}
		log.trace("failoverStateChangeAfter $* $*", before, after);
		failoverState = after;
		if (failoverListener != null) {
			failoverListener.failoverStateChangeAfter(before, after);
		}
	}

	/**
	 * @return the failover state last recorded by
	 * {@link #failoverStateChangeAfter(FailoverState, FailoverState)}; INITIALIZING until then.
	 */
	public FailoverState getFailoverState() {
		return failoverState;
	}

	/**
	 * Collect the back entries of every table, sort them in their natural order,
	 * and write them to the configured replicators as a single data model record,
	 * followed by a sync. The commit read lock is held for the whole operation.
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	public void sendDataModelRecord () { 
		
		commitLock.readLock().lock();
		try {
			List<Entry<?>> list = new ArrayList<Entry<?>>();
			for (java.util.Map.Entry<Class<?>, SimpleTable<?>> e : maps.entrySet()) {
				SimpleTable<Object> t = (SimpleTable<Object>) e.getValue();
				MatchAllQuery<Object> query = new MatchAllQuery(e.getKey());
				IndexedMap<Object, Entry<Object>> back = t.readMultipleBackEntries(query);
				for (Entry<Object> entry : back.values()) {
					list.add(entry);
				}
			}
			// A null comparator selects the entries' natural ordering.
			list.sort(null);
			DataModelRecord dataModelRecord = new DataModelRecord(list);
			Record record = new Record (new ContextId(), dataModelRecord);
			writeToReplicators(record, true);
		} finally {
			commitLock.readLock().unlock();
		}
	}
	
}