annotate src/clojure/parallel.clj @ 10:ef7dbbd6452c

added clojure source goodness
author Robert McIntyre <rlm@mit.edu>
date Sat, 21 Aug 2010 06:25:44 -0400
parents
children
rev   line source
rlm@10 1 ; Copyright (c) Rich Hickey. All rights reserved.
rlm@10 2 ; The use and distribution terms for this software are covered by the
rlm@10 3 ; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
rlm@10 4 ; which can be found in the file epl-v10.html at the root of this distribution.
rlm@10 5 ; By using this software in any fashion, you are agreeing to be bound by
rlm@10 6 ; the terms of this license.
rlm@10 7 ; You must not remove this notice, or any other, from this software.
rlm@10 8
;; Namespace declaration; the doc and author are attached as metadata
;; on the namespace symbol.
(ns ^{:author "Rich Hickey"
      :doc "DEPRECATED Wrapper of the ForkJoin library (JSR-166)."}
  clojure.parallel)

;; Short alias so vars of this namespace can be written as parallel/<name>.
(alias 'parallel 'clojure.parallel)
rlm@10 13
rlm@10 14 (comment "
rlm@10 15 The parallel library wraps the ForkJoin library scheduled for inclusion in JDK 7:
rlm@10 16
rlm@10 17 http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
rlm@10 18
rlm@10 19 You'll need jsr166y.jar in your classpath in order to use this
rlm@10 20 library. The basic idea is that Clojure collections, and most
rlm@10 21 efficiently vectors, can be turned into parallel arrays for use by
rlm@10 22 this library with the function par, although most of the functions
rlm@10 23 take collections and will call par if needed, so normally you will
rlm@10 24 only need to call par explicitly in order to attach bound/filter/map
rlm@10 25 ops. Parallel arrays support the attachment of bounds, filters and
rlm@10 26 mapping functions prior to realization/calculation, which happens as
rlm@10 27 the result of any of several operations on the
rlm@10 28 array (pvec/psort/pfilter-nils/pfilter-dupes). Rather than perform
rlm@10 29 composite operations in steps, as would normally be done with
rlm@10 30 sequences, maps and filters are instead attached and thus composed by
rlm@10 31 providing ops to par. Note that there is an order sensitivity to the
rlm@10 32 attachments - bounds precede filters precede mappings. All operations
rlm@10 33 then happen in parallel, using multiple threads and a sophisticated
rlm@10 34 work-stealing system supported by fork-join, either when the array is
rlm@10 35 realized, or to perform aggregate operations like preduce/pmin/pmax
rlm@10 36 etc. A parallel array can be realized into a Clojure vector using
rlm@10 37 pvec.
rlm@10 38 ")
rlm@10 39
rlm@10 40 (import '(jsr166y.forkjoin ParallelArray ParallelArrayWithBounds ParallelArrayWithFilter
rlm@10 41 ParallelArrayWithMapping
rlm@10 42 Ops$Op Ops$BinaryOp Ops$Reducer Ops$Predicate Ops$BinaryPredicate
rlm@10 43 Ops$IntAndObjectPredicate Ops$IntAndObjectToObject))
rlm@10 44
(defn- op
  "Wraps the one-argument fn f in a proxy implementing Ops$Op; the
  proxy's op method simply calls f on its argument."
  [f]
  (proxy [Ops$Op] []
    (op [item]
      (f item))))
rlm@10 48
(defn- binary-op
  "Wraps the two-argument fn f in a proxy implementing Ops$BinaryOp;
  the proxy's op method calls f with both arguments in order."
  [f]
  (proxy [Ops$BinaryOp] []
    (op [left right]
      (f left right))))
rlm@10 52
(defn- int-and-object-to-object
  "Wraps the two-argument fn f in a proxy implementing
  Ops$IntAndObjectToObject. The proxy's op method receives
  (index, element) but calls f as (f element index)."
  [f]
  (proxy [Ops$IntAndObjectToObject] []
    (op [index item]
      ;; argument order is deliberately swapped: element first, index second
      (f item index))))
rlm@10 56
(defn- reducer
  "Wraps the two-argument fn f in a proxy implementing Ops$Reducer;
  the proxy's op method calls f with both arguments in order."
  [f]
  (proxy [Ops$Reducer] []
    (op [left right]
      (f left right))))
rlm@10 60
(defn- predicate
  "Wraps the one-argument fn f in a proxy implementing Ops$Predicate.
  The result of f is coerced with boolean before being returned."
  [f]
  (proxy [Ops$Predicate] []
    (op [item]
      (boolean (f item)))))
rlm@10 64
(defn- binary-predicate
  "Wraps the two-argument fn f in a proxy implementing
  Ops$BinaryPredicate. The result of f is coerced with boolean before
  being returned."
  [f]
  (proxy [Ops$BinaryPredicate] []
    (op [left right]
      (boolean (f left right)))))
rlm@10 68
(defn- int-and-object-predicate
  "Wraps the two-argument fn f in a proxy implementing
  Ops$IntAndObjectPredicate. The proxy's op method receives
  (index, element) but calls f as (f element index); the result is
  coerced with boolean."
  [f]
  (proxy [Ops$IntAndObjectPredicate] []
    (op [index item]
      ;; argument order is deliberately swapped: element first, index second
      (boolean (f item index)))))
rlm@10 72
(defn par
  "Builds a parallel array over coll. Any ops are attached, in the order
  given, as steps that run during parallel realization or calculation.
  Ordering matters: bounds come before filters, which come before maps.
  ops is a flat sequence of keyword/value pairs (a trailing keyword
  without a value is ignored). Values shown in brackets must be vectors.

  :bound [start end]
     Restricts processing to elements from start (inclusive) to end
     (exclusive).

  :filter pred
     Removes elements from processing. pred is a fn of one argument
     whose result is coerced with boolean.

  :filter-index pred2
     pred2 is a fn of two arguments, the element and its index, whose
     result is coerced with boolean.

  :filter-with [pred2 coll2]
     pred2 is a fn of two arguments, which are corresponding elements
     of coll and coll2.

  :map f
     Transforms each element with the one-argument fn f.

  :map-index f2
     f2 is a fn of two arguments, the element and its index.

  :map-with [f2 coll2]
     f2 is a fn of two arguments, which are corresponding elements of
     coll and coll2.

  Any other keyword throws an Exception."
  ([coll]
     ;; Anything that is already a parallel array (with or without
     ;; attachments) is passed through untouched.
     (if-not (instance? ParallelArrayWithMapping coll)
       (ParallelArray/createUsingHandoff (to-array coll)
                                         (ParallelArray/defaultExecutor))
       coll))
  ([coll & ops]
     (let [attach (fn [pa [kind args]]
                    (condp = kind
                      :bound        (.withBounds pa (args 0) (args 1))
                      :filter       (.withFilter pa (predicate args))
                      :filter-with  (.withFilter pa
                                                 (binary-predicate (args 0))
                                                 (par (args 1)))
                      :filter-index (.withIndexedFilter
                                     pa (int-and-object-predicate args))
                      :map          (.withMapping pa (parallel/op args))
                      :map-with     (.withMapping pa
                                                  (binary-op (args 0))
                                                  (par (args 1)))
                      :map-index    (.withIndexedMapping
                                     pa (int-and-object-to-object args))
                      (throw (Exception. (str "Unsupported par op: " kind)))))]
       ;; ops are consumed two at a time: keyword, then its value.
       (reduce attach (par coll) (partition 2 ops)))))
rlm@10 136
rlm@10 137 ;;;;;;;;;;;;;;;;;;;;; aggregate operations ;;;;;;;;;;;;;;;;;;;;;;
(defn pany
  "Returns some (random) element of coll, provided it satisfies the
  attached bound/filter/map. coll is passed through par first."
  [coll]
  (-> coll par .any))
rlm@10 142
(defn pmax
  "Returns the largest element of coll. Elements are presumed to be
  Comparable unless a Comparator comp is given."
  ([coll]
     (.max (par coll)))
  ([coll comp]
     (.max (par coll) comp)))
rlm@10 148
(defn pmin
  "Returns the smallest element of coll. Elements are presumed to be
  Comparable unless a Comparator comp is given."
  ([coll]
     (.min (par coll)))
  ([coll comp]
     (.min (par coll) comp)))
rlm@10 154
(defn- summary-map
  "Copies the min, max, size, indexOfMin and indexOfMax values of the
  summary object s into a Clojure map."
  [s]
  {:min       (.min s)
   :max       (.max s)
   :size      (.size s)
   :min-index (.indexOfMin s)
   :max-index (.indexOfMax s)})
rlm@10 157
(defn psummary
  "Returns a map of summary statistics (:min, :max, :size, :min-index,
  :max-index) for coll, presuming Comparable elements, unless a
  Comparator comp is supplied."
  ([coll]
     (summary-map (.summary (par coll))))
  ([coll comp]
     (summary-map (.summary (par coll) comp))))
rlm@10 163
(defn preduce
  "Reduces the realized elements of coll with the two-argument fn f and
  returns the result. f is not necessarily applied to consecutive
  elements, so it must be commutative. base is not an initial value in
  the sense of sequential reduce: (f base an-element) may be evaluated
  many times."
  [f base coll]
  (.reduce (par coll) (reducer f) base))
rlm@10 172
rlm@10 173 ;;;;;;;;;;;;;;;;;;;;; collection-producing operations ;;;;;;;;;;;;;;;;;;;;;;
rlm@10 174
(defn- pa-to-vec
  "Returns a Clojure vector built from the array obtained by calling
  getArray on the parallel array pa."
  [pa]
  (-> pa .getArray vec))
rlm@10 177
(defn- pall
  "Realizes coll as a parallel array. When coll is already a
  ParallelArrayWithMapping its all method is called, which applies any
  attached bounds/filters/maps; anything else is converted with par."
  [coll]
  (if-not (instance? ParallelArrayWithMapping coll)
    (par coll)
    (.all coll)))
rlm@10 184
(defn pvec
  "Realizes the parallel array pa and returns its contents as a Clojure
  vector."
  [pa]
  (-> pa pall pa-to-vec))
rlm@10 188
(defn pdistinct
  "Returns a vector of the distinct (realized) elements of coll."
  [coll]
  ;; allUniqueElements yields a parallel array; it is converted to a
  ;; vector before being returned.
  (-> coll pall .allUniqueElements pa-to-vec))
rlm@10 193
;; Known broken per the original author: appears to pass null to the
;; reducer. Private and not used by the public API.
(defn- pcumulate
  "Calls precumulate on the realized coll with a reducer built from f
  and the value init."
  [coll f init]
  (.precumulate (pall coll) (reducer f) init))
rlm@10 197
(defn psort
  "Returns a new vector of the realized items of coll in sorted order.
  Elements are presumed to be Comparable unless a Comparator comp is
  given."
  ([coll]
     (-> coll pall .sort pa-to-vec))
  ([coll comp]
     (-> coll pall (.sort comp) pa-to-vec)))
rlm@10 203
(defn pfilter-nils
  "Returns a vector of the (realized) elements of coll with all nils
  removed."
  [coll]
  (-> coll pall .removeNulls pa-to-vec))
rlm@10 208
(defn pfilter-dupes
  "Returns a vector of the (realized) elements of coll in which runs of
  consecutive duplicates have been removed."
  [coll]
  (-> coll pall .removeConsecutiveDuplicates pa-to-vec))
rlm@10 214
rlm@10 215
(comment
  ;; REPL scratchpad: usage examples and rough timings. Nothing in this
  ;; form is evaluated when the file is loaded.
  (load-file "src/parallel.clj")
  (refer 'clojure.parallel)
  (pdistinct [1 2 3 2 1])
  ;(pcumulate [1 2 3 2 1] + 0) ;broken, not exposed
  (def a (make-array Object 1000000))
  (dotimes [i (count a)]
    (aset a i (rand-int i)))
  (time (reduce + 0 a))
  (time (preduce + 0 a))
  (time (count (distinct a)))
  (time (count (pdistinct a)))

  (preduce + 0 [1 2 3 2 1])
  (preduce + 0 (psort a))
  (pvec (par [11 2 3 2] :filter-index (fn [x i] (> i x))))
  (pvec (par [11 2 3 2] :filter-with [(fn [x y] (> y x)) [110 2 33 2]]))

  (psummary ;or pvec/pmax etc
   (par [11 2 3 2]
        :filter-with [(fn [x y] (> y x))
                      [110 2 33 2]]
        :map #(* % 2)))

  (preduce + 0
           (par [11 2 3 2]
                :filter-with [< [110 2 33 2]]))

  (time (reduce + 0 (map #(* % %) (range 1000000))))
  (time (preduce + 0 (par (range 1000000) :map-index *)))
  (def v (range 1000000))
  (time (preduce + 0 (par v :map-index *)))
  (time (preduce + 0 (par v :map #(* % %))))
  (time (reduce + 0 (map #(* % %) v)))
  )