/**
 *   Copyright (c) Rich Hickey. All rights reserved.
 *   The use and distribution terms for this software are covered by the
 *   Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
 *   which can be found in the file epl-v10.html at the root of this distribution.
 *   By using this software in any fashion, you are agreeing to be bound by
 *   the terms of this license.
 *   You must not remove this notice, or any other, from this software.
 **/

/* rich Nov 17, 2007 */
rlm@10
|
12
|
rlm@10
|
13 package clojure.lang;
|
rlm@10
|
14
|
rlm@10
|
15 import java.util.concurrent.*;
|
rlm@10
|
16 import java.util.concurrent.atomic.AtomicReference;
|
rlm@10
|
17 import java.util.Map;
|
rlm@10
|
18
|
rlm@10
|
19 public class Agent extends ARef {
|
rlm@10
|
20
|
rlm@10
|
21 static class ActionQueue {
|
rlm@10
|
22 public final IPersistentStack q;
|
rlm@10
|
23 public final Throwable error; // non-null indicates fail state
|
rlm@10
|
24 static final ActionQueue EMPTY = new ActionQueue(PersistentQueue.EMPTY, null);
|
rlm@10
|
25
|
rlm@10
|
26 public ActionQueue( IPersistentStack q, Throwable error )
|
rlm@10
|
27 {
|
rlm@10
|
28 this.q = q;
|
rlm@10
|
29 this.error = error;
|
rlm@10
|
30 }
|
rlm@10
|
31 }
|
rlm@10
|
32
|
rlm@10
|
33 static final Keyword CONTINUE = Keyword.intern(null, "continue");
|
rlm@10
|
34 static final Keyword FAIL = Keyword.intern(null, "fail");
|
rlm@10
|
35
|
rlm@10
|
36 volatile Object state;
|
rlm@10
|
37 AtomicReference<ActionQueue> aq = new AtomicReference(ActionQueue.EMPTY);
|
rlm@10
|
38
|
rlm@10
|
39 volatile Keyword errorMode = CONTINUE;
|
rlm@10
|
40 volatile IFn errorHandler = null;
|
rlm@10
|
41
|
rlm@10
|
42 final public static ExecutorService pooledExecutor =
|
rlm@10
|
43 Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
|
rlm@10
|
44
|
rlm@10
|
45 final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();
|
rlm@10
|
46
|
rlm@10
|
47 final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>();
|
rlm@10
|
48
|
rlm@10
|
49
|
rlm@10
|
50 public static void shutdown(){
|
rlm@10
|
51 soloExecutor.shutdown();
|
rlm@10
|
52 pooledExecutor.shutdown();
|
rlm@10
|
53 }
|
rlm@10
|
54
|
rlm@10
|
55 static class Action implements Runnable{
|
rlm@10
|
56 final Agent agent;
|
rlm@10
|
57 final IFn fn;
|
rlm@10
|
58 final ISeq args;
|
rlm@10
|
59 final boolean solo;
|
rlm@10
|
60
|
rlm@10
|
61
|
rlm@10
|
62 public Action(Agent agent, IFn fn, ISeq args, boolean solo){
|
rlm@10
|
63 this.agent = agent;
|
rlm@10
|
64 this.args = args;
|
rlm@10
|
65 this.fn = fn;
|
rlm@10
|
66 this.solo = solo;
|
rlm@10
|
67 }
|
rlm@10
|
68
|
rlm@10
|
69 void execute(){
|
rlm@10
|
70 try
|
rlm@10
|
71 {
|
rlm@10
|
72 if(solo)
|
rlm@10
|
73 soloExecutor.execute(this);
|
rlm@10
|
74 else
|
rlm@10
|
75 pooledExecutor.execute(this);
|
rlm@10
|
76 }
|
rlm@10
|
77 catch(Throwable error)
|
rlm@10
|
78 {
|
rlm@10
|
79 if(agent.errorHandler != null)
|
rlm@10
|
80 {
|
rlm@10
|
81 try
|
rlm@10
|
82 {
|
rlm@10
|
83 agent.errorHandler.invoke(agent, error);
|
rlm@10
|
84 }
|
rlm@10
|
85 catch(Throwable e) {} // ignore errorHandler errors
|
rlm@10
|
86 }
|
rlm@10
|
87 }
|
rlm@10
|
88 }
|
rlm@10
|
89
|
rlm@10
|
90 static void doRun(Action action){
|
rlm@10
|
91 try
|
rlm@10
|
92 {
|
rlm@10
|
93 Var.pushThreadBindings(RT.map(RT.AGENT, action.agent));
|
rlm@10
|
94 nested.set(PersistentVector.EMPTY);
|
rlm@10
|
95
|
rlm@10
|
96 Throwable error = null;
|
rlm@10
|
97 try
|
rlm@10
|
98 {
|
rlm@10
|
99 Object oldval = action.agent.state;
|
rlm@10
|
100 Object newval = action.fn.applyTo(RT.cons(action.agent.state, action.args));
|
rlm@10
|
101 action.agent.setState(newval);
|
rlm@10
|
102 action.agent.notifyWatches(oldval,newval);
|
rlm@10
|
103 }
|
rlm@10
|
104 catch(Throwable e)
|
rlm@10
|
105 {
|
rlm@10
|
106 error = e;
|
rlm@10
|
107 }
|
rlm@10
|
108
|
rlm@10
|
109 if(error == null)
|
rlm@10
|
110 {
|
rlm@10
|
111 releasePendingSends();
|
rlm@10
|
112 }
|
rlm@10
|
113 else
|
rlm@10
|
114 {
|
rlm@10
|
115 nested.set(PersistentVector.EMPTY);
|
rlm@10
|
116 if(action.agent.errorHandler != null)
|
rlm@10
|
117 {
|
rlm@10
|
118 try
|
rlm@10
|
119 {
|
rlm@10
|
120 action.agent.errorHandler.invoke(action.agent, error);
|
rlm@10
|
121 }
|
rlm@10
|
122 catch(Throwable e) {} // ignore errorHandler errors
|
rlm@10
|
123 }
|
rlm@10
|
124 if(action.agent.errorMode == CONTINUE)
|
rlm@10
|
125 {
|
rlm@10
|
126 error = null;
|
rlm@10
|
127 }
|
rlm@10
|
128 }
|
rlm@10
|
129
|
rlm@10
|
130 boolean popped = false;
|
rlm@10
|
131 ActionQueue next = null;
|
rlm@10
|
132 while(!popped)
|
rlm@10
|
133 {
|
rlm@10
|
134 ActionQueue prior = action.agent.aq.get();
|
rlm@10
|
135 next = new ActionQueue(prior.q.pop(), error);
|
rlm@10
|
136 popped = action.agent.aq.compareAndSet(prior, next);
|
rlm@10
|
137 }
|
rlm@10
|
138
|
rlm@10
|
139 if(error == null && next.q.count() > 0)
|
rlm@10
|
140 ((Action) next.q.peek()).execute();
|
rlm@10
|
141 }
|
rlm@10
|
142 finally
|
rlm@10
|
143 {
|
rlm@10
|
144 nested.set(null);
|
rlm@10
|
145 Var.popThreadBindings();
|
rlm@10
|
146 }
|
rlm@10
|
147 }
|
rlm@10
|
148
|
rlm@10
|
149 public void run(){
|
rlm@10
|
150 doRun(this);
|
rlm@10
|
151 }
|
rlm@10
|
152 }
|
rlm@10
|
153
|
rlm@10
|
154 public Agent(Object state) throws Exception{
|
rlm@10
|
155 this(state,null);
|
rlm@10
|
156 }
|
rlm@10
|
157
|
rlm@10
|
158 public Agent(Object state, IPersistentMap meta) throws Exception {
|
rlm@10
|
159 super(meta);
|
rlm@10
|
160 setState(state);
|
rlm@10
|
161 }
|
rlm@10
|
162
|
rlm@10
|
163 boolean setState(Object newState) throws Exception{
|
rlm@10
|
164 validate(newState);
|
rlm@10
|
165 boolean ret = state != newState;
|
rlm@10
|
166 state = newState;
|
rlm@10
|
167 return ret;
|
rlm@10
|
168 }
|
rlm@10
|
169
|
rlm@10
|
170 public Object deref() throws Exception{
|
rlm@10
|
171 return state;
|
rlm@10
|
172 }
|
rlm@10
|
173
|
rlm@10
|
174 public Throwable getError(){
|
rlm@10
|
175 return aq.get().error;
|
rlm@10
|
176 }
|
rlm@10
|
177
|
rlm@10
|
178 public void setErrorMode(Keyword k){
|
rlm@10
|
179 errorMode = k;
|
rlm@10
|
180 }
|
rlm@10
|
181
|
rlm@10
|
182 public Keyword getErrorMode(){
|
rlm@10
|
183 return errorMode;
|
rlm@10
|
184 }
|
rlm@10
|
185
|
rlm@10
|
186 public void setErrorHandler(IFn f){
|
rlm@10
|
187 errorHandler = f;
|
rlm@10
|
188 }
|
rlm@10
|
189
|
rlm@10
|
190 public IFn getErrorHandler(){
|
rlm@10
|
191 return errorHandler;
|
rlm@10
|
192 }
|
rlm@10
|
193
|
rlm@10
|
194 synchronized public Object restart(Object newState, boolean clearActions){
|
rlm@10
|
195 if(getError() == null)
|
rlm@10
|
196 {
|
rlm@10
|
197 throw new RuntimeException("Agent does not need a restart");
|
rlm@10
|
198 }
|
rlm@10
|
199 validate(newState);
|
rlm@10
|
200 state = newState;
|
rlm@10
|
201
|
rlm@10
|
202 if(clearActions)
|
rlm@10
|
203 aq.set(ActionQueue.EMPTY);
|
rlm@10
|
204 else
|
rlm@10
|
205 {
|
rlm@10
|
206 boolean restarted = false;
|
rlm@10
|
207 ActionQueue prior = null;
|
rlm@10
|
208 while(!restarted)
|
rlm@10
|
209 {
|
rlm@10
|
210 prior = aq.get();
|
rlm@10
|
211 restarted = aq.compareAndSet(prior, new ActionQueue(prior.q, null));
|
rlm@10
|
212 }
|
rlm@10
|
213
|
rlm@10
|
214 if(prior.q.count() > 0)
|
rlm@10
|
215 ((Action) prior.q.peek()).execute();
|
rlm@10
|
216 }
|
rlm@10
|
217
|
rlm@10
|
218 return newState;
|
rlm@10
|
219 }
|
rlm@10
|
220
|
rlm@10
|
221 public Object dispatch(IFn fn, ISeq args, boolean solo) {
|
rlm@10
|
222 Throwable error = getError();
|
rlm@10
|
223 if(error != null)
|
rlm@10
|
224 {
|
rlm@10
|
225 throw new RuntimeException("Agent is failed, needs restart", error);
|
rlm@10
|
226 }
|
rlm@10
|
227 Action action = new Action(this, fn, args, solo);
|
rlm@10
|
228 dispatchAction(action);
|
rlm@10
|
229
|
rlm@10
|
230 return this;
|
rlm@10
|
231 }
|
rlm@10
|
232
|
rlm@10
|
233 static void dispatchAction(Action action){
|
rlm@10
|
234 LockingTransaction trans = LockingTransaction.getRunning();
|
rlm@10
|
235 if(trans != null)
|
rlm@10
|
236 trans.enqueue(action);
|
rlm@10
|
237 else if(nested.get() != null)
|
rlm@10
|
238 {
|
rlm@10
|
239 nested.set(nested.get().cons(action));
|
rlm@10
|
240 }
|
rlm@10
|
241 else
|
rlm@10
|
242 action.agent.enqueue(action);
|
rlm@10
|
243 }
|
rlm@10
|
244
|
rlm@10
|
245 void enqueue(Action action){
|
rlm@10
|
246 boolean queued = false;
|
rlm@10
|
247 ActionQueue prior = null;
|
rlm@10
|
248 while(!queued)
|
rlm@10
|
249 {
|
rlm@10
|
250 prior = aq.get();
|
rlm@10
|
251 queued = aq.compareAndSet(prior, new ActionQueue((IPersistentStack)prior.q.cons(action), prior.error));
|
rlm@10
|
252 }
|
rlm@10
|
253
|
rlm@10
|
254 if(prior.q.count() == 0 && prior.error == null)
|
rlm@10
|
255 action.execute();
|
rlm@10
|
256 }
|
rlm@10
|
257
|
rlm@10
|
258 public int getQueueCount(){
|
rlm@10
|
259 return aq.get().q.count();
|
rlm@10
|
260 }
|
rlm@10
|
261
|
rlm@10
|
262 static public int releasePendingSends(){
|
rlm@10
|
263 IPersistentVector sends = nested.get();
|
rlm@10
|
264 if(sends == null)
|
rlm@10
|
265 return 0;
|
rlm@10
|
266 for(int i=0;i<sends.count();i++)
|
rlm@10
|
267 {
|
rlm@10
|
268 Action a = (Action) sends.valAt(i);
|
rlm@10
|
269 a.agent.enqueue(a);
|
rlm@10
|
270 }
|
rlm@10
|
271 nested.set(PersistentVector.EMPTY);
|
rlm@10
|
272 return sends.count();
|
rlm@10
|
273 }
|
rlm@10
|
274 }
|