ClusterReplicator.java
package org.microspace.replicator;
import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import org.microspace.exception.InitializationException;
import org.microspace.failover.FailoverState;
import org.microspace.failover.FailoverStateListener;
import org.microspace.replicator.record.AddRecord;
import org.microspace.replicator.record.DataModelRecord;
import org.microspace.replicator.record.Record;
import org.microspace.replicator.record.RecoveryRequest;
import org.microspace.replicator.record.RemoveRecord;
import org.microspace.space.BackedSpace;
import org.microspace.space.SimpleSpace;
import org.microspace.space.SpaceConfig;
import org.microspace.table.Entry;
import org.microspace.table.column.Accessor;
import org.microspace.thread.ContextId;
import org.microspace.transport.ClusterMessageReceiver;
import org.microspace.transport.ClusterTransport;
import org.microspace.transport.NodeAddress;
import org.microspace.util.AccessorCache;
import org.microspace.util.MicroLogger;
import org.microspace.util.UniqueId;
/**
* Replicate messages across the cluster.
*
* This class does not do class conversion, user {@link org.microspace.replicator.FileReplicator FileReplicator} for that purpose.
*
* @author gaspar.sinai at microspace.org
* @version 2016-06-23
*/
public class ClusterReplicator implements ClusterMessageReceiver {
final SpaceConfig config;
final ClusterTransport channel;
final HashMap<String, Boolean> skipFromBackup = new HashMap<String, Boolean>();
final LinkedList<Record> pendingRecords = new LinkedList<Record> ();
final FailoverStateListener failoverStateListener;
final List<NodeAddress> pendingRecoverList = new LinkedList<NodeAddress> ();
final AccessorCache accessorCache;
FailoverState oldState = FailoverState.INITIALIZING;
boolean isRecovering = false;
BackedSpace space;
// Copied 1 by 1 via updates.
private SimpleSpace dataModelCopy;
final static MicroLogger log = new MicroLogger (ClusterReplicator.class);
public ClusterReplicator (SpaceConfig config, BackedSpace space, FailoverStateListener failoverStateListener) throws Exception {
this.config = config;
this.space = space;
this.dataModelCopy = new SimpleSpace(config);
this.accessorCache = new AccessorCache (config);
this.failoverStateListener = failoverStateListener;
channel = config.getClusterTransportGenerator().newClusterTransport(config.getClusterName());
channel.setReceiver(this);
channel.start();
}
public FailoverState getState () {
return oldState;
}
private void receive (Record record) {
if (record.getType() == Record.Type.BEGIN) {
log.trace("Received BEGIN Record from PRIMARY");
space.rollback();
}
if (record.getType() == Record.Type.ADD) {
log.trace("Received ADD Record from PRIMARY");
Entry<?> entry = record.getAddRecord().convert(accessorCache.get(record.getAddRecord().getClassName()));
/* no conversion */
space.write(entry.getSpaceEntry());
dataModelCopy.write(entry.getSpaceEntry());
}
if (record.getType() == Record.Type.REMOVE) {
log.trace("Received REMOVE Record from PRIMARY");
RemoveRecord removeRecord = record.getRemoveRecord();
Accessor<?> accessor = accessorCache.get(removeRecord.getClassName());
Object object = accessor.getBlankObject();
space.takeById(object.getClass(), removeRecord.getKey());
dataModelCopy.takeById(object.getClass(), removeRecord.getKey());
}
if (record.getType() == Record.Type.END) {
log.trace("Received END Record from PRIMARY");
space.commit();
dataModelCopy.commit();
}
}
private void setState (FailoverState state) {
if (isRecovering) {
throw new InitializationException ("SET STATE while recovering.");
}
log.trace ("SET STATE from $* to $* isRemote-$*", oldState, state, config.isRemoteSpace());
FailoverState before = oldState;
failoverStateListener.failoverStateChangeBefore(before, state);
oldState = state;
failoverStateListener.failoverStateChangeAfter(before, state);
}
private void startRecovering () {
isRecovering = true;
log.info("Sending recovery request in state $*", getState());
if (getState() != FailoverState.INITIALIZING) {
return;
}
try {
channel.send(new RecoveryRequest());
} catch (Exception e) {
log.error("Can not send recovery request", e);
throw new InitializationException(e);
}
}
public void stopReplication () {
if (channel != null) {
channel.shutdown();
}
}
/**
* Write the following types of records:
* BEGIN, ADD, REMOVE, END. The HEADER record is automatically generated.
* This is coming in 1 single thread at a time.
* @param record is the record to write
*/
public void write (Record record) {
switch (record.getType()) {
case BEGIN:
//log.trace("Record.BEGIN");
pendingRecords.add (record);
dataModelCopy.rollback();
break;
case ADD:
pendingRecords.add (record);
{
Entry<?> entry = record.getAddRecord().convert(accessorCache.get(record.getAddRecord().getClassName()));
//log.trace("Record.ADD $*", record.getAddRecord().getKey());
dataModelCopy.write(entry.getSpaceEntry());
}
break;
case REMOVE:
pendingRecords.add (record);
{
RemoveRecord removeRecord = record.getRemoveRecord();
Class<?> clazz = null;
try {
getClass();
clazz = Class.forName(removeRecord.getClassName());
} catch (ClassNotFoundException e) {
log.error ("Can not find class $*", e, removeRecord.getClassName());
return;
}
//log.trace("Record.REMOVE $*", record.getRemoveRecord().getKey());
dataModelCopy.takeById(clazz, removeRecord.getKey());
}
break;
case END:
//log.trace("Record.END");
pendingRecords.add (record);
dataModelCopy.commit();
writePendingRecords();
break;
case HEADER:
break;
case DATAMODEL:
// Same thread is receive(Message msg)
//log.trace("Datamodel received. Size of requestors=$0", pendingRecoverList.size());
while (pendingRecoverList.size() > 0) {
NodeAddress backup = pendingRecoverList.remove(0);
sendDataModel (backup, record.getDataModelRecord());
}
break;
}
}
Object sendLock = new Object();
private void writePendingRecords () {
if (getState() != FailoverState.PRIMARY && !config.isRemoteSpace()) {
pendingRecords.clear();
return;
}
// Only begin and end.
if (pendingRecords.size() == 2) {
pendingRecords.clear();
return;
}
synchronized (sendLock) {
try {
log.trace("Writing $* pending records.", pendingRecords.size());
for (Record record : pendingRecords) {
channel.send(record);
}
sync();
pendingRecords.clear();
} catch (Exception ex) {
log.error("writePendingRecords error", ex);
pendingRecords.clear();
}
}
}
public void sync () {
if (config.isSyncCluster()) {
log.trace("Flush is not configured. Config.syncCluster");
return;
}
channel.sync();
}
private void sendDataModel (NodeAddress address, DataModelRecord record) {
ContextId contextId = new ContextId(UniqueId.getSeedPart());
Record rec = new Record (contextId, record);
log.info("Sending data model to $*", address);
/*
DataModelRecord dm = rec.getDataModelRecord();
for (AddRecord add : dm.getRecords()) {
//log.info("Converting $*", add.getClassName());
Entry<?> entry = add.convert(accessorCache.get(add
.getClassName()));
System.err.println ("XXXClusterSend: " + entry.getPrimaryKey());
}
*/
synchronized (sendLock) {
try {
channel.send(address, rec);
sync();
} catch (Exception e) {
log.error("Can not send record.", e);
}
}
}
@Override
public void receive(NodeAddress from, Serializable message) {
// TODO Auto-generated method stub
log.trace ("Received message form $* state=$* class=$*",
from, getState(), message.getClass().getName());
if (message instanceof Record) {
Record rec = (Record) message;
if (rec.getType() == Record.Type.DATAMODEL) {
log.info("Received DATAMODEL in state=$* isRecovering=$*", getState(), isRecovering);
if (getState() == FailoverState.INITIALIZING && isRecovering) {
space.rollback();
space.clear();
space.commit();
DataModelRecord dm = rec.getDataModelRecord();
log.info("Setting new datamodel with size=$*", dm.size());
for (AddRecord add : dm.getRecords()) {
//log.info("Converting $*", add.getClassName());
Entry<?> entry = add.convert(accessorCache.get(add
.getClassName()));
// System.err.println ("XXXClusterRecive: " + entry.getPrimaryKey());
space.write(entry.getSpaceEntry());
dataModelCopy.write(entry.getSpaceEntry());
}
space.commit();
dataModelCopy.commit();
isRecovering = false;
setState (FailoverState.BACKUP);
} else {
log.warn("Throwing away recovery datamodel state=$* isrecovering=$*", null, getState(), isRecovering);
}
} else if (getState() == FailoverState.BACKUP) {
receive (rec);
}
}
if (message instanceof RecoveryRequest) {
if (getState() == FailoverState.PRIMARY) {
pendingRecoverList.add(from);
log.info("Dumping model tp $*", from);
space.sendDataModelRecord();
}
}
}
@Override
public void changeState (FailoverState state) {
if (oldState == state) {
return;
}
log.trace ("Called changing state from $* to $* isRecovering=$*", oldState, state, isRecovering);
switch (oldState) {
case BACKUP:
setState (state);
break;
case INITIALIZING:
if (isRecovering && state == FailoverState.PRIMARY) {
log.error ("Changed to primary while recovering.", null);
isRecovering = false;
throw new InitializationException ("Activate while recovering.");
} else if (state == FailoverState.BACKUP && !isRecovering) {
startRecovering ();
}
if (!isRecovering) {
setState (state);
}
break;
case PRIMARY:
setState (state);
break;
default:
break;
}
}
}