// JGroupsTransport.java
package org.microspace.transport.specific;
import java.io.Serializable;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.microspace.exception.InitializationException;
import org.microspace.failover.FailoverState;
import org.microspace.transport.ClusterMessageReceiver;
import org.microspace.transport.ClusterTransport;
import org.microspace.transport.NodeAddress;
import org.microspace.transport.TransportException;
import org.microspace.util.MicroLogger;
/**
 * A {@link ClusterTransport} based upon JGroups.
 * <p>
 * The transport owns a single {@link JChannel} that is created and connected
 * in {@link #start()} and closed in {@link #shutdown()}. Incoming messages are
 * first offered to the {@link JGroupsFailoverManager}; anything it does not
 * consume is handed to the configured {@link ClusterMessageReceiver}. Whenever
 * a view change or a failover message alters the failover state, the receiver
 * is notified through {@link ClusterMessageReceiver#changeState}.
 *
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2016-06-26
 */
public class JGroupsTransport extends ReceiverAdapter implements ClusterTransport {

    static MicroLogger log = new MicroLogger (JGroupsTransport.class);

    /** Name of the JGroups cluster this transport connects to. */
    final String clusterName;

    /** The underlying channel; {@code null} until {@link #start()} succeeds. */
    JChannel channel;

    /** Tracks the failover state from views and failover messages. */
    JGroupsFailoverManager failoverManager;

    /** Application callback for payloads and failover state changes. */
    ClusterMessageReceiver receiver;

    /** Protocol stack configuration used when the system property is not set. */
    public static final String DEFAULT_PROTOCOL_STACK = "udp.xml";

    /**
     * System property naming the JGroups protocol stack file.
     * (The misspelled identifier is kept because it is part of the public API.)
     */
    public static final String PROTOCOL_STACK_PROPERY = "microspace.jgroups.protocol.file";

    /** Pause between two flush attempts in {@link #sync()}. */
    private static final long FLUSH_RETRY_DELAY_MILLIS = 1000L;

    /**
     * Creates a transport for the given cluster. Nothing is connected until
     * {@link #start()} is called.
     *
     * @param clusterName name of the cluster to join
     */
    public JGroupsTransport (String clusterName) {
        this.clusterName = clusterName;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void setReceiver(ClusterMessageReceiver receiver) {
        this.receiver = receiver;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Creates the channel from the protocol stack named by the
     * {@link #PROTOCOL_STACK_PROPERY} system property (default
     * {@link #DEFAULT_PROTOCOL_STACK}) and connects it to the cluster.
     *
     * @throws InitializationException if no receiver has been set or the
     *         channel can not be created or connected
     */
    @Override
    public void start() {
        if (receiver == null) {
            throw new InitializationException ("Transport Receiver is not set");
        }
        JChannel created = null;
        try {
            created = new JChannel(System.getProperty(PROTOCOL_STACK_PROPERY, DEFAULT_PROTOCOL_STACK));
            created.setDiscardOwnMessages(true);
            failoverManager = new JGroupsFailoverManager(created);
            // Publish the channel before connecting: callbacks may arrive
            // as soon as the channel joins the cluster.
            channel = created;
            created.setReceiver(this);
            created.connect(clusterName);
        } catch (Exception e) {
            // Do not leak a half-initialized channel when start-up fails.
            channel = null;
            if (created != null) {
                created.close();
            }
            throw new InitializationException ("Can not initialize JChannel", e);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * The message is addressed to all members (null destination).
     *
     * @throws TransportException if the channel fails to send; the original
     *         failure is attached as the cause
     */
    @Override
    public void send(Serializable message) throws TransportException {
        Message jmsg = new Message(null, message);
        try {
            channel.send(jmsg);
        } catch (Exception e) {
            throw sendFailure(e);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * The address must be a {@link JGroupsNodeAddress}.
     *
     * @throws TransportException if the channel fails to send; the original
     *         failure is attached as the cause
     */
    @Override
    public void send(NodeAddress address, Serializable message) throws TransportException {
        JGroupsNodeAddress addr = (JGroupsNodeAddress) address;
        try {
            channel.send(addr.getDelegate(), message);
        } catch (Exception e) {
            throw sendFailure(e);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Starts a flush on the channel if flushing is supported and this node is
     * {@link FailoverState#PRIMARY}. A failed flush is retried after a pause
     * for as long as the node remains primary. If the waiting thread is
     * interrupted, the interrupt flag is restored and retrying stops.
     */
    @Override
    public void sync() {
        if (!channel.flushSupported()) {
            log.warn ("Flush is not supported.", null);
            return;
        }
        if (failoverManager.getState() != FailoverState.PRIMARY) {
            return;
        }
        while (true) {
            try {
                channel.startFlush(true);
                return;
            } catch (Exception e) {
                boolean retry = failoverManager.getState() == FailoverState.PRIMARY;
                log.error("Flush retry $*", e, retry);
                if (!retry) {
                    return;
                }
            }
            try {
                Thread.sleep(FLUSH_RETRY_DELAY_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag for the caller and stop; looping on would
                // make every following sleep fail immediately.
                Thread.currentThread().interrupt();
                log.warn("Sync interrupted, flush not completed", e);
                return;
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Does nothing when the transport was never successfully started.
     */
    @Override
    public void shutdown() {
        JChannel current = channel;
        if (current != null) {
            current.close();
        }
    }

    /**
     * Passes the new view to the failover manager and notifies the receiver
     * if the failover state changed as a result.
     *
     * @param view the view delivered by the channel
     */
    @Override
    public void viewAccepted(View view) {
        FailoverState before = failoverManager.getState();
        failoverManager.viewChanged(view);
        notifyIfStateChanged(before);
    }

    /**
     * Offers the message to the failover manager first. If the manager
     * handles it, only a possible state change is reported; otherwise the
     * payload is delivered to the receiver together with the sender address.
     *
     * @param msg the message delivered by the channel
     */
    @Override
    public void receive(Message msg) {
        FailoverState before = failoverManager.getState();
        if (failoverManager.handleMessage(msg)) {
            notifyIfStateChanged(before);
            return;
        }
        Serializable payload = (Serializable) msg.getObject();
        receiver.receive(new JGroupsNodeAddress(msg.getSrc()), payload);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public FailoverState getFailoverState() {
        return failoverManager.getState();
    }

    /** Reports the current failover state to the receiver if it differs from {@code before}. */
    private void notifyIfStateChanged(FailoverState before) {
        // Read the state once so the logged and the reported value agree.
        FailoverState current = failoverManager.getState();
        if (before != current) {
            log.debug("Sending state change $*", current);
            receiver.changeState(current);
        }
    }

    /** Builds the exception thrown by the send methods, keeping the root cause. */
    private static TransportException sendFailure(Exception cause) {
        TransportException failure = new TransportException();
        try {
            failure.initCause(cause);
        } catch (IllegalStateException ignored) {
            // The exception type already fixed its cause in its constructor;
            // nothing more can be attached.
        }
        return failure;
    }
}