(ns coderloop.top-five)


(use 'rlm.shell-inspect)
(use 'clojure.contrib.profile)
(use '[clojure.contrib [duck-streams :only [file-str read-lines *default-encoding*]]])
(import '(java.nio ByteBuffer CharBuffer)
        '(java.io PushbackReader InputStream InputStreamReader
                  FileInputStream))



;;  ^{:doc "Name of the default encoding to use when reading & writing.
;;  Default is UTF-8."
;;     :tag "java.lang.String"}
;;  *default-encoding* "US-ASCII")
;;(set! clojure.contrib.duck-streams/*default-encoding* "US-ASCII")

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; Hard-coded sample input files (presumably for REPL experiments --
;; nothing else in this file refers to them).
(def a (file-str "/home/r/coderloop-test/topfive-a.in"))
(def b (file-str "/home/r/coderloop-test/topfive-b.in"))
(def c (file-str "/home/r/coderloop-test/topfive-c.in"))
(def d (file-str "/home/r/coderloop-test/topfive-d.in"))
(def e (file-str "/home/r/coderloop-test/topfive-e.in"))
(def f (file-str "/home/r/coderloop-test/topfive-f.in"))

(defn get-query-slow
  "Regex-based query extraction.  Returns the text captured between
  the last `, query=` and the final `]` of the line s, or nil when the
  whole line does not match that shape."
  [s]
  (nth (re-matches #"^.*, query=(.*)]$" s) 1))

(defn #^java.lang.String get-query
  "Substring-based query extraction.  Returns everything after the last
  `query=` in s, minus the final character of the line (the closing `]`
  in the format that get-query-slow matches).

  Returns nil when s contains no `query=` or is too short to hold a
  value, instead of throwing StringIndexOutOfBoundsException or
  returning an unrelated slice of the line."
  [#^java.lang.String s]
  (let [marker (.lastIndexOf s "query=")
        start  (+ marker 6)          ; 6 = length of "query="
        end    (dec (.length s))]    ; drop the last character
    ;; lastIndexOf yields -1 when the marker is absent; start would then
    ;; be 5 and the substring call would be meaningless or throw.
    (when (and (>= marker 0) (<= start end))
      (.substring s start end))))


(defn chuncked-pmap
  "Like (map f coll), but the work is handed to pmap in batches of n
  elements.  Helps with parallelization of functions that don't take
  very much time to execute.  Each batch is fully realized inside its
  own pmap task and the batches are concatenated in order."
  [n f coll]
  (apply concat
         (pmap (fn [batch] (doall (map f batch)))
               (partition-all n coll))))


;; (def *main-map* (atom {}))

;; (declare parse-lines tally!)

;; (defn parse-lines [lines]
;;   (map get-query lines))

;; (defn tally! [state parsed-lines]
;;   (doseq [element parsed-lines]
;;     (bump! state element 1)))


;; (defn analyze-log [file]
;;   (let [chunk-count (int (/ (.length #^java.io.File file) (* 32 1024 1024)))
;;         state (atom {})]
;;     (dorun
;;      (pmap (fn [[idx [start end]]]
;;              (println (str "Chunk " idx "/" chunk-count
;;                            " (" start " -> " end ")"))
;;              (->> (read-lines-range file start end)
;;                   (parse-lines)
;;                   (tally! state)))
;;            (indexed (chunk-file file chunk-count))))
;;     state))




;; (defn preduce-full
;;   "a parallel reduce. Because it is parallel, the reducing
;;    function must make sense in a tree-reducing context"
;;   [num-chunks mapping-function f coll]
;;   (loop [prev coll]
;;     (let [new-coll
;;           (concat
;;            (mapping-function
;;             (partial apply f)
;;             (partition-all num-chunks prev)))]
;;       (if (not (next new-coll))
;;         (first new-coll)
;;         (recur new-coll)))))





;;; correctness before speed:
(defn top-five-0
  "Baseline implementation.  Reads file as US-ASCII, extracts the query
  from every line, counts queries two lines at a time and merges the
  partial counts, then returns the (up to) five most frequent queries,
  most frequent first.  Lines without a query are skipped."
  [file]
  ;; read-lines is called (and its reader opened) inside the binding,
  ;; and reduce/sort-by realize everything before the binding is left.
  (binding [*default-encoding* "US-ASCII"]
    (->> (read-lines file)
         (keep get-query)
         (partition-all 2)
         (map frequencies)
         (reduce (partial merge-with +))
         (sort-by second)
         reverse
         (take 5)
         (map first))))
;; "Elapsed time: 34440.153889 msecs"
;; "Elapsed time: 21072.141885 msecs"
;; "Elapsed time: 21041.711202 msecs"

;; now let's do speed:
(defn top-five-1
  "Same result as top-five-0 but counts in partitions of 800 lines and
  sorts by negated count instead of reversing.  Returns the (up to)
  five most frequent queries in file, most frequent first.  Lines
  without a query are skipped."
  [file]
  (->> (read-lines file)
       (keep get-query)
       (partition-all 800)
       (map frequencies)
       (reduce (partial merge-with +))
       (sort-by (comp - second))
       (take 5)
       (map first)))


(defn top-five-3
  "Counts every query in file with a single call to frequencies and
  returns the (up to) five most frequent queries, most frequent first.
  Lines without a query are skipped.  This is the version the
  command-line entry point at the bottom of the file uses."
  [file]
  (->> (read-lines file)
       (keep get-query)
       frequencies
       (sort-by (comp - second))
       (take 5)
       (map first)))



;; "Elapsed time: 12877.194194 msecs"
;; "Elapsed time: 12282.975191 msecs"
;; got a factor of 2x increase in speed from a larger partition and chuncked-pmap.
;; still too slow....

;;; let's try the "make it all one big function" approach
;;(set! *warn-on-reflection* true)

(defn top-five-2
  "Tallies the queries of file in parallel (chuncked-pmap, batches of
  800 lines) into a single atom, then returns the (up to) five most
  frequent queries, most frequent first.  Lines without a query are
  skipped.

  All shared state lives in one atom and every update is a pure
  function of the atom's current value, so concurrent batches cannot
  lose or misattribute counts, and the result only ever contains
  queries that actually occur in the file."
  [file]
  (let [counts (atom {})]
    (letfn [(tally! [line]
              (when-let [query (get-query line)]
                ;; update-in receives the value swap! is operating on,
                ;; so a retry after contention recomputes from fresh
                ;; state instead of a separately dereferenced snapshot.
                (swap! counts update-in [query] (fnil inc 0)))
              ;; return nil so the batches do not hold on to
              ;; intermediate maps
              nil)]
      (dorun (chuncked-pmap 800 tally! (read-lines file))))
    (->> @counts
         (sort-by (comp - second))
         (take 5)
         (map first))))
;; "Elapsed time: 10735.897831 msecs" ;; with chuncked-pmap




;; Script entry point: when run from the command line, print the top
;; five queries of the file named by the first argument, one per line,
;; then exit.  command-line? is not defined in this file (presumably it
;; comes from rlm.shell-inspect).
(when (command-line?)
  (doseq [query (top-five-3 (file-str (first *command-line-args*)))]
    (println query))
  (System/exit 0))