view src/clojure/lang/Agent.java @ 10:ef7dbbd6452c

added clojure source goodness
author Robert McIntyre <rlm@mit.edu>
date Sat, 21 Aug 2010 06:25:44 -0400
parents
children
line wrap: on
line source
1 /**
2 * Copyright (c) Rich Hickey. All rights reserved.
3 * The use and distribution terms for this software are covered by the
4 * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
5 * which can be found in the file epl-v10.html at the root of this distribution.
6 * By using this software in any fashion, you are agreeing to be bound by
7 * the terms of this license.
8 * You must not remove this notice, or any other, from this software.
9 **/
11 /* rich Nov 17, 2007 */
13 package clojure.lang;
15 import java.util.concurrent.*;
16 import java.util.concurrent.atomic.AtomicReference;
17 import java.util.Map;
19 public class Agent extends ARef {
21 static class ActionQueue {
22 public final IPersistentStack q;
23 public final Throwable error; // non-null indicates fail state
24 static final ActionQueue EMPTY = new ActionQueue(PersistentQueue.EMPTY, null);
26 public ActionQueue( IPersistentStack q, Throwable error )
27 {
28 this.q = q;
29 this.error = error;
30 }
31 }
33 static final Keyword CONTINUE = Keyword.intern(null, "continue");
34 static final Keyword FAIL = Keyword.intern(null, "fail");
36 volatile Object state;
37 AtomicReference<ActionQueue> aq = new AtomicReference(ActionQueue.EMPTY);
39 volatile Keyword errorMode = CONTINUE;
40 volatile IFn errorHandler = null;
42 final public static ExecutorService pooledExecutor =
43 Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
45 final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();
47 final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>();
50 public static void shutdown(){
51 soloExecutor.shutdown();
52 pooledExecutor.shutdown();
53 }
55 static class Action implements Runnable{
56 final Agent agent;
57 final IFn fn;
58 final ISeq args;
59 final boolean solo;
62 public Action(Agent agent, IFn fn, ISeq args, boolean solo){
63 this.agent = agent;
64 this.args = args;
65 this.fn = fn;
66 this.solo = solo;
67 }
69 void execute(){
70 try
71 {
72 if(solo)
73 soloExecutor.execute(this);
74 else
75 pooledExecutor.execute(this);
76 }
77 catch(Throwable error)
78 {
79 if(agent.errorHandler != null)
80 {
81 try
82 {
83 agent.errorHandler.invoke(agent, error);
84 }
85 catch(Throwable e) {} // ignore errorHandler errors
86 }
87 }
88 }
90 static void doRun(Action action){
91 try
92 {
93 Var.pushThreadBindings(RT.map(RT.AGENT, action.agent));
94 nested.set(PersistentVector.EMPTY);
96 Throwable error = null;
97 try
98 {
99 Object oldval = action.agent.state;
100 Object newval = action.fn.applyTo(RT.cons(action.agent.state, action.args));
101 action.agent.setState(newval);
102 action.agent.notifyWatches(oldval,newval);
103 }
104 catch(Throwable e)
105 {
106 error = e;
107 }
109 if(error == null)
110 {
111 releasePendingSends();
112 }
113 else
114 {
115 nested.set(PersistentVector.EMPTY);
116 if(action.agent.errorHandler != null)
117 {
118 try
119 {
120 action.agent.errorHandler.invoke(action.agent, error);
121 }
122 catch(Throwable e) {} // ignore errorHandler errors
123 }
124 if(action.agent.errorMode == CONTINUE)
125 {
126 error = null;
127 }
128 }
130 boolean popped = false;
131 ActionQueue next = null;
132 while(!popped)
133 {
134 ActionQueue prior = action.agent.aq.get();
135 next = new ActionQueue(prior.q.pop(), error);
136 popped = action.agent.aq.compareAndSet(prior, next);
137 }
139 if(error == null && next.q.count() > 0)
140 ((Action) next.q.peek()).execute();
141 }
142 finally
143 {
144 nested.set(null);
145 Var.popThreadBindings();
146 }
147 }
149 public void run(){
150 doRun(this);
151 }
152 }
/**
 * Creates an agent holding the given initial state and no metadata.
 *
 * @param state the initial state
 * @throws Exception propagated from the two-argument constructor
 */
public Agent(Object state) throws Exception {
    this(state, null);
}
/**
 * Creates an agent with the given initial state and metadata. The state
 * is installed through {@code setState}, so it is validated first.
 *
 * @param state the initial state
 * @param meta  metadata handed to the superclass constructor
 * @throws Exception propagated from {@code setState}
 */
public Agent(Object state, IPersistentMap meta) throws Exception {
    super(meta);
    this.setState(state);
}
163 boolean setState(Object newState) throws Exception{
164 validate(newState);
165 boolean ret = state != newState;
166 state = newState;
167 return ret;
168 }
170 public Object deref() throws Exception{
171 return state;
172 }
174 public Throwable getError(){
175 return aq.get().error;
176 }
178 public void setErrorMode(Keyword k){
179 errorMode = k;
180 }
182 public Keyword getErrorMode(){
183 return errorMode;
184 }
186 public void setErrorHandler(IFn f){
187 errorHandler = f;
188 }
190 public IFn getErrorHandler(){
191 return errorHandler;
192 }
194 synchronized public Object restart(Object newState, boolean clearActions){
195 if(getError() == null)
196 {
197 throw new RuntimeException("Agent does not need a restart");
198 }
199 validate(newState);
200 state = newState;
202 if(clearActions)
203 aq.set(ActionQueue.EMPTY);
204 else
205 {
206 boolean restarted = false;
207 ActionQueue prior = null;
208 while(!restarted)
209 {
210 prior = aq.get();
211 restarted = aq.compareAndSet(prior, new ActionQueue(prior.q, null));
212 }
214 if(prior.q.count() > 0)
215 ((Action) prior.q.peek()).execute();
216 }
218 return newState;
219 }
221 public Object dispatch(IFn fn, ISeq args, boolean solo) {
222 Throwable error = getError();
223 if(error != null)
224 {
225 throw new RuntimeException("Agent is failed, needs restart", error);
226 }
227 Action action = new Action(this, fn, args, solo);
228 dispatchAction(action);
230 return this;
231 }
233 static void dispatchAction(Action action){
234 LockingTransaction trans = LockingTransaction.getRunning();
235 if(trans != null)
236 trans.enqueue(action);
237 else if(nested.get() != null)
238 {
239 nested.set(nested.get().cons(action));
240 }
241 else
242 action.agent.enqueue(action);
243 }
245 void enqueue(Action action){
246 boolean queued = false;
247 ActionQueue prior = null;
248 while(!queued)
249 {
250 prior = aq.get();
251 queued = aq.compareAndSet(prior, new ActionQueue((IPersistentStack)prior.q.cons(action), prior.error));
252 }
254 if(prior.q.count() == 0 && prior.error == null)
255 action.execute();
256 }
258 public int getQueueCount(){
259 return aq.get().q.count();
260 }
262 static public int releasePendingSends(){
263 IPersistentVector sends = nested.get();
264 if(sends == null)
265 return 0;
266 for(int i=0;i<sends.count();i++)
267 {
268 Action a = (Action) sends.valAt(i);
269 a.agent.enqueue(a);
270 }
271 nested.set(PersistentVector.EMPTY);
272 return sends.count();
273 }
274 }