JGroupsFailoverManager.java
package org.microspace.transport.specific;
import java.util.TreeMap;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.View;
import org.microspace.failover.FailoverState;
import org.microspace.util.MicroLogger;
/**
* Failover manager based upon JGroups.
*
* @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
* @version 2016-06-26
*/
public class JGroupsFailoverManager {
final TreeMap<Address, NodeInformation> nodeInformation = new TreeMap<Address, NodeInformation> ();
final NodeInformation thisNodeInfo;
private final static MicroLogger log = new MicroLogger (JGroupsFailoverManager.class);
FailoverState state;
View view;
JChannel channel;
public JGroupsFailoverManager (JChannel channel) {
thisNodeInfo = new NodeInformation();
thisNodeInfo.setStartTime(System.currentTimeMillis());
setState (FailoverState.INITIALIZING);
this.channel = channel;
}
public void viewChanged (View view) {
log.info("ViewAccepted $*", view);
// Send out Node information
Message msg=new Message(null, thisNodeInfo);
// Out of band
msg.setFlag(Message.Flag.OOB);
log.trace("NodeInformation sent nodeInfo=$0", thisNodeInfo);
try {
channel.send(msg);
} catch (Exception e) {
log.error ("Can not send node info", e);
}
synchronized (nodeInformation) {
this.view = view;
checkState ();
}
}
public boolean handleMessage (Message message) {
Object o = message.getObject();
if (o instanceof NodeInformation) {
nodeInfomationReceived (message.getSrc(), (NodeInformation) o);
return true;
} else {
return false;
}
}
private void nodeInfomationReceived (Address address, NodeInformation nodeInfo) {
synchronized (nodeInformation) {
log.trace("NodeInformation received $0 nodeInfo=$1", address, nodeInfo);
nodeInformation.put(address, nodeInfo);
checkState ();
}
}
public FailoverState getState() {
return state;
}
public void setState(FailoverState state) {
if (this.state != state) {
log.debug ("State changed from $* to $*", this.state, state);
}
this.state = state;
}
private void checkState () {
if (view == null) return;
Address thisAddress = channel.getAddress();
Address oldestAddress = thisAddress;
NodeInformation oldestNodeInfo = thisNodeInfo;
boolean allNodeInfoPresent = true;
for (Address a : view.getMembers()) {
if (thisAddress.compareTo(a)==0) continue;
NodeInformation ni = nodeInformation.get(a);
if (ni == null) {
allNodeInfoPresent = false;
break;
}
if (ni.getStartTime() == oldestNodeInfo.getStartTime()) {
if (a.compareTo(oldestAddress) < 0) {
oldestAddress = a;
}
} else if (ni.getStartTime() < oldestNodeInfo.getStartTime()) {
oldestNodeInfo = ni;
oldestAddress = a;
}
}
if (!allNodeInfoPresent) {
log.trace("Waiting for more nodes to report");
return;
}
if (oldestAddress.compareTo(thisAddress) == 0) {
setState (FailoverState.PRIMARY);
} else {
setState (FailoverState.BACKUP);
}
}
}