IncomingQueue.java

package org.microspace.event;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * A thread based queue of incoming messages.
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2016-06-26
 */
public class IncomingQueue<T> implements Iterable<TakeTask<T,?>>{
	
	SortedMap<TakeTask<T,?>, LinkedList<TakeTask<T,?>>> sequenceMap 
		= new TreeMap<> (new Comparator<TakeTask<T,?>>() {
			@Override
			public int compare (TakeTask<T,?> task0, TakeTask<T,?> task1) {
				return task0.getSequenceNumber().compareTo(task1.getSequenceNumber());
			}
		});
	
	Map<T, LinkedList<TakeTask<T,?>>> threadMap 
	    = new HashMap<> ();
	
	public List<TakeTask<T, ?>> getTasksByThread (T threadId) {
		return threadMap.get(threadId);
	}
	
	/**
	 * Add a take task to the incoming queue.
	 * 
	 * @param task The task to be added.
	 */
	public void add(TakeTask<T, ?> task) {
		LinkedList<TakeTask<T, ?>> list = threadMap.get(task.getThreadId());
		if (task.getThreadId() == null) {
			throw new IllegalArgumentException ("NULL Thread Id");
		}
		if (task.getSequenceNumber() == null) {
			throw new IllegalArgumentException ("Bad Seqeunce Number");
		}
		if (list == null) {
			list = new LinkedList<>();
			list.add(task);
			if (sequenceMap.get(task) != null) {
				throw new IllegalArgumentException ("Duplicate Sequence Number");
			}
			sequenceMap.put (task, list);
			threadMap.put(task.getThreadId(), list);
		} else {
			list.add(task);
		}
	}
	public int getHeadSize () {
		return sequenceMap.size();
	}
	/**
	 * Get the iterator of the head TakeTasks.
	 * @return The forward iterator in FIFO order.
	 */
	public Iterator<TakeTask<T, ?>> iterator() {
		return sequenceMap.keySet().iterator();
	}
	/**
	 * Get the TakeTask list.
	 * @param origin The head.
	 * @return The TakeTakes list.
	 */
	public LinkedList<TakeTask<T,?>> getList (TakeTask<T, ?> origin) {
		return sequenceMap.get(origin);
	}
	
	/**
	 * Remove the next task from the list.
	 * @param origin The head.
	 * @return The remove next element.
	 */
	public TakeTask<T, ?> removeNext (TakeTask<T, ?> origin) {
		LinkedList<TakeTask<T, ?>> list = sequenceMap.get(origin);
		if (list.size() == 0) return null;
		return list.removeFirst();
	}
	
	/**
	 * Recalculate the head.
	 * @param origin The new head.
	 */
	public void recalculate (TakeTask<T, ?> origin) {
		LinkedList<TakeTask<T, ?>> list = sequenceMap.get(origin);
		sequenceMap.remove (origin);
		if (list.size() == 0) {
			threadMap.remove(origin.getThreadId());
		} else {
			sequenceMap.put(list.getFirst(), list);
		}
	}
	/**
	 * Clear the Queue.
	 */
	public void clear() {
		sequenceMap.clear();
		threadMap.clear();
	}
}