SimpleFailoverMicroSpace.java
package org.microspace.space;
import java.util.LinkedList;
import java.util.List;
import org.microspace.event.SpaceRecordListener;
import org.microspace.exception.BackupSpaceException;
import org.microspace.exception.IllegalOperationException;
import org.microspace.failover.FailoverState;
import org.microspace.failover.FailoverStateListener;
import org.microspace.specific.CurrentTimeProvider;
import org.microspace.table.query.TableQuery;
import org.microspace.util.MicroLogger;
import org.microspace.util.UniqueId;
/**
* A FailoverMicroSpace implementation.
*
* @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
* @version 2016-06-26
*/
public class SimpleFailoverMicroSpace implements FailoverMicroSpace, FailoverStateListener {
private final UniqueId instanceId;
List<FailoverStateListener> listeners = new LinkedList<FailoverStateListener>();
static MicroLogger log = new MicroLogger (SimpleFailoverMicroSpace.class);
final BackedSpace delegate;
public SimpleFailoverMicroSpace() {
delegate = new BackedSpace();
delegate.setFailoverStateListener(this);
this.instanceId = new UniqueId();
}
public SimpleFailoverMicroSpace(SpaceConfig spaceConfig) {
this(spaceConfig, null);
}
public SimpleFailoverMicroSpace(SpaceConfig spaceConfig, CurrentTimeProvider currentTimeProvider) {
delegate = new BackedSpace(spaceConfig, currentTimeProvider);
delegate.setFailoverStateListener(this);
this.instanceId = new UniqueId();
}
/**
* {@inheritDoc}
*/
@Override
public synchronized void addFailoverStateListener(
FailoverStateListener failoverStateListener) {
listeners.add(failoverStateListener);
}
/**
* {@inheritDoc}
*/
@Override
public synchronized void removeFailoverStateListener(
FailoverStateListener failoverStateListener) {
listeners.remove(failoverStateListener);
}
@Override
public synchronized void failoverStateChangeBefore(FailoverState before,
FailoverState after) {
log.debug("failoverStateChangeBefore $* $* listeners=$*", before, after, listeners.size());
for (FailoverStateListener l : listeners) {
try {
l.failoverStateChangeBefore(before, after);
} catch (Exception e){
log.error("Listener failed", e);
}
}
}
@Override
public synchronized void failoverStateChangeAfter(FailoverState before,
FailoverState after) {
log.debug("failoverStateChangeAfter $* $* listeners=$*", before, after, listeners.size());
//delegate.failoverStateChangeBefore(before, after);
for (FailoverStateListener l : listeners) {
try {
l.failoverStateChangeAfter(before, after);
} catch (Exception e){
log.error("Listener failed", e);
}
}
}
void checkStarted () {
if (!delegate.isStarted()) {
throw new IllegalOperationException("Space Has Not Started");
}
}
void checkPrimary () {
checkStarted();
if (getFailoverState() != FailoverState.PRIMARY) {
throw new BackupSpaceException("Space is not PRIMARY");
}
}
/**
* {@inheritDoc}
*/
@Override
public <T> void write(T object) {
checkStarted();
checkPrimary ();
delegate.write(object);
checkPrimary ();
}
/**
* {@inheritDoc}
*/
@Override
public <T> T take(TableQuery<T> query) {
checkStarted ();
checkPrimary ();
T r = delegate.take(query);
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T take(TableQuery<T> query, long timeout) {
checkStarted ();
checkPrimary ();
T r = delegate.take(query, timeout);
checkPrimary ();
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<T> takeMultiple(TableQuery<T> query) {
checkStarted ();
checkPrimary ();
List<T> r = delegate. takeMultiple(query);
checkPrimary ();
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T read(TableQuery<T> query) {
checkStarted ();
T r = delegate.read(query);
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<T> readMultiple(TableQuery<T> query) {
checkStarted ();
List<T> r = delegate.readMultiple(query);
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T take(T templateObject) {
checkStarted ();
checkPrimary ();
T r = delegate.take(templateObject);
checkPrimary ();
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T take(T templateObject, long timeout) {
checkStarted ();
checkPrimary ();
T r = delegate.take(templateObject, timeout);
checkPrimary ();
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<T> takeMultiple(T templateObject) {
checkStarted ();
checkPrimary ();
List<T> r = delegate.takeMultiple(templateObject);
checkPrimary ();
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T read(T templateObject) {
checkStarted ();
T r = delegate.read(templateObject);
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<T> readMultiple(T templateObject) {
checkStarted ();
List<T> r = delegate.readMultiple(templateObject);
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<T> readMultiple(T templateObject, int maxEntries) {
checkStarted ();
List<T> r = delegate.readMultiple(templateObject, maxEntries);
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T readById(Class<T> tableClass, Object primaryKey) {
checkStarted ();
T r = delegate.readById(tableClass, primaryKey);
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T takeById(Class<T> tableClass, Object primaryKey) {
checkStarted ();
checkPrimary ();
T r = delegate.takeById(tableClass, primaryKey);
checkPrimary ();
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T readByIdOf(T initializedObject) {
checkStarted ();
T r = delegate.readByIdOf(initializedObject);
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T takeByIdOf(T initializedObject) {
checkStarted ();
checkPrimary ();
T r = delegate.takeByIdOf(initializedObject);
checkPrimary ();
return r;
}
/**
* {@inheritDoc}
*/
@Override
public <T> void clear(Class<T> clazz) {
checkStarted ();
checkPrimary ();
delegate.clear(clazz);
checkPrimary ();
}
/**
* {@inheritDoc}
*/
@Override
public void clear() {
checkStarted ();
checkPrimary ();
delegate.clear();
checkPrimary ();
}
/**
* {@inheritDoc}
*/
@Override
public AccessorGenerator getAccessorGenerator() {
return delegate.getAccessorGenerator();
}
/**
* {@inheritDoc}
*/
@Override
public void commit() {
checkStarted ();
checkPrimary ();
delegate.commit();
checkPrimary ();
}
/**
* {@inheritDoc}
*/
@Override
public void rollback() {
checkStarted ();
delegate.rollback();
}
/**
* {@inheritDoc}
*/
@Override
public FailoverState getFailoverState() {
return delegate.getFailoverState();
}
/**
* {@inheritDoc}
*/
@Override
public void start() {
delegate.start();
}
/**
* {@inheritDoc}
*/
@Override
public void shutdown() {
delegate.shutdown();
}
/**
* {@inheritDoc}
*/
@Override
public <T> int size(Class<T> clazz) {
return delegate.size(clazz);
}
/**
* {@inheritDoc}
*/
@Override
public int size() {
return delegate.size();
}
/**
* {@inheritDoc}
*/
@Override
public List<Class<?>> getTableClasses() {
return delegate.getTableClasses();
}
@Override
public <T> T getNextMessage(TableQuery<T> query) {
checkStarted ();
checkPrimary ();
T r = delegate.getNextMessage(query);
checkPrimary ();
return r;
}
@Override
public <T> T getNextMessage(TableQuery<T> query, long timeout) {
checkStarted ();
checkPrimary ();
T r = delegate.getNextMessage(query, timeout);
checkPrimary ();
return r;
}
@Override
public <T> T getNextMessage(T templateObject) {
checkStarted ();
checkPrimary ();
T r = delegate.getNextMessage(templateObject);
checkPrimary ();
return r;
}
@Override
public <T> T getNextMessage(T templateObject, long timeout) {
checkStarted ();
checkPrimary ();
T r = delegate.getNextMessage(templateObject, timeout);
checkPrimary ();
return r;
}
@Override
public <T> void registerMessageQueue(TableQuery<T> query) {
checkStarted ();
checkPrimary ();
delegate.registerMessageQueue(query);
checkPrimary ();
}
@Override
public <T> void registerMessageQueue(T templateObject) {
checkStarted ();
checkPrimary ();
delegate.registerMessageQueue(templateObject);
checkPrimary ();
}
@Override
public <T> void unregisterMessageQueue(TableQuery<T> query) {
delegate.unregisterMessageQueue(query);
}
@Override
public <T> void unregisterMessageQueue(T templateObject) {
delegate.unregisterMessageQueue(templateObject);
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<TableQuery<T>> getRegisteredMessageQueueQueries (Class<T> clazz) {
return delegate.getRegisteredMessageQueueQueries(clazz);
}
/**
* {@inheritDoc}
*/
@Override
public <T> int getMessageQueueSize (TableQuery<T> query) {
return delegate.getMessageQueueSize(query);
}
/**
* {@inheritDoc}
*/
@Override
public boolean isRemoteSpace () {
return delegate.isRemoteSpace();
}
/**
* {@inheritDoc}
*/
@Override
public boolean isInterrupted () {
return delegate.isInterrupted();
}
/**
* {@inheritDoc}
*/
@Override
public Integer getPartitionId () {
return delegate.getPartitionId();
}
@Override
public UniqueId getSpaceInstanceId() {
return instanceId;
}
SpaceRecordListener spaceRecordListener;
/**
* Set a data listener.
* @param listener The data listener.
*/
@Override
public void setSpaceRecordListener (SpaceRecordListener listener) {
this.spaceRecordListener = listener;
}
/**
* Get the current Time provider.
* @return The current time provider for timestamps.
*/
public CurrentTimeProvider getCurrentTimeProvider() {
return delegate.getCurrentTimeProvider();
}
}