annotate src/clojure/contrib/stream_utils.clj @ 10:ef7dbbd6452c

added clojure source goodness
author Robert McIntyre <rlm@mit.edu>
date Sat, 21 Aug 2010 06:25:44 -0400
parents
children
rev   line source
rlm@10 1 ;; Stream utilities
rlm@10 2
rlm@10 3 ;; by Konrad Hinsen
rlm@10 4 ;; last updated May 3, 2009
rlm@10 5
rlm@10 6 ;; Copyright (c) Konrad Hinsen, 2009. All rights reserved. The use
rlm@10 7 ;; and distribution terms for this software are covered by the Eclipse
rlm@10 8 ;; Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
rlm@10 9 ;; which can be found in the file epl-v10.html at the root of this
rlm@10 10 ;; distribution. By using this software in any fashion, you are
rlm@10 11 ;; agreeing to be bound by the terms of this license. You must not
rlm@10 12 ;; remove this notice, or any other, from this software.
rlm@10 13
rlm@10 14 (ns
rlm@10 15 ^{:author "Konrad Hinsen"
rlm@10 16 :doc "Functions for setting up computational pipelines via data streams.
rlm@10 17
rlm@10 18 NOTE: This library is experimental. It may change significantly
rlm@10 19 with future releases.
rlm@10 20
rlm@10 21 This library defines:
rlm@10 22 - an abstract stream type, whose interface consists of the
rlm@10 23 multimethod stream-next
rlm@10 24 - a macro for implementing streams
rlm@10 25 - implementations of stream for
rlm@10 26 1) Clojure sequences, and vectors
rlm@10 27 2) nil, representing an empty stream
rlm@10 28 - tools for writing stream transformers, including the
rlm@10 29 monad stream-m
rlm@10 30 - various utility functions for working with streams
rlm@10 31
rlm@10 32 Streams are building blocks in the construction of computational
rlm@10 33 pipelines. A stream is represented by its current state plus
rlm@10 34 a function that takes a stream state and obtains the next item
rlm@10 35 in the stream as well as the new stream state. The state is
rlm@10 36 implemented as a Java class or a Clojure type (as defined by the
rlm@10 37 function clojure.core/type), and the function is provided as an
rlm@10 38 implementation of the multimethod stream-next for this class or type.
rlm@10 39
rlm@10 40 While setting up pipelines using this mechanism is somewhat more
rlm@10 41 cumbersome than using Clojure's lazy seq mechanisms, there are a
rlm@10 42 few advantages:
rlm@10 43 - The state of a stream can be stored in any Clojure data structure,
rlm@10 44 and the stream can be re-generated from it any number of times.
rlm@10 45 Any number of states can be stored this way.
rlm@10 46 - The elements of the stream are never cached, so keeping a reference
rlm@10 47 to a stream state does not incur an uncontrollable memory penalty.
rlm@10 48
rlm@10 49 Note that the stream mechanism is thread-safe as long as the
rlm@10 50 concrete stream implementations do not use any mutable state.
rlm@10 51
rlm@10 52 Stream transformers take any number of input streams and produce one
rlm@10 53 output stream. They are typically written using the stream-m
rlm@10 54 monad. In the definition of a stream transformer, (pick s) returns
rlm@10 55 the next value of stream argument s, whereas pick-all returns the
rlm@10 56 next value of all stream arguments in the form of a vector."}
rlm@10 57 clojure.contrib.stream-utils
rlm@10 58 (:refer-clojure :exclude (deftype))
rlm@10 59 (:use [clojure.contrib.types :only (deftype deftype-)])
rlm@10 60 (:use [clojure.contrib.monads :only (defmonad with-monad)])
rlm@10 61 (:use [clojure.contrib.def :only (defvar defvar-)])
rlm@10 62 (:require [clojure.contrib.seq])
rlm@10 63 (:require [clojure.contrib.generic.collection]))
rlm@10 64
rlm@10 65
rlm@10 66 ;
rlm@10 67 ; Stream type and interface
rlm@10 68 ;
(defvar stream-type ::stream
  "Root of the stream type hierarchy. A type becomes a stream type by
  being derived from this keyword, which is what defstream does.")
rlm@10 72
(defmacro defstream
  "Declare type-tag to be a stream type. The expansion derives
  type-tag from stream-type and then installs a stream-next method
  for type-tag whose parameter vector is args and whose body is body."
  [type-tag args & body]
  (let [derivation `(derive ~type-tag stream-type)
        method     `(defmethod stream-next ~type-tag ~args ~@body)]
    ; derive first, then register the method, wrapped in a single form
    `(do
       ~derivation
       ~method)))
rlm@10 82
(defvar- stream-skip ::skip
  "Marker value meaning 'skip this item'.")

(defn- stream-skip?
  "True when x is the very same object as stream-skip."
  [x]
  (identical? stream-skip x))
rlm@10 90
(defmulti stream-next
  "Advance the stream described by stream-state by one item.
  Returns a vector [next-value new-state]: the item just produced and
  the state from which the rest of the stream can be obtained. Once
  the stream is exhausted, both next-value and new-state are nil.
  Dispatches on the type of stream-state."
  {:arglists '([stream-state])}
  type)
rlm@10 98
; nil is the empty stream: it is always exhausted
(defmethod stream-next nil
  [_]
  [nil nil])

; A seq yields its first element; the rest of the seq is the new state
(defmethod stream-next clojure.lang.ISeq
  [s]
  (if (empty? s)
    [nil nil]
    [(first s) (rest s)]))

; A vector is streamed through its seq view (nil when the vector is empty)
(defmethod stream-next clojure.lang.IPersistentVector
  [v]
  (-> v seq stream-next))
rlm@10 112
(defn stream-seq
  "Build a lazy seq of the items of stream s. Each step calls
  stream-next; the seq ends when the returned new state is nil.
  The same conversion is available through
  clojure.contrib.seq/seq-on and
  clojure.contrib.generic.collection/seq for streams."
  [s]
  (lazy-seq
    (let [[item state] (stream-next s)]
      (when-not (nil? state)
        (cons item (stream-seq state))))))
rlm@10 123
; Both generic seq entry points delegate to stream-seq for stream types
(defmethod clojure.contrib.seq/seq-on stream-type
  [stream]
  (stream-seq stream))

(defmethod clojure.contrib.generic.collection/seq stream-type
  [stream]
  (stream-seq stream))
rlm@10 131
rlm@10 132 ;
rlm@10 133 ; Stream transformers
rlm@10 134 ;
(defmonad stream-m
  "Monad describing stream computations. The monadic values can be
  of any type handled by stream-next."
  [m-result (fn m-result-stream [v]
              (fn [s] [v s]))
   m-bind   (fn m-bind-stream [mv f]
              (fn [s]
                (let [step   (mv s)
                      [v ss] step]
                  ; continue with f only if the step produced a new
                  ; state and its value is not the skip marker;
                  ; otherwise hand the step back unchanged
                  (if (and (not (nil? ss)) (not (stream-skip? v)))
                    ((f v) ss)
                    step))))
   m-zero   (fn [s] [stream-skip s])
   ])
rlm@10 148
(defn pick
  "Return a monadic step that takes the next value from stream
  argument n inside a stream transformer. The step receives the
  vector of stream states, advances entry n and returns
  [value updated-states], or [nil nil] when that stream is exhausted.
  When used inside of defst, the name of the stream argument can be
  used instead of its index n."
  [n]
  (fn [streams]
    (let [[value new-state] (stream-next (streams n))]
      (if-not (nil? new-state)
        [value (assoc streams n new-state)]
        [nil nil]))))
rlm@10 159
(defn pick-all
  "Take the next value from every stream argument inside a stream
  transformer. Returns [values new-states], where values is a lazy
  sequence of the next value of each stream (in argument order) and
  new-states is a vector of the advanced stream states. If any of
  the streams is exhausted the result is [nil nil]."
  [streams]
  (let [steps      (map stream-next streams)
        new-states (vec (map second steps))]
    (if (some nil? new-states)
      [nil nil]
      [(map first steps) new-states])))
rlm@10 170
; A stream transformer used as a stream: the state pairs the
; transformer function with the states of its input streams.
(deftype ::stream-transformer st-as-stream
  (fn [transformer inputs] [transformer inputs])
  seq)

(defstream ::stream-transformer
  [[st streams]]
  ; Apply the transformer until it yields a real value or ends;
  ; steps whose value is the skip marker are silently dropped.
  (loop [state streams]
    (let [[value new-state] (st state)]
      (if (nil? new-state)
        [nil nil]
        (if (stream-skip? value)
          (recur new-state)
          [value (st-as-stream st new-state)])))))
rlm@10 182
(defmacro defst
  "Define the stream transformer name by body.
  The non-stream arguments args and the stream arguments streams
  are given separately, with args being possibly empty.

  body is evaluated inside (with-monad stream-m ...) and must yield
  the monadic step function of the transformer. The generated
  function takes args followed by streams and returns a
  ::stream-transformer stream built with st-as-stream.

  When streams is a fixed list of names, each name is bound inside
  body to its index, so that (pick name) works. When streams has the
  form [& rest-name], the function accepts any number of streams; the
  collected rest argument is converted to a vector so that (pick n)
  can index and update it."
  [name args streams & body]
  (if (= (first streams) '&)
    `(defn ~name ~(vec (concat args streams))
       (let [~'st (with-monad stream-m ~@body)]
         ; a rest argument is a seq (or nil), but pick requires a
         ; vector of stream states, hence the vec
         (st-as-stream ~'st (vec ~(second streams)))))
    `(defn ~name ~(vec (concat args streams))
       (let [~'st (with-monad stream-m
                    ; inside body, stream names stand for their indices
                    (let [~streams (range ~(count streams))]
                      ~@body))]
         (st-as-stream ~'st ~streams)))))
rlm@10 197
rlm@10 198 ;
rlm@10 199 ; Stream utilities
rlm@10 200 ;
(defn stream-drop
  "Return a stream containing all but the first n elements of stream.
  If n is zero or negative, stream is returned unchanged. Dropping
  stops as soon as the stream state becomes nil (the stream is
  exhausted), in which case nil is returned."
  [n stream]
  ; pos? rather than zero?: a negative n must not loop forever.
  ; nil? check: nothing is left to drop from an exhausted stream.
  (if (or (not (pos? n)) (nil? stream))
    stream
    (let [[_ s] (stream-next stream)]
      (recur (dec n) s))))
rlm@10 208
; Map a function on a stream.
; The state is the pair [f stream]: the function and the underlying stream.
(deftype- ::stream-map stream-map-state)

(defstream ::stream-map
  [[f stream]]
  (let [[value new-state] (stream-next stream)]
    (if-not (nil? new-state)
      [(f value) (stream-map-state [f new-state])]
      [nil nil])))
rlm@10 218
(defmulti stream-map
  "Return a new stream whose items are the results of applying the
  function f to the items of the given stream. Dispatches on the
  type of stream."
  {:arglists '([f stream])}
  (fn [_ stream] (type stream)))

; General case: wrap the stream together with f
(defmethod stream-map :default
  [f stream]
  (stream-map-state [f stream]))

; Mapping over an already mapped stream: compose the two functions
; instead of nesting one mapped stream inside another
(defmethod stream-map ::stream-map
  [f [g inner]]
  (stream-map-state [(comp f g) inner]))
rlm@10 231
; Filter stream elements.
; The state is the pair [p stream]: the predicate and the underlying stream.
(deftype- ::stream-filter stream-filter-state)

(defstream ::stream-filter
  [[p stream]]
  ; Advance the underlying stream until an item satisfies p or it ends
  (loop [state stream]
    (let [[value new-state] (stream-next state)]
      (if (nil? new-state)
        [nil nil]
        (if (p value)
          [value (stream-filter-state [p new-state])]
          (recur new-state))))))
rlm@10 242
(defmulti stream-filter
  "Return a new stream that contains the elements of stream
  that satisfy the predicate p. Dispatches on the type of stream."
  {:arglists '([p stream])}
  (fn [_ stream] (type stream)))

; General case: wrap the stream together with p
(defmethod stream-filter :default
  [p stream]
  (stream-filter-state [p stream]))

; Filtering an already filtered stream: merge both predicates into
; one (the earlier predicate q is tested first)
(defmethod stream-filter ::stream-filter
  [p [q inner]]
  (stream-filter-state [(fn [v] (and (q v) (p v))) inner]))
rlm@10 256
; Flatten a stream of sequences.
; The state is the pair [buffer stream]: the not yet delivered items of
; the sequence being unpacked (nil if there are none) and the
; underlying stream of sequences.
(deftype- ::stream-flatten stream-flatten-state)

(defstream ::stream-flatten
  [[buffer stream]]
  (loop [pending buffer
         source  stream]
    (if-not (nil? pending)
      ; deliver the next buffered item; next yields nil once the
      ; buffer has been used up
      [(first pending) (stream-flatten-state [(next pending) source])]
      ; buffer used up: fetch the next sequence from the underlying stream
      (let [[items new-source] (stream-next source)]
        (if (nil? new-source)
          [nil nil]
          ; an empty sequence contributes nothing, keep fetching
          (recur (if (empty? items) nil items) new-source))))))
rlm@10 270
(defn stream-flatten
  "Turn a stream of sequences s into a stream of the elements of
  those sequences. Only one level of nesting is removed; flattening
  is not recursive."
  [s]
  ; start with an empty (nil) buffer in front of the stream
  (stream-flatten-state [nil s]))