SharedSpace.java

package org.microspace.space;

import org.microspace.event.SpaceRecordListener;
import org.microspace.failover.FailoverState;
import org.microspace.failover.FailoverStateListener;
import org.microspace.specific.CurrentTimeProvider;
import org.microspace.specific.MicroSpaceAccessorGenerator;
import org.microspace.table.column.GetSetPair;
import org.microspace.table.query.TableQuery;
import org.microspace.util.AccessorCache;
import org.microspace.util.PojoUtil;
import org.microspace.util.UniqueId;

import java.util.List;


/**
 * 
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2017-07-05
 */
public class SharedSpace implements FailoverMicroSpace {

	private final UniqueId spaceInstanceId;

	public enum Role {
		EMBEDDED,
		REMOTE,
		REMOTE_COMMITTED,
	}
	
	final SimpleSpace delegate;
	final Role role;
	final AccessorCache accessorCache;
	final Integer PARTITION_ID;
	
	/**
	 * Create a shared SimpleSpace with roles.
	 * @param space Is the space that will be shared.
	 * @param role Is the role.
	 */
	public SharedSpace (SimpleSpace space, Role role) {
		this.delegate = space;
		this.role = role;
		this.accessorCache = new AccessorCache(new MicroSpaceAccessorGenerator());
		if (role == Role.EMBEDDED) {
			PARTITION_ID = 0;
			this.spaceInstanceId = new UniqueId(String.valueOf(PARTITION_ID));
		} else {
			PARTITION_ID = null;
			this.spaceInstanceId = new UniqueId();
		}

	}
	private <T> T autoCommit(T t) {
		if (role  == Role.REMOTE_COMMITTED) {
			commit();
		}
		return(t);
	}
	private void autoCommit() {
		if (role  == Role.REMOTE_COMMITTED) {
			commit();
		}
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> void write(T object) {
		if (role == Role.REMOTE || role == Role.REMOTE_COMMITTED) {
			writeHook (object);
			return;
		}
		@SuppressWarnings("unchecked")
		GetSetPair<T> partitionGetSetPair = (GetSetPair<T>) accessorCache.get(object.getClass()).getPartitionIdGetSetPair();
		if (partitionGetSetPair == null) {
			writeHook (object);
			return;
		} else {
			Integer partitionId = (Integer) partitionGetSetPair.get(object);
			if (partitionId != null) {
				writeHook (object);
				return;
			}
			T copy = PojoUtil.copy(object);
			partitionGetSetPair.set(object, PARTITION_ID);
			writeHook (copy);
			return;
		}
	}

	private void writeHook (Object o) {
		delegate.write(o);
		autoCommit();
	}
	

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T take(TableQuery<T> query) {
		T t1 = delegate.take(query);
		t1 = autoCommit(t1);
		return t1;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T take(TableQuery<T> query, long timeout) {
		T t1 = delegate.take(query, timeout);
		t1 = autoCommit(t1);
		return t1;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> takeMultiple(TableQuery<T> query) {
		List<T> allT =  autoCommit(delegate.takeMultiple(query));
		autoCommit();
		return allT;
	}
	

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T read(TableQuery<T> query) {
		return autoCommit(delegate.read(query));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T getNextMessage(TableQuery<T> query) {
		return autoCommit(delegate.getNextMessage(query));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T getNextMessage(TableQuery<T> query, long timeout) {
		return autoCommit(delegate.getNextMessage(query, timeout));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> readMultiple(TableQuery<T> query) {
		return autoCommit(delegate.readMultiple(query));
	}


	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T take(T templateObject) {
		T t1 = autoCommit(delegate.take(templateObject));
		t1 = autoCommit(t1);
		return t1;

	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T take(T templateObject, long timeout) {
		T t1 = autoCommit(delegate.take(templateObject, timeout));
		t1 = autoCommit(t1);
		return t1;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> takeMultiple(T templateObject) {
		List<T> allT =  autoCommit(delegate.takeMultiple(templateObject));
		autoCommit();
		return allT;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T read(T templateObject) {
		return autoCommit(delegate.read(templateObject));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T getNextMessage(T templateObject) {
		return autoCommit(delegate.getNextMessage(templateObject));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T getNextMessage(T templateObject, long timeout) {
		return autoCommit(delegate.getNextMessage(templateObject, timeout));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> readMultiple(T templateObject) {
		return autoCommit(delegate.readMultiple(templateObject));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> readMultiple(T templateObject, int maxEntries) {
		return autoCommit(delegate.readMultiple(templateObject));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T readById(Class<T> tableClass, Object primaryKey) {
		return autoCommit(delegate.readById(tableClass, primaryKey));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T takeById(Class<T> tableClass, Object primaryKey) {
		T t1 =  autoCommit(delegate.takeById(tableClass, primaryKey));
		t1 = autoCommit(t1);
		return t1;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T readByIdOf(T initializedObject) {
		return autoCommit(delegate.readByIdOf(initializedObject));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T takeByIdOf(T initializedObject) {
		T t1 =  autoCommit(delegate.takeByIdOf(initializedObject));
		t1 = autoCommit(t1);
		return t1;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public List<Class<?>> getTableClasses() {
		return delegate.getTableClasses();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> void registerMessageQueue(TableQuery<T> query) {
		delegate.registerMessageQueue(query);
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> void registerMessageQueue(T templateObject) {
		delegate.registerMessageQueue(templateObject);
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> void unregisterMessageQueue(TableQuery<T> query) {
		delegate.unregisterMessageQueue(query);
		
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> void unregisterMessageQueue(T templateObject) {
		delegate.unregisterMessageQueue(templateObject);
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<TableQuery<T>> getRegisteredMessageQueueQueries(Class<T> clazz) {
		return delegate.getRegisteredMessageQueueQueries(clazz);
	}
	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> int getMessageQueueSize (TableQuery<T> query) {
		 return delegate.getMessageQueueSize(query);
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> void clear(Class<T> clazz) {
		delegate.clear(clazz);
		autoCommit();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void clear() {
		delegate.clear();
		autoCommit();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> int size(Class<T> clazz) {
		return delegate.size(clazz);
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public int size() {
		return delegate.size();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public AccessorGenerator getAccessorGenerator() {
		return delegate.getAccessorGenerator();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public boolean isRemoteSpace() {
		if (role == Role.REMOTE) {
			return true;
		}else if(role == Role.REMOTE_COMMITTED){
			return true;
		}
		return false;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public boolean isInterrupted() {
		return delegate.isInterrupted();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public Integer getPartitionId() {
		if (role == Role.REMOTE) {
			return null;
		}
		return PARTITION_ID;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public UniqueId getSpaceInstanceId() {
		return spaceInstanceId;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void commit() {
		delegate.commit();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void rollback() {
		delegate.rollback();
	}

	/**
	 * Always return primary.
	 * @return FailoverState.PRIMARY.
	 */
	@Override
	public FailoverState getFailoverState() {
		return FailoverState.PRIMARY;
	}

	/**
	 * Unimplemented.
	 * 
	 * @param failoverStateListener The listener.
	 */
	@Override
	public void addFailoverStateListener(
			FailoverStateListener failoverStateListener) {
		// TODO
	}

	/**
	 * Unimplemented.
	 * 
	 * @param failoverStateListener The listener.
	 */
	@Override
	public void removeFailoverStateListener(
			FailoverStateListener failoverStateListener) {
		// TODO
	}

	/**
	 * Unimplemented.
	 */
	@Override
	public void start() {
		// TODO
		
	}

	/**
	 * Unimplemented.
	 */
	@Override
	public void shutdown() {
		// TODO
		
	}
	
	/**
	 * Set a data listener.
	 * @param listener The data listener.
	 */
	@Override
	public void setSpaceRecordListener (SpaceRecordListener listener) {
		this.delegate.setSpaceRecordListener(listener);
	}
	/**
	 * Get the current Time provider.
	 * @return The current time provider for timestamps.
	 */
	public CurrentTimeProvider getCurrentTimeProvider() {
		return this.delegate.getCurrentTimeProvider();
	}

}