MultiFilteredQueue.java
package org.microspace.table;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.microspace.table.column.Accessor;
import org.microspace.table.query.TableQuery;
/**
* A queue that supports multiple filters.
*
* @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
* @version 2016-06-26
* @param <T> is the type of the queue.
*/
public class MultiFilteredQueue<T> implements TransactionalQueue<T> {
ConcurrentHashMap<String, SingleFilteredQueue<T>> filters =
new ConcurrentHashMap<String, SingleFilteredQueue<T>>();
final Accessor<T> accessor;
private final ThreadLocal<Boolean> interrupted = new ThreadLocal<Boolean> () {
@Override
public Boolean initialValue () {
return Boolean.FALSE;
}
};
/**
* Create a queue
* @param accessor is the accessor of this type.
*/
public MultiFilteredQueue (Accessor<T> accessor) {
this.accessor = accessor;
}
/**
* Add a queue filter.
* @param query The query to be used.
*/
public void addQueueFilter(TableQuery<T> query) {
SingleFilteredQueue<T> single = new SingleFilteredQueue<T> (query, accessor);
SingleFilteredQueue<T> prev = filters.putIfAbsent(query.getId(), single);
if (prev != null) {
throw new IllegalArgumentException ("Query already added." + query.toString());
}
}
/**
* Remove a queue filter identified by its query id.
* @param query The query which will be removed.
*/
public void removeQueueFilter(TableQuery<T> query) {
@SuppressWarnings("unused")
SingleFilteredQueue<T> prev = filters.remove(query.getId());
return;
}
/* (non-Javadoc)
* @see org.microspace.table.TransactionalQueue#write(T)
*/
@Override
public void write (T t) {
for (SingleFilteredQueue<T> sfq : filters.values()) {
sfq.write(t);
}
}
/* (non-Javadoc)
* @see org.microspace.table.TransactionalQueue#write(T, org.microspace.table.query.TableQuery)
*/
@Override
public void write (T t, TableQuery<T> query) {
SingleFilteredQueue<T> sfq = filters.get(query.getId());
if (sfq == null) return;
sfq.write(t);
}
/* (non-Javadoc)
* @see org.microspace.table.TransactionalQueue#take(T)
*/
@Override
public void take (T t) {
for (SingleFilteredQueue<T> sfq : filters.values()) {
sfq.take(t);
}
}
/* (non-Javadoc)
* @see org.microspace.table.TransactionalQueue#getNext(org.microspace.table.query.TableQuery)
*/
@Override
public T getNext (TableQuery<T> query) {
SingleFilteredQueue<T> sfq = filters.get(query.getId());
if (sfq == null) {
throw new IllegalArgumentException ("Query not registered");
}
T ret = sfq.getNext();
if (sfq.isInterrupted()) {
interrupted.set(true);
}
return ret;
}
/* (non-Javadoc)
* @see org.microspace.table.TransactionalQueue#getNext(org.microspace.table.query.TableQuery, long)
*/
@Override
public T getNext (TableQuery<T> query, long timeoutMs) {
SingleFilteredQueue<T> sfq = filters.get(query.getId());
if (sfq == null) {
throw new IllegalArgumentException ("Query not registered");
}
T ret = sfq.getNext(timeoutMs);
if (sfq.isInterrupted()) {
interrupted.set(true);
}
return ret;
}
/* (non-Javadoc)
* @see org.microspace.table.TransactionalQueue#commit()
*/
@Override
public void commit() {
for (SingleFilteredQueue<T> sfq : filters.values()) {
sfq.commit();
}
}
/* (non-Javadoc)
* @see org.microspace.table.TransactionalQueue#rollback()
*/
@Override
public void rollback() {
for (SingleFilteredQueue<T> sfq : filters.values()) {
sfq.rollback();
}
}
/**
* Return the current queue filters for the queue.
* @return The resgistered queries.
*/
public List<TableQuery<T>> getRegisteredQueries () {
LinkedList<TableQuery<T>> values = new LinkedList<TableQuery<T>>();
for (SingleFilteredQueue<T> sfq : filters.values()) {
values.add (sfq.getQuery());
}
return values;
}
/**
* Get The queue size.
* @param query The query.
* @return The queue size.
*/
public int size (TableQuery<T> query) {
SingleFilteredQueue<T> sfq = filters.get(query.getId());
if (sfq == null) {
throw new IllegalArgumentException ("Query not registered");
}
return sfq.size();
}
/* (non-Javadoc)
* @see org.microspace.table.TransactionalQueue#isInterrupted()
*/
@Override
public boolean isInterrupted () {
return interrupted.get();
}
public boolean match (T t) {
for (SingleFilteredQueue<T> sfq : filters.values()) {
if (sfq.match(t)) {
return true;
}
}
return false;
}
}