Skip to content

Commit

Permalink
Add stream credentials dialog and storage with support for twitter.
Browse files Browse the repository at this point in the history
  • Loading branch information
drewr committed Jun 12, 2013
1 parent 2556f93 commit 6d8ea18
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 6 deletions.
3 changes: 2 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
[org.elasticsearch/elasticsearch-river-wikipedia
"1.2.0-SNAPSHOT"]
[org.twitter4j/twitter4j-stream "3.0.3"]
[slingshot "0.10.3"]]
[slingshot "0.10.3"]
[clj-oauth "1.4.0"]]
:plugins [[lein-bin "0.3.2"]]
:main stream2es.main
:bin {:bootclasspath true})
54 changes: 54 additions & 0 deletions src/stream2es/auth.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
(ns stream2es.auth
(:require [clojure.edn :as edn]
[clojure.java.io :as io]
[clojure.pprint :as pp]
[oauth.client :as oauth]
[slingshot.slingshot :refer [try+ throw+]]
[stream2es.log :as log]
[stream2es.util.io :as uio])
(:import (java.io FileNotFoundException)))

(def make-oauth-consumer oauth/make-consumer)

(defn get-token! [consumer]
(let [request (oauth/request-token consumer)
uri (oauth/user-approval-uri consumer (:oauth_token request))
_ (do
(println (apply str (take 80 (repeat "-"))))
(println "Visit this URL...")
(println)
(println " " uri)
(println)
(print "...and enter VERIFICATION CODE: ")
(flush))
code (-> System/in io/reader .readLine .trim)]
(oauth/access-token consumer request code)))

(defn get-creds
([authinfo]
(try
(->> (slurp authinfo) edn/read-string)
(catch FileNotFoundException _
(throw+ {:type ::nocreds}
"credentials %s not found" authinfo))))
([authinfo & credtypes]
(let [eligible (fn [entry]
(if (seq credtypes)
((into #{} credtypes) (:type entry))
entry))]
(->> (get-creds authinfo)
(filter eligible)
(sort-by :created)))))

(defn get-current-cred [authinfo type]
(last (get-creds authinfo type)))

(defn store-creds [authinfo creds]
(let [current (get-creds authinfo)]
(->> creds
(conj (get-creds authinfo))
pp/pprint
with-out-str
(spit authinfo)))
(uio/chmod-0600 authinfo))

12 changes: 12 additions & 0 deletions src/stream2es/main.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
[stream2es.stream.twitter :as twitter])
(:require [cheshire.core :as json]
[clojure.tools.cli :refer [cli]]
[stream2es.auth :as auth]
[stream2es.log :as log]
[stream2es.es :as es]
[stream2es.size :refer [size-of]]
Expand All @@ -17,6 +18,7 @@
[stream2es.util.time :as time]
[slingshot.slingshot :refer [try+ throw+]])
(:import (clojure.lang ExceptionInfo)
(java.io FileNotFoundException)
(java.util.concurrent CountDownLatch
LinkedBlockingQueue
TimeUnit)))
Expand Down Expand Up @@ -53,6 +55,11 @@
["--replace" "Delete index before streaming" :flag true :default false]
["--indexing" "Whether to actually send data to ES"
:flag true :default true]
["--authinfo" "Stored stream credentials"
:default (str
(System/getProperty "user.home")
(java.io.File/separator)
".authinfo.stream2es")]
["-u" "--es" "ES location" :default "http:https://localhost:9200"]
["-h" "--help" "Display help" :flag true :default false]])

Expand Down Expand Up @@ -394,9 +401,14 @@
[optmap args _] (parse-opts args main-plus-cmd-specs)]
(when (:help optmap)
(quit (help stream)))
(when (and (= cmd 'twitter) (:authorize optmap))
(auth/store-creds (:authinfo optmap) (twitter/make-creds optmap))
(quit "*** Success! Credentials saved to %s" (:authinfo optmap)))
(if (:version optmap)
(quit (version))
(main (assoc optmap :stream stream :cmd cmd))))
(catch [:type :stream2es.auth/nocreds] _
(quit (format "Error: %s" (:message &throw-context))))
(catch [:type ::badcmd] _
(quit (format "Error: %s\n\n%s" (:message &throw-context) (help))))
(catch [:type ::badarg] _
Expand Down
27 changes: 24 additions & 3 deletions src/stream2es/stream/twitter.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
(ns stream2es.stream.twitter
(:require [cheshire.core :as json]
[stream2es.auth :as auth]
[stream2es.stream :refer [new Stream Streamable
StreamStorage CommandLine]]
[stream2es.util.data :refer [maybe-update-in]])
[stream2es.util.data :refer [maybe-update-in]]
[stream2es.util.time :as time])
(:import (twitter4j.conf ConfigurationBuilder)
(twitter4j TwitterStreamFactory RawStreamListener)
(twitter4j.json DataObjectFactory)))
Expand All @@ -28,8 +30,9 @@
:parse-fn #(Integer/parseInt %)]
["-i" "--index" "ES index" :default "twitter"]
["-t" "--type" "ES document type" :default "status"]
["--user" "Twitter username" :default ""]
["--pass" "Twitter password" :default ""]
["--authorize" "Create oauth credentials" :flag true :default false]
["--twitter-key" "Twitter app consumer key"]
["--twitter-secret" "Twitter app consumer secret"]
["--stream-buffer" "Buffer up to this many tweets"
:default 1000
:parse-fn #(Integer/parseInt %)]])
Expand Down Expand Up @@ -91,3 +94,21 @@
(map (fn [poly]
(conj poly (first poly)))
polys?))

(defn oauth-consumer [opts]
(auth/make-oauth-consumer
(:twitter-key opts)
(:twitter-secret opts)
"http:https://api.twitter.com/oauth/request_token"
"http:https://api.twitter.com/oauth/access_token"
"http:https://api.twitter.com/oauth/authorize"
:hmac-sha1))

(defn make-creds [opts]
(let [tok (auth/get-token! (oauth-consumer opts))]
{:type :twitter
:created (time/now)
:token (:oauth_token tok)
:token-secret (:oauth_token_secret tok)
:key (:twitter-key opts)
:secret (:twitter-secret opts)}))
14 changes: 13 additions & 1 deletion src/stream2es/util/io.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
(ns stream2es.util.io
(:require [clojure.java.io :as io])
(:import (java.util.zip GZIPOutputStream)
(java.net URL)))
(java.net URL)
(java.nio.file.attribute PosixFilePermission)
(java.nio.file FileSystems Files)))

(def file io/file)

Expand All @@ -12,3 +14,13 @@
(let [gz (-> f io/output-stream GZIPOutputStream.)]
(with-open [#^java.io.Writer w (apply io/writer gz options)]
(.write w (str content)))))

(defn get-path [one & more]
(.getPath (FileSystems/getDefault) one (into-array String more)))

(defn chmod-0600 [filename]
(let [path (get-path filename)]
(Files/setPosixFilePermissions
path
#{PosixFilePermission/OWNER_READ
PosixFilePermission/OWNER_WRITE})))
6 changes: 5 additions & 1 deletion src/stream2es/util/time.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
(ns stream2es.util.time
(:import (SimpleDateFormat)))
(:import (java.text SimpleDateFormat)))

(defn now []
(.format (SimpleDateFormat. "yyyy-MM-dd'T'HH:mm:ssZ")
(java.util.Date.)))

(defn minsecs [secs]
(let [mins (Math/floor (float (/ secs 60)))
Expand Down

0 comments on commit 6d8ea18

Please sign in to comment.