view src/clojure/lang/LockingTransaction.java @ 10:ef7dbbd6452c

added clojure source goodness
author Robert McIntyre <rlm@mit.edu>
date Sat, 21 Aug 2010 06:25:44 -0400
parents
children
line wrap: on
line source
1 /**
2 * Copyright (c) Rich Hickey. All rights reserved.
3 * The use and distribution terms for this software are covered by the
4 * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
5 * which can be found in the file epl-v10.html at the root of this distribution.
6 * By using this software in any fashion, you are agreeing to be bound by
7 * the terms of this license.
8 * You must not remove this notice, or any other, from this software.
9 **/
11 /* rich Jul 26, 2007 */
13 package clojure.lang;
15 import java.util.*;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import java.util.concurrent.atomic.AtomicLong;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.CountDownLatch;
22 @SuppressWarnings({"SynchronizeOnNonFinalField"})
23 public class LockingTransaction{
25 public static final int RETRY_LIMIT = 10000;
26 public static final int LOCK_WAIT_MSECS = 100;
27 public static final long BARGE_WAIT_NANOS = 10 * 1000000;
28 //public static int COMMUTE_RETRY_LIMIT = 10;
30 static final int RUNNING = 0;
31 static final int COMMITTING = 1;
32 static final int RETRY = 2;
33 static final int KILLED = 3;
34 static final int COMMITTED = 4;
36 final static ThreadLocal<LockingTransaction> transaction = new ThreadLocal<LockingTransaction>();
39 static class RetryEx extends Error{
40 }
42 static class AbortException extends Exception{
43 }
45 public static class Info{
46 final AtomicInteger status;
47 final long startPoint;
48 final CountDownLatch latch;
51 public Info(int status, long startPoint){
52 this.status = new AtomicInteger(status);
53 this.startPoint = startPoint;
54 this.latch = new CountDownLatch(1);
55 }
57 public boolean running(){
58 int s = status.get();
59 return s == RUNNING || s == COMMITTING;
60 }
61 }
63 static class CFn{
64 final IFn fn;
65 final ISeq args;
67 public CFn(IFn fn, ISeq args){
68 this.fn = fn;
69 this.args = args;
70 }
71 }
72 //total order on transactions
73 //transactions will consume a point for init, for each retry, and on commit if writing
74 final private static AtomicLong lastPoint = new AtomicLong();
76 void getReadPoint(){
77 readPoint = lastPoint.incrementAndGet();
78 }
80 long getCommitPoint(){
81 return lastPoint.incrementAndGet();
82 }
84 void stop(int status){
85 if(info != null)
86 {
87 synchronized(info)
88 {
89 info.status.set(status);
90 info.latch.countDown();
91 }
92 info = null;
93 vals.clear();
94 sets.clear();
95 commutes.clear();
96 //actions.clear();
97 }
98 }
101 Info info;
102 long readPoint;
103 long startPoint;
104 long startTime;
105 final RetryEx retryex = new RetryEx();
106 final ArrayList<Agent.Action> actions = new ArrayList<Agent.Action>();
107 final HashMap<Ref, Object> vals = new HashMap<Ref, Object>();
108 final HashSet<Ref> sets = new HashSet<Ref>();
109 final TreeMap<Ref, ArrayList<CFn>> commutes = new TreeMap<Ref, ArrayList<CFn>>();
111 final HashSet<Ref> ensures = new HashSet<Ref>(); //all hold readLock
114 void tryWriteLock(Ref ref){
115 try
116 {
117 if(!ref.lock.writeLock().tryLock(LOCK_WAIT_MSECS, TimeUnit.MILLISECONDS))
118 throw retryex;
119 }
120 catch(InterruptedException e)
121 {
122 throw retryex;
123 }
124 }
126 //returns the most recent val
127 Object lock(Ref ref){
128 //can't upgrade readLock, so release it
129 releaseIfEnsured(ref);
131 boolean unlocked = true;
132 try
133 {
134 tryWriteLock(ref);
135 unlocked = false;
137 if(ref.tvals != null && ref.tvals.point > readPoint)
138 throw retryex;
139 Info refinfo = ref.tinfo;
141 //write lock conflict
142 if(refinfo != null && refinfo != info && refinfo.running())
143 {
144 if(!barge(refinfo))
145 {
146 ref.lock.writeLock().unlock();
147 unlocked = true;
148 return blockAndBail(refinfo);
149 }
150 }
151 ref.tinfo = info;
152 return ref.tvals == null ? null : ref.tvals.val;
153 }
154 finally
155 {
156 if(!unlocked)
157 ref.lock.writeLock().unlock();
158 }
159 }
161 private Object blockAndBail(Info refinfo){
162 //stop prior to blocking
163 stop(RETRY);
164 try
165 {
166 refinfo.latch.await(LOCK_WAIT_MSECS, TimeUnit.MILLISECONDS);
167 }
168 catch(InterruptedException e)
169 {
170 //ignore
171 }
172 throw retryex;
173 }
175 private void releaseIfEnsured(Ref ref){
176 if(ensures.contains(ref))
177 {
178 ensures.remove(ref);
179 ref.lock.readLock().unlock();
180 }
181 }
183 void abort() throws AbortException{
184 stop(KILLED);
185 throw new AbortException();
186 }
188 private boolean bargeTimeElapsed(){
189 return System.nanoTime() - startTime > BARGE_WAIT_NANOS;
190 }
192 private boolean barge(Info refinfo){
193 boolean barged = false;
194 //if this transaction is older
195 // try to abort the other
196 if(bargeTimeElapsed() && startPoint < refinfo.startPoint)
197 {
198 barged = refinfo.status.compareAndSet(RUNNING, KILLED);
199 if(barged)
200 refinfo.latch.countDown();
201 }
202 return barged;
203 }
205 static LockingTransaction getEx(){
206 LockingTransaction t = transaction.get();
207 if(t == null || t.info == null)
208 throw new IllegalStateException("No transaction running");
209 return t;
210 }
212 static public boolean isRunning(){
213 return getRunning() != null;
214 }
216 static LockingTransaction getRunning(){
217 LockingTransaction t = transaction.get();
218 if(t == null || t.info == null)
219 return null;
220 return t;
221 }
223 static public Object runInTransaction(Callable fn) throws Exception{
224 LockingTransaction t = transaction.get();
225 if(t == null)
226 transaction.set(t = new LockingTransaction());
228 if(t.info != null)
229 return fn.call();
231 return t.run(fn);
232 }
234 static class Notify{
235 final public Ref ref;
236 final public Object oldval;
237 final public Object newval;
239 Notify(Ref ref, Object oldval, Object newval){
240 this.ref = ref;
241 this.oldval = oldval;
242 this.newval = newval;
243 }
244 }
246 Object run(Callable fn) throws Exception{
247 boolean done = false;
248 Object ret = null;
249 ArrayList<Ref> locked = new ArrayList<Ref>();
250 ArrayList<Notify> notify = new ArrayList<Notify>();
252 for(int i = 0; !done && i < RETRY_LIMIT; i++)
253 {
254 try
255 {
256 getReadPoint();
257 if(i == 0)
258 {
259 startPoint = readPoint;
260 startTime = System.nanoTime();
261 }
262 info = new Info(RUNNING, startPoint);
263 ret = fn.call();
264 //make sure no one has killed us before this point, and can't from now on
265 if(info.status.compareAndSet(RUNNING, COMMITTING))
266 {
267 for(Map.Entry<Ref, ArrayList<CFn>> e : commutes.entrySet())
268 {
269 Ref ref = e.getKey();
270 if(sets.contains(ref)) continue;
272 boolean wasEnsured = ensures.contains(ref);
273 //can't upgrade readLock, so release it
274 releaseIfEnsured(ref);
275 tryWriteLock(ref);
276 locked.add(ref);
277 if(wasEnsured && ref.tvals != null && ref.tvals.point > readPoint)
278 throw retryex;
280 Info refinfo = ref.tinfo;
281 if(refinfo != null && refinfo != info && refinfo.running())
282 {
283 if(!barge(refinfo))
284 throw retryex;
285 }
286 Object val = ref.tvals == null ? null : ref.tvals.val;
287 vals.put(ref, val);
288 for(CFn f : e.getValue())
289 {
290 vals.put(ref, f.fn.applyTo(RT.cons(vals.get(ref), f.args)));
291 }
292 }
293 for(Ref ref : sets)
294 {
295 tryWriteLock(ref);
296 locked.add(ref);
297 }
299 //validate and enqueue notifications
300 for(Map.Entry<Ref, Object> e : vals.entrySet())
301 {
302 Ref ref = e.getKey();
303 ref.validate(ref.getValidator(), e.getValue());
304 }
306 //at this point, all values calced, all refs to be written locked
307 //no more client code to be called
308 long msecs = System.currentTimeMillis();
309 long commitPoint = getCommitPoint();
310 for(Map.Entry<Ref, Object> e : vals.entrySet())
311 {
312 Ref ref = e.getKey();
313 Object oldval = ref.tvals == null ? null : ref.tvals.val;
314 Object newval = e.getValue();
315 int hcount = ref.histCount();
317 if(ref.tvals == null)
318 {
319 ref.tvals = new Ref.TVal(newval, commitPoint, msecs);
320 }
321 else if((ref.faults.get() > 0 && hcount < ref.maxHistory)
322 || hcount < ref.minHistory)
323 {
324 ref.tvals = new Ref.TVal(newval, commitPoint, msecs, ref.tvals);
325 ref.faults.set(0);
326 }
327 else
328 {
329 ref.tvals = ref.tvals.next;
330 ref.tvals.val = newval;
331 ref.tvals.point = commitPoint;
332 ref.tvals.msecs = msecs;
333 }
334 if(ref.getWatches().count() > 0)
335 notify.add(new Notify(ref, oldval, newval));
336 }
338 done = true;
339 info.status.set(COMMITTED);
340 }
341 }
342 catch(RetryEx retry)
343 {
344 //eat this so we retry rather than fall out
345 }
346 finally
347 {
348 for(int k = locked.size() - 1; k >= 0; --k)
349 {
350 locked.get(k).lock.writeLock().unlock();
351 }
352 locked.clear();
353 for(Ref r : ensures)
354 {
355 r.lock.readLock().unlock();
356 }
357 ensures.clear();
358 stop(done ? COMMITTED : RETRY);
359 try
360 {
361 if(done) //re-dispatch out of transaction
362 {
363 for(Notify n : notify)
364 {
365 n.ref.notifyWatches(n.oldval, n.newval);
366 }
367 for(Agent.Action action : actions)
368 {
369 Agent.dispatchAction(action);
370 }
371 }
372 }
373 finally
374 {
375 notify.clear();
376 actions.clear();
377 }
378 }
379 }
380 if(!done)
381 throw new Exception("Transaction failed after reaching retry limit");
382 return ret;
383 }
385 public void enqueue(Agent.Action action){
386 actions.add(action);
387 }
389 Object doGet(Ref ref){
390 if(!info.running())
391 throw retryex;
392 if(vals.containsKey(ref))
393 return vals.get(ref);
394 try
395 {
396 ref.lock.readLock().lock();
397 if(ref.tvals == null)
398 throw new IllegalStateException(ref.toString() + " is unbound.");
399 Ref.TVal ver = ref.tvals;
400 do
401 {
402 if(ver.point <= readPoint)
403 return ver.val;
404 } while((ver = ver.prior) != ref.tvals);
405 }
406 finally
407 {
408 ref.lock.readLock().unlock();
409 }
410 //no version of val precedes the read point
411 ref.faults.incrementAndGet();
412 throw retryex;
414 }
416 Object doSet(Ref ref, Object val){
417 if(!info.running())
418 throw retryex;
419 if(commutes.containsKey(ref))
420 throw new IllegalStateException("Can't set after commute");
421 if(!sets.contains(ref))
422 {
423 sets.add(ref);
424 lock(ref);
425 }
426 vals.put(ref, val);
427 return val;
428 }
430 void doEnsure(Ref ref){
431 if(!info.running())
432 throw retryex;
433 if(ensures.contains(ref))
434 return;
435 ref.lock.readLock().lock();
437 //someone completed a write after our snapshot
438 if(ref.tvals != null && ref.tvals.point > readPoint) {
439 ref.lock.readLock().unlock();
440 throw retryex;
441 }
443 Info refinfo = ref.tinfo;
445 //writer exists
446 if(refinfo != null && refinfo.running())
447 {
448 ref.lock.readLock().unlock();
450 if(refinfo != info) //not us, ensure is doomed
451 {
452 blockAndBail(refinfo);
453 }
454 }
455 else
456 ensures.add(ref);
457 }
459 Object doCommute(Ref ref, IFn fn, ISeq args) throws Exception{
460 if(!info.running())
461 throw retryex;
462 if(!vals.containsKey(ref))
463 {
464 Object val = null;
465 try
466 {
467 ref.lock.readLock().lock();
468 val = ref.tvals == null ? null : ref.tvals.val;
469 }
470 finally
471 {
472 ref.lock.readLock().unlock();
473 }
474 vals.put(ref, val);
475 }
476 ArrayList<CFn> fns = commutes.get(ref);
477 if(fns == null)
478 commutes.put(ref, fns = new ArrayList<CFn>());
479 fns.add(new CFn(fn, args));
480 Object ret = fn.applyTo(RT.cons(vals.get(ref), args));
481 vals.put(ref, ret);
482 return ret;
483 }
485 /*
486 //for test
487 static CyclicBarrier barrier;
488 static ArrayList<Ref> items;
490 public static void main(String[] args){
491 try
492 {
493 if(args.length != 4)
494 System.err.println("Usage: LockingTransaction nthreads nitems niters ninstances");
495 int nthreads = Integer.parseInt(args[0]);
496 int nitems = Integer.parseInt(args[1]);
497 int niters = Integer.parseInt(args[2]);
498 int ninstances = Integer.parseInt(args[3]);
500 if(items == null)
501 {
502 ArrayList<Ref> temp = new ArrayList(nitems);
503 for(int i = 0; i < nitems; i++)
504 temp.add(new Ref(0));
505 items = temp;
506 }
508 class Incr extends AFn{
509 public Object invoke(Object arg1) throws Exception{
510 Integer i = (Integer) arg1;
511 return i + 1;
512 }
514 public Obj withMeta(IPersistentMap meta){
515 throw new UnsupportedOperationException();
517 }
518 }
520 class Commuter extends AFn implements Callable{
521 int niters;
522 List<Ref> items;
523 Incr incr;
526 public Commuter(int niters, List<Ref> items){
527 this.niters = niters;
528 this.items = items;
529 this.incr = new Incr();
530 }
532 public Object call() throws Exception{
533 long nanos = 0;
534 for(int i = 0; i < niters; i++)
535 {
536 long start = System.nanoTime();
537 LockingTransaction.runInTransaction(this);
538 nanos += System.nanoTime() - start;
539 }
540 return nanos;
541 }
543 public Object invoke() throws Exception{
544 for(Ref tref : items)
545 {
546 LockingTransaction.getEx().doCommute(tref, incr);
547 }
548 return null;
549 }
551 public Obj withMeta(IPersistentMap meta){
552 throw new UnsupportedOperationException();
554 }
555 }
557 class Incrementer extends AFn implements Callable{
558 int niters;
559 List<Ref> items;
562 public Incrementer(int niters, List<Ref> items){
563 this.niters = niters;
564 this.items = items;
565 }
567 public Object call() throws Exception{
568 long nanos = 0;
569 for(int i = 0; i < niters; i++)
570 {
571 long start = System.nanoTime();
572 LockingTransaction.runInTransaction(this);
573 nanos += System.nanoTime() - start;
574 }
575 return nanos;
576 }
578 public Object invoke() throws Exception{
579 for(Ref tref : items)
580 {
581 //Transaction.get().doTouch(tref);
582 // LockingTransaction t = LockingTransaction.getEx();
583 // int val = (Integer) t.doGet(tref);
584 // t.doSet(tref, val + 1);
585 int val = (Integer) tref.get();
586 tref.set(val + 1);
587 }
588 return null;
589 }
591 public Obj withMeta(IPersistentMap meta){
592 throw new UnsupportedOperationException();
594 }
595 }
597 ArrayList<Callable<Long>> tasks = new ArrayList(nthreads);
598 for(int i = 0; i < nthreads; i++)
599 {
600 ArrayList<Ref> si;
601 synchronized(items)
602 {
603 si = (ArrayList<Ref>) items.clone();
604 }
605 Collections.shuffle(si);
606 tasks.add(new Incrementer(niters, si));
607 //tasks.add(new Commuter(niters, si));
608 }
609 ExecutorService e = Executors.newFixedThreadPool(nthreads);
611 if(barrier == null)
612 barrier = new CyclicBarrier(ninstances);
613 System.out.println("waiting for other instances...");
614 barrier.await();
615 System.out.println("starting");
616 long start = System.nanoTime();
617 List<Future<Long>> results = e.invokeAll(tasks);
618 long estimatedTime = System.nanoTime() - start;
619 System.out.printf("nthreads: %d, nitems: %d, niters: %d, time: %d%n", nthreads, nitems, niters,
620 estimatedTime / 1000000);
621 e.shutdown();
622 for(Future<Long> result : results)
623 {
624 System.out.printf("%d, ", result.get() / 1000000);
625 }
626 System.out.println();
627 System.out.println("waiting for other instances...");
628 barrier.await();
629 synchronized(items)
630 {
631 for(Ref item : items)
632 {
633 System.out.printf("%d, ", (Integer) item.currentVal());
634 }
635 }
636 System.out.println("\ndone");
637 System.out.flush();
638 }
639 catch(Exception ex)
640 {
641 ex.printStackTrace();
642 }
643 }
644 */
645 }