// JvmTransport.java
package org.microspace.transport.specific;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import org.microspace.failover.FailoverState;
import org.microspace.transport.ClusterMessageReceiver;
import org.microspace.transport.ClusterTransport;
import org.microspace.transport.NodeAddress;
import org.microspace.transport.TransportException;
import org.microspace.util.MicroLogger;
/**
 * {@link ClusterTransport} implementation that lives inside a single JVM.
 * <p>
 * Outgoing messages are handed to the {@link JvmCluster} supplied at
 * construction time; incoming messages and failover state changes are passed
 * on to the registered {@link ClusterMessageReceiver}.
 *
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2016-06-26
 */
public class JvmTransport implements ClusterTransport {

    static MicroLogger log = new MicroLogger (JvmTransport.class);

    /** Class-wide counter; every new transport takes the next value as its id. */
    static final AtomicInteger currentId = new AtomicInteger();

    /** Cluster object this transport registers with and sends through. */
    final JvmCluster cluster;

    /** Identifier assigned in the constructor from {@link #currentId}. */
    final Integer id;

    /** Target for incoming messages and state changes; unset until {@link #setReceiver}. */
    ClusterMessageReceiver receiver;

    /** Most recent state given to {@link #failoverStateChanged}. */
    FailoverState state = FailoverState.INITIALIZING;

    /** {@code true} before {@link #start()} and again after {@link #shutdown()}. */
    boolean down = true;

    /**
     * Creates a transport bound to the given cluster and assigns it a fresh id.
     * The transport stays down until {@link #start()} is called.
     *
     * @param cluster the cluster object used for registration and delivery
     */
    public JvmTransport(JvmCluster cluster) {
        this.cluster = cluster;
        // The counter starts at zero, so the first transport created gets id 1.
        this.id = currentId.incrementAndGet();
        log.info("JvmTransport $* has been created", id);
    }

    /**
     * Reports whether this transport is currently down.
     *
     * @return {@code true} if not started yet or already shut down
     */
    public boolean getDown () {
        return down;
    }

    /**
     * Returns the id assigned to this transport at construction.
     *
     * @return the transport id
     */
    public Integer getId() {
        return id;
    }

    /**
     * Records a new failover state. While the transport is down the state is
     * only stored. Otherwise the transition is logged and, when the state
     * really differs from the previous one, forwarded to the receiver.
     * <p>
     * The receiver is used without a null check, so it has to be set before a
     * state change arrives on a started transport.
     *
     * @param newState the state to switch to
     */
    public void failoverStateChanged (FailoverState newState) {
        final FailoverState previous = state;
        state = newState;
        if (down) {
            // Not running: remember the state but notify nobody.
            return;
        }
        log.info("failoverStateChanged id=$* $* -> $*", getId(), previous, state);
        if (previous == state) {
            // Same state again: logged above, nothing to forward.
            return;
        }
        receiver.changeState(newState);
    }

    /**
     * Delivers a message to the receiver, wrapping the sender id in a
     * {@link JvmAddress}. Presumably invoked by {@link JvmCluster} when it
     * distributes a message - confirm against that class.
     *
     * @param from id of the sending transport
     * @param msg  the message payload
     */
    public void distribute (int from, Serializable msg) {
        final JvmAddress sender = new JvmAddress(from);
        receiver.receive(sender, msg);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void setReceiver(ClusterMessageReceiver receiver) {
        this.receiver = receiver;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Marks the transport as up first and only then adds it to the cluster.
     */
    @Override
    public void start() {
        down = false;
        cluster.add(this);
        log.info("JvmTransport $* has been added", id);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Hands the message to the cluster with a {@code null} destination.
     * NOTE(review): presumably {@code null} means "every node" - confirm
     * against {@code JvmCluster.distribute}.
     */
    @Override
    public void send(Serializable message) throws TransportException {
        cluster.distribute(getId(), message, null);
    }

    /**
     * {@inheritDoc}
     * <p>
     * The address is cast to {@link JvmAddress} without a type check, and its
     * delegate is passed to the cluster as the destination.
     */
    @Override
    public void send(NodeAddress address, Serializable message)
            throws TransportException {
        final JvmAddress target = (JvmAddress) address;
        cluster.distribute(getId(), message, target.getDelegate());
    }

    /**
     * {@inheritDoc}
     * <p>
     * Currently does nothing.
     */
    @Override
    public void sync() {
        // TODO nothing is implemented here yet (was an auto-generated stub).
    }

    /**
     * {@inheritDoc}
     * <p>
     * Marks the transport as down first and only then removes it from the cluster.
     */
    @Override
    public void shutdown() {
        down = true;
        cluster.remove(this);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public FailoverState getFailoverState() {
        return state;
    }
}