Mercurial > coderloop
view src/parallel_io.clj @ 0:307a81e46071 tip
initial commit
author | Robert McIntyre <rlm@mit.edu> |
---|---|
date | Tue, 18 Oct 2011 01:17:49 -0700 |
parents | |
children |
line wrap: on
line source
(ns coderloop.parallel-io
  (:refer-clojure :only [])
  (:require rlm.ns-rlm rlm.light-base))
(rlm.ns-rlm/ns-clone rlm.light-base)

(import '[java.io FileInputStream RandomAccessFile])
(use '(clojure.contrib [str-utils2 :only [join]]))
(import 'java.util.concurrent.atomic.AtomicLong)

;;(set! *warn-on-reflection* true)

;;;; Reading

(defn chunk-file
  "Partitions a file into at most n line-aligned chunks.  Returns a list of
  (start end) byte offset pairs; start is inclusive, end is exclusive.

  Every chunk boundary other than 0 and the file length is moved forward to
  the byte just after the next newline, so no line is split across chunks.
  Chunks can be empty (start = end) when a single line spans several nominal
  boundaries.  An empty file yields an empty list.  n is clamped to at least 1."
  [filename n]
  (with-open [file (java.io.RandomAccessFile. filename "r")]
    (let [len     (.length file)
          n       (max 1 n)
          ;; Move a nominal offset forward to the start of the next line.
          ;; Stops at end of file as well as at a newline: .read keeps
          ;; returning -1 at EOF, so testing for newline alone would never
          ;; terminate when the last line has no trailing newline.
          align   (fn [offset]
                    (if (zero? offset)
                      0
                      (do (.seek file offset)
                          (loop []
                            (let [b (.read file)]
                              (when-not (or (== b -1)
                                            (== b (int \newline)))
                                (recur))))
                          (.getFilePointer file))))
          ;; Integer arithmetic keeps every nominal offset a whole number of
          ;; bytes even when the length is not divisible by n.
          starts  (if (zero? len)
                    []
                    (doall (map align
                                (for [i (range n)] (quot (* i len) n)))))
          offsets (concat starts [len])]
      (doall (partition 2 1 offsets)))))

(defn read-lines-range
  "Returns a lazy sequence of lines from file between start-byte (inclusive)
  and end-byte (exclusive).

  The range is expected to be line-aligned, as produced by chunk-file.  The
  underlying reader is closed once the sequence has been fully consumed.
  Input is decoded as US-ASCII, so one character is one byte; the byte budget
  assumes each line is terminated by a single newline byte."
  [file start-byte end-byte]
  (let [reader (-> (doto (java.io.FileInputStream. file)
                     (.skip start-byte))
                   (java.io.BufferedInputStream. (* 8 131072))
                   (java.io.InputStreamReader. "US-ASCII")
                   (java.io.BufferedReader. 131072))]
    (letfn [(read-line [remaining]
              (lazy-seq
               (if-let [line (and (pos? remaining) (.readLine reader))]
                 ;; readLine strips the terminator, so charge one extra byte
                 ;; for it.  Without this the budget drains too slowly and
                 ;; lines belonging to the next chunk get read as well.
                 (cons line (read-line (- remaining (inc (.length line)))))
                 (.close reader))))]
      (read-line (- end-byte start-byte)))))

(defn #^"[Ljava.lang.String;" dumbest-split
  "Splits s on the character c, storing the pieces into the caller-supplied
  array tokens, and returns tokens.

  At most (alength tokens) pieces are produced; the last piece holds the
  unsplit remainder of s.  Slots after the last piece are set to nil so that
  a reused array never exposes tokens left over from an earlier call."
  [#^String s c #^"[Ljava.lang.String;" tokens]
  (let [len (dec (int (alength tokens)))]
    (loop [start (int 0)
           i     (int 0)]
      (let [idx (int (.indexOf s (int c) (int start)))]
        (if (or (neg? idx) (>= i len))
          (do (aset tokens i (.substring s start))
              ;; Clear any slots this call did not fill.
              (loop [j (inc i)]
                (when (<= j len)
                  (aset tokens j nil)
                  (recur (inc j))))
              tokens)
          (do (aset tokens i (.substring s start idx))
              (recur (inc idx) (inc i))))))))

(defn parse-lines
  "Lazily turns space-separated log lines into maps with the keys :client,
  :url, :status, :bytes and :ref.

  Only lines whose sixth field is \"GET (with the leading quote) and whose
  status is 200, 304 or 404 are kept.  Lines too short to carry a referrer
  field are skipped.  The field positions look like the Apache combined log
  format -- NOTE(review): confirm against the actual input."
  [lines]
  ;; One scratch array is reused for every line; all values are copied out of
  ;; it before the next line is split.
  (let [ary (make-array String 12)]
    (for [#^String line lines
          :let [fields (dumbest-split line \space ary)
                status (aget fields 8)
                bytes  (aget fields 9)
                #^String ref (aget fields 10)]
          :when (= (aget fields 5) "\"GET")
          :when ('#{"200" "304" "404"} status)
          ;; nil here means the line had fewer than 11 fields.
          :when ref]
      {:client (aget fields 0)
       :url    (aget fields 6)
       :status status
       :bytes  (if (= bytes "-") 0 (Long/parseLong bytes))
       ;; Drop the first and last character (the surrounding quotes).
       :ref    (.substring ref 1 (dec (count ref)))})))

;;;; Tallying

(defn bump!
  "Adds delta to the AtomicLong counter stored under key in the map held by
  map-atom, creating the counter at zero first if it is missing.  Returns the
  counter's new value.

  Once a counter is in the map it is never replaced, so concurrent callers
  always add to the same AtomicLong and no increment is lost."
  [map-atom #^String key #^Long delta]
  (let [#^AtomicLong counter
        (or (get @map-atom key)
            ;; The swap function may be retried, so it only ever inserts a
            ;; fresh zero counter and leaves an existing one untouched; the
            ;; actual add happens exactly once, below.
            (get (swap! map-atom
                        (fn [m]
                          (if (contains? m key)
                            m
                            ;; Copy the key, as the original code did
                            ;; (presumably so a substring does not pin the
                            ;; whole source line in memory).
                            (assoc m (String. key) (AtomicLong.)))))
                 key))]
    (.addAndGet counter delta)))

(def article-re #"^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$")

(defn tally!
  "Folds parsed records into the counter maps held in the given state map
  (keys :url-hits, :url-bytes, :clients, :refs and :s404s, each an atom).

  404 responses are counted per URL in :s404s.  Every other record adds its
  byte count to :url-bytes; when the URL matches article-re it also counts a
  hit for the URL and for the client, and counts the referrer unless it is
  \"-\" or starts with http://www.tbray.org/ongoing/."
  [{:keys [url-hits url-bytes clients refs s404s]} records]
  (doseq [{:keys [#^String url bytes client status #^String ref]} records]
    (if (= status "404")
      (bump! s404s url 1)
      (do (bump! url-bytes url bytes)
          ;; The cheap prefix test runs first so the regex is only tried on
          ;; plausible candidates.
          (when (and (.startsWith url "/ongoing/When/")
                     (re-matches article-re url))
            (bump! url-hits url 1)
            (bump! clients client 1)
            (when-not (or (= ref "-")
                          (.startsWith ref "http://www.tbray.org/ongoing/"))
              (bump! refs ref 1)))))))

;;;; Reporting

(defn truncate
  "Returns s unchanged when it has at most n characters, otherwise its first
  n characters followed by \"...\"."
  [s n]
  (if (> (count s) n) (str (.substring s 0 n) "...") s))

(defn print-top10
  "Prints a \"Top <label>\" heading, then up to ten [key value] entries from
  results, then a blank line.  With a truthy shrink?, values are divided by
  1024 twice and printed with one decimal and an M suffix; otherwise they are
  printed as integers.  Keys are truncated to 60 characters."
  [results label & [shrink?]]
  (println "Top" label)
  (let [fmt (if shrink? " %9.1fM: %s" " %10d: %s")]
    (doseq [[k v] (take 10 results)]
      (let [v (if shrink? (/ v 1024.0 1024.0) (long v))]
        (println (format fmt v (truncate k 60))))))
  (println))

(defn sort-by-vals-desc
  "Returns the entries of m sorted by value, largest first."
  [m]
  (sort-by #(- (val %)) m))

(defn take-greatest-vals
  "Returns a vector of the n entries of m with the greatest values, largest
  first, or nil when m is empty or n is not positive."
  [n m]
  (when (pos? n)
    (when-let [m (seq m)]
      (reduce (fn [best x]
                ;; While fewer than n entries are held, every entry is
                ;; admitted; after that only entries at least as large as
                ;; the current minimum can displace one.
                (if (or (< (count best) n)
                        (>= (val x) (val (peek best))))
                  (vec (take n (sort-by-vals-desc (conj best x))))
                  best))
              [(first m)] (rest m)))))

(defn report
  "Prints a one-line summary of how many keys each tally in state holds, then
  the top-ten listing for every entry of tallies.  The top-ten selections are
  computed with pmap; printing happens sequentially, in tallies order."
  [tallies state]
  (->> state
       (map (fn [[tally rows]] (str (count @rows) " " (name tally))))
       (join ", ")
       (println))
  (println)
  (->> tallies
       (pmap (fn [[tally & options]]
               (cons (take-greatest-vals 10 (sort-by-vals-desc @(state tally)))
                     options)))
       (map #(apply print-top10 %))
       (dorun)))

;;;; Main

;; Each entry is [state-key label & print-top10 options].
(def tallies [[:url-hits  "URIs by hit"]
              [:url-bytes "URIs by bytes" :shrink]
              [:s404s     "404s"]
              [:clients   "client addresses"]
              [:refs      "referrers"]])

(defn wf-atoms
  "Tallies the log file named by file in parallel and prints a timed report.

  The file is split into roughly 32 MiB line-aligned chunks (always at least
  one), and each chunk is read, parsed and tallied inside pmap into a shared
  map of atoms keyed by the tally names."
  [file]
  ;; Clamp to 1: a file smaller than 32 MiB would otherwise give a chunk count
  ;; of zero.
  (let [chunk-count (max 1 (int (/ (.length (java.io.File. file))
                                   (* 32 1024 1024))))
        state (zipmap (map first tallies) (repeatedly #(atom {})))]
    (dorun
     ;; `indexed` is not defined in this file; presumably it pairs each chunk
     ;; with its index and comes from the cloned rlm.light-base namespace --
     ;; TODO confirm.
     (pmap (fn [[idx [start end]]]
             (println (str "Chunk " idx "/" chunk-count
                           " (" start " -> " end ")"))
             (->> (read-lines-range file start end)
                  (parse-lines)
                  (tally! state)))
           (indexed (chunk-file file chunk-count))))
    (time (report tallies state))))