Mercurial > lasercutter
view src/clojure/contrib/stream_utils.clj @ 10:ef7dbbd6452c
added clojure source goodness
author | Robert McIntyre <rlm@mit.edu> |
---|---|
date | Sat, 21 Aug 2010 06:25:44 -0400 |
parents | |
children |
line wrap: on
line source
1 ;; Stream utilities3 ;; by Konrad Hinsen4 ;; last updated May 3, 20096 ;; Copyright (c) Konrad Hinsen, 2009. All rights reserved. The use7 ;; and distribution terms for this software are covered by the Eclipse8 ;; Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)9 ;; which can be found in the file epl-v10.html at the root of this10 ;; distribution. By using this software in any fashion, you are11 ;; agreeing to be bound by the terms of this license. You must not12 ;; remove this notice, or any other, from this software.14 (ns15 ^{:author "Konrad Hinsen"16 :doc "Functions for setting up computational pipelines via data streams.18 NOTE: This library is experimental. It may change significantly19 with future release.21 This library defines:22 - an abstract stream type, whose interface consists of the23 multimethod stream-next24 - a macro for implementing streams25 - implementations of stream for26 1) Clojure sequences, and vectors27 2) nil, representing an empty stream28 - tools for writing stream transformers, including the29 monad stream-m30 - various utility functions for working with streams32 Streams are building blocks in the construction of computational33 pipelines. A stream is represented by its current state plus34 a function that takes a stream state and obtains the next item35 in the stream as well as the new stream state. The state is36 implemented as a Java class or a Clojure type (as defined by the37 function clojure.core/type), and the function is provided as an38 implementation of the multimethod stream-next for this class or type.40 While setting up pipelines using this mechanism is somewhat more41 cumbersome than using Clojure's lazy seq mechanisms, there are a42 few advantages:43 - The state of a stream can be stored in any Clojure data structure,44 and the stream can be re-generated from it any number of times.45 Any number of states can be stored this way.46 - The elements of the stream are never cached, so keeping a reference47 to a stream state does not incur an uncontrollable memory penalty.49 Note that the stream mechanism is thread-safe as long as the50 concrete stream implementations do not use any mutable state.52 Stream transformers take any number of input streams and produce one53 output stream. They are typically written using the stream-m54 monad. In the definition of a stream transformer, (pick s) returns55 the next value of stream argument s, whereas pick-all returns the56 next value of all stream arguments in the form of a vector."}57 clojure.contrib.stream-utils58 (:refer-clojure :exclude (deftype))59 (:use [clojure.contrib.types :only (deftype deftype-)])60 (:use [clojure.contrib.monads :only (defmonad with-monad)])61 (:use [clojure.contrib.def :only (defvar defvar-)])62 (:require [clojure.contrib.seq])63 (:require [clojure.contrib.generic.collection]))66 ;67 ; Stream type and interface68 ;69 (defvar stream-type ::stream70 "The root type for the stream hierarchy. For each stream type,71 add a derivation from this type.")73 (defmacro defstream74 "Define object of the given type as a stream whose implementation75 of stream-next is defined by args and body. This macro adds76 a type-specific method for stream-next and derives type77 from stream-type."78 [type-tag args & body]79 `(do80 (derive ~type-tag stream-type)81 (defmethod stream-next ~type-tag ~args ~@body)))83 (defvar- stream-skip ::skip84 "The skip-this-item value.")86 (defn- stream-skip?87 "Returns true if x is the stream-skip."88 [x]89 (identical? x stream-skip))91 (defmulti stream-next92 "Returns a vector [next-value new-state] where next-value is the next93 item in the data stream defined by stream-state and new-state94 is the new state of the stream. At the end of the stream,95 next-value and new-state are nil."96 {:arglists '([stream-state])}97 type)99 (defmethod stream-next nil100 [s]101 [nil nil])103 (defmethod stream-next clojure.lang.ISeq104 [s]105 (if (seq s)106 [(first s) (rest s)]107 [nil nil]))109 (defmethod stream-next clojure.lang.IPersistentVector110 [v]111 (stream-next (seq v)))113 (defn stream-seq114 "Return a lazy seq on the stream. Also accessible via115 clojure.contrib.seq/seq-on and116 clojure.contrib.generic.collection/seq for streams."117 [s]118 (lazy-seq119 (let [[v ns] (stream-next s)]120 (if (nil? ns)121 nil122 (cons v (stream-seq ns))))))124 (defmethod clojure.contrib.seq/seq-on stream-type125 [s]126 (stream-seq s))128 (defmethod clojure.contrib.generic.collection/seq stream-type129 [s]130 (stream-seq s))132 ;133 ; Stream transformers134 ;135 (defmonad stream-m136 "Monad describing stream computations. The monadic values can be137 of any type handled by stream-next."138 [m-result (fn m-result-stream [v]139 (fn [s] [v s]))140 m-bind (fn m-bind-stream [mv f]141 (fn [s]142 (let [[v ss :as r] (mv s)]143 (if (or (nil? ss) (stream-skip? v))144 r145 ((f v) ss)))))146 m-zero (fn [s] [stream-skip s])147 ])149 (defn pick150 "Return the next value of stream argument n inside a stream151 transformer. When used inside of defst, the name of the stream152 argument can be used instead of its index n."153 [n]154 (fn [streams]155 (let [[v ns] (stream-next (streams n))]156 (if (nil? ns)157 [nil nil]158 [v (assoc streams n ns)]))))160 (defn pick-all161 "Return a vector containing the next value of each stream argument162 inside a stream transformer."163 [streams]164 (let [next (map stream-next streams)165 values (map first next)166 streams (vec (map second next))]167 (if (some nil? streams)168 [nil nil]169 [values streams])))171 (deftype ::stream-transformer st-as-stream172 (fn [st streams] [st streams])173 seq)175 (defstream ::stream-transformer176 [[st streams]]177 (loop [s streams]178 (let [[v ns] (st s)]179 (cond (nil? ns) [nil nil]180 (stream-skip? v) (recur ns)181 :else [v (st-as-stream st ns)]))))183 (defmacro defst184 "Define the stream transformer name by body.185 The non-stream arguments args and the stream arguments streams186 are given separately, with args being possibly empty."187 [name args streams & body]188 (if (= (first streams) '&)189 `(defn ~name ~(vec (concat args streams))190 (let [~'st (with-monad stream-m ~@body)]191 (st-as-stream ~'st ~(second streams))))192 `(defn ~name ~(vec (concat args streams))193 (let [~'st (with-monad stream-m194 (let [~streams (range ~(count streams))]195 ~@body))]196 (st-as-stream ~'st ~streams)))))198 ;199 ; Stream utilities200 ;201 (defn stream-drop202 "Return a stream containing all but the first n elements of stream."203 [n stream]204 (if (zero? n)205 stream206 (let [[_ s] (stream-next stream)]207 (recur (dec n) s))))209 ; Map a function on a stream210 (deftype- ::stream-map stream-map-state)212 (defstream ::stream-map213 [[f stream]]214 (let [[v ns] (stream-next stream)]215 (if (nil? ns)216 [nil nil]217 [(f v) (stream-map-state [f ns])])))219 (defmulti stream-map220 "Return a new stream by mapping the function f on the given stream."221 {:arglists '([f stream])}222 (fn [f stream] (type stream)))224 (defmethod stream-map :default225 [f stream]226 (stream-map-state [f stream]))228 (defmethod stream-map ::stream-map229 [f [g stream]]230 (stream-map-state [(comp f g) stream]))232 ; Filter stream elements233 (deftype- ::stream-filter stream-filter-state)235 (defstream ::stream-filter236 [[p stream]]237 (loop [stream stream]238 (let [[v ns] (stream-next stream)]239 (cond (nil? ns) [nil nil]240 (p v) [v (stream-filter-state [p ns])]241 :else (recur ns)))))243 (defmulti stream-filter244 "Return a new stream that contrains the elements of stream245 that satisfy the predicate p."246 {:arglists '([p stream])}247 (fn [p stream] (type stream)))249 (defmethod stream-filter :default250 [p stream]251 (stream-filter-state [p stream]))253 (defmethod stream-filter ::stream-filter254 [p [q stream]]255 (stream-filter-state [(fn [v] (and (q v) (p v))) stream]))257 ; Flatten a stream of sequences258 (deftype- ::stream-flatten stream-flatten-state)260 (defstream ::stream-flatten261 [[buffer stream]]262 (loop [buffer buffer263 stream stream]264 (if (nil? buffer)265 (let [[v new-stream] (stream-next stream)]266 (cond (nil? new-stream) [nil nil]267 (empty? v) (recur nil new-stream)268 :else (recur v new-stream)))269 [(first buffer) (stream-flatten-state [(next buffer) stream])])))271 (defn stream-flatten272 "Converts a stream of sequences into a stream of the elements of the273 sequences. Flattening is not recursive, only one level of nesting274 will be removed."275 [s]276 (stream-flatten-state [nil s]))