// TakeManager.java

package org.microspace.event;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.microspace.annotation.PartitionId;
import org.microspace.annotation.ThreadId;
import org.microspace.exception.BackupSpaceException;
import org.microspace.exception.IllegalOperationException;
import org.microspace.space.MicroSpace;
import org.microspace.table.column.Accessor;
import org.microspace.table.query.TableQuery;
import org.microspace.table.query.TemplateQuery;
import org.microspace.util.AccessorCache;
import org.microspace.util.MicroLogger;

/**
 * TakeManager registers take Queries and Templates and hands the objects
 * taken from the space over to the takers ({@link TakeHandler TakeHandler}).
 * <p>
 * A ThreadPool is used to dispatch the Objects taken from the space. The taken
 * Objects must have an annotated {@link ThreadId ThreadId} field that
 * synchronizes threads across all takes, even across Tables. The ThreadId for
 * taken messages can never be null.
 * <p>
 * The {@link PartitionId PartitionId} has similar functionality to
 * {@link ThreadId ThreadId} but it can also be null to designate the current
 * partition.
 * <p>
 * Life cycle: register all handlers, call {@link #start()}, and finally call
 * {@link #shutdown()}. Every registration owns one poller thread; the poller
 * {@link Thread} objects are created at registration time and a
 * {@link Thread} cannot be started twice, so a manager that has been shut
 * down is not meant to be started again.
 * 
 * @param <T>
 *            is the type of the {@link ThreadId ThreadId} and
 *            {@link PartitionId PartitionId} field (they should match).
 * @author gaspar.sinai at microspace.org
 * @version 2016-07-22
 */
public class TakeManager<T> {

	private static final MicroLogger log = new MicroLogger(TakeManager.class);

	/** Counter used to give every TakeManager a distinct number in thread names. */
	private static final AtomicInteger instance = new AtomicInteger();

	final MicroSpace space;
	final TakeScheduler<T> messageProcesor;

	/**
	 * Run flag. It is written by the thread that calls start/shutdown and
	 * read by every poller thread in its loop condition, hence volatile: the
	 * pollers must observe the stop request even when the interrupt does not
	 * get through.
	 */
	volatile boolean started = false;

	/** All registered interests, in registration order. */
	final ArrayList<TakeInterest<T, ?>> interests = new ArrayList<TakeInterest<T, ?>>();
	/** One poller thread per registered interest, in registration order. */
	final ArrayList<InterestPoller<?>> interestsThreads = new ArrayList<>();

	/** Counted down once by every poller when it exits; created in start(). */
	CountDownLatch countDownLatch;

	final AccessorCache accessorCache;
	final Class<T> threadIdType;
	private final int thisInstance;

	/**
	 * Create a take manager.
	 * 
	 * @param space
	 *            is the underlying Space.
	 * @param threadIdType
	 *            is the ThreadId class that will be shared between all taken
	 *            objects.
	 * @param threadPoolSize
	 *            is the size of the worker threads.
	 */
	public TakeManager(MicroSpace space, Class<T> threadIdType,
			int threadPoolSize) {
		this.space = space;
		this.threadIdType = threadIdType;
		this.accessorCache = new AccessorCache(space.getAccessorGenerator());
		this.messageProcesor = new TakeScheduler<T>(space, threadPoolSize);
		this.thisInstance = instance.incrementAndGet();
	}

	/**
	 * Create a take manager, leaving the worker pool size to the
	 * {@link TakeScheduler} default.
	 * 
	 * @param space
	 *            is the underlying Space.
	 * @param threadIdType
	 *            is the ThreadId class that will be shared between all taken
	 *            objects.
	 */
	public TakeManager(MicroSpace space, Class<T> threadIdType) {
		this.space = space;
		this.threadIdType = threadIdType;
		this.accessorCache = new AccessorCache(space.getAccessorGenerator());
		this.messageProcesor = new TakeScheduler<T>(space);
		this.thisInstance = instance.incrementAndGet();
	}

	/**
	 * This method can be called when
	 * {@link TakeHandler#handleTake(Object) handleTake} is called in
	 * TakeHandler.
	 * 
	 * @return the current ThreadId.
	 */
	public T getCurrentThreadId() {
		return messageProcesor.getCurrentThreadId();
	}

	/**
	 * Register a new take handler using a Template object.
	 * <p>
	 * All registrations should occur before this TakeManager starts.
	 * 
	 * @param handler
	 *            is the take handler.
	 * @param templ
	 *            is the template that should match.
	 * @param <M>
	 *            is the Table class
	 * @throws IllegalOperationException
	 *             if this manager has already been started.
	 * @throws IllegalArgumentException
	 *             if the Table class has no ThreadId or its type does not
	 *             match the type given to the constructor.
	 */
	public synchronized <M> void register(TakeHandler<M> handler, M templ) {
		register(handler, new TemplateQuery<M>(templ));
	}

	/**
	 * Register a new take handler using a TableQuery.
	 * <p>
	 * All registrations should occur before this TakeManager starts.
	 * 
	 * @param handler
	 *            is the take handler.
	 * @param query
	 *            is the query that should match.
	 * @param <M>
	 *            is the Table class
	 * @throws IllegalOperationException
	 *             if this manager has already been started.
	 * @throws IllegalArgumentException
	 *             if the Table class has no ThreadId or its type does not
	 *             match the type given to the constructor.
	 */
	public synchronized <M> void register(TakeHandler<M> handler,
			TableQuery<M> query) {
		if (started) {
			throw new IllegalOperationException("Service already started");
		}
		Accessor<M> accessor = accessorCache.get(query.getTableClass());
		if (accessor.getThreadIdGetSetPair() == null) {
			throw new IllegalArgumentException(
					"ThreadId notation is not found in "
							+ query.getTableClass().getName());
		}
		if (threadIdType != accessor.getThreadIdGetSetPair().getReturnType()) {
			log.error("$* ThreadId type is expected in $*, got $*", null,
					threadIdType, query.getTableClass(), accessor
							.getThreadIdGetSetPair().getReturnType());
			throw new IllegalArgumentException("ThreadId type mismatch in "
					+ query.getTableClass().getName());
		}
		TakeInterest<T, M> ti = new TakeInterest<T, M>(accessor, query, handler);
		interests.add(ti);
		// Thread index is 1-based and only used in the poller's thread name.
		interestsThreads
				.add(new InterestPoller<M>(ti, interestsThreads.size() + 1));
		space.registerMessageQueue(query);
	}

	/**
	 * Start the TakeManager.
	 * <p>
	 * Allocate a Thread for each take, and redistribute the taken objects
	 * using a ThreadPool. The ThreadPool will not execute simultaneously the
	 * takes with the same ThreadId.
	 * <p>
	 * Synchronized on the same monitor as the register methods so that a
	 * late registration cannot modify the poller list while it is being
	 * iterated here.
	 * 
	 * @throws IllegalOperationException
	 *             if this manager has already been started.
	 */
	public synchronized void start() {
		if (started) {
			throw new IllegalOperationException("Service already started");
		}
		countDownLatch = new CountDownLatch(interestsThreads.size());
		messageProcesor.start();
		// Must be true before the pollers run: their loop condition reads it.
		started = true;
		for (InterestPoller<?> ip : interestsThreads) {
			ip.start();
		}
	}

	/**
	 * Shut down this take manager using the scheduler's
	 * {@code forceShutdown()}, then interrupt the poller threads and wait
	 * for them to exit. Does nothing if the manager is not started.
	 * 
	 * @deprecated use {@link #shutdown()} instead.
	 */
	@Deprecated
	public void forceShutdown() {
		if (!started) {
			return;
		}
		started = false;
		messageProcesor.forceShutdown();
		stopPollersAndAwaitExit();
	}

	/**
	 * Remove all the Threads and shut down this take manager.
	 * <p>
	 * Shuts the scheduler down, interrupts every poller thread and blocks
	 * until all of them have exited. Does nothing if the manager is not
	 * started. If the calling thread is interrupted while waiting, the wait
	 * is abandoned and the caller's interrupt status is preserved.
	 */
	public void shutdown() {
		if (!started) {
			return;
		}
		started = false;
		messageProcesor.shutdown();
		stopPollersAndAwaitExit();
	}

	/**
	 * Interrupt every poller thread and wait until each has counted the exit
	 * latch down.
	 */
	private void stopPollersAndAwaitExit() {
		for (Thread poller : interestsThreads) {
			poller.interrupt();
		}
		try {
			countDownLatch.await();
		} catch (InterruptedException e) {
			log.warn("Interrupted while waiting for pollers to exit, instance=$*",
					e, thisInstance);
			// Keep the interrupt visible to the caller instead of swallowing it.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Remove processed messages from queue. If ThreadId is not specified, or
	 * does not match do not remove.
	 * 
	 * @param messages
	 *            The messages to be removed by their Id if ThreadId matches.
	 * @param <M>
	 *            The message type.
	 */
	public <M> void removeUnassignedTasksByIdOf(List<M> messages) {
		messageProcesor.removeUnassignedTasksByIdOf(messages);
	}

	/**
	 * Obtain the number of tasks that are assigned to a thread and being
	 * executed.
	 * 
	 * @return Assigned set size.
	 */
	public int getAssignedSize() {
		return messageProcesor.getAssignedSize();
	}

	/**
	 * Obtain the number of tasks that have not been assigned yet to a thread.
	 * 
	 * @return Unassigned set size.
	 */
	public int getUnassignedSize() {
		return messageProcesor.getUnassignedSize();
	}

	/**
	 * Obtain the tasks that are assigned to a thread and being executed.
	 * 
	 * @return All the tasks that are being executed.
	 */
	public List<TakeTask<T, ?>> getAssignedTasks() {
		return messageProcesor.getAssignedTasks();
	}

	/**
	 * Obtain the tasks that are pending for this thread.
	 * 
	 * @param threadId
	 *            ThreadId
	 * @return All the tasks that are pending.
	 */
	public List<TakeTask<T, ?>> getUnassignedTasksForThread(T threadId) {
		return messageProcesor.getUnassignedTasksForThread(threadId);
	}

	/**
	 * Obtain the tasks that have not been assigned yet to a thread.
	 * 
	 * @return All the tasks that have not been assigned.
	 */
	public List<TakeTask<T, ?>> getUnassignedTasks() {
		return messageProcesor.getUnassignedTasks();
	}

	/**
	 * Obtain the locked threads.
	 * 
	 * @return The locked threads.
	 */
	public Set<T> getLockedThreads() {
		return messageProcesor.getLockedThreads();
	}

	/**
	 * Lock execution of a thread.
	 * 
	 * @param thread
	 *            The thread to lock.
	 */
	public void lockThread(T thread) {
		messageProcesor.lockThread(thread);
	}

	/**
	 * Unlock execution of a thread.
	 * 
	 * @param thread
	 *            The thread.
	 */
	public void unlockThread(T thread) {
		messageProcesor.unlockThread(thread);
	}

	/**
	 * Suspend message processing.
	 * <p>
	 * Suspend task assignments, and wait for all threads to complete.
	 * <p>
	 * When called within TakeManager thread it will be excluded.
	 * 
	 * @param timeoutMilliseconds
	 *            is the timeout to wait for all threads.
	 */
	public void suspendMessaging(long timeoutMilliseconds) {
		messageProcesor.suspendMessaging(timeoutMilliseconds);
	}

	/**
	 * Unsuspend message processing.
	 */
	public void unsuspendMessaging() {
		messageProcesor.unsuspendMessaging();
	}

	/**
	 * An event based blocking take. Each take will have its own thread.
	 * <p>
	 * The thread repeatedly asks the space for the next message matching its
	 * interest and passes it to the scheduler, until the manager is stopped
	 * or the space reports that it has been interrupted.
	 * 
	 * @param <M>
	 *            is the Table type.
	 */
	class InterestPoller<M> extends Thread {
		TakeInterest<T, M> interest;

		/**
		 * @param interest
		 *            is the interest this thread polls for.
		 * @param threadIndex
		 *            is the number used in the thread name.
		 */
		public InterestPoller(TakeInterest<T, M> interest, int threadIndex) {
			super("takemanager-" + thisInstance + "-poller-" + threadIndex);
			this.interest = interest;
		}

		/**
		 * @return the interest this thread polls for.
		 */
		public TakeInterest<T, M> getInterest() {
			return interest;
		}

		/**
		 * @return the id of the query of the polled interest.
		 */
		public String getQueryId() {
			return interest.getQuery().getId();
		}

		/**
		 * Poll loop. Any failure inside one iteration is logged and the loop
		 * goes on; the loop ends when the manager is no longer started or
		 * the space is interrupted. On exit the message queue is unregistered
		 * and the exit latch is counted down.
		 */
		@Override
		public void run() {
			try {
				while (started) {
					try {
						M message = null;
						if (space.isInterrupted()) {
							log.info("Interruped before take: $*", interest.getQuery());
							break;
						}
						try {
							message = space.getNextMessage(interest.getQuery(),
									Long.MAX_VALUE);
						} catch (BackupSpaceException ex) {
							log.warn("BackupSpaceException: $*", ex, interest.getQuery());
							continue;
						}
						if (space.isInterrupted()) {
							log.info("Interruped after take: $*", interest.getQuery());
							break;
						}
						// Thread interrupted not always work in certain
						// implementations, so a null result is also treated
						// as a possible shutdown signal.
						if (message == null) {
							log.trace("Take returned null started=$* $*", started, interest.getQuery());
							if (!started) { // Thread shutdown.
								log.error("Take returned null, not started $*", null, interest.getQuery());
								break;
							}
							log.warn("Take returned null $*", null, interest.getQuery());
							continue;
						}
						boolean success = messageProcesor.schedule(interest,
								message);
						if (!success) {
							log.error(
									"Intended subscriber: $* message: $* not delivered",
									null, interest, message);
						}
					} catch (Throwable e) {
						// Keep the poller alive; the loop condition decides
						// whether another attempt is made.
						log.error("InterestPoller started=$* $*", e, started, interest.getQuery());
					}
				}
				log.info("Worker thread $* exited started=$* $*", getName(), started, interest.getQuery());
				space.unregisterMessageQueue(interest.getQuery());
			} finally {
				// Always release the latch, otherwise a failure while
				// unregistering would leave shutdown() waiting forever.
				countDownLatch.countDown();
			}
		}
	}
}