;; Stream utilities

;; by Konrad Hinsen
;; last updated May 3, 2009

;; Copyright (c) Konrad Hinsen, 2009. All rights reserved. The use
;; and distribution terms for this software are covered by the Eclipse
;; Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
;; which can be found in the file epl-v10.html at the root of this
;; distribution. By using this software in any fashion, you are
;; agreeing to be bound by the terms of this license. You must not
;; remove this notice, or any other, from this software.

(ns
  ^{:author "Konrad Hinsen"
     :doc "Functions for setting up computational pipelines via data streams.

           NOTE: This library is experimental. It may change significantly
                 with future releases.

           This library defines:
           - an abstract stream type, whose interface consists of the
             multimethod stream-next
           - a macro for implementing streams
           - implementations of stream for
             1) Clojure sequences, and vectors
             2) nil, representing an empty stream
           - tools for writing stream transformers, including the
             monad stream-m
           - various utility functions for working with streams

           Streams are building blocks in the construction of computational
           pipelines. A stream is represented by its current state plus
           a function that takes a stream state and obtains the next item
           in the stream as well as the new stream state. The state is
           implemented as a Java class or a Clojure type (as defined by the
           function clojure.core/type), and the function is provided as an
           implementation of the multimethod stream-next for this class or type.

           While setting up pipelines using this mechanism is somewhat more
           cumbersome than using Clojure's lazy seq mechanisms, there are a
           few advantages:
           - The state of a stream can be stored in any Clojure data structure,
             and the stream can be re-generated from it any number of times.
             Any number of states can be stored this way.
           - The elements of the stream are never cached, so keeping a reference
             to a stream state does not incur an uncontrollable memory penalty.

           Note that the stream mechanism is thread-safe as long as the
           concrete stream implementations do not use any mutable state.

           Stream transformers take any number of input streams and produce one
           output stream. They are typically written using the stream-m
           monad. In the definition of a stream transformer, (pick s) returns
           the next value of stream argument s, whereas pick-all returns the
           next value of all stream arguments in the form of a vector."}
  clojure.contrib.stream-utils
  (:refer-clojure :exclude (deftype))
  (:use [clojure.contrib.types :only (deftype deftype-)])
  (:use [clojure.contrib.monads :only (defmonad with-monad)])
  (:use [clojure.contrib.def :only (defvar defvar-)])
  (:require [clojure.contrib.seq])
  (:require [clojure.contrib.generic.collection]))


;
; Stream type and interface
;
(defvar stream-type ::stream
  "The root type for the stream hierarchy. For each stream type,
  add a derivation from this type.")

(defmacro defstream
  "Define object of the given type as a stream whose implementation
  of stream-next is defined by args and body. This macro adds
  a type-specific method for stream-next and derives type
  from stream-type."
  [type-tag args & body]
  `(do
     (derive ~type-tag stream-type)
     (defmethod stream-next ~type-tag ~args ~@body)))

;; Sentinel value produced by the monad's m-zero. m-bind passes it through
;; unchanged, and the ::stream-transformer stream-next drops it instead of
;; emitting it.
(defvar- stream-skip ::skip
  "The skip-this-item value.")

(defn- stream-skip?
  "Returns true if x is the stream-skip."
  [x]
  (identical? x stream-skip))

(defmulti stream-next
  "Returns a vector [next-value new-state] where next-value is the next
  item in the data stream defined by stream-state and new-state
  is the new state of the stream. At the end of the stream,
  next-value and new-state are nil."
  {:arglists '([stream-state])}
  type)

;; nil is the canonical empty stream.
(defmethod stream-next nil
  [s]
  [nil nil])

;; A seq is its own stream state. rest (not next) is used, so the state
;; after the last element is an empty seq, which the next call turns
;; into [nil nil].
(defmethod stream-next clojure.lang.ISeq
  [s]
  (if (seq s)
    [(first s) (rest s)]
    [nil nil]))

;; Vectors are streamed through their seq.
(defmethod stream-next clojure.lang.IPersistentVector
  [v]
  (stream-next (seq v)))

(defn stream-seq
  "Return a lazy seq on the stream. Also accessible via
  clojure.contrib.seq/seq-on and
  clojure.contrib.generic.collection/seq for streams."
  [s]
  (lazy-seq
    (let [[v ns] (stream-next s)]
      (if (nil? ns)
        nil
        (cons v (stream-seq ns))))))

(defmethod clojure.contrib.seq/seq-on stream-type
  [s]
  (stream-seq s))

(defmethod clojure.contrib.generic.collection/seq stream-type
  [s]
  (stream-seq s))

;
; Stream transformers
;
(defmonad stream-m
  "Monad describing stream computations. The monadic values can be
  of any type handled by stream-next."
  [m-result (fn m-result-stream [v]
              (fn [s] [v s]))
   ;; Short-circuits when a step reports end of stream (nil state) or
   ;; produced the skip sentinel; the result r is propagated unchanged.
   m-bind   (fn m-bind-stream [mv f]
              (fn [s]
                (let [[v ss :as r] (mv s)]
                  (if (or (nil? ss) (stream-skip? v))
                    r
                    ((f v) ss)))))
   ;; Yields the skip sentinel without consuming input; this is what makes
   ;; :when guards inside domonad filter items out.
   m-zero   (fn [s] [stream-skip s])])

(defn pick
  "Return the next value of stream argument n inside a stream
  transformer. When used inside of defst, the name of the stream
  argument can be used instead of its index n."
  [n]
  (fn [streams]
    (let [[v ns] (stream-next (streams n))]
      (if (nil? ns)
        [nil nil]
        [v (assoc streams n ns)]))))

(defn pick-all
  "Return a vector containing the next value of each stream argument
  inside a stream transformer. If any stream argument is exhausted,
  returns [nil nil] (end of stream)."
  [streams]
  (let [steps      (map stream-next streams)
        ;; vec so the values really are a vector, as documented
        values     (vec (map first steps))
        new-states (vec (map second steps))]
    (if (some nil? new-states)
      [nil nil]
      [values new-states])))

;; State of a stream transformer: the monadic step function st plus the
;; vector of current input stream states.
(deftype ::stream-transformer st-as-stream
  (fn [st streams] [st streams])
  seq)

;; Runs the step function until it yields a real item or ends the stream;
;; skip sentinels (from m-zero) are consumed silently.
(defstream ::stream-transformer
  [[st streams]]
  (loop [s streams]
    (let [[v ns] (st s)]
      (cond (nil? ns) [nil nil]
            (stream-skip? v) (recur ns)
            :else [v (st-as-stream st ns)]))))

(defmacro defst
  "Define the stream transformer name by body.
  The non-stream arguments args and the stream arguments streams
  are given separately, with args being possibly empty.
  streams is either a vector of stream argument names (each name is
  bound to its index inside body, for use with pick) or [& name]
  for a variable number of streams."
  [name args streams & body]
  ;; In the variadic case the rest argument arrives as a seq (or nil).
  ;; pick calls the stream collection as a function and assoc's into it,
  ;; both of which require a vector, so the rest argument is converted
  ;; with vec.
  (if (= (first streams) '&)
    `(defn ~name ~(vec (concat args streams))
       (let [~'st (with-monad stream-m ~@body)]
         (st-as-stream ~'st (vec ~(second streams)))))
    `(defn ~name ~(vec (concat args streams))
       (let [~'st (with-monad stream-m
                    (let [~streams (range ~(count streams))]
                      ~@body))]
         (st-as-stream ~'st ~streams)))))

;
; Stream utilities
;
(defn stream-drop
  "Return a stream containing all but the first n elements of stream.
  If n is zero or negative, stream is returned unchanged. If stream has
  n or fewer elements, the result is an empty stream."
  [n stream]
  ;; pos? instead of zero? so that a negative n terminates rather than
  ;; counting down forever. Stopping at nil (the empty stream) only saves
  ;; iterations: stream-next of nil is [nil nil], so the result is nil
  ;; either way.
  (if (and (pos? n) (not (nil? stream)))
    (let [[_ s] (stream-next stream)]
      (recur (dec n) s))
    stream))

; Map a function on a stream
(deftype- ::stream-map stream-map-state)

(defstream ::stream-map
  [[f stream]]
  (let [[v ns] (stream-next stream)]
    (if (nil? ns)
      [nil nil]
      [(f v) (stream-map-state [f ns])])))

(defmulti stream-map
  "Return a new stream by mapping the function f on the given stream."
  {:arglists '([f stream])}
  (fn [f stream] (type stream)))

(defmethod stream-map :default
  [f stream]
  (stream-map-state [f stream]))

;; Mapping over an existing map stream composes the functions instead of
;; wrapping one more stream layer around it.
(defmethod stream-map ::stream-map
  [f [g stream]]
  (stream-map-state [(comp f g) stream]))

; Filter stream elements
(deftype- ::stream-filter stream-filter-state)

(defstream ::stream-filter
  [[p stream]]
  (loop [stream stream]
    (let [[v ns] (stream-next stream)]
      (cond (nil? ns) [nil nil]
            (p v) [v (stream-filter-state [p ns])]
            :else (recur ns)))))

(defmulti stream-filter
  "Return a new stream that contains the elements of stream
  that satisfy the predicate p."
  {:arglists '([p stream])}
  (fn [p stream] (type stream)))

(defmethod stream-filter :default
  [p stream]
  (stream-filter-state [p stream]))

;; Filtering an existing filter stream combines both predicates into one
;; (the inner predicate q is tested first).
(defmethod stream-filter ::stream-filter
  [p [q stream]]
  (stream-filter-state [(fn [v] (and (q v) (p v))) stream]))

; Flatten a stream of sequences
(deftype- ::stream-flatten stream-flatten-state)

;; State is [buffer stream]: buffer holds the not-yet-emitted rest of the
;; current sequence (nil when exhausted). Empty sequences from the input
;; stream are skipped.
(defstream ::stream-flatten
  [[buffer stream]]
  (loop [buffer buffer
         stream stream]
    (if (nil? buffer)
      (let [[v new-stream] (stream-next stream)]
        (cond (nil? new-stream) [nil nil]
              (empty? v) (recur nil new-stream)
              :else (recur v new-stream)))
      [(first buffer) (stream-flatten-state [(next buffer) stream])])))

(defn stream-flatten
  "Converts a stream of sequences into a stream of the elements of the
  sequences. Flattening is not recursive; only one level of nesting
  will be removed."
  [s]
  (stream-flatten-state [nil s]))