diff --git a/.gitignore b/.gitignore
index 9bdc891..9ef42c0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,3 +19,5 @@ activemq-data
.vscode
classpath.txt
javaconfig.json
+
+build.properties
diff --git a/.travis.yml b/.travis.yml
index 94d5896..1c0f80b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,6 +1,13 @@
language: scala
scala:
- - "2.11.4"
+- 2.11.4
jdk:
- - oraclejdk8
+- oraclejdk8
sudo: false
+env:
+ global:
+ - secure: f9pej7Im4e7dwwv9s/ctP1Mh+RgLF3tlUCPsewLZbHhv1plgbGQnKW10j0B/GZIy+Bc8M1+H/EnNhsHp9QE7mUOMJXM8kW/OlcBMIwxYkHFGBhHND2j7GXwUCXbjctwryjFxOg3hHMwQy+z3K4r9ShdrOnJ9cqLKo3OkTZ5Vi+M=
+ - secure: olvXVQJ6c/uCWXrm7dw/LsWCKEguylLybPoiM40trNIrHMpEcDWQ/2u+fvcQ9KWm8ztQeBSmXwlkHMCMicbh2iTX9W1hmxfzrFE8N33iVet74N72tsRfrFxcUI3HXjPfB3hCF3NbPAeplfxqYXRY2OzNOueigHb4oW6jMf1a97I=
+ - secure: JhqZSvXjlGDMHvVli9Cuv6hwOeCWLywagZsocigtIOI9b6lO4um9qokcNJTg+rbQE+bd7E65zMviJPnwMWzbuTdI4LG5Ldqxr4IQbFrkythWSDMWLHsziFPAYHPZMZPf0tALzf1F7bMj0yo4YulqeeAr/Iy7bpAQPVWPf+Lqox0=
+after_success:
+- ./bin/checkPublish
diff --git a/README.adoc b/README.adoc
index 7311188..f94fa4d 100644
--- a/README.adoc
+++ b/README.adoc
@@ -10,6 +10,16 @@ link:doc/forklift.adoc[Documentation]
== Releases
link:doc/prev_releases.adoc[Previous Releases]
+* *December 4th 2017* - v3.1
+** Kafka: Add internal support for separate dedicated consumers on the same topic
+
+* *December 2nd 2017* - v3.0
+** Upgrade replay logging to utilize Elastic Search 5
+
+* *November 28th 2017* - v2.3
+** Update build to automate releases to sonatype
+** Align release numbers of subcomponents to avoid having a version 1 of a plugin while a version 2 of the core is being used.
+
* *September 28th 2017* - v2.2
** Eliminate unneeded queries to elasticsearch, improving preformance the performance replay processing
** Fix bug in re-runing failed retries in the retry plugin
diff --git a/bin/checkPublish b/bin/checkPublish
new file mode 100755
index 0000000..c5920c3
--- /dev/null
+++ b/bin/checkPublish
@@ -0,0 +1,9 @@
+#!/bin/bash
+BRANCH=$(if [ "$TRAVIS_PULL_REQUEST" == "false" ]; then echo $TRAVIS_BRANCH; else echo $TRAVIS_PULL_REQUEST_BRANCH; fi)
+echo Found branch $BRANCH
+
if [ "$BRANCH" == 'master' ]
+ then
+ echo Publishing to Sonatype
+ sbt publishSigned sonatypeReleaseAll
+fi
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..e576c05
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,78 @@
+lazy val baseSettings = Seq(
+ organization := "com.github.dcshock",
+ version := "3.1",
+ scalaVersion := "2.11.7",
+ javacOptions ++= Seq("-source", "1.8"),
+ javacOptions in compile ++= Seq("-g:lines,vars,source", "-deprecation"),
+ javacOptions in doc += "-Xdoclint:none",
+ crossPaths := false,
+ autoScalaLibrary := false,
+ resolvers ++= Seq(
+ "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
+ "Maven Central" at "http://repo1.maven.org/maven2",
+ "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots",
+ "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public",
+ "Confluent Maven Repo" at "http://packages.confluent.io/maven/"
+ ),
+ publishTo := {
+ val nexus = "https://oss.sonatype.org/"
+ if (isSnapshot.value)
+ Some("snapshots" at nexus + "content/repositories/snapshots")
+ else
+ Some("releases" at nexus + "service/local/staging/deploy/maven2")
+ },
+ publishMavenStyle := true,
+ credentials += Credentials(
+ "Sonatype Nexus Repository Manager",
+ "oss.sonatype.org",
+ sys.env.getOrElse("SONATYPE_USER", ""),
+ sys.env.getOrElse("SONATYPE_PASS", "")
+ ),
+ useGpg := false,
+ usePgpKeyHex("E46770E4F1ED27F3"),
+ pgpPublicRing := file(sys.props("user.dir")) / "project" / ".gnupg" / "pubring.gpg",
+ pgpSecretRing := file(sys.props("user.dir")) / "project" / ".gnupg" / "secring.gpg",
+ pgpPassphrase := sys.env.get("GPG_PASS").map(_.toArray),
+ pomIncludeRepository := { _ => false },
+  pomExtra := (
+    <url>https://github.com/dcshock/forklift</url>
+    <licenses>
+      <license>
+        <name>BSD-style</name>
+        <url>http://www.opensource.org/licenses/bsd-license.php</url>
+        <distribution>repo</distribution>
+      </license>
+    </licenses>
+    <scm>
+      <url>git@github.com:dcshock/forklift.git</url>
+      <connection>scm:git:git@github.com:dcshock/forklift.git</connection>
+    </scm>
+    <developers>
+      <developer>
+        <id>dcshock</id>
+        <name>Matt Conroy</name>
+        <url>http://www.mattconroy.com</url>
+      </developer>
+      <developer>
+        <id>afrieze</id>
+        <name>Andrew Frieze</name>
+      </developer>
+      <developer>
+        <id>kuroshii</id>
+        <name>Bridger Howell</name>
+      </developer>
+    </developers>)
+)
+
+lazy val core = project in file("core") settings baseSettings
+lazy val replay = project.dependsOn(core) in file("plugins/replay") settings baseSettings
+lazy val retry = project.dependsOn(core) in file("plugins/retry") settings baseSettings
+lazy val stats = project.dependsOn(core) in file("plugins/stats") settings baseSettings
+lazy val activemq = project.dependsOn(core) in file("connectors/activemq") settings baseSettings
+lazy val kafka = project.dependsOn(core) in file("connectors/kafka") settings baseSettings
+lazy val server = project.dependsOn(core, replay, retry, stats, activemq, kafka) in file("server") settings baseSettings
+
+lazy val rootSettings = baseSettings ++ Seq(
+ skip in publish := true
+)
+lazy val root = project in file(".") aggregate(core, replay, retry, stats, activemq, kafka, server) settings rootSettings
diff --git a/connectors/activemq/build.sbt b/connectors/activemq/build.sbt
index 2b01798..b088875 100644
--- a/connectors/activemq/build.sbt
+++ b/connectors/activemq/build.sbt
@@ -1,28 +1,6 @@
-organization := "com.github.dcshock"
-
name := "forklift-activemq"
-version := "2.0"
-
-javacOptions ++= Seq("-source", "1.8")
-
-javacOptions in compile ++= Seq("-g:lines,vars,source")
-
-initialize := {
- val _ = initialize.value
- if (sys.props("java.specification.version") != "1.8")
- sys.error("Java 8 is required for this project.")
-}
-
-resolvers ++= Seq(
- "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
- "Maven Central" at "http://repo1.maven.org/maven2",
- "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots",
- "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public"
-)
-
libraryDependencies ++= Seq(
- "com.github.dcshock" % "forklift" % "2.0",
"org.apache.activemq" % "activemq-client" % "5.14.0",
"org.apache.activemq" % "activemq-broker" % "5.14.0",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.7.3",
@@ -42,42 +20,3 @@ libraryDependencies ++= testDependencies.map(_ % "test")
// so disable parallel test execution
parallelExecution in Test := false
testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
-
-// Remove scala dependency for pure Java libraries
-autoScalaLibrary := false
-
-// Remove the scala version from the generated/published artifact
-crossPaths := false
-
-publishMavenStyle := true
-
-publishTo := {
- val nexus = "https://oss.sonatype.org/"
- if (isSnapshot.value)
- Some("snapshots" at nexus + "content/repositories/snapshots")
- else
- Some("releases" at nexus + "service/local/staging/deploy/maven2")
-}
-
-pomIncludeRepository := { _ => false }
-
-pomExtra := (
- https://github.com/dcshock/forklift
-
-
- BSD-style
- http://www.opensource.org/licenses/bsd-license.php
- repo
-
-
-
- git@github.com:dcshock/forklift.git
- scm:git:git@github.com:dcshock/forklift.git
-
-
-
- dcshock
- Matt Conroy
- http://www.mattconroy.com
-
- )
diff --git a/connectors/activemq/project/plugins.sbt b/connectors/activemq/project/plugins.sbt
deleted file mode 100644
index 4ce4d9e..0000000
--- a/connectors/activemq/project/plugins.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
diff --git a/connectors/kafka/build.sbt b/connectors/kafka/build.sbt
index 12017ca..4308670 100644
--- a/connectors/kafka/build.sbt
+++ b/connectors/kafka/build.sbt
@@ -1,32 +1,6 @@
-organization := "com.github.dcshock"
-
name := "forklift-kafka"
-version := "2.0"
-
-//required for some test dependencies
-scalaVersion := "2.11.7"
-
-javacOptions ++= Seq("-source", "1.8")
-
-javacOptions in compile ++= Seq("-g:lines,vars,source")
-
-initialize := {
- val _ = initialize.value
- if (sys.props("java.specification.version") != "1.8")
- sys.error("Java 8 is required for this project.")
-}
-
-resolvers ++= Seq(
- "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
- "Maven Central" at "http://repo1.maven.org/maven2",
- "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots",
- "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public",
- "Confluent Maven Repo" at "http://packages.confluent.io/maven/"
-)
-
libraryDependencies ++= Seq(
- "com.github.dcshock" % "forklift" % "2.0" ,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.7.3",
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.7.3",
"org.apache.kafka" % "kafka-clients" % "0.10.1.1-cp1" exclude("org.slf4j","slf4j-log4j12"),
@@ -55,45 +29,3 @@ testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
// avro settings
(javaSource in avroConfig) := baseDirectory(_/"target/generated-sources").value
(sourceDirectory in avroConfig) := baseDirectory(_/"src/test/resources/schemas").value
-
-// Remove scala dependency for pure Java libraries
-autoScalaLibrary := false
-
-// Remove the scala version from the generated/published artifact
-crossPaths := false
-
-publishMavenStyle := true
-
-publishTo := {
- val nexus = "https://oss.sonatype.org/"
- if (isSnapshot.value)
- Some("snapshots" at nexus + "content/repositories/snapshots")
- else
- Some("releases" at nexus + "service/local/staging/deploy/maven2")
-}
-
-pomIncludeRepository := { _ => false }
-
-pomExtra := (
- https://github.com/dcshock/forklift-kafka
-
-
- BSD-style
- http://www.opensource.org/licenses/bsd-license.php
- repo
-
-
-
- git@github.com:dcshock/forklift-kafka.git
- scm:git:git@github.com:dcshock/forklift-kafka.git
-
-
-
- afrieze
- Andrew Frieze
-
-
- kuroshii
- Bridger Howell
-
- )
diff --git a/connectors/kafka/project/plugins.sbt b/connectors/kafka/project/plugins.sbt
index 3a29115..16ad420 100644
--- a/connectors/kafka/project/plugins.sbt
+++ b/connectors/kafka/project/plugins.sbt
@@ -1,3 +1 @@
addSbtPlugin("com.cavorite" % "sbt-avro-1-7" % "1.1.2")
-
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
diff --git a/connectors/kafka/src/main/java/forklift/connectors/KafkaConnector.java b/connectors/kafka/src/main/java/forklift/connectors/KafkaConnector.java
index 8044d32..ba292bd 100644
--- a/connectors/kafka/src/main/java/forklift/connectors/KafkaConnector.java
+++ b/connectors/kafka/src/main/java/forklift/connectors/KafkaConnector.java
@@ -39,24 +39,24 @@ public class KafkaConnector implements ForkliftConnectorI {
private final String kafkaHosts;
private final String schemaRegistries;
- private final String groupId;
+ private final String defaultGroupId;
private KafkaProducer<?, ?> kafkaProducer;
private KafkaController controller;
private ForkliftSerializer serializer;
- private Map<String, KafkaController> controllers = new HashMap<>();
+ private Map<GroupedTopicSource, KafkaController> controllers = new HashMap<>();
/**
* Constructs a new instance of the KafkaConnector
*
* @param kafkaHosts list of kafka servers in host:port,... format
* @param schemaRegistries list of schema registry servers in http://host:port,... format
- * @param groupId the groupId to use when subscribing to topics
+ * @param defaultGroupId the default groupId to use when subscribing to topics
*/
- public KafkaConnector(String kafkaHosts, String schemaRegistries, String groupId) {
+ public KafkaConnector(String kafkaHosts, String schemaRegistries, String defaultGroupId) {
this.kafkaHosts = kafkaHosts;
this.schemaRegistries = schemaRegistries;
- this.groupId = groupId;
+ this.defaultGroupId = defaultGroupId;
this.serializer = new KafkaSerializer(this, newSerializer(), newDeserializer());
}
@@ -101,7 +101,7 @@ private KafkaProducer createKafkaProducer() {
return new KafkaProducer(producerProperties);
}
- private KafkaController createController(String topicName) {
+ private KafkaController createController(String topicName, String groupId) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
@@ -137,48 +137,41 @@ public synchronized void stop() throws ConnectorException {
@Override
public ForkliftConsumerI getConsumerForSource(SourceI source) throws ConnectorException {
- return source
- .apply(QueueSource.class, queue -> getQueue(queue.getName()))
- .apply(TopicSource.class, topic -> getTopic(topic.getName()))
- .apply(GroupedTopicSource.class, topic -> getGroupedTopic(topic))
- .apply(RoleInputSource.class, roleSource -> {
- final ForkliftConsumerI rawConsumer = getConsumerForSource(roleSource.getActionSource(this));
- return new RoleInputConsumerWrapper(rawConsumer);
- })
- .elseUnsupportedError();
+ if (source instanceof RoleInputSource) {
+ final RoleInputSource roleSource = (RoleInputSource) source;
+ final ForkliftConsumerI rawConsumer = getConsumerForSource(roleSource.getActionSource(this));
+ return new RoleInputConsumerWrapper(rawConsumer);
+ }
+ return getGroupedTopic(mapToGroupedTopic(source));
}
public synchronized ForkliftConsumerI getGroupedTopic(GroupedTopicSource source) throws ConnectorException {
if (!source.groupSpecified()) {
- source.overrideGroup(groupId);
+ source.overrideGroup(defaultGroupId);
}
- if (!source.getGroup().equals(groupId)) { //TODO actually support GroupedTopics
- throw new ConnectorException("Unexpected group '" + source.getGroup() + "'; only the connector group '" + groupId + "' is allowed");
- }
-
- KafkaController controller = controllers.get(source.getName());
+ KafkaController controller = controllers.get(source);
if (controller != null && controller.isRunning()) {
- log.warn("Consumer for topic already exists under this controller's groupname. Messages will be divided amongst consumers.");
+ log.warn("Consumer for topic and group already exists. Messages will be divided amongst consumers.");
} else {
- controller = createController(source.getName());
- this.controllers.put(source.getName(), controller);
+ controller = createController(source.getName(), source.getGroup());
+ this.controllers.put(source, controller);
controller.start();
}
+
return new KafkaTopicConsumer(source.getName(), controller);
}
@Override
public ForkliftConsumerI getQueue(String name) throws ConnectorException {
- return getGroupedTopic(new GroupedTopicSource(name, groupId));
+ return getGroupedTopic(mapToGroupedTopic(new QueueSource(name)));
}
@Override
public ForkliftConsumerI getTopic(String name) throws ConnectorException {
- return getGroupedTopic(new GroupedTopicSource(name, groupId));
+ return getGroupedTopic(mapToGroupedTopic(new TopicSource(name)));
}
-
@Override
public ForkliftProducerI getQueueProducer(String name) {
return getTopicProducer(name);
@@ -200,11 +193,28 @@ public ActionSource mapSource(LogicalSource source) {
}
protected GroupedTopicSource mapRoleInputSource(RoleInputSource roleSource) {
- return new GroupedTopicSource("forklift-role-" + roleSource.getRole(), groupId);
+ return new GroupedTopicSource("forklift-role-" + roleSource.getRole(), defaultGroupId);
}
@Override
public boolean supportsResponse() {
return true;
}
+
+ /* visible for testing */
+ protected GroupedTopicSource mapToGroupedTopic(SourceI source) {
+ return source
+ .apply(QueueSource.class, queueSource -> new GroupedTopicSource(queueSource.getName(), defaultGroupId))
+ .apply(TopicSource.class, topicSource -> topicToGroupedTopic(topicSource))
+ .apply(GroupedTopicSource.class, groupedTopicSource -> groupedTopicSource)
+ .elseUnsupportedError();
+ }
+
+ private GroupedTopicSource topicToGroupedTopic(TopicSource topicSource) {
+ if (topicSource.getContextClass() == null) {
+ return new GroupedTopicSource(topicSource.getName(), defaultGroupId);
+ }
+ final String groupName = defaultGroupId + "-" + topicSource.getContextClass().getSimpleName();
+ return new GroupedTopicSource(topicSource.getName(), groupName);
+ }
}
diff --git a/connectors/kafka/src/test/java/forklift/connectors/KafkaConnectorTests.java b/connectors/kafka/src/test/java/forklift/connectors/KafkaConnectorTests.java
new file mode 100644
index 0000000..6abcdac
--- /dev/null
+++ b/connectors/kafka/src/test/java/forklift/connectors/KafkaConnectorTests.java
@@ -0,0 +1,58 @@
+package forklift.connectors;
+
+import forklift.source.sources.GroupedTopicSource;
+import forklift.source.sources.QueueSource;
+import forklift.source.sources.TopicSource;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KafkaConnectorTests {
+ private static final String GROUP_ID = "test-default";
+ private KafkaConnector connector;
+
+ @Before
+ public void setup() throws Exception {
+ connector = new KafkaConnector("blah", "blah", GROUP_ID);
+ }
+
+ @Test
+ public void testQueueMapping() {
+ final String topicName = "test-topic";
+
+ final GroupedTopicSource mappedSource = connector.mapToGroupedTopic(new QueueSource(topicName));
+ Assert.assertEquals(new GroupedTopicSource(topicName, GROUP_ID), mappedSource);
+ }
+
+ @Test
+ public void testTopicMapping() {
+ final String topicName = "test-topic";
+ final TopicSource consumerSource = new TopicSource(topicName);
+ consumerSource.setContextClass(FakeConsumer.class);
+
+ final GroupedTopicSource mappedConsumerSource = connector.mapToGroupedTopic(consumerSource);
+ Assert.assertEquals(new GroupedTopicSource(topicName, "test-default-FakeConsumer"), mappedConsumerSource);
+ }
+
+ @Test
+ public void testDefaultTopicMapping() {
+ final String topicName = "test-topic";
+ final TopicSource anonymousSource = new TopicSource(topicName);
+
+ final GroupedTopicSource mappedConsumerSource = connector.mapToGroupedTopic(anonymousSource);
+ Assert.assertEquals(new GroupedTopicSource(topicName, GROUP_ID), mappedConsumerSource);
+ }
+
+ @Test
+ public void testGroupedTopicMapping() {
+ final String topicName = "test-topic";
+ final String groupId = "test-group";
+ final GroupedTopicSource unmappedSource = new GroupedTopicSource(topicName, groupId);
+
+ final GroupedTopicSource mappedSource = connector.mapToGroupedTopic(unmappedSource);
+ Assert.assertEquals(unmappedSource, mappedSource);
+ }
+
+ private final class FakeConsumer {}
+}
diff --git a/connectors/kafka/src/test/java/forklift/integration/server/KafkaService.java b/connectors/kafka/src/test/java/forklift/integration/server/KafkaService.java
index 9d88f01..e94e4f7 100644
--- a/connectors/kafka/src/test/java/forklift/integration/server/KafkaService.java
+++ b/connectors/kafka/src/test/java/forklift/integration/server/KafkaService.java
@@ -27,6 +27,7 @@ public void run() {
Properties properties = new Properties();
properties.setProperty("broker.id", "1");
properties.setProperty("listeners", "PLAINTEXT://:" + listenPort);
+ properties.put("log.cleaner.dedupe.buffer.size", 2 * 1024 * 1024L);
properties.setProperty("num.network.threads", "3");
properties.setProperty("num.io.threads", "8");
properties.setProperty("socket.send.buffer.bytes", "102400");
diff --git a/core/build.sbt b/core/build.sbt
index 376dd10..7b4c07c 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -1,28 +1,5 @@
-organization := "com.github.dcshock"
-
name := "forklift"
-version := "2.1"
-
-javacOptions ++= Seq("-source", "1.8")
-
-javacOptions in compile ++= Seq("-g:lines,vars,source", "-deprecation")
-
-javacOptions in doc += "-Xdoclint:none"
-
-initialize := {
- val _ = initialize.value
- if (sys.props("java.specification.version") != "1.8")
- sys.error("Java 8 is required for this project.")
-}
-
-resolvers ++= Seq(
- "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
- "Maven Central" at "http://repo1.maven.org/maven2",
- "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots",
- "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public"
-)
-
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "18.0",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.7.3",
@@ -38,50 +15,3 @@ lazy val testDependencies = Seq(
)
libraryDependencies ++= testDependencies.map(_ % "test")
-
-// Remove scala dependency for pure Java libraries
-autoScalaLibrary := false
-
-// Remove the scala version from the generated/published artifact
-crossPaths := false
-
-publishMavenStyle := true
-
-publishTo := {
- val nexus = "https://oss.sonatype.org/"
- if (isSnapshot.value)
- Some("snapshots" at nexus + "content/repositories/snapshots")
- else
- Some("releases" at nexus + "service/local/staging/deploy/maven2")
-}
-
-pomIncludeRepository := { _ => false }
-
-pomExtra := (
- https://github.com/dcshock/forklift
-
-
- BSD-style
- http://www.opensource.org/licenses/bsd-license.php
- repo
-
-
-
- git@github.com:dcshock/forklift.git
- scm:git:git@github.com:dcshock/forklift.git
-
-
-
- dcshock
- Matt Conroy
- http://www.mattconroy.com
-
-
- afrieze
- Andrew Frieze
-
-
- kuroshii
- Bridger Howell
-
- )
diff --git a/core/project/plugins.sbt b/core/project/plugins.sbt
deleted file mode 100644
index 4ce4d9e..0000000
--- a/core/project/plugins.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
diff --git a/core/src/main/java/forklift/source/sources/GroupedTopicSource.java b/core/src/main/java/forklift/source/sources/GroupedTopicSource.java
index 9377e92..0d961dc 100644
--- a/core/src/main/java/forklift/source/sources/GroupedTopicSource.java
+++ b/core/src/main/java/forklift/source/sources/GroupedTopicSource.java
@@ -50,6 +50,11 @@ public void overrideGroup(String group) {
this.group = group;
}
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, group);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o)
diff --git a/plugins/replay/build.sbt b/plugins/replay/build.sbt
index e7527bc..0c33f36 100644
--- a/plugins/replay/build.sbt
+++ b/plugins/replay/build.sbt
@@ -1,27 +1,8 @@
-organization := "com.github.dcshock"
-
name := "forklift-replay"
-version := "2.2"
-
-javacOptions ++= Seq("-source", "1.8")
-
-initialize := {
- val _ = initialize.value
- if (sys.props("java.specification.version") != "1.8")
- sys.error("Java 8 is required for this project.")
-}
-
-resolvers ++= Seq(
- "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
- "Maven Central" at "http://repo1.maven.org/maven2",
- "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots",
- "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public"
-)
-
libraryDependencies ++= Seq(
- "com.github.dcshock" % "forklift" % "2.0",
- "org.elasticsearch" % "elasticsearch" % "2.4.1",
+ "org.elasticsearch" % "elasticsearch" % "2.4.1", // retained for now, to support embedded ES clusters
+ "org.elasticsearch.client" % "rest" % "5.0.2",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.7.3",
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.7.3"
)
@@ -32,48 +13,3 @@ lazy val testDependencies = Seq(
)
libraryDependencies ++= testDependencies.map(_ % "test")
-
-// Remove scala dependency for pure Java libraries
-autoScalaLibrary := false
-
-// Remove the scala version from the generated/published artifact
-crossPaths := false
-
-publishMavenStyle := true
-
-publishTo := {
- val nexus = "https://oss.sonatype.org/"
- if (isSnapshot.value)
- Some("snapshots" at nexus + "content/repositories/snapshots")
- else
- Some("releases" at nexus + "service/local/staging/deploy/maven2")
-}
-
-// Remove scala dependency for pure Java libraries
-autoScalaLibrary := false
-
-// Remove the scala version from the generated/published artifact
-crossPaths := false
-
-pomIncludeRepository := { _ => false }
-
-pomExtra := (
- https://github.com/dcshock/forklift
-
-
- BSD-style
- http://www.opensource.org/licenses/bsd-license.php
- repo
-
-
-
- git@github.com:dcshock/forklift.git
- scm:git:git@github.com:dcshock/forklift.git
-
-
-
- dcshock
- Matt Conroy
- http://www.mattconroy.com
-
- )
diff --git a/plugins/replay/project/plugins.sbt b/plugins/replay/project/plugins.sbt
deleted file mode 100644
index 4ce4d9e..0000000
--- a/plugins/replay/project/plugins.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
diff --git a/plugins/replay/src/main/java/forklift/replay/ReplayES.java b/plugins/replay/src/main/java/forklift/replay/ReplayES.java
index c4cc227..7e25a9b 100644
--- a/plugins/replay/src/main/java/forklift/replay/ReplayES.java
+++ b/plugins/replay/src/main/java/forklift/replay/ReplayES.java
@@ -49,7 +49,7 @@ public class ReplayES {
private final Consumer consumer;
/**
- * Constructs a new ReplayES instance using the default transport port (9200) for
+ * Constructs a new ReplayES instance using the default REST port (9200) for
* sending messages to elasticsearch.
*
* @param clientOnly whether the plugin is only an elasticsearch client, or an
@@ -57,6 +57,8 @@ public class ReplayES {
* @param hostname the address of the ES host to send messages to
* @param clusterName the name of the elasticsearch cluster to send logs to
* @param connector the connector to use for queuing messages
+ *
+ * @deprecated use {@link #ReplayES(boolean, String, ForkliftConnectorI)} instead
*/
public ReplayES(boolean clientOnly, String hostname, String clusterName, ForkliftConnectorI connector) {
this(clientOnly, hostname, 9200, clusterName, connector);
@@ -72,8 +74,38 @@ public ReplayES(boolean clientOnly, String hostname, String clusterName, Forklif
* @param port the port number of the ES transport port for the given host
* @param clusterName the name of the elasticsearch cluster to send logs to
* @param connector the connector to use for queuing messages
+ *
+ * @deprecated use {@link #ReplayES(boolean, String, int, ForkliftConnectorI)} instead
*/
public ReplayES(boolean clientOnly, String hostname, int port, String clusterName, ForkliftConnectorI connector) {
+ this(clientOnly, hostname, port, connector);
+ }
+
+ /**
+ * Constructs a new ReplayES instance using the default REST port (9200) for
+ * sending messages to elasticsearch.
+ *
+ * @param clientOnly whether the plugin is only an elasticsearch client, or an
+ * embedded elasticsearch node should be started
+ * @param hostname the address of the ES host to send messages to
+ * @param connector the connector to use for queuing messages
+ */
+ public ReplayES(boolean clientOnly, String hostname, ForkliftConnectorI connector) {
+ this(clientOnly, hostname, 9200, connector);
+ }
+
+
+ /**
+ * Constructs a new ReplayES instance sending messages to elasticsearch based
+ * on the given parameters.
+ *
+ * @param clientOnly whether the plugin is only an elasticsearch client, or an
+ * embedded elasticsearch node should be started
+ * @param hostname the address of the ES host to send messages to
+ * @param port the port number of the ES transport port for the given host
+ * @param connector the connector to use for queuing messages
+ */
+ public ReplayES(boolean clientOnly, String hostname, int port, ForkliftConnectorI connector) {
/*
* Setup the connection to the server. If we are only a client we'll not setup a node locally to run.
* This will help developers and smaller setups avoid the pain of setting up elastic search.
@@ -96,7 +128,7 @@ public ReplayES(boolean clientOnly, String hostname, int port, String clusterNam
}
}
- this.writer = new ReplayESWriter(hostname, port, clusterName);
+ this.writer = new ReplayESWriter(hostname, port);
this.writer.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -144,7 +176,7 @@ public void shutdown() {
} catch (InterruptedException ignored) {
}
- this.writer.shutdown();
+ this.writer.close();
}
@LifeCycle(value=ProcessStep.Pending, annotation=Replay.class)
diff --git a/plugins/replay/src/main/java/forklift/replay/ReplayESWriter.java b/plugins/replay/src/main/java/forklift/replay/ReplayESWriter.java
index e6746ef..775c944 100644
--- a/plugins/replay/src/main/java/forklift/replay/ReplayESWriter.java
+++ b/plugins/replay/src/main/java/forklift/replay/ReplayESWriter.java
@@ -1,94 +1,190 @@
package forklift.replay;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.engine.VersionConflictEngineException;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpStatus;
+import org.apache.http.entity.ContentType;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.nio.entity.NStringEntity;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.Closeable;
+import java.io.IOException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
-public class ReplayESWriter extends ReplayStoreThread<ReplayESWriterMsg> {
+public class ReplayESWriter extends ReplayStoreThread<ReplayESWriterMsg> implements Closeable {
private static final Logger log = LoggerFactory.getLogger(ReplayES.class);
+ private static final ObjectMapper mapper = new ObjectMapper();
- private final TransportClient client;
+ private final RestClient restClient;
+ /**
+ * Creates a log writer that sends replay information to elasticsearch over REST,
+ * using the given hostname and port {@code 9200}.
+ *
+ * @param hostname the name or address of the host to connect to
+ */
public ReplayESWriter(String hostname) {
- this(hostname, 9300, "elasticsearch");
+ this(hostname, 9200);
}
- public ReplayESWriter(String hostname, int port, String clusterName) {
- final Settings settings = Settings.settingsBuilder()
- .put("cluster.name", clusterName).build();
-
- this.client = TransportClient.builder()
- .settings(settings)
- .build()
- .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(hostname, port)));
+ /**
+ * Creates a log writer that sends replay information to elasticsearch over REST,
+ * using the given hostname and port.
+ *
+ * @param hostname the name or address of the host to connect to
+ * @param port the connection port
+ */
+ public ReplayESWriter(String hostname, int port) {
+ this.restClient = RestClient.builder(new HttpHost(hostname, port, "http"))
+ .setRequestConfigCallback(requestConfig ->
+ requestConfig
+ .setConnectTimeout(3_000)
+ .setSocketTimeout(20_000))
+ .setDefaultHeaders(new Header[] {
+ new BasicHeader("Accept", "application/json; charset=utf-8"),
+ new BasicHeader("Content-Type", "application/json; charset=utf-8")
+ })
+ .setMaxRetryTimeoutMillis(20_000)
+ .build();
}
@Override
- protected void poll(ReplayESWriterMsg t) {
- final String replayVersion = t.getFields().get("forklift-replay-version");
+ protected void poll(ReplayESWriterMsg replayMessage) {
+ final String replayVersion = replayMessage.getFields().get("forklift-replay-version");
if ("3".equals(replayVersion)) { // latest version
- String indexDate = t.getFields().get("first-processed-date");
- if (indexDate == null)
- indexDate = LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
- final String index = "forklift-replay-" + indexDate;
+ processVersion3Replay(replayMessage);
+ } else if ("2".equals(replayVersion) || replayVersion == null) { // older versions didn't have the replay version or didn't persist it with the message
+ processVersion2OrEarlierReplay(replayMessage);
+ } else {
+ log.error("Unrecognized replay version: '{}' for message ID '{}' with fields '{}'",
+ replayVersion, replayMessage.getId(), replayMessage.getFields());
+ }
+ }
- try {
- // Index the new information.
- client.prepareIndex(index, "log")
- .setVersion(t.getVersion()).setVersionType(VersionType.EXTERNAL_GTE)
- .setId(t.getId()).setSource(t.getFields()).execute().actionGet();
- } catch (VersionConflictEngineException expected) {
- log.debug("Newer replay message already exists", expected);
+ private void processVersion3Replay(final ReplayESWriterMsg replayMessage) {
+ String indexDate = replayMessage.getFields().get("first-processed-date");
+ if (indexDate == null) {
+ indexDate = LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
+ }
+ final String index = "forklift-replay-" + indexDate;
+
+ indexReplayMessage(replayMessage, index);
+ }
+
+ private void processVersion2OrEarlierReplay(ReplayESWriterMsg replayMessage) {
+ final String index = "forklift-replay-" + LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
+
+ for (String existingMessageIndex : searchForIndexesWithId(replayMessage.getId())) {
+ if (!existingMessageIndex.equals(index)) {
+ deleteMessageInIndex(replayMessage.getId(), existingMessageIndex);
}
- } else if ("2".equals(replayVersion) || replayVersion == null) { // older versions didn't have the replay version or didn't persist it with the message
- final String index = "forklift-replay-" + LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
+ }
+
+ indexReplayMessage(replayMessage, index);
+ }
+
+ private void indexReplayMessage(final ReplayESWriterMsg replayMessage, final String index) {
+ final String id = replayMessage.getId();
+ final String endpoint = "/" + index + "/log/" + id;
+
+ final Map queryParams = new HashMap<>();
+ queryParams.put("version_type", "external_gte");
+ queryParams.put("version", "" + replayMessage.getVersion());
+
+ final HttpEntity entity;
+ try {
+ final String entityContents = mapper.writeValueAsString(replayMessage.getFields());
+ entity = new NStringEntity(entityContents, ContentType.APPLICATION_JSON);
+ } catch (JsonProcessingException e) {
+ log.error("Could not write replay fields to JSON: (id {}, fields {})", replayMessage.getId(), replayMessage.getFields(), e);
+ return;
+ }
+
+ try {
+ restClient.performRequest("PUT", endpoint, queryParams, entity);
+ } catch (ResponseException e) {
+ // conflicts are normal when versioned index requests are submitted in the wrong order
+ if (e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_CONFLICT) {
+ return;
+ }
+
+ log.error("Error indexing replay message (id {}, fields {})", id, replayMessage.getFields(), e);
+ } catch (IOException e) {
+ log.error("Error indexing replay message (id {}, fields {})", id, replayMessage.getFields(), e);
+ }
+ }
+
+ private Iterable searchForIndexesWithId(final String id) {
+ final String endpoint = "/forklift-replay*/_search";
+ final Map queryParams = new HashMap<>();
+ queryParams.put("q", "_id:" + id);
+ queryParams.put("size", "50");
- // In order to ensure there is only one replay msg for a given id we have to clean the msg id from
- // any previously created indexes.
+ try {
+ final Response response = restClient.performRequest("GET", endpoint, queryParams);
try {
- final SearchResponse resp = client.prepareSearch("forklift-replay*").setTypes("log")
- .setQuery(QueryBuilders.termQuery("_id", t.getId()))
- .setFrom(0).setSize(50).setExplain(false)
- .execute()
- .actionGet();
-
- if (resp != null && resp.getHits() != null && resp.getHits().getHits() != null) {
- for (SearchHit hit : resp.getHits().getHits()) {
- if (!hit.getIndex().equals(index))
- client.prepareDelete(hit.getIndex(), "log", t.getId()).execute().actionGet();
- }
+ final JsonNode jsonResponse = mapper.readTree(response.getEntity().getContent());
+ final JsonNode hits = jsonResponse.get("hits").get("hits");
+ final List indexes = new ArrayList<>();
+
+ for (final JsonNode hit : hits) {
+ final String hitIndex = hit.get("_index").asText();
+ indexes.add(hitIndex);
}
+
+ return indexes;
} catch (Exception e) {
- log.error("", e);
- log.error("Unable to search for old replay logs {}", t.getId());
+ log.error("Error parsing elasticsearch response to search for id {}", id, e);
}
+ } catch (IOException e) {
+ log.error("Error searching for indexes for id {}", id, e);
+ }
- try {
- // Index the new information.
- client.prepareIndex(index, "log")
- .setVersion(t.getVersion()).setVersionType(VersionType.EXTERNAL_GTE)
- .setId(t.getId()).setSource(t.getFields()).execute().actionGet();
- } catch (VersionConflictEngineException expected) {
- log.debug("Newer replay message already exists", expected);
+ return Collections.emptyList();
+ }
+
+ private void deleteMessageInIndex(final String id, final String index) {
+ final String endpoint = "/" + index + "/log/" + id;
+ try {
+ restClient.performRequest("DELETE", endpoint);
+ } catch (ResponseException e) {
+            // another process may already have deleted this message,
+            // in which case the resulting 404 is not an error
+ if (e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
+ return;
}
- } else {
- log.error("Unrecognized replay version: '{}' for message ID '{}' with fields '{}' ", replayVersion, t.getId(), t.getFields());
+
+ log.error("Error deleting message with id {} for index {}", id, index, e);
+ } catch (IOException e) {
+ log.error("Error deleting message with id {} for index {}", id, index, e);
}
}
- public void shutdown() {
- if (client != null)
- client.close();
+ @Override
+ public void close() {
+ if (restClient != null) {
+ try {
+ restClient.close();
+ } catch (IOException e) {
+ log.error("Couldn't shutdown Elasticsearch REST client", e);
+ }
+ }
}
}
diff --git a/plugins/retry/build.sbt b/plugins/retry/build.sbt
index 481e67b..4be170f 100644
--- a/plugins/retry/build.sbt
+++ b/plugins/retry/build.sbt
@@ -1,26 +1,6 @@
-organization := "com.github.dcshock"
-
name := "forklift-retry"
-version := "2.2"
-
-javacOptions ++= Seq("-source", "1.8")
-
-initialize := {
- val _ = initialize.value
- if (sys.props("java.specification.version") != "1.8")
- sys.error("Java 8 is required for this project.")
-}
-
-resolvers ++= Seq(
- "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
- "Maven Central" at "http://repo1.maven.org/maven2",
- "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots",
- "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public"
-)
-
libraryDependencies ++= Seq(
- "com.github.dcshock" % "forklift" % "2.0",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.7.3",
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.7.3",
"io.searchbox" % "jest" % "2.4.0"
@@ -32,42 +12,3 @@ lazy val testDependencies = Seq(
)
libraryDependencies ++= testDependencies.map(_ % "test")
-
-// Remove scala dependency for pure Java libraries
-autoScalaLibrary := false
-
-// Remove the scala version from the generated/published artifact
-crossPaths := false
-
-publishMavenStyle := true
-
-publishTo := {
- val nexus = "https://oss.sonatype.org/"
- if (isSnapshot.value)
- Some("snapshots" at nexus + "content/repositories/snapshots")
- else
- Some("releases" at nexus + "service/local/staging/deploy/maven2")
-}
-
-pomIncludeRepository := { _ => false }
-
-pomExtra := (
- https://github.com/dcshock/forklift
-
-
- BSD-style
- http://www.opensource.org/licenses/bsd-license.php
- repo
-
-
-
- git@github.com:dcshock/forklift.git
- scm:git:git@github.com:dcshock/forklift.git
-
-
-
- dcshock
- Matt Conroy
- http://www.mattconroy.com
-
- )
diff --git a/plugins/retry/project/plugins.sbt b/plugins/retry/project/plugins.sbt
deleted file mode 100644
index 4ce4d9e..0000000
--- a/plugins/retry/project/plugins.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
diff --git a/plugins/stats/build.sbt b/plugins/stats/build.sbt
index 24914a1..23f30bd 100644
--- a/plugins/stats/build.sbt
+++ b/plugins/stats/build.sbt
@@ -1,63 +1 @@
-organization := "com.github.dcshock"
-
name := "forklift-stats"
-
-version := "1.0"
-
-javacOptions ++= Seq("-source", "1.8")
-
-initialize := {
- val _ = initialize.value
- if (sys.props("java.specification.version") != "1.8")
- sys.error("Java 8 is required for this project.")
-}
-
-resolvers ++= Seq(
- "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
- "Maven Central" at "http://repo1.maven.org/maven2",
- "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots",
- "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public"
-)
-
-libraryDependencies ++= Seq(
- "com.github.dcshock" % "forklift" % "1.0"
-)
-
-// Remove scala dependency for pure Java libraries
-autoScalaLibrary := false
-
-// Remove the scala version from the generated/published artifact
-crossPaths := false
-
-publishMavenStyle := true
-
-publishTo := {
- val nexus = "https://oss.sonatype.org/"
- if (isSnapshot.value)
- Some("snapshots" at nexus + "content/repositories/snapshots")
- else
- Some("releases" at nexus + "service/local/staging/deploy/maven2")
-}
-
-pomIncludeRepository := { _ => false }
-
-pomExtra := (
- https://github.com/dcshock/forklift
-
-
- BSD-style
- http://www.opensource.org/licenses/bsd-license.php
- repo
-
-
-
- git@github.com:dcshock/forklift.git
- scm:git:git@github.com:dcshock/forklift.git
-
-
-
- dcshock
- Matt Conroy
- http://www.mattconroy.com
-
- )
\ No newline at end of file
diff --git a/project/.gnupg/gpg.conf b/project/.gnupg/gpg.conf
new file mode 100644
index 0000000..942678f
--- /dev/null
+++ b/project/.gnupg/gpg.conf
@@ -0,0 +1,196 @@
+# Options for GnuPG
+# Copyright 1998, 1999, 2000, 2001, 2002, 2003,
+# 2010 Free Software Foundation, Inc.
+#
+# This file is free software; as a special exception the author gives
+# unlimited permission to copy and/or distribute it, with or without
+# modifications, as long as this notice is preserved.
+#
+# This file is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY, to the extent permitted by law; without even the
+# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+#
+# Unless you specify which option file to use (with the command line
+# option "--options filename"), GnuPG uses the file ~/.gnupg/gpg.conf
+# by default.
+#
+# An options file can contain any long options which are available in
+# GnuPG. If the first non white space character of a line is a '#',
+# this line is ignored. Empty lines are also ignored.
+#
+# See the man page for a list of options.
+
+# Uncomment the following option to get rid of the copyright notice
+
+#no-greeting
+
+# If you have more than 1 secret key in your keyring, you may want to
+# uncomment the following option and set your preferred keyid.
+
+#default-key 621CC013
+
+# If you do not pass a recipient to gpg, it will ask for one. Using
+# this option you can encrypt to a default key. Key validation will
+# not be done in this case. The second form uses the default key as
+# default recipient.
+
+#default-recipient some-user-id
+#default-recipient-self
+
+# By default GnuPG creates version 4 signatures for data files as
+# specified by OpenPGP. Some earlier (PGP 6, PGP 7) versions of PGP
+# require the older version 3 signatures. Setting this option forces
+# GnuPG to create version 3 signatures.
+
+#force-v3-sigs
+
+# Because some mailers change lines starting with "From " to ">From "
+# it is good to handle such lines in a special way when creating
+# cleartext signatures; all other PGP versions do it this way too.
+# To enable full OpenPGP compliance you may want to use this option.
+
+#no-escape-from-lines
+
+# When verifying a signature made from a subkey, ensure that the cross
+# certification "back signature" on the subkey is present and valid.
+# This protects against a subtle attack against subkeys that can sign.
+# Defaults to --no-require-cross-certification. However for new
+# installations it should be enabled.
+
+require-cross-certification
+
+
+# If you do not use the Latin-1 (ISO-8859-1) charset, you should tell
+# GnuPG which is the native character set. Please check the man page
+# for supported character sets. This character set is only used for
+# metadata and not for the actual message which does not undergo any
+# translation. Note that future version of GnuPG will change to UTF-8
+# as default character set.
+
+#charset utf-8
+
+# Group names may be defined like this:
+# group mynames = paige 0x12345678 joe patti
+#
+# Any time "mynames" is a recipient (-r or --recipient), it will be
+# expanded to the names "paige", "joe", and "patti", and the key ID
+# "0x12345678". Note there is only one level of expansion - you
+# cannot make a group that points to another group. Note also that
+# if there are spaces in the recipient name, this will appear as two
+# recipients. In these cases it is better to use the key ID.
+
+#group mynames = paige 0x12345678 joe patti
+
+# Some old Windows platforms require 8.3 filenames. If your system
+# can handle long filenames, uncomment this.
+
+#no-mangle-dos-filenames
+
+# Lock the file only once for the lifetime of a process. If you do
+# not define this, the lock will be obtained and released every time
+# it is needed - normally this is not needed.
+
+#lock-once
+
+# GnuPG can send and receive keys to and from a keyserver. These
+# servers can be HKP, email, or LDAP (if GnuPG is built with LDAP
+# support).
+#
+# Example HKP keyservers:
+# hkp://keys.gnupg.net
+#
+# Example LDAP keyservers:
+# ldap://pgp.surfnet.nl:11370
+#
+# Regular URL syntax applies, and you can set an alternate port
+# through the usual method:
+# hkp://keyserver.example.net:22742
+#
+# If you have problems connecting to a HKP server through a buggy http
+# proxy, you can use keyserver option broken-http-proxy (see below),
+# but first you should make sure that you have read the man page
+# regarding proxies (keyserver option honor-http-proxy)
+#
+# Most users just set the name and type of their preferred keyserver.
+# Note that most servers (with the notable exception of
+# ldap://keyserver.pgp.com) synchronize changes with each other. Note
+# also that a single server name may actually point to multiple
+# servers via DNS round-robin. hkp://keys.gnupg.net is an example of
+# such a "server", which spreads the load over a number of physical
+# servers. To see the IP address of the server actually used, you may use
+# the "--keyserver-options debug".
+
+keyserver hkp://keys.gnupg.net
+#keyserver http://http-keys.gnupg.net
+#keyserver mailto:pgp-public-keys@keys.nl.pgp.net
+
+# Common options for keyserver functions:
+#
+# include-disabled = when searching, include keys marked as "disabled"
+# on the keyserver (not all keyservers support this).
+#
+# no-include-revoked = when searching, do not include keys marked as
+# "revoked" on the keyserver.
+#
+# verbose = show more information as the keys are fetched.
+# Can be used more than once to increase the amount
+# of information shown.
+#
+# use-temp-files = use temporary files instead of a pipe to talk to the
+# keyserver. Some platforms (Win32 for one) always
+# have this on.
+#
+# keep-temp-files = do not delete temporary files after using them
+# (really only useful for debugging)
+#
+# honor-http-proxy = if the keyserver uses HTTP, honor the http_proxy
+# environment variable
+#
+# broken-http-proxy = try to work around a buggy HTTP proxy
+#
+# auto-key-retrieve = automatically fetch keys as needed from the keyserver
+# when verifying signatures or when importing keys that
+# have been revoked by a revocation key that is not
+# present on the keyring.
+#
+# no-include-attributes = do not include attribute IDs (aka "photo IDs")
+# when sending keys to the keyserver.
+
+#keyserver-options auto-key-retrieve
+
+# Uncomment this line to display photo user IDs in key listings and
+# when a signature from a key with a photo is verified.
+
+#show-photos
+
+# Use this program to display photo user IDs
+#
+# %i is expanded to a temporary file that contains the photo.
+# %I is the same as %i, but the file isn't deleted afterwards by GnuPG.
+# %k is expanded to the key ID of the key.
+# %K is expanded to the long OpenPGP key ID of the key.
+# %t is expanded to the extension of the image (e.g. "jpg").
+# %T is expanded to the MIME type of the image (e.g. "image/jpeg").
+# %f is expanded to the fingerprint of the key.
+# %% is %, of course.
+#
+# If %i or %I are not present, then the photo is supplied to the
+# viewer on standard input. If your platform supports it, standard
+# input is the best way to do this as it avoids the time and effort in
+# generating and then cleaning up a secure temp file.
+#
+# The default program is "xloadimage -fork -quiet -title 'KeyID 0x%k' stdin"
+# On Mac OS X and Windows, the default is to use your regular JPEG image
+# viewer.
+#
+# Some other viewers:
+# photo-viewer "qiv %i"
+# photo-viewer "ee %i"
+# photo-viewer "display -title 'KeyID 0x%k'"
+#
+# This one saves a copy of the photo ID in your home directory:
+# photo-viewer "cat > ~/photoid-for-key-%k.%t"
+#
+# Use your MIME handler to view photos:
+# photo-viewer "metamail -q -d -b -c %T -s 'KeyID 0x%k' -f GnuPG"
+
diff --git a/project/.gnupg/pubring.gpg b/project/.gnupg/pubring.gpg
new file mode 100644
index 0000000..a98f74e
Binary files /dev/null and b/project/.gnupg/pubring.gpg differ
diff --git a/project/.gnupg/pubring.gpg~ b/project/.gnupg/pubring.gpg~
new file mode 100644
index 0000000..a98f74e
Binary files /dev/null and b/project/.gnupg/pubring.gpg~ differ
diff --git a/project/.gnupg/secring.gpg b/project/.gnupg/secring.gpg
new file mode 100644
index 0000000..7367b67
Binary files /dev/null and b/project/.gnupg/secring.gpg differ
diff --git a/project/Build.scala b/project/Build.scala
deleted file mode 100644
index 44284a8..0000000
--- a/project/Build.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-import sbt._
-import Keys._
-
-object ForkliftBuild extends Build {
- lazy val core = Project(
- id = "core",
- base = file("core")
- )
-
- lazy val replay = Project(
- id = "replay",
- base = file("plugins/replay")
- ).dependsOn(core)
-
- lazy val retry = Project(
- id = "retry",
- base = file("plugins/retry")
- ).dependsOn(core)
-
- lazy val stats = Project(
- id = "stats",
- base = file("plugins/stats")
- ).dependsOn(core)
-
- lazy val activemq = Project(
- id = "activemq",
- base = file("connectors/activemq")
- ).dependsOn(core)
-
- lazy val kafka = Project(
- id = "kafka",
- base = file("connectors/kafka")
- ).dependsOn(core)
-
- lazy val server = Project(
- id = "server",
- base = file("server")
- ).dependsOn(core, activemq, replay, retry, stats)
-
-}
diff --git a/project/plugins.sbt b/project/plugins.sbt
index a571e0b..1a0c34b 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,7 +1,5 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.0-M4")
-
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
-
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
-
+addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.0")
+addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
addSbtPlugin("com.cavorite" % "sbt-avro-1-7" % "1.1.2")
diff --git a/server/build.sbt b/server/build.sbt
index 5d22512..0d41c21 100644
--- a/server/build.sbt
+++ b/server/build.sbt
@@ -1,32 +1,8 @@
-organization := "com.github.dcshock"
-
name := "forklift-server"
-version := "2.2"
-
enablePlugins(JavaAppPackaging)
-javacOptions ++= Seq("-source", "1.8")
-
-initialize := {
- val _ = initialize.value
- if (sys.props("java.specification.version") != "1.8")
- sys.error("Java 8 is required for this project.")
-}
-
-resolvers ++= Seq(
- "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
- "Maven Central" at "http://repo1.maven.org/maven2",
- "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots",
- "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public"
-)
-
libraryDependencies ++= Seq(
- "com.github.dcshock" % "forklift" % "2.1",
- "com.github.dcshock" % "forklift-activemq" % "2.0",
- "com.github.dcshock" % "forklift-replay" % "2.2",
- "com.github.dcshock" % "forklift-retry" % "2.2",
- "com.github.dcshock" % "forklift-stats" % "1.0",
"com.github.dcshock" % "consul-rest-client" % "0.10",
"org.apache.activemq" % "activemq-broker" % "5.14.0",
"io.searchbox" % "jest" % "2.0.0",
@@ -41,44 +17,3 @@ libraryDependencies ++= Seq(
"com.novocode" % "junit-interface" % "0.11" % "test",
"commons-io" % "commons-io" % "2.4" % "test"
)
-
-// Remove scala dependency for pure Java libraries
-autoScalaLibrary := false
-
-// Remove the scala version from the generated/published artifact
-crossPaths := false
-
-publishMavenStyle := true
-
-publishTo := {
- val nexus = "https://oss.sonatype.org/"
- if (isSnapshot.value)
- Some("snapshots" at nexus + "content/repositories/snapshots")
- else
- Some("releases" at nexus + "service/local/staging/deploy/maven2")
-}
-
-pomIncludeRepository := { _ => false }
-
-pomExtra := (
- https://github.com/dcshock/forklift
-
-
- BSD-style
- http://www.opensource.org/licenses/bsd-license.php
- repo
-
-
-
- git@github.com:dcshock/forklift.git
- scm:git:git@github.com:dcshock/forklift.git
-
-
-
- dcshock
- Matt Conroy
- http://www.mattconroy.com
-
- )
-
-useGpg := true
diff --git a/server/src/main/java/forklift/ForkliftServer.java b/server/src/main/java/forklift/ForkliftServer.java
index 900f7b3..b85aaa0 100644
--- a/server/src/main/java/forklift/ForkliftServer.java
+++ b/server/src/main/java/forklift/ForkliftServer.java
@@ -16,7 +16,6 @@
import forklift.retry.RetryHandler;
import forklift.stats.StatsCollector;
import org.apache.activemq.broker.BrokerService;
-import org.apache.http.annotation.ThreadSafe;
import java.io.File;
import java.io.FileNotFoundException;
@@ -41,7 +40,6 @@
*
* @author zdavep
*/
-@ThreadSafe
public final class ForkliftServer {
private ExecutorService executor = Executors.newSingleThreadExecutor();