diff src/clojure/parallel.clj @ 10:ef7dbbd6452c

added clojure source goodness
author Robert McIntyre <rlm@mit.edu>
date Sat, 21 Aug 2010 06:25:44 -0400
parents
children
line wrap: on
line diff
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/src/clojure/parallel.clj	Sat Aug 21 06:25:44 2010 -0400
     1.3 @@ -0,0 +1,250 @@
     1.4 +;   Copyright (c) Rich Hickey. All rights reserved.
     1.5 +;   The use and distribution terms for this software are covered by the
     1.6 +;   Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
     1.7 +;   which can be found in the file epl-v10.html at the root of this distribution.
     1.8 +;   By using this software in any fashion, you are agreeing to be bound by
     1.9 +;   the terms of this license.
    1.10 +;   You must not remove this notice, or any other, from this software.
    1.11 +
    1.12 +(ns ^{:doc "DEPRECATED Wrapper of the ForkJoin library (JSR-166)."
    1.13 +       :author "Rich Hickey"}
    1.14 +    clojure.parallel)
    1.15 +(alias 'parallel 'clojure.parallel)
    1.16 +
    1.17 +(comment "
    1.18 +The parallel library wraps the ForkJoin library scheduled for inclusion in JDK 7:
    1.19 +
    1.20 +http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
    1.21 +
    1.22 +You'll need jsr166y.jar in your classpath in order to use this
    1.23 +library.  The basic idea is that Clojure collections, and most
    1.24 +efficiently vectors, can be turned into parallel arrays for use by
    1.25 +this library with the function par, although most of the functions
    1.26 +take collections and will call par if needed, so normally you will
    1.27 +only need to call par explicitly in order to attach bound/filter/map
    1.28 +ops. Parallel arrays support the attachment of bounds, filters and
    1.29 +mapping functions prior to realization/calculation, which happens as
    1.30 +the result of any of several operations on the
    1.31 +array (pvec/psort/pfilter-nils/pfilter-dupes). Rather than perform
    1.32 +composite operations in steps, as would normally be done with
    1.33 +sequences, maps and filters are instead attached and thus composed by
    1.34 +providing ops to par. Note that there is an order sensitivity to the
    1.35 +attachments - bounds precede filters precede mappings.  All operations
    1.36 +then happen in parallel, using multiple threads and a sophisticated
    1.37 +work-stealing system supported by fork-join, either when the array is
    1.38 +realized, or to perform aggregate operations like preduce/pmin/pmax
    1.39 +etc. A parallel array can be realized into a Clojure vector using
    1.40 +pvec.
    1.41 +")
    1.42 +
    1.43 +(import '(jsr166y.forkjoin ParallelArray ParallelArrayWithBounds ParallelArrayWithFilter 
    1.44 +                           ParallelArrayWithMapping 
    1.45 +                           Ops$Op Ops$BinaryOp Ops$Reducer Ops$Predicate Ops$BinaryPredicate 
    1.46 +                           Ops$IntAndObjectPredicate Ops$IntAndObjectToObject))
    1.47 +
    1.48 +(defn- op [f]
    1.49 +  (proxy [Ops$Op] []
    1.50 +    (op [x] (f x))))
    1.51 +
    1.52 +(defn- binary-op [f]
    1.53 +  (proxy [Ops$BinaryOp] []
    1.54 +    (op [x y] (f x y))))
    1.55 +
    1.56 +(defn- int-and-object-to-object [f]
    1.57 +  (proxy [Ops$IntAndObjectToObject] []
    1.58 +    (op [i x] (f x i))))
    1.59 +
    1.60 +(defn- reducer [f]
    1.61 +  (proxy [Ops$Reducer] []
    1.62 +    (op [x y] (f x y))))
    1.63 +
    1.64 +(defn- predicate [f]
    1.65 +  (proxy [Ops$Predicate] []
    1.66 +    (op [x] (boolean (f x)))))
    1.67 +
    1.68 +(defn- binary-predicate [f]
    1.69 +  (proxy [Ops$BinaryPredicate] []
    1.70 +    (op [x y] (boolean (f x y)))))
    1.71 +
    1.72 +(defn- int-and-object-predicate [f]
    1.73 +  (proxy [Ops$IntAndObjectPredicate] []
    1.74 +    (op [i x] (boolean (f x i)))))
    1.75 +
    1.76 +(defn par
    1.77 +  "Creates a parallel array from coll. ops, if supplied, perform
    1.78 +  on-the-fly filtering or transformations during parallel realization
    1.79 +  or calculation. ops form a chain, and bounds must precede filters,
    1.80 +  must precede maps. ops must be a set of keyword value pairs of the
    1.81 +  following forms:
    1.82 +
    1.83 +     :bound [start end] 
    1.84 +
    1.85 +  Only elements from start (inclusive) to end (exclusive) will be
    1.86 +  processed when the array is realized.
    1.87 +
    1.88 +     :filter pred 
    1.89 +
    1.90 +  Filter preds remove elements from processing when the array is realized. pred
    1.91 +  must be a function of one argument whose return will be processed
    1.92 +  via boolean.
    1.93 +
    1.94 +     :filter-index pred2 
    1.95 +
    1.96 +  pred2 must be a function of two arguments, which will be an element
    1.97 +  of the collection and the corresponding index, whose return will be
    1.98 +  processed via boolean.
    1.99 +
   1.100 +     :filter-with [pred2 coll2] 
   1.101 +
   1.102 +  pred2 must be a function of two arguments, which will be
   1.103 +  corresponding elements of the 2 collections.
   1.104 +
   1.105 +     :map f 
   1.106 +
   1.107 +  Map fns will be used to transform elements when the array is
   1.108 +  realized. f must be a function of one argument.
   1.109 +
   1.110 +     :map-index f2 
   1.111 +
   1.112 +  f2 must be a function of two arguments, which will be an element of
   1.113 +  the collection and the corresponding index.
   1.114 +
   1.115 +     :map-with [f2 coll2]
   1.116 +
   1.117 +  f2 must be a function of two arguments, which will be corresponding
   1.118 +  elements of the 2 collections."
   1.119 +
   1.120 +  ([coll] 
   1.121 +     (if (instance? ParallelArrayWithMapping coll)
   1.122 +       coll
   1.123 +       (. ParallelArray createUsingHandoff  
   1.124 +        (to-array coll) 
   1.125 +        (. ParallelArray defaultExecutor))))
   1.126 +  ([coll & ops]
   1.127 +     (reduce (fn [pa [op args]] 
   1.128 +                 (cond
   1.129 +                  (= op :bound) (. pa withBounds (args 0) (args 1))
   1.130 +                  (= op :filter) (. pa withFilter (predicate args))
   1.131 +                  (= op :filter-with) (. pa withFilter (binary-predicate (args 0)) (par (args 1)))
   1.132 +                  (= op :filter-index) (. pa withIndexedFilter (int-and-object-predicate args))
   1.133 +                  (= op :map) (. pa withMapping (parallel/op args))
   1.134 +                  (= op :map-with) (. pa withMapping (binary-op (args 0)) (par (args 1)))
   1.135 +                  (= op :map-index) (. pa withIndexedMapping (int-and-object-to-object args))
   1.136 +                  :else (throw (Exception. (str "Unsupported par op: " op)))))
   1.137 +             (par coll) 
   1.138 +             (partition 2 ops))))
   1.139 +
   1.140 +;;;;;;;;;;;;;;;;;;;;; aggregate operations ;;;;;;;;;;;;;;;;;;;;;;
   1.141 +(defn pany
   1.142 +  "Returns some (random) element of the coll if it satisfies the bound/filter/map"
   1.143 +  [coll] 
   1.144 +  (. (par coll) any))
   1.145 +
   1.146 +(defn pmax
   1.147 +  "Returns the maximum element, presuming Comparable elements, unless
   1.148 +  a Comparator comp is supplied"
   1.149 +  ([coll] (. (par coll) max))
   1.150 +  ([coll comp] (. (par coll) max comp)))
   1.151 +
   1.152 +(defn pmin
   1.153 +  "Returns the minimum element, presuming Comparable elements, unless
   1.154 +  a Comparator comp is supplied"
   1.155 +  ([coll] (. (par coll) min))
   1.156 +  ([coll comp] (. (par coll) min comp)))
   1.157 +
   1.158 +(defn- summary-map [s]
   1.159 +  {:min (.min s) :max (.max s) :size (.size s) :min-index (.indexOfMin s) :max-index (.indexOfMax s)})
   1.160 +
   1.161 +(defn psummary 
   1.162 +  "Returns a map of summary statistics (min. max, size, min-index, max-index, 
   1.163 +  presuming Comparable elements, unless a Comparator comp is supplied"
   1.164 +  ([coll] (summary-map (. (par coll) summary)))
   1.165 +  ([coll comp] (summary-map (. (par coll) summary comp))))
   1.166 +
   1.167 +(defn preduce 
   1.168 +  "Returns the reduction of the realized elements of coll
   1.169 +  using function f. Note f will not necessarily be called
   1.170 +  consecutively, and so must be commutative. Also note that 
   1.171 +  (f base an-element) might be performed many times, i.e. base is not
   1.172 +  an initial value as with sequential reduce."
   1.173 +  [f base coll]
   1.174 +  (. (par coll) (reduce (reducer f) base)))
   1.175 +
   1.176 +;;;;;;;;;;;;;;;;;;;;; collection-producing operations ;;;;;;;;;;;;;;;;;;;;;;
   1.177 +
   1.178 +(defn- pa-to-vec [pa]
   1.179 +  (vec (. pa getArray)))
   1.180 +
   1.181 +(defn- pall
   1.182 +  "Realizes a copy of the coll as a parallel array, with any bounds/filters/maps applied"
   1.183 +  [coll]
   1.184 +  (if (instance? ParallelArrayWithMapping coll)
   1.185 +    (. coll all)
   1.186 +    (par coll)))
   1.187 +
   1.188 +(defn pvec 
   1.189 +  "Returns the realized contents of the parallel array pa as a Clojure vector"
   1.190 +  [pa] (pa-to-vec (pall pa)))
   1.191 +
   1.192 +(defn pdistinct
   1.193 +  "Returns a parallel array of the distinct elements of coll"
   1.194 +  [coll]
   1.195 +  (pa-to-vec (. (pall coll) allUniqueElements)))
   1.196 +
   1.197 +;this doesn't work, passes null to reducer?
   1.198 +(defn- pcumulate [coll f init]
   1.199 +  (.. (pall coll) (precumulate (reducer f) init)))
   1.200 +
   1.201 +(defn psort 
   1.202 +  "Returns a new vector consisting of the realized items in coll, sorted, 
   1.203 +  presuming Comparable elements, unless a Comparator comp is supplied"
   1.204 +  ([coll] (pa-to-vec (. (pall coll) sort)))
   1.205 +  ([coll comp] (pa-to-vec (. (pall coll) sort comp))))
   1.206 +
   1.207 +(defn pfilter-nils
   1.208 +  "Returns a vector containing the non-nil (realized) elements of coll"
   1.209 +  [coll]
   1.210 +  (pa-to-vec (. (pall coll) removeNulls)))
   1.211 +
   1.212 +(defn pfilter-dupes 
   1.213 +  "Returns a vector containing the (realized) elements of coll, 
   1.214 +  without any consecutive duplicates"
   1.215 +  [coll]
   1.216 +  (pa-to-vec (. (pall coll) removeConsecutiveDuplicates)))
   1.217 +
   1.218 +
   1.219 +(comment
   1.220 +(load-file "src/parallel.clj")
   1.221 +(refer 'parallel)
   1.222 +(pdistinct [1 2 3 2 1])
   1.223 +;(pcumulate [1 2 3 2 1] + 0) ;broken, not exposed
   1.224 +(def a (make-array Object 1000000))
   1.225 +(dotimes i (count a)
   1.226 +  (aset a i (rand-int i)))
   1.227 +(time (reduce + 0 a))
   1.228 +(time (preduce + 0 a))
   1.229 +(time (count (distinct a)))
   1.230 +(time (count (pdistinct a)))
   1.231 +
   1.232 +(preduce + 0 [1 2 3 2 1])
   1.233 +(preduce + 0 (psort a))
   1.234 +(pvec (par [11 2 3 2] :filter-index (fn [x i] (> i x))))
   1.235 +(pvec (par [11 2 3 2] :filter-with [(fn [x y] (> y x)) [110 2 33 2]]))
   1.236 +
   1.237 +(psummary ;or pvec/pmax etc
   1.238 + (par [11 2 3 2] 
   1.239 +      :filter-with [(fn [x y] (> y x)) 
   1.240 +                    [110 2 33 2]]
   1.241 +      :map #(* % 2)))
   1.242 +
   1.243 +(preduce + 0
   1.244 +  (par [11 2 3 2] 
   1.245 +       :filter-with [< [110 2 33 2]]))
   1.246 +
   1.247 +(time (reduce + 0 (map #(* % %) (range 1000000))))
   1.248 +(time (preduce + 0 (par (range 1000000) :map-index *)))
   1.249 +(def v (range 1000000))
   1.250 +(time (preduce + 0 (par v :map-index *)))
   1.251 +(time (preduce + 0 (par v :map  #(* % %))))
   1.252 +(time (reduce + 0 (map #(* % %) v)))
   1.253 +)
   1.254 \ No newline at end of file