TakeManager.java
- package org.microspace.event;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Set;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.atomic.AtomicInteger;
- import org.microspace.annotation.PartitionId;
- import org.microspace.annotation.ThreadId;
- import org.microspace.exception.BackupSpaceException;
- import org.microspace.exception.IllegalOperationException;
- import org.microspace.space.MicroSpace;
- import org.microspace.table.column.Accessor;
- import org.microspace.table.query.TableQuery;
- import org.microspace.table.query.TemplateQuery;
- import org.microspace.util.AccessorCache;
- import org.microspace.util.MicroLogger;
- /**
- * TakeHandler registers take Queries and Templates and hands them over to the
- * takers {@link TakeHandler TakeHandler}.
- * <p>
- * A ThreadPool is used to dispatch these Object taken from the space. The taken
- * Objects must have an annotated {@link ThreadId ThreadId} field that
- * synchronizes threads across all takes, even across Tables. The ThreadId for
- * taken messages can never be null.
- * <p>
- * The {@link PartitionId PartitionId} has similar functionality than
- * {@link ThreadId ThreadId} but it can be also null to designate current
- * partition.
- *
- * @param <T>
- * is the type of the {@link ThreadId ThreadId} and
- * {@link PartitionId PartitionId} field - they should match).
- * @author gaspar.sinai at microspace.org
- * @version 2016-07-22
- */
- public class TakeManager<T> {
- final MicroSpace space;
- final TakeScheduler<T> messageProcesor;
- boolean started = false;
- /*
- * final HashMap<ConsumerId, Consumer<? extends MicroMessage>> consumers =
- * new HashMap<ConsumerId, Consumer<? extends MicroMessage>>();
- *
- * final ArrayList<TakeInterest<? extends MicroMessage>> interests = new
- * ArrayList<TakeInterest<? extends MicroMessage>>(); final
- * ArrayList<Thread> interestsThreads = new ArrayList<Thread>();
- */
- private static final MicroLogger log = new MicroLogger(TakeManager.class);
- final ArrayList<TakeInterest<T, ?>> interests = new ArrayList<TakeInterest<T, ?>>();
- final ArrayList<InterestPoller<?>> interestsThreads = new ArrayList<>();
- CountDownLatch countDownLatch;
- final AccessorCache accessorCache;
- final Class<T> threadIdType;
- private static AtomicInteger instance = new AtomicInteger();
- private final int thisInstance;
-
- /**
- * Create a take manager.
- *
- * @param space
- * is the underlying Space.
- * @param threadIdType
- * is the ThreadId class that will be shared between all taken
- * objects.
- * @param threadPoolSize
- * is the size of the worker threads.
- */
- public TakeManager(MicroSpace space, Class<T> threadIdType,
- int threadPoolSize) {
- this.space = space;
- this.threadIdType = threadIdType;
- this.accessorCache = new AccessorCache(space.getAccessorGenerator());
- this.messageProcesor = new TakeScheduler<T>(space, threadPoolSize);
- this.thisInstance = instance.incrementAndGet();
- }
- /**
- * Create a take manager.
- *
- * @param space
- * is the underlying Space.
- * @param threadIdType
- * is the ThreadId class that will be shared between all taken
- * objects.
- */
- public TakeManager(MicroSpace space, Class<T> threadIdType) {
- this.space = space;
- this.threadIdType = threadIdType;
- this.accessorCache = new AccessorCache(space.getAccessorGenerator());
- this.messageProcesor = new TakeScheduler<T>(space);
- this.thisInstance = instance.incrementAndGet();
- }
- /**
- * This method can be called when {link
- * {@link TakeHandler#handleTake(Object)} handleTake} is called in
- * TakeHandler.
- *
- * @return the current ThreadId.
- */
- public T getCurrentThreadId() {
- return messageProcesor.getCurrentThreadId();
- }
- /**
- * Register a new take handler using a Template object.
- * <p>
- * All registrations should occur before this TakeManager starts.
- *
- * @param handler
- * is the take handler.
- * @param templ
- * is the template that should match.
- * @param <M>
- * is the Table class
- */
- public synchronized <M> void register(TakeHandler<M> handler, M templ) {
- register(handler, new TemplateQuery<M>(templ));
- }
- /**
- * Register a new take handler using a TableQuery.
- * <p>
- * All registrations should occur before this TakeManager starts.
- *
- * @param handler
- * is the take handler.
- * @param query
- * is the template that should match.
- * @param <M>
- * is the Table class
- */
- public synchronized <M> void register(TakeHandler<M> handler,
- TableQuery<M> query) {
- if (started) {
- throw new IllegalOperationException("Service already started");
- }
- Accessor<M> accessor = accessorCache.get(query.getTableClass());
- if (accessor.getThreadIdGetSetPair() == null) {
- throw new IllegalArgumentException(
- "ThreadId notation is not found in "
- + query.getTableClass().getName());
- }
- if (threadIdType != accessor.getThreadIdGetSetPair().getReturnType()) {
- log.error("$* ThreadId type is expected in $*, got $*", null,
- threadIdType, query.getTableClass(), accessor
- .getThreadIdGetSetPair().getReturnType());
- throw new IllegalArgumentException("ThreadId type mismatch in "
- + query.getTableClass().getName());
- }
- TakeInterest<T, M> ti = new TakeInterest<T, M>(accessor, query, handler);
- interests.add(ti);
- interestsThreads
- .add(new InterestPoller<M>(ti, interestsThreads.size()+1));
- space.registerMessageQueue(query);
- }
- /**
- * Start the TakeHandler.
- *
- * Allocate a Thread for each takes, and redistribute them using a
- * ThreadPool. The ThreadPool will not execute simultaneously the takes with
- * the same ThreadId.
- */
- public void start() {
- if (started) {
- throw new IllegalOperationException("Service already started");
- }
- countDownLatch = new CountDownLatch(interestsThreads.size());
- messageProcesor.start();
- started = true;
- for (InterestPoller<?> ip : interestsThreads) {
- ip.start();
- }
- }
- /**
- * Remove all the Threads and shut down this take manager.
- */
- @Deprecated
- public void forceShutdown() {
- if (!started)
- return;
- started = false;
- messageProcesor.forceShutdown();
- for (Thread ip : interestsThreads) {
- ip.interrupt();
- }
- try {
- countDownLatch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * Remove all the Threads and shut down this take manager.
- */
- public void shutdown() {
- if (!started)
- return;
- started = false;
- messageProcesor.shutdown();
- for (Thread ip : interestsThreads) {
- ip.interrupt();
- }
- try {
- countDownLatch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- /**
- * Remove processed messages from queue. If ThreadId is not specified, or
- * does not match do not remove.
- *
- * @param messages
- * The messages to be removed by their Id if ThreaId matches.
- * @param <M> The message type.
- */
- public <M> void removeUnassignedTasksByIdOf(List<M> messages) {
- messageProcesor.removeUnassignedTasksByIdOf(messages);
- }
- /**
- * Obtain the tasks size that are assigned to a thread and being executed.
- *
- * @return Assigned set size.
- */
- public int getAssignedSize() {
- return messageProcesor.getAssignedSize();
- }
- /**
- * Obtain the tasks size that have not been assigned yet to a thread.
- *
- * @return Assigned set size.
- */
- public int getUnassignedSize() {
- return messageProcesor.getUnassignedSize();
- }
- /**
- * Obtain the tasks size that are assigned to a thread and being executed.
- *
- * @return All the tasks that are being executed.
- */
- public List<TakeTask<T, ?>> getAssignedTasks() {
- return messageProcesor.getAssignedTasks();
- }
-
- /**
- * Obtain the tasks size that are pending for this thread.
- * @param threadId ThreadId
- * @return All the tasks that are pending.
- */
- public List<TakeTask<T, ?>> getUnassignedTasksForThread(T threadId) {
- return messageProcesor.getUnassignedTasksForThread(threadId);
- }
-
-
- /**
- * Obtain the tasks that have not been assigned yet to a thread.
- *
- * @return All the tasks that has not been aasigned.
- */
- public List<TakeTask<T, ?>> getUnassignedTasks() {
- return messageProcesor.getUnassignedTasks();
- }
-
- /**
- * Obtain the locked threads.
- * @return The locked threads.
- */
- public Set<T> getLockedThreads() {
- return messageProcesor.getLockedThreads();
- }
-
- /**
- * Lock execution of a thread.
- * @param thread The thread to lock.
- */
- public void lockThread (T thread) {
- messageProcesor.lockThread(thread);
- }
-
- /**
- * Unlock execution of a thread.
- * @param thread The thread.
- */
- public void unlockThread (T thread) {
- messageProcesor.unlockThread(thread);
- }
-
- /**
- * Suspend message processing.
- * <p>
- * Suspend task assignments, and wait for all threads to complete.
- * <p>
- * When called within TakeManager thread it will be excluded.
- * <p>
- * @param timeoutMilliseconds is the timeout to wait for all threads.
- */
- public void suspendMessaging (long timeoutMilliseconds) {
- messageProcesor.suspendMessaging(timeoutMilliseconds);
- }
-
- /**
- * Unsuspend message processing.
- */
- public void unsuspendMessaging () {
- messageProcesor.unsuspendMessaging();
- }
- /**
- * An event based blocking take. Each takes will have their own thread.
- *
- * @param <M>
- * is the Table type.
- */
- class InterestPoller<M> extends Thread {
- TakeInterest<T, M> interest;
- public InterestPoller(TakeInterest<T, M> interest, int threadIndex) {
- super("takemanager-" + thisInstance + "-poller-" + threadIndex);
- this.interest = interest;
- }
-
- public TakeInterest <T,M>getInterest() {
- return interest;
- }
-
- public String getQueryId () {
- return interest.getQuery().getId();
- }
- @Override
- public void run() {
- while (started) {
- try {
- // log.info("Take: in");
- M message = null;
- if (space.isInterrupted()) {
- log.info("Interruped before take: $*", interest.getQuery());
- break;
- }
- try {
- message = space.getNextMessage(interest.getQuery(),
- Long.MAX_VALUE);
- } catch (BackupSpaceException ex) {
- log.warn("BackupSpaceException: $*", ex, interest.getQuery());
- continue;
- }
- if (space.isInterrupted()) {
- log.info("Interruped after take: $*", interest.getQuery());
- break;
- }
- // Thread interrupted not always work in certain
- // implementations.
- if (message == null) {
- log.trace("Take returned null started=$* $*", started, interest.getQuery());
- if (!started) { // Thread shutdown.
- log.error("Take returned null, not started $*", null, interest.getQuery());
- break;
- }
- log.warn("Take returned null $*", null, interest.getQuery());
- continue;
- }
- boolean success = messageProcesor.schedule(interest,
- message);
- if (!success) {
- log.error(
- "Intended subscriber: $* message: $* not delivered",
- null, interest, message);
- }
- } catch (Throwable e) {
- log.error("InterestPoller started=$* $*", e, started, interest.getQuery());
- }
- }
- log.info("Worker thread $* exited started=$* $*", getName(), started, interest.getQuery());
- space.unregisterMessageQueue(interest.getQuery());
- countDownLatch.countDown();
- }
- }
- }