JGroupsTransport.java

package org.microspace.transport.specific;

import java.io.Serializable;

import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.microspace.exception.InitializationException;
import org.microspace.failover.FailoverState;
import org.microspace.transport.ClusterMessageReceiver;
import org.microspace.transport.ClusterTransport;
import org.microspace.transport.NodeAddress;
import org.microspace.transport.TransportException;
import org.microspace.util.MicroLogger;

/**
 * A ClusterTransport based upon JGroups.
 * 
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2016-06-26
 */
public class JGroupsTransport extends ReceiverAdapter implements ClusterTransport {
	
	static MicroLogger log = new MicroLogger (JGroupsTransport.class);
	
	final String clusterName;
	JChannel channel;
	JGroupsFailoverManager failoverManager;
	ClusterMessageReceiver receiver;
	
	public static final String DEFAULT_PROTOCOL_STACK = "udp.xml";
	
	public static final String PROTOCOL_STACK_PROPERY = "microspace.jgroups.protocol.file";
	
	
	public JGroupsTransport (String clusterName) {
		this.clusterName = clusterName;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void setReceiver(ClusterMessageReceiver receiver) {
		this.receiver = receiver;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void start() {
		if (receiver == null) {
			throw new InitializationException ("Transport Receiver is not set");
		}
		try {
			channel = new JChannel(System.getProperty(PROTOCOL_STACK_PROPERY, DEFAULT_PROTOCOL_STACK));
			channel.setDiscardOwnMessages(true);
			failoverManager = new JGroupsFailoverManager(channel);
			channel.setReceiver(this);
			channel.connect(clusterName);
		} catch (Exception e) {
			throw new InitializationException ("Can not initialize JChannel", e);
		}
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void send(Serializable message) throws TransportException {
		Message jmsg = new Message(null, message);
		try {
			channel.send(jmsg);
		} catch (Exception e) {
			throw new TransportException()
;		}
		
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void send(NodeAddress address, Serializable message) throws TransportException {
		JGroupsNodeAddress addr = (JGroupsNodeAddress) address;
		try {
			channel.send(addr.getDelegate(), message);
		} catch (Exception e) {
			throw new TransportException()
;		}
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void sync() {
		if (!channel.flushSupported()) {
			log.warn ("Flush is not supported.", null);
			return;
		}
		
		boolean retry = failoverManager.getState()==FailoverState.PRIMARY;
		while (retry) {
			try {
				channel.startFlush(true);
				retry = false;
			} catch (Exception e) {
				retry = failoverManager.getState()==FailoverState.PRIMARY;
				log.error("Flush retry $*", e, retry);
			}
			if (!retry)  break;
			try {
				Thread.sleep(1000L);
			} catch (InterruptedException e) {
				log.warn("Sync failed, retry in 1 seconds", e);
			}
		}
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void shutdown() {
		channel.close();
	}
	
	@Override
	public void viewAccepted(View view) {
		FailoverState state = failoverManager.getState();
		failoverManager.viewChanged(view);
		if (state != failoverManager.getState()) {
			log.debug("Sending state change $*", failoverManager.getState());
			receiver.changeState(failoverManager.getState());
		}
	}
	
	@Override
	public void receive(Message msg) {
		FailoverState state = failoverManager.getState();
		if (failoverManager.handleMessage(msg)) {
			if (state != failoverManager.getState()) {
				log.debug("Sending state change $*", failoverManager.getState());
				receiver.changeState(failoverManager.getState());
			}
			return;
		}
		Serializable payload = (Serializable) msg.getObject();
		receiver.receive(new JGroupsNodeAddress(msg.getSrc()), payload);
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public FailoverState getFailoverState() {
		return failoverManager.getState();
	}
	
}