annotate src/clojure/lang/Agent.java @ 10:ef7dbbd6452c

added clojure source goodness
author Robert McIntyre <rlm@mit.edu>
date Sat, 21 Aug 2010 06:25:44 -0400
parents
children
rev   line source
rlm@10 1 /**
rlm@10 2 * Copyright (c) Rich Hickey. All rights reserved.
rlm@10 3 * The use and distribution terms for this software are covered by the
rlm@10 4 * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
rlm@10 5 * which can be found in the file epl-v10.html at the root of this distribution.
rlm@10 6 * By using this software in any fashion, you are agreeing to be bound by
rlm@10 7 * the terms of this license.
rlm@10 8 * You must not remove this notice, or any other, from this software.
rlm@10 9 **/
rlm@10 10
rlm@10 11 /* rich Nov 17, 2007 */
rlm@10 12
rlm@10 13 package clojure.lang;
rlm@10 14
rlm@10 15 import java.util.concurrent.*;
rlm@10 16 import java.util.concurrent.atomic.AtomicReference;
rlm@10 17 import java.util.Map;
rlm@10 18
rlm@10 19 public class Agent extends ARef {
rlm@10 20
rlm@10 21 static class ActionQueue {
rlm@10 22 public final IPersistentStack q;
rlm@10 23 public final Throwable error; // non-null indicates fail state
rlm@10 24 static final ActionQueue EMPTY = new ActionQueue(PersistentQueue.EMPTY, null);
rlm@10 25
rlm@10 26 public ActionQueue( IPersistentStack q, Throwable error )
rlm@10 27 {
rlm@10 28 this.q = q;
rlm@10 29 this.error = error;
rlm@10 30 }
rlm@10 31 }
rlm@10 32
rlm@10 33 static final Keyword CONTINUE = Keyword.intern(null, "continue");
rlm@10 34 static final Keyword FAIL = Keyword.intern(null, "fail");
rlm@10 35
rlm@10 36 volatile Object state;
rlm@10 37 AtomicReference<ActionQueue> aq = new AtomicReference(ActionQueue.EMPTY);
rlm@10 38
rlm@10 39 volatile Keyword errorMode = CONTINUE;
rlm@10 40 volatile IFn errorHandler = null;
rlm@10 41
rlm@10 42 final public static ExecutorService pooledExecutor =
rlm@10 43 Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
rlm@10 44
rlm@10 45 final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();
rlm@10 46
rlm@10 47 final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>();
rlm@10 48
rlm@10 49
rlm@10 50 public static void shutdown(){
rlm@10 51 soloExecutor.shutdown();
rlm@10 52 pooledExecutor.shutdown();
rlm@10 53 }
rlm@10 54
rlm@10 55 static class Action implements Runnable{
rlm@10 56 final Agent agent;
rlm@10 57 final IFn fn;
rlm@10 58 final ISeq args;
rlm@10 59 final boolean solo;
rlm@10 60
rlm@10 61
rlm@10 62 public Action(Agent agent, IFn fn, ISeq args, boolean solo){
rlm@10 63 this.agent = agent;
rlm@10 64 this.args = args;
rlm@10 65 this.fn = fn;
rlm@10 66 this.solo = solo;
rlm@10 67 }
rlm@10 68
rlm@10 69 void execute(){
rlm@10 70 try
rlm@10 71 {
rlm@10 72 if(solo)
rlm@10 73 soloExecutor.execute(this);
rlm@10 74 else
rlm@10 75 pooledExecutor.execute(this);
rlm@10 76 }
rlm@10 77 catch(Throwable error)
rlm@10 78 {
rlm@10 79 if(agent.errorHandler != null)
rlm@10 80 {
rlm@10 81 try
rlm@10 82 {
rlm@10 83 agent.errorHandler.invoke(agent, error);
rlm@10 84 }
rlm@10 85 catch(Throwable e) {} // ignore errorHandler errors
rlm@10 86 }
rlm@10 87 }
rlm@10 88 }
rlm@10 89
rlm@10 90 static void doRun(Action action){
rlm@10 91 try
rlm@10 92 {
rlm@10 93 Var.pushThreadBindings(RT.map(RT.AGENT, action.agent));
rlm@10 94 nested.set(PersistentVector.EMPTY);
rlm@10 95
rlm@10 96 Throwable error = null;
rlm@10 97 try
rlm@10 98 {
rlm@10 99 Object oldval = action.agent.state;
rlm@10 100 Object newval = action.fn.applyTo(RT.cons(action.agent.state, action.args));
rlm@10 101 action.agent.setState(newval);
rlm@10 102 action.agent.notifyWatches(oldval,newval);
rlm@10 103 }
rlm@10 104 catch(Throwable e)
rlm@10 105 {
rlm@10 106 error = e;
rlm@10 107 }
rlm@10 108
rlm@10 109 if(error == null)
rlm@10 110 {
rlm@10 111 releasePendingSends();
rlm@10 112 }
rlm@10 113 else
rlm@10 114 {
rlm@10 115 nested.set(PersistentVector.EMPTY);
rlm@10 116 if(action.agent.errorHandler != null)
rlm@10 117 {
rlm@10 118 try
rlm@10 119 {
rlm@10 120 action.agent.errorHandler.invoke(action.agent, error);
rlm@10 121 }
rlm@10 122 catch(Throwable e) {} // ignore errorHandler errors
rlm@10 123 }
rlm@10 124 if(action.agent.errorMode == CONTINUE)
rlm@10 125 {
rlm@10 126 error = null;
rlm@10 127 }
rlm@10 128 }
rlm@10 129
rlm@10 130 boolean popped = false;
rlm@10 131 ActionQueue next = null;
rlm@10 132 while(!popped)
rlm@10 133 {
rlm@10 134 ActionQueue prior = action.agent.aq.get();
rlm@10 135 next = new ActionQueue(prior.q.pop(), error);
rlm@10 136 popped = action.agent.aq.compareAndSet(prior, next);
rlm@10 137 }
rlm@10 138
rlm@10 139 if(error == null && next.q.count() > 0)
rlm@10 140 ((Action) next.q.peek()).execute();
rlm@10 141 }
rlm@10 142 finally
rlm@10 143 {
rlm@10 144 nested.set(null);
rlm@10 145 Var.popThreadBindings();
rlm@10 146 }
rlm@10 147 }
rlm@10 148
rlm@10 149 public void run(){
rlm@10 150 doRun(this);
rlm@10 151 }
rlm@10 152 }
rlm@10 153
rlm@10 154 public Agent(Object state) throws Exception{
rlm@10 155 this(state,null);
rlm@10 156 }
rlm@10 157
rlm@10 158 public Agent(Object state, IPersistentMap meta) throws Exception {
rlm@10 159 super(meta);
rlm@10 160 setState(state);
rlm@10 161 }
rlm@10 162
rlm@10 163 boolean setState(Object newState) throws Exception{
rlm@10 164 validate(newState);
rlm@10 165 boolean ret = state != newState;
rlm@10 166 state = newState;
rlm@10 167 return ret;
rlm@10 168 }
rlm@10 169
rlm@10 170 public Object deref() throws Exception{
rlm@10 171 return state;
rlm@10 172 }
rlm@10 173
rlm@10 174 public Throwable getError(){
rlm@10 175 return aq.get().error;
rlm@10 176 }
rlm@10 177
rlm@10 178 public void setErrorMode(Keyword k){
rlm@10 179 errorMode = k;
rlm@10 180 }
rlm@10 181
rlm@10 182 public Keyword getErrorMode(){
rlm@10 183 return errorMode;
rlm@10 184 }
rlm@10 185
rlm@10 186 public void setErrorHandler(IFn f){
rlm@10 187 errorHandler = f;
rlm@10 188 }
rlm@10 189
rlm@10 190 public IFn getErrorHandler(){
rlm@10 191 return errorHandler;
rlm@10 192 }
rlm@10 193
rlm@10 194 synchronized public Object restart(Object newState, boolean clearActions){
rlm@10 195 if(getError() == null)
rlm@10 196 {
rlm@10 197 throw new RuntimeException("Agent does not need a restart");
rlm@10 198 }
rlm@10 199 validate(newState);
rlm@10 200 state = newState;
rlm@10 201
rlm@10 202 if(clearActions)
rlm@10 203 aq.set(ActionQueue.EMPTY);
rlm@10 204 else
rlm@10 205 {
rlm@10 206 boolean restarted = false;
rlm@10 207 ActionQueue prior = null;
rlm@10 208 while(!restarted)
rlm@10 209 {
rlm@10 210 prior = aq.get();
rlm@10 211 restarted = aq.compareAndSet(prior, new ActionQueue(prior.q, null));
rlm@10 212 }
rlm@10 213
rlm@10 214 if(prior.q.count() > 0)
rlm@10 215 ((Action) prior.q.peek()).execute();
rlm@10 216 }
rlm@10 217
rlm@10 218 return newState;
rlm@10 219 }
rlm@10 220
rlm@10 221 public Object dispatch(IFn fn, ISeq args, boolean solo) {
rlm@10 222 Throwable error = getError();
rlm@10 223 if(error != null)
rlm@10 224 {
rlm@10 225 throw new RuntimeException("Agent is failed, needs restart", error);
rlm@10 226 }
rlm@10 227 Action action = new Action(this, fn, args, solo);
rlm@10 228 dispatchAction(action);
rlm@10 229
rlm@10 230 return this;
rlm@10 231 }
rlm@10 232
rlm@10 233 static void dispatchAction(Action action){
rlm@10 234 LockingTransaction trans = LockingTransaction.getRunning();
rlm@10 235 if(trans != null)
rlm@10 236 trans.enqueue(action);
rlm@10 237 else if(nested.get() != null)
rlm@10 238 {
rlm@10 239 nested.set(nested.get().cons(action));
rlm@10 240 }
rlm@10 241 else
rlm@10 242 action.agent.enqueue(action);
rlm@10 243 }
rlm@10 244
rlm@10 245 void enqueue(Action action){
rlm@10 246 boolean queued = false;
rlm@10 247 ActionQueue prior = null;
rlm@10 248 while(!queued)
rlm@10 249 {
rlm@10 250 prior = aq.get();
rlm@10 251 queued = aq.compareAndSet(prior, new ActionQueue((IPersistentStack)prior.q.cons(action), prior.error));
rlm@10 252 }
rlm@10 253
rlm@10 254 if(prior.q.count() == 0 && prior.error == null)
rlm@10 255 action.execute();
rlm@10 256 }
rlm@10 257
rlm@10 258 public int getQueueCount(){
rlm@10 259 return aq.get().q.count();
rlm@10 260 }
rlm@10 261
rlm@10 262 static public int releasePendingSends(){
rlm@10 263 IPersistentVector sends = nested.get();
rlm@10 264 if(sends == null)
rlm@10 265 return 0;
rlm@10 266 for(int i=0;i<sends.count();i++)
rlm@10 267 {
rlm@10 268 Action a = (Action) sends.valAt(i);
rlm@10 269 a.agent.enqueue(a);
rlm@10 270 }
rlm@10 271 nested.set(PersistentVector.EMPTY);
rlm@10 272 return sends.count();
rlm@10 273 }
rlm@10 274 }