From 62514f3ef6b7bde107bf22d7d92bb24fcb00728f Mon Sep 17 00:00:00 2001 From: Seon Lee Date: Wed, 13 Aug 2014 13:01:45 -0400 Subject: [PATCH] Supports configurable batchsize, to allow for greater than 4000 messages per batch. - Updated project.clj to support latest leiningen version 2.4.3 - Now compiles and generates 'jar' correctly - Depends on storm-core 0.9.1-incubating - KestrelThriftSpout constructors accept int batchsize parameter --- project.clj | 11 +++--- .../storm/spout/KestrelThriftSpout.java | 38 ++++++++++++------- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/project.clj b/project.clj index 62aa75e..ea5b333 100644 --- a/project.clj +++ b/project.clj @@ -1,9 +1,8 @@ -(defproject storm/storm-kestrel "0.7.2-snap3" +(defproject storm/storm-kestrel "0.7.2-batchsize" :source-path "src/clj" - :java-source-path "src/jvm" - :javac-options {:debug "true" :fork "true"} - :dependencies [] - :dev-dependencies [[storm "0.7.0"] - [org.clojure/clojure "1.2.0"] + :java-source-paths ["src/jvm"] + :javac-options {:debug "true"} + :dependencies [[org.apache.storm/storm-core "0.9.1-incubating"]] + :dev-dependencies [[org.clojure/clojure "1.2.0"] [org.clojure/clojure-contrib "1.2.0"]] :jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"]) diff --git a/src/jvm/backtype/storm/spout/KestrelThriftSpout.java b/src/jvm/backtype/storm/spout/KestrelThriftSpout.java index 6129ad0..c52b29c 100644 --- a/src/jvm/backtype/storm/spout/KestrelThriftSpout.java +++ b/src/jvm/backtype/storm/spout/KestrelThriftSpout.java @@ -32,12 +32,13 @@ public class KestrelThriftSpout extends BaseRichSpout { public static Logger LOG = Logger.getLogger(KestrelThriftSpout.class); public static final long BLACKLIST_TIME_MS = 1000 * 60; - public static final int BATCH_SIZE = 4000; + public static final int DEFAULT_BATCH_SIZE = 4000; private List _hosts = null; private int _port = -1; private String _queueName = null; + private int _batchSize = DEFAULT_BATCH_SIZE; private SpoutOutputCollector _collector; private MultiScheme _scheme; @@ -97,33 +98,42 @@ public void closeClient() { } public KestrelThriftSpout(List hosts, int port, String queueName, Scheme scheme) { - this(hosts, port, queueName, new SchemeAsMultiScheme(scheme)); + this(hosts, port, queueName, DEFAULT_BATCH_SIZE, new SchemeAsMultiScheme(scheme)); } - public KestrelThriftSpout(List hosts, int port, String queueName, MultiScheme scheme) { + public KestrelThriftSpout(List hosts, int port, String queueName, int batchSize, MultiScheme scheme) { if(hosts.isEmpty()) { throw new IllegalArgumentException("Must configure at least one host"); } _port = port; _hosts = hosts; _queueName = queueName; + _batchSize = batchSize; _scheme = scheme; } - public KestrelThriftSpout(String hostname, int port, String queueName, Scheme scheme) { - this(hostname, port, queueName, new SchemeAsMultiScheme(scheme)); - } + public KestrelThriftSpout(String hostname, int port, String queueName, int batchSize, Scheme scheme) { + this(hostname, port, queueName, batchSize, new SchemeAsMultiScheme(scheme)); + } - public KestrelThriftSpout(String hostname, int port, String queueName, MultiScheme scheme) { - this(Arrays.asList(hostname), port, queueName, scheme); - } + public KestrelThriftSpout(String hostname, int port, String queueName, Scheme scheme) { + this(hostname, port, queueName, DEFAULT_BATCH_SIZE, new SchemeAsMultiScheme(scheme)); + } + + public KestrelThriftSpout(String hostname, int port, String queueName, int batchSize, MultiScheme scheme) { + this(Arrays.asList(hostname), port, queueName, batchSize, scheme); + } + + public KestrelThriftSpout(String hostname, int port, String queueName, MultiScheme scheme) { + this(Arrays.asList(hostname), port, queueName, DEFAULT_BATCH_SIZE, scheme); + } - public KestrelThriftSpout(String hostname, int port, String queueName) { - this(hostname, port, queueName, new RawMultiScheme()); + public KestrelThriftSpout(String hostname, int port, String queueName, int batchSize) { + this(hostname, port, queueName, batchSize, new RawMultiScheme()); } public KestrelThriftSpout(List hosts, int port, String queueName) { - this(hosts, port, queueName, new RawMultiScheme()); + this(hosts, port, queueName, DEFAULT_BATCH_SIZE, new RawMultiScheme()); } public Fields getOutputFields() { @@ -171,13 +181,13 @@ public boolean bufferKestrelGet(int index) { if(now > info.blacklistTillTimeMs) { List items = null; try { - items = info.getValidClient().get(_queueName, BATCH_SIZE, 0, _messageTimeoutMillis); + items = info.getValidClient().get(_queueName, DEFAULT_BATCH_SIZE, 0, _messageTimeoutMillis); } catch(TException e) { blacklist(info, e); return false; } - assert items.size() <= BATCH_SIZE; + assert items.size() <= DEFAULT_BATCH_SIZE; // LOG.info("Kestrel batch get fetched " + items.size() + " items. (batchSize= " + BATCH_SIZE + // " queueName=" + _queueName + ", index=" + index + ", host=" + info.host + ")");