Mercurial > lasercutter
view src/clojure/lang/LockingTransaction.java @ 10:ef7dbbd6452c
added clojure source goodness
author | Robert McIntyre <rlm@mit.edu> |
---|---|
date | Sat, 21 Aug 2010 06:25:44 -0400 |
parents | |
children |
line wrap: on
line source
1 /**2 * Copyright (c) Rich Hickey. All rights reserved.3 * The use and distribution terms for this software are covered by the4 * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)5 * which can be found in the file epl-v10.html at the root of this distribution.6 * By using this software in any fashion, you are agreeing to be bound by7 * the terms of this license.8 * You must not remove this notice, or any other, from this software.9 **/11 /* rich Jul 26, 2007 */13 package clojure.lang;15 import java.util.*;16 import java.util.concurrent.atomic.AtomicInteger;17 import java.util.concurrent.atomic.AtomicLong;18 import java.util.concurrent.Callable;19 import java.util.concurrent.TimeUnit;20 import java.util.concurrent.CountDownLatch;22 @SuppressWarnings({"SynchronizeOnNonFinalField"})23 public class LockingTransaction{25 public static final int RETRY_LIMIT = 10000;26 public static final int LOCK_WAIT_MSECS = 100;27 public static final long BARGE_WAIT_NANOS = 10 * 1000000;28 //public static int COMMUTE_RETRY_LIMIT = 10;30 static final int RUNNING = 0;31 static final int COMMITTING = 1;32 static final int RETRY = 2;33 static final int KILLED = 3;34 static final int COMMITTED = 4;36 final static ThreadLocal<LockingTransaction> transaction = new ThreadLocal<LockingTransaction>();39 static class RetryEx extends Error{40 }42 static class AbortException extends Exception{43 }45 public static class Info{46 final AtomicInteger status;47 final long startPoint;48 final CountDownLatch latch;51 public Info(int status, long startPoint){52 this.status = new AtomicInteger(status);53 this.startPoint = startPoint;54 this.latch = new CountDownLatch(1);55 }57 public boolean running(){58 int s = status.get();59 return s == RUNNING || s == COMMITTING;60 }61 }63 static class CFn{64 final IFn fn;65 final ISeq args;67 public CFn(IFn fn, ISeq args){68 this.fn = fn;69 this.args = args;70 }71 }72 //total order on transactions73 //transactions will consume a point for init, for each retry, and on commit if writing74 final private static AtomicLong lastPoint = new AtomicLong();76 void getReadPoint(){77 readPoint = lastPoint.incrementAndGet();78 }80 long getCommitPoint(){81 return lastPoint.incrementAndGet();82 }84 void stop(int status){85 if(info != null)86 {87 synchronized(info)88 {89 info.status.set(status);90 info.latch.countDown();91 }92 info = null;93 vals.clear();94 sets.clear();95 commutes.clear();96 //actions.clear();97 }98 }101 Info info;102 long readPoint;103 long startPoint;104 long startTime;105 final RetryEx retryex = new RetryEx();106 final ArrayList<Agent.Action> actions = new ArrayList<Agent.Action>();107 final HashMap<Ref, Object> vals = new HashMap<Ref, Object>();108 final HashSet<Ref> sets = new HashSet<Ref>();109 final TreeMap<Ref, ArrayList<CFn>> commutes = new TreeMap<Ref, ArrayList<CFn>>();111 final HashSet<Ref> ensures = new HashSet<Ref>(); //all hold readLock114 void tryWriteLock(Ref ref){115 try116 {117 if(!ref.lock.writeLock().tryLock(LOCK_WAIT_MSECS, TimeUnit.MILLISECONDS))118 throw retryex;119 }120 catch(InterruptedException e)121 {122 throw retryex;123 }124 }126 //returns the most recent val127 Object lock(Ref ref){128 //can't upgrade readLock, so release it129 releaseIfEnsured(ref);131 boolean unlocked = true;132 try133 {134 tryWriteLock(ref);135 unlocked = false;137 if(ref.tvals != null && ref.tvals.point > readPoint)138 throw retryex;139 Info refinfo = ref.tinfo;141 //write lock conflict142 if(refinfo != null && refinfo != info && refinfo.running())143 {144 if(!barge(refinfo))145 {146 ref.lock.writeLock().unlock();147 unlocked = true;148 return blockAndBail(refinfo);149 }150 }151 ref.tinfo = info;152 return ref.tvals == null ? null : ref.tvals.val;153 }154 finally155 {156 if(!unlocked)157 ref.lock.writeLock().unlock();158 }159 }161 private Object blockAndBail(Info refinfo){162 //stop prior to blocking163 stop(RETRY);164 try165 {166 refinfo.latch.await(LOCK_WAIT_MSECS, TimeUnit.MILLISECONDS);167 }168 catch(InterruptedException e)169 {170 //ignore171 }172 throw retryex;173 }175 private void releaseIfEnsured(Ref ref){176 if(ensures.contains(ref))177 {178 ensures.remove(ref);179 ref.lock.readLock().unlock();180 }181 }183 void abort() throws AbortException{184 stop(KILLED);185 throw new AbortException();186 }188 private boolean bargeTimeElapsed(){189 return System.nanoTime() - startTime > BARGE_WAIT_NANOS;190 }192 private boolean barge(Info refinfo){193 boolean barged = false;194 //if this transaction is older195 // try to abort the other196 if(bargeTimeElapsed() && startPoint < refinfo.startPoint)197 {198 barged = refinfo.status.compareAndSet(RUNNING, KILLED);199 if(barged)200 refinfo.latch.countDown();201 }202 return barged;203 }205 static LockingTransaction getEx(){206 LockingTransaction t = transaction.get();207 if(t == null || t.info == null)208 throw new IllegalStateException("No transaction running");209 return t;210 }212 static public boolean isRunning(){213 return getRunning() != null;214 }216 static LockingTransaction getRunning(){217 LockingTransaction t = transaction.get();218 if(t == null || t.info == null)219 return null;220 return t;221 }223 static public Object runInTransaction(Callable fn) throws Exception{224 LockingTransaction t = transaction.get();225 if(t == null)226 transaction.set(t = new LockingTransaction());228 if(t.info != null)229 return fn.call();231 return t.run(fn);232 }234 static class Notify{235 final public Ref ref;236 final public Object oldval;237 final public Object newval;239 Notify(Ref ref, Object oldval, Object newval){240 this.ref = ref;241 this.oldval = oldval;242 this.newval = newval;243 }244 }246 Object run(Callable fn) throws Exception{247 boolean done = false;248 Object ret = null;249 ArrayList<Ref> locked = new ArrayList<Ref>();250 ArrayList<Notify> notify = new ArrayList<Notify>();252 for(int i = 0; !done && i < RETRY_LIMIT; i++)253 {254 try255 {256 getReadPoint();257 if(i == 0)258 {259 startPoint = readPoint;260 startTime = System.nanoTime();261 }262 info = new Info(RUNNING, startPoint);263 ret = fn.call();264 //make sure no one has killed us before this point, and can't from now on265 if(info.status.compareAndSet(RUNNING, COMMITTING))266 {267 for(Map.Entry<Ref, ArrayList<CFn>> e : commutes.entrySet())268 {269 Ref ref = e.getKey();270 if(sets.contains(ref)) continue;272 boolean wasEnsured = ensures.contains(ref);273 //can't upgrade readLock, so release it274 releaseIfEnsured(ref);275 tryWriteLock(ref);276 locked.add(ref);277 if(wasEnsured && ref.tvals != null && ref.tvals.point > readPoint)278 throw retryex;280 Info refinfo = ref.tinfo;281 if(refinfo != null && refinfo != info && refinfo.running())282 {283 if(!barge(refinfo))284 throw retryex;285 }286 Object val = ref.tvals == null ? null : ref.tvals.val;287 vals.put(ref, val);288 for(CFn f : e.getValue())289 {290 vals.put(ref, f.fn.applyTo(RT.cons(vals.get(ref), f.args)));291 }292 }293 for(Ref ref : sets)294 {295 tryWriteLock(ref);296 locked.add(ref);297 }299 //validate and enqueue notifications300 for(Map.Entry<Ref, Object> e : vals.entrySet())301 {302 Ref ref = e.getKey();303 ref.validate(ref.getValidator(), e.getValue());304 }306 //at this point, all values calced, all refs to be written locked307 //no more client code to be called308 long msecs = System.currentTimeMillis();309 long commitPoint = getCommitPoint();310 for(Map.Entry<Ref, Object> e : vals.entrySet())311 {312 Ref ref = e.getKey();313 Object oldval = ref.tvals == null ? null : ref.tvals.val;314 Object newval = e.getValue();315 int hcount = ref.histCount();317 if(ref.tvals == null)318 {319 ref.tvals = new Ref.TVal(newval, commitPoint, msecs);320 }321 else if((ref.faults.get() > 0 && hcount < ref.maxHistory)322 || hcount < ref.minHistory)323 {324 ref.tvals = new Ref.TVal(newval, commitPoint, msecs, ref.tvals);325 ref.faults.set(0);326 }327 else328 {329 ref.tvals = ref.tvals.next;330 ref.tvals.val = newval;331 ref.tvals.point = commitPoint;332 ref.tvals.msecs = msecs;333 }334 if(ref.getWatches().count() > 0)335 notify.add(new Notify(ref, oldval, newval));336 }338 done = true;339 info.status.set(COMMITTED);340 }341 }342 catch(RetryEx retry)343 {344 //eat this so we retry rather than fall out345 }346 finally347 {348 for(int k = locked.size() - 1; k >= 0; --k)349 {350 locked.get(k).lock.writeLock().unlock();351 }352 locked.clear();353 for(Ref r : ensures)354 {355 r.lock.readLock().unlock();356 }357 ensures.clear();358 stop(done ? COMMITTED : RETRY);359 try360 {361 if(done) //re-dispatch out of transaction362 {363 for(Notify n : notify)364 {365 n.ref.notifyWatches(n.oldval, n.newval);366 }367 for(Agent.Action action : actions)368 {369 Agent.dispatchAction(action);370 }371 }372 }373 finally374 {375 notify.clear();376 actions.clear();377 }378 }379 }380 if(!done)381 throw new Exception("Transaction failed after reaching retry limit");382 return ret;383 }385 public void enqueue(Agent.Action action){386 actions.add(action);387 }389 Object doGet(Ref ref){390 if(!info.running())391 throw retryex;392 if(vals.containsKey(ref))393 return vals.get(ref);394 try395 {396 ref.lock.readLock().lock();397 if(ref.tvals == null)398 throw new IllegalStateException(ref.toString() + " is unbound.");399 Ref.TVal ver = ref.tvals;400 do401 {402 if(ver.point <= readPoint)403 return ver.val;404 } while((ver = ver.prior) != ref.tvals);405 }406 finally407 {408 ref.lock.readLock().unlock();409 }410 //no version of val precedes the read point411 ref.faults.incrementAndGet();412 throw retryex;414 }416 Object doSet(Ref ref, Object val){417 if(!info.running())418 throw retryex;419 if(commutes.containsKey(ref))420 throw new IllegalStateException("Can't set after commute");421 if(!sets.contains(ref))422 {423 sets.add(ref);424 lock(ref);425 }426 vals.put(ref, val);427 return val;428 }430 void doEnsure(Ref ref){431 if(!info.running())432 throw retryex;433 if(ensures.contains(ref))434 return;435 ref.lock.readLock().lock();437 //someone completed a write after our snapshot438 if(ref.tvals != null && ref.tvals.point > readPoint) {439 ref.lock.readLock().unlock();440 throw retryex;441 }443 Info refinfo = ref.tinfo;445 //writer exists446 if(refinfo != null && refinfo.running())447 {448 ref.lock.readLock().unlock();450 if(refinfo != info) //not us, ensure is doomed451 {452 blockAndBail(refinfo);453 }454 }455 else456 ensures.add(ref);457 }459 Object doCommute(Ref ref, IFn fn, ISeq args) throws Exception{460 if(!info.running())461 throw retryex;462 if(!vals.containsKey(ref))463 {464 Object val = null;465 try466 {467 ref.lock.readLock().lock();468 val = ref.tvals == null ? null : ref.tvals.val;469 }470 finally471 {472 ref.lock.readLock().unlock();473 }474 vals.put(ref, val);475 }476 ArrayList<CFn> fns = commutes.get(ref);477 if(fns == null)478 commutes.put(ref, fns = new ArrayList<CFn>());479 fns.add(new CFn(fn, args));480 Object ret = fn.applyTo(RT.cons(vals.get(ref), args));481 vals.put(ref, ret);482 return ret;483 }485 /*486 //for test487 static CyclicBarrier barrier;488 static ArrayList<Ref> items;490 public static void main(String[] args){491 try492 {493 if(args.length != 4)494 System.err.println("Usage: LockingTransaction nthreads nitems niters ninstances");495 int nthreads = Integer.parseInt(args[0]);496 int nitems = Integer.parseInt(args[1]);497 int niters = Integer.parseInt(args[2]);498 int ninstances = Integer.parseInt(args[3]);500 if(items == null)501 {502 ArrayList<Ref> temp = new ArrayList(nitems);503 for(int i = 0; i < nitems; i++)504 temp.add(new Ref(0));505 items = temp;506 }508 class Incr extends AFn{509 public Object invoke(Object arg1) throws Exception{510 Integer i = (Integer) arg1;511 return i + 1;512 }514 public Obj withMeta(IPersistentMap meta){515 throw new UnsupportedOperationException();517 }518 }520 class Commuter extends AFn implements Callable{521 int niters;522 List<Ref> items;523 Incr incr;526 public Commuter(int niters, List<Ref> items){527 this.niters = niters;528 this.items = items;529 this.incr = new Incr();530 }532 public Object call() throws Exception{533 long nanos = 0;534 for(int i = 0; i < niters; i++)535 {536 long start = System.nanoTime();537 LockingTransaction.runInTransaction(this);538 nanos += System.nanoTime() - start;539 }540 return nanos;541 }543 public Object invoke() throws Exception{544 for(Ref tref : items)545 {546 LockingTransaction.getEx().doCommute(tref, incr);547 }548 return null;549 }551 public Obj withMeta(IPersistentMap meta){552 throw new UnsupportedOperationException();554 }555 }557 class Incrementer extends AFn implements Callable{558 int niters;559 List<Ref> items;562 public Incrementer(int niters, List<Ref> items){563 this.niters = niters;564 this.items = items;565 }567 public Object call() throws Exception{568 long nanos = 0;569 for(int i = 0; i < niters; i++)570 {571 long start = System.nanoTime();572 LockingTransaction.runInTransaction(this);573 nanos += System.nanoTime() - start;574 }575 return nanos;576 }578 public Object invoke() throws Exception{579 for(Ref tref : items)580 {581 //Transaction.get().doTouch(tref);582 // LockingTransaction t = LockingTransaction.getEx();583 // int val = (Integer) t.doGet(tref);584 // t.doSet(tref, val + 1);585 int val = (Integer) tref.get();586 tref.set(val + 1);587 }588 return null;589 }591 public Obj withMeta(IPersistentMap meta){592 throw new UnsupportedOperationException();594 }595 }597 ArrayList<Callable<Long>> tasks = new ArrayList(nthreads);598 for(int i = 0; i < nthreads; i++)599 {600 ArrayList<Ref> si;601 synchronized(items)602 {603 si = (ArrayList<Ref>) items.clone();604 }605 Collections.shuffle(si);606 tasks.add(new Incrementer(niters, si));607 //tasks.add(new Commuter(niters, si));608 }609 ExecutorService e = Executors.newFixedThreadPool(nthreads);611 if(barrier == null)612 barrier = new CyclicBarrier(ninstances);613 System.out.println("waiting for other instances...");614 barrier.await();615 System.out.println("starting");616 long start = System.nanoTime();617 List<Future<Long>> results = e.invokeAll(tasks);618 long estimatedTime = System.nanoTime() - start;619 System.out.printf("nthreads: %d, nitems: %d, niters: %d, time: %d%n", nthreads, nitems, niters,620 estimatedTime / 1000000);621 e.shutdown();622 for(Future<Long> result : results)623 {624 System.out.printf("%d, ", result.get() / 1000000);625 }626 System.out.println();627 System.out.println("waiting for other instances...");628 barrier.await();629 synchronized(items)630 {631 for(Ref item : items)632 {633 System.out.printf("%d, ", (Integer) item.currentVal());634 }635 }636 System.out.println("\ndone");637 System.out.flush();638 }639 catch(Exception ex)640 {641 ex.printStackTrace();642 }643 }644 */645 }