rlm@0
|
1 (ns coderloop.parallel-io
|
rlm@0
|
2 (:refer-clojure :only [])
|
rlm@0
|
3 (:require rlm.ns-rlm rlm.light-base))
|
rlm@0
|
4 (rlm.ns-rlm/ns-clone rlm.light-base)
|
rlm@0
|
5
|
rlm@0
|
6 (import '[java.io FileInputStream RandomAccessFile])
|
rlm@0
|
7 (use '(clojure.contrib [str-utils2 :only [join]]))
|
rlm@0
|
8 (import 'java.util.concurrent.atomic.AtomicLong)
|
rlm@0
|
9
|
rlm@0
|
10 ;;(set! *warn-on-reflection* true)
|
rlm@0
|
11
|
rlm@0
|
12
|
rlm@0
|
13
|
rlm@0
|
14
|
rlm@0
|
15
|
rlm@0
|
16 ;;;; Reading
|
rlm@0
|
17
|
rlm@0
|
(defn chunk-file
  "Partitions a file into at most n line-aligned chunks.  Returns a list of
  start and end byte offset pairs that together cover the whole file.

  Every chunk start except the first is moved forward to the byte just after
  the next newline.  Boundaries that collapse onto each other (for example
  when no newline remains before the end of the file) are merged, so fewer
  than n chunks may be returned.  An n below 1 is treated as 1.  An empty
  file yields an empty list."
  [filename n]
  (with-open [file (java.io.RandomAccessFile. filename "r")]
    (let [len     (.length file)
          n       (max 1 (long n))
          newline (int \newline)
          ;; Moves to offset and, unless it is the start of the file, scans
          ;; forward past the next newline.  Stops at end of file, where
          ;; .read keeps returning -1, instead of spinning forever.
          align   (fn [offset]
                    (.seek file (long offset))
                    (when (pos? offset)
                      (loop [b (.read file)]
                        (when-not (or (neg? b) (= b newline))
                          (recur (.read file)))))
                    (.getFilePointer file))
          ;; Integer arithmetic keeps every raw offset a whole byte position.
          ;; Realized eagerly because align depends on the open file.
          starts  (if (zero? len)
                    []
                    (vec (map #(align (quot (* % len) n)) (range n))))
          offsets (distinct (concat starts [len]))]
      (doall (map list offsets (rest offsets))))))
|
rlm@0
|
30
|
rlm@0
|
31
|
rlm@0
|
(defn read-lines-range
  "Returns a lazy sequence of lines from file between start-byte and end-byte.

  The file is decoded as US-ASCII.  Each line consumed is charged its length
  plus one byte for the line terminator, so a range that ends on a line
  boundary does not spill into the lines of the following range.  (A CRLF
  terminator is still charged as one byte.)

  The underlying reader is closed when the sequence has been consumed to its
  end; a sequence that is abandoned early leaves the reader open."
  [file start-byte end-byte]
  (let [in     (java.io.FileInputStream. file)
        ;; InputStream.skip may skip fewer bytes than requested, so keep
        ;; going until the whole prefix is skipped or no progress is made.
        _      (loop [left start-byte]
                 (when (pos? left)
                   (let [skipped (.skip in left)]
                     (when (pos? skipped)
                       (recur (- left skipped))))))
        reader (-> in
                   (java.io.BufferedInputStream. (* 8 131072))
                   (java.io.InputStreamReader. "US-ASCII")
                   (java.io.BufferedReader. 131072))]
    (letfn [(step [remaining]
              (lazy-seq
                (if-let [#^String line (and (pos? remaining) (.readLine reader))]
                  ;; inc accounts for the newline that readLine strips
                  (cons line (step (- remaining (inc (.length line)))))
                  ;; .close returns nil, which also terminates the sequence
                  (.close reader))))]
      (step (- end-byte start-byte)))))
|
rlm@0
|
45
|
rlm@0
|
(defn #^"[Ljava.lang.String;" dumbest-split
  "Splits s on the character c into the caller-supplied array tokens and
  returns that same array.

  Tokens are written from index 0.  When the array is about to run out, the
  last slot receives the whole unsplit remainder of s.  Slots after the final
  token are set to nil so that a buffer reused across calls never exposes
  tokens left over from an earlier, longer input."
  [#^String s c #^"[Ljava.lang.String;" tokens]
  (let [last-slot (dec (int (alength tokens)))
        sep       (int c)]
    (loop [start (int 0)
           i     (int 0)]
      (let [idx (int (.indexOf s sep (int start)))]
        (if (or (neg? idx) (>= i last-slot))
          (do (aset tokens i (.substring s start))
              ;; wipe whatever a previous call left in the unused slots
              (loop [j (inc i)]
                (when (<= j last-slot)
                  (aset tokens j nil)
                  (recur (inc j))))
              tokens)
          (do (aset tokens i (.substring s start idx))
              (recur (inc idx) (inc i))))))))
|
rlm@0
|
57
|
rlm@0
|
(defn parse-lines
  "Lazily turns space-separated log lines into record maps.

  A line is kept only when its sixth field equals a double quote followed by
  GET and its ninth field is one of 200, 304 or 404.  Each record holds
  :client (field 0), :url (field 6), :status (field 8), :bytes (field 9 as a
  long, with a lone dash read as 0) and :ref (field 10 with its first and
  last characters removed).

  One 12-slot scratch array is shared by every line, so all values are pulled
  out of it before the next line is split."
  [lines]
  (let [scratch  (make-array String 12)
        wanted?  #{"200" "304" "404"}
        ->record (fn [#^String line]
                   (let [fields      (dumbest-split line \space scratch)
                         status      (aget fields 8)
                         bytes       (aget fields 9)
                         #^String ref (aget fields 10)]
                     (when (and (= (aget fields 5) "\"GET")
                                (wanted? status))
                       {:client (aget fields 0)
                        :url    (aget fields 6)
                        :status status
                        :bytes  (if (= bytes "-") 0 (Long/parseLong bytes))
                        :ref    (.substring ref 1 (dec (count ref)))})))]
    (remove nil? (map ->record lines))))
|
rlm@0
|
72
|
rlm@0
|
73 ;;;; Tallying
|
rlm@0
|
74
|
rlm@0
|
(defn bump!
  "Adds delta to the AtomicLong counter stored under key in the map held by
  map-atom, creating the counter at zero first if the key is absent.  Returns
  the counter's new total.

  Once a counter has been installed for a key it is never replaced, so an
  increment applied by another thread to that counter cannot be lost.  The
  function passed to swap! may be retried, which is why it only installs a
  fresh zero counter and leaves the actual addition to happen exactly once
  afterwards."
  [map-atom #^String key #^Long delta]
  (let [#^AtomicLong counter
        (or (get @map-atom key)
            (get (swap! map-atom
                        (fn [m]
                          (if (contains? m key)
                            m
                            ;; The key is copied, as in the original code;
                            ;; presumably so the map does not pin the
                            ;; larger string the key was cut from.
                            (assoc m (String. key) (AtomicLong.)))))
                 key))]
    (.addAndGet counter delta)))
|
rlm@0
|
82
|
rlm@0
|
;; Matches a URL path of the form /ongoing/When/DDDx/DDDD/DD/DD/<name>, where
;; D is a digit and <name> is non-empty and contains neither a space nor a dot.
(def article-re
  #"^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$")
|
rlm@0
|
84
|
rlm@0
|
(defn tally!
  "Folds records into the counter maps held by the first argument.

  A record with status 404 bumps only s404s for its url.  Any other record
  adds its byte count to url-bytes; if its url is an article (matches
  article-re) it also bumps url-hits and clients, and bumps refs unless the
  referrer is a lone dash or starts with http://www.tbray.org/ongoing/."
  [{:keys [url-hits url-bytes clients refs s404s]} records]
  (doseq [{:keys [#^String url bytes client status #^String ref]} records]
    (if (not= status "404")
      (do
        (bump! url-bytes url bytes)
        ;; the cheap prefix test runs before the regex
        (when (and (.startsWith url "/ongoing/When/")
                   (re-matches article-re url))
          (bump! url-hits url 1)
          (bump! clients client 1)
          (when (and (not= ref "-")
                     (not (.startsWith ref "http://www.tbray.org/ongoing/")))
            (bump! refs ref 1))))
      (bump! s404s url 1))))
|
rlm@0
|
97
|
rlm@0
|
98 ;;;; Reporting
|
rlm@0
|
99
|
rlm@0
|
(defn truncate
  "Returns s unchanged when it has at most n characters; otherwise returns
  its first n characters followed by three dots."
  [s n]
  (if (<= (count s) n)
    s
    (let [head (.substring s 0 n)]
      (str head "..."))))
|
rlm@0
|
102
|
rlm@0
|
(defn print-top10
  "Prints a heading made of Top and label, then up to ten key/value rows from
  results, then a blank line.  Keys are cut to 60 characters with truncate.
  When the optional shrink? argument is truthy each value is divided by
  1024.0 twice and printed with one decimal and an M suffix; otherwise it is
  printed as a long."
  [results label & [shrink?]]
  (println "Top" label)
  (let [row-format (if shrink? " %9.1fM: %s" " %10d: %s")
        scale      (if shrink?
                     (fn [v] (/ v 1024.0 1024.0))
                     (fn [v] (long v)))]
    (doseq [[k v] (take 10 results)]
      (println (format row-format (scale v) (truncate k 60)))))
  (println))
|
rlm@0
|
110
|
rlm@0
|
(defn sort-by-vals-desc
  "Returns the entries of m sorted by value, largest first.  The sort key is
  the negated value."
  [m]
  (sort-by (fn [entry] (- (val entry))) m))
|
rlm@0
|
113
|
rlm@0
|
114
|
rlm@0
|
115
|
rlm@0
|
(defn take-greatest-vals
  "Returns a vector of the n entries of m with the greatest values, largest
  first, or nil when m is empty.  If m has fewer than n entries all of them
  are returned.  An n below 1 yields an empty vector.

  The running selection is kept sorted in descending order.  An entry is
  admitted while the selection is not yet full, or when its value is at least
  that of the smallest entry currently held."
  [n m]
  (when-let [entries (seq m)]
    (let [insert (fn [best x]
                   (vec (take n (sort-by #(- (val %)) (conj best x)))))]
      (reduce (fn [best x]
                (cond
                  ;; still room: always admit, even below the current minimum
                  (< (count best) n)               (insert best x)
                  (and (pos? n)
                       (>= (val x) (val (peek best)))) (insert best x)
                  :else                            best))
              []
              entries))))
|
rlm@0
|
123
|
rlm@0
|
(defn report
  "Prints a one-line summary giving the number of keys in each counter map of
  state, then a blank line, then one print-top10 section per entry of
  tallies.  Each tallies entry is a key into state followed by the extra
  arguments for print-top10.  The rankings are computed with pmap and printed
  in the order of tallies."
  [tallies state]
  (let [summary (for [[tally rows] state]
                  (str (count @rows) " " (name tally)))]
    (println (join ", " summary)))
  (println)
  (let [rank     (fn [[tally & options]]
                   (let [ordered (sort-by-vals-desc @(state tally))]
                     (cons (take-greatest-vals 10 ordered) options)))
        sections (pmap rank tallies)]
    (doseq [section sections]
      (apply print-top10 section))))
|
rlm@0
|
136
|
rlm@0
|
137 ;;;; Main
|
rlm@0
|
138
|
rlm@0
|
;; One entry per report section: the key of the counter map, the label shown
;; after the word Top, and an optional truthy flag that makes print-top10
;; show the values scaled down with an M suffix.
(def tallies
  [[:url-hits  "URIs by hit"]
   [:url-bytes "URIs by bytes" :shrink]
   [:s404s     "404s"]
   [:clients   "client addresses"]
   [:refs      "referrers"]])
|
rlm@0
|
144
|
rlm@0
|
(defn wf-atoms
  "Tallies the log file named by file and prints the report.

  The file is cut into roughly 32 MiB line-aligned chunks (always at least
  one, so files smaller than 32 MiB are handled).  The chunks are read,
  parsed and tallied with pmap into one shared atom-held counter map per
  entry of tallies.  A progress line is printed per chunk, and the final
  report is wrapped in time."
  [file]
  (let [chunk-bytes (* 32 1024 1024)
        ;; never 0: chunk-file cannot split a file into zero pieces
        chunk-count (max 1 (int (quot (.length (java.io.File. file))
                                      chunk-bytes)))
        state       (zipmap (map first tallies) (repeatedly #(atom {})))]
    (dorun
      (pmap (fn [[idx [start end]]]
              (println (str "Chunk " idx "/" chunk-count
                            " (" start " -> " end ")"))
              (->> (read-lines-range file start end)
                   (parse-lines)
                   (tally! state)))
            (indexed (chunk-file file chunk-count))))
    (time (report tallies state))))
|
rlm@0
|
157
|
rlm@0
|
158
|