diff src/parallel_io.clj @ 0:307a81e46071 tip

initial committ
author Robert McIntyre <rlm@mit.edu>
date Tue, 18 Oct 2011 01:17:49 -0700
parents
children
line wrap: on
line diff
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/src/parallel_io.clj	Tue Oct 18 01:17:49 2011 -0700
     1.3 @@ -0,0 +1,158 @@
     1.4 +(ns coderloop.parallel-io
     1.5 +    (:refer-clojure :only [])
     1.6 +  (:require rlm.ns-rlm rlm.light-base))
     1.7 +(rlm.ns-rlm/ns-clone rlm.light-base)
     1.8 +
     1.9 +(import '[java.io FileInputStream RandomAccessFile])
    1.10 +(use '(clojure.contrib   [str-utils2 :only [join]]))
    1.11 +(import 'java.util.concurrent.atomic.AtomicLong)
    1.12 +
    1.13 +;;(set! *warn-on-reflection* true)
    1.14 +
    1.15 +
    1.16 +
    1.17 +
    1.18 +
    1.19 +;;;; Reading
    1.20 +
    1.21 +(defn chunk-file
    1.22 +  "Partitions a file into n line-aligned chunks.  Returns a list of start and
    1.23 +  end byte offset pairs."
    1.24 +  [filename n]
    1.25 +  (with-open [file (java.io.RandomAccessFile. filename "r")]
    1.26 +    (let [offsets (for [offset (range 0 (.length file) (/ (.length file) n))]
    1.27 +		    (do (when-not (zero? offset)
    1.28 +			  (.seek file offset)
    1.29 +			  (while (not= (.read file) (int \newline))))
    1.30 +			(.getFilePointer file)))
    1.31 +	  offsets (concat offsets [(.length file)])]
    1.32 +      (doall (partition 2 (interleave offsets (rest offsets)))))))
    1.33 +
    1.34 +
    1.35 +(defn read-lines-range [file start-byte end-byte]
    1.36 +  "Returns a lazy sequence of lines from file between start-byte and end-byte."
    1.37 +  (let [reader (-> (doto (java.io.FileInputStream. file)
    1.38 +		     (.skip start-byte))
    1.39 +		   (java.io.BufferedInputStream. (* 8 131072))
    1.40 +		   (java.io.InputStreamReader. "US-ASCII")
    1.41 +		   (java.io.BufferedReader. 131072))]
    1.42 +    (letfn [(read-line [remaining]
    1.43 +		       (lazy-seq
    1.44 +			(if-let [line (and (pos? remaining) (.readLine reader))]
    1.45 +			  (cons line (read-line (- remaining (.length line))))
    1.46 +			  (.close reader))))]
    1.47 +      (read-line (- end-byte start-byte)))))
    1.48 +
    1.49 +(defn #^"[Ljava.lang.String;" dumbest-split
    1.50 +  [#^String s c #^"[Ljava.lang.String;" tokens]
    1.51 +  (let [len (dec (int (alength tokens)))]
    1.52 +    (loop [start (int 0)
    1.53 +	   i (int 0)]
    1.54 +      (let [idx (int (.indexOf s (int c) (int start)))]
    1.55 +	(if (or (neg? idx) (>= i len))
    1.56 +	  (do (aset tokens i (.substring s start))
    1.57 +	      tokens)
    1.58 +	  (do (aset tokens i (.substring s start idx))
    1.59 +	      (recur (inc idx) (inc i))))))))
    1.60 +
    1.61 +(defn parse-lines [lines]
    1.62 +  (let [ary (make-array String 12)]
    1.63 +    (for [#^String line lines
    1.64 +	  :let [fields (dumbest-split line \space ary)
    1.65 +		status (aget fields 8)
    1.66 +		bytes (aget fields 9)
    1.67 +		#^String ref (aget fields 10)]
    1.68 +	  :when (= (aget fields 5) "\"GET")
    1.69 +	  :when ('#{"200" "304" "404"} status)]
    1.70 +      {:client (aget fields 0)
    1.71 +       :url (aget fields 6)
    1.72 +       :status status
    1.73 +       :bytes (if (= bytes "-") 0 (Long/parseLong bytes))
    1.74 +       :ref (.substring ref 1 (dec (count ref)))})))
    1.75 +
    1.76 +;;;; Tallying
    1.77 +
    1.78 +(defn bump! [map-atom #^String key #^Long delta]
    1.79 +  (if-let [#^AtomicLong counter (get @map-atom key)]
    1.80 +    (.addAndGet counter delta)
    1.81 +    (swap! map-atom #(assoc % (String. key)
    1.82 +			    (if-let [#^AtomicLong counter (get % key)]
    1.83 +			      (AtomicLong. (+ (.get counter) delta))
    1.84 +			      (AtomicLong. delta))))))
    1.85 +
    1.86 +(def article-re #"^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$")
    1.87 +
    1.88 +(defn tally! [{:keys [url-hits url-bytes clients refs s404s]} records]
    1.89 +  (doseq [{:keys [#^String url bytes client status #^String ref]} records]
    1.90 +    (if (= status "404")
    1.91 +      (bump! s404s url 1)
    1.92 +      (do (bump! url-bytes url bytes)
    1.93 +	  (when (when (.startsWith url "/ongoing/When/")
    1.94 +		  (re-matches article-re url))
    1.95 +	    (bump! url-hits url 1)
    1.96 +	    (bump! clients client 1)
    1.97 +	    (when-not (or (= ref "-")
    1.98 +			  (.startsWith ref "http://www.tbray.org/ongoing/"))
    1.99 +	      (bump! refs ref 1)))))))
   1.100 +
   1.101 +;;;; Reporting
   1.102 +
   1.103 +(defn truncate [s n]
   1.104 +  (if (> (count s) n) (str (.substring s 0 n) "...") s))
   1.105 +
   1.106 +(defn print-top10 [results label & [shrink?]]
   1.107 +  (println "Top" label)
   1.108 +  (let [fmt (if shrink? " %9.1fM: %s" " %10d: %s")]
   1.109 +    (doseq [[k v] (take 10 results)]
   1.110 +      (let [v (if shrink? (/ v 1024.0 1024.0) (long v))]
   1.111 +	(println (format fmt v (truncate k 60))))))
   1.112 +  (println))
   1.113 +
   1.114 +(defn sort-by-vals-desc [m]
   1.115 +  (sort-by #(- (val %)) m))
   1.116 +
   1.117 +
   1.118 +
   1.119 +(defn take-greatest-vals [n m]
   1.120 +  (when-let [m (seq m)]
   1.121 +    (reduce (fn [best x]
   1.122 +	      (if (>= (val x) (val (last best)))
   1.123 +		(vec (take n (sort-by-vals-desc (conj best x))))
   1.124 +		best))
   1.125 +	    [(first m)] (rest m))))
   1.126 +
   1.127 +(defn report [tallies state]
   1.128 +  (->> state
   1.129 +       (map (fn [[tally rows]] (str (count @rows) " " (name tally))))
   1.130 +       (join ", ")
   1.131 +       (println))
   1.132 +  (println)
   1.133 +  (->> tallies
   1.134 +       (pmap (fn [[tally & options]]
   1.135 +	       (cons (take-greatest-vals 10 (sort-by-vals-desc @(state tally)))
   1.136 +		     options)))
   1.137 +       (map #(apply print-top10 %))
   1.138 +       (dorun)))
   1.139 +
   1.140 +						 ;;;; Main
   1.141 +
   1.142 +(def tallies [[:url-hits  "URIs by hit"]
   1.143 +	      [:url-bytes "URIs by bytes" :shrink]
   1.144 +	      [:s404s	   "404s"]
   1.145 +	      [:clients   "client addresses"]
   1.146 +	      [:refs	   "referrers"]])
   1.147 +
   1.148 +(defn wf-atoms [file]
   1.149 +  (let [chunk-count (int (/ (.length (java.io.File. file)) (* 32 1024 1024)))
   1.150 +	state (zipmap (map first tallies) (repeatedly #(atom {})))]
   1.151 +    (dorun
   1.152 +     (pmap (fn [[idx [start end]]]
   1.153 +	     (println (str "Chunk " idx "/" chunk-count
   1.154 +			   " (" start " -> " end ")"))
   1.155 +	     (->> (read-lines-range file start end)
   1.156 +		  (parse-lines)
   1.157 +		  (tally! state)))
   1.158 +	   (indexed (chunk-file file chunk-count))))
   1.159 +    (time (report tallies state))))
   1.160 +
   1.161 +