diff src/clojure/contrib/stream_utils.clj @ 10:ef7dbbd6452c

added clojure source goodness
author Robert McIntyre <rlm@mit.edu>
date Sat, 21 Aug 2010 06:25:44 -0400
parents
children
line wrap: on
line diff
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/src/clojure/contrib/stream_utils.clj	Sat Aug 21 06:25:44 2010 -0400
     1.3 @@ -0,0 +1,276 @@
     1.4 +;; Stream utilities
     1.5 +
     1.6 +;; by Konrad Hinsen
     1.7 +;; last updated May 3, 2009
     1.8 +
     1.9 +;; Copyright (c) Konrad Hinsen, 2009. All rights reserved.  The use
    1.10 +;; and distribution terms for this software are covered by the Eclipse
    1.11 +;; Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
    1.12 +;; which can be found in the file epl-v10.html at the root of this
    1.13 +;; distribution.  By using this software in any fashion, you are
    1.14 +;; agreeing to be bound by the terms of this license.  You must not
    1.15 +;; remove this notice, or any other, from this software.
    1.16 +
    1.17 +(ns
    1.18 +  ^{:author "Konrad Hinsen"
    1.19 +     :doc "Functions for setting up computational pipelines via data streams.
    1.20 +
    1.21 +           NOTE: This library is experimental. It may change significantly
    1.22 +                 in future releases.
    1.23 +
    1.24 +           This library defines:
    1.25 +           - an abstract stream type, whose interface consists of the
    1.26 +             multimethod stream-next
    1.27 +           - a macro for implementing streams
    1.28 +           - implementations of stream for
    1.29 +             1) Clojure sequences and vectors
    1.30 +             2) nil, representing an empty stream
    1.31 +           - tools for writing stream transformers, including the
    1.32 +             monad stream-m
    1.33 +           - various utility functions for working with streams
    1.34 +
    1.35 +           Streams are building blocks in the construction of computational
    1.36 +           pipelines. A stream is represented by its current state plus
    1.37 +           a function that takes a stream state and obtains the next item
    1.38 +           in the stream as well as the new stream state. The state is
    1.39 +           implemented as a Java class or a Clojure type (as defined by the
    1.40 +           function clojure.core/type), and the function is provided as an
    1.41 +           implementation of the multimethod stream-next for this class or type.
    1.42 +
    1.43 +           While setting up pipelines using this mechanism is somewhat more
    1.44 +           cumbersome than using Clojure's lazy seq mechanisms, there are a
    1.45 +           few advantages:
    1.46 +           - The state of a stream can be stored in any Clojure data structure,
    1.47 +             and the stream can be re-generated from it any number of times.
    1.48 +             Any number of states can be stored this way.
    1.49 +           - The elements of the stream are never cached, so keeping a reference
    1.50 +             to a stream state does not incur an uncontrollable memory penalty.
    1.51 +
    1.52 +           Note that the stream mechanism is thread-safe as long as the
    1.53 +           concrete stream implementations do not use any mutable state.
    1.54 +
    1.55 +           Stream transformers take any number of input streams and produce one
    1.56 +           output stream. They are typically written using the stream-m
    1.57 +           monad. In the definition of a stream transformer, (pick s) returns
    1.58 +           the next value of stream argument s, whereas pick-all returns the
    1.59 +           next value of all stream arguments in the form of a vector."}
    1.60 +  clojure.contrib.stream-utils
    1.61 +  (:refer-clojure :exclude (deftype)) ; the clojure.contrib.types deftype is used instead
    1.62 +  (:use [clojure.contrib.types :only (deftype deftype-)])
    1.63 +  (:use [clojure.contrib.monads :only (defmonad with-monad)])
    1.64 +  (:use [clojure.contrib.def :only (defvar defvar-)])
    1.65 +  (:require [clojure.contrib.seq])
    1.66 +  (:require [clojure.contrib.generic.collection]))
    1.67 +
    1.68 +
    1.69 +;
    1.70 +; Stream type and interface
    1.71 +;
    1.72 +(defvar stream-type ::stream
    1.73 +  "The root type for the stream hierarchy. For each stream type,
    1.74 +   add a derivation from this type (the defstream macro does this).")
    1.75 +
    1.76 +(defmacro defstream
    1.77 +  "Define objects of the given type as streams whose implementation
    1.78 +   of stream-next is defined by args and body. This macro adds
    1.79 +   a type-specific method for stream-next and derives type-tag
    1.80 +   from stream-type."
    1.81 +  [type-tag args & body]
    1.82 +  `(do
    1.83 +     (derive ~type-tag stream-type) ; make type-tag a member of the stream hierarchy
    1.84 +     (defmethod stream-next ~type-tag ~args ~@body))) ; its stream-next implementation
    1.85 +
    1.86 +(defvar- stream-skip ::skip
    1.87 +  "Sentinel value a stream transformer yields to skip the current item.")
    1.88 +
    1.89 +(defn- stream-skip?
    1.90 +  "True when x is the stream-skip sentinel itself."
    1.91 +  [x]
    1.92 +  (identical? stream-skip x))
    1.93 +
    1.94 +(defmulti stream-next
    1.95 +  "Returns a vector [next-value new-state] where next-value is the next
    1.96 +   item in the data stream defined by stream-state and new-state
    1.97 +   is the new state of the stream. At the end of the stream,
    1.98 +   next-value and new-state are both nil."
    1.99 +  {:arglists '([stream-state])}
   1.100 +  type) ; dispatch on (type stream-state)
   1.101 +
   1.102 +(defmethod stream-next nil ; nil is the empty stream
   1.103 +  [_]
   1.104 +  [nil nil])
   1.105 +
   1.106 +(defmethod stream-next clojure.lang.ISeq ; a seq yields its elements in order
   1.107 +  [items]
   1.108 +  (if (empty? items)
   1.109 +    [nil nil]
   1.110 +    [(first items) (rest items)]))
   1.111 +
   1.112 +(defmethod stream-next clojure.lang.IPersistentVector ; a vector is walked as a seq
   1.113 +  [items]
   1.114 +  (stream-next (seq items)))
   1.115 +
   1.116 +(defn stream-seq
   1.117 +  "Return a lazy seq of the items produced by stream s. Streams can
   1.118 +   also be converted with clojure.contrib.seq/seq-on and
   1.119 +   clojure.contrib.generic.collection/seq."
   1.120 +  [s]
   1.121 +  (lazy-seq
   1.122 +   (let [[item state] (stream-next s)]
   1.123 +     (when-not (nil? state)
   1.124 +       (cons item
   1.125 +             (stream-seq state))))))
   1.126 +
   1.127 +(defmethod clojure.contrib.seq/seq-on stream-type ; seq-on for streams: same lazy seq as stream-seq
   1.128 +  [s]
   1.129 +  (stream-seq s))
   1.130 +
   1.131 +(defmethod clojure.contrib.generic.collection/seq stream-type ; generic seq for streams: same lazy seq as stream-seq
   1.132 +  [s]
   1.133 +  (stream-seq s))
   1.134 +
   1.135 +;
   1.136 +; Stream transformers
   1.137 +;
   1.138 +(defmonad stream-m
   1.139 +  "Monad for stream computations. A monadic value is a function from a
   1.140 +   stream state to [value new-state], for any state stream-next handles."
   1.141 +  [m-result  (fn m-result-stream [v]
   1.142 +               (fn [state] [v state]))
   1.143 +   m-bind    (fn m-bind-stream [mv f]
   1.144 +               (fn [state]
   1.145 +                 (let [[v next-state :as step] (mv state)]
   1.146 +                   (if (and (not (nil? next-state)) (not (stream-skip? v)))
   1.147 +                     ((f v) next-state) ; normal item: continue the computation
   1.148 +                     step))))           ; end of stream or skip: pass it on unchanged
   1.149 +   m-zero     (fn [state] [stream-skip state])
   1.150 +   ])
   1.151 +
   1.152 +(defn pick
   1.153 +  "Inside a stream transformer, yield the next value of stream
   1.154 +   argument n. Within defst, the name of the stream argument can
   1.155 +   be used in place of its index n."
   1.156 +  [n]
   1.157 +  (fn [streams]
   1.158 +    (let [[item state] (stream-next (streams n))]
   1.159 +      (if-not (nil? state)
   1.160 +        [item (assoc streams n state)]
   1.161 +        [nil nil]))))
   1.162 +
   1.163 +(defn pick-all
   1.164 +  "Return a vector containing the next value of each stream argument
   1.165 +   inside a stream transformer. Yields [nil nil] once any stream ends."
   1.166 +  [streams]
   1.167 +  (let [steps   (map stream-next streams) ; named steps so clojure.core/next is not shadowed
   1.168 +        values  (vec (map first steps))   ; a real vector, as documented (map alone is a lazy seq)
   1.169 +        streams (vec (map second steps))]
   1.170 +    (if (some nil? streams)
   1.171 +      [nil nil]
   1.172 +      [values streams])))
   1.173 +
   1.174 +(deftype ::stream-transformer st-as-stream ; type tag and constructor name
   1.175 +  (fn [st streams] [st streams]) ; constructor: keeps the transformer together with its input streams
   1.176 +  seq) ; deconstructor argument -- see clojure.contrib.types/deftype
   1.177 +
   1.178 +(defstream ::stream-transformer
   1.179 +  [[st streams]]
   1.180 +  (loop [state streams]
   1.181 +    (let [[item new-state] (st state)]
   1.182 +      (cond (nil? new-state) [nil nil] ; end of stream
   1.183 +            (not (stream-skip? item)) [item (st-as-stream st new-state)]
   1.184 +            :else (recur new-state))))) ; skipped item: run the transformer again
   1.185 +
   1.186 +(defmacro defst
   1.187 +  "Define the stream transformer name by body.
   1.188 +   The non-stream arguments args and the stream arguments streams
   1.189 +   are given separately, with args being possibly empty."
   1.190 +  [name args streams & body]
   1.191 +  (if (= (first streams) '&)
   1.192 +    `(defn ~name ~(vec (concat args streams))
   1.193 +       (let [st# (with-monad stream-m ~@body)] ; gensym: cannot capture an argument named st
   1.194 +         (st-as-stream st# ~(second streams))))
   1.195 +    `(defn ~name ~(vec (concat args streams))
   1.196 +       (let [st# (with-monad stream-m
   1.197 +                   (let [~streams (range ~(count streams))] ; stream names stand for their indices
   1.198 +                     ~@body))]
   1.199 +         (st-as-stream st# ~streams)))))
   1.200 +
   1.201 +;
   1.202 +; Stream utilities
   1.203 +;
   1.204 +(defn stream-drop
   1.205 +  "Return a stream containing all but the first n elements of stream."
   1.206 +  [n stream]
   1.207 +  (if (or (nil? stream) (not (pos? n))) ; done, or stream exhausted; n <= 0 no longer loops forever
   1.208 +    stream
   1.209 +    (let [[_ s] (stream-next stream)]
   1.210 +      (recur (dec n) s))))
   1.211 +
   1.212 +; Map a function on a stream
   1.213 +(deftype- ::stream-map stream-map-state)
   1.214 +
   1.215 +(defstream ::stream-map
   1.216 +  [[f stream]]
   1.217 +  (let [[item state] (stream-next stream)]
   1.218 +    (if-not (nil? state)
   1.219 +      [(f item) (stream-map-state [f state])]
   1.220 +      [nil nil])))
   1.221 +
   1.222 +(defmulti stream-map
   1.223 +  "Return a new stream whose items are f applied to the items of stream."
   1.224 +  {:arglists '([f stream])}
   1.225 +  (fn [_ s] (type s)))
   1.226 +
   1.227 +(defmethod stream-map :default
   1.228 +  [f s]
   1.229 +  (stream-map-state [f s]))
   1.230 +
   1.231 +(defmethod stream-map ::stream-map ; mapping over a mapped stream: compose the two functions
   1.232 +  [f [g s]]
   1.233 +  (stream-map-state [(comp f g) s]))
   1.234 +
   1.235 +; Filter stream elements
   1.236 +(deftype- ::stream-filter stream-filter-state)
   1.237 +
   1.238 +(defstream ::stream-filter
   1.239 +  [[p stream]]
   1.240 +  (loop [state stream]
   1.241 +    (let [[item new-state] (stream-next state)]
   1.242 +      (cond (nil? new-state) [nil nil]
   1.243 +            (not (p item)) (recur new-state) ; rejected by p: keep looking
   1.244 +            :else [item (stream-filter-state [p new-state])]))))
   1.245 +
   1.246 +(defmulti stream-filter
   1.247 +  "Return a new stream that contains the elements of stream
   1.248 +   that satisfy the predicate p."
   1.249 +  {:arglists '([p stream])}
   1.250 +  (fn [p stream] (type stream)))
   1.251 +
   1.252 +(defmethod stream-filter :default
   1.253 +  [p stream]
   1.254 +  (stream-filter-state [p stream]))
   1.255 +
   1.256 +(defmethod stream-filter ::stream-filter ; filtering a filtered stream: merge both predicates
   1.257 +  [p [q stream]]
   1.258 +  (stream-filter-state [(fn [v] (and (q v) (p v))) stream])) ; q, the earlier filter, is tested first
   1.259 +
   1.260 +; Flatten a stream of sequences
   1.261 +(deftype- ::stream-flatten stream-flatten-state)
   1.262 +
   1.263 +(defstream ::stream-flatten
   1.264 +  [[buffer stream]]
   1.265 +  (loop [buffer buffer ; rest of the sequence being emitted, nil when used up
   1.266 +  	 stream stream]
   1.267 +    (if (nil? buffer)
   1.268 +      (let [[v new-stream] (stream-next stream)] ; buffer used up: fetch the next sequence
   1.269 +  	(cond (nil? new-stream) [nil nil]
   1.270 +  	      (empty? v) (recur nil new-stream) ; skip empty sequences
   1.271 +  	      :else (recur v new-stream)))
   1.272 +      [(first buffer) (stream-flatten-state [(next buffer) stream])])))
   1.273 +
   1.274 +(defn stream-flatten
   1.275 +  "Converts a stream of sequences into a stream of the elements of the
   1.276 +   sequences. Flattening is not recursive, only one level of nesting
   1.277 +   will be removed."
   1.278 +  [s]
   1.279 +  (stream-flatten-state [nil s])) ; nil buffer: the first sequence is fetched on demand