view src/clojure/contrib/stream_utils.clj @ 10:ef7dbbd6452c

added clojure source goodness
author Robert McIntyre <rlm@mit.edu>
date Sat, 21 Aug 2010 06:25:44 -0400
parents
children
line wrap: on
line source
1 ;; Stream utilities
3 ;; by Konrad Hinsen
4 ;; last updated May 3, 2009
6 ;; Copyright (c) Konrad Hinsen, 2009. All rights reserved. The use
7 ;; and distribution terms for this software are covered by the Eclipse
8 ;; Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
9 ;; which can be found in the file epl-v10.html at the root of this
10 ;; distribution. By using this software in any fashion, you are
11 ;; agreeing to be bound by the terms of this license. You must not
12 ;; remove this notice, or any other, from this software.
14 (ns
15 ^{:author "Konrad Hinsen"
16 :doc "Functions for setting up computational pipelines via data streams.
18 NOTE: This library is experimental. It may change significantly
19 with future release.
21 This library defines:
22 - an abstract stream type, whose interface consists of the
23 multimethod stream-next
24 - a macro for implementing streams
25 - implementations of stream for
26 1) Clojure sequences, and vectors
27 2) nil, representing an empty stream
28 - tools for writing stream transformers, including the
29 monad stream-m
30 - various utility functions for working with streams
32 Streams are building blocks in the construction of computational
33 pipelines. A stream is represented by its current state plus
34 a function that takes a stream state and obtains the next item
35 in the stream as well as the new stream state. The state is
36 implemented as a Java class or a Clojure type (as defined by the
37 function clojure.core/type), and the function is provided as an
38 implementation of the multimethod stream-next for this class or type.
40 While setting up pipelines using this mechanism is somewhat more
41 cumbersome than using Clojure's lazy seq mechanisms, there are a
42 few advantages:
43 - The state of a stream can be stored in any Clojure data structure,
44 and the stream can be re-generated from it any number of times.
45 Any number of states can be stored this way.
46 - The elements of the stream are never cached, so keeping a reference
47 to a stream state does not incur an uncontrollable memory penalty.
49 Note that the stream mechanism is thread-safe as long as the
50 concrete stream implementations do not use any mutable state.
52 Stream transformers take any number of input streams and produce one
53 output stream. They are typically written using the stream-m
54 monad. In the definition of a stream transformer, (pick s) returns
55 the next value of stream argument s, whereas pick-all returns the
56 next value of all stream arguments in the form of a vector."}
57 clojure.contrib.stream-utils
58 (:refer-clojure :exclude (deftype))
59 (:use [clojure.contrib.types :only (deftype deftype-)])
60 (:use [clojure.contrib.monads :only (defmonad with-monad)])
61 (:use [clojure.contrib.def :only (defvar defvar-)])
62 (:require [clojure.contrib.seq])
63 (:require [clojure.contrib.generic.collection]))
66 ;
67 ; Stream type and interface
68 ;
69 (defvar stream-type ::stream
70 "The root type for the stream hierarchy. For each stream type,
71 add a derivation from this type.")
73 (defmacro defstream
74 "Define object of the given type as a stream whose implementation
75 of stream-next is defined by args and body. This macro adds
76 a type-specific method for stream-next and derives type
77 from stream-type."
78 [type-tag args & body]
79 `(do
80 (derive ~type-tag stream-type)
81 (defmethod stream-next ~type-tag ~args ~@body)))
83 (defvar- stream-skip ::skip
84 "The skip-this-item value.")
86 (defn- stream-skip?
87 "Returns true if x is the stream-skip."
88 [x]
89 (identical? x stream-skip))
91 (defmulti stream-next
92 "Returns a vector [next-value new-state] where next-value is the next
93 item in the data stream defined by stream-state and new-state
94 is the new state of the stream. At the end of the stream,
95 next-value and new-state are nil."
96 {:arglists '([stream-state])}
97 type)
99 (defmethod stream-next nil
100 [s]
101 [nil nil])
103 (defmethod stream-next clojure.lang.ISeq
104 [s]
105 (if (seq s)
106 [(first s) (rest s)]
107 [nil nil]))
109 (defmethod stream-next clojure.lang.IPersistentVector
110 [v]
111 (stream-next (seq v)))
113 (defn stream-seq
114 "Return a lazy seq on the stream. Also accessible via
115 clojure.contrib.seq/seq-on and
116 clojure.contrib.generic.collection/seq for streams."
117 [s]
118 (lazy-seq
119 (let [[v ns] (stream-next s)]
120 (if (nil? ns)
121 nil
122 (cons v (stream-seq ns))))))
124 (defmethod clojure.contrib.seq/seq-on stream-type
125 [s]
126 (stream-seq s))
128 (defmethod clojure.contrib.generic.collection/seq stream-type
129 [s]
130 (stream-seq s))
132 ;
133 ; Stream transformers
134 ;
135 (defmonad stream-m
136 "Monad describing stream computations. The monadic values can be
137 of any type handled by stream-next."
138 [m-result (fn m-result-stream [v]
139 (fn [s] [v s]))
140 m-bind (fn m-bind-stream [mv f]
141 (fn [s]
142 (let [[v ss :as r] (mv s)]
143 (if (or (nil? ss) (stream-skip? v))
144 r
145 ((f v) ss)))))
146 m-zero (fn [s] [stream-skip s])
147 ])
149 (defn pick
150 "Return the next value of stream argument n inside a stream
151 transformer. When used inside of defst, the name of the stream
152 argument can be used instead of its index n."
153 [n]
154 (fn [streams]
155 (let [[v ns] (stream-next (streams n))]
156 (if (nil? ns)
157 [nil nil]
158 [v (assoc streams n ns)]))))
160 (defn pick-all
161 "Return a vector containing the next value of each stream argument
162 inside a stream transformer."
163 [streams]
164 (let [next (map stream-next streams)
165 values (map first next)
166 streams (vec (map second next))]
167 (if (some nil? streams)
168 [nil nil]
169 [values streams])))
171 (deftype ::stream-transformer st-as-stream
172 (fn [st streams] [st streams])
173 seq)
175 (defstream ::stream-transformer
176 [[st streams]]
177 (loop [s streams]
178 (let [[v ns] (st s)]
179 (cond (nil? ns) [nil nil]
180 (stream-skip? v) (recur ns)
181 :else [v (st-as-stream st ns)]))))
183 (defmacro defst
184 "Define the stream transformer name by body.
185 The non-stream arguments args and the stream arguments streams
186 are given separately, with args being possibly empty."
187 [name args streams & body]
188 (if (= (first streams) '&)
189 `(defn ~name ~(vec (concat args streams))
190 (let [~'st (with-monad stream-m ~@body)]
191 (st-as-stream ~'st ~(second streams))))
192 `(defn ~name ~(vec (concat args streams))
193 (let [~'st (with-monad stream-m
194 (let [~streams (range ~(count streams))]
195 ~@body))]
196 (st-as-stream ~'st ~streams)))))
198 ;
199 ; Stream utilities
200 ;
201 (defn stream-drop
202 "Return a stream containing all but the first n elements of stream."
203 [n stream]
204 (if (zero? n)
205 stream
206 (let [[_ s] (stream-next stream)]
207 (recur (dec n) s))))
209 ; Map a function on a stream
210 (deftype- ::stream-map stream-map-state)
212 (defstream ::stream-map
213 [[f stream]]
214 (let [[v ns] (stream-next stream)]
215 (if (nil? ns)
216 [nil nil]
217 [(f v) (stream-map-state [f ns])])))
219 (defmulti stream-map
220 "Return a new stream by mapping the function f on the given stream."
221 {:arglists '([f stream])}
222 (fn [f stream] (type stream)))
224 (defmethod stream-map :default
225 [f stream]
226 (stream-map-state [f stream]))
228 (defmethod stream-map ::stream-map
229 [f [g stream]]
230 (stream-map-state [(comp f g) stream]))
232 ; Filter stream elements
233 (deftype- ::stream-filter stream-filter-state)
235 (defstream ::stream-filter
236 [[p stream]]
237 (loop [stream stream]
238 (let [[v ns] (stream-next stream)]
239 (cond (nil? ns) [nil nil]
240 (p v) [v (stream-filter-state [p ns])]
241 :else (recur ns)))))
243 (defmulti stream-filter
244 "Return a new stream that contrains the elements of stream
245 that satisfy the predicate p."
246 {:arglists '([p stream])}
247 (fn [p stream] (type stream)))
249 (defmethod stream-filter :default
250 [p stream]
251 (stream-filter-state [p stream]))
253 (defmethod stream-filter ::stream-filter
254 [p [q stream]]
255 (stream-filter-state [(fn [v] (and (q v) (p v))) stream]))
257 ; Flatten a stream of sequences
258 (deftype- ::stream-flatten stream-flatten-state)
260 (defstream ::stream-flatten
261 [[buffer stream]]
262 (loop [buffer buffer
263 stream stream]
264 (if (nil? buffer)
265 (let [[v new-stream] (stream-next stream)]
266 (cond (nil? new-stream) [nil nil]
267 (empty? v) (recur nil new-stream)
268 :else (recur v new-stream)))
269 [(first buffer) (stream-flatten-state [(next buffer) stream])])))
271 (defn stream-flatten
272 "Converts a stream of sequences into a stream of the elements of the
273 sequences. Flattening is not recursive, only one level of nesting
274 will be removed."
275 [s]
276 (stream-flatten-state [nil s]))