SimpleSpace.java

package org.microspace.space;

import org.microspace.annotation.SpaceRecord;
import org.microspace.event.SpaceRecordListener;
import org.microspace.exception.LockException;
import org.microspace.specific.CurrentTimeProvider;
import org.microspace.table.*;
import org.microspace.table.column.Accessor;
import org.microspace.table.query.TableQuery;
import org.microspace.table.query.TemplateQuery;
import org.microspace.transaction.LockableTransactional;
import org.microspace.util.AccessorCache;
import org.microspace.util.PojoUtil;
import org.microspace.util.UniqueId;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * SimpleSpace is a memory-based implementation of MicroSpace.
 * 
 * @author Gaspar Sinai
 * @version 2012-06-11
 *
 */
public class SimpleSpace implements LockableTransactional, MicroSpace, TableListener<Object> {

	/** Private copy of the configuration this space was created with. */
	private final SpaceConfig spaceConfig;

	/** One table per record class, created on demand by {@link #getMap(Class, boolean)}. */
	protected final ConcurrentHashMap <Class<?>, SimpleTable<?>> maps;
	
	/** One message queue per record class, created when a message queue is registered. */
	protected final ConcurrentHashMap<Class<? extends Object>, MultiFilteredQueue<?>> queues;
	
	/**
	 * Write side is held for the duration of {@link #commit()}; read side is held
	 * while {@link #getMaps()} takes its snapshot of the tables.
	 */
	protected final ReadWriteLock commitLock = new ReentrantReadWriteLock ();
	
	/** Monitor that timed takes wait on; notified at the end of every successful commit. */
	private final Object commitNotifier = new Object ();
	
	final AccessorCache accessorCache;
	private final UniqueId instanceId;
	CommitListener commitListener;
	SpaceRecordListener spaceRecordListener;
	protected final CurrentTimeProvider currentTimeProvider;
	
	/** Per-thread flag set when a blocking call on this space was interrupted. */
	private final ThreadLocal<Boolean> interrupted = new ThreadLocal<Boolean> ()  {
		@Override
		public Boolean initialValue () {
			return Boolean.FALSE;
		}
	};
	
	/**
	 * Create a space with a default {@link SpaceConfig}.
	 */
	public SimpleSpace () {
		this (new SpaceConfig());
	}
	
	/**
	 * Create a space without a time provider.
	 * 
	 * @param spaceConfig The configuration; a copy of it is kept.
	 */
	public SimpleSpace (SpaceConfig spaceConfig) {
		this(spaceConfig, null);
	}
	
	/**
	 * Create a space.
	 * 
	 * @param spaceConfig The configuration; a copy of it is kept.
	 * @param currentTimeProvider The time provider handed to the tables, may be null.
	 */
	public SimpleSpace (SpaceConfig spaceConfig, CurrentTimeProvider currentTimeProvider) {
		this.currentTimeProvider = currentTimeProvider;
		this.maps = new ConcurrentHashMap <Class<?>, SimpleTable<?>> ();
		this.queues = new ConcurrentHashMap<Class<? extends Object>, MultiFilteredQueue<?>>();
		this.spaceConfig = PojoUtil.copy(spaceConfig);
		this.accessorCache = new AccessorCache(this.spaceConfig);
		this.instanceId = new UniqueId();
	}
	
	/**
	 * Set the listener that is told about commit begin, commit end and table changes.
	 * 
	 * @param listener The commit listener, or null to remove it.
	 */
	protected void setCommitListener (CommitListener listener) {
		this.commitListener = listener;
	}
	
	/**
	 * {@inheritDoc}
	 * 
	 * @throws IllegalArgumentException if the object is a local record that no
	 *         registered message queue matches.
	 */
	@Override
	public <T> void write (T object) {
		if (!localObjectRetained (object)) {
			throw new IllegalArgumentException ("Local Object Has No Matching  GetNext Queue.");
		}
		SimpleTable<Object> table = getMap (object.getClass());
		// We are writing to front map, so a read lock is fine.
		if (isLocalRecord (object)) {
			// Local records are not stored in the table, they are only timestamped.
			SimpleUnsafeTable.autoTimestamp(object, currentTimeProvider);
		} else {
			table.write (object);
		}
		TransactionalQueue<Object> queue = getQueue (object.getClass(), false);
		if (queue != null) {
			queue.write(object);
		}
		// Read the field once so that a concurrent reset cannot cause a NullPointerException.
		SpaceRecordListener listener = spaceRecordListener;
		if (listener != null) {
			listener.write(object);
		}
	}
	
	/**
	 * Remove a taken object from the message queue of its class, if there is one.
	 * 
	 * @param tableClass The record class.
	 * @param t The taken object, may be null.
	 * @return The same object that was passed in.
	 */
	private <T> T takeFromQueue (Class<T> tableClass, T t) {
		if (t == null) {
			return null;
		}
		TransactionalQueue<Object> queue = getQueue (tableClass, false);
		if (queue != null) {
			queue.take(t);
		}
		return t;
	}
	
	/**
	 * Tell the record listener, if any, that an object has been taken.
	 * Nothing is reported when no object was taken.
	 * 
	 * @param taken The taken object, may be null.
	 */
	private <T> void notifyTaken (T taken) {
		SpaceRecordListener listener = spaceRecordListener;
		if (listener != null && taken != null) {
			listener.takeByIdOf(taken);
		}
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T take (TableQuery<T> query) {
		query.prepareQuery(this);
		SimpleTable<T> table = getMap (query.getTableClass());
		T taken = takeFromQueue (query.getTableClass(), table.take (query));
		query.finishQuery();
		notifyTaken (taken);
		return taken;
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T takeById (Class<T> tableClass, Object primaryKey) {
		SimpleTable<T> table = getMap (tableClass);
		T taken = takeFromQueue (tableClass, table.takeById (primaryKey));
		notifyTaken (taken);
		return taken;
	}
	
	/**
	 * {@inheritDoc}
	 * 
	 * The timeout is handed to {@link Object#wait(long)}: the call sleeps until a
	 * commit notifies it or the time elapses, then tries to take again. If the
	 * waiting thread is interrupted, the interrupted flag of this space is set for
	 * the thread and null is returned.
	 */
	@Override
	public <T> T take (TableQuery<T> query, long timeout) {
		long start = System.currentTimeMillis();
		long remainingTime = timeout;
		while (true) {
			synchronized (commitNotifier) {
				// Checking under the monitor ensures that a commit finishing after
				// this check cannot notify before we start waiting.
				// take(query) already updates the queue and notifies the listener.
				T found = take(query);
				if (found != null) {
					return found;
				}
				try {
					commitNotifier.wait(remainingTime);
				} catch (InterruptedException e) {
					interrupted.set(Boolean.TRUE);
					return null;
				}
			}
			// Take is faster than iterating through underlyingChanged.
			T found = take(query);
			if (found != null) {
				return found;
			}
			long now = System.currentTimeMillis();
			if (now < start) {
				// The clock went backwards, restart the measurement.
				start = now;
			}
			long elapsed = now - start;
			if (elapsed >= timeout) {
				return null;
			}
			remainingTime = timeout - elapsed;
		}
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> takeMultiple(TableQuery<T> query) {
		query.prepareQuery(this);
		SimpleTable<T> table = getMap (query.getTableClass());
		List<T> taken = table.takeMultiple (query);
		for (T t : taken) {
			takeFromQueue (query.getTableClass(), t);
			notifyTaken (t);
		}
		query.finishQuery();
		return taken;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T read (TableQuery<T> query) {
		query.prepareQuery(this);
		SimpleTable<T> table = getMap (query.getTableClass());
		T found = table.read (query);
		query.finishQuery();
		return found;
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> readMultiple(TableQuery<T> query) {
		query.prepareQuery(this);
		SimpleTable<T> table = getMap (query.getTableClass());
		List<T> found = table.readMultiple (query);
		query.finishQuery();
		return found;
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T readById (Class<T> tableClass, Object key) {
		// Reading must not create a table as a side effect.
		SimpleTable<T> table = getMap (tableClass, false);
		return table == null ? null : table.readById (key);
	}
	
	/**
	 * {@inheritDoc}
	 * 
	 * A null class clears every table. The change is committed immediately.
	 */
	@Override
	public <T> void clear(Class<T> clazz) {
		if (clazz != null) {
			SimpleTable<?> table = getMap (clazz, false);
			if (table == null) {
				return;
			}
			table.clear ();
			commit();
			return;
		}
		for (SimpleTable<?> table : getMaps()) {
			try {
				table.clear ();
			} catch (Exception e) {
				// Best effort: a failing table must not prevent clearing the others.
				e.printStackTrace();
			}
		}
		commit();
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public void clear () {
		clear(null);
	}
	
	/**
	 * Get the table of a class, creating it when it does not exist yet.
	 * 
	 * @param clazz The record class.
	 * @return The table, never null.
	 */
	private <T> SimpleTable<T> getMap (Class<? extends Object> clazz) {
		return getMap (clazz, true);
	}	
	
	/**
	 * Get the table of a class.
	 * 
	 * @param clazz The record class.
	 * @param createIfNotFound If true a missing table is created and registered.
	 * @return The table, or null if it does not exist and was not to be created.
	 */
	@SuppressWarnings("unchecked")
	public <T> SimpleTable<T> getMap (Class<? extends Object> clazz, boolean createIfNotFound) {
		SimpleTable<T> existing = (SimpleTable<T>) maps.get (clazz);
		if (existing != null || !createIfNotFound) {
			return existing;
		}
		NotifyingTable<T> basic = new NotifyingTable<T>(
				spaceConfig.getAccessGenerator().newAccessor((Class<T>) clazz), 
				currentTimeProvider);
		basic.addTableListener((TableListener<T>) this);
		SimpleTable<T> isolated = new SimpleTable<T>(basic);
		isolated.setSerialized(spaceConfig.isSerialized());
		// If another thread registered a table first, that one wins and ours is dropped.
		SimpleTable<T> winner = (SimpleTable<T>) maps.putIfAbsent(clazz, isolated);
		return winner != null ? winner : isolated;
	}
	
	/**
	 * Get the message queue of a class.
	 * 
	 * @param clazz The record class.
	 * @param createIfNotFound If true a missing queue is created and registered.
	 * @return The queue, or null if it does not exist and was not to be created.
	 */
	@SuppressWarnings("unchecked")
	private  <T> MultiFilteredQueue<T> getQueue (Class<? extends Object> clazz, boolean createIfNotFound) {
		MultiFilteredQueue<T> existing = (MultiFilteredQueue<T>) queues.get (clazz);
		if (existing != null || !createIfNotFound) {
			return existing;
		}
		Accessor<T> accessor = spaceConfig.getAccessGenerator().newAccessor((Class<T>)clazz);
		MultiFilteredQueue<T> created = new MultiFilteredQueue<T>(accessor);
		// If another thread registered a queue first, that one wins and ours is dropped.
		MultiFilteredQueue<T> winner = (MultiFilteredQueue<T>) queues.putIfAbsent(clazz, created);
		return winner != null ? winner : created;
	}
	
	/**
	 * Take a snapshot of the current tables while holding the commit read lock.
	 * 
	 * @return A new collection holding the tables.
	 */
	private Collection<SimpleTable<?>> getMaps () {
		commitLock.readLock().lock();
		try {
			return new ArrayList<SimpleTable<?>> (maps.values());
		} finally {
			commitLock.readLock().unlock();
		}
	}
	
	/**
	 * Take a snapshot of the current message queues.
	 * 
	 * @return A new collection holding the queues.
	 */
	private Collection<MultiFilteredQueue<?>> getQueues () {
		return new ArrayList<MultiFilteredQueue<?>>(queues.values());
	}
	
	/**
	 * {@inheritDoc}
	 * 
	 * @throws LockException if locking or committing a table, or the commit
	 *         listener, failed. All tables are rolled back when a table fails.
	 */
	@Override
	public void commit() {
		Collection<SimpleTable<?>> tables = getMaps();
		// We are using a lock so that all the objects will be updated in one atomic operation.
		commitLock.writeLock().lock();
		LockException failure = null;
		SimpleTable<?> currentTable = null;
		try {
			try {
				onCommitBegin();
				// All or nothing. Lock throws exception.
				for (SimpleTable<?> table : tables) {
					currentTable = table;
					table.lock();
				}
				// This block should not throw exception.
				for (SimpleTable<?> table : tables) {
					currentTable = table;
					if (table.getAccessor().isLocalRecord()) {
						table.rollback();
					} else {
						table.commit();
					}
				}
			} catch (Exception e) {
				for (SimpleTable<?> table : tables) {
					table.rollback();
				}
				failure = new LockException ((currentTable!=null ? currentTable.getAccessor().getTargetClass().toString() : "Check with developer Unknown Class") +" caused Exception. "+ e.getMessage(), e);
			} finally {
				try {
					onCommitEnd();
				} catch (Exception ec) {
					// Keep the first failure: it is the root cause.
					if (failure == null) {
						failure = new LockException (ec);
					}
				}
			}
		} finally {
			// Released in its own finally so that nothing thrown above can leak the lock.
			commitLock.writeLock().unlock();
		}
		if (failure != null) {
			throw failure;
		}
		for (MultiFilteredQueue<?> queue : getQueues()) {
			queue.commit();
		}
		// Wake up the threads blocked in a timed take.
		synchronized (commitNotifier) {
			commitNotifier.notifyAll();
		}
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public void rollback() {
		for (SimpleTable<?> table : getMaps()) {
			table.rollback();
		}
		for (MultiFilteredQueue<?> queue : getQueues()) {
			queue.rollback();
		}
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> int size(Class<T> clazz)  {
		SimpleTable<Object> table = getMap (clazz);
		return table.size();
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public int size() {
		int total = 0;
		for (SimpleTable<?> table : getMaps()) {
			total += table.size();
		}
		return total;
	}

	/**
	 * {@inheritDoc}
	 * 
	 * The classes are sorted by their class name.
	 */
	@Override
	public List<Class<?>> getTableClasses() {
		LinkedList<Class<?>> classes = new LinkedList<Class<?>> ();
		for (SimpleTable<?> table : getMaps()) {
			classes.add(table.getAccessor().getTargetClass());
		}
		Collections.sort(classes, new Comparator<Class<?>>() {
			@Override
			public int compare(final Class<?> left, final Class<?> right) {
				return left.getName().compareTo(right.getName());
			}
		});
		return classes;
	}
	
	/**
	 * {@inheritDoc}
	 * 
	 * If a table cannot be locked, all tables are unlocked again and the failure
	 * is rethrown with the original exception attached as its cause.
	 */
	@Override
	public void lock() {
		Collection<SimpleTable<?>> tables = getMaps();
		try {
			for (SimpleTable<?> table : tables) {
				table.lock();
			}
		} catch (LockException le) {
			unlockAll (tables);
			throw new LockException (le.getMessage(), le);
		} catch (IllegalArgumentException ie) {
			unlockAll (tables);
			throw new IllegalArgumentException(ie.getMessage(), ie);
		}
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void unlock() {
		unlockAll (getMaps());
	}
	
	/**
	 * Unlock every table of the collection.
	 * 
	 * @param tables The tables to unlock.
	 */
	private void unlockAll (Collection<SimpleTable<?>> tables) {
		for (SimpleTable<?> table : tables) {
			table.unlock();
		}
	}
	
	/**
	 * Tell the commit listener, if any, that a commit begins.
	 */
	private void onCommitBegin () {
		CommitListener listener = commitListener;
		if (listener != null) {
			listener.commitBegins();
		}
	}

	/**
	 * Forward the new entry of a table change to the commit listener, if any.
	 * 
	 * @param src The source of the event, not used here.
	 * @param oldEntry The previous entry, not used here.
	 * @param newEntry The entry handed to the commit listener.
	 */
	public void tableChangeEvent(Object src, Entry<Object> oldEntry, Entry<Object> newEntry) {
		CommitListener listener = commitListener;
		if (listener != null) {
			listener.entryAdded(newEntry);
		}
	}
	
	/**
	 * Tell the commit listener, if any, that a commit ends.
	 */
	private void onCommitEnd () {
		CommitListener listener = commitListener;
		if (listener != null) {
			listener.commitEnds();
		}
	}
	
	/**
	 * Get the configuration of this space.
	 * @return The private configuration copy held by this space.
	 */
	SpaceConfig getSpaceConfig () {
		return spaceConfig;
	}
	
	/**
	 * Get the accessor generator.
	 * @return The accessor generator of the space configuration.
	 */
	public AccessorGenerator getAccessorGenerator() {
		return spaceConfig.getAccessGenerator();
	}
	
	/**
	 * Set an entry in the table of the class of the object it wraps.
	 * The table is created if it does not exist.
	 * 
	 * @param object The entry to set.
	 */
	public <T> void setEntry (Entry<T> object) {
		SimpleTable<T> table = getMap (object.getSpaceEntry().getClass());
		// We are writing to front map, so a read lock is fine.
		table.setEntry (object);
	}
	
	/**
	 * Create a blank object through the cached accessor of a class.
	 * 
	 * @param tableClass The record class.
	 * @return The blank object produced by the accessor.
	 */
	@SuppressWarnings("unchecked")
	public <T> T getBlankObject(Class<T> tableClass) {
		return (T) accessorCache.get(tableClass.getName()).getBlankObject();
	}
	
	/**
	 * Build a blank template carrying only the primary key and the partition id
	 * of an object.
	 * 
	 * @param accessor The accessor of the record class; it must have a partition id pair.
	 * @param initializedObject The object the partition id is copied from.
	 * @param key The primary key to put into the template.
	 * @return The template.
	 */
	private <T> T keyAndPartitionTemplate (Accessor<T> accessor, T initializedObject, Object key) {
		Integer partition = (Integer) accessor.getPartitionIdGetSetPair().get(initializedObject);
		T template = accessor.getBlankObject();
		accessor.getPrimaryKeyGetSetPair().set(template, key);
		accessor.getPartitionIdGetSetPair().set(template, partition);
		return template;
	}
	
	/**
	 * {@inheritDoc}
	 */
	@SuppressWarnings("unchecked")
	@Override 
	public <T> T readByIdOf(T initializedObject) {
		Accessor<T> accessor = (Accessor<T>) accessorCache.get(initializedObject.getClass().getName());
		Object key = accessor.getPrimaryKeyGetSetPair().get(initializedObject);
		if (isRemoteSpace() && accessor.getPartitionIdGetSetPair() != null) {
			// A remote space is queried by a template holding the key and the partition id.
			return read(keyAndPartitionTemplate (accessor, initializedObject, key));
		}
		return readById ((Class<T>) initializedObject.getClass(), key);
	}
	
	/**
	 * {@inheritDoc}
	 */
	@SuppressWarnings("unchecked")
	@Override 
	public <T> T takeByIdOf(T initializedObject) {
		Accessor<T> accessor = (Accessor<T>) accessorCache.get(initializedObject.getClass().getName());
		Object key = accessor.getPrimaryKeyGetSetPair().get(initializedObject);
		if (isRemoteSpace() && accessor.getPartitionIdGetSetPair() != null) {
			// A remote space is queried by a template holding the key and the partition id.
			return take(keyAndPartitionTemplate (accessor, initializedObject, key));
		}
		return takeById ((Class<T>) initializedObject.getClass(), key);
	}  
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T take(T templateObject) {
		return take (new TemplateQuery<T> (templateObject));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T take(T templateObject, long timeout) {
		return take (new TemplateQuery<T> (templateObject), timeout);
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> takeMultiple(T templateObject) {
		return takeMultiple (new TemplateQuery<T> (templateObject));
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T read(T templateObject) {
		return read (new TemplateQuery<T> (templateObject));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> readMultiple(T templateObject) {
		return readMultiple (new TemplateQuery<T> (templateObject));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> List<T> readMultiple(T templateObject, int maxEntries) {
		return readMultiple (new TemplateQuery<T> (templateObject, maxEntries));
	}

	/**
	 * {@inheritDoc}
	 * 
	 * @throws IllegalArgumentException if no message queue exists for the class of the query.
	 */
	@Override
	public <T> T getNextMessage(TableQuery<T> query) {
		TransactionalQueue<T> queue = getQueue (query.getTableClass(), false);
		if (queue == null) {
			throw new IllegalArgumentException ("Unregistered Queue");
		}
		return queue.getNext(query);
	}

	/**
	 * {@inheritDoc}
	 * 
	 * @throws IllegalArgumentException if no message queue exists for the class of the query.
	 */
	@Override
	public <T> T getNextMessage(TableQuery<T> query, long timeout) {
		TransactionalQueue<T> queue = getQueue (query.getTableClass(), false);
		if (queue == null) {
			throw new IllegalArgumentException ("Unregistered Queue");
		}
		T next = queue.getNext(query, timeout);
		if (queue.isInterrupted()) {
			// Carry the interruption of the queue over to this space.
			interrupted.set (Boolean.TRUE);
		}
		return next;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T getNextMessage(T templateObject) {
		return getNextMessage (new TemplateQuery<T> (templateObject));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> T getNextMessage(T templateObject, long timeout) {
		return getNextMessage (new TemplateQuery<T> (templateObject), timeout);
	}

	/**
	 * {@inheritDoc}
	 * 
	 * The queue and the table of the class are created if needed. The records
	 * the table returns for the query are written to the queue and the queue
	 * is committed.
	 */
	@Override
	public <T> void registerMessageQueue(TableQuery<T> query) {
		MultiFilteredQueue<T> queue = getQueue (query.getTableClass(), true);
		SimpleTable<T> table = getMap (query.getTableClass(), true);
		queue.addQueueFilter (query);
		for (T existing : table.readMultiple(query)) {
			queue.write(existing, query);
		}
		queue.commit();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public <T> void registerMessageQueue(T templateObject) {
		registerMessageQueue((TableQuery<Object>)new TemplateQuery<Object>(templateObject));
	}

	/**
	 * {@inheritDoc}
	 * 
	 * @throws IllegalArgumentException if no message queue exists for the class of the query.
	 */
	@Override
	public <T> void unregisterMessageQueue(TableQuery<T> query) {
		MultiFilteredQueue<T> queue = getQueue (query.getTableClass(), false);
		if (queue == null) {
			throw new IllegalArgumentException ("Never registered a temlate for this class");
		}
		queue.removeQueueFilter(query);
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void unregisterMessageQueue(Object templateObject) {
		unregisterMessageQueue((TableQuery<Object>)new TemplateQuery<Object>(templateObject));
	}
	
	/**
	 * {@inheritDoc}
	 * 
	 * An empty list is returned when the class has no message queue.
	 */
	@Override
	public <T> List<TableQuery<T>> getRegisteredMessageQueueQueries (Class<T> clazz) {
		MultiFilteredQueue<T> queue = getQueue (clazz, false);
		if (queue == null) {
			return new LinkedList<TableQuery<T>>();
		}
		return queue.getRegisteredQueries();
	}
	
	/**
	 * {@inheritDoc}
	 * 
	 * Zero is returned when the class has no message queue.
	 */
	@Override
	public <T> int getMessageQueueSize (TableQuery<T> query) {
		MultiFilteredQueue<T> queue = getQueue (query.getTableClass(), false);
		return queue == null ? 0 : queue.size(query);
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public boolean isRemoteSpace () {
		return getSpaceConfig().isRemoteSpace();
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public boolean isInterrupted () {
		return interrupted.get();
	}
	
	/**
	 * Check whether the class of an object is annotated as a local space record.
	 * 
	 * @param object The object to check.
	 * @return true if its class carries a SpaceRecord annotation with local set.
	 */
	private static boolean isLocalRecord (Object object) {
		SpaceRecord anno = object.getClass().getAnnotation(SpaceRecord.class);
		return anno != null && anno.local();
	}
	
	/**
	 * Local objects needs to have a matching queue, otherwise
	 * the write writes them nowhere.
	 * 
	 * @param object The object to check.
	 * @return true if the object is not a local record, or if the message queue
	 *         of its class matches it.
	 */
	private <T> boolean localObjectRetained (T object) {
		if (!isLocalRecord (object)) {
			return true;
		}
		@SuppressWarnings("unchecked")
		MultiFilteredQueue<T> queue = (MultiFilteredQueue<T>) queues.get (object.getClass());
		return queue != null && queue.match(object);
	}
	
	/**
	 * {@inheritDoc}
	 */
	@Override
	public Integer getPartitionId () {
		return null; // Partitioning is not supported.
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public UniqueId getSpaceInstanceId() {
		return instanceId;
	}

	/**
	 * Set a data listener.
	 * @param listener The data listener.
	 */
	@Override
	public void setSpaceRecordListener (SpaceRecordListener listener) {
		this.spaceRecordListener = listener;
	}
	
	/**
	 * Get the current Time provider.
	 * @return The current time provider for timestamps.
	 */
	public CurrentTimeProvider getCurrentTimeProvider() {
		return currentTimeProvider;
	}
}