// JvmTransport.java

package org.microspace.transport.specific;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

import org.microspace.failover.FailoverState;
import org.microspace.transport.ClusterMessageReceiver;
import org.microspace.transport.ClusterTransport;
import org.microspace.transport.NodeAddress;
import org.microspace.transport.TransportException;
import org.microspace.util.MicroLogger;

/**
 * JVM-based {@link ClusterTransport} implementation that exchanges messages via a {@link JvmCluster}.
 * 
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2016-06-26
 */
public class JvmTransport implements ClusterTransport {

	static final MicroLogger log = new MicroLogger (JvmTransport.class);
	
	/** Source of transport ids; incremented once for every constructed instance. */
	final static AtomicInteger currentId = new AtomicInteger();

	
	/** Cluster this transport registers with and sends through. */
	final JvmCluster cluster;
	/** Id taken from {@link #currentId} at construction time. */
	final Integer id;
	/**
	 * Target for incoming messages and state changes; {@code null} until
	 * {@link #setReceiver(ClusterMessageReceiver)} has been called.
	 */
	ClusterMessageReceiver receiver;
	/** Last failover state reported through {@link #failoverStateChanged(FailoverState)}. */
	FailoverState state = FailoverState.INITIALIZING;
	/**
	 * {@code true} before {@link #start()} and after {@link #shutdown()}.
	 * NOTE(review): this field, {@code state} and {@code receiver} are plain
	 * (non-volatile) fields - confirm they are only touched from one thread.
	 */
	boolean down = true;

	/**
	 * Creates a transport bound to the given cluster and assigns it the next id.
	 * The transport is not added to the cluster until {@link #start()} is called.
	 *
	 * @param cluster the cluster used for registration and message distribution
	 */
	public JvmTransport(JvmCluster cluster) {
		this.cluster = cluster;
		this.id = currentId.incrementAndGet();
		log.info("JvmTransport $* has been created", id);
	}
	
	/**
	 * Tells whether this transport is currently down.
	 *
	 * @return {@code true} if {@link #start()} has not been called yet or
	 *         {@link #shutdown()} was called last
	 */
	public boolean getDown () {
		return down;
	}
	
	/**
	 * Returns the id assigned to this transport at construction.
	 *
	 * @return the transport id
	 */
	public Integer getId() {
		return id;
	}
	
	/**
	 * Records a new failover state and, while the transport is up, forwards
	 * a real change to the receiver.
	 * <p>
	 * The state is always stored. The receiver is notified only if the
	 * transport is not down and the new state differs from the previous one.
	 * If no receiver has been set, the change is logged and not delivered
	 * instead of failing with a {@link NullPointerException}.
	 *
	 * @param newState the state to record
	 */
	public void failoverStateChanged (FailoverState newState) {
		FailoverState oldState = state;
		state = newState;
		if (down) {
			// Not started or already shut down: remember the state only.
			return;
		}
		log.info("failoverStateChanged id=$* $* -> $*", getId(), oldState, state);
		if (oldState == newState) {
			return;
		}
		// Read the field once so the null check and the call see the same reference.
		ClusterMessageReceiver target = receiver;
		if (target == null) {
			log.info("failoverStateChanged id=$* has no receiver, state change not delivered", getId());
			return;
		}
		target.changeState(newState);
	}
	
	/**
	 * Hands a message to the receiver, wrapping the sender id in a
	 * {@link JvmAddress}.
	 * <p>
	 * If no receiver has been set the message is logged and dropped, so the
	 * caller is not interrupted by a {@link NullPointerException}.
	 *
	 * @param from id of the sending transport
	 * @param msg the message to deliver
	 */
	public void distribute (int from, Serializable msg) {
		// Read the field once so the null check and the call see the same reference.
		ClusterMessageReceiver target = receiver;
		if (target == null) {
			log.info("JvmTransport $* has no receiver, dropping message from $*", id, from);
			return;
		}
		target.receive(new JvmAddress(from), msg);
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void setReceiver(ClusterMessageReceiver receiver) {
		this.receiver = receiver;
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Marks the transport as up and then adds it to the cluster.
	 */
	@Override
	public void start() {
		// Clear the flag before registering so the transport is already up when the cluster sees it.
		this.down = false;
		cluster.add(this);
		log.info("JvmTransport $* has been added", id);
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Passes the message to the cluster with no target address.
	 */
	@Override
	public void send(Serializable message) throws TransportException {
		cluster.distribute(getId(), message, null);
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * The address must be a {@link JvmAddress}; any other implementation
	 * makes the cast fail with a {@link ClassCastException}.
	 */
	@Override
	public void send(NodeAddress address, Serializable message)
			throws TransportException {
		JvmAddress addr = (JvmAddress) address;
		cluster.distribute(getId(), message, addr.getDelegate());
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Currently does nothing.
	 */
	@Override
	public void sync() {
		// TODO Auto-generated method stub
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Marks the transport as down and removes it from the cluster.
	 */
	@Override
	public void shutdown() {
		this.down = true;
		cluster.remove(this);
		
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public FailoverState getFailoverState() {
		return state;
	}
}