SimpleFailoverMicroSpace.java

package org.microspace.space;

import java.util.LinkedList;
import java.util.List;

import org.microspace.event.SpaceRecordListener;
import org.microspace.exception.BackupSpaceException;
import org.microspace.exception.IllegalOperationException;
import org.microspace.failover.FailoverState;
import org.microspace.failover.FailoverStateListener;
import org.microspace.specific.CurrentTimeProvider;
import org.microspace.table.query.TableQuery;
import org.microspace.util.MicroLogger;
import org.microspace.util.UniqueId;

/**
 * A FailoverMicroSpace implementation.
 * 
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2016-06-26
 */
public class SimpleFailoverMicroSpace implements FailoverMicroSpace, FailoverStateListener {

	private final UniqueId instanceId;
	List<FailoverStateListener> listeners = new LinkedList<FailoverStateListener>();
	
	static MicroLogger log = new MicroLogger (SimpleFailoverMicroSpace.class);
	final BackedSpace delegate;
	
	public SimpleFailoverMicroSpace() {
		delegate = new BackedSpace();
		delegate.setFailoverStateListener(this);
		this.instanceId = new UniqueId();
	}

	public SimpleFailoverMicroSpace(SpaceConfig spaceConfig) {
		this(spaceConfig, null);
	}
	
	public SimpleFailoverMicroSpace(SpaceConfig spaceConfig, CurrentTimeProvider currentTimeProvider) {
		delegate = new BackedSpace(spaceConfig, currentTimeProvider);
		delegate.setFailoverStateListener(this);
		this.instanceId = new UniqueId();
	}
	

	/**
	 * {@inheritDoc}
	 */
	@Override
	public synchronized void addFailoverStateListener(
			FailoverStateListener failoverStateListener) {
		
		listeners.add(failoverStateListener);
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public synchronized void removeFailoverStateListener(
			FailoverStateListener failoverStateListener) {
		
		listeners.remove(failoverStateListener);
	}
	@Override
	public synchronized void failoverStateChangeBefore(FailoverState before,
			FailoverState after) {
		log.debug("failoverStateChangeBefore $* $* listeners=$*", before, after, listeners.size());
		for (FailoverStateListener l : listeners) {
			try {
				l.failoverStateChangeBefore(before, after);
			} catch (Exception e){
				log.error("Listener failed", e);
			}
		}
		
	}

	@Override
	public synchronized void failoverStateChangeAfter(FailoverState before,
			FailoverState after) {
		log.debug("failoverStateChangeAfter $* $* listeners=$*", before, after, listeners.size());
		//delegate.failoverStateChangeBefore(before, after);
		for (FailoverStateListener l : listeners) {
			try {
				l.failoverStateChangeAfter(before, after);
			} catch (Exception e){
				log.error("Listener failed", e);
			}
		}
	}

	void checkStarted () {
		if (!delegate.isStarted()) {
			throw new IllegalOperationException("Space Has Not Started");
		}
	}
	
	void checkPrimary () {
		checkStarted();
		if (getFailoverState() != FailoverState.PRIMARY) {
			throw new BackupSpaceException("Space is not PRIMARY");
		}
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> void write(T object) {
		checkStarted();
		checkPrimary ();
		delegate.write(object);
		checkPrimary ();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T take(TableQuery<T> query) {
		checkStarted ();
		checkPrimary ();
		T r = delegate.take(query);
		return r;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T take(TableQuery<T> query, long timeout) {
		checkStarted ();
		checkPrimary ();
		T r = delegate.take(query, timeout);
		checkPrimary ();
		return r;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> takeMultiple(TableQuery<T> query) {
		checkStarted ();
		checkPrimary ();
		List<T>  r = delegate. takeMultiple(query);
		checkPrimary ();
		return r;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T read(TableQuery<T> query) {
		checkStarted ();
		T r = delegate.read(query);
		return r;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> readMultiple(TableQuery<T> query) {
		checkStarted ();
		List<T>  r = delegate.readMultiple(query);
		return r;
	}


	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T take(T templateObject) {
		checkStarted ();
		checkPrimary ();
		T r = delegate.take(templateObject);
		checkPrimary ();
		return r;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T take(T templateObject, long timeout) {
		checkStarted ();
		checkPrimary ();
		T r = delegate.take(templateObject, timeout);
		checkPrimary ();
		return r;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> takeMultiple(T templateObject) {
		checkStarted ();
		checkPrimary ();
		List<T>  r = delegate.takeMultiple(templateObject);
		checkPrimary ();
		return r;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T read(T templateObject) {
		checkStarted ();
		T  r = delegate.read(templateObject);
		return r;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> readMultiple(T templateObject) {
		checkStarted ();
		List<T>  r = delegate.readMultiple(templateObject);
		return r;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> readMultiple(T templateObject, int maxEntries) {
		checkStarted ();
		List<T>  r = delegate.readMultiple(templateObject, maxEntries);
		return r;
	}
	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T readById(Class<T> tableClass, Object primaryKey) {
		checkStarted ();
		T r = delegate.readById(tableClass, primaryKey);
		return r;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T takeById(Class<T> tableClass, Object primaryKey) {
		checkStarted ();
		checkPrimary ();
		T r = delegate.takeById(tableClass, primaryKey);
		checkPrimary ();
		return r;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T readByIdOf(T initializedObject) {
		checkStarted ();
		T r = delegate.readByIdOf(initializedObject);
		return r;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T takeByIdOf(T initializedObject) {
		checkStarted ();
		checkPrimary ();
		T r = delegate.takeByIdOf(initializedObject);
		checkPrimary ();
		return r;
	}
	
	

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> void clear(Class<T> clazz) {
		checkStarted ();
		checkPrimary ();
		delegate.clear(clazz);
		checkPrimary ();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void clear() {
		checkStarted ();
		checkPrimary ();
		delegate.clear();
		checkPrimary ();
		
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public AccessorGenerator getAccessorGenerator() {
		return delegate.getAccessorGenerator();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void commit() {
		checkStarted ();
		checkPrimary ();
		delegate.commit();
		checkPrimary ();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void rollback() {
		checkStarted ();
		delegate.rollback();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public FailoverState getFailoverState() {
		return delegate.getFailoverState();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void start() {
		delegate.start();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void shutdown() {
		delegate.shutdown();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> int size(Class<T> clazz) {
		return delegate.size(clazz);
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public int size() {
		return delegate.size();
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public List<Class<?>> getTableClasses() {
		return delegate.getTableClasses();
	}

	@Override
	public <T> T getNextMessage(TableQuery<T> query) {
		checkStarted ();
		checkPrimary ();
		T r = delegate.getNextMessage(query);
		checkPrimary ();
		return r;
	}

	@Override
	public <T> T getNextMessage(TableQuery<T> query, long timeout) {
		checkStarted ();
		checkPrimary ();
		T r = delegate.getNextMessage(query, timeout);
		checkPrimary ();
		return r;
	}

	@Override
	public <T> T getNextMessage(T templateObject) {
		checkStarted ();
		checkPrimary ();
		T r = delegate.getNextMessage(templateObject);
		checkPrimary ();
		return r;
	}

	@Override
	public <T> T getNextMessage(T templateObject, long timeout) {
		checkStarted ();
		checkPrimary ();
		T r = delegate.getNextMessage(templateObject, timeout);
		checkPrimary ();
		return r;
	}
	@Override
	public <T> void registerMessageQueue(TableQuery<T> query) {
		checkStarted ();
		checkPrimary ();
		delegate.registerMessageQueue(query);
		checkPrimary ();
	}



	@Override
	public <T> void registerMessageQueue(T templateObject) {
		checkStarted ();
		checkPrimary ();
		delegate.registerMessageQueue(templateObject);
		checkPrimary ();
	}



	@Override
	public <T> void unregisterMessageQueue(TableQuery<T> query) {
		delegate.unregisterMessageQueue(query);
		
	}

	@Override
	public <T> void unregisterMessageQueue(T templateObject) {
		delegate.unregisterMessageQueue(templateObject);
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<TableQuery<T>> getRegisteredMessageQueueQueries (Class<T> clazz) {
		return delegate.getRegisteredMessageQueueQueries(clazz);
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> int getMessageQueueSize (TableQuery<T> query) {
		 return delegate.getMessageQueueSize(query);
	}
	/**
	 * {@inheritDoc}
	 */
	@Override
	public boolean isRemoteSpace () {
		return delegate.isRemoteSpace();
	}
	/**
	 * {@inheritDoc}
	 */
	@Override
	public boolean isInterrupted () {
		return delegate.isInterrupted();
	}
	/**
	 * {@inheritDoc}
	 */
	@Override
	public Integer getPartitionId () {
		return delegate.getPartitionId();
	}

	@Override
	public UniqueId getSpaceInstanceId() {
		return instanceId;
	}
	SpaceRecordListener spaceRecordListener;
	/**
	 * Set a data listener.
	 * @param listener The data listener.
	 */
	@Override
	public void setSpaceRecordListener (SpaceRecordListener listener) {
		this.spaceRecordListener = listener;
	}
	/**
	 * Get the current Time provider.
	 * @return The current time provider for timestamps.
	 */
	public CurrentTimeProvider getCurrentTimeProvider() {
		return delegate.getCurrentTimeProvider();
	}

}