IncomingQueue.java
package org.microspace.event;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
/**
* A thread based queue of incoming messages.
* @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
* @version 2016-06-26
*/
public class IncomingQueue<T> implements Iterable<TakeTask<T,?>>{

    /**
     * Maps the head task of each per-thread list to that list.
     * Keys are ordered, and considered equal, by sequence number only.
     */
    final SortedMap<TakeTask<T,?>, LinkedList<TakeTask<T,?>>> sequenceMap
        = new TreeMap<> (new Comparator<TakeTask<T,?>>() {
            @Override
            public int compare (TakeTask<T,?> task0, TakeTask<T,?> task1) {
                return task0.getSequenceNumber().compareTo(task1.getSequenceNumber());
            }
        });

    /**
     * Maps a thread id to the list of tasks queued for that thread.
     * The list instances are shared with {@link #sequenceMap}.
     */
    final Map<T, LinkedList<TakeTask<T,?>>> threadMap
        = new HashMap<> ();

    /**
     * Get the tasks queued for a thread.
     *
     * @param threadId The thread id.
     * @return The live internal list for the thread (not a copy), or
     * {@code null} if no list is registered for the thread.
     */
    public List<TakeTask<T, ?>> getTasksByThread (T threadId) {
        return threadMap.get(threadId);
    }

    /**
     * Add a take task to the incoming queue.
     * <p>
     * The task is appended to the list of its thread. If the thread has no
     * list yet, a new list is created and the task is registered as its head.
     * The duplicate sequence number check is performed only in that case,
     * and only against the heads currently registered.
     *
     * @param task The task to be added.
     * @throws IllegalArgumentException If the thread id or the sequence number
     * is {@code null}, or if the task would become a head whose sequence
     * number is already used by another head.
     */
    public void add(TakeTask<T, ?> task) {
        // Validate first, so that a rejected task never touches the maps.
        T threadId = task.getThreadId();
        if (threadId == null) {
            throw new IllegalArgumentException ("NULL Thread Id");
        }
        if (task.getSequenceNumber() == null) {
            throw new IllegalArgumentException ("Bad Seqeunce Number");
        }

        LinkedList<TakeTask<T, ?>> list = threadMap.get(threadId);
        if (list != null) {
            // Known thread: the head stays as it is, just append.
            list.add(task);
            return;
        }

        // New thread: the task becomes the head of a new list.
        if (sequenceMap.containsKey(task)) {
            throw new IllegalArgumentException ("Duplicate Sequence Number");
        }
        list = new LinkedList<>();
        list.add(task);
        sequenceMap.put (task, list);
        threadMap.put(threadId, list);
    }

    /**
     * Get the number of head tasks.
     *
     * @return The number of entries in the head map.
     */
    public int getHeadSize () {
        return sequenceMap.size();
    }

    /**
     * Get the iterator of the head TakeTasks.
     * The iterator is backed by the key set of the head map.
     *
     * @return The forward iterator, in ascending sequence number order.
     */
    @Override
    public Iterator<TakeTask<T, ?>> iterator() {
        return sequenceMap.keySet().iterator();
    }

    /**
     * Get the TakeTask list.
     *
     * @param origin The head.
     * @return The live internal list registered under the head (not a copy),
     * or {@code null} if origin is not a registered head.
     */
    public LinkedList<TakeTask<T,?>> getList (TakeTask<T, ?> origin) {
        return sequenceMap.get(origin);
    }

    /**
     * Remove the next task from the list.
     * <p>
     * Only the list is modified; the head map still has the list registered
     * under origin until {@link #recalculate(TakeTask)} is called.
     *
     * @param origin The head.
     * @return The removed first element of the list, or {@code null} if the
     * list is empty or origin is not a registered head.
     */
    public TakeTask<T, ?> removeNext (TakeTask<T, ?> origin) {
        LinkedList<TakeTask<T, ?>> list = sequenceMap.get(origin);
        if (list == null) {
            // Unknown head: nothing to remove.
            return null;
        }
        // pollFirst returns null on an empty list instead of throwing.
        return list.pollFirst();
    }

    /**
     * Recalculate the head.
     * <p>
     * The list registered under origin is registered again under its current
     * first element. If the list is empty, it is dropped and the thread of
     * origin is removed from the thread map. Nothing happens if origin is not
     * a registered head.
     * <p>
     * NOTE(review): the new head is not checked against the other heads. If
     * its sequence number equals the sequence number of another head, the put
     * replaces the list of that head.
     *
     * @param origin The head under which the list is currently registered.
     */
    public void recalculate (TakeTask<T, ?> origin) {
        LinkedList<TakeTask<T, ?>> list = sequenceMap.remove (origin);
        if (list == null) {
            return;
        }
        if (list.isEmpty()) {
            threadMap.remove(origin.getThreadId());
        } else {
            sequenceMap.put(list.getFirst(), list);
        }
    }

    /**
     * Clear the Queue.
     */
    public void clear() {
        sequenceMap.clear();
        threadMap.clear();
    }
}