Mercurial > lasercutter
diff src/clojure/parallel.clj @ 10:ef7dbbd6452c
added clojure source goodness
author | Robert McIntyre <rlm@mit.edu> |
---|---|
date | Sat, 21 Aug 2010 06:25:44 -0400 |
parents | |
children |
line wrap: on
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/src/clojure/parallel.clj Sat Aug 21 06:25:44 2010 -0400 1.3 @@ -0,0 +1,250 @@ 1.4 +; Copyright (c) Rich Hickey. All rights reserved. 1.5 +; The use and distribution terms for this software are covered by the 1.6 +; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) 1.7 +; which can be found in the file epl-v10.html at the root of this distribution. 1.8 +; By using this software in any fashion, you are agreeing to be bound by 1.9 +; the terms of this license. 1.10 +; You must not remove this notice, or any other, from this software. 1.11 + 1.12 +(ns ^{:doc "DEPRECATED Wrapper of the ForkJoin library (JSR-166)." 1.13 + :author "Rich Hickey"} 1.14 + clojure.parallel) 1.15 +(alias 'parallel 'clojure.parallel) 1.16 + 1.17 +(comment " 1.18 +The parallel library wraps the ForkJoin library scheduled for inclusion in JDK 7: 1.19 + 1.20 +http://gee.cs.oswego.edu/dl/concurrency-interest/index.html 1.21 + 1.22 +You'll need jsr166y.jar in your classpath in order to use this 1.23 +library. The basic idea is that Clojure collections, and most 1.24 +efficiently vectors, can be turned into parallel arrays for use by 1.25 +this library with the function par, although most of the functions 1.26 +take collections and will call par if needed, so normally you will 1.27 +only need to call par explicitly in order to attach bound/filter/map 1.28 +ops. Parallel arrays support the attachment of bounds, filters and 1.29 +mapping functions prior to realization/calculation, which happens as 1.30 +the result of any of several operations on the 1.31 +array (pvec/psort/pfilter-nils/pfilter-dupes). Rather than perform 1.32 +composite operations in steps, as would normally be done with 1.33 +sequences, maps and filters are instead attached and thus composed by 1.34 +providing ops to par. Note that there is an order sensitivity to the 1.35 +attachments - bounds precede filters precede mappings. All operations 1.36 +then happen in parallel, using multiple threads and a sophisticated 1.37 +work-stealing system supported by fork-join, either when the array is 1.38 +realized, or to perform aggregate operations like preduce/pmin/pmax 1.39 +etc. A parallel array can be realized into a Clojure vector using 1.40 +pvec. 1.41 +") 1.42 + 1.43 +(import '(jsr166y.forkjoin ParallelArray ParallelArrayWithBounds ParallelArrayWithFilter 1.44 + ParallelArrayWithMapping 1.45 + Ops$Op Ops$BinaryOp Ops$Reducer Ops$Predicate Ops$BinaryPredicate 1.46 + Ops$IntAndObjectPredicate Ops$IntAndObjectToObject)) 1.47 + 1.48 +(defn- op [f] 1.49 + (proxy [Ops$Op] [] 1.50 + (op [x] (f x)))) 1.51 + 1.52 +(defn- binary-op [f] 1.53 + (proxy [Ops$BinaryOp] [] 1.54 + (op [x y] (f x y)))) 1.55 + 1.56 +(defn- int-and-object-to-object [f] 1.57 + (proxy [Ops$IntAndObjectToObject] [] 1.58 + (op [i x] (f x i)))) 1.59 + 1.60 +(defn- reducer [f] 1.61 + (proxy [Ops$Reducer] [] 1.62 + (op [x y] (f x y)))) 1.63 + 1.64 +(defn- predicate [f] 1.65 + (proxy [Ops$Predicate] [] 1.66 + (op [x] (boolean (f x))))) 1.67 + 1.68 +(defn- binary-predicate [f] 1.69 + (proxy [Ops$BinaryPredicate] [] 1.70 + (op [x y] (boolean (f x y))))) 1.71 + 1.72 +(defn- int-and-object-predicate [f] 1.73 + (proxy [Ops$IntAndObjectPredicate] [] 1.74 + (op [i x] (boolean (f x i))))) 1.75 + 1.76 +(defn par 1.77 + "Creates a parallel array from coll. ops, if supplied, perform 1.78 + on-the-fly filtering or transformations during parallel realization 1.79 + or calculation. ops form a chain, and bounds must precede filters, 1.80 + must precede maps. ops must be a set of keyword value pairs of the 1.81 + following forms: 1.82 + 1.83 + :bound [start end] 1.84 + 1.85 + Only elements from start (inclusive) to end (exclusive) will be 1.86 + processed when the array is realized. 1.87 + 1.88 + :filter pred 1.89 + 1.90 + Filter preds remove elements from processing when the array is realized. pred 1.91 + must be a function of one argument whose return will be processed 1.92 + via boolean. 1.93 + 1.94 + :filter-index pred2 1.95 + 1.96 + pred2 must be a function of two arguments, which will be an element 1.97 + of the collection and the corresponding index, whose return will be 1.98 + processed via boolean. 1.99 + 1.100 + :filter-with [pred2 coll2] 1.101 + 1.102 + pred2 must be a function of two arguments, which will be 1.103 + corresponding elements of the 2 collections. 1.104 + 1.105 + :map f 1.106 + 1.107 + Map fns will be used to transform elements when the array is 1.108 + realized. f must be a function of one argument. 1.109 + 1.110 + :map-index f2 1.111 + 1.112 + f2 must be a function of two arguments, which will be an element of 1.113 + the collection and the corresponding index. 1.114 + 1.115 + :map-with [f2 coll2] 1.116 + 1.117 + f2 must be a function of two arguments, which will be corresponding 1.118 + elements of the 2 collections." 1.119 + 1.120 + ([coll] 1.121 + (if (instance? ParallelArrayWithMapping coll) 1.122 + coll 1.123 + (. ParallelArray createUsingHandoff 1.124 + (to-array coll) 1.125 + (. ParallelArray defaultExecutor)))) 1.126 + ([coll & ops] 1.127 + (reduce (fn [pa [op args]] 1.128 + (cond 1.129 + (= op :bound) (. pa withBounds (args 0) (args 1)) 1.130 + (= op :filter) (. pa withFilter (predicate args)) 1.131 + (= op :filter-with) (. pa withFilter (binary-predicate (args 0)) (par (args 1))) 1.132 + (= op :filter-index) (. pa withIndexedFilter (int-and-object-predicate args)) 1.133 + (= op :map) (. pa withMapping (parallel/op args)) 1.134 + (= op :map-with) (. pa withMapping (binary-op (args 0)) (par (args 1))) 1.135 + (= op :map-index) (. pa withIndexedMapping (int-and-object-to-object args)) 1.136 + :else (throw (Exception. (str "Unsupported par op: " op))))) 1.137 + (par coll) 1.138 + (partition 2 ops)))) 1.139 + 1.140 +;;;;;;;;;;;;;;;;;;;;; aggregate operations ;;;;;;;;;;;;;;;;;;;;;; 1.141 +(defn pany 1.142 + "Returns some (random) element of the coll if it satisfies the bound/filter/map" 1.143 + [coll] 1.144 + (. (par coll) any)) 1.145 + 1.146 +(defn pmax 1.147 + "Returns the maximum element, presuming Comparable elements, unless 1.148 + a Comparator comp is supplied" 1.149 + ([coll] (. (par coll) max)) 1.150 + ([coll comp] (. (par coll) max comp))) 1.151 + 1.152 +(defn pmin 1.153 + "Returns the minimum element, presuming Comparable elements, unless 1.154 + a Comparator comp is supplied" 1.155 + ([coll] (. (par coll) min)) 1.156 + ([coll comp] (. (par coll) min comp))) 1.157 + 1.158 +(defn- summary-map [s] 1.159 + {:min (.min s) :max (.max s) :size (.size s) :min-index (.indexOfMin s) :max-index (.indexOfMax s)}) 1.160 + 1.161 +(defn psummary 1.162 + "Returns a map of summary statistics (min. max, size, min-index, max-index, 1.163 + presuming Comparable elements, unless a Comparator comp is supplied" 1.164 + ([coll] (summary-map (. (par coll) summary))) 1.165 + ([coll comp] (summary-map (. (par coll) summary comp)))) 1.166 + 1.167 +(defn preduce 1.168 + "Returns the reduction of the realized elements of coll 1.169 + using function f. Note f will not necessarily be called 1.170 + consecutively, and so must be commutative. Also note that 1.171 + (f base an-element) might be performed many times, i.e. base is not 1.172 + an initial value as with sequential reduce." 1.173 + [f base coll] 1.174 + (. (par coll) (reduce (reducer f) base))) 1.175 + 1.176 +;;;;;;;;;;;;;;;;;;;;; collection-producing operations ;;;;;;;;;;;;;;;;;;;;;; 1.177 + 1.178 +(defn- pa-to-vec [pa] 1.179 + (vec (. pa getArray))) 1.180 + 1.181 +(defn- pall 1.182 + "Realizes a copy of the coll as a parallel array, with any bounds/filters/maps applied" 1.183 + [coll] 1.184 + (if (instance? ParallelArrayWithMapping coll) 1.185 + (. coll all) 1.186 + (par coll))) 1.187 + 1.188 +(defn pvec 1.189 + "Returns the realized contents of the parallel array pa as a Clojure vector" 1.190 + [pa] (pa-to-vec (pall pa))) 1.191 + 1.192 +(defn pdistinct 1.193 + "Returns a parallel array of the distinct elements of coll" 1.194 + [coll] 1.195 + (pa-to-vec (. (pall coll) allUniqueElements))) 1.196 + 1.197 +;this doesn't work, passes null to reducer? 1.198 +(defn- pcumulate [coll f init] 1.199 + (.. (pall coll) (precumulate (reducer f) init))) 1.200 + 1.201 +(defn psort 1.202 + "Returns a new vector consisting of the realized items in coll, sorted, 1.203 + presuming Comparable elements, unless a Comparator comp is supplied" 1.204 + ([coll] (pa-to-vec (. (pall coll) sort))) 1.205 + ([coll comp] (pa-to-vec (. (pall coll) sort comp)))) 1.206 + 1.207 +(defn pfilter-nils 1.208 + "Returns a vector containing the non-nil (realized) elements of coll" 1.209 + [coll] 1.210 + (pa-to-vec (. (pall coll) removeNulls))) 1.211 + 1.212 +(defn pfilter-dupes 1.213 + "Returns a vector containing the (realized) elements of coll, 1.214 + without any consecutive duplicates" 1.215 + [coll] 1.216 + (pa-to-vec (. (pall coll) removeConsecutiveDuplicates))) 1.217 + 1.218 + 1.219 +(comment 1.220 +(load-file "src/parallel.clj") 1.221 +(refer 'parallel) 1.222 +(pdistinct [1 2 3 2 1]) 1.223 +;(pcumulate [1 2 3 2 1] + 0) ;broken, not exposed 1.224 +(def a (make-array Object 1000000)) 1.225 +(dotimes i (count a) 1.226 + (aset a i (rand-int i))) 1.227 +(time (reduce + 0 a)) 1.228 +(time (preduce + 0 a)) 1.229 +(time (count (distinct a))) 1.230 +(time (count (pdistinct a))) 1.231 + 1.232 +(preduce + 0 [1 2 3 2 1]) 1.233 +(preduce + 0 (psort a)) 1.234 +(pvec (par [11 2 3 2] :filter-index (fn [x i] (> i x)))) 1.235 +(pvec (par [11 2 3 2] :filter-with [(fn [x y] (> y x)) [110 2 33 2]])) 1.236 + 1.237 +(psummary ;or pvec/pmax etc 1.238 + (par [11 2 3 2] 1.239 + :filter-with [(fn [x y] (> y x)) 1.240 + [110 2 33 2]] 1.241 + :map #(* % 2))) 1.242 + 1.243 +(preduce + 0 1.244 + (par [11 2 3 2] 1.245 + :filter-with [< [110 2 33 2]])) 1.246 + 1.247 +(time (reduce + 0 (map #(* % %) (range 1000000)))) 1.248 +(time (preduce + 0 (par (range 1000000) :map-index *))) 1.249 +(def v (range 1000000)) 1.250 +(time (preduce + 0 (par v :map-index *))) 1.251 +(time (preduce + 0 (par v :map #(* % %)))) 1.252 +(time (reduce + 0 (map #(* % %) v))) 1.253 +) 1.254 \ No newline at end of file