// JGroupsFailoverManager.java

package org.microspace.transport.specific;

import java.util.TreeMap;

import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.View;
import org.microspace.failover.FailoverState;
import org.microspace.util.MicroLogger;

/**
 * Failover manager based upon JGroups.
 * 
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2016-06-26
 */
/**
 * Elects a PRIMARY among the members of a JGroups view.
 * <p>
 * Each instance broadcasts its own {@link NodeInformation} whenever the view
 * changes and records the information received from peers. Once information
 * is known for every other member of the current view, the member with the
 * smallest start time becomes {@link FailoverState#PRIMARY}; equal start
 * times are resolved in favour of the smallest {@link Address}. Every other
 * member becomes {@link FailoverState#BACKUP}.
 * <p>
 * The {@code nodeInformation} map doubles as the monitor guarding the view
 * and the election. {@code state} is volatile so that {@link #getState()}
 * can be called without taking that monitor.
 */
public class JGroupsFailoverManager {
	
	/**
	 * Last NodeInformation received per peer address; also used as the lock.
	 * NOTE(review): entries of members that left the view are never removed.
	 * They are ignored by the election, but the map only ever grows.
	 */
	final TreeMap<Address, NodeInformation> nodeInformation = new TreeMap<Address, NodeInformation> ();
	/** Information describing this node; broadcast on every view change. */
	final NodeInformation thisNodeInfo;
	
	private final static MicroLogger log = new MicroLogger (JGroupsFailoverManager.class);
	
	/** Written under the nodeInformation monitor, read lock-free by getState(). */
	volatile FailoverState state;
	View view;
	JChannel channel;
	
	/**
	 * Creates a manager in the INITIALIZING state and stamps this node with
	 * the current wall-clock time as its start time.
	 *
	 * @param channel channel used to broadcast this node's information and
	 *                to look up this node's own address
	 */
	public JGroupsFailoverManager (JChannel channel) {
		thisNodeInfo = new NodeInformation();
		thisNodeInfo.setStartTime(System.currentTimeMillis());
		setState (FailoverState.INITIALIZING);
		this.channel = channel;
	}
	
	/**
	 * Handles a new view: broadcasts this node's information to the group,
	 * stores the view and re-runs the election.
	 *
	 * @param view the newly installed view
	 */
	public void viewChanged (View view) {
		log.info("ViewAccepted $*", view);
		broadcastNodeInfo ();
		synchronized (nodeInformation) {
			this.view = view;
			checkState ();
		}
	} 
	
	/**
	 * Sends this node's information to all members (null destination) as an
	 * out-of-band message. A send failure is logged and otherwise ignored.
	 */
	private void broadcastNodeInfo () {
		Message msg = new Message(null, thisNodeInfo);
		// Out of band
		msg.setFlag(Message.Flag.OOB);
		try {
			channel.send(msg);
			// Only claim the message was sent once send() returned normally.
			log.trace("NodeInformation sent nodeInfo=$0", thisNodeInfo);
		} catch (Exception e) {
			log.error ("Can not send node info", e);
		}
	}
	
	/**
	 * Consumes a message if its payload is a {@link NodeInformation}.
	 *
	 * @param message the received message
	 * @return true if the message carried node information and was handled,
	 *         false if the payload is of any other type
	 */
	public boolean handleMessage (Message message) {
		Object payload = message.getObject();
		if (payload instanceof NodeInformation) {
			nodeInformationReceived (message.getSrc(), (NodeInformation) payload);
			return true;
		}
		return false;
	}
	
	/**
	 * Records the information reported by a peer and re-runs the election.
	 *
	 * @param address  sender of the information
	 * @param nodeInfo the reported information
	 */
	private void nodeInformationReceived (Address address, NodeInformation nodeInfo) {
		synchronized (nodeInformation) {
			log.trace("NodeInformation received $0 nodeInfo=$1", address, nodeInfo);
			
			nodeInformation.put(address, nodeInfo);
			checkState ();
		}
	}
	
	/**
	 * @return the most recently set failover state
	 */
	public FailoverState getState() {
		return state;
	}

	/**
	 * Sets the failover state, logging the transition when the value changes.
	 *
	 * @param state the new state
	 */
	public void setState(FailoverState state) {
		if (this.state != state) {
			log.debug ("State changed from $* to $*", this.state, state);
		}
		this.state = state;
	}

	/**
	 * Runs the election over the current view. Callers hold the
	 * nodeInformation monitor. Does nothing until a view has been installed,
	 * and leaves the state untouched while information from any other view
	 * member is still missing.
	 */
	private void checkState () {
		if (view == null) return;
		Address thisAddress = channel.getAddress();
		// An unconnected or closed channel has no local address; nothing to elect.
		if (thisAddress == null) return;
		
		// Start with this node as the candidate and look for an older one.
		Address oldestAddress = thisAddress;
		NodeInformation oldestNodeInfo = thisNodeInfo;
		
		for (Address member : view.getMembers()) {
			if (thisAddress.compareTo(member) == 0) continue;
			NodeInformation info = nodeInformation.get(member);
			if (info == null) {
				log.trace("Waiting for more nodes to report");
				return;
			}
			long startTime = info.getStartTime();
			long oldestStartTime = oldestNodeInfo.getStartTime();
			if (startTime < oldestStartTime) {
				oldestNodeInfo = info;
				oldestAddress = member;
			} else if (startTime == oldestStartTime && member.compareTo(oldestAddress) < 0) {
				// Same start time: the smaller address wins the tie.
				oldestAddress = member;
			}
		}
		
		if (oldestAddress.compareTo(thisAddress) == 0) {
			setState (FailoverState.PRIMARY);
		} else {
			setState (FailoverState.BACKUP);
		}
	}
}