Mercurial > lasercutter
comparison src/clojure/contrib/stream_utils.clj @ 10:ef7dbbd6452c
added clojure source goodness
author | Robert McIntyre <rlm@mit.edu> |
---|---|
date | Sat, 21 Aug 2010 06:25:44 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
9:35cf337adfcf | 10:ef7dbbd6452c |
---|---|
1 ;; Stream utilities | |
2 | |
3 ;; by Konrad Hinsen | |
4 ;; last updated May 3, 2009 | |
5 | |
6 ;; Copyright (c) Konrad Hinsen, 2009. All rights reserved. The use | |
7 ;; and distribution terms for this software are covered by the Eclipse | |
8 ;; Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) | |
9 ;; which can be found in the file epl-v10.html at the root of this | |
10 ;; distribution. By using this software in any fashion, you are | |
11 ;; agreeing to be bound by the terms of this license. You must not | |
12 ;; remove this notice, or any other, from this software. | |
13 | |
14 (ns | |
15 ^{:author "Konrad Hinsen" | |
16 :doc "Functions for setting up computational pipelines via data streams. | |
17 | |
18 NOTE: This library is experimental. It may change significantly | |
19 with future release. | |
20 | |
21 This library defines: | |
22 - an abstract stream type, whose interface consists of the | |
23 multimethod stream-next | |
24 - a macro for implementing streams | |
25 - implementations of stream for | |
26 1) Clojure sequences, and vectors | |
27 2) nil, representing an empty stream | |
28 - tools for writing stream transformers, including the | |
29 monad stream-m | |
30 - various utility functions for working with streams | |
31 | |
32 Streams are building blocks in the construction of computational | |
33 pipelines. A stream is represented by its current state plus | |
34 a function that takes a stream state and obtains the next item | |
35 in the stream as well as the new stream state. The state is | |
36 implemented as a Java class or a Clojure type (as defined by the | |
37 function clojure.core/type), and the function is provided as an | |
38 implementation of the multimethod stream-next for this class or type. | |
39 | |
40 While setting up pipelines using this mechanism is somewhat more | |
41 cumbersome than using Clojure's lazy seq mechanisms, there are a | |
42 few advantages: | |
43 - The state of a stream can be stored in any Clojure data structure, | |
44 and the stream can be re-generated from it any number of times. | |
45 Any number of states can be stored this way. | |
46 - The elements of the stream are never cached, so keeping a reference | |
47 to a stream state does not incur an uncontrollable memory penalty. | |
48 | |
49 Note that the stream mechanism is thread-safe as long as the | |
50 concrete stream implementations do not use any mutable state. | |
51 | |
52 Stream transformers take any number of input streams and produce one | |
53 output stream. They are typically written using the stream-m | |
54 monad. In the definition of a stream transformer, (pick s) returns | |
55 the next value of stream argument s, whereas pick-all returns the | |
56 next value of all stream arguments in the form of a vector."} | |
57 clojure.contrib.stream-utils | |
58 (:refer-clojure :exclude (deftype)) | |
59 (:use [clojure.contrib.types :only (deftype deftype-)]) | |
60 (:use [clojure.contrib.monads :only (defmonad with-monad)]) | |
61 (:use [clojure.contrib.def :only (defvar defvar-)]) | |
62 (:require [clojure.contrib.seq]) | |
63 (:require [clojure.contrib.generic.collection])) | |
64 | |
65 | |
66 ; | |
67 ; Stream type and interface | |
68 ; | |
69 (defvar stream-type ::stream | |
70 "The root type for the stream hierarchy. For each stream type, | |
71 add a derivation from this type.") | |
72 | |
73 (defmacro defstream | |
74 "Define object of the given type as a stream whose implementation | |
75 of stream-next is defined by args and body. This macro adds | |
76 a type-specific method for stream-next and derives type | |
77 from stream-type." | |
78 [type-tag args & body] | |
79 `(do | |
80 (derive ~type-tag stream-type) | |
81 (defmethod stream-next ~type-tag ~args ~@body))) | |
82 | |
83 (defvar- stream-skip ::skip | |
84 "The skip-this-item value.") | |
85 | |
86 (defn- stream-skip? | |
87 "Returns true if x is the stream-skip." | |
88 [x] | |
89 (identical? x stream-skip)) | |
90 | |
91 (defmulti stream-next | |
92 "Returns a vector [next-value new-state] where next-value is the next | |
93 item in the data stream defined by stream-state and new-state | |
94 is the new state of the stream. At the end of the stream, | |
95 next-value and new-state are nil." | |
96 {:arglists '([stream-state])} | |
97 type) | |
98 | |
99 (defmethod stream-next nil | |
100 [s] | |
101 [nil nil]) | |
102 | |
103 (defmethod stream-next clojure.lang.ISeq | |
104 [s] | |
105 (if (seq s) | |
106 [(first s) (rest s)] | |
107 [nil nil])) | |
108 | |
109 (defmethod stream-next clojure.lang.IPersistentVector | |
110 [v] | |
111 (stream-next (seq v))) | |
112 | |
113 (defn stream-seq | |
114 "Return a lazy seq on the stream. Also accessible via | |
115 clojure.contrib.seq/seq-on and | |
116 clojure.contrib.generic.collection/seq for streams." | |
117 [s] | |
118 (lazy-seq | |
119 (let [[v ns] (stream-next s)] | |
120 (if (nil? ns) | |
121 nil | |
122 (cons v (stream-seq ns)))))) | |
123 | |
124 (defmethod clojure.contrib.seq/seq-on stream-type | |
125 [s] | |
126 (stream-seq s)) | |
127 | |
128 (defmethod clojure.contrib.generic.collection/seq stream-type | |
129 [s] | |
130 (stream-seq s)) | |
131 | |
132 ; | |
133 ; Stream transformers | |
134 ; | |
135 (defmonad stream-m | |
136 "Monad describing stream computations. The monadic values can be | |
137 of any type handled by stream-next." | |
138 [m-result (fn m-result-stream [v] | |
139 (fn [s] [v s])) | |
140 m-bind (fn m-bind-stream [mv f] | |
141 (fn [s] | |
142 (let [[v ss :as r] (mv s)] | |
143 (if (or (nil? ss) (stream-skip? v)) | |
144 r | |
145 ((f v) ss))))) | |
146 m-zero (fn [s] [stream-skip s]) | |
147 ]) | |
148 | |
149 (defn pick | |
150 "Return the next value of stream argument n inside a stream | |
151 transformer. When used inside of defst, the name of the stream | |
152 argument can be used instead of its index n." | |
153 [n] | |
154 (fn [streams] | |
155 (let [[v ns] (stream-next (streams n))] | |
156 (if (nil? ns) | |
157 [nil nil] | |
158 [v (assoc streams n ns)])))) | |
159 | |
160 (defn pick-all | |
161 "Return a vector containing the next value of each stream argument | |
162 inside a stream transformer." | |
163 [streams] | |
164 (let [next (map stream-next streams) | |
165 values (map first next) | |
166 streams (vec (map second next))] | |
167 (if (some nil? streams) | |
168 [nil nil] | |
169 [values streams]))) | |
170 | |
171 (deftype ::stream-transformer st-as-stream | |
172 (fn [st streams] [st streams]) | |
173 seq) | |
174 | |
175 (defstream ::stream-transformer | |
176 [[st streams]] | |
177 (loop [s streams] | |
178 (let [[v ns] (st s)] | |
179 (cond (nil? ns) [nil nil] | |
180 (stream-skip? v) (recur ns) | |
181 :else [v (st-as-stream st ns)])))) | |
182 | |
183 (defmacro defst | |
184 "Define the stream transformer name by body. | |
185 The non-stream arguments args and the stream arguments streams | |
186 are given separately, with args being possibly empty." | |
187 [name args streams & body] | |
188 (if (= (first streams) '&) | |
189 `(defn ~name ~(vec (concat args streams)) | |
190 (let [~'st (with-monad stream-m ~@body)] | |
191 (st-as-stream ~'st ~(second streams)))) | |
192 `(defn ~name ~(vec (concat args streams)) | |
193 (let [~'st (with-monad stream-m | |
194 (let [~streams (range ~(count streams))] | |
195 ~@body))] | |
196 (st-as-stream ~'st ~streams))))) | |
197 | |
198 ; | |
199 ; Stream utilities | |
200 ; | |
201 (defn stream-drop | |
202 "Return a stream containing all but the first n elements of stream." | |
203 [n stream] | |
204 (if (zero? n) | |
205 stream | |
206 (let [[_ s] (stream-next stream)] | |
207 (recur (dec n) s)))) | |
208 | |
209 ; Map a function on a stream | |
210 (deftype- ::stream-map stream-map-state) | |
211 | |
212 (defstream ::stream-map | |
213 [[f stream]] | |
214 (let [[v ns] (stream-next stream)] | |
215 (if (nil? ns) | |
216 [nil nil] | |
217 [(f v) (stream-map-state [f ns])]))) | |
218 | |
219 (defmulti stream-map | |
220 "Return a new stream by mapping the function f on the given stream." | |
221 {:arglists '([f stream])} | |
222 (fn [f stream] (type stream))) | |
223 | |
224 (defmethod stream-map :default | |
225 [f stream] | |
226 (stream-map-state [f stream])) | |
227 | |
228 (defmethod stream-map ::stream-map | |
229 [f [g stream]] | |
230 (stream-map-state [(comp f g) stream])) | |
231 | |
232 ; Filter stream elements | |
233 (deftype- ::stream-filter stream-filter-state) | |
234 | |
235 (defstream ::stream-filter | |
236 [[p stream]] | |
237 (loop [stream stream] | |
238 (let [[v ns] (stream-next stream)] | |
239 (cond (nil? ns) [nil nil] | |
240 (p v) [v (stream-filter-state [p ns])] | |
241 :else (recur ns))))) | |
242 | |
243 (defmulti stream-filter | |
244 "Return a new stream that contrains the elements of stream | |
245 that satisfy the predicate p." | |
246 {:arglists '([p stream])} | |
247 (fn [p stream] (type stream))) | |
248 | |
249 (defmethod stream-filter :default | |
250 [p stream] | |
251 (stream-filter-state [p stream])) | |
252 | |
253 (defmethod stream-filter ::stream-filter | |
254 [p [q stream]] | |
255 (stream-filter-state [(fn [v] (and (q v) (p v))) stream])) | |
256 | |
257 ; Flatten a stream of sequences | |
258 (deftype- ::stream-flatten stream-flatten-state) | |
259 | |
260 (defstream ::stream-flatten | |
261 [[buffer stream]] | |
262 (loop [buffer buffer | |
263 stream stream] | |
264 (if (nil? buffer) | |
265 (let [[v new-stream] (stream-next stream)] | |
266 (cond (nil? new-stream) [nil nil] | |
267 (empty? v) (recur nil new-stream) | |
268 :else (recur v new-stream))) | |
269 [(first buffer) (stream-flatten-state [(next buffer) stream])]))) | |
270 | |
271 (defn stream-flatten | |
272 "Converts a stream of sequences into a stream of the elements of the | |
273 sequences. Flattening is not recursive, only one level of nesting | |
274 will be removed." | |
275 [s] | |
276 (stream-flatten-state [nil s])) |