MultiFilteredQueue.java

package org.microspace.table;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import org.microspace.table.column.Accessor;
import org.microspace.table.query.TableQuery;

/**
 * A queue that supports multiple filters.
 * 
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2016-06-26
 * @param <T> is the type of the queue.
 */
public class MultiFilteredQueue<T> implements TransactionalQueue<T> {
	
	ConcurrentHashMap<String, SingleFilteredQueue<T>> filters = 
			new ConcurrentHashMap<String, SingleFilteredQueue<T>>();
	
	final Accessor<T> accessor;
	
	private final ThreadLocal<Boolean> interrupted = new ThreadLocal<Boolean> ()  {
		@Override
		public Boolean initialValue () {
			return Boolean.FALSE;
		}
	};
	/**
	 * Create a  queue
	 * @param accessor is the accessor of this type.
	 */
	public MultiFilteredQueue (Accessor<T> accessor) {
		this.accessor = accessor;
	}
	
	/**
	 * Add a queue filter.
	 * @param query The query to be used.
	 */
	public void addQueueFilter(TableQuery<T> query) {
		SingleFilteredQueue<T> single = new SingleFilteredQueue<T> (query, accessor);
		SingleFilteredQueue<T>  prev = filters.putIfAbsent(query.getId(), single);
		if (prev != null) {
			throw new IllegalArgumentException ("Query already added." + query.toString());
		}
	}
	
	/**
	 * Remove a queue filter identified by its query id.
	 * @param query The query which will be removed.
	 */
	public void removeQueueFilter(TableQuery<T> query) {
		@SuppressWarnings("unused")
		SingleFilteredQueue<T>  prev = filters.remove(query.getId());
		return;
	}
	
	/* (non-Javadoc)
	 * @see org.microspace.table.TransactionalQueue#write(T)
	 */
	@Override
	public void write (T t) {
		for (SingleFilteredQueue<T> sfq : filters.values()) {
			sfq.write(t);
		}
	}
	
	/* (non-Javadoc)
	 * @see org.microspace.table.TransactionalQueue#write(T, org.microspace.table.query.TableQuery)
	 */
	@Override
	public void write (T t, TableQuery<T> query) {
		SingleFilteredQueue<T> sfq = filters.get(query.getId());
		if (sfq == null) return;
		sfq.write(t);
	}
	
	/* (non-Javadoc)
	 * @see org.microspace.table.TransactionalQueue#take(T)
	 */
	@Override
	public void take (T t) {
		for (SingleFilteredQueue<T> sfq : filters.values()) {
			sfq.take(t);
		}
	}
	
	/* (non-Javadoc)
	 * @see org.microspace.table.TransactionalQueue#getNext(org.microspace.table.query.TableQuery)
	 */
	@Override
	public T getNext (TableQuery<T> query) {
		SingleFilteredQueue<T>  sfq = filters.get(query.getId());
		if (sfq == null) {
			throw new IllegalArgumentException ("Query not registered");
		}
		T ret =  sfq.getNext();
		if (sfq.isInterrupted()) {
			interrupted.set(true);
		}
		return ret;
	}
	
	/* (non-Javadoc)
	 * @see org.microspace.table.TransactionalQueue#getNext(org.microspace.table.query.TableQuery, long)
	 */
	@Override
	public T getNext (TableQuery<T> query, long timeoutMs) {
		SingleFilteredQueue<T>  sfq = filters.get(query.getId());
		if (sfq == null) {
			throw new IllegalArgumentException ("Query not registered");
		}
		T ret =  sfq.getNext(timeoutMs);
		if (sfq.isInterrupted()) {
			interrupted.set(true);
		}
		return ret;
	}
	
	/* (non-Javadoc)
	 * @see org.microspace.table.TransactionalQueue#commit()
	 */
	@Override
	public void commit() {
		for (SingleFilteredQueue<T> sfq : filters.values()) {
			sfq.commit();
		}
	}

	/* (non-Javadoc)
	 * @see org.microspace.table.TransactionalQueue#rollback()
	 */
	@Override
	public void rollback() {
		for (SingleFilteredQueue<T> sfq : filters.values()) {
			sfq.rollback();
		}
	}
	
    /**
     * Return the current queue filters for the queue.
     * @return The resgistered queries.
     */
    public List<TableQuery<T>> getRegisteredQueries () {
    	LinkedList<TableQuery<T>> values = new LinkedList<TableQuery<T>>();
    	for (SingleFilteredQueue<T> sfq : filters.values()) {
			values.add (sfq.getQuery());
		}
    	return values;
    }
    

    /**
     * Get The queue size.
     * @param query The query.
     * @return The queue size.
     */
    public int size (TableQuery<T> query) {
    	SingleFilteredQueue<T>  sfq = filters.get(query.getId());
		if (sfq == null) {
			throw new IllegalArgumentException ("Query not registered");
		}
    	return sfq.size();
    }
    
	/* (non-Javadoc)
	 * @see org.microspace.table.TransactionalQueue#isInterrupted()
	 */
	 @Override
	public boolean isInterrupted () {
		 return interrupted.get();
	 }
	 
	 public boolean match (T t) {
		for (SingleFilteredQueue<T> sfq : filters.values()) {
			if (sfq.match(t)) {
				return true;
			}
		}
		return false;
	 }

}