SimpleTable.java

package org.microspace.table;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.microspace.exception.LockException;
import org.microspace.specific.CurrentTimeProvider;
import org.microspace.table.column.Accessor;
import org.microspace.table.column.IndexedMap;
import org.microspace.table.column.IndexedSet;
import org.microspace.table.query.TableQuery;
import org.microspace.table.query.TemplateQuery;

/**
 * Make a shallow copy of the object using the empty constructor and 
 * the getA setA getB setB methods. This object is presented to the front side
 * of the map, and it can be committed.
 * The objects in a new transaction (after commit or rollback) use repeatable
 * read: the user is always presented with the same object, and it won't be re-read.
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2012-01-29
 */
public class SimpleTable<T> implements TransactionalTable<T> {
	
	// Backing table; commit() copies the pending entries of the calling thread into it.
	private final SimpleUnsafeTable<T> back;
	// Read/write lock taken around accesses to the backing table; lock() and unlock()
	// also hold the write lock while they change lockedObjects.
	private final ReadWriteLock lock = new ReentrantReadWriteLock();
	// Monitor object: take(query, timeout) waits on it and commit() calls notifyAll() on it.
	private final Object underlyingChanged;
	// Primary keys added by lock() and removed again by unlock(), commit() and rollback().
	private final IndexedSet<Object> lockedObjects;
	// Per-thread context that provides the isolator (pending entries) and the locked keys.
	private final ThreadLocal<IsolatorContext<T>> threadContext;
	// Accessor obtained from the backing table in the constructor.
	private final Accessor<T> accessor;
	// When true, lock() calls serialize() on every updated entry before locking it.
	private boolean serialized;
	
	// Optional listener invoked when entries are written or taken; null when none is set.
	private EntryListener<T> listener;
	// Per-thread flag, set to true when take(query, timeout) is interrupted while waiting.
	private final ThreadLocal<Boolean> interrupted = new ThreadLocal<Boolean> ()  {
		@Override
		public Boolean initialValue () {
			return Boolean.FALSE;
		}
	};
	
	/**
	 * Create a transactional table on top of an existing unsafe table.
	 * @param backMap The table that stores the committed entries.
	 */
	public SimpleTable(final SimpleUnsafeTable<T> backMap) {
		this.back = backMap;
		this.accessor = backMap.getAccessor();
		this.underlyingChanged = new Object();
		this.lockedObjects = new IndexedSet<Object>(
				this.accessor.getPrimaryKeyGetSetPair().getIndexType());
		this.serialized = false;
		// Every thread lazily gets its own context the first time it touches the table.
		this.threadContext = new ThreadLocal<IsolatorContext<T>>() {
			@Override
			protected IsolatorContext<T> initialValue() {
				CurrentTimeProvider timeProvider = backMap.getCurrentTimeProvider();
				return new IsolatorContext<T>(accessor, timeProvider);
			}
		};
		// The returned object is discarded; presumably the call verifies early that
		// a blank instance can be created - TODO confirm.
		this.accessor.getBlankObject();
	}
	/**
	 * Create a table for the given class, passing {@code null} as the time provider.
	 * @param clazz The class of the objects stored in the table.
	 */
	public SimpleTable(Class<T> clazz) {
		this(clazz, (CurrentTimeProvider) null);
	}
	
	/**
	 * Create a table backed by a new {@link SimpleUnsafeTable}.
	 * @param clazz The class of the objects stored in the table.
	 * @param currentTimeProvider The time provider handed to the backing table.
	 */
	public SimpleTable(Class<T> clazz, CurrentTimeProvider currentTimeProvider) {
		this(new SimpleUnsafeTable<T>(clazz, currentTimeProvider));
	}
	
	/**
	 * Install the listener that is called when entries are added or removed.
	 * @param listener The listener, or null to stop the notifications.
	 */
	public void setEntryListener(final EntryListener<T> listener) {
		this.listener = listener;
	}
	
	/**
	 * Enable or disable the serialization of updated entries in {@link #lock()}.
	 * @param serialized true if updated entries should be serialized when they are locked.
	 */
	public void setSerialized(final boolean serialized) {
		this.serialized = serialized;
	}
	
	/**
	 * Get the size of the backing table, read under the read lock.
	 * Pending changes of the current thread are not consulted.
	 * @return The size reported by the backing table.
	 */
	public int size() {
		final int committedSize;
		lock.readLock().lock();
		try {
			committedSize = back.size();
		} finally {
			lock.readLock().unlock();
		}
		return committedSize;
	}
	
	/**
	 * Get the number of keys updated by the current thread in its isolator.
	 * @return The number of updated keys.
	 */
	public int getPendingSize() {
		return threadContext.get().getIsolator().getUpdatedKeys().size();
	}
		
	/**
	 * Lock all keys updated by the current thread so that they can be committed.
	 * <p>
	 * Does nothing if the isolator is already in the LOCKED state. Otherwise the state
	 * is set to LOCKED and, if there are updated keys, each key is validated under the
	 * write lock: it must not be locked already, and the update count recorded by the
	 * isolator must still match the entry in the backing table. Any failure rolls the
	 * current thread's transaction back before the exception is thrown.
	 * @throws LockException if a key is already locked, the backing entry was removed
	 * or updated since it was read, or a new key already exists in the backing table.
	 * @throws IllegalArgumentException if serialization is enabled and an entry can
	 * not be serialized; the original failure is attached as the cause.
	 */
	public void lock () throws LockException {
		IsolatorContext<T> context = threadContext.get();
		if (context.getIsolator().getState() == Isolator.State.LOCKED) return;
		context.getIsolator().setState (Isolator.State.LOCKED);
		if (context.getIsolator().getUpdatedKeys().size() == 0) return;
		lock.writeLock().lock();
		try {
			// Keys are collected here first and only published once all of them passed.
			IndexedSet<Object> myobjects = new IndexedSet<Object> (accessor.getPrimaryKeyGetSetPair().getIndexType());
			// Check data
			for (Object key : context.getIsolator().getUpdatedKeys() ) {
				if (lockedObjects.contains(key)) {
					rollback();
					throw new LockException ("Locked object key = "+ key);
				}
				if (serialized) {
					Entry<T> frontEntry = context.getIsolator().getEntry(key);
					try {
						frontEntry.serialize();
					} catch (Exception e) {
						rollback();
						// Keep the root cause so the failing field can be diagnosed.
						throw new IllegalArgumentException ("Failed to serialize a field in " + frontEntry.getSpaceEntry().getClass(), e);
					}
				}
				myobjects.add (key);
				Entry<T> uc = back.getEntry(key);
				BigInteger updateCount = context.getIsolator().getBackUpdateCount (key);
				if (updateCount != null) {
					// The entry was read from the backing table earlier in this transaction.
					if (uc == null) {
						rollback();
						throw new LockException ("Removed data key = "+key);
					}
					BigInteger oldCount = uc.getUpdateCount();
					// Someone updated the data since last time we read it.
					if (oldCount == null || !updateCount.equals (oldCount)) {
						rollback();
						throw new LockException ("Existing data key = "+key);
					}
				} else {
					// This must be a new object
					if (uc != null) {
						rollback();
						// It can be 2 reasons: id clash or the object was not leased out
						throw new LockException ("Unknown object already exists key = "+key);
					}
				}
			}
			lockedObjects.addAll(myobjects);
			context.getLockedKeys().addAll(myobjects);
			
		} finally {
			lock.writeLock().unlock();
		}
	}
	
	/**
	 * Release the keys locked by the current thread without committing.
	 * <p>
	 * The isolator state is set back to CREATED. If the thread holds locked keys they
	 * are removed from the shared set of locked objects under the write lock, and the
	 * thread's own list of locked keys is cleared.
	 */
	public void unlock () {
		IsolatorContext<T> context = threadContext.get();
		context.getIsolator().setState (Isolator.State.CREATED);
		if (context.getLockedKeys().size() == 0) {
			return;
		}
		lock.writeLock().lock();
		try {
			lockedObjects.removeAll(context.getLockedKeys());
		} finally {
			// Always release the write lock, even if the removal fails.
			lock.writeLock().unlock();
		}
		context.getLockedKeys().clear();
	}

	/**
	 * Commit the pending changes of the current thread to the backing table.
	 * <p>
	 * The updated keys are locked first with {@link #lock()}. If nothing was locked
	 * the per-thread state is discarded and the method returns. Otherwise the updated
	 * entries are sorted, copied into the backing table under the write lock, the
	 * locked keys are released, the per-thread state is discarded, and threads waiting
	 * in {@link #take(TableQuery, long)} are notified.
	 * @throws LockException if {@link #lock()} fails.
	 */
	public void commit () {
		
		lock();
		IsolatorContext<T> context = threadContext.get();
		if (context.getLockedKeys().size() == 0) {
			interrupted.remove();
			threadContext.remove();
			return;
		}
		lock.writeLock().lock ();
		try {
			// Preserve order
			IndexedSet<Object> keyset = context.getIsolator().getUpdatedKeys();
			List<Entry<T>> entryArray = new ArrayList<Entry<T>>(keyset.size());
			
			for (Object key : keyset) {
				entryArray.add(context.getIsolator().getEntry(key));
			}
			Collections.sort(entryArray);
			for (Entry<T> entry : entryArray) {
				back.setEntry (new Entry<T> (entry));
			}
			
		} finally {
			try {
				// lockedObjects is changed under the write lock in lock() and unlock();
				// release the keys before the lock is given up.
				lockedObjects.removeAll(context.getLockedKeys());
			} finally {
				lock.writeLock().unlock ();
			}
			context.clear();
			threadContext.remove();
			interrupted.remove();
			
		}
		// Wake up the threads blocked in take(query, timeout).
		synchronized (underlyingChanged) {
			underlyingChanged.notifyAll();
		}
	}

	/**
	 * Discard the pending changes of the current thread.
	 * <p>
	 * Keys locked by this thread are removed from the shared set of locked objects,
	 * then the per-thread context and the interrupted flag are dropped.
	 */
	public void rollback () {
		IsolatorContext<T> context = threadContext.get();
		if (context.getLockedKeys().size() != 0) {
			// lockedObjects is shared between threads and changed under the write lock
			// in lock() and unlock(). The write lock is reentrant, so this is also safe
			// when rollback() is called from lock() while the lock is already held.
			lock.writeLock().lock();
			try {
				lockedObjects.removeAll(context.getLockedKeys());
			} finally {
				lock.writeLock().unlock();
			}
		}
		threadContext.remove();
		interrupted.remove();
		
	}
	

	/**
	 * {@inheritDoc}
	 * <p>
	 * Tries {@link #take(TableQuery)} and, while nothing matches, waits for a commit
	 * notification until the timeout elapses. A timeout that is zero or negative
	 * results in a single attempt without waiting. If the thread is interrupted while
	 * waiting, the per-thread interrupted flag is set (see {@link #isInterrupted()})
	 * and null is returned.
	 * @param query The query.
	 * @param timeout The maximum time to wait in milliseconds.
	 * @return The object taken, or null if nothing matched in time.
	 */
	public T take (TableQuery<T> query, long timeout) {
		if (timeout <= 0) {
			// Object.wait(0) would wait without a time limit and a negative value
			// makes it throw IllegalArgumentException, so never pass those on.
			return take (query);
		}
		long start = System.currentTimeMillis();
		long remainingTime = timeout;
		while (true) {
			T ret;
			synchronized (underlyingChanged) {
				// Checked under the monitor, so a commit notification can not slip in
				// between the check and the wait.
				ret = take (query);
				if (ret != null) return ret;
				try {
					underlyingChanged.wait(remainingTime);
				} catch (InterruptedException e) {
					// NOTE(review): the interruption is reported through isInterrupted();
					// the thread's own interrupt status is not restored - confirm intended.
					interrupted.set(true);
					return null;
				}
			}
			// Take is faster than iterating through underlyingChanged.
			ret = take (query);
			if (ret != null) return ret;
			long now = System.currentTimeMillis();
			// The wall clock went backwards; restart the measurement from now.
			if (now < start) start = now;
			if (now - start >= timeout) return null;
			remainingTime = timeout - (now - start);
		}
	}

	/**
	 * Write an object into the current thread's transaction.
	 * <p>
	 * The object is stored in the isolator with a fresh update count taken from the
	 * backing table's counter, and the entry listener, if any, is notified.
	 * @param object The object to write. Its primary key must not be null.
	 * @throws IllegalArgumentException if the primary key of the object is null.
	 */
	public void write(T object) {
		// Validate the key before it is used for any lookup.
		Object primaryKey = accessor.getPrimaryKeyGetSetPair().get(object);
		if (primaryKey == null) {
			throw new IllegalArgumentException ("primary key is null");
		}
		IsolatorContext<T> context = threadContext.get();
		// Update if same key is found: this pulls an existing backing entry into the
		// isolator before it is overwritten. The returned object is not needed.
		readById(primaryKey);
		
		SimpleUnsafeTable.autoTimestamp(object, back.getCurrentTimeProvider());
		BigInteger nextCount = back.updateCounter.incrementAndGet();
		// GASPAR 2017-10-12 setentry also sets update counter, but the code is more readable.
		Entry<T> container = new Entry<T> (object, accessor, nextCount);
		context.getIsolator().setEntry (container);
		
		if (listener != null) {
			listener.entryAdded(accessor, object);
		}
	}
	
	/**
	 * Tell the entry listener, if one is installed, that an object was removed.
	 * @param context The context of the current thread (not used).
	 * @param object The removed object.
	 */
	private void entryRemoved (IsolatorContext<T> context, T object) {
		if (listener == null) {
			return;
		}
		listener.entryRemoved(accessor, object);
	}

	/**
	 * Read the first object matching the query.
	 * Entries of the current thread's isolator and of the backing table are merged;
	 * an entry the isolator does not hold yet is copied and registered in it.
	 * @param query The query.
	 * @return The first matching object or null if there is none.
	 */
	public T read(TableQuery<T> query) {
		IsolatorContext<T> context = threadContext.get ();
		IndexedMap<Object, Entry<T>> pending = context.getIsolator().readMultipleEntries (query);
		IndexedMap<Object, Entry<T>> committed = readMultipleBackEntries(query);
		List<Entry<T>> candidates = mergeAndSortValidEntries (query, pending, committed);
		if (candidates.size() == 0) {
			return null;
		}
		Entry<T> first = candidates.get(0);
		if (context.getIsolator().getEntry(first.getPrimaryKey()) != null) {
			// Already part of this transaction, hand it out as it is.
			return first.getSpaceEntry();
		}
		Entry<T> copy = new Entry<T> (first);
		context.getIsolator().setFirstEntry (copy);
		return copy.getSpaceEntry();
	}
	
	/**
	 * Combine the back entries and the local entries, leaving out removed ones.
	 * A back entry is skipped when the isolator of the current thread holds any entry
	 * for the same key.
	 * @param query The query used to sort and limit the result.
	 * @param local The local entries.
	 * @param back The back entries.
	 * @return The sorted list of entries.
	 */
	private List<Entry<T>> mergeAndSortValidEntries (TableQuery<T> query, 
			IndexedMap<Object, Entry<T>>  local,
			IndexedMap<Object, Entry<T>>  back) {
		IsolatorContext<T> context = threadContext.get ();
		IndexedMap<Object, Entry<T>> merged =
				new IndexedMap<Object, Entry<T>>(accessor.getPrimaryKeyGetSetPair().getIndexType());
		for (Object key : back.keySet()) {
			boolean shadowed = context.getIsolator().getEntry(key) != null;
			if (shadowed) {
				continue;
			}
			Entry<T> committed = back.get(key);
			if (!committed.isRemoved()) { // removed back entries never happen
				merged.put(key, committed);
			}
		}
		for (Object key : local.keySet()) {
			Entry<T> pending = local.get(key);
			if (!pending.isRemoved()) {
				merged.put(key, pending);
			}
		}
		return query.sortAndLimit(accessor, merged);
	}
	
	/**
	 * Read all objects matching the query.
	 * Entries of the current thread's isolator and of the backing table are merged;
	 * entries the isolator does not hold yet are copied and registered in it.
	 * @param query The query.
	 * @return The matching objects, an empty list if there are none.
	 */
	public List<T> readMultiple(TableQuery<T> query) {
		IsolatorContext<T> context = threadContext.get ();
		IndexedMap<Object, Entry<T>> pending = context.getIsolator().readMultipleEntries (query);
		IndexedMap<Object, Entry<T>> committed = readMultipleBackEntries(query);
		List<Entry<T>> candidates = mergeAndSortValidEntries (query, pending, committed);
		ArrayList<T> result = new ArrayList<T> (candidates.size());
		for (Entry<T> candidate : candidates) {
			Entry<T> visible = candidate;
			if (context.getIsolator().getEntry(candidate.getPrimaryKey()) == null) {
				visible = new Entry<T> (candidate);
				context.getIsolator().setFirstEntry (visible);
			}
			result.add(visible.getSpaceEntry());
		}
		return result;
	}

	/**
	 * Take the first object matching the query.
	 * A copy of the matching entry is marked as removed and stored in the isolator of
	 * the current thread, and the entry listener is told about the removal.
	 * @param query The query.
	 * @return The object taken or null if nothing matches.
	 */
	public T take(TableQuery<T> query) {
		IsolatorContext<T> context = threadContext.get ();
		IndexedMap<Object, Entry<T>> pending = context.getIsolator().readMultipleEntries (query);
		IndexedMap<Object, Entry<T>> committed = readMultipleBackEntries(query);
		List<Entry<T>> candidates = mergeAndSortValidEntries (query, pending, committed);
		if (candidates.size() == 0) {
			return null;
		}
		Entry<T> removed = new Entry<T> (candidates.get(0));
		removed.setRemoved(true);
		boolean known = context.getIsolator().getEntry(removed.getPrimaryKey()) != null;
		if (known) {
			context.getIsolator().setEntry (removed);
		} else {
			context.getIsolator().setFirstEntry (removed);
		}
		entryRemoved (context, removed.getSpaceEntry());
		return removed.getSpaceEntry();
	}

	/**
	 * Take all objects matching the query.
	 * A copy of every matching entry is marked as removed and stored in the isolator
	 * of the current thread, and the entry listener is told about each removal.
	 * @param query The query.
	 * @return The objects taken, an empty list if nothing matches.
	 */
	public List<T> takeMultiple(TableQuery<T> query) {
		IsolatorContext<T> context = threadContext.get ();
		IndexedMap<Object, Entry<T>> pending = context.getIsolator().readMultipleEntries (query);
		IndexedMap<Object, Entry<T>> committed = readMultipleBackEntries(query);
		List<Entry<T>> candidates = mergeAndSortValidEntries (query, pending, committed);
		ArrayList<T> taken = new ArrayList<T> (candidates.size());
		for (Entry<T> candidate : candidates) {
			Entry<T> removed = new Entry<T> (candidate);
			removed.setRemoved(true);
			boolean known = context.getIsolator().getEntry(removed.getPrimaryKey()) != null;
			if (known) {
				context.getIsolator().setEntry (removed);
			} else {
				context.getIsolator().setFirstEntry (removed);
			}
			entryRemoved (context, removed.getSpaceEntry());
			taken.add(removed.getSpaceEntry());
		}
		return taken;
	}
	

	/**
	 * Clear the isolator of the current thread.
	 */
	public void clearUpdates() {
		threadContext.get().getIsolator().clear();
	}
	
	/**
	 * Take every object that matches a template built from a blank object.
	 * The removals become part of the current thread's transaction.
	 */
	public void clear() {
		takeMultiple(new TemplateQuery<T>(accessor.getBlankObject()));
	}

	
	/**
	 * Get the entry for a key, looking in the isolator of the current thread first
	 * and in the backing table (under the read lock) second.
	 * @param key The primary key.
	 * @return The entry, or whatever the backing table returns when the isolator has none.
	 */
	public Entry<T> getEntry(Object key) {
		Entry<T> pending = threadContext.get().getIsolator().getEntry(key);
		if (pending != null) {
			return pending;
		}
		lock.readLock().lock();
		try {
			return back.getEntry (key);
		} finally {
			lock.readLock().unlock();
		}
	}

	/**
	 * Store an entry in the isolator of the current thread and notify the listener.
	 * The listener is told about a removal when the entry is marked as removed and
	 * about an addition otherwise.
	 * @param entry The entry to store.
	 */
	public void setEntry (Entry<T> entry) {
		IsolatorContext<T> context = threadContext.get ();
		// Update if the same key is already in the map.
		readById(entry.getPrimaryKey());
		context.getIsolator().setEntry (entry);
		if (listener == null) {
			return;
		}
		if (entry.isRemoved()) {
			listener.entryRemoved(accessor, entry.getSpaceEntry());
			return;
		}
		listener.entryAdded(accessor, entry.getSpaceEntry());
	}


	/**
	 * Collect the keys of the current thread's isolator and of the backing table.
	 * @return A new set holding the keys of both.
	 */
	public IndexedSet<Object> keySet() {
		IndexedSet<Object> keys = new IndexedSet<Object>(accessor.getPrimaryKeyGetSetPair().getIndexType());
		keys.addAll (threadContext.get().getIsolator().keySet());
		lock.readLock().lock();
		try {
			keys.addAll(back.keySet());
		} finally {
			lock.readLock().unlock();
		}
		return keys;
	}
	
	/**
	 * Read an object by its primary key.
	 * <p>
	 * The isolator of the current thread is consulted first. If it holds no entry for
	 * the key, the entry is read from the backing table under the read lock, copied,
	 * and registered in the isolator.
	 * @param primaryKey The primary key.
	 * @return The object, or null if there is no entry or the entry is marked as removed.
	 */
	public T readById (Object primaryKey) {
		IsolatorContext<T> context = threadContext.get();
		Entry<T> entry = context.getIsolator().getEntry(primaryKey);
		if (entry != null) {
			if (entry.isRemoved()) return null;
			return entry.getSpaceEntry();
		}
		lock.readLock().lock();
		try {
			entry = back.getEntry(primaryKey);
		} finally {
			// Only release the lock here: returning from a finally block would
			// silently discard an exception thrown by the lookup.
			lock.readLock().unlock();
		}
		if (entry == null || entry.isRemoved()) return null;
		entry = new Entry<T> (entry);
		
		context.getIsolator().setFirstEntry (entry);
		return entry.getSpaceEntry();
	}
	/**
	 * Take an object by its primary key.
	 * <p>
	 * If the isolator of the current thread holds the entry, it is marked as removed
	 * there. Otherwise the entry is read from the backing table, and a copy marked as
	 * removed is registered in the isolator. The entry listener is told about the
	 * removal in both cases.
	 * @param primaryKey The primary key.
	 * @return The object taken, or null if there is no entry or it is already marked as removed.
	 */
	public T takeById(Object primaryKey) {
		IsolatorContext<T> context = threadContext.get ();
		Entry<T> entry = context.getIsolator().getEntry(primaryKey);
		if (entry != null) {
			if (entry.isRemoved()) return null;
			entry.setRemoved(true);
			context.getIsolator().setEntry (entry);
			entryRemoved(context, entry.getSpaceEntry());
			return entry.getSpaceEntry();
		}
		// The backing table is only read here, so the read lock is enough
		// (same as in readById).
		lock.readLock().lock();
		try {
			entry = back.getEntry(primaryKey);
		} finally {
			// Only release the lock here: returning from a finally block would
			// silently discard an exception thrown by the lookup.
			lock.readLock().unlock();
		}
		if (entry == null || entry.isRemoved()) return null;
		// Work on a private copy so the backing entry is not touched before commit.
		entry = new Entry<T> (entry);
		entry.setRemoved(true);
		context.getIsolator().setFirstEntry (entry);
		entryRemoved(context, entry.getSpaceEntry());
		return entry.getSpaceEntry();
	}

	
	/**
	 * Query the backing table under the read lock.
	 * @param query The query.
	 * @return The entries returned by the backing table for the query.
	 */
	public IndexedMap<Object, Entry<T>> readMultipleBackEntries(TableQuery<T>query) {
		final IndexedMap<Object, Entry<T>> found;
		lock.readLock().lock ();
		try {
			found = back.readMultipleEntries(query);
		} finally {
			lock.readLock().unlock();
		}
		return found;
	}

	/**
	 * Get the accessor of this table.
	 * @return The accessor obtained from the backing table.
	 */
	public Accessor<T> getAccessor() {
		return this.accessor;
	}
	/**
	 * Tell whether the current thread was interrupted while waiting in a take with
	 * a timeout. The flag is reset by commit and rollback.
	 * @return true if the thread has been interrupted.
	 */
	public boolean isInterrupted() {
		Boolean flag = interrupted.get();
		return flag.booleanValue();
	}
	 
	/**
	 * Get the time provider of the backing table.
	 * @return The current time provider for timestamps.
	 */
	public CurrentTimeProvider getCurrentTimeProvider() {
		CurrentTimeProvider provider = back.getCurrentTimeProvider();
		return provider;
	}

}