SimpleTable.java
package org.microspace.table;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.microspace.exception.LockException;
import org.microspace.specific.CurrentTimeProvider;
import org.microspace.table.column.Accessor;
import org.microspace.table.column.IndexedMap;
import org.microspace.table.column.IndexedSet;
import org.microspace.table.query.TableQuery;
import org.microspace.table.query.TemplateQuery;
/**
* Make a shallow copy of the object using the empty constructor and
* the getA setA getB setB methods. This object is presented to the front side
* of the map, and it can be committed.
* The objects in a new transacton (after commmit or rollback) are using repeatable
* read, they always present the user with the same object, and it wont be re-read.
* @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
* @version 2012-01-29
*/
public class SimpleTable<T> implements TransactionalTable<T> {
private final SimpleUnsafeTable<T> back;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Object underlyingChanged;
private final IndexedSet<Object> lockedObjects;
private final ThreadLocal<IsolatorContext<T>> threadContext;
private final Accessor<T> accessor;
private boolean serialized;
private EntryListener<T> listener;
private final ThreadLocal<Boolean> interrupted = new ThreadLocal<Boolean> () {
@Override
public Boolean initialValue () {
return Boolean.FALSE;
}
};
public SimpleTable(SimpleUnsafeTable<T> backMap) {
this.back = backMap;
this.underlyingChanged = new Object();
this.accessor = back.getAccessor();
this.lockedObjects = new IndexedSet<Object> (accessor.getPrimaryKeyGetSetPair().getIndexType());
this.threadContext = new ThreadLocal<IsolatorContext<T>> () {
@Override
protected IsolatorContext<T> initialValue() {
return new IsolatorContext<T>(accessor, backMap.getCurrentTimeProvider());
}
};
this.serialized = false;
accessor.getBlankObject();
}
public SimpleTable(Class<T> clazz) {
this(clazz, null);
}
public SimpleTable(Class<T> clazz, CurrentTimeProvider currentTimeProvider) {
this (new SimpleUnsafeTable<>(clazz, currentTimeProvider));
}
public void setEntryListener(EntryListener<T> listener) {
this.listener = listener;
}
public void setSerialized (boolean serialized) {
this.serialized = serialized;
}
public int size () {
lock.readLock().lock ();
try {
return back.size();
} finally {
lock.readLock().unlock ();
}
}
public int getPendingSize() {
IsolatorContext<T> context = threadContext.get();
return context.getIsolator().getUpdatedKeys().size();
}
/* (non-Javadoc)
* @see org.microspace.map.LocalbleTransactional#lock()
*/
public void lock () throws LockException {
IsolatorContext<T> context = threadContext.get();
if (context.getIsolator().getState() == Isolator.State.LOCKED) return;
context.getIsolator().setState (Isolator.State.LOCKED);
if (context.getIsolator().getUpdatedKeys().size() == 0) return;
lock.writeLock().lock();
try {
IndexedSet<Object> myobjects = new IndexedSet<Object> (accessor.getPrimaryKeyGetSetPair().getIndexType());
// Check data
for (Object key : context.getIsolator().getUpdatedKeys() ) {
if (lockedObjects.contains(key)) {
rollback();
throw new LockException ("Locked object key = "+ key);
}
if (serialized) {
Entry<T> frontEntry = context.getIsolator().getEntry(key);
try {
frontEntry.serialize();
} catch (Exception e) {
rollback();
throw new IllegalArgumentException ("Failed to serialize a field in " + frontEntry.getSpaceEntry().getClass());
}
}
myobjects.add (key);
Entry<T> uc = back.getEntry(key);
BigInteger updateCount = context.getIsolator().getBackUpdateCount (key);
if (updateCount != null) {
if (uc == null) {
rollback();
throw new LockException ("Removed data key = "+key);
}
BigInteger oldCount = uc.getUpdateCount();
// Someone updated the data since last time we read it.
if (oldCount == null || !updateCount.equals (oldCount)) {
rollback();
throw new LockException ("Existing data key = "+key);
}
} else {
// This must be a new object
if (uc != null) {
rollback();
// It can be 2 reasons: id clash or the object was not leased out
throw new LockException ("Unknown object already exists key = "+key);
}
}
}
lockedObjects.addAll(myobjects);
context.getLockedKeys().addAll(myobjects);
} finally {
lock.writeLock().unlock();
}
}
/* (non-Javadoc)
* @see org.microspace.map.LocalbleTransactional#unlock()
*/
public void unlock () {
IsolatorContext<T> context = threadContext.get();
context.getIsolator().setState (Isolator.State.CREATED);
if (context.getLockedKeys().size() == 0) {
return;
}
lock.writeLock().lock();
lockedObjects.removeAll(context.getLockedKeys());
lock.writeLock().unlock();
context.getLockedKeys().clear();
}
/* (non-Javadoc)
* @see org.microspace.map.Transactional#commit()
*/
public void commit () {
lock();
IsolatorContext<T> context = threadContext.get();
if (context.getLockedKeys().size() == 0) {
interrupted.remove();
threadContext.remove();
return;
}
lock.writeLock().lock ();
try {
// Preserve order
IndexedSet<Object> keyset = context.getIsolator().getUpdatedKeys();
ArrayList<Entry<T>> entryArray = new ArrayList<Entry<T>>(keyset.size());
for (Object key : keyset) {
Entry<T> fc = context.getIsolator().getEntry(key);
entryArray.add(fc);
}
Collections.sort(entryArray);
for (Entry<T> entry : entryArray) {
back.setEntry (new Entry<T> (entry));
}
} finally {
lock.writeLock().unlock ();
lockedObjects.removeAll(context.getLockedKeys());
context.clear();
threadContext.remove();
interrupted.remove();
}
synchronized (underlyingChanged) {
underlyingChanged.notifyAll();
}
}
public void rollback () {
IsolatorContext<T> context = threadContext.get();
lockedObjects.removeAll(context.getLockedKeys());
threadContext.remove();
interrupted.remove();
}
/**
* {@inheritDoc}
*/
public T take (TableQuery<T> query, long timeout) {
long start = System.currentTimeMillis();
long remainingTime = timeout;
while (true) {
T ret = null;
synchronized (underlyingChanged) {
ret = take (query);
if (ret != null) return ret;
try {
underlyingChanged.wait(remainingTime);
} catch (InterruptedException e) {
interrupted.set(true);
return null;
}
}
// Take is faster than iterating through underlyingChanged.
ret = take (query);
if (ret != null) return ret;
long now = System.currentTimeMillis();
if (now < start) start = now;
if (now - start >= timeout) return null;
remainingTime = timeout - (now - start);
}
}
public void write(T object) {
IsolatorContext<T> context = threadContext.get();
// Update if same key is found
Object primaryKey = accessor.getPrimaryKeyGetSetPair().get(object);
readById(primaryKey);
Object id = accessor.getPrimaryKeyGetSetPair().get(object);
if (id == null) {
throw new IllegalArgumentException ("primary key is null");
}
SimpleUnsafeTable.autoTimestamp(object, back.getCurrentTimeProvider());
BigInteger nextCount = back.updateCounter.incrementAndGet();
// GASPAR 2017-10-12 setentry also sets update counter, but the code is more readable.
Entry<T> container = new Entry<T> (object, accessor, nextCount);
context.getIsolator().setEntry (container);
if (listener != null) {
listener.entryAdded(accessor, object);
}
}
private void entryRemoved (IsolatorContext<T> context, T object) {
if (listener != null) {
listener.entryRemoved(accessor, object);
}
}
/**
* First try to read the object from frontMap if it fails, copy one from the underlying map.
*/
public T read(TableQuery<T> query) {
IsolatorContext<T> context = threadContext.get ();
IndexedMap<Object, Entry<T>> local = context.getIsolator().readMultipleEntries (query);
IndexedMap<Object, Entry<T>> back = readMultipleBackEntries(query);
List<Entry<T>> entries = mergeAndSortValidEntries (query, local, back);
if (entries.size()==0) return null;
Entry<T> entry = entries.get(0);
if (context.getIsolator().getEntry(entry.getPrimaryKey()) == null) {
entry = new Entry<T> (entry);
context.getIsolator().setFirstEntry (entry);
}
return entry.getSpaceEntry();
}
/**
* Merge and sort not removed entries.
* @param query The query
* @param local The local entries.
* @param back The back entries.
* @return The sorted list of entries.
*/
private List<Entry<T>> mergeAndSortValidEntries (TableQuery<T> query,
IndexedMap<Object, Entry<T>> local,
IndexedMap<Object, Entry<T>> back) {
IsolatorContext<T> context = threadContext.get ();
IndexedMap<Object, Entry<T>> entryMap = new IndexedMap<>(accessor.getPrimaryKeyGetSetPair().getIndexType());
for (Object key : back.keySet()) {
Entry<T> localEntry = context.getIsolator().getEntry(key);
if (localEntry != null) continue;
Entry<T> backEntry = back.get(key);
if (backEntry.isRemoved()) continue; // never happens
entryMap.put(key, backEntry);
}
for (Object key : local.keySet()) {
Entry<T> localEntry = local.get(key);
if (localEntry.isRemoved()) continue;
entryMap.put(key, localEntry);
}
return query.sortAndLimit(accessor, entryMap);
}
/**
* Read objects first from the front map, then from backup.
*/
public List<T> readMultiple(TableQuery<T> query) {
IsolatorContext<T> context = threadContext.get ();
IndexedMap<Object, Entry<T>> local = context.getIsolator().readMultipleEntries (query);
IndexedMap<Object, Entry<T>> back = readMultipleBackEntries(query);
List<Entry<T>> entries = mergeAndSortValidEntries (query, local, back);
ArrayList<T> ret = new ArrayList<T> (entries.size());
if (entries.size()==0) return ret;
for (Entry<T> entry : entries) {
if (context.getIsolator().getEntry(entry.getPrimaryKey()) == null) {
entry = new Entry<T> (entry);
context.getIsolator().setFirstEntry (entry);
}
ret.add(entry.getSpaceEntry());
}
return ret;
}
/**
* Try to take from underlying then front Mark objects taken as removed in frontMap.
*/
public T take(TableQuery<T> query) {
IsolatorContext<T> context = threadContext.get ();
IndexedMap<Object, Entry<T>> local = context.getIsolator().readMultipleEntries (query);
IndexedMap<Object, Entry<T>> back = readMultipleBackEntries(query);
List<Entry<T>> entries = mergeAndSortValidEntries (query, local, back);
if (entries.size()==0) return null;
Entry<T> entry = entries.get(0);
entry = new Entry<T> (entry);
entry.setRemoved(true);
if (context.getIsolator().getEntry(entry.getPrimaryKey()) == null) {
context.getIsolator().setFirstEntry (entry);
} else {
context.getIsolator().setEntry (entry);
}
entryRemoved (context, entry.getSpaceEntry());
return entry.getSpaceEntry();
}
/**
* Try to take from front map, then underlying. Mark objects taken as removed in frontMap.
*/
public List<T> takeMultiple(TableQuery<T> query) {
IsolatorContext<T> context = threadContext.get ();
IndexedMap<Object, Entry<T>> local = context.getIsolator().readMultipleEntries (query);
IndexedMap<Object, Entry<T>> back = readMultipleBackEntries(query);
List<Entry<T>> entries = mergeAndSortValidEntries (query, local, back);
ArrayList<T> ret = new ArrayList<T> (entries.size());
if (entries.size()==0) return ret;
for (Entry<T> entry : entries) {
entry = new Entry<T> (entry);
entry.setRemoved(true);
if (context.getIsolator().getEntry(entry.getPrimaryKey()) == null) {
context.getIsolator().setFirstEntry (entry);
} else {
context.getIsolator().setEntry (entry);
}
entryRemoved (context, entry.getSpaceEntry());
ret.add(entry.getSpaceEntry());
}
return ret;
}
public void clearUpdates() {
IsolatorContext<T> context = threadContext.get ();
context.getIsolator().clear();
}
public void clear() {
T emptyObject = accessor.getBlankObject();
takeMultiple(new TemplateQuery<T>(emptyObject));
}
public Entry<T> getEntry(Object key) {
IsolatorContext<T> context = threadContext.get ();
Entry<T> ret = context.getIsolator().getEntry(key);
if (ret != null) return ret;
lock.readLock().lock();
try {
ret = back.getEntry (key);
return ret;
} finally {
lock.readLock().unlock();
}
}
public void setEntry (Entry<T> entry) {
IsolatorContext<T> context = threadContext.get ();
// Update if same key is alredy in map.
readById(entry.getPrimaryKey());
context.getIsolator().setEntry (entry);
if (listener != null) {
if (entry.isRemoved()) {
listener.entryRemoved(
accessor, entry.getSpaceEntry());
} else {
listener.entryAdded(
accessor, entry.getSpaceEntry());
}
}
}
public IndexedSet<Object> keySet() {
IndexedSet<Object> ret = new IndexedSet<Object>(accessor.getPrimaryKeyGetSetPair().getIndexType());
IsolatorContext<T> context = threadContext.get ();
ret.addAll (context.getIsolator().keySet());
lock.readLock().lock();
try {
ret.addAll(back.keySet());
return ret;
} finally {
lock.readLock().unlock();
}
}
public T readById (Object primaryKey) {
IsolatorContext<T> context = threadContext.get();
Entry<T> entry = context.getIsolator().getEntry(primaryKey);
if (entry != null) {
if (entry.isRemoved()) return null;
return entry.getSpaceEntry();
}
lock.readLock().lock();
try {
entry = back.getEntry(primaryKey);
} finally {
lock.readLock().unlock();
if (entry == null || entry.isRemoved()) return null;
entry = new Entry<T> (entry);
}
context.getIsolator().setFirstEntry (entry);
return entry.getSpaceEntry();
}
public T takeById(Object primaryKey) {
IsolatorContext<T> context = threadContext.get ();
Entry<T> entry = context.getIsolator().getEntry(primaryKey);
if (entry != null) {
if (entry.isRemoved()) return null;
entry.setRemoved(true);
context.getIsolator().setEntry (entry);
entryRemoved(context, entry.getSpaceEntry());
return entry.getSpaceEntry();
}
lock.writeLock().lock();
try {
entry = back.getEntry(primaryKey);
} finally {
lock.writeLock().unlock();
if (entry == null || entry.isRemoved()) return null;
entry = new Entry<T> (entry);
}
entry = new Entry<T> (entry);
entry.setRemoved(true);
context.getIsolator().setFirstEntry (entry);
entryRemoved(context, entry.getSpaceEntry());
return entry.getSpaceEntry();
}
public IndexedMap<Object, Entry<T>> readMultipleBackEntries(TableQuery<T>query) {
lock.readLock().lock ();
try {
return back.readMultipleEntries(query);
} finally {
lock.readLock().unlock();
}
}
public Accessor<T> getAccessor () {
return accessor;
}
/**
* Check if the thread has been interrupted in take with a timeout.
* @return true if the thread has been interrupted.
*/
public boolean isInterrupted () {
return interrupted.get();
}
/**
* Get the current Time provider.
* @return The current time provider for timestamps.
*/
public CurrentTimeProvider getCurrentTimeProvider() {
return back.getCurrentTimeProvider();
}
}