// SingleFilteredQueue.java

package org.microspace.table;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.microspace.table.column.Accessor;
import org.microspace.table.query.TableQuery;
import org.microspace.transaction.Transactional;
import org.microspace.util.PojoUtil;

/**
 * A multi-threaded transactional queue with a single filter.
 * <p>
 * Objects handed to {@code write} and {@code take} are staged in a
 * per-thread front queue. {@code commit} sorts the staged entries and appends
 * copies of them to the shared committed queue, {@code rollback} discards
 * them. Committed objects are consumed with {@code getNext}.
 * 
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2017-06-24
 * @param <T> The type of the queue.
 */
public class SingleFilteredQueue<T> implements Transactional {
	/** The filter every object must match to be accepted by this queue. */
	final TableQuery<T> query;
	/** The accessor used for filtering and for primary key extraction. */
	final Accessor<T> accessor;
	
	/** The uncommitted entries of the calling thread, keyed by primary key. */
	final ThreadLocal<QueueIsolator<T>> frontQueue = new ThreadLocal<QueueIsolator<T>> () {
		@Override
		public QueueIsolator<T> initialValue () {
			return new QueueIsolator<T> (accessor.getPrimaryKeyGetSetPair().getIndexType(), null);
		}
	};
	
	/** Source of the sequence number handed to every staged entry. */
	final AtomicLong updateCounter = new AtomicLong();
	
	/** Per-thread flag recording that a timed {@code getNext} was interrupted. */
	private final ThreadLocal<Boolean> interrupted = new ThreadLocal<Boolean> ()  {
		@Override
		public Boolean initialValue () {
			return Boolean.FALSE;
		}
	};
	
	
	/** The committed objects, shared by all threads. */
	final BlockingDeque<T> committedQueue = new LinkedBlockingDeque<T>();
	/** When set, {@code getNext} merges queued objects that share a primary key. */
	boolean collapseEvents= false;
	
	/**
	 * Construct a filtered queue.
	 * Event collapsing is enabled when the accessor reports an updatable record.
	 * @param query The filter.
	 * @param accessor The accessor for the objects.
	 */
	public SingleFilteredQueue (TableQuery<T> query, Accessor<T> accessor) {
		this.query = query;
		this.accessor = accessor;
		this.collapseEvents = accessor.isUpdatableRecord();
	}
	
	
	/**
	 * Stage an object in the front queue of the calling thread.
	 * Objects that do not match the filter are silently ignored. The object
	 * becomes visible to consumers only after {@code commit}.
	 * @param e The object to write.
	 */
	public void write (T e) {
		if (!query.match(e, accessor)) {
			return;
		}
		// valueOf yields the same number without a string format/parse round-trip.
		BigInteger sequence = BigInteger.valueOf(updateCounter.incrementAndGet());
		Entry<T> entry = new Entry<T> (e, accessor, sequence);
		frontQueue.get().put(accessor.getPrimaryKeyGetSetPair().get(e), entry);
	}
	
	/**
	 * Test an object against the filter of this queue.
	 * @param e The object to test.
	 * @return true if the object matches the filter.
	 */
	public boolean match (T e) {
		return query.match(e, accessor);
	}
	
	/**
	 * Remove the staged entry with the primary key of the given object from
	 * the front queue of the calling thread.
	 * Objects that do not match the filter are silently ignored.
	 * @param e The object that has been taken.
	 */
	public void take (T e) {
		if (!query.match(e, accessor)) {
			return;
		}
		frontQueue.get().remove (accessor.getPrimaryKeyGetSetPair().get(e));
	}
	
	/**
	 * Get the next committed object without waiting.
	 * @return The next object, collapsed with later objects of the same key
	 * when collapsing is enabled, or null if the committed queue is empty.
	 */
	public T getNext () {
		return tryCollapse (committedQueue.poll());
	}
	
	/**
	 * Get the queue size.
	 * @return The number of objects in the committed queue.
	 */
	public int size() {
		return committedQueue.size();
	}
	
	/**
	 * Get the next committed object, waiting up to the given time for one to
	 * become available.
	 * If the wait is interrupted, null is returned and the interruption is
	 * recorded for the calling thread, see {@code isInterrupted}.
	 * @param timeoutMs The maximum time to wait in milliseconds.
	 * @return The next object, collapsed with later objects of the same key
	 * when collapsing is enabled, or null on timeout or interruption.
	 */
	public T getNext (long timeoutMs) {
		T head;
		try {
			head = committedQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
		} catch (InterruptedException ex) {
			// NOTE(review): the interrupt status of the thread is not restored
			// here; callers are expected to poll isInterrupted() instead. Confirm
			// with the callers before adding Thread.currentThread().interrupt().
			interrupted.set(Boolean.TRUE);
			return null;
		}
		return tryCollapse (head);
	}
	
	/**
	 * Try to collapse events in the committedQueue.
	 * Every queued object whose key equals the key of the given element is
	 * removed from the committed queue, and the last such object is returned
	 * in place of the element. The scan is not performed under a lock.
	 * @param e is the element, it may be null.
	 * @return null if the element is null, the element itself if collapsing
	 * is disabled or nothing with the same key is queued, otherwise the last
	 * queued object with the same key.
	 */
	private T tryCollapse (T e) {
		if (e == null) {
			return null;
		}
		if (!collapseEvents) {
			return e;
		}
		T lastElement = e;
		Iterator<T> it = committedQueue.iterator();
		while (it.hasNext()) {
			T next = it.next();
			if (PojoUtil.keyEquals (e, next, accessor)) {
				it.remove();
				lastElement = next;
			}
		}
		return lastElement;
	}
	
	/**
	 * Tell whether a timed {@code getNext} of the calling thread has been
	 * interrupted since the last commit or rollback of that thread.
	 * @return true if the calling thread was interrupted while waiting.
	 */
	public boolean isInterrupted () {
		return interrupted.get();
	}
	
	/**
	 * Publish the staged entries of the calling thread.
	 * The entries are sorted by their natural ordering (presumably the write
	 * sequence number - confirm against Entry) and a copy of each is appended
	 * to the committed queue. The per-thread state is cleared afterwards.
	 */
	@Override
	public void commit() {
		QueueIsolator<T> staged = frontQueue.get();
		int stagedSize = staged.size();
		if (stagedSize != 0) {
			ArrayList<Entry<T>> list = new ArrayList<Entry<T>> (stagedSize);
			for (Entry<T> e : staged.values()) {
				list.add(e);
			}
			Collections.sort(list);
			for (Entry<T> e : list) {
				committedQueue.add(e.getEntryCopy());
			}
		}
		clearThreadState();
	}

	/**
	 * Discard the staged entries of the calling thread and clear its
	 * interrupted flag.
	 */
	@Override
	public void rollback() {
		clearThreadState();
	}

	/** Drop the front queue and the interrupted flag of the calling thread. */
	private void clearThreadState () {
		frontQueue.remove();
		interrupted.remove();
	}

	/**
	 * Return the filter of this queue.
	 * @return The registered query.
	 */
	public TableQuery<T> getQuery () {
		return query;
	}

}