view src/clojure/parallel.clj @ 10:ef7dbbd6452c

added clojure source goodness
author Robert McIntyre <rlm@mit.edu>
date Sat, 21 Aug 2010 06:25:44 -0400
parents
children
line wrap: on
line source
1 ; Copyright (c) Rich Hickey. All rights reserved.
2 ; The use and distribution terms for this software are covered by the
3 ; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
4 ; which can be found in the file epl-v10.html at the root of this distribution.
5 ; By using this software in any fashion, you are agreeing to be bound by
6 ; the terms of this license.
7 ; You must not remove this notice, or any other, from this software.
(ns ^{:author "Rich Hickey"
      :doc "DEPRECATED Wrapper of the ForkJoin library (JSR-166)."}
  clojure.parallel)

;; Alias this namespace to itself so code in this file can refer to its own
;; vars as parallel/<name> -- e.g. parallel/op reaches the private op helper
;; even inside a scope where a local binding named op shadows it.
(alias 'parallel 'clojure.parallel)
14 (comment "
15 The parallel library wraps the ForkJoin library scheduled for inclusion in JDK 7:
17 http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
19 You'll need jsr166y.jar in your classpath in order to use this
20 library. The basic idea is that Clojure collections, and most
21 efficiently vectors, can be turned into parallel arrays for use by
22 this library with the function par, although most of the functions
23 take collections and will call par if needed, so normally you will
24 only need to call par explicitly in order to attach bound/filter/map
25 ops. Parallel arrays support the attachment of bounds, filters and
26 mapping functions prior to realization/calculation, which happens as
27 the result of any of several operations on the
28 array (pvec/psort/pfilter-nils/pfilter-dupes). Rather than perform
29 composite operations in steps, as would normally be done with
30 sequences, maps and filters are instead attached and thus composed by
31 providing ops to par. Note that the order of attachments matters:
32 bounds must precede filters, which must precede mappings. All operations
33 then happen in parallel, using multiple threads and a sophisticated
34 work-stealing system supported by fork-join, either when the array is
35 realized, or to perform aggregate operations like preduce/pmin/pmax
36 etc. A parallel array can be realized into a Clojure vector using
37 pvec.
38 ")
40 (import '(jsr166y.forkjoin ParallelArray ParallelArrayWithBounds ParallelArrayWithFilter
41 ParallelArrayWithMapping
42 Ops$Op Ops$BinaryOp Ops$Reducer Ops$Predicate Ops$BinaryPredicate
43 Ops$IntAndObjectPredicate Ops$IntAndObjectToObject))
(defn- op
  "Adapts the one-argument fn f to the jsr166y Ops$Op interface; the
  resulting object's op method returns (f x)."
  [f]
  (reify Ops$Op
    (op [_ x] (f x))))
(defn- binary-op
  "Adapts the two-argument fn f to the jsr166y Ops$BinaryOp interface;
  the resulting object's op method returns (f x y)."
  [f]
  (reify Ops$BinaryOp
    (op [_ x y] (f x y))))
(defn- int-and-object-to-object
  "Adapts f to the jsr166y Ops$IntAndObjectToObject interface. The op
  method receives the index first and the element second; f is called
  as (f element index), the argument order par documents for :map-index."
  [f]
  (proxy [Ops$IntAndObjectToObject] []
    (op [index elem]
      ;; swap so f sees the element before its index
      (f elem index))))
(defn- reducer
  "Adapts the two-argument fn f to the jsr166y Ops$Reducer interface;
  the resulting object's op method returns (f x y)."
  [f]
  (reify Ops$Reducer
    (op [_ x y] (f x y))))
(defn- predicate
  "Adapts the one-argument fn f to the jsr166y Ops$Predicate interface;
  f's return value is coerced with boolean (nil/false => false)."
  [f]
  (reify Ops$Predicate
    (op [_ x] (boolean (f x)))))
(defn- binary-predicate
  "Adapts the two-argument fn f to the jsr166y Ops$BinaryPredicate
  interface; f's return value is coerced with boolean."
  [f]
  (reify Ops$BinaryPredicate
    (op [_ x y] (boolean (f x y)))))
(defn- int-and-object-predicate
  "Adapts f to the jsr166y Ops$IntAndObjectPredicate interface. The op
  method receives the index first and the element second; f is called
  as (f element index) and its result is treated as truthy/falsey."
  [f]
  (proxy [Ops$IntAndObjectPredicate] []
    (op [index elem]
      (if (f elem index) true false))))
(defn par
  "Creates a parallel array from coll. ops, if supplied, perform
  on-the-fly filtering or transformations during parallel realization
  or calculation. ops form a chain: bounds must precede filters, which
  must precede maps. ops must be keyword/value pairs of the following
  forms:

  :bound [start end]

  Only elements from start (inclusive) to end (exclusive) will be
  processed when the array is realized.

  :filter pred

  Filter preds remove elements from processing when the array is
  realized. pred must be a function of one argument whose return will
  be processed via boolean.

  :filter-index pred2

  pred2 must be a function of two arguments, which will be an element
  of the collection and the corresponding index, whose return will be
  processed via boolean.

  :filter-with [pred2 coll2]

  pred2 must be a function of two arguments, which will be
  corresponding elements of the 2 collections.

  :map f

  Map fns will be used to transform elements when the array is
  realized. f must be a function of one argument.

  :map-index f2

  f2 must be a function of two arguments, which will be an element of
  the collection and the corresponding index.

  :map-with [f2 coll2]

  f2 must be a function of two arguments, which will be corresponding
  elements of the 2 collections.

  If coll is already a parallel array (a ParallelArrayWithMapping) it
  is used as-is. Throws IllegalArgumentException for an unknown op
  keyword, or when ops does not consist of complete keyword/value pairs."
  ([coll]
     (if (instance? ParallelArrayWithMapping coll)
       coll
       (ParallelArray/createUsingHandoff (to-array coll)
                                         (ParallelArray/defaultExecutor))))
  ([coll & ops]
     ;; partition would silently drop a trailing unpaired op; reject it instead
     (when (odd? (count ops))
       (throw (IllegalArgumentException.
               (str "par ops must be keyword/value pairs, got " (count ops) " items"))))
     ;; the local is named kind (not op) so the private op helper stays visible
     (reduce (fn [pa [kind arg]]
               (case kind
                 :bound        (let [[start end] arg]
                                 (.withBounds pa start end))
                 :filter       (.withFilter pa (predicate arg))
                 :filter-with  (let [[pred2 coll2] arg]
                                 (.withFilter pa (binary-predicate pred2) (par coll2)))
                 :filter-index (.withIndexedFilter pa (int-and-object-predicate arg))
                 :map          (.withMapping pa (op arg))
                 :map-with     (let [[f2 coll2] arg]
                                 (.withMapping pa (binary-op f2) (par coll2)))
                 :map-index    (.withIndexedMapping pa (int-and-object-to-object arg))
                 (throw (IllegalArgumentException. (str "Unsupported par op: " kind)))))
             (par coll)
             (partition 2 ops))))
137 ;;;;;;;;;;;;;;;;;;;;; aggregate operations ;;;;;;;;;;;;;;;;;;;;;;
(defn pany
  "Returns an arbitrary (unspecified) element of coll that passes any
  bound/filter ops attached via par, with any map ops applied to it."
  [coll]
  (.any (par coll)))
(defn pmax
  "Returns the largest realized element of coll, comparing elements as
  Comparables, or with the Comparator comp when one is supplied."
  ([coll] (-> coll par .max))
  ([coll comp] (-> coll par (.max comp))))
(defn pmin
  "Returns the smallest realized element of coll, comparing elements as
  Comparables, or with the Comparator comp when one is supplied."
  ([coll] (-> coll par .min))
  ([coll comp] (-> coll par (.min comp))))
(defn- summary-map
  "Converts a jsr166y summary object s into a Clojure map with keys
  :min, :max, :size, :min-index and :max-index."
  [s]
  (array-map :min       (.min s)
             :max       (.max s)
             :size      (.size s)
             :min-index (.indexOfMin s)
             :max-index (.indexOfMax s)))
(defn psummary
  "Returns a map of summary statistics for the realized elements of coll,
  with keys :min, :max, :size, :min-index and :max-index, presuming
  Comparable elements, unless a Comparator comp is supplied."
  ([coll] (summary-map (.summary (par coll))))
  ([coll comp] (summary-map (.summary (par coll) comp))))
(defn preduce
  "Returns the reduction of the realized elements of coll using function
  f. Elements are combined in parallel, so f will not necessarily be
  called on consecutive elements in order: it must be associative, and
  should be commutative. Also note that (f base an-element) might be
  performed many times, i.e. base is not an initial value as with
  sequential reduce -- it should be an identity for f (e.g. 0 for +)."
  [f base coll]
  (.reduce (par coll) (reducer f) base))
173 ;;;;;;;;;;;;;;;;;;;;; collection-producing operations ;;;;;;;;;;;;;;;;;;;;;;
(defn- pa-to-vec
  "Copies the live elements of the parallel array pa into a Clojure vector.
  getArray exposes pa's backing array, which may be longer than the
  logical size (.size pa) after in-place removals such as removeNulls or
  removeConsecutiveDuplicates (assumed from the jsr166y API -- confirm
  for the version in use), so only the first (.size pa) slots are kept."
  [pa]
  (let [arr (.getArray pa)
        n   (.size pa)]
    (vec (if (== n (count arr))
           arr
           (java.util.Arrays/copyOf arr (int n))))))
(defn- pall
  "Realizes a copy of coll as a parallel array, with any
  bounds/filters/maps applied. A parallel array (ParallelArrayWithMapping)
  is realized via .all; anything else is converted with par."
  [coll]
  (cond
    (instance? ParallelArrayWithMapping coll)
    (.all coll)

    ;; to-array (used by par) returns an Object[] argument itself rather
    ;; than a copy, and par hands that array straight to ParallelArray.
    ;; Callers apply in-place operations (sort, removeNulls,
    ;; removeConsecutiveDuplicates) to the result, so clone first to avoid
    ;; mutating the caller's array.
    (instance? (class (make-array Object 0)) coll)
    (par (aclone coll))

    :else
    (par coll)))
(defn pvec
  "Returns the realized contents of pa -- a parallel array built with par,
  or any collection par accepts -- as a Clojure vector."
  [pa]
  (-> pa pall pa-to-vec))
(defn pdistinct
  "Returns a vector of the distinct realized elements of coll."
  [coll]
  (-> coll pall .allUniqueElements pa-to-vec))
;; NOTE: reported broken in the original source ("this doesn't work, passes
;; null to reducer?"); kept private and unused. Returns whatever
;; .precumulate returns -- it is not converted to a vector.
(defn- pcumulate [coll f init]
  (.precumulate (pall coll) (reducer f) init))
(defn psort
  "Returns a new vector of the realized items in coll in sorted order,
  comparing elements as Comparables, or with the Comparator comp when
  one is supplied."
  ([coll] (-> coll pall .sort pa-to-vec))
  ([coll comp] (-> coll pall (.sort comp) pa-to-vec)))
(defn pfilter-nils
  "Returns a vector of the realized elements of coll with every nil
  removed."
  [coll]
  (-> coll pall .removeNulls pa-to-vec))
(defn pfilter-dupes
  "Returns a vector of the realized elements of coll in which each run
  of consecutive duplicates is collapsed to a single element."
  [coll]
  (-> coll pall .removeConsecutiveDuplicates pa-to-vec))
216 (comment
217 (load-file "src/parallel.clj")
218 (refer 'parallel)
219 (pdistinct [1 2 3 2 1])
220 ;(pcumulate [1 2 3 2 1] + 0) ;broken, not exposed
221 (def a (make-array Object 1000000))
222 (dotimes i (count a)
223 (aset a i (rand-int i)))
224 (time (reduce + 0 a))
225 (time (preduce + 0 a))
226 (time (count (distinct a)))
227 (time (count (pdistinct a)))
229 (preduce + 0 [1 2 3 2 1])
230 (preduce + 0 (psort a))
231 (pvec (par [11 2 3 2] :filter-index (fn [x i] (> i x))))
232 (pvec (par [11 2 3 2] :filter-with [(fn [x y] (> y x)) [110 2 33 2]]))
234 (psummary ;or pvec/pmax etc
235 (par [11 2 3 2]
236 :filter-with [(fn [x y] (> y x))
237 [110 2 33 2]]
238 :map #(* % 2)))
240 (preduce + 0
241 (par [11 2 3 2]
242 :filter-with [< [110 2 33 2]]))
244 (time (reduce + 0 (map #(* % %) (range 1000000))))
245 (time (preduce + 0 (par (range 1000000) :map-index *)))
246 (def v (range 1000000))
247 (time (preduce + 0 (par v :map-index *)))
248 (time (preduce + 0 (par v :map #(* % %))))
249 (time (reduce + 0 (map #(* % %) v)))