Mercurial > lasercutter
comparison src/clojure/lang/Agent.java @ 10:ef7dbbd6452c
added clojure source goodness
author | Robert McIntyre <rlm@mit.edu> |
---|---|
date | Sat, 21 Aug 2010 06:25:44 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
9:35cf337adfcf | 10:ef7dbbd6452c |
---|---|
1 /** | |
2 * Copyright (c) Rich Hickey. All rights reserved. | |
3 * The use and distribution terms for this software are covered by the | |
4 * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) | |
5 * which can be found in the file epl-v10.html at the root of this distribution. | |
6 * By using this software in any fashion, you are agreeing to be bound by | |
7 * the terms of this license. | |
8 * You must not remove this notice, or any other, from this software. | |
9 **/ | |
10 | |
11 /* rich Nov 17, 2007 */ | |
12 | |
13 package clojure.lang; | |
14 | |
15 import java.util.concurrent.*; | |
16 import java.util.concurrent.atomic.AtomicReference; | |
17 import java.util.Map; | |
18 | |
19 public class Agent extends ARef { | |
20 | |
21 static class ActionQueue { | |
22 public final IPersistentStack q; | |
23 public final Throwable error; // non-null indicates fail state | |
24 static final ActionQueue EMPTY = new ActionQueue(PersistentQueue.EMPTY, null); | |
25 | |
26 public ActionQueue( IPersistentStack q, Throwable error ) | |
27 { | |
28 this.q = q; | |
29 this.error = error; | |
30 } | |
31 } | |
32 | |
33 static final Keyword CONTINUE = Keyword.intern(null, "continue"); | |
34 static final Keyword FAIL = Keyword.intern(null, "fail"); | |
35 | |
36 volatile Object state; | |
37 AtomicReference<ActionQueue> aq = new AtomicReference(ActionQueue.EMPTY); | |
38 | |
39 volatile Keyword errorMode = CONTINUE; | |
40 volatile IFn errorHandler = null; | |
41 | |
42 final public static ExecutorService pooledExecutor = | |
43 Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors()); | |
44 | |
45 final public static ExecutorService soloExecutor = Executors.newCachedThreadPool(); | |
46 | |
47 final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>(); | |
48 | |
49 | |
50 public static void shutdown(){ | |
51 soloExecutor.shutdown(); | |
52 pooledExecutor.shutdown(); | |
53 } | |
54 | |
55 static class Action implements Runnable{ | |
56 final Agent agent; | |
57 final IFn fn; | |
58 final ISeq args; | |
59 final boolean solo; | |
60 | |
61 | |
62 public Action(Agent agent, IFn fn, ISeq args, boolean solo){ | |
63 this.agent = agent; | |
64 this.args = args; | |
65 this.fn = fn; | |
66 this.solo = solo; | |
67 } | |
68 | |
69 void execute(){ | |
70 try | |
71 { | |
72 if(solo) | |
73 soloExecutor.execute(this); | |
74 else | |
75 pooledExecutor.execute(this); | |
76 } | |
77 catch(Throwable error) | |
78 { | |
79 if(agent.errorHandler != null) | |
80 { | |
81 try | |
82 { | |
83 agent.errorHandler.invoke(agent, error); | |
84 } | |
85 catch(Throwable e) {} // ignore errorHandler errors | |
86 } | |
87 } | |
88 } | |
89 | |
90 static void doRun(Action action){ | |
91 try | |
92 { | |
93 Var.pushThreadBindings(RT.map(RT.AGENT, action.agent)); | |
94 nested.set(PersistentVector.EMPTY); | |
95 | |
96 Throwable error = null; | |
97 try | |
98 { | |
99 Object oldval = action.agent.state; | |
100 Object newval = action.fn.applyTo(RT.cons(action.agent.state, action.args)); | |
101 action.agent.setState(newval); | |
102 action.agent.notifyWatches(oldval,newval); | |
103 } | |
104 catch(Throwable e) | |
105 { | |
106 error = e; | |
107 } | |
108 | |
109 if(error == null) | |
110 { | |
111 releasePendingSends(); | |
112 } | |
113 else | |
114 { | |
115 nested.set(PersistentVector.EMPTY); | |
116 if(action.agent.errorHandler != null) | |
117 { | |
118 try | |
119 { | |
120 action.agent.errorHandler.invoke(action.agent, error); | |
121 } | |
122 catch(Throwable e) {} // ignore errorHandler errors | |
123 } | |
124 if(action.agent.errorMode == CONTINUE) | |
125 { | |
126 error = null; | |
127 } | |
128 } | |
129 | |
130 boolean popped = false; | |
131 ActionQueue next = null; | |
132 while(!popped) | |
133 { | |
134 ActionQueue prior = action.agent.aq.get(); | |
135 next = new ActionQueue(prior.q.pop(), error); | |
136 popped = action.agent.aq.compareAndSet(prior, next); | |
137 } | |
138 | |
139 if(error == null && next.q.count() > 0) | |
140 ((Action) next.q.peek()).execute(); | |
141 } | |
142 finally | |
143 { | |
144 nested.set(null); | |
145 Var.popThreadBindings(); | |
146 } | |
147 } | |
148 | |
149 public void run(){ | |
150 doRun(this); | |
151 } | |
152 } | |
153 | |
154 public Agent(Object state) throws Exception{ | |
155 this(state,null); | |
156 } | |
157 | |
158 public Agent(Object state, IPersistentMap meta) throws Exception { | |
159 super(meta); | |
160 setState(state); | |
161 } | |
162 | |
163 boolean setState(Object newState) throws Exception{ | |
164 validate(newState); | |
165 boolean ret = state != newState; | |
166 state = newState; | |
167 return ret; | |
168 } | |
169 | |
170 public Object deref() throws Exception{ | |
171 return state; | |
172 } | |
173 | |
174 public Throwable getError(){ | |
175 return aq.get().error; | |
176 } | |
177 | |
178 public void setErrorMode(Keyword k){ | |
179 errorMode = k; | |
180 } | |
181 | |
182 public Keyword getErrorMode(){ | |
183 return errorMode; | |
184 } | |
185 | |
186 public void setErrorHandler(IFn f){ | |
187 errorHandler = f; | |
188 } | |
189 | |
190 public IFn getErrorHandler(){ | |
191 return errorHandler; | |
192 } | |
193 | |
194 synchronized public Object restart(Object newState, boolean clearActions){ | |
195 if(getError() == null) | |
196 { | |
197 throw new RuntimeException("Agent does not need a restart"); | |
198 } | |
199 validate(newState); | |
200 state = newState; | |
201 | |
202 if(clearActions) | |
203 aq.set(ActionQueue.EMPTY); | |
204 else | |
205 { | |
206 boolean restarted = false; | |
207 ActionQueue prior = null; | |
208 while(!restarted) | |
209 { | |
210 prior = aq.get(); | |
211 restarted = aq.compareAndSet(prior, new ActionQueue(prior.q, null)); | |
212 } | |
213 | |
214 if(prior.q.count() > 0) | |
215 ((Action) prior.q.peek()).execute(); | |
216 } | |
217 | |
218 return newState; | |
219 } | |
220 | |
221 public Object dispatch(IFn fn, ISeq args, boolean solo) { | |
222 Throwable error = getError(); | |
223 if(error != null) | |
224 { | |
225 throw new RuntimeException("Agent is failed, needs restart", error); | |
226 } | |
227 Action action = new Action(this, fn, args, solo); | |
228 dispatchAction(action); | |
229 | |
230 return this; | |
231 } | |
232 | |
233 static void dispatchAction(Action action){ | |
234 LockingTransaction trans = LockingTransaction.getRunning(); | |
235 if(trans != null) | |
236 trans.enqueue(action); | |
237 else if(nested.get() != null) | |
238 { | |
239 nested.set(nested.get().cons(action)); | |
240 } | |
241 else | |
242 action.agent.enqueue(action); | |
243 } | |
244 | |
245 void enqueue(Action action){ | |
246 boolean queued = false; | |
247 ActionQueue prior = null; | |
248 while(!queued) | |
249 { | |
250 prior = aq.get(); | |
251 queued = aq.compareAndSet(prior, new ActionQueue((IPersistentStack)prior.q.cons(action), prior.error)); | |
252 } | |
253 | |
254 if(prior.q.count() == 0 && prior.error == null) | |
255 action.execute(); | |
256 } | |
257 | |
258 public int getQueueCount(){ | |
259 return aq.get().q.count(); | |
260 } | |
261 | |
262 static public int releasePendingSends(){ | |
263 IPersistentVector sends = nested.get(); | |
264 if(sends == null) | |
265 return 0; | |
266 for(int i=0;i<sends.count();i++) | |
267 { | |
268 Action a = (Action) sends.valAt(i); | |
269 a.agent.enqueue(a); | |
270 } | |
271 nested.set(PersistentVector.EMPTY); | |
272 return sends.count(); | |
273 } | |
274 } |