Mercurial > coderloop
comparison src/parallel_io.clj @ 0:307a81e46071 tip
initial committ
author | Robert McIntyre <rlm@mit.edu> |
---|---|
date | Tue, 18 Oct 2011 01:17:49 -0700 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:307a81e46071 |
---|---|
1 (ns coderloop.parallel-io | |
2 (:refer-clojure :only []) | |
3 (:require rlm.ns-rlm rlm.light-base)) | |
4 (rlm.ns-rlm/ns-clone rlm.light-base) | |
5 | |
6 (import '[java.io FileInputStream RandomAccessFile]) | |
7 (use '(clojure.contrib [str-utils2 :only [join]])) | |
8 (import 'java.util.concurrent.atomic.AtomicLong) | |
9 | |
10 ;;(set! *warn-on-reflection* true) | |
11 | |
12 | |
13 | |
14 | |
15 | |
16 ;;;; Reading | |
17 | |
(defn chunk-file
  "Partitions a file into at most n line-aligned chunks. Returns a list of
  (start end) byte-offset pairs that together cover the whole file; returns
  an empty list for an empty file. A non-positive n is treated as 1.

  Every chunk start other than 0 is moved forward to the byte just past the
  next newline, so no line straddles two chunks. Starts that collapse onto
  the same offset (several tentative starts inside one long line) are
  merged, which is why fewer than n chunks may be returned."
  [filename n]
  (with-open [file (java.io.RandomAccessFile. filename "r")]
    (let [length (.length file)
          n      (max 1 n)
          ;; Move a tentative start to the beginning of the next line.
          align  (fn [offset]
                   (if (zero? offset)
                     0
                     (do (.seek file offset)
                         ;; Stop at a newline OR at end of file; .read keeps
                         ;; returning -1 at EOF, so scanning only for a
                         ;; newline would never terminate on a final line
                         ;; that lacks one.
                         (loop []
                           (let [b (.read file)]
                             (when-not (or (neg? b) (= b (int \newline)))
                               (recur))))
                         (.getFilePointer file))))
          ;; Integer arithmetic keeps the tentative starts whole numbers.
          starts  (map #(align (quot (* % length) n)) (range n))
          ;; Offsets are non-decreasing, so distinct only drops repeats.
          offsets (distinct (concat starts [length]))]
      ;; doall: every offset must be computed before with-open closes file.
      (doall (partition 2 1 offsets)))))
30 | |
31 | |
(defn read-lines-range
  "Returns a lazy sequence of the lines of file found between start-byte
  (inclusive) and end-byte (exclusive).

  The file is decoded as US-ASCII, so one character is one byte. The
  underlying reader is closed when the sequence has been consumed to its
  end; a sequence that is abandoned early leaves the reader open.

  The byte budget assumes each line is terminated by a single newline
  byte. NOTE(review): with CRLF line endings each line would be
  under-counted by one byte -- confirm the input format."
  [file start-byte end-byte]
  (let [reader (-> (doto (java.io.FileInputStream. file)
                     (.skip start-byte))
                   (java.io.BufferedInputStream. (* 8 131072))
                   (java.io.InputStreamReader. "US-ASCII")
                   (java.io.BufferedReader. 131072))]
    (letfn [(read-line [remaining]
              (lazy-seq
                (if-let [line (and (pos? remaining) (.readLine reader))]
                  ;; readLine strips the terminator, so charge one extra
                  ;; byte per line; otherwise the budget outlives the range
                  ;; and lines past end-byte are returned as well.
                  (cons line (read-line (- remaining (.length line) 1)))
                  ;; .close returns nil, which also ends the sequence.
                  (.close reader))))]
      (read-line (- end-byte start-byte)))))
45 | |
(defn #^"[Ljava.lang.String;" dumbest-split
  "Splits s on the character c, writing the pieces into the caller-supplied
  String array tokens, and returns that same array. Once the last slot is
  reached it receives the whole remainder of s, separators included. Slots
  beyond the last piece written are left untouched."
  [#^String s c #^"[Ljava.lang.String;" tokens]
  (let [sep       (int c)
        last-slot (dec (int (alength tokens)))]
    (loop [from (int 0)
           slot (int 0)]
      (let [hit   (int (.indexOf s sep (int from)))
            more? (and (not (neg? hit)) (< slot last-slot))]
        (if more?
          (do (aset tokens slot (.substring s from hit))
              (recur (inc hit) (inc slot)))
          ;; No further separator, or no room left: store the tail and stop.
          (do (aset tokens slot (.substring s from))
              tokens))))))
57 | |
(defn parse-lines
  "Returns a lazy sequence of record maps, one for each line in lines that
  is a GET request with status 200, 304 or 404. Each record has the keys
  :client, :url, :status, :bytes (a long; \"-\" is read as 0) and :ref (the
  referrer field with its first and last character removed).

  Lines are split on single spaces into at most 12 fields. Lines with fewer
  than 11 fields are skipped."
  [lines]
  ;; One scratch array is reused for every line to avoid per-line allocation.
  (let [#^"[Ljava.lang.String;" ary (make-array String 12)]
    (for [#^String line lines
          :let [;; Wipe the previous line's tokens first: the splitter only
                ;; writes as many slots as the line has fields, so a short
                ;; line would otherwise be read with stale fields.
                _ (java.util.Arrays/fill ary nil)
                #^"[Ljava.lang.String;" fields (dumbest-split line \space ary)
                status (aget fields 8)
                bytes (aget fields 9)
                #^String ref (aget fields 10)]
          ;; A nil referrer means the line had fewer than 11 fields.
          :when ref
          :when (= (aget fields 5) "\"GET")
          :when ('#{"200" "304" "404"} status)]
      {:client (aget fields 0)
       :url (aget fields 6)
       :status status
       :bytes (if (= bytes "-") 0 (Long/parseLong bytes))
       ;; Strip the surrounding quote characters; a field too short to
       ;; carry both is kept unchanged instead of throwing.
       :ref (if (< (count ref) 2)
              ref
              (.substring ref 1 (dec (count ref))))})))
72 | |
73 ;;;; Tallying | |
74 | |
(defn bump!
  "Adds delta to the counter stored under key in the map held by map-atom,
  creating the counter at zero first if the key is absent. Returns the
  counter's new value.

  A counter is never replaced once it is in the map: the swap! function
  only inserts when the key is missing, so an increment made by another
  thread on an existing AtomicLong cannot be lost."
  [map-atom #^String key #^Long delta]
  (let [#^AtomicLong counter
        (or (get @map-atom key)
            (get (swap! map-atom
                        (fn [m]
                          (if (contains? m key)
                            m
                            ;; The key is copied before being stored.
                            ;; NOTE(review): presumably so the map does not
                            ;; pin the source line via a shared substring
                            ;; buffer -- confirm.
                            (assoc m (String. key) (AtomicLong.)))))
                 key))]
    (.addAndGet counter delta)))
82 | |
;; Matches a whole URL of the form /ongoing/When/NNNx/YYYY/MM/DD/<name>,
;; where each N, Y, M and D is a digit and <name> is one or more characters
;; containing neither a space nor a dot.
(def article-re #"^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$")
84 | |
(defn tally!
  "Folds records into the counter maps passed as the first argument.

  A record with status \"404\" adds one to s404s for its URL. Any other
  record adds its byte count to url-bytes; if its URL is an article URL it
  also adds one hit to url-hits and one to clients for the client, and one
  to refs for the referrer unless the referrer is \"-\" or starts with
  http://www.tbray.org/ongoing/. Returns nil."
  [{:keys [url-hits url-bytes clients refs s404s]} records]
  (doseq [record records]
    (let [{:keys [#^String url bytes client status #^String ref]} record
          not-found? (= status "404")]
      (if not-found?
        (bump! s404s url 1)
        (do
          (bump! url-bytes url bytes)
          ;; The cheap prefix test runs before the regular expression.
          (when (and (.startsWith url "/ongoing/When/")
                     (re-matches article-re url))
            (bump! url-hits url 1)
            (bump! clients client 1)
            (when (and (not= ref "-")
                       (not (.startsWith ref "http://www.tbray.org/ongoing/")))
              (bump! refs ref 1))))))))
97 | |
98 ;;;; Reporting | |
99 | |
(defn truncate
  "Returns s unchanged when it has at most n characters; otherwise returns
  its first n characters followed by \"...\"."
  [s n]
  (if (<= (count s) n)
    s
    (let [head (.substring s 0 n)]
      (str head "..."))))
102 | |
(defn print-top10
  "Prints a \"Top <label>\" heading, then one line for each of the first ten
  [key value] pairs in results, then a blank line. Keys are cut to 60
  characters. When shrink? is truthy the value is divided by 1024 twice and
  printed with one decimal place and an M suffix; otherwise it is printed
  as an integer."
  [results label & [shrink?]]
  (println "Top" label)
  (doseq [[k v] (take 10 results)]
    (let [amount  (if shrink? (/ v 1024.0 1024.0) (long v))
          pattern (if shrink? " %9.1fM: %s" " %10d: %s")]
      (println (format pattern amount (truncate k 60)))))
  (println))
110 | |
(defn sort-by-vals-desc
  "Returns the entries of map m sorted by value, largest first. Each entry
  is keyed on its negated value, so entries with equal values keep their
  original relative order."
  [m]
  (sort-by (comp - val) m))
113 | |
114 | |
115 | |
(defn take-greatest-vals
  "Returns a vector of the (at most) n entries of map m with the greatest
  values, ordered from greatest to least. Returns nil when m is empty or n
  is not positive.

  Only the current best n entries are kept while scanning, so the whole
  map is never sorted."
  [n m]
  (when-let [entries (and (pos? n) (seq m))]
    (reduce (fn [best x]
              ;; Accept x while there is still room, or when it beats the
              ;; smallest entry held (best is kept sorted, so that is the
              ;; last one). Without the room check, an entry smaller than
              ;; the first one seen is dropped even though fewer than n
              ;; entries have been collected.
              (if (or (< (count best) n)
                      (> (val x) (val (peek best))))
                (vec (take n (sort-by val > (conj best x))))
                best))
            []
            entries)))
123 | |
(defn report
  "Prints a one-line summary of how many distinct keys each counter map in
  state holds, then a top-ten table for every entry of tallies.

  tallies is a sequence of [state-key label & options] vectors; state maps
  each state-key to an atom holding a map of key -> count. Returns nil."
  [tallies state]
  (->> state
       (map (fn [[tally rows]] (str (count @rows) " " (name tally))))
       (join ", ")
       (println))
  (println)
  (->> tallies
       ;; The sorting runs in parallel; printing below stays in tallies
       ;; order. The ten largest are taken straight from the sorted
       ;; entries: handing an already-descending sequence to a
       ;; keep-if-not-smaller-than-the-minimum filter retains only the
       ;; maximum and its ties.
       (pmap (fn [[tally & options]]
               (cons (take 10 (sort-by-vals-desc @(state tally)))
                     options)))
       (map #(apply print-top10 %))
       (dorun)))
136 | |
137 ;;;; Main | |
138 | |
;; Report sections, in the order they are printed. Each entry is
;; [state-key label & options]: state-key names the counter map, label is
;; the heading text, and a trailing :shrink makes the values print as
;; megabytes instead of plain counts.
(def tallies [[:url-hits "URIs by hit"]
              [:url-bytes "URIs by bytes" :shrink]
              [:s404s "404s"]
              [:clients "client addresses"]
              [:refs "referrers"]])
144 | |
(defn wf-atoms
  "Reads the log at path file in parallel, one chunk of roughly 32 MB per
  task, tallies the parsed records into a fresh set of counter maps, then
  prints the report along with the time the report took."
  [file]
  (let [chunk-bytes (* 32 1024 1024)
        ;; At least one chunk: a file smaller than chunk-bytes would
        ;; otherwise give a count of 0 and a division by zero when the
        ;; file is partitioned.
        chunk-count (max 1 (int (quot (.length (java.io.File. file))
                                      chunk-bytes)))
        ;; One independent atom-wrapped map per tally key.
        state (zipmap (map first tallies) (repeatedly #(atom {})))]
    (dorun
     ;; NOTE(review): indexed is not defined in the visible source;
     ;; presumably it pairs each chunk with its zero-based index -- confirm.
     (pmap (fn [[idx [start end]]]
             (println (str "Chunk " idx "/" chunk-count
                           " (" start " -> " end ")"))
             (->> (read-lines-range file start end)
                  (parse-lines)
                  (tally! state)))
           (indexed (chunk-file file chunk-count))))
    (time (report tallies state))))
157 | |
158 |