Mercurial > lasercutter
view src/clojure/lang/Agent.java @ 10:ef7dbbd6452c
added clojure source goodness
author | Robert McIntyre <rlm@mit.edu> |
---|---|
date | Sat, 21 Aug 2010 06:25:44 -0400 |
parents | |
children |
line wrap: on
line source
1 /**2 * Copyright (c) Rich Hickey. All rights reserved.3 * The use and distribution terms for this software are covered by the4 * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)5 * which can be found in the file epl-v10.html at the root of this distribution.6 * By using this software in any fashion, you are agreeing to be bound by7 * the terms of this license.8 * You must not remove this notice, or any other, from this software.9 **/11 /* rich Nov 17, 2007 */13 package clojure.lang;15 import java.util.concurrent.*;16 import java.util.concurrent.atomic.AtomicReference;17 import java.util.Map;19 public class Agent extends ARef {21 static class ActionQueue {22 public final IPersistentStack q;23 public final Throwable error; // non-null indicates fail state24 static final ActionQueue EMPTY = new ActionQueue(PersistentQueue.EMPTY, null);26 public ActionQueue( IPersistentStack q, Throwable error )27 {28 this.q = q;29 this.error = error;30 }31 }33 static final Keyword CONTINUE = Keyword.intern(null, "continue");34 static final Keyword FAIL = Keyword.intern(null, "fail");36 volatile Object state;37 AtomicReference<ActionQueue> aq = new AtomicReference(ActionQueue.EMPTY);39 volatile Keyword errorMode = CONTINUE;40 volatile IFn errorHandler = null;42 final public static ExecutorService pooledExecutor =43 Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());45 final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();47 final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>();50 public static void shutdown(){51 soloExecutor.shutdown();52 pooledExecutor.shutdown();53 }55 static class Action implements Runnable{56 final Agent agent;57 final IFn fn;58 final ISeq args;59 final boolean solo;62 public Action(Agent agent, IFn fn, ISeq args, boolean solo){63 this.agent = agent;64 this.args = args;65 this.fn = fn;66 this.solo = solo;67 }69 void execute(){70 try71 {72 if(solo)73 soloExecutor.execute(this);74 else75 pooledExecutor.execute(this);76 }77 catch(Throwable error)78 {79 if(agent.errorHandler != null)80 {81 try82 {83 agent.errorHandler.invoke(agent, error);84 }85 catch(Throwable e) {} // ignore errorHandler errors86 }87 }88 }90 static void doRun(Action action){91 try92 {93 Var.pushThreadBindings(RT.map(RT.AGENT, action.agent));94 nested.set(PersistentVector.EMPTY);96 Throwable error = null;97 try98 {99 Object oldval = action.agent.state;100 Object newval = action.fn.applyTo(RT.cons(action.agent.state, action.args));101 action.agent.setState(newval);102 action.agent.notifyWatches(oldval,newval);103 }104 catch(Throwable e)105 {106 error = e;107 }109 if(error == null)110 {111 releasePendingSends();112 }113 else114 {115 nested.set(PersistentVector.EMPTY);116 if(action.agent.errorHandler != null)117 {118 try119 {120 action.agent.errorHandler.invoke(action.agent, error);121 }122 catch(Throwable e) {} // ignore errorHandler errors123 }124 if(action.agent.errorMode == CONTINUE)125 {126 error = null;127 }128 }130 boolean popped = false;131 ActionQueue next = null;132 while(!popped)133 {134 ActionQueue prior = action.agent.aq.get();135 next = new ActionQueue(prior.q.pop(), error);136 popped = action.agent.aq.compareAndSet(prior, next);137 }139 if(error == null && next.q.count() > 0)140 ((Action) next.q.peek()).execute();141 }142 finally143 {144 nested.set(null);145 Var.popThreadBindings();146 }147 }149 public void run(){150 doRun(this);151 }152 }154 public Agent(Object state) throws Exception{155 this(state,null);156 }158 public Agent(Object state, IPersistentMap meta) throws Exception {159 super(meta);160 setState(state);161 }163 boolean setState(Object newState) throws Exception{164 validate(newState);165 boolean ret = state != newState;166 state = newState;167 return ret;168 }170 public Object deref() throws Exception{171 return state;172 }174 public Throwable getError(){175 return aq.get().error;176 }178 public void setErrorMode(Keyword k){179 errorMode = k;180 }182 public Keyword getErrorMode(){183 return errorMode;184 }186 public void setErrorHandler(IFn f){187 errorHandler = f;188 }190 public IFn getErrorHandler(){191 return errorHandler;192 }194 synchronized public Object restart(Object newState, boolean clearActions){195 if(getError() == null)196 {197 throw new RuntimeException("Agent does not need a restart");198 }199 validate(newState);200 state = newState;202 if(clearActions)203 aq.set(ActionQueue.EMPTY);204 else205 {206 boolean restarted = false;207 ActionQueue prior = null;208 while(!restarted)209 {210 prior = aq.get();211 restarted = aq.compareAndSet(prior, new ActionQueue(prior.q, null));212 }214 if(prior.q.count() > 0)215 ((Action) prior.q.peek()).execute();216 }218 return newState;219 }221 public Object dispatch(IFn fn, ISeq args, boolean solo) {222 Throwable error = getError();223 if(error != null)224 {225 throw new RuntimeException("Agent is failed, needs restart", error);226 }227 Action action = new Action(this, fn, args, solo);228 dispatchAction(action);230 return this;231 }233 static void dispatchAction(Action action){234 LockingTransaction trans = LockingTransaction.getRunning();235 if(trans != null)236 trans.enqueue(action);237 else if(nested.get() != null)238 {239 nested.set(nested.get().cons(action));240 }241 else242 action.agent.enqueue(action);243 }245 void enqueue(Action action){246 boolean queued = false;247 ActionQueue prior = null;248 while(!queued)249 {250 prior = aq.get();251 queued = aq.compareAndSet(prior, new ActionQueue((IPersistentStack)prior.q.cons(action), prior.error));252 }254 if(prior.q.count() == 0 && prior.error == null)255 action.execute();256 }258 public int getQueueCount(){259 return aq.get().q.count();260 }262 static public int releasePendingSends(){263 IPersistentVector sends = nested.get();264 if(sends == null)265 return 0;266 for(int i=0;i<sends.count();i++)267 {268 Action a = (Action) sends.valAt(i);269 a.agent.enqueue(a);270 }271 nested.set(PersistentVector.EMPTY);272 return sends.count();273 }274 }