Mercurial > lasercutter
diff src/clojure/contrib/stream_utils.clj @ 10:ef7dbbd6452c
added clojure source goodness
author | Robert McIntyre <rlm@mit.edu> |
---|---|
date | Sat, 21 Aug 2010 06:25:44 -0400 |
parents | |
children |
line wrap: on
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/src/clojure/contrib/stream_utils.clj Sat Aug 21 06:25:44 2010 -0400 1.3 @@ -0,0 +1,276 @@ 1.4 +;; Stream utilities 1.5 + 1.6 +;; by Konrad Hinsen 1.7 +;; last updated May 3, 2009 1.8 + 1.9 +;; Copyright (c) Konrad Hinsen, 2009. All rights reserved. The use 1.10 +;; and distribution terms for this software are covered by the Eclipse 1.11 +;; Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) 1.12 +;; which can be found in the file epl-v10.html at the root of this 1.13 +;; distribution. By using this software in any fashion, you are 1.14 +;; agreeing to be bound by the terms of this license. You must not 1.15 +;; remove this notice, or any other, from this software. 1.16 + 1.17 +(ns 1.18 + ^{:author "Konrad Hinsen" 1.19 + :doc "Functions for setting up computational pipelines via data streams. 1.20 + 1.21 + NOTE: This library is experimental. It may change significantly 1.22 + with future release. 1.23 + 1.24 + This library defines: 1.25 + - an abstract stream type, whose interface consists of the 1.26 + multimethod stream-next 1.27 + - a macro for implementing streams 1.28 + - implementations of stream for 1.29 + 1) Clojure sequences, and vectors 1.30 + 2) nil, representing an empty stream 1.31 + - tools for writing stream transformers, including the 1.32 + monad stream-m 1.33 + - various utility functions for working with streams 1.34 + 1.35 + Streams are building blocks in the construction of computational 1.36 + pipelines. A stream is represented by its current state plus 1.37 + a function that takes a stream state and obtains the next item 1.38 + in the stream as well as the new stream state. The state is 1.39 + implemented as a Java class or a Clojure type (as defined by the 1.40 + function clojure.core/type), and the function is provided as an 1.41 + implementation of the multimethod stream-next for this class or type. 1.42 + 1.43 + While setting up pipelines using this mechanism is somewhat more 1.44 + cumbersome than using Clojure's lazy seq mechanisms, there are a 1.45 + few advantages: 1.46 + - The state of a stream can be stored in any Clojure data structure, 1.47 + and the stream can be re-generated from it any number of times. 1.48 + Any number of states can be stored this way. 1.49 + - The elements of the stream are never cached, so keeping a reference 1.50 + to a stream state does not incur an uncontrollable memory penalty. 1.51 + 1.52 + Note that the stream mechanism is thread-safe as long as the 1.53 + concrete stream implementations do not use any mutable state. 1.54 + 1.55 + Stream transformers take any number of input streams and produce one 1.56 + output stream. They are typically written using the stream-m 1.57 + monad. In the definition of a stream transformer, (pick s) returns 1.58 + the next value of stream argument s, whereas pick-all returns the 1.59 + next value of all stream arguments in the form of a vector."} 1.60 + clojure.contrib.stream-utils 1.61 + (:refer-clojure :exclude (deftype)) 1.62 + (:use [clojure.contrib.types :only (deftype deftype-)]) 1.63 + (:use [clojure.contrib.monads :only (defmonad with-monad)]) 1.64 + (:use [clojure.contrib.def :only (defvar defvar-)]) 1.65 + (:require [clojure.contrib.seq]) 1.66 + (:require [clojure.contrib.generic.collection])) 1.67 + 1.68 + 1.69 +; 1.70 +; Stream type and interface 1.71 +; 1.72 +(defvar stream-type ::stream 1.73 + "The root type for the stream hierarchy. For each stream type, 1.74 + add a derivation from this type.") 1.75 + 1.76 +(defmacro defstream 1.77 + "Define object of the given type as a stream whose implementation 1.78 + of stream-next is defined by args and body. This macro adds 1.79 + a type-specific method for stream-next and derives type 1.80 + from stream-type." 1.81 + [type-tag args & body] 1.82 + `(do 1.83 + (derive ~type-tag stream-type) 1.84 + (defmethod stream-next ~type-tag ~args ~@body))) 1.85 + 1.86 +(defvar- stream-skip ::skip 1.87 + "The skip-this-item value.") 1.88 + 1.89 +(defn- stream-skip? 1.90 + "Returns true if x is the stream-skip." 1.91 + [x] 1.92 + (identical? x stream-skip)) 1.93 + 1.94 +(defmulti stream-next 1.95 + "Returns a vector [next-value new-state] where next-value is the next 1.96 + item in the data stream defined by stream-state and new-state 1.97 + is the new state of the stream. At the end of the stream, 1.98 + next-value and new-state are nil." 1.99 + {:arglists '([stream-state])} 1.100 + type) 1.101 + 1.102 +(defmethod stream-next nil 1.103 + [s] 1.104 + [nil nil]) 1.105 + 1.106 +(defmethod stream-next clojure.lang.ISeq 1.107 + [s] 1.108 + (if (seq s) 1.109 + [(first s) (rest s)] 1.110 + [nil nil])) 1.111 + 1.112 +(defmethod stream-next clojure.lang.IPersistentVector 1.113 + [v] 1.114 + (stream-next (seq v))) 1.115 + 1.116 +(defn stream-seq 1.117 + "Return a lazy seq on the stream. Also accessible via 1.118 + clojure.contrib.seq/seq-on and 1.119 + clojure.contrib.generic.collection/seq for streams." 1.120 + [s] 1.121 + (lazy-seq 1.122 + (let [[v ns] (stream-next s)] 1.123 + (if (nil? ns) 1.124 + nil 1.125 + (cons v (stream-seq ns)))))) 1.126 + 1.127 +(defmethod clojure.contrib.seq/seq-on stream-type 1.128 + [s] 1.129 + (stream-seq s)) 1.130 + 1.131 +(defmethod clojure.contrib.generic.collection/seq stream-type 1.132 + [s] 1.133 + (stream-seq s)) 1.134 + 1.135 +; 1.136 +; Stream transformers 1.137 +; 1.138 +(defmonad stream-m 1.139 + "Monad describing stream computations. The monadic values can be 1.140 + of any type handled by stream-next." 1.141 + [m-result (fn m-result-stream [v] 1.142 + (fn [s] [v s])) 1.143 + m-bind (fn m-bind-stream [mv f] 1.144 + (fn [s] 1.145 + (let [[v ss :as r] (mv s)] 1.146 + (if (or (nil? ss) (stream-skip? v)) 1.147 + r 1.148 + ((f v) ss))))) 1.149 + m-zero (fn [s] [stream-skip s]) 1.150 + ]) 1.151 + 1.152 +(defn pick 1.153 + "Return the next value of stream argument n inside a stream 1.154 + transformer. When used inside of defst, the name of the stream 1.155 + argument can be used instead of its index n." 1.156 + [n] 1.157 + (fn [streams] 1.158 + (let [[v ns] (stream-next (streams n))] 1.159 + (if (nil? ns) 1.160 + [nil nil] 1.161 + [v (assoc streams n ns)])))) 1.162 + 1.163 +(defn pick-all 1.164 + "Return a vector containing the next value of each stream argument 1.165 + inside a stream transformer." 1.166 + [streams] 1.167 + (let [next (map stream-next streams) 1.168 + values (map first next) 1.169 + streams (vec (map second next))] 1.170 + (if (some nil? streams) 1.171 + [nil nil] 1.172 + [values streams]))) 1.173 + 1.174 +(deftype ::stream-transformer st-as-stream 1.175 + (fn [st streams] [st streams]) 1.176 + seq) 1.177 + 1.178 +(defstream ::stream-transformer 1.179 + [[st streams]] 1.180 + (loop [s streams] 1.181 + (let [[v ns] (st s)] 1.182 + (cond (nil? ns) [nil nil] 1.183 + (stream-skip? v) (recur ns) 1.184 + :else [v (st-as-stream st ns)])))) 1.185 + 1.186 +(defmacro defst 1.187 + "Define the stream transformer name by body. 1.188 + The non-stream arguments args and the stream arguments streams 1.189 + are given separately, with args being possibly empty." 1.190 + [name args streams & body] 1.191 + (if (= (first streams) '&) 1.192 + `(defn ~name ~(vec (concat args streams)) 1.193 + (let [~'st (with-monad stream-m ~@body)] 1.194 + (st-as-stream ~'st ~(second streams)))) 1.195 + `(defn ~name ~(vec (concat args streams)) 1.196 + (let [~'st (with-monad stream-m 1.197 + (let [~streams (range ~(count streams))] 1.198 + ~@body))] 1.199 + (st-as-stream ~'st ~streams))))) 1.200 + 1.201 +; 1.202 +; Stream utilities 1.203 +; 1.204 +(defn stream-drop 1.205 + "Return a stream containing all but the first n elements of stream." 1.206 + [n stream] 1.207 + (if (zero? n) 1.208 + stream 1.209 + (let [[_ s] (stream-next stream)] 1.210 + (recur (dec n) s)))) 1.211 + 1.212 +; Map a function on a stream 1.213 +(deftype- ::stream-map stream-map-state) 1.214 + 1.215 +(defstream ::stream-map 1.216 + [[f stream]] 1.217 + (let [[v ns] (stream-next stream)] 1.218 + (if (nil? ns) 1.219 + [nil nil] 1.220 + [(f v) (stream-map-state [f ns])]))) 1.221 + 1.222 +(defmulti stream-map 1.223 + "Return a new stream by mapping the function f on the given stream." 1.224 + {:arglists '([f stream])} 1.225 + (fn [f stream] (type stream))) 1.226 + 1.227 +(defmethod stream-map :default 1.228 + [f stream] 1.229 + (stream-map-state [f stream])) 1.230 + 1.231 +(defmethod stream-map ::stream-map 1.232 + [f [g stream]] 1.233 + (stream-map-state [(comp f g) stream])) 1.234 + 1.235 +; Filter stream elements 1.236 +(deftype- ::stream-filter stream-filter-state) 1.237 + 1.238 +(defstream ::stream-filter 1.239 + [[p stream]] 1.240 + (loop [stream stream] 1.241 + (let [[v ns] (stream-next stream)] 1.242 + (cond (nil? ns) [nil nil] 1.243 + (p v) [v (stream-filter-state [p ns])] 1.244 + :else (recur ns))))) 1.245 + 1.246 +(defmulti stream-filter 1.247 + "Return a new stream that contrains the elements of stream 1.248 + that satisfy the predicate p." 1.249 + {:arglists '([p stream])} 1.250 + (fn [p stream] (type stream))) 1.251 + 1.252 +(defmethod stream-filter :default 1.253 + [p stream] 1.254 + (stream-filter-state [p stream])) 1.255 + 1.256 +(defmethod stream-filter ::stream-filter 1.257 + [p [q stream]] 1.258 + (stream-filter-state [(fn [v] (and (q v) (p v))) stream])) 1.259 + 1.260 +; Flatten a stream of sequences 1.261 +(deftype- ::stream-flatten stream-flatten-state) 1.262 + 1.263 +(defstream ::stream-flatten 1.264 + [[buffer stream]] 1.265 + (loop [buffer buffer 1.266 + stream stream] 1.267 + (if (nil? buffer) 1.268 + (let [[v new-stream] (stream-next stream)] 1.269 + (cond (nil? new-stream) [nil nil] 1.270 + (empty? v) (recur nil new-stream) 1.271 + :else (recur v new-stream))) 1.272 + [(first buffer) (stream-flatten-state [(next buffer) stream])]))) 1.273 + 1.274 +(defn stream-flatten 1.275 + "Converts a stream of sequences into a stream of the elements of the 1.276 + sequences. Flattening is not recursive, only one level of nesting 1.277 + will be removed." 1.278 + [s] 1.279 + (stream-flatten-state [nil s]))