SimpleSpace.java
package org.microspace.space;
import org.microspace.annotation.SpaceRecord;
import org.microspace.event.SpaceRecordListener;
import org.microspace.exception.LockException;
import org.microspace.specific.CurrentTimeProvider;
import org.microspace.table.*;
import org.microspace.table.column.Accessor;
import org.microspace.table.query.TableQuery;
import org.microspace.table.query.TemplateQuery;
import org.microspace.transaction.LockableTransactional;
import org.microspace.util.AccessorCache;
import org.microspace.util.PojoUtil;
import org.microspace.util.UniqueId;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* SimpleSpace is a memory based implementation if MicroSpace.
*
* @author Gaspar Sinai
* @version 2012-06-11
*
*/
public class SimpleSpace implements LockableTransactional, MicroSpace, TableListener<Object> {
private final SpaceConfig spaceConfig;
protected final ConcurrentHashMap <Class<?>, SimpleTable<?>> maps;
protected final ConcurrentHashMap<Class<? extends Object>, MultiFilteredQueue<?>> queues;
protected final ReadWriteLock commitLock = new ReentrantReadWriteLock ();
private final Object commitNotifier = new Object ();
final AccessorCache accessorCache;
private final UniqueId instanceId;
CommitListener commitListener;
protected final CurrentTimeProvider currentTimeProvider;
private final ThreadLocal<Boolean> interrupted = new ThreadLocal<Boolean> () {
@Override
public Boolean initialValue () {
return Boolean.FALSE;
}
};
public SimpleSpace (SpaceConfig spaceConfig) {
this(spaceConfig, null);
}
public SimpleSpace (SpaceConfig spaceConfig, CurrentTimeProvider currentTimeProvider) {
this.currentTimeProvider = currentTimeProvider;
this.maps = new ConcurrentHashMap <Class<?>, SimpleTable<?>> ();
this.queues = new ConcurrentHashMap<Class<? extends Object>, MultiFilteredQueue<?>>();
this.spaceConfig = PojoUtil.copy(spaceConfig);
this.accessorCache = new AccessorCache(this.spaceConfig);
this.instanceId = new UniqueId();
}
protected void setCommitListener (CommitListener listener) {
this.commitListener = listener;
}
public SimpleSpace () {
this (new SpaceConfig());
}
/**
* {@inheritDoc}
*/
@Override
public <T> void write (T object) {
if (!localObjectRetained (object)) {
throw new IllegalArgumentException ("Local Object Has No Matching GetNext Queue.");
}
SimpleTable<Object> map = getMap (object.getClass());
// We are writing to front map, so a read lock is fine.
SpaceRecord anno = object.getClass().getAnnotation(SpaceRecord.class);
if (anno == null || anno.local() == false) {
map.write (object);
} else {
SimpleUnsafeTable.autoTimestamp(object, currentTimeProvider);
}
TransactionalQueue<Object> queue = getQueue (object.getClass(), false);
if (queue != null) {
queue.write(object);
}
if(spaceRecordListener != null){
spaceRecordListener.write(object);
}
return;
}
private <T> T takeFromQueue (Class<T> tableClass, T t) {
if (t != null) {
TransactionalQueue<Object> queue = getQueue (tableClass, false);
if (queue != null) {
queue.take(t);
}
}
return t;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T take (TableQuery<T> query) {
query.prepareQuery(this);
SimpleTable<T> map = getMap (query.getTableClass());
T t = takeFromQueue (query.getTableClass(), map.take (query));
query.finishQuery();
if(spaceRecordListener != null){
spaceRecordListener.takeByIdOf(t);
}
return t;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T takeById (Class<T> tableClass, Object primaryKey) {
SimpleTable<T> map = getMap (tableClass);
T t = takeFromQueue (tableClass, map.takeById (primaryKey));
if(spaceRecordListener != null){
spaceRecordListener.takeByIdOf(t);
}
return t;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T take (TableQuery<T> query, long timeout) {
long start = System.currentTimeMillis();
long remainingTime = timeout;
while (true) {
T ret = null;
synchronized (commitNotifier) {
ret = take(query);
if (ret != null) {
if (spaceRecordListener != null) {
spaceRecordListener.takeByIdOf(ret);
}
return takeFromQueue(query.getTableClass(), ret);
}
try {
commitNotifier.wait(remainingTime);
} catch (InterruptedException e) {
interrupted.set(true);
return null;
}
}
// Take is faster than iterating through underlyingChanged.
ret = take(query);
if (ret != null) {
if (spaceRecordListener != null) {
spaceRecordListener.takeByIdOf(ret);
}
return takeFromQueue(query.getTableClass(), ret);
}
long now = System.currentTimeMillis();
if (now < start) start = now;
if (now - start >= timeout) return null;
remainingTime = timeout - (now - start);
}
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<T> takeMultiple(TableQuery<T> query) {
query.prepareQuery(this);
SimpleTable<T> map = getMap (query.getTableClass());
List<T> ret = map.takeMultiple (query);
for (T t: ret) {
takeFromQueue (query.getTableClass(), t);
if (spaceRecordListener != null) {
spaceRecordListener.takeByIdOf(t);
}
}
query.finishQuery();
return ret;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T read (TableQuery<T> query) {
query.prepareQuery(this);
SimpleTable<T> map = getMap (query.getTableClass());
T t = map.read (query);
query.finishQuery();
return t;
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<T> readMultiple(TableQuery<T> query) {
query.prepareQuery(this);
SimpleTable<T> map = getMap (query.getTableClass());
List<T> ret = map.readMultiple (query);
query.finishQuery();
return ret;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T readById (Class<T> tableClass, Object key) {
SimpleTable<T> map = getMap (tableClass, false);
if (map == null) return null;
return map.readById (key);
}
/**
* {@inheritDoc}
*/
@Override
public <T> void clear(Class<T> clazz) {
if (clazz == null) {
Collection<SimpleTable<?>> maps = getMaps();
for (SimpleTable<?> map : maps) {
try {
map.clear ();
} catch (Exception e) {
e.printStackTrace();
}
}
commit();
} else {
SimpleTable<?> map = getMap (clazz, false);
if (map == null) return;
map.clear ();
commit();
}
}
/**
* {@inheritDoc}
*/
@Override
public void clear () {
clear(null);
}
private <T> SimpleTable<T> getMap (Class<? extends Object> clazz) {
return getMap (clazz, true);
}
@SuppressWarnings("unchecked")
public <T> SimpleTable<T> getMap (Class<? extends Object> clazz, boolean createIfNotFound) {
SimpleTable<T> ret = (SimpleTable<T>) maps.get (clazz);
if (ret != null) {
return ret;
}
if (!createIfNotFound) {
return null;
}
NotifyingTable<T> basic = new NotifyingTable<T>(
spaceConfig.getAccessGenerator().newAccessor((Class<T>) clazz),
currentTimeProvider);
basic.addTableListener((TableListener<T>) this);
SimpleTable<T> isolated = new SimpleTable<T>(basic);
isolated.setSerialized(spaceConfig.isSerialized());
maps.putIfAbsent(clazz, isolated);
return (SimpleTable<T>) maps.get (clazz);
}
@SuppressWarnings("unchecked")
private <T> MultiFilteredQueue<T> getQueue (Class<? extends Object> clazz, boolean createIfNotFound) {
MultiFilteredQueue<T> ret = (MultiFilteredQueue<T>) queues.get (clazz);
if (ret != null) {
return ret;
}
if (!createIfNotFound) {
return null;
}
Accessor<T> accessor = spaceConfig.getAccessGenerator().newAccessor((Class<T>)clazz);
MultiFilteredQueue<T> created = new MultiFilteredQueue<T>(accessor);
queues.putIfAbsent(clazz, created);
ret = (MultiFilteredQueue<T>) queues.get (clazz);
return ret;
}
private Collection<SimpleTable<?>> getMaps () {
commitLock.readLock().lock();
try {
Collection<SimpleTable<?>> ret = new ArrayList<SimpleTable<?>> ();
ret.addAll(maps.values());
return ret;
} finally {
commitLock.readLock().unlock();
}
}
private Collection<MultiFilteredQueue<?>> getQueues () {
Collection<MultiFilteredQueue<?>> ret = new ArrayList<MultiFilteredQueue<?>>();
ret.addAll(queues.values());
return ret;
}
/**
* {@inheritDoc}
*/
@Override
public void commit() {
Collection<SimpleTable<?>> maps = getMaps();
// We are using a lock so that all the objects will be updated in one atomic operation.
commitLock.writeLock().lock();
LockException exception = null;
SimpleTable<?> currentMap = null;
try {
onCommitBegin();
// All or nothing. Lock throws exception.
for (SimpleTable<?> map : maps) {
currentMap = map;
map.lock();
}
// This block should not throw exception.
for (SimpleTable<?> map : maps) {
currentMap = map;
if (map.getAccessor().isLocalRecord()) {
map.rollback();
} else {
map.commit();
}
}
} catch (Exception e) {
for (SimpleTable<?> map : maps) {
map.rollback();
}
exception = new LockException ((currentMap!=null ? currentMap.getAccessor().getTargetClass().toString() : "Check with developer Unknown Class") +" caused Exception. "+ e.getMessage(), e);
} finally {
try {
onCommitEnd();
} catch (Exception ec) {
exception = new LockException (ec);
}
commitLock.writeLock().unlock();
}
if (exception != null) {
throw exception;
}
Collection<MultiFilteredQueue<?>> queues = getQueues();
for (MultiFilteredQueue<?> queue : queues) {
queue.commit();
}
synchronized (commitNotifier) {
commitNotifier.notifyAll();
}
}
/**
* {@inheritDoc}
*/
@Override
public void rollback() {
Collection<SimpleTable<?>> maps = getMaps();
for (SimpleTable<?> map : maps) {
map.rollback();
}
Collection<MultiFilteredQueue<?>> queues = getQueues();
for (MultiFilteredQueue<?> queue : queues) {
queue.rollback();
}
}
/**
* {@inheritDoc}
*/
@Override
public <T> int size(Class<T> clazz) {
SimpleTable<Object> map = getMap (clazz);
return map.size();
}
/**
* {@inheritDoc}
*/
@Override
public int size() {
Collection<SimpleTable<?>> maps = getMaps();
int ret = 0;
for (SimpleTable<?> t : maps) {
ret += t.size();
}
return ret;
}
/**
* {@inheritDoc}
*/
@Override
public List<Class<?>> getTableClasses() {
LinkedList<Class<?>> ret = new LinkedList<Class<?>> ();
Collection<SimpleTable<?>> maps = getMaps();
for (SimpleTable<?> t : maps) {
ret.add(t.getAccessor().getTargetClass());
}
Collections.sort(ret, new Comparator<Class<?>>() {
@Override
public int compare(final Class<?> object1, final Class<?> object2) {
return object1.getName().compareTo(object2.getName());
}
}
);
return ret;
}
/**
* {@inheritDoc}
*/
@Override
public void lock() {
Collection<SimpleTable<?>> maps = getMaps();
try {
for (SimpleTable<?> map : maps) {
map.lock();
}
} catch (LockException le) {
for (SimpleTable<?> map : maps) {
map.unlock();
}
throw new LockException (le.getMessage());
} catch (IllegalArgumentException ie) {
for (SimpleTable<?> map : maps) {
map.unlock();
}
throw new IllegalArgumentException(ie.getMessage());
}
}
/**
* {@inheritDoc}
*/
@Override
public void unlock() {
Collection<SimpleTable<?>> maps = getMaps();
for (SimpleTable<?> map : maps) {
map.unlock();
}
}
private void onCommitBegin () {
if (commitListener != null) commitListener.commitBegins();
}
public void tableChangeEvent(Object src, Entry<Object> oldEntry, Entry<Object> newEntry) {
if (commitListener != null) commitListener.entryAdded(newEntry);
}
private void onCommitEnd () {
if (commitListener != null) commitListener.commitEnds();
}
SpaceConfig getSpaceConfig () {
return spaceConfig;
}
public AccessorGenerator getAccessorGenerator() {
return spaceConfig.getAccessGenerator();
}
public <T> void setEntry (Entry<T> object) {
SimpleTable<T> map = getMap (object.getSpaceEntry().getClass());
// We are writing to front map, so a read lock is fine.
map.setEntry (object);
return;
}
@SuppressWarnings("unchecked")
public <T> T getBlankObject(Class<T> tableClass) {
return (T) accessorCache.get(tableClass.getName()).getBlankObject();
}
@SuppressWarnings("unchecked")
@Override
public <T> T readByIdOf(T initializedObject) {
Accessor<T> accessor = (Accessor<T>) accessorCache.get(initializedObject.getClass().getName());
Object key = accessor.getPrimaryKeyGetSetPair().get(initializedObject);
if (isRemoteSpace() && accessor.getPartitionIdGetSetPair() != null) {
Integer partition = (Integer) accessor.getPartitionIdGetSetPair().get(initializedObject);
T obj = accessor.getBlankObject();
accessor.getPrimaryKeyGetSetPair().set(obj, key);
accessor.getPartitionIdGetSetPair().set(obj, partition);
return read(obj);
}
return readById ((Class<T>) initializedObject.getClass(), key);
}
@SuppressWarnings("unchecked")
@Override
public <T> T takeByIdOf(T initializedObject) {
Accessor<T> accessor = (Accessor<T>) accessorCache.get(initializedObject.getClass().getName());
Object key = accessor.getPrimaryKeyGetSetPair().get(initializedObject);
if (isRemoteSpace() && accessor.getPartitionIdGetSetPair() != null) {
Integer partition = (Integer) accessor.getPartitionIdGetSetPair().get(initializedObject);
T obj = accessor.getBlankObject();
accessor.getPrimaryKeyGetSetPair().set(obj, key);
accessor.getPartitionIdGetSetPair().set(obj, partition);
return take(obj);
}
return takeById ((Class<T>) initializedObject.getClass(), key);
}
/**
* {@inheritDoc}
*/
@Override
public <T> T take(T templateObject) {
return take (new TemplateQuery<T> (templateObject));
}
/**
* {@inheritDoc}
*/
@Override
public <T> T take(T templateObject, long timeout) {
return take (new TemplateQuery<T> (templateObject), timeout);
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<T> takeMultiple(T templateObject) {
return takeMultiple (new TemplateQuery<T> (templateObject));
}
/**
* {@inheritDoc}
*/
@Override
public <T> T read(T templateObject) {
return read (new TemplateQuery<T> (templateObject));
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<T> readMultiple(T templateObject) {
TemplateQuery<T> query = new TemplateQuery<T> (templateObject);
return readMultiple (query);
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<T> readMultiple(T templateObject, int maxEntries) {
TemplateQuery<T> query = new TemplateQuery<T> (templateObject, maxEntries);
return readMultiple (query);
}
/**
* {@inheritDoc}
*/
@Override
public <T> T getNextMessage(TableQuery<T> query) {
TransactionalQueue<T> queue = getQueue ((Class<T>)query.getTableClass(), false);
if (queue == null) {
throw new IllegalArgumentException ("Unregistered Queue");
}
return queue.getNext(query);
}
/**
* {@inheritDoc}
*/
@Override
public <T> T getNextMessage(TableQuery<T> query, long timeout) {
TransactionalQueue<T> queue = getQueue ((Class<T>)query.getTableClass(), false);
if (queue == null) {
throw new IllegalArgumentException ("Unregistered Queue");
}
T ret = queue.getNext(query, timeout);
if (queue.isInterrupted()) {
interrupted.set (true);
}
return ret;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T getNextMessage(T templateObject) {
return getNextMessage (new TemplateQuery<T> (templateObject));
}
/**
* {@inheritDoc}
*/
@Override
public <T> T getNextMessage(T templateObject, long timeout) {
return getNextMessage (new TemplateQuery<T> (templateObject), timeout);
}
/**
* {@inheritDoc}
*/
@Override
public <T> void registerMessageQueue(TableQuery<T> query) {
MultiFilteredQueue<T> queue = getQueue (query.getTableClass(), true);
SimpleTable<T> map = getMap (query.getTableClass(), true);
queue.addQueueFilter (query);
List<T> all = map.readMultiple(query);
for (T o : all) {
queue.write(o, query);
}
queue.commit();
}
/**
* {@inheritDoc}
*/
@Override
public <T> void registerMessageQueue(T templateObject) {
registerMessageQueue((TableQuery<Object>)new TemplateQuery<Object>(templateObject));
}
/**
* {@inheritDoc}
*/
@Override
public <T> void unregisterMessageQueue(TableQuery<T> query) {
MultiFilteredQueue<T> queue = getQueue ((Class<T>)query.getTableClass(), false);
if (queue == null) {
throw new IllegalArgumentException ("Never registered a temlate for this class");
}
queue.removeQueueFilter(query);
}
/**
* {@inheritDoc}
*/
@Override public void unregisterMessageQueue(Object templateObject) {
unregisterMessageQueue((TableQuery<Object>)new TemplateQuery<Object>(templateObject));
}
/**
* {@inheritDoc}
*/
@Override
public <T> List<TableQuery<T>> getRegisteredMessageQueueQueries (Class<T> clazz) {
MultiFilteredQueue<T> queue = getQueue ((Class<T>)clazz, false);
if (queue == null) {
return new LinkedList<TableQuery<T>>();
}
return queue.getRegisteredQueries();
}
/**
* {@inheritDoc}
*/
@Override
public <T> int getMessageQueueSize (TableQuery<T> query) {
MultiFilteredQueue<T> queue = getQueue (query.getTableClass(), false);
if (queue == null) {
return 0;
}
return queue.size(query);
}
/**
* {@inheritDoc}
*/
@Override
public boolean isRemoteSpace () {
return getSpaceConfig().isRemoteSpace();
}
/**
* {@inheritDoc}
*/
@Override
public boolean isInterrupted () {
return interrupted.get();
}
/**
* Local objects needs to have a matching queue, otherwise
* the write writes them nowhere.
*
* @param object The object to check.
*/
private <T> boolean localObjectRetained (T object) {
SpaceRecord anno = object.getClass().getAnnotation(SpaceRecord.class);
if (anno == null || anno.local() == false) {
return true;
}
@SuppressWarnings("unchecked")
MultiFilteredQueue<T> ret = (MultiFilteredQueue<T>) queues.get (object.getClass());
if (ret == null) {
return false;
}
return ret.match(object);
}
/**
* {@inheritDoc}
*/
@Override
public Integer getPartitionId () {
return null; // Partitioning is not supported.
}
@Override
public UniqueId getSpaceInstanceId() {
return instanceId;
}
SpaceRecordListener spaceRecordListener;
/**
* Set a data listener.
* @param listener The data listener.
*/
@Override
public void setSpaceRecordListener (SpaceRecordListener listener) {
this.spaceRecordListener = listener;
}
/**
* Get the current Time provider.
* @return The current time provider for timestamps.
*/
public CurrentTimeProvider getCurrentTimeProvider() {
return currentTimeProvider;
}
}