// ContextSwitch.java
package org.microspace.thread;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
/**
 * Invokes the methods of a target object on dedicated worker threads, one
 * worker thread per {@link ContextId}.
 *
 * <p>Callers talk to the target through the dynamic proxy returned by
 * {@link #proxy()}. Every call made on the proxy is handed to the worker
 * thread that belongs to the currently selected context (see
 * {@link #changeContext(ContextId)}); the calling thread blocks until the
 * worker has finished and then receives the return value or the exception
 * of the target method.
 *
 * <p>Calls routed to the same context are executed one at a time. A call
 * made on the proxy from inside a worker thread of the same context is
 * executed directly on that thread, because handing it back to the (busy)
 * worker would dead-lock.
 *
 * <p>After {@link #shutdown()} every further call on the proxy fails with
 * an {@link IllegalStateException}.
 *
 * @author Gaspar Sinai - {@literal gaspar.sinai@microspace.org}
 * @version 2017-11-08
 * @param <T> The class.
 * @param <I> The interface the class implements.
 */
public class ContextSwitch<T extends I, I> implements Contextual, InvocationHandler {

    /** Context whose worker thread serves the next proxy call. */
    volatile ContextId contextId;
    MicroSpaceThreadFactory threadFactory;
    final T original;
    final I proxyInstance;
    /** Worker per context; every access is guarded by {@code synchronized (threads)}. */
    final Map<ContextId, ThreadInvoker> threads = new HashMap<>();
    /** Cleared by {@link #shutdown()}; no worker is created or used afterwards. */
    volatile boolean started = true;

    /**
     * Creates the switch and the proxy for the given target.
     *
     * @param original the object whose methods are executed on the worker threads.
     * @param interf the interface implemented by the proxy.
     */
    @SuppressWarnings("unchecked")
    public ContextSwitch (T original, Class<I> interf) {
        this.original = original;
        this.threadFactory = new MicroSpaceThreadFactory("proxy-"
                + original.getClass().getSimpleName());
        // Select the initial context before 'this' is handed to the proxy,
        // so the handler is fully initialised when it becomes reachable.
        this.contextId = new ContextId();
        this.proxyInstance = (I) Proxy.newProxyInstance(interf.getClassLoader(),
                new Class<?>[] { interf },
                this);
    }

    /**
     * @return the proxy whose calls are executed on the worker threads.
     */
    public I proxy() {
        return proxyInstance;
    }

    /**
     * Stops all worker threads and shuts the thread factory down.
     * A call that is already being executed is allowed to finish; calls that
     * have not been started yet fail with {@link IllegalStateException}.
     */
    public void shutdown() {
        final Map<ContextId, ThreadInvoker> snapshot;
        synchronized (threads) {
            // Flip the flag under the same lock invoke() uses to create
            // workers, so no worker can be created after the snapshot.
            started = false;
            snapshot = new HashMap<>(threads);
        }
        // Terminate outside the map lock: terminate() joins the worker, and
        // the worker may itself be inside invoke() waiting for that lock.
        for (ThreadInvoker invoker : snapshot.values()) {
            invoker.terminate();
        }
        try {
            threadFactory.shutdown();
        } catch (Exception e) {
            System.err.println("Can not shut down thread factory.");
            System.err.println("Cause: " + e);
        }
    }

    /**
     * Selects the context, and thereby the worker thread, used by
     * subsequent calls on the proxy.
     */
    @Override
    public void changeContext(ContextId contextId) {
        this.contextId = contextId;
    }

    @Override
    public ContextId getContextId() {
        return contextId;
    }

    /**
     * Executes {@code method} of the target on the worker thread of the
     * current context, creating and starting that worker on first use.
     *
     * @return the value returned by the target method.
     * @throws IllegalStateException if this switch has been shut down.
     * @throws Throwable whatever the target method throws.
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        final ThreadInvoker invoker;
        synchronized (threads) {
            if (!started) {
                throw new IllegalStateException("ContextSwitch has been shut down.");
            }
            ContextId id = contextId;
            ThreadInvoker existing = threads.get(id);
            if (existing == null) {
                existing = new ThreadInvoker();
                Thread worker = threadFactory.newThread(existing);
                existing.thread = worker;
                worker.start();
                // Registered only after a successful start, so a failed
                // start never leaves a dead invoker in the map.
                threads.put(id, existing);
            }
            invoker = existing;
        }
        try {
            if (Thread.currentThread() == invoker.thread) {
                // Re-entrant call from the worker itself: we already run in
                // the requested context, and waiting for ourselves would hang.
                return method.invoke(original, args);
            }
            return invoker.invoke(method, args);
        }
        catch (InvocationTargetException ex) {
            // Surface the exception of the target method, not the reflection wrapper.
            Throwable cause = ex.getCause();
            throw cause != null ? cause : ex;
        }
    }

    /**
     * The body of one worker thread together with the single-slot hand-over
     * used to pass a call to it. All mutable state is guarded by {@link #lock}
     * and every wait is done in a loop on an explicit condition, so spurious
     * wake-ups and early notifications cannot break the hand-over.
     */
    class ThreadInvoker implements Runnable {
        final Object lock = new Object();
        Method method;
        Object[] args;
        Object returnValue;
        Throwable throwable;
        boolean terminateRequest = false;
        Thread thread;
        /** True while some caller owns the request slot. */
        private boolean busy = false;
        /** True from the moment a request is posted until its outcome is stored. */
        private boolean pending = false;
        /** True once {@link #run()} has exited. */
        private boolean finished = false;

        /**
         * Posts a call to the worker and blocks until it has been executed.
         * While the call is in flight the wait is not interruptible (the
         * worker keeps using the request slot); an interrupt received in
         * that period is restored on the calling thread before returning.
         *
         * @return the value returned by the target method.
         * @throws IllegalStateException if the worker is terminated before
         *         the call was executed.
         * @throws InterruptedException if interrupted while waiting for
         *         another caller's request to finish.
         * @throws Throwable whatever the reflective call on the worker threw.
         */
        public Object invoke (Method method, Object[] args) throws Throwable {
            synchronized (lock) {
                // Wait for the request slot; nothing is posted yet, so it is
                // safe to let an interrupt abort this wait.
                while (busy && !terminateRequest && !finished) {
                    lock.wait();
                }
                if (terminateRequest || finished) {
                    throw new IllegalStateException("Worker thread has been terminated.");
                }
                busy = true;
                boolean interrupted = false;
                try {
                    this.method = method;
                    this.args = args;
                    this.returnValue = null;
                    this.throwable = null;
                    pending = true;
                    lock.notifyAll();
                    while (pending && !finished) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            interrupted = true;
                        }
                    }
                    if (pending) {
                        // The worker exited without serving this request.
                        pending = false;
                        throw new IllegalStateException(
                                "Worker thread terminated before the call was executed.");
                    }
                    if (throwable != null) throw throwable;
                    return returnValue;
                } finally {
                    busy = false;
                    lock.notifyAll();
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        /**
         * Asks the worker to stop and waits until its thread has exited.
         * Also marks the enclosing switch as stopped.
         */
        public void terminate () {
            started = false;
            synchronized (lock) {
                terminateRequest = true;
                lock.notifyAll();
            }
            Thread worker = thread;
            if (worker == null || worker == Thread.currentThread()) {
                // Joining the current thread would never return.
                return;
            }
            try {
                worker.join();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller of shutdown().
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Worker loop: waits for a posted request, executes it outside the
         * lock and publishes the outcome. Ends on a terminate request or
         * when the thread is interrupted while idle.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    final Method target;
                    final Object[] targetArgs;
                    synchronized (lock) {
                        while (!pending && !terminateRequest) {
                            lock.wait();
                        }
                        if (terminateRequest) {
                            return;
                        }
                        target = method;
                        targetArgs = args;
                    }
                    Object result = null;
                    Throwable failure = null;
                    try {
                        result = target.invoke(original, targetArgs);
                    } catch (Throwable th) {
                        // Relayed to the caller, which rethrows it.
                        failure = th;
                    }
                    synchronized (lock) {
                        returnValue = result;
                        throwable = failure;
                        pending = false;
                        lock.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted while idle: treat it as a request to stop.
                Thread.currentThread().interrupt();
            } finally {
                synchronized (lock) {
                    // Release every caller that is still waiting for this worker.
                    finished = true;
                    lock.notifyAll();
                }
            }
        }
    }
}