BackedSpace.java
package org.microspace.space;
import java.util.ArrayList;
import java.util.List;
import org.microspace.exception.InitializationException;
import org.microspace.failover.FailoverState;
import org.microspace.failover.FailoverStateListener;
import org.microspace.replicator.ClusterReplicator;
import org.microspace.replicator.FileReplicator;
import org.microspace.replicator.record.AddRecord;
import org.microspace.replicator.record.DataModelRecord;
import org.microspace.replicator.record.Record;
import org.microspace.replicator.record.RemoveRecord;
import org.microspace.specific.CurrentTimeProvider;
import org.microspace.table.Entry;
import org.microspace.table.SimpleTable;
import org.microspace.table.column.IndexedMap;
import org.microspace.table.query.MatchAllQuery;
import org.microspace.thread.ContextId;
import org.microspace.util.MicroLogger;
/**
* A cluster or file backed MicroSpace.
* <p>
* When isClusterBackup is set in {@link org.microspace.space.SpaceConfig SpaceConfig},
* it joins the cluster and, if a primary exists, recovers from the primary.
* <p>
* If it cannot find a primary, it becomes the primary, and if both fileRecover and
* fileBackup are set, it recovers from file.
* <p>
* If fileBackup is set, the state will be saved in the backupFileName + ".live", regardless
* of PRIMARY or BACKUP states.
* <p>
* backupFileName + ".recover" is used only during recovery internally, so that there is no data loss
* in case of recovery exit.
*
* @author gaspar.sinai at microspace.org
* @version 2016-06-23
*/
public class BackedSpace extends SimpleSpace implements FailoverStateListener, CommitListener {

    /** File replicator; stays {@code null} unless file backup is enabled when {@link #start()} runs. */
    FileReplicator fileReplicator;

    /** Cluster replicator; stays {@code null} unless cluster backup is enabled when {@link #start()} runs. */
    ClusterReplicator clusterReplicator;

    static MicroLogger log = new MicroLogger(BackedSpace.class);

    /** Last failover state recorded by {@link #failoverStateChangeAfter(FailoverState, FailoverState)}. */
    FailoverState failoverState = FailoverState.INITIALIZING;

    /** Set to {@code true} by {@link #start()}; it is not reset anywhere in this class. */
    boolean started = false;

    /** Optional user listener to which failover transitions are forwarded; may be {@code null}. */
    FailoverStateListener failoverListener = null;

    /**
     * Creates a space through the superclass no-argument constructor.
     * <p>
     * Unlike the other constructors, this constructor does not itself register the
     * space as a commit listener.
     */
    public BackedSpace() {
    }

    /**
     * Creates a space with the given configuration and no explicit time provider.
     *
     * @param spaceConfig the space configuration.
     */
    public BackedSpace(SpaceConfig spaceConfig) {
        this(spaceConfig, null);
    }

    /**
     * Creates a space and registers it as its own commit listener, so that commits
     * are forwarded to the replicators once {@link #start()} has created them.
     *
     * @param spaceConfig the space configuration.
     * @param currentTimeProvider the time provider handed to the superclass; may be {@code null}.
     */
    public BackedSpace(SpaceConfig spaceConfig, CurrentTimeProvider currentTimeProvider) {
        super(spaceConfig, currentTimeProvider);
        setCommitListener(this);
        log.trace("Creating BackedSpace. Config=$*", spaceConfig);
    }

    /**
     * Sets the listener that receives the failover transitions seen by this space.
     *
     * @param failoverListener the listener, or {@code null} to stop forwarding.
     */
    public void setFailoverStateListener(FailoverStateListener failoverListener) {
        this.failoverListener = failoverListener;
    }

    /**
     * @return {@code true} once {@link #start()} has been called.
     */
    public boolean isStarted() {
        return started;
    }

    /**
     * Starts the configured replicators and, when this instance is primary,
     * announces the PRIMARY state.
     * <p>
     * With cluster backup enabled, the call blocks until the cluster replicator leaves
     * the INITIALIZING state. With file backup enabled, data is recovered from file
     * only when this instance is primary and file recovery is enabled.
     *
     * @throws InitializationException if the space was already started, the cluster
     *         replicator can not be created, the cluster recovery timeout expires, or
     *         the calling thread is interrupted while waiting for the cluster.
     */
    public void start() throws InitializationException {
        if (started) {
            throw new InitializationException("Space Already started");
        }
        started = true;
        // Without cluster backup there is nobody to be backup for: act as primary.
        boolean primary = true;
        if (getSpaceConfig().isClusterBackup()) {
            failoverState = FailoverState.INITIALIZING;
            try {
                clusterReplicator = new ClusterReplicator(getSpaceConfig(), this, this);
            } catch (Exception e) {
                throw new InitializationException(e);
            }
            // Read the state once so the log line and the decision below agree.
            FailoverState clusterState = awaitClusterInitialization();
            log.info("ClusterReplicator is initialized with state $*", clusterState);
            // FIXME: if PRIMARY use file recovery, if BACKUP use network recovery.
            primary = clusterState == FailoverState.PRIMARY;
        }
        if (getSpaceConfig().isFileBackup()) {
            fileReplicator = new FileReplicator(getSpaceConfig());
            log.info("FileBackup primary=$* isFileRecover=$*", primary, getSpaceConfig().isFileRecover());
            if (primary && getSpaceConfig().isFileRecover()) {
                fileReplicator.recoverData(this);
            } else {
                fileReplicator.noRecover();
            }
            // FIXME: add current data and start replication.
            fileReplicator.startReplication(this);
        }
        if (primary || getSpaceConfig().isRemoteSpace()) {
            // NOTE(review): for remote spaces both callbacks below return early, so
            // failoverState is left untouched by this block - confirm this is intended.
            if (failoverState != FailoverState.PRIMARY) {
                failoverStateChangeBefore(failoverState, FailoverState.PRIMARY);
            }
            // Checked again on purpose: the state is re-read after the "before" callback ran.
            if (failoverState != FailoverState.PRIMARY) {
                failoverStateChangeAfter(failoverState, FailoverState.PRIMARY);
            }
        }
        log.info("Space Initial size: $*", size());
    }

    /**
     * Polls the cluster replicator once per second until it leaves the INITIALIZING state.
     *
     * @return the first state observed that is not INITIALIZING.
     * @throws InitializationException if the elapsed time in milliseconds exceeds the
     *         configured cluster recovery timeout, or if the thread is interrupted.
     */
    private FailoverState awaitClusterInitialization() throws InitializationException {
        long start = System.currentTimeMillis();
        FailoverState state = clusterReplicator.getState();
        while (state == FailoverState.INITIALIZING) {
            log.info("Waiting for clusterReplicator to initialize");
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Restore the interrupt status and give up instead of silently
                // blocking the interrupted thread until the timeout expires.
                Thread.currentThread().interrupt();
                throw new InitializationException(e);
            }
            state = clusterReplicator.getState();
            if (state != FailoverState.INITIALIZING) {
                break;
            }
            long now = System.currentTimeMillis();
            if (now - start > getSpaceConfig().getClusterRecoveryTimeout()) {
                throw new InitializationException("RecoveryTimeout");
            }
        }
        return state;
    }

    /**
     * Stops the replicators that were created by {@link #start()}.
     * <p>
     * The cluster replicator is stopped even if stopping the file replicator throws.
     */
    public void shutdown() {
        try {
            if (fileReplicator != null) {
                fileReplicator.stopReplication();
            }
        } finally {
            if (clusterReplicator != null) {
                clusterReplicator.stopReplication();
            }
        }
    }

    /**
     * Writes a BEGIN record to every active replicator.
     */
    @Override
    public void commitBegins() {
        writeToReplicators(new Record(new ContextId(), Record.Type.BEGIN), false);
    }

    /**
     * Replicates one committed entry: a {@link RemoveRecord} when the entry is flagged
     * as removed, otherwise an {@link AddRecord} carrying its fields and update count.
     *
     * @param newEntry the committed entry.
     */
    @Override
    public void entryAdded(Entry<Object> newEntry) {
        Record record;
        String className = newEntry.getSpaceEntry().getClass().getName();
        if (newEntry.isRemoved()) {
            RemoveRecord remove = new RemoveRecord(className, newEntry.getPrimaryKey());
            record = new Record(new ContextId(), remove);
        } else {
            AddRecord add = new AddRecord(className, newEntry.getPrimaryKey(), newEntry.getFields());
            add.setUpdateCount(newEntry.getUpdateCount());
            record = new Record(new ContextId(), add);
        }
        writeToReplicators(record, false);
    }

    /**
     * Writes an END record to every active replicator and syncs each of them.
     */
    @Override
    public void commitEnds() {
        writeToReplicators(new Record(new ContextId(), Record.Type.END), true);
    }

    /**
     * Forwards an upcoming failover transition to the user listener.
     * The call is ignored when the space is configured as a remote space.
     *
     * @param before the current state.
     * @param after the state about to be entered.
     */
    @Override
    public void failoverStateChangeBefore(FailoverState before,
            FailoverState after) {
        if (getSpaceConfig().isRemoteSpace()) {
            log.trace("failoverStateChangeBefore IGNORED $* $*", before, after);
            return;
        }
        log.trace("failoverStateChangeBefore $* $*", before, after);
        if (failoverListener != null) {
            failoverListener.failoverStateChangeBefore(before, after);
        }
    }

    /**
     * Records the new failover state and forwards the completed transition to the
     * user listener. The call is ignored when the space is configured as a remote space.
     *
     * @param before the previous state.
     * @param after the state that has been entered.
     */
    @Override
    public void failoverStateChangeAfter(FailoverState before,
            FailoverState after) {
        if (getSpaceConfig().isRemoteSpace()) {
            log.trace("failoverStateChangeAfter IGNORED $* $*", before, after);
            return;
        }
        log.trace("failoverStateChangeAfter $* $*", before, after);
        failoverState = after;
        if (failoverListener != null) {
            failoverListener.failoverStateChangeAfter(before, after);
        }
    }

    /**
     * @return the failover state most recently recorded by this space.
     */
    public FailoverState getFailoverState() {
        return failoverState;
    }

    /**
     * Sends the back entries of all tables, sorted, to every active replicator as one
     * {@link DataModelRecord} and syncs them.
     * <p>
     * The commit read lock is held for the whole operation, including the writes.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public void sendDataModelRecord() {
        commitLock.readLock().lock();
        try {
            List<Entry<?>> list = new ArrayList<Entry<?>>();
            for (java.util.Map.Entry<Class<?>, SimpleTable<?>> e : maps.entrySet()) {
                SimpleTable<Object> t = (SimpleTable<Object>) e.getValue();
                MatchAllQuery<Object> query = new MatchAllQuery(e.getKey());
                IndexedMap<Object, Entry<Object>> back = t.readMultipleBackEntries(query);
                for (Entry<Object> entry : back.values()) {
                    list.add(entry);
                }
            }
            // A null comparator makes List.sort use the natural ordering of the entries.
            list.sort(null);
            DataModelRecord dataModelRecord = new DataModelRecord(list);
            Record record = new Record(new ContextId(), dataModelRecord);
            writeToReplicators(record, true);
        } finally {
            commitLock.readLock().unlock();
        }
    }

    /**
     * Writes a record to the file replicator and then to the cluster replicator,
     * skipping whichever is not active.
     *
     * @param record the record to replicate.
     * @param sync when {@code true}, each replicator is synced right after its write.
     */
    private void writeToReplicators(Record record, boolean sync) {
        if (fileReplicator != null) {
            fileReplicator.write(record);
            if (sync) {
                fileReplicator.sync();
            }
        }
        if (clusterReplicator != null) {
            clusterReplicator.write(record);
            if (sync) {
                clusterReplicator.sync();
            }
        }
    }
}