rlm@10
|
1 ;; Stream utilities
|
rlm@10
|
2
|
rlm@10
|
3 ;; by Konrad Hinsen
|
rlm@10
|
4 ;; last updated May 3, 2009
|
rlm@10
|
5
|
rlm@10
|
6 ;; Copyright (c) Konrad Hinsen, 2009. All rights reserved. The use
|
rlm@10
|
7 ;; and distribution terms for this software are covered by the Eclipse
|
rlm@10
|
8 ;; Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
|
rlm@10
|
9 ;; which can be found in the file epl-v10.html at the root of this
|
rlm@10
|
10 ;; distribution. By using this software in any fashion, you are
|
rlm@10
|
11 ;; agreeing to be bound by the terms of this license. You must not
|
rlm@10
|
12 ;; remove this notice, or any other, from this software.
|
rlm@10
|
13
|
rlm@10
|
14 (ns
|
rlm@10
|
15 ^{:author "Konrad Hinsen"
|
rlm@10
|
16 :doc "Functions for setting up computational pipelines via data streams.
|
rlm@10
|
17
|
rlm@10
|
18 NOTE: This library is experimental. It may change significantly
|
rlm@10
|
19 with future releases.
|
rlm@10
|
20
|
rlm@10
|
21 This library defines:
|
rlm@10
|
22 - an abstract stream type, whose interface consists of the
|
rlm@10
|
23 multimethod stream-next
|
rlm@10
|
24 - a macro for implementing streams
|
rlm@10
|
25 - implementations of stream for
|
rlm@10
|
26 1) Clojure sequences and vectors
|
rlm@10
|
27 2) nil, representing an empty stream
|
rlm@10
|
28 - tools for writing stream transformers, including the
|
rlm@10
|
29 monad stream-m
|
rlm@10
|
30 - various utility functions for working with streams
|
rlm@10
|
31
|
rlm@10
|
32 Streams are building blocks in the construction of computational
|
rlm@10
|
33 pipelines. A stream is represented by its current state plus
|
rlm@10
|
34 a function that takes a stream state and obtains the next item
|
rlm@10
|
35 in the stream as well as the new stream state. The state is
|
rlm@10
|
36 implemented as a Java class or a Clojure type (as defined by the
|
rlm@10
|
37 function clojure.core/type), and the function is provided as an
|
rlm@10
|
38 implementation of the multimethod stream-next for this class or type.
|
rlm@10
|
39
|
rlm@10
|
40 While setting up pipelines using this mechanism is somewhat more
|
rlm@10
|
41 cumbersome than using Clojure's lazy seq mechanisms, there are a
|
rlm@10
|
42 few advantages:
|
rlm@10
|
43 - The state of a stream can be stored in any Clojure data structure,
|
rlm@10
|
44 and the stream can be re-generated from it any number of times.
|
rlm@10
|
45 Any number of states can be stored this way.
|
rlm@10
|
46 - The elements of the stream are never cached, so keeping a reference
|
rlm@10
|
47 to a stream state does not incur an uncontrollable memory penalty.
|
rlm@10
|
48
|
rlm@10
|
49 Note that the stream mechanism is thread-safe as long as the
|
rlm@10
|
50 concrete stream implementations do not use any mutable state.
|
rlm@10
|
51
|
rlm@10
|
52 Stream transformers take any number of input streams and produce one
|
rlm@10
|
53 output stream. They are typically written using the stream-m
|
rlm@10
|
54 monad. In the definition of a stream transformer, (pick s) returns
|
rlm@10
|
55 the next value of stream argument s, whereas pick-all returns the
|
rlm@10
|
56 next value of all stream arguments in the form of a vector."}
|
rlm@10
|
57 clojure.contrib.stream-utils
|
rlm@10
|
58 (:refer-clojure :exclude (deftype))
|
rlm@10
|
59 (:use [clojure.contrib.types :only (deftype deftype-)])
|
rlm@10
|
60 (:use [clojure.contrib.monads :only (defmonad with-monad)])
|
rlm@10
|
61 (:use [clojure.contrib.def :only (defvar defvar-)])
|
rlm@10
|
62 (:require [clojure.contrib.seq])
|
rlm@10
|
63 (:require [clojure.contrib.generic.collection]))
|
rlm@10
|
64
|
rlm@10
|
65
|
rlm@10
|
66 ;
|
rlm@10
|
67 ; Stream type and interface
|
rlm@10
|
68 ;
|
rlm@10
|
(defvar stream-type ::stream
  "Root of the stream type hierarchy. Every stream type must derive
  from this value; defstream performs the derivation automatically.")

(defmacro defstream
  "Make objects of type type-tag usable as streams. args and body
  become the stream-next method for type-tag: given a stream state,
  it returns [next-value new-state], with new-state nil at the end of
  the stream. The expansion also derives type-tag from stream-type so
  that the methods defined on stream-type apply to it."
  [type-tag args & body]
  (let [next-method `(defmethod stream-next ~type-tag ~args ~@body)]
    `(do
       (derive ~type-tag stream-type)
       ~next-method)))
|
rlm@10
|
82
|
rlm@10
|
(defvar- stream-skip ::skip
  "Sentinel value meaning \"this step yields no item\". It is produced
  by m-zero of stream-m and dropped by the stream-transformer loop.")

(defn- stream-skip?
  "True when x is the stream-skip sentinel (compared by identity)."
  [x]
  (identical? stream-skip x))
|
rlm@10
|
90
|
rlm@10
|
(defmulti stream-next
  "Given stream-state, the current state of a stream, return the
  vector [next-value new-state]: next-value is the next item of the
  stream and new-state is the state of the remainder. Once the stream
  is exhausted, both next-value and new-state are nil. Dispatches on
  (type stream-state)."
  {:arglists '([stream-state])}
  type)

;; nil is the empty stream: it never yields an item.
(defmethod stream-next nil
  [_]
  [nil nil])

;; A non-empty seq yields its first element, and its rest becomes the
;; new state.
(defmethod stream-next clojure.lang.ISeq
  [s]
  (if (empty? s)
    [nil nil]
    [(first s) (rest s)]))

;; A vector streams exactly like its seq.
(defmethod stream-next clojure.lang.IPersistentVector
  [v]
  (-> v seq stream-next))
|
rlm@10
|
112
|
rlm@10
|
(defn stream-seq
  "Return a lazy seq of the items of stream s. stream-next is called
  only as elements of the seq are realized. Every stream type (any
  type derived from stream-type) also gets this conversion through
  clojure.contrib.seq/seq-on and clojure.contrib.generic.collection/seq."
  [s]
  (lazy-seq
    (let [[x next-state] (stream-next s)]
      (when-not (nil? next-state)
        (cons x (stream-seq next-state))))))

;; Hook stream-type into clojure.contrib.seq/seq-on.
(defmethod clojure.contrib.seq/seq-on stream-type
  [stream]
  (stream-seq stream))

;; Hook stream-type into clojure.contrib.generic.collection/seq.
(defmethod clojure.contrib.generic.collection/seq stream-type
  [stream]
  (stream-seq stream))
|
rlm@10
|
131
|
rlm@10
|
132 ;
|
rlm@10
|
133 ; Stream transformers
|
rlm@10
|
134 ;
|
rlm@10
|
(defmonad stream-m
  "Monad describing stream computations; defst builds stream
  transformers with it. A monadic value is a function that takes the
  current state (inside defst, the vector of input stream states) and
  returns [value new-state]. A nil new-state ends the computation.
  m-zero yields the skip marker, which the stream-transformer drops
  without ending the stream."
  [m-result (fn m-result-stream [value]
              (fn [state] [value state]))
   m-bind   (fn m-bind-stream [mv f]
              (fn [state]
                (let [step (mv state)
                      [value next-state] step]
                  ;; Continue only while the stream is alive and the step
                  ;; did not produce the skip marker; otherwise pass the
                  ;; step result through unchanged.
                  (if (and (not (nil? next-state))
                           (not (stream-skip? value)))
                    ((f value) next-state)
                    step))))
   m-zero   (fn [state] [stream-skip state])])
|
rlm@10
|
148
|
rlm@10
|
(defn pick
  "Return a stream-m monadic value that takes the next item from the
  stream at index n of the streams vector. It yields [item streams']
  with that stream advanced, or [nil nil] when the stream is exhausted.
  Inside defst each stream argument's name is bound to its index, so
  the name can be passed instead of n."
  [n]
  (fn [streams]
    (let [[value next-state] (stream-next (streams n))]
      (if-not (nil? next-state)
        [value (assoc streams n next-state)]
        [nil nil]))))
|
rlm@10
|
159
|
rlm@10
|
(defn pick-all
  "Return a vector containing the next value of each stream argument
  inside a stream transformer. Used directly as a stream-m monadic
  value: given the vector of stream states, it returns
  [values new-states], where values is a vector with one item per
  stream. If any stream is exhausted, it returns [nil nil], which ends
  the transformed stream."
  [streams]
  (let [steps      (map stream-next streams)
        ;; Both parts are realized as vectors. The documented contract
        ;; promises a vector of values; previously a lazy seq leaked out.
        values     (vec (map first steps))
        new-states (vec (map second steps))]
    (if (some nil? new-states)
      [nil nil]
      [values new-states])))
|
rlm@10
|
170
|
rlm@10
|
(deftype ::stream-transformer st-as-stream
  (fn [transformer stream-states] [transformer stream-states])
  seq)

;; A transformer stream's state is the pair [st streams]. Its stream-next
;; repeatedly runs the monadic step st on the input stream states. It
;; stops at the first value that is not the skip marker, or returns
;; [nil nil] once st reports the end (nil new state).
(defstream ::stream-transformer
  [[st streams]]
  (loop [states streams]
    (let [[value next-states] (st states)]
      (if (nil? next-states)
        [nil nil]
        (if (stream-skip? value)
          (recur next-states)
          [value (st-as-stream st next-states)])))))
|
rlm@10
|
182
|
rlm@10
|
(defmacro defst
  "Define the stream transformer name by body.
  The non-stream arguments args and the stream arguments streams
  are given separately, with args being possibly empty.

  When streams is a vector of names, each name is bound to its
  stream's index while body is evaluated, so (pick name) reads from
  that stream. When streams has the form [& name], the function
  accepts any number of stream arguments. In that case, use pick-all
  or (pick index) inside body."
  [name args streams & body]
  (if (= (first streams) '&)
    ;; Variadic form: the rest argument arrives as a seq, or nil when no
    ;; streams are passed. pick calls the streams collection as a
    ;; function and assoc's into it, so convert it to a vector.
    `(defn ~name ~(vec (concat args streams))
       (let [st# (with-monad stream-m ~@body)]
         (st-as-stream st# (vec ~(second streams)))))
    ;; Fixed form: bind each stream name to its index while building the
    ;; monadic step, then start the transformer on the vector of stream
    ;; arguments. st# is an auto-gensym, so it cannot shadow a stream
    ;; argument that happens to be named st.
    `(defn ~name ~(vec (concat args streams))
       (let [st# (with-monad stream-m
                   (let [~streams (range ~(count streams))]
                     ~@body))]
         (st-as-stream st# ~streams)))))
|
rlm@10
|
197
|
rlm@10
|
198 ;
|
rlm@10
|
199 ; Stream utilities
|
rlm@10
|
200 ;
|
rlm@10
|
(defn stream-drop
  "Return a stream containing all but the first n elements of stream.
  A non-positive n returns stream unchanged. If stream has at most n
  elements, the result is nil, the empty stream."
  [n stream]
  ;; pos? instead of zero?: with zero? a negative n would never reach
  ;; zero and the loop would spin forever. An exhausted (nil) stream
  ;; stays nil, so stop advancing as soon as it is reached.
  (if (or (not (pos? n)) (nil? stream))
    stream
    (let [[_ s] (stream-next stream)]
      (recur (dec n) s))))
|
rlm@10
|
208
|
rlm@10
|
209 ; Map a function on a stream
|
rlm@10
|
(deftype- ::stream-map stream-map-state)

;; State of a mapped stream: the vector [f stream].
(defstream ::stream-map
  [[f stream]]
  (let [[x next-state] (stream-next stream)]
    (if-not (nil? next-state)
      [(f x) (stream-map-state [f next-state])]
      [nil nil])))

(defmulti stream-map
  "Return a stream whose items are the results of applying f to each
  item of stream. Dispatches on (type stream)."
  {:arglists '([f stream])}
  (fn [_ stream] (type stream)))

;; Generic case: wrap the stream in a mapping state.
(defmethod stream-map :default
  [f stream]
  (stream-map-state [f stream]))

;; Mapping over an already-mapped stream composes the two functions
;; instead of stacking another mapping layer.
(defmethod stream-map ::stream-map
  [f [g stream]]
  (stream-map-state [(comp f g) stream]))
|
rlm@10
|
231
|
rlm@10
|
232 ; Filter stream elements
|
rlm@10
|
(deftype- ::stream-filter stream-filter-state)

;; State of a filtered stream: the vector [p stream]. Items failing p
;; are consumed and discarded until one passes or the stream ends.
(defstream ::stream-filter
  [[p stream]]
  (loop [remaining stream]
    (let [[x next-state] (stream-next remaining)]
      (if (nil? next-state)
        [nil nil]
        (if (p x)
          [x (stream-filter-state [p next-state])]
          (recur next-state))))))

(defmulti stream-filter
  "Return a new stream that contains only the elements of stream
  that satisfy the predicate p. Dispatches on (type stream)."
  {:arglists '([p stream])}
  (fn [_ stream] (type stream)))

;; Generic case: wrap the stream in a filtering state.
(defmethod stream-filter :default
  [p stream]
  (stream-filter-state [p stream]))

;; Filtering an already-filtered stream merges the predicates: the
;; existing predicate q is tested first, then p.
(defmethod stream-filter ::stream-filter
  [p [q stream]]
  (stream-filter-state [(fn [x] (and (q x) (p x))) stream]))
|
rlm@10
|
256
|
rlm@10
|
257 ; Flatten a stream of sequences
|
rlm@10
|
(deftype- ::stream-flatten stream-flatten-state)

;; State of a flattened stream: [buffer stream]. buffer holds the
;; not-yet-emitted elements of the current inner sequence (nil once they
;; are used up), and stream is the outer stream of sequences.
(defstream ::stream-flatten
  [[buffer stream]]
  (loop [pending buffer
         source stream]
    (if-not (nil? pending)
      [(first pending) (stream-flatten-state [(next pending) source])]
      (let [[chunk next-source] (stream-next source)]
        (if (nil? next-source)
          [nil nil]
          ;; An empty inner sequence contributes nothing. Treat it as nil
          ;; so the loop goes on to fetch the next one.
          (recur (if (empty? chunk) nil chunk) next-source))))))

(defn stream-flatten
  "Return a stream of the elements of the sequences in stream s.
  Only one level of nesting is removed; elements are not flattened
  recursively."
  [s]
  (stream-flatten-state [nil s]))
|