Mercurial > coderloop
diff src/parallel_io.clj @ 0:307a81e46071 tip
initial commit
author | Robert McIntyre <rlm@mit.edu> |
---|---|
date | Tue, 18 Oct 2011 01:17:49 -0700 |
parents | |
children |
line wrap: on
line diff
;; src/parallel_io.clj -- added as a new file (158 lines) in changeset
;; 307a81e46071, Tue Oct 18 01:17:49 2011 -0700.

(ns coderloop.parallel-io
  (:refer-clojure :only [])
  (:require rlm.ns-rlm rlm.light-base))
(rlm.ns-rlm/ns-clone rlm.light-base)

(import '[java.io FileInputStream RandomAccessFile])
(use '(clojure.contrib [str-utils2 :only [join]]))
(import 'java.util.concurrent.atomic.AtomicLong)

;;(set! *warn-on-reflection* true)


;;;; Reading

(defn chunk-file
  "Partitions a file into at most n line-aligned chunks.  Returns a list of
  (start end) byte offset pairs; each start is either 0 or the offset just
  past a newline, and the final end is the file length.

  n is clamped to at least 1.  Fewer than n chunks are returned when several
  split points fall inside the same line, and an empty file yields no chunks."
  [filename n]
  (with-open [#^java.io.RandomAccessFile file
              (java.io.RandomAccessFile. filename "r")]
    (let [length (.length file)
          n (max 1 n)
          ;; Integer step (rounded up) so that at most n offsets are produced
          ;; and the step is never zero.
          step (max 1 (quot (+ length (dec n)) n))
          newline-byte (int \newline)
          ;; Moves to `offset` and, unless it is the start of the file,
          ;; advances past the next newline.  Stops at EOF (.read returns -1)
          ;; so an unterminated last line cannot cause an endless loop.
          align (fn [offset]
                  (.seek file offset)
                  (when (pos? offset)
                    (loop [b (.read file)]
                      (when-not (or (neg? b) (= b newline-byte))
                        (recur (.read file)))))
                  (.getFilePointer file))
          offsets (for [offset (range 0 length step)]
                    (align offset))
          ;; Offsets are non-decreasing, so `distinct` simply drops the
          ;; duplicates that would otherwise become empty chunks.
          offsets (distinct (concat offsets [length]))]
      ;; doall: everything must be realized while the file is still open.
      (doall (partition 2 1 offsets)))))


(defn read-lines-range
  "Returns a lazy sequence of lines from file between start-byte and end-byte.

  The bytes are decoded as US-ASCII (one byte per character).  The remaining
  byte budget is reduced by each line's length plus one for the line
  terminator that readLine strips; this assumes single-byte \"\\n\"
  terminators.  The underlying reader is closed when the sequence is read to
  its end (budget exhausted or end of file); a partially consumed sequence
  leaves it open."
  [file start-byte end-byte]
  (let [reader (-> (doto (java.io.FileInputStream. file)
                     (.skip start-byte))
                   (java.io.BufferedInputStream. (* 8 131072))
                   (java.io.InputStreamReader. "US-ASCII")
                   (java.io.BufferedReader. 131072))]
    (letfn [(read-line [remaining]
              (lazy-seq
               (if-let [line (and (pos? remaining) (.readLine reader))]
                 ;; +1 accounts for the newline; without it every chunk
                 ;; over-reads into the following chunk.
                 (cons line (read-line (- remaining (inc (.length line)))))
                 ;; .close returns nil, which also terminates the sequence.
                 (.close reader))))]
      (read-line (- end-byte start-byte)))))

(defn #^"[Ljava.lang.String;" dumbest-split
  "Splits s on the character c, storing the pieces into the caller-supplied
  String array `tokens`, which is also returned.  At most (alength tokens)
  pieces are produced; the last piece stored holds the unsplit remainder of
  s.  Slots beyond the last piece stored are left untouched."
  [#^String s c #^"[Ljava.lang.String;" tokens]
  (let [len (dec (int (alength tokens)))]
    (loop [start (int 0)
           i (int 0)]
      (let [idx (int (.indexOf s (int c) (int start)))]
        (if (or (neg? idx) (>= i len))
          (do (aset tokens i (.substring s start))
              tokens)
          (do (aset tokens i (.substring s start idx))
              (recur (inc idx) (inc i))))))))

(defn parse-lines
  "Lazily turns log lines into record maps with keys :client, :url, :status,
  :bytes and :ref.  Lines are split on spaces into 12 fields; only lines whose
  field 5 is \"GET and whose status (field 8) is 200, 304 or 404 are kept.
  :bytes is 0 when the field is \"-\".  :ref is field 10 with its first and
  last characters removed.

  A single scratch array is reused for every line, so it is cleared before
  each split; lines with too few fields are therefore skipped instead of
  picking up fields left over from the previous line."
  [lines]
  (let [#^"[Ljava.lang.Object;" ary (make-array String 12)]
    (for [#^String line lines
          :let [fields (do (java.util.Arrays/fill ary nil)
                           (dumbest-split line \space ary))
                status (aget fields 8)
                bytes (aget fields 9)
                #^String ref (aget fields 10)]
          :when (= (aget fields 5) "\"GET")
          :when ('#{"200" "304" "404"} status)
          ;; substring below needs at least the two delimiting characters.
          :when (and ref (> (.length ref) 1))]
      {:client (aget fields 0)
       :url (aget fields 6)
       :status status
       :bytes (if (= bytes "-") 0 (Long/parseLong bytes))
       :ref (.substring ref 1 (dec (count ref)))})))

;;;; Tallying

(defn bump!
  "Adds delta to the AtomicLong counter stored under key in the map held by
  map-atom, creating the counter first if necessary.  Returns the counter's
  new value.

  A counter is never replaced once it is in the map: the swap only inserts a
  zero counter when the key is absent, and the increment is always applied to
  whichever counter the map ends up holding.  This prevents updates being
  lost when two threads add the same key concurrently."
  [map-atom #^String key #^Long delta]
  (let [#^AtomicLong counter
        (or (get @map-atom key)
            (get (swap! map-atom
                        (fn [m]
                          (if (contains? m key)
                            m
                            ;; Fresh String copy, as in the original code.
                            (assoc m (String. key) (AtomicLong. 0)))))
                 key))]
    (.addAndGet counter delta)))

(def article-re #"^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$")

(defn tally!
  "Folds records (maps as produced by parse-lines) into the counter maps held
  by the atoms under :url-hits, :url-bytes, :clients, :refs and :s404s.

  404 responses only count towards :s404s.  Every other record adds its bytes
  to :url-bytes; if its URL matches article-re it also counts as a hit for
  the URL and the client, and for the referrer unless the referrer is \"-\"
  or starts with http://www.tbray.org/ongoing/."
  [{:keys [url-hits url-bytes clients refs s404s]} records]
  (doseq [{:keys [#^String url bytes client status #^String ref]} records]
    (if (= status "404")
      (bump! s404s url 1)
      (do (bump! url-bytes url bytes)
          ;; startsWith is a cheap pre-check before running the regex.
          (when (and (.startsWith url "/ongoing/When/")
                     (re-matches article-re url))
            (bump! url-hits url 1)
            (bump! clients client 1)
            (when-not (or (= ref "-")
                          (.startsWith ref "http://www.tbray.org/ongoing/"))
              (bump! refs ref 1)))))))

;;;; Reporting

(defn truncate
  "Returns s cut to its first n characters followed by \"...\" when s is
  longer than n; otherwise returns s unchanged."
  [s n]
  (if (> (count s) n) (str (.substring s 0 n) "...") s))

(defn print-top10
  "Prints a \"Top <label>\" heading followed by up to ten [key value] entries
  from results, then a blank line.  When shrink? is truthy, values are divided
  by 1024 twice and printed with one decimal and an M suffix; otherwise they
  are printed as integers.  Keys are truncated to 60 characters."
  [results label & [shrink?]]
  (println "Top" label)
  (let [fmt (if shrink? " %9.1fM: %s" " %10d: %s")]
    (doseq [[k v] (take 10 results)]
      (let [v (if shrink? (/ v 1024.0 1024.0) (long v))]
        (println (format fmt v (truncate k 60))))))
  (println))

(defn sort-by-vals-desc
  "Returns the entries of m sorted by value, largest first."
  [m]
  (sort-by #(- (val %)) m))

(defn take-greatest-vals
  "Returns a vector of the (at most) n entries of m with the greatest values,
  largest first, without sorting all of m.  Returns nil when m is empty or n
  is not positive."
  [n m]
  (when (pos? n)
    (when-let [m (seq m)]
      (reduce (fn [best x]
                ;; Accept x while there is still room, or when it is at least
                ;; as large as the smallest entry kept so far.
                (if (or (< (count best) n)
                        (>= (val x) (val (peek best))))
                  (vec (take n (sort-by-vals-desc (conj best x))))
                  best))
              [(first m)] (rest m)))))

(defn report
  "Prints a one-line summary with the number of distinct keys in each tally,
  then the top ten entries of every tally listed in tallies.  Each element of
  tallies is [tally-key label & options]; state maps tally keys to atoms
  holding the counter maps."
  [tallies state]
  (->> state
       (map (fn [[tally rows]] (str (count @rows) " " (name tally))))
       (join ", ")
       (println))
  (println)
  ;; The top-ten selections are computed in parallel; printing is sequential
  ;; and follows the order of tallies.
  (doseq [args (pmap (fn [[tally & options]]
                       (cons (take-greatest-vals 10 @(state tally))
                             options))
                     tallies)]
    (apply print-top10 args)))

;;;; Main

(def tallies [[:url-hits "URIs by hit"]
              [:url-bytes "URIs by bytes" :shrink]
              [:s404s "404s"]
              [:clients "client addresses"]
              [:refs "referrers"]])

(defn wf-atoms
  "Processes the log file in parallel, in chunks of roughly 32 MB: each chunk
  is read, parsed and tallied into shared atoms, then the report is printed
  (and timed).  Files smaller than 32 MB are processed as a single chunk."
  [file]
  (let [file-size (.length (java.io.File. file))
        ;; At least one chunk, otherwise chunk-file has nothing to divide by.
        chunk-count (max 1 (quot file-size (* 32 1024 1024)))
        chunks (chunk-file file chunk-count)
        total (count chunks)
        state (zipmap (map first tallies) (repeatedly #(atom {})))]
    (dorun
     (pmap (fn [[idx [start end]]]
             (println (str "Chunk " idx "/" total
                           " (" start " -> " end ")"))
             (->> (read-lines-range file start end)
                  (parse-lines)
                  (tally! state)))
           (indexed chunks)))
    (time (report tallies state))))