rlm@10
|
1 ; Copyright (c) Rich Hickey. All rights reserved.
|
rlm@10
|
2 ; The use and distribution terms for this software are covered by the
|
rlm@10
|
3 ; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
|
rlm@10
|
4 ; which can be found in the file epl-v10.html at the root of this distribution.
|
rlm@10
|
5 ; By using this software in any fashion, you are agreeing to be bound by
|
rlm@10
|
6 ; the terms of this license.
|
rlm@10
|
7 ; You must not remove this notice, or any other, from this software.
|
rlm@10
|
8
|
rlm@10
|
9 (ns ^{:doc "DEPRECATED Wrapper of the ForkJoin library (JSR-166)."
|
rlm@10
|
10 :author "Rich Hickey"}
|
rlm@10
|
11 clojure.parallel)
|
rlm@10
|
12 (alias 'parallel 'clojure.parallel)
|
rlm@10
|
13
|
rlm@10
|
14 (comment "
|
rlm@10
|
15 The parallel library wraps the ForkJoin library scheduled for inclusion in JDK 7:
|
rlm@10
|
16
|
rlm@10
|
17 http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
|
rlm@10
|
18
|
rlm@10
|
19 You'll need jsr166y.jar in your classpath in order to use this
|
rlm@10
|
20 library. The basic idea is that Clojure collections, and most
|
rlm@10
|
21 efficiently vectors, can be turned into parallel arrays for use by
|
rlm@10
|
22 this library with the function par, although most of the functions
|
rlm@10
|
23 take collections and will call par if needed, so normally you will
|
rlm@10
|
24 only need to call par explicitly in order to attach bound/filter/map
|
rlm@10
|
25 ops. Parallel arrays support the attachment of bounds, filters and
|
rlm@10
|
26 mapping functions prior to realization/calculation, which happens as
|
rlm@10
|
27 the result of any of several operations on the
|
rlm@10
|
28 array (pvec/psort/pfilter-nils/pfilter-dupes). Rather than perform
|
rlm@10
|
29 composite operations in steps, as would normally be done with
|
rlm@10
|
30 sequences, maps and filters are instead attached and thus composed by
|
rlm@10
|
31 providing ops to par. Note that there is an order sensitivity to the
|
rlm@10
|
32 attachments - bounds precede filters precede mappings. All operations
|
rlm@10
|
33 then happen in parallel, using multiple threads and a sophisticated
|
rlm@10
|
34 work-stealing system supported by fork-join, either when the array is
|
rlm@10
|
35 realized, or to perform aggregate operations like preduce/pmin/pmax
|
rlm@10
|
36 etc. A parallel array can be realized into a Clojure vector using
|
rlm@10
|
37 pvec.
|
rlm@10
|
38 ")
|
rlm@10
|
39
|
rlm@10
|
40 (import '(jsr166y.forkjoin ParallelArray ParallelArrayWithBounds ParallelArrayWithFilter
|
rlm@10
|
41 ParallelArrayWithMapping
|
rlm@10
|
42 Ops$Op Ops$BinaryOp Ops$Reducer Ops$Predicate Ops$BinaryPredicate
|
rlm@10
|
43 Ops$IntAndObjectPredicate Ops$IntAndObjectToObject))
|
rlm@10
|
44
|
rlm@10
|
(defn- op
  "Wraps the one-argument fn f as a jsr166y Ops$Op whose op method
  simply forwards its argument to f."
  [f]
  (proxy [Ops$Op] []
    (op [arg]
      (f arg))))
|
rlm@10
|
48
|
rlm@10
|
(defn- binary-op
  "Wraps the two-argument fn f as a jsr166y Ops$BinaryOp whose op
  method forwards both arguments to f unchanged."
  [f]
  (proxy [Ops$BinaryOp] []
    (op [left right]
      (f left right))))
|
rlm@10
|
52
|
rlm@10
|
(defn- int-and-object-to-object
  "Wraps f, a fn of (element, index), as a jsr166y
  Ops$IntAndObjectToObject. The op method is called as (op index
  element); the arguments are swapped before calling f."
  [f]
  (proxy [Ops$IntAndObjectToObject] []
    (op [idx elem]
      (f elem idx))))
|
rlm@10
|
56
|
rlm@10
|
(defn- reducer
  "Wraps the two-argument fn f as a jsr166y Ops$Reducer that forwards
  both arguments to f."
  [f]
  (proxy [Ops$Reducer] []
    (op [acc elem]
      (f acc elem))))
|
rlm@10
|
60
|
rlm@10
|
(defn- predicate
  "Wraps the one-argument fn f as a jsr166y Ops$Predicate. f's return
  value is coerced to a boolean: nil and false yield false, anything
  else yields true."
  [f]
  (proxy [Ops$Predicate] []
    (op [v]
      (if (f v) true false))))
|
rlm@10
|
64
|
rlm@10
|
(defn- binary-predicate
  "Wraps the two-argument fn f as a jsr166y Ops$BinaryPredicate. f's
  return value is coerced to a boolean (nil/false -> false, else true)."
  [f]
  (proxy [Ops$BinaryPredicate] []
    (op [a b]
      (if (f a b) true false))))
|
rlm@10
|
68
|
rlm@10
|
(defn- int-and-object-predicate
  "Wraps f, a fn of (element, index), as a jsr166y
  Ops$IntAndObjectPredicate. The op method is called as (op index
  element); f is called with the arguments swapped and its result is
  coerced to a boolean (nil/false -> false, else true)."
  [f]
  (proxy [Ops$IntAndObjectPredicate] []
    (op [idx elem]
      (if (f elem idx) true false))))
|
rlm@10
|
72
|
rlm@10
|
(defn par
  "Creates a parallel array from coll. ops, if supplied, perform
  on-the-fly filtering or transformations during parallel realization
  or calculation. ops form a chain, and bounds must precede filters,
  which must precede maps. ops must be a sequence of keyword/value
  pairs of the following forms:

  :bound [start end]

  Only elements from start (inclusive) to end (exclusive) will be
  processed when the array is realized.

  :filter pred

  Filter preds remove elements from processing when the array is
  realized. pred must be a function of one argument whose return will
  be processed via boolean.

  :filter-index pred2

  pred2 must be a function of two arguments, which will be an element
  of the collection and the corresponding index, whose return will be
  processed via boolean.

  :filter-with [pred2 coll2]

  pred2 must be a function of two arguments, which will be
  corresponding elements of the 2 collections.

  :map f

  Map fns will be used to transform elements when the array is
  realized. f must be a function of one argument.

  :map-index f2

  f2 must be a function of two arguments, which will be an element of
  the collection and the corresponding index.

  :map-with [f2 coll2]

  f2 must be a function of two arguments, which will be corresponding
  elements of the 2 collections.

  Throws IllegalArgumentException if ops has an odd number of items or
  contains an unsupported keyword."
  ([coll]
   (if (instance? ParallelArrayWithMapping coll)
     ;; already a (possibly bounded/filtered/mapped) parallel array
     coll
     (. ParallelArray createUsingHandoff
        (to-array coll)
        (. ParallelArray defaultExecutor))))
  ([coll & ops]
   ;; partition would silently drop a trailing keyword that has no value,
   ;; quietly ignoring the caller's last op; reject it instead.
   (when (odd? (count ops))
     (throw (IllegalArgumentException.
             (str "par ops must be keyword/value pairs, got " (count ops) " items"))))
   ;; The binding is named `kind` (not `op`) so the private `op` helper
   ;; stays visible inside the reducing fn.
   (reduce (fn [pa [kind arg]]
             (case kind
               :bound        (let [[start end] arg]
                               (. pa withBounds start end))
               :filter       (. pa withFilter (predicate arg))
               :filter-with  (let [[pred2 coll2] arg]
                               (. pa withFilter (binary-predicate pred2) (par coll2)))
               :filter-index (. pa withIndexedFilter (int-and-object-predicate arg))
               :map          (. pa withMapping (op arg))
               :map-with     (let [[f2 coll2] arg]
                               (. pa withMapping (binary-op f2) (par coll2)))
               :map-index    (. pa withIndexedMapping (int-and-object-to-object arg))
               (throw (IllegalArgumentException. (str "Unsupported par op: " kind)))))
           (par coll)
           (partition 2 ops))))
|
rlm@10
|
136
|
rlm@10
|
137 ;;;;;;;;;;;;;;;;;;;;; aggregate operations ;;;;;;;;;;;;;;;;;;;;;;
|
rlm@10
|
(defn pany
  "Returns an arbitrary element of coll that satisfies any
  bound/filter/map ops attached to it via par."
  [coll]
  (.any (par coll)))
|
rlm@10
|
142
|
rlm@10
|
(defn pmax
  "Returns the maximum realized element of coll. Elements must be
  Comparable unless a Comparator comp is supplied."
  ([coll]
   (.max (par coll)))
  ([coll comp]
   (.max (par coll) comp)))
|
rlm@10
|
148
|
rlm@10
|
(defn pmin
  "Returns the minimum realized element of coll. Elements must be
  Comparable unless a Comparator comp is supplied."
  ([coll]
   (.min (par coll)))
  ([coll comp]
   (.min (par coll) comp)))
|
rlm@10
|
154
|
rlm@10
|
(defn- summary-map
  "Converts the summary object s (as returned by a parallel array's
  summary method) into a map with keys :min, :max, :size, :min-index
  and :max-index."
  [s]
  (let [lo   (.min s)
        hi   (.max s)
        n    (.size s)
        lo-i (.indexOfMin s)
        hi-i (.indexOfMax s)]
    {:min lo :max hi :size n :min-index lo-i :max-index hi-i}))
|
rlm@10
|
157
|
rlm@10
|
(defn psummary
  "Returns a map of summary statistics (:min, :max, :size, :min-index
  and :max-index) for the realized elements of coll, presuming
  Comparable elements unless a Comparator comp is supplied."
  ([coll] (summary-map (. (par coll) summary)))
  ([coll comp] (summary-map (. (par coll) summary comp))))
|
rlm@10
|
163
|
rlm@10
|
(defn preduce
  "Returns the reduction of the realized elements of coll using
  function f. The reduction runs in parallel, so f will not necessarily
  be called consecutively and must be both associative and commutative.
  Also note that (f base an-element) might be performed many times,
  i.e. base is not an initial value as with sequential reduce; it
  should be an identity value for f (e.g. 0 for +, 1 for *)."
  [f base coll]
  (. (par coll) (reduce (reducer f) base)))
|
rlm@10
|
172
|
rlm@10
|
173 ;;;;;;;;;;;;;;;;;;;;; collection-producing operations ;;;;;;;;;;;;;;;;;;;;;;
|
rlm@10
|
174
|
rlm@10
|
(defn- pa-to-vec
  "Returns a new Clojure vector holding the first (.size pa) elements of
  the ParallelArray pa's backing array."
  [pa]
  ;; getArray exposes pa's backing array itself.
  ;; NOTE(review): jsr166y's in-place removal ops (removeNulls,
  ;; removeConsecutiveDuplicates) appear to shrink size() without
  ;; shrinking that array, leaving null slots past size(). Confirm against
  ;; the jsr166y version in use.
  ;; Copying only the live prefix keeps those slots out of the result. It
  ;; also stops the vector, which `vec` may build directly on top of an
  ;; array, from aliasing pa's mutable storage.
  (vec (java.util.Arrays/copyOf (. pa getArray) (. pa size))))
|
rlm@10
|
177
|
rlm@10
|
(defn- pall
  "Realizes a copy of coll as a ParallelArray, with any attached
  bounds/filters/maps applied. The result does not share its backing
  array with coll, so callers may sort or compact it in place."
  [coll]
  (if (instance? ParallelArrayWithMapping coll)
    (. coll all)
    (let [arr (to-array coll)]
      ;; to-array returns an Object[] argument itself rather than a copy,
      ;; and createUsingHandoff adopts whatever array it is given. Clone in
      ;; that case so in-place ops (sort, removeNulls, ...) done on the
      ;; result cannot mutate the caller's array.
      (. ParallelArray createUsingHandoff
         (if (identical? arr coll) (aclone arr) arr)
         (. ParallelArray defaultExecutor)))))
|
rlm@10
|
184
|
rlm@10
|
(defn pvec
  "Realizes pa (a parallel array or any collection), applying any
  attached bound/filter/map ops, and returns the result as a Clojure
  vector."
  [pa]
  (-> pa pall pa-to-vec))
|
rlm@10
|
188
|
rlm@10
|
(defn pdistinct
  "Returns a vector of the distinct realized elements of coll."
  [coll]
  (pa-to-vec (. (pall coll) allUniqueElements)))
|
rlm@10
|
193
|
rlm@10
|
;; NOTE: known not to work -- the original author observed null being
;; passed to the reducer. Kept private until that is understood.
(defn- pcumulate
  "Intended to prefix-cumulate the realized elements of coll with f,
  starting from init, via the parallel array's precumulate method."
  [coll f init]
  (let [pa (pall coll)]
    (.precumulate pa (reducer f) init)))
|
rlm@10
|
197
|
rlm@10
|
(defn psort
  "Returns a new vector of the realized items in coll in sorted order.
  Items must be Comparable unless a Comparator comp is supplied."
  ([coll]
   (-> (pall coll) (.sort) pa-to-vec))
  ([coll comp]
   (-> (pall coll) (.sort comp) pa-to-vec)))
|
rlm@10
|
203
|
rlm@10
|
(defn pfilter-nils
  "Returns a vector of the realized elements of coll with every nil
  removed."
  [coll]
  (let [realized (pall coll)]
    (pa-to-vec (.removeNulls realized))))
|
rlm@10
|
208
|
rlm@10
|
(defn pfilter-dupes
  "Returns a vector of the realized elements of coll in which each run
  of consecutive duplicates is collapsed to a single element."
  [coll]
  (-> (pall coll)
      (.removeConsecutiveDuplicates)
      pa-to-vec))
|
rlm@10
|
214
|
rlm@10
|
215
|
rlm@10
|
;; REPL examples. Requires jsr166y.jar on the classpath.
(comment
(require 'clojure.parallel)
(refer 'clojure.parallel)
(pdistinct [1 2 3 2 1])
;(pcumulate [1 2 3 2 1] + 0) ;broken, not exposed
(def a (make-array Object 1000000))
(dotimes [i (count a)]
  (aset a i (rand-int i)))
(time (reduce + 0 a))
(time (preduce + 0 a))
(time (count (distinct a)))
(time (count (pdistinct a)))

(preduce + 0 [1 2 3 2 1])
(preduce + 0 (psort a))
(pvec (par [11 2 3 2] :filter-index (fn [x i] (> i x))))
(pvec (par [11 2 3 2] :filter-with [(fn [x y] (> y x)) [110 2 33 2]]))

(psummary ;or pvec/pmax etc
 (par [11 2 3 2]
      :filter-with [(fn [x y] (> y x))
                    [110 2 33 2]]
      :map #(* % 2)))

(preduce + 0
  (par [11 2 3 2]
       :filter-with [< [110 2 33 2]]))

(time (reduce + 0 (map #(* % %) (range 1000000))))
(time (preduce + 0 (par (range 1000000) :map-index *)))
(def v (range 1000000))
(time (preduce + 0 (par v :map-index *)))
(time (preduce + 0 (par v :map #(* % %))))
(time (reduce + 0 (map #(* % %) v)))
)