// TakeManager.java
package org.microspace.event;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.microspace.annotation.PartitionId;
import org.microspace.annotation.ThreadId;
import org.microspace.exception.BackupSpaceException;
import org.microspace.exception.IllegalOperationException;
import org.microspace.space.MicroSpace;
import org.microspace.table.column.Accessor;
import org.microspace.table.query.TableQuery;
import org.microspace.table.query.TemplateQuery;
import org.microspace.util.AccessorCache;
import org.microspace.util.MicroLogger;
/**
 * TakeManager registers take Queries and Templates and hands the matching
 * taken objects over to the takers ({@link TakeHandler TakeHandler}).
 * <p>
 * A ThreadPool is used to dispatch the Objects taken from the space. The taken
 * Objects must have an annotated {@link ThreadId ThreadId} field that
 * synchronizes threads across all takes, even across Tables. The ThreadId for
 * taken messages can never be null.
 * <p>
 * The {@link PartitionId PartitionId} has similar functionality to
 * {@link ThreadId ThreadId}, but it can also be null to designate the current
 * partition.
 *
 * @param <T>
 *            is the type of the {@link ThreadId ThreadId} and
 *            {@link PartitionId PartitionId} fields (they should match).
 * @author gaspar.sinai at microspace.org
 * @version 2016-07-22
 */
public class TakeManager<T> {

    private static final MicroLogger log = new MicroLogger(TakeManager.class);

    /** Source of the per-instance number that is embedded in poller thread names. */
    private static final AtomicInteger instance = new AtomicInteger();

    final MicroSpace space;

    final TakeScheduler<T> messageProcesor;

    /**
     * True between {@link #start()} and {@link #shutdown()} /
     * {@link #forceShutdown()}.
     * <p>
     * Volatile because it is written by the controlling thread and read by
     * every {@link InterestPoller} thread in its polling loop; without it a
     * poller has no guarantee of ever observing the shutdown request.
     */
    volatile boolean started = false;

    /** All registered interests, in registration order. */
    final ArrayList<TakeInterest<T, ?>> interests = new ArrayList<TakeInterest<T, ?>>();

    /** One poller thread per registered interest, in registration order. */
    final ArrayList<InterestPoller<?>> interestsThreads = new ArrayList<>();

    /**
     * Counted down once by each poller thread when it exits. Created in
     * {@link #start()}, awaited on shutdown.
     */
    CountDownLatch countDownLatch;

    final AccessorCache accessorCache;

    final Class<T> threadIdType;

    /** Number of this instance, used only for naming poller threads. */
    private final int thisInstance;

    /**
     * Create a take manager.
     *
     * @param space
     *            is the underlying Space.
     * @param threadIdType
     *            is the ThreadId class that will be shared between all taken
     *            objects.
     * @param threadPoolSize
     *            is the number of worker threads.
     */
    public TakeManager(MicroSpace space, Class<T> threadIdType,
            int threadPoolSize) {
        this.space = space;
        this.threadIdType = threadIdType;
        this.accessorCache = new AccessorCache(space.getAccessorGenerator());
        this.messageProcesor = new TakeScheduler<T>(space, threadPoolSize);
        this.thisInstance = instance.incrementAndGet();
    }

    /**
     * Create a take manager whose scheduler uses its own default sizing.
     *
     * @param space
     *            is the underlying Space.
     * @param threadIdType
     *            is the ThreadId class that will be shared between all taken
     *            objects.
     */
    public TakeManager(MicroSpace space, Class<T> threadIdType) {
        this.space = space;
        this.threadIdType = threadIdType;
        this.accessorCache = new AccessorCache(space.getAccessorGenerator());
        this.messageProcesor = new TakeScheduler<T>(space);
        this.thisInstance = instance.incrementAndGet();
    }

    /**
     * This method can be called while
     * {@link TakeHandler#handleTake(Object) handleTake} is being invoked in a
     * TakeHandler.
     *
     * @return the current ThreadId.
     */
    public T getCurrentThreadId() {
        return messageProcesor.getCurrentThreadId();
    }

    /**
     * Register a new take handler using a Template object.
     * <p>
     * All registrations should occur before this TakeManager starts.
     *
     * @param handler
     *            is the take handler.
     * @param templ
     *            is the template that should match.
     * @param <M>
     *            is the Table class
     * @throws IllegalOperationException
     *             if this TakeManager has already been started.
     * @throws IllegalArgumentException
     *             if the Table class has no ThreadId field or its type does
     *             not match the ThreadId type of this TakeManager.
     */
    public synchronized <M> void register(TakeHandler<M> handler, M templ) {
        register(handler, new TemplateQuery<M>(templ));
    }

    /**
     * Register a new take handler using a TableQuery.
     * <p>
     * All registrations should occur before this TakeManager starts.
     *
     * @param handler
     *            is the take handler.
     * @param query
     *            is the query that should match.
     * @param <M>
     *            is the Table class
     * @throws IllegalOperationException
     *             if this TakeManager has already been started.
     * @throws IllegalArgumentException
     *             if the Table class has no ThreadId field or its type does
     *             not match the ThreadId type of this TakeManager.
     */
    public synchronized <M> void register(TakeHandler<M> handler,
            TableQuery<M> query) {
        if (started) {
            throw new IllegalOperationException("Service already started");
        }
        Accessor<M> accessor = accessorCache.get(query.getTableClass());
        if (accessor.getThreadIdGetSetPair() == null) {
            throw new IllegalArgumentException(
                    "ThreadId notation is not found in "
                            + query.getTableClass().getName());
        }
        if (threadIdType != accessor.getThreadIdGetSetPair().getReturnType()) {
            log.error("$* ThreadId type is expected in $*, got $*", null,
                    threadIdType, query.getTableClass(), accessor
                            .getThreadIdGetSetPair().getReturnType());
            throw new IllegalArgumentException("ThreadId type mismatch in "
                    + query.getTableClass().getName());
        }
        TakeInterest<T, M> ti = new TakeInterest<T, M>(accessor, query, handler);
        interests.add(ti);
        // Poller indexes are 1-based and only used in the thread name.
        interestsThreads
                .add(new InterestPoller<M>(ti, interestsThreads.size() + 1));
        space.registerMessageQueue(query);
    }

    /**
     * Start the TakeManager.
     * <p>
     * Allocate a Thread for each take, and redistribute the taken objects using
     * a ThreadPool. The ThreadPool will not execute simultaneously the takes
     * with the same ThreadId.
     * <p>
     * Synchronized on the same monitor as {@code register} so that the
     * "already started" check in {@code register} cannot race with the poller
     * list being iterated here.
     * <p>
     * NOTE(review): restarting after a shutdown does not look supported, since
     * the same poller Thread objects would be started a second time.
     *
     * @throws IllegalOperationException
     *             if this TakeManager has already been started.
     */
    public synchronized void start() {
        if (started) {
            throw new IllegalOperationException("Service already started");
        }
        countDownLatch = new CountDownLatch(interestsThreads.size());
        messageProcesor.start();
        // Must be set before the pollers start: their loop exits when it is false.
        started = true;
        for (InterestPoller<?> ip : interestsThreads) {
            ip.start();
        }
    }

    /**
     * Remove all the Threads and shut down this take manager, using the
     * scheduler's forced shutdown. Does nothing if not started.
     * <p>
     * Blocks until every poller thread has exited. If the calling thread is
     * interrupted while waiting, the wait is abandoned and the interrupt
     * status of the calling thread is preserved.
     *
     * @deprecated presumably superseded by {@link #shutdown()}; confirm before
     *             migrating callers.
     */
    @Deprecated
    public void forceShutdown() {
        if (!markStopped()) {
            return;
        }
        messageProcesor.forceShutdown();
        interruptAndAwaitPollers();
    }

    /**
     * Remove all the Threads and shut down this take manager. Does nothing if
     * not started.
     * <p>
     * Blocks until every poller thread has exited. If the calling thread is
     * interrupted while waiting, the wait is abandoned and the interrupt
     * status of the calling thread is preserved.
     */
    public void shutdown() {
        if (!markStopped()) {
            return;
        }
        messageProcesor.shutdown();
        interruptAndAwaitPollers();
    }

    /**
     * Atomically flip this manager from started to stopped.
     * <p>
     * Only the state change is done under the monitor; the blocking part of
     * the shutdown runs outside of it.
     *
     * @return true if this call performed the transition, false if the manager
     *         was not started.
     */
    private synchronized boolean markStopped() {
        if (!started) {
            return false;
        }
        started = false;
        return true;
    }

    /**
     * Interrupt every poller thread and wait until all of them have counted
     * down the latch.
     */
    private void interruptAndAwaitPollers() {
        for (Thread poller : interestsThreads) {
            poller.interrupt();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe it.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for pollers of takemanager-$* to exit",
                    e, thisInstance);
        }
    }

    /**
     * Remove processed messages from queue. If ThreadId is not specified, or
     * does not match do not remove.
     *
     * @param messages
     *            The messages to be removed by their Id if ThreadId matches.
     * @param <M>
     *            The message type.
     */
    public <M> void removeUnassignedTasksByIdOf(List<M> messages) {
        messageProcesor.removeUnassignedTasksByIdOf(messages);
    }

    /**
     * Obtain the number of tasks that are assigned to a thread and being
     * executed.
     *
     * @return Assigned set size.
     */
    public int getAssignedSize() {
        return messageProcesor.getAssignedSize();
    }

    /**
     * Obtain the number of tasks that have not been assigned yet to a thread.
     *
     * @return Unassigned set size.
     */
    public int getUnassignedSize() {
        return messageProcesor.getUnassignedSize();
    }

    /**
     * Obtain the tasks that are assigned to a thread and being executed.
     *
     * @return All the tasks that are being executed.
     */
    public List<TakeTask<T, ?>> getAssignedTasks() {
        return messageProcesor.getAssignedTasks();
    }

    /**
     * Obtain the tasks that are pending for this thread.
     *
     * @param threadId
     *            ThreadId
     * @return All the tasks that are pending.
     */
    public List<TakeTask<T, ?>> getUnassignedTasksForThread(T threadId) {
        return messageProcesor.getUnassignedTasksForThread(threadId);
    }

    /**
     * Obtain the tasks that have not been assigned yet to a thread.
     *
     * @return All the tasks that have not been assigned.
     */
    public List<TakeTask<T, ?>> getUnassignedTasks() {
        return messageProcesor.getUnassignedTasks();
    }

    /**
     * Obtain the locked threads.
     *
     * @return The locked threads.
     */
    public Set<T> getLockedThreads() {
        return messageProcesor.getLockedThreads();
    }

    /**
     * Lock execution of a thread.
     *
     * @param thread
     *            The thread to lock.
     */
    public void lockThread(T thread) {
        messageProcesor.lockThread(thread);
    }

    /**
     * Unlock execution of a thread.
     *
     * @param thread
     *            The thread.
     */
    public void unlockThread(T thread) {
        messageProcesor.unlockThread(thread);
    }

    /**
     * Suspend message processing.
     * <p>
     * Suspend task assignments, and wait for all threads to complete.
     * <p>
     * When called within a TakeManager thread, that thread will be excluded.
     *
     * @param timeoutMilliseconds
     *            is the timeout to wait for all threads.
     */
    public void suspendMessaging(long timeoutMilliseconds) {
        messageProcesor.suspendMessaging(timeoutMilliseconds);
    }

    /**
     * Unsuspend message processing.
     */
    public void unsuspendMessaging() {
        messageProcesor.unsuspendMessaging();
    }

    /**
     * An event based blocking take. Each take will have its own thread.
     *
     * @param <M>
     *            is the Table type.
     */
    class InterestPoller<M> extends Thread {

        TakeInterest<T, M> interest;

        /**
         * @param interest
         *            is the interest this thread polls for.
         * @param threadIndex
         *            is the index embedded in the thread name.
         */
        public InterestPoller(TakeInterest<T, M> interest, int threadIndex) {
            super("takemanager-" + thisInstance + "-poller-" + threadIndex);
            this.interest = interest;
        }

        /**
         * @return the interest this thread polls for.
         */
        public TakeInterest<T, M> getInterest() {
            return interest;
        }

        /**
         * @return the id of the query of the polled interest.
         */
        public String getQueryId() {
            return interest.getQuery().getId();
        }

        /**
         * Take messages matching the interest from the space and hand each one
         * to the scheduler, until the manager is stopped or the space reports
         * that it has been interrupted. On exit the message queue of the query
         * is unregistered and the shutdown latch is counted down.
         */
        @Override
        public void run() {
            try {
                while (started) {
                    try {
                        if (space.isInterrupted()) {
                            log.info("Interruped before take: $*", interest.getQuery());
                            break;
                        }
                        M message;
                        try {
                            message = space.getNextMessage(interest.getQuery(),
                                    Long.MAX_VALUE);
                        } catch (BackupSpaceException ex) {
                            log.warn("BackupSpaceException: $*", ex, interest.getQuery());
                            continue;
                        }
                        if (space.isInterrupted()) {
                            log.info("Interruped after take: $*", interest.getQuery());
                            break;
                        }
                        // Thread interrupted not always work in certain
                        // implementations.
                        if (message == null) {
                            log.trace("Take returned null started=$* $*", started, interest.getQuery());
                            if (!started) { // Thread shutdown.
                                log.error("Take returned null, not started $*", null, interest.getQuery());
                                break;
                            }
                            log.warn("Take returned null $*", null, interest.getQuery());
                            continue;
                        }
                        boolean success = messageProcesor.schedule(interest,
                                message);
                        if (!success) {
                            log.error(
                                    "Intended subscriber: $* message: $* not delivered",
                                    null, interest, message);
                        }
                    } catch (Throwable e) {
                        // Keep polling: a single failing take must not kill the poller.
                        log.error("InterestPoller started=$* $*", e, started, interest.getQuery());
                    }
                }
                log.info("Worker thread $* exited started=$* $*", getName(), started, interest.getQuery());
                space.unregisterMessageQueue(interest.getQuery());
            } finally {
                // Always release the shutdown latch, even if the exit
                // bookkeeping above throws; otherwise shutdown would block
                // forever waiting for this thread.
                countDownLatch.countDown();
            }
        }
    }
}