(ns coderloop.parallel-io
  (:refer-clojure :only [])
  (:require rlm.ns-rlm rlm.light-base))
(rlm.ns-rlm/ns-clone rlm.light-base)

(import '[java.io FileInputStream RandomAccessFile])
(use '(clojure.contrib [str-utils2 :only [join]]))
(import 'java.util.concurrent.atomic.AtomicLong)

;;(set! *warn-on-reflection* true)


;;;; Reading

(defn chunk-file
  "Partitions a file into (at most) n line-aligned chunks.  Returns a list
  of (start end) byte-offset pairs; each start is either 0 or the position
  just past a newline, and the final end is the file length.  Adjacent
  chunks may be empty (start = end) when a single line spans more than one
  nominal chunk.  A value of n below 1 is treated as 1."
  [filename n]
  (with-open [file (java.io.RandomAccessFile. filename "r")]
    (let [size    (.length file)
          ;; Guard against n = 0, which would otherwise divide by zero below.
          n       (max 1 n)
          ;; The step may be a Ratio; each offset is truncated to a long
          ;; before seeking.  The `for` is lazy but is fully realized by the
          ;; `doall` below, while the file is still open.
          offsets (for [offset (range 0 size (/ size n))]
                    (do (when-not (zero? offset)
                          (.seek file (long offset))
                          ;; Advance to just past the next newline.  Stop at
                          ;; end of file too (.read returns -1 there), so a
                          ;; file without a trailing newline cannot spin
                          ;; forever.
                          (loop [b (.read file)]
                            (when-not (or (neg? b) (= b (int \newline)))
                              (recur (.read file)))))
                        (.getFilePointer file)))
          offsets (concat offsets [size])]
      (doall (partition 2 (interleave offsets (rest offsets)))))))


(defn read-lines-range
  "Returns a lazy sequence of lines from file between start-byte and
  end-byte.  The underlying reader is closed only once the sequence has been
  consumed to its end, so callers should realize it fully.

  The byte budget assumes one byte per character (the stream is decoded as
  US-ASCII) and a single-byte line terminator."
  [file start-byte end-byte]
  (let [reader (-> (doto (java.io.FileInputStream. file)
                     (.skip start-byte))
                   (java.io.BufferedInputStream. (* 8 131072))
                   (java.io.InputStreamReader. "US-ASCII")
                   (java.io.BufferedReader. 131072))]
    (letfn [(gobble [remaining]
              (lazy-seq
                (if-let [line (and (pos? remaining) (.readLine reader))]
                  ;; readLine strips the terminator, so charge one extra
                  ;; byte per line; otherwise every chunk would run past
                  ;; end-byte and its trailing lines would be read twice.
                  (cons line (gobble (- remaining (inc (.length line)))))
                  (.close reader))))]
      (gobble (- end-byte start-byte)))))

(defn #^"[Ljava.lang.String;" dumbest-split
  "Splits s on the character c, storing the pieces into the caller-supplied
  array tokens and returning that same array.  At most (alength tokens)
  pieces are produced; the last piece written holds the unsplit remainder
  of s.  Slots beyond the last piece written are left untouched."
  [#^String s c #^"[Ljava.lang.String;" tokens]
  (let [len (dec (int (alength tokens)))]
    (loop [start (int 0)
           i     (int 0)]
      (let [idx (int (.indexOf s (int c) (int start)))]
        (if (or (neg? idx) (>= i len))
          (do (aset tokens i (.substring s start))
              tokens)
          (do (aset tokens i (.substring s start idx))
              (recur (inc idx) (inc i))))))))

(defn parse-lines
  "Lazily turns space-separated log lines into maps with keys :client, :url,
  :status, :bytes and :ref.  Only lines whose field 5 is \"GET (with the
  leading quote) and whose status (field 8) is 200, 304 or 404 are kept.
  A bytes field of \"-\" is read as 0, and the first and last characters
  of the referrer field (field 10) are stripped."
  [lines]
  ;; One scratch array is reused for every line to avoid per-line allocation.
  ;; NOTE(review): because dumbest-split does not clear unused slots, a line
  ;; with fewer than 11 fields would see tokens left over from an earlier
  ;; line -- confirm that the input never contains such lines.
  (let [ary (make-array String 12)]
    (for [#^String line lines
          :let [fields (dumbest-split line \space ary)
                status (aget fields 8)
                bytes (aget fields 9)
                #^String ref (aget fields 10)]
          :when (= (aget fields 5) "\"GET")
          :when ('#{"200" "304" "404"} status)]
      {:client (aget fields 0)
       :url (aget fields 6)
       :status status
       :bytes (if (= bytes "-") 0 (Long/parseLong bytes))
       :ref (.substring ref 1 (dec (count ref)))})))

;;;; Tallying

(defn bump!
  "Adds delta to the AtomicLong counter stored under key in the map held by
  map-atom, creating the counter first if it does not exist.  Returns the
  counter's new value."
  [map-atom #^String key #^Long delta]
  (if-let [#^AtomicLong counter (get @map-atom key)]
    (.addAndGet counter delta)
    ;; Slow path: install a zero counter only if the key is still absent,
    ;; then increment whichever counter actually ended up in the map.  An
    ;; existing counter is never replaced, so a thread that already holds a
    ;; reference to it cannot have its increment lost.
    (let [counters (swap! map-atom
                          (fn [m]
                            (if (contains? m key)
                              m
                              ;; The key is copied; presumably so the map
                              ;; does not keep the whole source line alive
                              ;; through a shared substring.
                              (assoc m (String. key) (AtomicLong.)))))
          #^AtomicLong counter (get counters key)]
      (.addAndGet counter delta))))

(def article-re #"^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$")

(defn tally!
  "Folds parsed records into the counter maps.  404 responses are counted
  per URL in s404s.  Every other record adds its bytes to url-bytes; if the
  URL matches article-re it also counts a hit in url-hits and clients, and
  counts the referrer in refs unless the referrer is \"-\" or starts with
  http://www.tbray.org/ongoing/."
  [{:keys [url-hits url-bytes clients refs s404s]} records]
  (doseq [{:keys [#^String url bytes client status #^String ref]} records]
    (if (= status "404")
      (bump! s404s url 1)
      (do (bump! url-bytes url bytes)
          ;; The cheap prefix test runs first so the regex is only tried on
          ;; plausible candidates.
          (when (and (.startsWith url "/ongoing/When/")
                     (re-matches article-re url))
            (bump! url-hits url 1)
            (bump! clients client 1)
            (when-not (or (= ref "-")
                          (.startsWith ref "http://www.tbray.org/ongoing/"))
              (bump! refs ref 1)))))))

;;;; Reporting

(defn truncate
  "Returns s unchanged if it has at most n characters, otherwise its first
  n characters followed by \"...\"."
  [s n]
  (if (> (count s) n) (str (.substring s 0 n) "...") s))

(defn print-top10
  "Prints a \"Top <label>\" heading, then up to ten [key value] rows from
  results, then a blank line.  When shrink? is truthy the value is divided
  by 1024 twice and printed with an M suffix; otherwise it is printed as an
  integer.  Keys are truncated to 60 characters."
  [results label & [shrink?]]
  (println "Top" label)
  (let [fmt (if shrink? " %9.1fM: %s" " %10d: %s")]
    (doseq [[k v] (take 10 results)]
      (let [v (if shrink? (/ v 1024.0 1024.0) (long v))]
        (println (format fmt v (truncate k 60))))))
  (println))

(defn sort-by-vals-desc
  "Returns the entries of m sorted by value, largest first."
  [m]
  (sort-by #(- (val %)) m))

(defn take-greatest-vals
  "Returns a vector of the n entries of m with the greatest values, largest
  first, or nil when m is empty.  Works in one pass over m, keeping at most
  n candidates at a time.  n is expected to be positive."
  [n m]
  (when-let [m (seq m)]
    (reduce (fn [best x]
              ;; Admit x while there is still room, or when it is at least
              ;; as large as the smallest candidate currently held.
              (if (or (< (count best) n)
                      (>= (val x) (val (peek best))))
                (vec (take n (sort-by-vals-desc (conj best x))))
                best))
            [(first m)] (rest m))))

(defn report
  "Prints a one-line summary of how many distinct keys each tally holds,
  then a top-ten table for every entry of tallies.  state maps tally
  keywords to atoms holding the counter maps."
  [tallies state]
  (->> state
       (map (fn [[tally rows]] (str (count @rows) " " (name tally))))
       (join ", ")
       (println))
  (println)
  (->> tallies
       (pmap (fn [[tally & options]]
               ;; take-greatest-vals scans the map itself, so no full sort
               ;; of the (potentially large) map is needed beforehand.
               (cons (take-greatest-vals 10 @(state tally))
                     options)))
       (map #(apply print-top10 %))
       (dorun)))

;;;; Main

;; Each entry is [state-key label & print-top10 options].
(def tallies [[:url-hits "URIs by hit"]
              [:url-bytes "URIs by bytes" :shrink]
              [:s404s "404s"]
              [:clients "client addresses"]
              [:refs "referrers"]])

(defn wf-atoms
  "Splits file into roughly 32 MiB line-aligned chunks, parses and tallies
  the chunks in parallel, then prints (and times) the report."
  [file]
  (let [;; At least one chunk, so files smaller than 32 MiB are still
        ;; processed instead of yielding a chunk count of zero.
        chunk-count (max 1 (int (/ (.length (java.io.File. file))
                                   (* 32 1024 1024))))
        state (zipmap (map first tallies) (repeatedly #(atom {})))]
    (dorun
      (pmap (fn [[idx [start end]]]
              (println (str "Chunk " idx "/" chunk-count
                            " (" start " -> " end ")"))
              (->> (read-lines-range file start end)
                   (parse-lines)
                   (tally! state)))
            (indexed (chunk-file file chunk-count))))
    (time (report tallies state))))