SingleFilteredQueue.java
package org.microspace.table;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.microspace.table.column.Accessor;
import org.microspace.table.query.TableQuery;
import org.microspace.transaction.Transactional;
import org.microspace.util.PojoUtil;
/**
 * A multi-threaded transactional queue with a single filter.
 * <p>
 * Objects handed to {@code write} and {@code take} are first staged in a
 * per-thread {@link QueueIsolator}. {@code commit} appends the entries staged
 * by the calling thread to a shared committed queue, from which the
 * {@code getNext} methods hand them out; {@code rollback} discards them.
 *
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2017-06-24
 * @param <T> The type of the queue.
 */
public class SingleFilteredQueue<T> implements Transactional {

    /** The filter an object has to match to be accepted by this queue. */
    final TableQuery<T> query;

    /** The accessor used for matching and for primary key extraction. */
    final Accessor<T> accessor;

    /**
     * Uncommitted entries of the current thread, keyed by primary key.
     * The isolator is created lazily on first use by each thread.
     */
    final ThreadLocal<QueueIsolator<T>> frontQueue = new ThreadLocal<QueueIsolator<T>> () {
        @Override
        public QueueIsolator<T> initialValue () {
            // NOTE(review): the meaning of the null second argument is defined by
            // QueueIsolator and is not visible from this class.
            return new QueueIsolator<T> (accessor.getPrimaryKeyGetSetPair().getIndexType(), null);
        }
    };

    /** Source of the sequence numbers given to the entries created in write. */
    final AtomicLong updateCounter = new AtomicLong();

    /** Per-thread flag, set when a timed getNext call of this thread was interrupted. */
    private final ThreadLocal<Boolean> interrupted = new ThreadLocal<Boolean> () {
        @Override
        public Boolean initialValue () {
            return Boolean.FALSE;
        }
    };

    /** Committed elements, shared by all threads. */
    final BlockingDeque<T> committedQueue = new LinkedBlockingDeque<T>();

    /** Whether committed elements that share a key are collapsed when they are read. */
    boolean collapseEvents = false;

    /**
     * Construct a filtered queue.
     * Event collapsing is enabled when the accessor reports an updatable record.
     *
     * @param query The filter.
     * @param accessor The accessor for the objects.
     */
    public SingleFilteredQueue (TableQuery<T> query, Accessor<T> accessor) {
        this.query = query;
        this.accessor = accessor;
        this.collapseEvents = accessor.isUpdatableRecord();
    }

    /**
     * Stage an object in the uncommitted queue of the current thread.
     * Objects that do not match the filter are ignored.
     *
     * @param e The object that was written.
     */
    public void write (T e) {
        if (!query.match(e, accessor)) {
            return;
        }
        // BigInteger.valueOf yields the same value as parsing the decimal string
        // of the counter, without the round trip through a String.
        BigInteger sequence = BigInteger.valueOf(updateCounter.incrementAndGet());
        Entry<T> entry = new Entry<T> (e, accessor, sequence);
        frontQueue.get().put(accessor.getPrimaryKeyGetSetPair().get(e), entry);
    }

    /**
     * Check an object against the filter of this queue.
     *
     * @param e The object to check.
     * @return The result of matching the object against the query.
     */
    public boolean match (T e) {
        return query.match(e, accessor);
    }

    /**
     * Remove the entry stored under the primary key of the object from the
     * uncommitted queue of the current thread.
     * Objects that do not match the filter are ignored.
     *
     * @param e The object that was taken.
     */
    public void take (T e) {
        if (!query.match(e, accessor)) {
            return;
        }
        frontQueue.get().remove (accessor.getPrimaryKeyGetSetPair().get(e));
    }

    /**
     * Get the next committed element without waiting.
     *
     * @return The next element, collapsed with later elements of the same key
     * when collapsing is enabled, or null if nothing is committed.
     */
    public T getNext () {
        T head = committedQueue.poll();
        return tryCollapse (head);
    }

    /**
     * Get the queue size.
     * @return The number of committed elements.
     */
    public int size() {
        return committedQueue.size();
    }

    /**
     * Get the next committed element, waiting up to the given time for one
     * to become available.
     * <p>
     * An interruption during the wait is not propagated and the interrupt
     * status of the thread is not restored: the event is recorded for the
     * calling thread and can be queried with {@code isInterrupted}.
     *
     * @param timeoutMs The maximum time to wait in milliseconds.
     * @return The next element, collapsed with later elements of the same key
     * when collapsing is enabled, or null on timeout or interruption.
     */
    public T getNext (long timeoutMs) {
        T head;
        try {
            head = committedQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            interrupted.set(Boolean.TRUE);
            return null;
        }
        return tryCollapse (head);
    }

    /**
     * Try to collapse events in the committed queue.
     * When collapsing is enabled, every element still in the committed queue
     * that has the same key as the given element is removed from the queue.
     *
     * @param e The element that was just polled, may be null.
     * @return null if the element is null; the element itself if collapsing is
     * disabled or no queued element shares its key; otherwise the last queued
     * element that shares its key.
     */
    private T tryCollapse (T e) {
        if (e == null) {
            return null;
        }
        if (!collapseEvents) {
            return e;
        }
        T latest = e;
        Iterator<T> remaining = committedQueue.iterator();
        while (remaining.hasNext()) {
            T candidate = remaining.next();
            if (PojoUtil.keyEquals (e, candidate, accessor)) {
                // A later element with the same key supersedes the earlier ones.
                remaining.remove();
                latest = candidate;
            }
        }
        return latest;
    }

    /**
     * Check whether a timed getNext call of the current thread was interrupted
     * since the last commit or rollback of this thread.
     *
     * @return true if an interruption was recorded for the current thread.
     */
    public boolean isInterrupted () {
        return interrupted.get();
    }

    /**
     * Append the entries staged by the current thread to the committed queue,
     * in the natural order of the entries, then reset the per-thread state
     * (staged entries and interrupted flag).
     */
    @Override
    public void commit() {
        // The thread-local lookup is done once; the isolator cannot change
        // under this thread between the calls below.
        QueueIsolator<T> pending = frontQueue.get();
        int pendingSize = pending.size();
        if (pendingSize != 0) {
            ArrayList<Entry<T>> ordered = new ArrayList<Entry<T>> (pendingSize);
            for (Entry<T> entry : pending.values()) {
                ordered.add(entry);
            }
            // presumably Entry orders by the sequence number assigned in write;
            // verify against Entry.compareTo
            Collections.sort(ordered);
            for (Entry<T> entry : ordered) {
                committedQueue.add(entry.getEntryCopy());
            }
        }
        frontQueue.remove();
        interrupted.remove();
    }

    /**
     * Discard the entries staged by the current thread and reset its
     * interrupted flag.
     */
    @Override
    public void rollback() {
        frontQueue.remove();
        interrupted.remove();
    }

    /**
     * Return the filter of this queue.
     * @return The registered query.
     */
    public TableQuery<T> getQuery () {
        return query;
    }
}