TakeManager.java

  1. package org.microspace.event;

  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.Set;
  5. import java.util.concurrent.CountDownLatch;
  6. import java.util.concurrent.atomic.AtomicInteger;

  7. import org.microspace.annotation.PartitionId;
  8. import org.microspace.annotation.ThreadId;
  9. import org.microspace.exception.BackupSpaceException;
  10. import org.microspace.exception.IllegalOperationException;
  11. import org.microspace.space.MicroSpace;
  12. import org.microspace.table.column.Accessor;
  13. import org.microspace.table.query.TableQuery;
  14. import org.microspace.table.query.TemplateQuery;
  15. import org.microspace.util.AccessorCache;
  16. import org.microspace.util.MicroLogger;

  17. /**
  18.  * TakeHandler registers take Queries and Templates and hands them over to the
  19.  * takers {@link TakeHandler TakeHandler}.
  20.  * <p>
  21.  * A ThreadPool is used to dispatch these Object taken from the space. The taken
  22.  * Objects must have an annotated {@link ThreadId ThreadId} field that
  23.  * synchronizes threads across all takes, even across Tables. The ThreadId for
  24.  * taken messages can never be null.
  25.  * <p>
  26.  * The {@link PartitionId PartitionId} has similar functionality than
  27.  * {@link ThreadId ThreadId} but it can be also null to designate current
  28.  * partition.
  29.  *
  30.  * @param <T>
  31.  *            is the type of the {@link ThreadId ThreadId} and
  32.  *            {@link PartitionId PartitionId} field - they should match).
  33.  * @author gaspar.sinai at microspace.org
  34.  * @version 2016-07-22
  35.  */
  36. public class TakeManager<T> {

  37.     final MicroSpace space;
  38.     final TakeScheduler<T> messageProcesor;

  39.     boolean started = false;

  40.     /*
  41.      * final HashMap<ConsumerId, Consumer<? extends MicroMessage>> consumers =
  42.      * new HashMap<ConsumerId, Consumer<? extends MicroMessage>>();
  43.      *
  44.      * final ArrayList<TakeInterest<? extends MicroMessage>> interests = new
  45.      * ArrayList<TakeInterest<? extends MicroMessage>>(); final
  46.      * ArrayList<Thread> interestsThreads = new ArrayList<Thread>();
  47.      */
  48.     private static final MicroLogger log = new MicroLogger(TakeManager.class);
  49.     final ArrayList<TakeInterest<T, ?>> interests = new ArrayList<TakeInterest<T, ?>>();
  50.     final ArrayList<InterestPoller<?>> interestsThreads = new ArrayList<>();

  51.     CountDownLatch countDownLatch;

  52.     final AccessorCache accessorCache;
  53.     final Class<T> threadIdType;
  54.     private static AtomicInteger instance = new AtomicInteger();
  55.     private final int thisInstance;
  56.    
  57.     /**
  58.      * Create a take manager.
  59.      *
  60.      * @param space
  61.      *            is the underlying Space.
  62.      * @param threadIdType
  63.      *            is the ThreadId class that will be shared between all taken
  64.      *            objects.
  65.      * @param threadPoolSize
  66.      *            is the size of the worker threads.
  67.      */
  68.     public TakeManager(MicroSpace space, Class<T> threadIdType,
  69.             int threadPoolSize) {
  70.         this.space = space;
  71.         this.threadIdType = threadIdType;
  72.         this.accessorCache = new AccessorCache(space.getAccessorGenerator());
  73.         this.messageProcesor = new TakeScheduler<T>(space, threadPoolSize);
  74.         this.thisInstance = instance.incrementAndGet();

  75.     }

  76.     /**
  77.      * Create a take manager.
  78.      *
  79.      * @param space
  80.      *            is the underlying Space.
  81.      * @param threadIdType
  82.      *            is the ThreadId class that will be shared between all taken
  83.      *            objects.
  84.      */
  85.     public TakeManager(MicroSpace space, Class<T> threadIdType) {
  86.         this.space = space;
  87.         this.threadIdType = threadIdType;
  88.         this.accessorCache = new AccessorCache(space.getAccessorGenerator());
  89.         this.messageProcesor = new TakeScheduler<T>(space);
  90.         this.thisInstance = instance.incrementAndGet();

  91.     }

  92.     /**
  93.      * This method can be called when {link
  94.      * {@link TakeHandler#handleTake(Object)} handleTake} is called in
  95.      * TakeHandler.
  96.      *
  97.      * @return the current ThreadId.
  98.      */
  99.     public T getCurrentThreadId() {
  100.         return messageProcesor.getCurrentThreadId();
  101.     }

  102.     /**
  103.      * Register a new take handler using a Template object.
  104.      * <p>
  105.      * All registrations should occur before this TakeManager starts.
  106.      *
  107.      * @param handler
  108.      *            is the take handler.
  109.      * @param templ
  110.      *            is the template that should match.
  111.      * @param <M>
  112.      *            is the Table class
  113.      */
  114.     public synchronized <M> void register(TakeHandler<M> handler, M templ) {
  115.         register(handler, new TemplateQuery<M>(templ));
  116.     }

  117.     /**
  118.      * Register a new take handler using a TableQuery.
  119.      * <p>
  120.      * All registrations should occur before this TakeManager starts.
  121.      *
  122.      * @param handler
  123.      *            is the take handler.
  124.      * @param query
  125.      *            is the template that should match.
  126.      * @param <M>
  127.      *            is the Table class
  128.      */
  129.     public synchronized <M> void register(TakeHandler<M> handler,
  130.             TableQuery<M> query) {
  131.         if (started) {
  132.             throw new IllegalOperationException("Service already started");
  133.         }
  134.         Accessor<M> accessor = accessorCache.get(query.getTableClass());
  135.         if (accessor.getThreadIdGetSetPair() == null) {
  136.             throw new IllegalArgumentException(
  137.                     "ThreadId notation is not found in "
  138.                             + query.getTableClass().getName());

  139.         }
  140.         if (threadIdType != accessor.getThreadIdGetSetPair().getReturnType()) {
  141.             log.error("$* ThreadId type is expected in $*, got $*", null,
  142.                     threadIdType, query.getTableClass(), accessor
  143.                             .getThreadIdGetSetPair().getReturnType());

  144.             throw new IllegalArgumentException("ThreadId type mismatch in "
  145.                     + query.getTableClass().getName());
  146.         }
  147.         TakeInterest<T, M> ti = new TakeInterest<T, M>(accessor, query, handler);
  148.         interests.add(ti);
  149.         interestsThreads
  150.                 .add(new InterestPoller<M>(ti, interestsThreads.size()+1));
  151.         space.registerMessageQueue(query);
  152.     }

  153.     /**
  154.      * Start the TakeHandler.
  155.      *
  156.      * Allocate a Thread for each takes, and redistribute them using a
  157.      * ThreadPool. The ThreadPool will not execute simultaneously the takes with
  158.      * the same ThreadId.
  159.      */
  160.     public void start() {
  161.         if (started) {
  162.             throw new IllegalOperationException("Service already started");
  163.         }
  164.         countDownLatch = new CountDownLatch(interestsThreads.size());
  165.         messageProcesor.start();
  166.         started = true;
  167.         for (InterestPoller<?> ip : interestsThreads) {
  168.             ip.start();
  169.         }
  170.     }

  171.     /**
  172.      * Remove all the Threads and shut down this take manager.
  173.      */
  174.     @Deprecated
  175.     public void forceShutdown() {
  176.         if (!started)
  177.             return;
  178.         started = false;
  179.         messageProcesor.forceShutdown();
  180.         for (Thread ip : interestsThreads) {
  181.             ip.interrupt();
  182.         }
  183.         try {
  184.             countDownLatch.await();
  185.         } catch (InterruptedException e) {
  186.             e.printStackTrace();
  187.         }
  188.     }
  189.    
  190.     /**
  191.      * Remove all the Threads and shut down this take manager.
  192.      */
  193.     public void shutdown() {
  194.         if (!started)
  195.             return;
  196.         started = false;
  197.         messageProcesor.shutdown();
  198.         for (Thread ip : interestsThreads) {
  199.             ip.interrupt();
  200.         }
  201.         try {
  202.             countDownLatch.await();
  203.         } catch (InterruptedException e) {
  204.             e.printStackTrace();
  205.         }
  206.     }
  207.     /**
  208.      * Remove processed messages from queue. If ThreadId is not specified, or
  209.      * does not match do not remove.
  210.      *
  211.      * @param messages
  212.      *            The messages to be removed by their Id if ThreaId matches.
  213.      * @param <M>  The message type.
  214.      */
  215.     public <M> void removeUnassignedTasksByIdOf(List<M> messages) {
  216.         messageProcesor.removeUnassignedTasksByIdOf(messages);
  217.     }

  218.     /**
  219.      * Obtain the tasks size that are assigned to a thread and being executed.
  220.      *
  221.      * @return Assigned set size.
  222.      */
  223.     public int getAssignedSize() {
  224.         return messageProcesor.getAssignedSize();
  225.     }

  226.     /**
  227.      * Obtain the tasks size that have not been assigned yet to a thread.
  228.      *
  229.      * @return Assigned set size.
  230.      */
  231.     public int getUnassignedSize() {
  232.         return messageProcesor.getUnassignedSize();
  233.     }
  234.     /**
  235.      * Obtain the tasks size that are assigned to a thread and being executed.
  236.      *
  237.      * @return All the tasks that are being executed.
  238.      */
  239.     public List<TakeTask<T, ?>> getAssignedTasks() {
  240.         return messageProcesor.getAssignedTasks();
  241.     }
  242.    
  243.     /**
  244.      * Obtain the tasks size that are pending for this thread.
  245.      * @param threadId ThreadId
  246.      * @return All the tasks that are pending.
  247.      */
  248.     public List<TakeTask<T, ?>> getUnassignedTasksForThread(T threadId) {
  249.         return messageProcesor.getUnassignedTasksForThread(threadId);
  250.     }
  251.    
  252.    

  253.     /**
  254.      * Obtain the tasks that have not been assigned yet to a thread.
  255.      *
  256.      * @return All the tasks that has not been aasigned.
  257.      */
  258.     public List<TakeTask<T, ?>> getUnassignedTasks() {
  259.         return messageProcesor.getUnassignedTasks();
  260.     }
  261.    
  262.     /**
  263.      * Obtain the locked threads.
  264.      * @return The locked threads.
  265.      */
  266.     public Set<T> getLockedThreads() {
  267.         return messageProcesor.getLockedThreads();
  268.     }
  269.    
  270.     /**
  271.      * Lock execution of a thread.
  272.      * @param thread The thread to lock.
  273.      */
  274.     public void lockThread (T thread) {
  275.         messageProcesor.lockThread(thread);
  276.     }
  277.    
  278.     /**
  279.      * Unlock execution of a thread.
  280.      * @param thread The thread.
  281.      */
  282.     public void unlockThread (T thread) {
  283.         messageProcesor.unlockThread(thread);
  284.     }
  285.    
  286.     /**
  287.      * Suspend message processing.
  288.      * <p>
  289.      * Suspend task assignments, and wait for all threads to complete.
  290.      * <p>
  291.      * When called within TakeManager thread it will be excluded.
  292.      * <p>
  293.      * @param timeoutMilliseconds is the timeout to wait for all threads.
  294.      */
  295.     public void suspendMessaging (long timeoutMilliseconds) {
  296.         messageProcesor.suspendMessaging(timeoutMilliseconds);
  297.     }
  298.    
  299.     /**
  300.      * Unsuspend message processing.
  301.      */
  302.     public void unsuspendMessaging () {
  303.         messageProcesor.unsuspendMessaging();
  304.     }

  305.     /**
  306.      * An event based blocking take. Each takes will have their own thread.
  307.      *
  308.      * @param <M>
  309.      *            is the Table type.
  310.      */
  311.     class InterestPoller<M> extends Thread {
  312.         TakeInterest<T, M> interest;

  313.         public InterestPoller(TakeInterest<T, M> interest, int threadIndex) {
  314.             super("takemanager-" + thisInstance + "-poller-" + threadIndex);
  315.             this.interest = interest;
  316.         }
  317.        
  318.         public TakeInterest <T,M>getInterest() {
  319.             return interest;
  320.         }
  321.        
  322.         public String getQueryId () {
  323.             return interest.getQuery().getId();
  324.         }

  325.         @Override
  326.         public void run() {
  327.             while (started) {
  328.                 try {
  329.                     // log.info("Take: in");
  330.                     M message = null;
  331.                     if (space.isInterrupted()) {
  332.                         log.info("Interruped before take: $*", interest.getQuery());
  333.                         break;
  334.                     }
  335.                     try {
  336.                         message = space.getNextMessage(interest.getQuery(),
  337.                                 Long.MAX_VALUE);
  338.                     } catch (BackupSpaceException ex) {
  339.                         log.warn("BackupSpaceException: $*", ex, interest.getQuery());
  340.                         continue;
  341.                     }
  342.                     if (space.isInterrupted()) {
  343.                         log.info("Interruped after take: $*", interest.getQuery());
  344.                         break;
  345.                     }
  346.                     // Thread interrupted not always work in certain
  347.                     // implementations.
  348.                     if (message == null) {
  349.                         log.trace("Take returned null started=$* $*", started, interest.getQuery());
  350.                         if (!started) { // Thread shutdown.
  351.                             log.error("Take returned null, not started $*", null, interest.getQuery());
  352.                             break;
  353.                         }
  354.                         log.warn("Take returned null $*", null, interest.getQuery());
  355.                         continue;
  356.                     }
  357.                     boolean success = messageProcesor.schedule(interest,
  358.                             message);
  359.                     if (!success) {
  360.                         log.error(
  361.                                 "Intended subscriber: $* message: $* not delivered",
  362.                                 null, interest, message);

  363.                     }
  364.                 } catch (Throwable e) {
  365.                     log.error("InterestPoller started=$* $*", e, started, interest.getQuery());
  366.                 }
  367.             }
  368.             log.info("Worker thread $* exited started=$* $*", getName(), started, interest.getQuery());
  369.             space.unregisterMessageQueue(interest.getQuery());
  370.             countDownLatch.countDown();
  371.         }
  372.     }
  373. }