annotate src/parallel_io.clj @ 0:307a81e46071 tip

initial committ
author Robert McIntyre <rlm@mit.edu>
date Tue, 18 Oct 2011 01:17:49 -0700
parents
children
rev   line source
rlm@0 1 (ns coderloop.parallel-io
rlm@0 2 (:refer-clojure :only [])
rlm@0 3 (:require rlm.ns-rlm rlm.light-base))
rlm@0 4 (rlm.ns-rlm/ns-clone rlm.light-base)
rlm@0 5
rlm@0 6 (import '[java.io FileInputStream RandomAccessFile])
rlm@0 7 (use '(clojure.contrib [str-utils2 :only [join]]))
rlm@0 8 (import 'java.util.concurrent.atomic.AtomicLong)
rlm@0 9
rlm@0 10 ;;(set! *warn-on-reflection* true)
rlm@0 11
rlm@0 12
rlm@0 13
rlm@0 14
rlm@0 15
rlm@0 16 ;;;; Reading
rlm@0 17
(defn chunk-file
  "Partitions the file named filename into at most n line-aligned chunks.
  Returns a fully realized seq of (start end) byte-offset pairs that cover
  the file, with start inclusive and end exclusive. Consecutive pairs share
  a boundary, and every boundary other than 0 and the file length sits just
  after a \\newline byte (or at end of file).

  Empty chunks are dropped, so fewer than n pairs are returned when several
  boundaries land in the same line. An empty file (or n <= 0) yields an
  empty seq."
  [filename n]
  (with-open [file (java.io.RandomAccessFile. filename "r")]
    (let [len (.length file)
          ;; Advance past the next \newline, stopping at EOF (-1) so an
          ;; unterminated last line cannot make this loop forever.
          skip-past-newline! (fn []
                               (loop []
                                 (let [b (.read file)]
                                   (when-not (or (== b -1)
                                                 (== b (int \newline)))
                                     (recur)))))
          ;; Integer raw offsets i*len/n; these are non-decreasing, so the
          ;; seeks below happen in file order.
          starts (for [i (range n)
                       :let [raw (quot (* i len) n)]]
                   (do (when-not (zero? raw)
                         (.seek file (long raw))
                         (skip-past-newline!))
                       (.getFilePointer file)))
          ;; distinct removes collapsed boundaries (empty chunks).
          bounds (distinct (concat starts [len]))]
      ;; Realize everything while the file is still open.
      (doall (map list bounds (rest bounds))))))
rlm@0 30
rlm@0 31
(defn read-lines-range
  "Returns a lazy sequence of the lines of file that start within the byte
  range [start-byte, end-byte). Intended for line-aligned ranges such as
  those produced by chunk-file.

  The file is decoded as US-ASCII, so one char is one byte. Each line is
  counted as its length plus one byte for the \\newline terminator that
  readLine strips.
  NOTE(review): with \\r\\n line endings this undercounts by one byte per
  line. Confirm the input uses plain \\n.

  The underlying reader is closed only when the sequence is consumed to
  its end (range exhausted or EOF). Callers that stop early leak the file
  handle."
  [file start-byte end-byte]
  (let [in (java.io.FileInputStream. file)]
    ;; FileChannel.position jumps exactly to start-byte, whereas
    ;; InputStream.skip is allowed to skip fewer bytes than requested.
    (.position (.getChannel in) (long start-byte))
    (let [reader (-> in
                     (java.io.BufferedInputStream. (* 8 131072))
                     (java.io.InputStreamReader. "US-ASCII")
                     (java.io.BufferedReader. 131072))]
      (letfn [(step [remaining]
                (lazy-seq
                 (if-let [#^String line (and (pos? remaining) (.readLine reader))]
                   ;; inc accounts for the \newline byte readLine consumed.
                   (cons line (step (- remaining (inc (.length line)))))
                   (.close reader))))]
        (step (- end-byte start-byte))))))
rlm@0 45
(defn #^"[Ljava.lang.String;" dumbest-split
  "Splits s on the character c, storing the pieces left to right into the
  caller-supplied array tokens, which is also the return value.

  The last slot of tokens receives the unsplit remainder of s. Slots after
  the last piece written are not touched, so they keep whatever they held
  before the call."
  [#^String s c #^"[Ljava.lang.String;" tokens]
  (let [last-slot (dec (int (alength tokens)))]
    (loop [pos (int 0)
           slot (int 0)]
      (let [hit (int (.indexOf s (int c) (int pos)))]
        (if (and (>= hit 0) (< slot last-slot))
          ;; Another separator and room for more fields: cut one piece.
          (do (aset tokens slot (.substring s pos hit))
              (recur (inc hit) (inc slot)))
          ;; No separator left, or this is the final slot: store the rest.
          (do (aset tokens slot (.substring s pos))
              tokens))))))
rlm@0 57
(defn parse-lines
  "Lazily parses space-separated log lines into request maps with keys
  :client, :url, :status, :bytes and :ref.

  Only lines whose field 5 is \"GET (with the leading quote) and whose
  status (field 8) is 200, 304 or 404 are kept. A bytes field of \"-\"
  becomes 0, and the surrounding quote characters are stripped from :ref.
  Lines with fewer than 11 space-separated fields are skipped.

  A single token array is reused across lines. It is cleared before every
  split so a short line cannot pick up fields left over from the previous
  one. Because of this sharing, the returned seq should be consumed from a
  single thread."
  [lines]
  (let [#^"[Ljava.lang.String;" ary (make-array String 12)]
    (for [#^String line lines
          :let [_ (java.util.Arrays/fill ary nil)
                fields (dumbest-split line \space ary)
                status (aget fields 8)
                bytes (aget fields 9)
                #^String ref (aget fields 10)]
          ;; A non-nil ref means all of fields 0..10 were filled.
          :when (and ref
                     (= (aget fields 5) "\"GET")
                     ('#{"200" "304" "404"} status))]
      {:client (aget fields 0)
       :url (aget fields 6)
       :status status
       :bytes (if (= bytes "-") 0 (Long/parseLong bytes))
       ;; Strip the enclosing quotes; leave too-short values unchanged
       ;; instead of throwing StringIndexOutOfBoundsException.
       :ref (if (>= (count ref) 2)
              (.substring ref 1 (dec (count ref)))
              ref)})))
rlm@0 72
rlm@0 73 ;;;; Tallying
rlm@0 74
(defn bump!
  "Adds delta to the AtomicLong counter stored under key in the map held by
  map-atom, creating the counter (starting at 0) if it is missing. Returns
  the counter's new value.

  An installed counter is never replaced. Callers that looked it up earlier
  keep incrementing the same object, so no concurrent increment is lost.
  The swap! function is free of side effects apart from allocating a
  counter, so it is safe for swap! to retry it.

  The key is copied with (String. key) when the counter is created.
  NOTE(review): presumably this avoids retaining a substring's backing
  array on older JVMs. Confirm."
  [map-atom #^String key #^Long delta]
  (let [#^AtomicLong counter
        (or (get @map-atom key)
            (get (swap! map-atom
                        (fn [m]
                          (if (contains? m key)
                            m
                            (assoc m (String. key) (AtomicLong. 0)))))
                 key))]
    (.addAndGet counter delta)))
rlm@0 82
;; Whole-string match (^...$) for article URLs of the form
;;   /ongoing/When/<3 digits>x/<4 digits>/<2 digits>/<2 digits>/<name>
;; where <name> contains no spaces and no dots.
(def article-re #"^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$")
rlm@0 84
(defn tally!
  "Accumulates parsed request records into the counter atoms of state.

  A record with status \"404\" only bumps :s404s for its url. Any other
  record adds its bytes to :url-bytes. If its url is an article (it starts
  with /ongoing/When/ and matches article-re), the record also bumps
  :url-hits for the url and :clients for the client. It bumps :refs for
  the referrer unless that is \"-\" or starts with
  http://www.tbray.org/ongoing/."
  [{hits :url-hits, byte-totals :url-bytes, client-counts :clients,
    ref-counts :refs, not-found :s404s}
   records]
  (doseq [{:keys [#^String url bytes client status #^String ref]} records]
    (if-not (= status "404")
      (do
        (bump! byte-totals url bytes)
        (let [article? (and (.startsWith url "/ongoing/When/")
                            (re-matches article-re url))]
          (when article?
            (bump! hits url 1)
            (bump! client-counts client 1)
            (when (and (not= ref "-")
                       (not (.startsWith ref "http://www.tbray.org/ongoing/")))
              (bump! ref-counts ref 1)))))
      (bump! not-found url 1))))
rlm@0 97
rlm@0 98 ;;;; Reporting
rlm@0 99
(defn truncate
  "Returns s unchanged when it has at most n characters. Otherwise returns
  its first n characters followed by \"...\"."
  [s n]
  (let [too-long? (> (count s) n)]
    (if-not too-long?
      s
      (str (.substring s 0 n) "..."))))
rlm@0 102
(defn print-top10
  "Prints the heading \"Top <label>\", then one formatted line for each of
  the first 10 [k v] pairs in results, then a blank line.

  When shrink? is truthy, v is divided by 1024*1024 and printed as a
  one-decimal float with an \"M\" suffix. Otherwise v is coerced to a long.
  Keys are shortened with truncate to 60 characters."
  [results label & [shrink?]]
  (println "Top" label)
  (let [[fmt scale] (if shrink?
                      [" %9.1fM: %s" #(/ % 1024.0 1024.0)]
                      [" %10d: %s" long])]
    (doseq [[k v] (take 10 results)]
      (println (format fmt (scale v) (truncate k 60)))))
  (println))
rlm@0 110
(defn sort-by-vals-desc
  "Returns the entries of m ordered by value, largest first. The sort key
  is the negated value."
  [m]
  (let [negated-val (fn [entry] (- (val entry)))]
    (sort-by negated-val m)))
rlm@0 113
rlm@0 114
rlm@0 115
(defn take-greatest-vals
  "Returns a vector of the (at most) n entries of m with the largest values,
  in descending order of value. Returns nil when m is empty and [] when n
  is not positive.

  Makes one pass over m while keeping a value-sorted buffer of at most n
  entries, so m does not need to be sorted beforehand."
  [n m]
  (when-let [entries (seq m)]
    (if-not (pos? n)
      []
      (let [top-n (fn [es] (vec (take n (sort-by #(- (val %)) es))))]
        (reduce (fn [best x]
                  ;; Admit x while the buffer still has room, or when it
                  ;; beats the smallest value kept so far. A tie with that
                  ;; smallest value is not admitted once the buffer is full.
                  (if (or (< (count best) n)
                          (> (val x) (val (peek best))))
                    (top-n (conj best x))
                    best))
                (top-n [(first entries)])
                (rest entries))))))
rlm@0 123
(defn report
  "Prints a comma-separated summary of how many keys each tally in state
  holds (e.g. \"12 clients\"), then a blank line, then a top-10 table per
  entry of tallies.

  tallies is a seq of [state-key label & options]. state maps each
  state-key to an atom holding a map of counters. The tables are computed
  in parallel with pmap and printed in tallies order via print-top10."
  [tallies state]
  (println (join ", " (for [[tally-key rows] state]
                        (str (count @rows) " " (name tally-key)))))
  (println)
  (let [tables (pmap (fn [[tally-key & options]]
                       (cons (take-greatest-vals 10 (sort-by-vals-desc @(state tally-key)))
                             options))
                     tallies)]
    (doseq [args tables]
      (apply print-top10 args))))
rlm@0 136
rlm@0 137 ;;;; Main
rlm@0 138
;; The tallies to compute and report, in print order. Each entry is
;; [state-key label & options]:
;;   state-key - key of the counter map in the state that wf-atoms builds
;;               (one atom per key) and that tally! bumps;
;;   label     - heading text passed to print-top10;
;;   :shrink   - optional; makes print-top10 show values scaled to "M"
;;               (divided by 1024*1024).
(def tallies [[:url-hits "URIs by hit"]
              [:url-bytes "URIs by bytes" :shrink]
              [:s404s "404s"]
              [:clients "client addresses"]
              [:refs "referrers"]])
rlm@0 144
(defn wf-atoms
  "Analyzes the log file: splits it into line-aligned chunks of roughly
  chunk-bytes bytes (default 32 MiB), then parses and tallies each chunk in
  parallel with pmap, printing a progress line per chunk. Finally prints the
  report, wrapped in time.

  The state is a map from each key in tallies to an atom holding a map of
  String -> AtomicLong counters. chunk-bytes must be positive."
  ([file] (wf-atoms file (* 32 1024 1024)))
  ([file chunk-bytes]
     (let [file-size (.length (java.io.File. file))
           ;; At least one chunk. For files smaller than chunk-bytes the
           ;; quotient is 0, which chunk-file cannot handle.
           chunk-count (max 1 (int (/ file-size chunk-bytes)))
           state (zipmap (map first tallies) (repeatedly #(atom {})))]
       (dorun
        (pmap (fn [[idx [start end]]]
                (println (str "Chunk " idx "/" chunk-count
                              " (" start " -> " end ")"))
                (->> (read-lines-range file start end)
                     (parse-lines)
                     (tally! state)))
              (indexed (chunk-file file chunk-count))))
       (time (report tallies state)))))
rlm@0 157
rlm@0 158