diff --git a/.gitignore b/.gitignore index 9bdc891..9ef42c0 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,5 @@ activemq-data .vscode classpath.txt javaconfig.json + +build.properties diff --git a/.travis.yml b/.travis.yml index 94d5896..1c0f80b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,13 @@ language: scala scala: - - "2.11.4" +- 2.11.4 jdk: - - oraclejdk8 +- oraclejdk8 sudo: false +env: + global: + - secure: f9pej7Im4e7dwwv9s/ctP1Mh+RgLF3tlUCPsewLZbHhv1plgbGQnKW10j0B/GZIy+Bc8M1+H/EnNhsHp9QE7mUOMJXM8kW/OlcBMIwxYkHFGBhHND2j7GXwUCXbjctwryjFxOg3hHMwQy+z3K4r9ShdrOnJ9cqLKo3OkTZ5Vi+M= + - secure: olvXVQJ6c/uCWXrm7dw/LsWCKEguylLybPoiM40trNIrHMpEcDWQ/2u+fvcQ9KWm8ztQeBSmXwlkHMCMicbh2iTX9W1hmxfzrFE8N33iVet74N72tsRfrFxcUI3HXjPfB3hCF3NbPAeplfxqYXRY2OzNOueigHb4oW6jMf1a97I= + - secure: JhqZSvXjlGDMHvVli9Cuv6hwOeCWLywagZsocigtIOI9b6lO4um9qokcNJTg+rbQE+bd7E65zMviJPnwMWzbuTdI4LG5Ldqxr4IQbFrkythWSDMWLHsziFPAYHPZMZPf0tALzf1F7bMj0yo4YulqeeAr/Iy7bpAQPVWPf+Lqox0= +after_success: +- ./bin/checkPublish diff --git a/README.adoc b/README.adoc index 7311188..f94fa4d 100644 --- a/README.adoc +++ b/README.adoc @@ -10,6 +10,16 @@ link:doc/forklift.adoc[Documentation] == Releases link:doc/prev_releases.adoc[Previous Releases] +* *December 4th 2017* - v3.1 +** Kafka: Add internal support for separate dedicated consumers on the same topic + +* *December 2nd 2017* - v3.0 +** Upgrade replay logging to utilize Elastic Search 5 + +* *Novemeber 28th 2017* - v2.3 +** Update build to automate releases to sonatype +** Align release numbers of subcomponents to avoid having a version 1 of a plugin while a version 2 of the core is being used. + * *September 28th 2017* - v2.2 ** Eliminate unneeded queries to elasticsearch, improving preformance the performance replay processing ** Fix bug in re-runing failed retries in the retry plugin diff --git a/bin/checkPublish b/bin/checkPublish new file mode 100755 index 0000000..c5920c3 --- /dev/null +++ b/bin/checkPublish @@ -0,0 +1,9 @@ +#!/bin/bash +BRANCH=$(if [ "$TRAVIS_PULL_REQUEST" == "false" ]; then echo $TRAVIS_BRANCH; else echo $TRAVIS_PULL_REQUEST_BRANCH; fi) +echo Found branch $BRANCH + +if [ $BRANCH == 'master' ] + then + echo Publishing to Sonatype + sbt publishSigned sonatypeReleaseAll +fi diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..e576c05 --- /dev/null +++ b/build.sbt @@ -0,0 +1,78 @@ +lazy val baseSettings = Seq( + organization := "com.github.dcshock", + version := "3.1", + scalaVersion := "2.11.7", + javacOptions ++= Seq("-source", "1.8"), + javacOptions in compile ++= Seq("-g:lines,vars,source", "-deprecation"), + javacOptions in doc += "-Xdoclint:none", + crossPaths := false, + autoScalaLibrary := false, + resolvers ++= Seq( + "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", + "Maven Central" at "http://repo1.maven.org/maven2", + "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots", + "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public", + "Confluent Maven Repo" at "http://packages.confluent.io/maven/" + ), + publishTo := { + val nexus = "https://oss.sonatype.org/" + if (isSnapshot.value) + Some("snapshots" at nexus + "content/repositories/snapshots") + else + Some("releases" at nexus + "service/local/staging/deploy/maven2") + }, + publishMavenStyle := true, + credentials += Credentials( + "Sonatype Nexus Repository Manager", + "oss.sonatype.org", + sys.env.getOrElse("SONATYPE_USER", ""), + sys.env.getOrElse("SONATYPE_PASS", "") + ), + useGpg := false, + usePgpKeyHex("E46770E4F1ED27F3"), + pgpPublicRing := file(sys.props("user.dir")) / "project" / ".gnupg" / "pubring.gpg", + pgpSecretRing := file(sys.props("user.dir")) / "project" / ".gnupg" / "secring.gpg", + pgpPassphrase := sys.env.get("GPG_PASS").map(_.toArray), + pomIncludeRepository := { _ => false }, + pomExtra := ( + https://github.com/dcshock/forklift + + + BSD-style + http://www.opensource.org/licenses/bsd-license.php + repo + + + + git@github.com:dcshock/forklift.git + scm:git:git@github.com:dcshock/forklift.git + + + + dcshock + Matt Conroy + http://www.mattconroy.com + + + afrieze + Andrew Frieze + + + kuroshii + Bridger Howell + + ) +) + +lazy val core = project in file("core") settings baseSettings +lazy val replay = project.dependsOn(core) in file("plugins/replay") settings baseSettings +lazy val retry = project.dependsOn(core) in file("plugins/retry") settings baseSettings +lazy val stats = project.dependsOn(core) in file("plugins/stats") settings baseSettings +lazy val activemq = project.dependsOn(core) in file("connectors/activemq") settings baseSettings +lazy val kafka = project.dependsOn(core) in file("connectors/kafka") settings baseSettings +lazy val server = project.dependsOn(core, replay, retry, stats, activemq, kafka) in file("server") settings baseSettings + +lazy val rootSettings = baseSettings ++ Seq( + skip in publish := true +) +lazy val root = project in file(".") aggregate(core, replay, retry, stats, activemq, kafka, server) settings rootSettings diff --git a/connectors/activemq/build.sbt b/connectors/activemq/build.sbt index 2b01798..b088875 100644 --- a/connectors/activemq/build.sbt +++ b/connectors/activemq/build.sbt @@ -1,28 +1,6 @@ -organization := "com.github.dcshock" - name := "forklift-activemq" -version := "2.0" - -javacOptions ++= Seq("-source", "1.8") - -javacOptions in compile ++= Seq("-g:lines,vars,source") - -initialize := { - val _ = initialize.value - if (sys.props("java.specification.version") != "1.8") - sys.error("Java 8 is required for this project.") -} - -resolvers ++= Seq( - "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", - "Maven Central" at "http://repo1.maven.org/maven2", - "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots", - "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public" -) - libraryDependencies ++= Seq( - "com.github.dcshock" % "forklift" % "2.0", "org.apache.activemq" % "activemq-client" % "5.14.0", "org.apache.activemq" % "activemq-broker" % "5.14.0", "com.fasterxml.jackson.core" % "jackson-databind" % "2.7.3", @@ -42,42 +20,3 @@ libraryDependencies ++= testDependencies.map(_ % "test") // so disable parallel test execution parallelExecution in Test := false testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a") - -// Remove scala dependency for pure Java libraries -autoScalaLibrary := false - -// Remove the scala version from the generated/published artifact -crossPaths := false - -publishMavenStyle := true - -publishTo := { - val nexus = "https://oss.sonatype.org/" - if (isSnapshot.value) - Some("snapshots" at nexus + "content/repositories/snapshots") - else - Some("releases" at nexus + "service/local/staging/deploy/maven2") -} - -pomIncludeRepository := { _ => false } - -pomExtra := ( - https://github.com/dcshock/forklift - - - BSD-style - http://www.opensource.org/licenses/bsd-license.php - repo - - - - git@github.com:dcshock/forklift.git - scm:git:git@github.com:dcshock/forklift.git - - - - dcshock - Matt Conroy - http://www.mattconroy.com - - ) diff --git a/connectors/activemq/project/plugins.sbt b/connectors/activemq/project/plugins.sbt deleted file mode 100644 index 4ce4d9e..0000000 --- a/connectors/activemq/project/plugins.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") diff --git a/connectors/kafka/build.sbt b/connectors/kafka/build.sbt index 12017ca..4308670 100644 --- a/connectors/kafka/build.sbt +++ b/connectors/kafka/build.sbt @@ -1,32 +1,6 @@ -organization := "com.github.dcshock" - name := "forklift-kafka" -version := "2.0" - -//required for some test dependencies -scalaVersion := "2.11.7" - -javacOptions ++= Seq("-source", "1.8") - -javacOptions in compile ++= Seq("-g:lines,vars,source") - -initialize := { - val _ = initialize.value - if (sys.props("java.specification.version") != "1.8") - sys.error("Java 8 is required for this project.") -} - -resolvers ++= Seq( - "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", - "Maven Central" at "http://repo1.maven.org/maven2", - "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots", - "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public", - "Confluent Maven Repo" at "http://packages.confluent.io/maven/" -) - libraryDependencies ++= Seq( - "com.github.dcshock" % "forklift" % "2.0" , "com.fasterxml.jackson.core" % "jackson-databind" % "2.7.3", "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.7.3", "org.apache.kafka" % "kafka-clients" % "0.10.1.1-cp1" exclude("org.slf4j","slf4j-log4j12"), @@ -55,45 +29,3 @@ testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a") // avro settings (javaSource in avroConfig) := baseDirectory(_/"target/generated-sources").value (sourceDirectory in avroConfig) := baseDirectory(_/"src/test/resources/schemas").value - -// Remove scala dependency for pure Java libraries -autoScalaLibrary := false - -// Remove the scala version from the generated/published artifact -crossPaths := false - -publishMavenStyle := true - -publishTo := { - val nexus = "https://oss.sonatype.org/" - if (isSnapshot.value) - Some("snapshots" at nexus + "content/repositories/snapshots") - else - Some("releases" at nexus + "service/local/staging/deploy/maven2") -} - -pomIncludeRepository := { _ => false } - -pomExtra := ( - https://github.com/dcshock/forklift-kafka - - - BSD-style - http://www.opensource.org/licenses/bsd-license.php - repo - - - - git@github.com:dcshock/forklift-kafka.git - scm:git:git@github.com:dcshock/forklift-kafka.git - - - - afrieze - Andrew Frieze - - - kuroshii - Bridger Howell - - ) diff --git a/connectors/kafka/project/plugins.sbt b/connectors/kafka/project/plugins.sbt index 3a29115..16ad420 100644 --- a/connectors/kafka/project/plugins.sbt +++ b/connectors/kafka/project/plugins.sbt @@ -1,3 +1 @@ addSbtPlugin("com.cavorite" % "sbt-avro-1-7" % "1.1.2") - -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") diff --git a/connectors/kafka/src/main/java/forklift/connectors/KafkaConnector.java b/connectors/kafka/src/main/java/forklift/connectors/KafkaConnector.java index 8044d32..ba292bd 100644 --- a/connectors/kafka/src/main/java/forklift/connectors/KafkaConnector.java +++ b/connectors/kafka/src/main/java/forklift/connectors/KafkaConnector.java @@ -39,24 +39,24 @@ public class KafkaConnector implements ForkliftConnectorI { private final String kafkaHosts; private final String schemaRegistries; - private final String groupId; + private final String defaultGroupId; private KafkaProducer kafkaProducer; private KafkaController controller; private ForkliftSerializer serializer; - private Map controllers = new HashMap<>(); + private Map controllers = new HashMap<>(); /** * Constructs a new instance of the KafkaConnector * * @param kafkaHosts list of kafka servers in host:port,... format * @param schemaRegistries list of schema registry servers in http://host:port,... format - * @param groupId the groupId to use when subscribing to topics + * @param defaultGroupId the default groupId to use when subscribing to topics */ - public KafkaConnector(String kafkaHosts, String schemaRegistries, String groupId) { + public KafkaConnector(String kafkaHosts, String schemaRegistries, String defaultGroupId) { this.kafkaHosts = kafkaHosts; this.schemaRegistries = schemaRegistries; - this.groupId = groupId; + this.defaultGroupId = defaultGroupId; this.serializer = new KafkaSerializer(this, newSerializer(), newDeserializer()); } @@ -101,7 +101,7 @@ private KafkaProducer createKafkaProducer() { return new KafkaProducer(producerProperties); } - private KafkaController createController(String topicName) { + private KafkaController createController(String topicName, String groupId) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); @@ -137,48 +137,41 @@ public synchronized void stop() throws ConnectorException { @Override public ForkliftConsumerI getConsumerForSource(SourceI source) throws ConnectorException { - return source - .apply(QueueSource.class, queue -> getQueue(queue.getName())) - .apply(TopicSource.class, topic -> getTopic(topic.getName())) - .apply(GroupedTopicSource.class, topic -> getGroupedTopic(topic)) - .apply(RoleInputSource.class, roleSource -> { - final ForkliftConsumerI rawConsumer = getConsumerForSource(roleSource.getActionSource(this)); - return new RoleInputConsumerWrapper(rawConsumer); - }) - .elseUnsupportedError(); + if (source instanceof RoleInputSource) { + final RoleInputSource roleSource = (RoleInputSource) source; + final ForkliftConsumerI rawConsumer = getConsumerForSource(roleSource.getActionSource(this)); + return new RoleInputConsumerWrapper(rawConsumer); + } + return getGroupedTopic(mapToGroupedTopic(source)); } public synchronized ForkliftConsumerI getGroupedTopic(GroupedTopicSource source) throws ConnectorException { if (!source.groupSpecified()) { - source.overrideGroup(groupId); + source.overrideGroup(defaultGroupId); } - if (!source.getGroup().equals(groupId)) { //TODO actually support GroupedTopics - throw new ConnectorException("Unexpected group '" + source.getGroup() + "'; only the connector group '" + groupId + "' is allowed"); - } - - KafkaController controller = controllers.get(source.getName()); + KafkaController controller = controllers.get(source); if (controller != null && controller.isRunning()) { - log.warn("Consumer for topic already exists under this controller's groupname. Messages will be divided amongst consumers."); + log.warn("Consumer for topic and group already exists. Messages will be divided amongst consumers."); } else { - controller = createController(source.getName()); - this.controllers.put(source.getName(), controller); + controller = createController(source.getName(), source.getGroup()); + this.controllers.put(source, controller); controller.start(); } + return new KafkaTopicConsumer(source.getName(), controller); } @Override public ForkliftConsumerI getQueue(String name) throws ConnectorException { - return getGroupedTopic(new GroupedTopicSource(name, groupId)); + return getGroupedTopic(mapToGroupedTopic(new QueueSource(name))); } @Override public ForkliftConsumerI getTopic(String name) throws ConnectorException { - return getGroupedTopic(new GroupedTopicSource(name, groupId)); + return getGroupedTopic(mapToGroupedTopic(new TopicSource(name))); } - @Override public ForkliftProducerI getQueueProducer(String name) { return getTopicProducer(name); @@ -200,11 +193,28 @@ public ActionSource mapSource(LogicalSource source) { } protected GroupedTopicSource mapRoleInputSource(RoleInputSource roleSource) { - return new GroupedTopicSource("forklift-role-" + roleSource.getRole(), groupId); + return new GroupedTopicSource("forklift-role-" + roleSource.getRole(), defaultGroupId); } @Override public boolean supportsResponse() { return true; } + + /* visible for testing */ + protected GroupedTopicSource mapToGroupedTopic(SourceI source) { + return source + .apply(QueueSource.class, queueSource -> new GroupedTopicSource(queueSource.getName(), defaultGroupId)) + .apply(TopicSource.class, topicSource -> topicToGroupedTopic(topicSource)) + .apply(GroupedTopicSource.class, groupedTopicSource -> groupedTopicSource) + .elseUnsupportedError(); + } + + private GroupedTopicSource topicToGroupedTopic(TopicSource topicSource) { + if (topicSource.getContextClass() == null) { + return new GroupedTopicSource(topicSource.getName(), defaultGroupId); + } + final String groupName = defaultGroupId + "-" + topicSource.getContextClass().getSimpleName(); + return new GroupedTopicSource(topicSource.getName(), groupName); + } } diff --git a/connectors/kafka/src/test/java/forklift/connectors/KafkaConnectorTests.java b/connectors/kafka/src/test/java/forklift/connectors/KafkaConnectorTests.java new file mode 100644 index 0000000..6abcdac --- /dev/null +++ b/connectors/kafka/src/test/java/forklift/connectors/KafkaConnectorTests.java @@ -0,0 +1,58 @@ +package forklift.connectors; + +import forklift.source.sources.GroupedTopicSource; +import forklift.source.sources.QueueSource; +import forklift.source.sources.TopicSource; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class KafkaConnectorTests { + private static final String GROUP_ID = "test-default"; + private KafkaConnector connector; + + @Before + public void setup() throws Exception { + connector = new KafkaConnector("blah", "blah", GROUP_ID); + } + + @Test + public void testQueueMapping() { + final String topicName = "test-topic"; + + final GroupedTopicSource mappedSource = connector.mapToGroupedTopic(new QueueSource(topicName)); + Assert.assertEquals(new GroupedTopicSource(topicName, GROUP_ID), mappedSource); + } + + @Test + public void testTopicMapping() { + final String topicName = "test-topic"; + final TopicSource consumerSource = new TopicSource(topicName); + consumerSource.setContextClass(FakeConsumer.class); + + final GroupedTopicSource mappedConsumerSource = connector.mapToGroupedTopic(consumerSource); + Assert.assertEquals(new GroupedTopicSource(topicName, "test-default-FakeConsumer"), mappedConsumerSource); + } + + @Test + public void testDefaultTopicMapping() { + final String topicName = "test-topic"; + final TopicSource anonymousSource = new TopicSource(topicName); + + final GroupedTopicSource mappedConsumerSource = connector.mapToGroupedTopic(anonymousSource); + Assert.assertEquals(new GroupedTopicSource(topicName, GROUP_ID), mappedConsumerSource); + } + + @Test + public void testGroupedTopicMapping() { + final String topicName = "test-topic"; + final String groupId = "test-group"; + final GroupedTopicSource unmappedSource = new GroupedTopicSource(topicName, groupId); + + final GroupedTopicSource mappedSource = connector.mapToGroupedTopic(unmappedSource); + Assert.assertEquals(unmappedSource, mappedSource); + } + + private final class FakeConsumer {} +} diff --git a/connectors/kafka/src/test/java/forklift/integration/server/KafkaService.java b/connectors/kafka/src/test/java/forklift/integration/server/KafkaService.java index 9d88f01..e94e4f7 100644 --- a/connectors/kafka/src/test/java/forklift/integration/server/KafkaService.java +++ b/connectors/kafka/src/test/java/forklift/integration/server/KafkaService.java @@ -27,6 +27,7 @@ public void run() { Properties properties = new Properties(); properties.setProperty("broker.id", "1"); properties.setProperty("listeners", "PLAINTEXT://:" + listenPort); + properties.put("log.cleaner.dedupe.buffer.size", 2 * 1024 * 1024L); properties.setProperty("num.network.threads", "3"); properties.setProperty("num.io.threads", "8"); properties.setProperty("socket.send.buffer.bytes", "102400"); diff --git a/core/build.sbt b/core/build.sbt index 376dd10..7b4c07c 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -1,28 +1,5 @@ -organization := "com.github.dcshock" - name := "forklift" -version := "2.1" - -javacOptions ++= Seq("-source", "1.8") - -javacOptions in compile ++= Seq("-g:lines,vars,source", "-deprecation") - -javacOptions in doc += "-Xdoclint:none" - -initialize := { - val _ = initialize.value - if (sys.props("java.specification.version") != "1.8") - sys.error("Java 8 is required for this project.") -} - -resolvers ++= Seq( - "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", - "Maven Central" at "http://repo1.maven.org/maven2", - "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots", - "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public" -) - libraryDependencies ++= Seq( "com.google.guava" % "guava" % "18.0", "com.fasterxml.jackson.core" % "jackson-databind" % "2.7.3", @@ -38,50 +15,3 @@ lazy val testDependencies = Seq( ) libraryDependencies ++= testDependencies.map(_ % "test") - -// Remove scala dependency for pure Java libraries -autoScalaLibrary := false - -// Remove the scala version from the generated/published artifact -crossPaths := false - -publishMavenStyle := true - -publishTo := { - val nexus = "https://oss.sonatype.org/" - if (isSnapshot.value) - Some("snapshots" at nexus + "content/repositories/snapshots") - else - Some("releases" at nexus + "service/local/staging/deploy/maven2") -} - -pomIncludeRepository := { _ => false } - -pomExtra := ( - https://github.com/dcshock/forklift - - - BSD-style - http://www.opensource.org/licenses/bsd-license.php - repo - - - - git@github.com:dcshock/forklift.git - scm:git:git@github.com:dcshock/forklift.git - - - - dcshock - Matt Conroy - http://www.mattconroy.com - - - afrieze - Andrew Frieze - - - kuroshii - Bridger Howell - - ) diff --git a/core/project/plugins.sbt b/core/project/plugins.sbt deleted file mode 100644 index 4ce4d9e..0000000 --- a/core/project/plugins.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") diff --git a/core/src/main/java/forklift/source/sources/GroupedTopicSource.java b/core/src/main/java/forklift/source/sources/GroupedTopicSource.java index 9377e92..0d961dc 100644 --- a/core/src/main/java/forklift/source/sources/GroupedTopicSource.java +++ b/core/src/main/java/forklift/source/sources/GroupedTopicSource.java @@ -50,6 +50,11 @@ public void overrideGroup(String group) { this.group = group; } + @Override + public int hashCode() { + return Objects.hash(name, group); + } + @Override public boolean equals(Object o) { if (this == o) diff --git a/plugins/replay/build.sbt b/plugins/replay/build.sbt index e7527bc..0c33f36 100644 --- a/plugins/replay/build.sbt +++ b/plugins/replay/build.sbt @@ -1,27 +1,8 @@ -organization := "com.github.dcshock" - name := "forklift-replay" -version := "2.2" - -javacOptions ++= Seq("-source", "1.8") - -initialize := { - val _ = initialize.value - if (sys.props("java.specification.version") != "1.8") - sys.error("Java 8 is required for this project.") -} - -resolvers ++= Seq( - "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", - "Maven Central" at "http://repo1.maven.org/maven2", - "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots", - "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public" -) - libraryDependencies ++= Seq( - "com.github.dcshock" % "forklift" % "2.0", - "org.elasticsearch" % "elasticsearch" % "2.4.1", + "org.elasticsearch" % "elasticsearch" % "2.4.1", // retained for now, to support embedded ES clusters + "org.elasticsearch.client" % "rest" % "5.0.2", "com.fasterxml.jackson.core" % "jackson-databind" % "2.7.3", "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.7.3" ) @@ -32,48 +13,3 @@ lazy val testDependencies = Seq( ) libraryDependencies ++= testDependencies.map(_ % "test") - -// Remove scala dependency for pure Java libraries -autoScalaLibrary := false - -// Remove the scala version from the generated/published artifact -crossPaths := false - -publishMavenStyle := true - -publishTo := { - val nexus = "https://oss.sonatype.org/" - if (isSnapshot.value) - Some("snapshots" at nexus + "content/repositories/snapshots") - else - Some("releases" at nexus + "service/local/staging/deploy/maven2") -} - -// Remove scala dependency for pure Java libraries -autoScalaLibrary := false - -// Remove the scala version from the generated/published artifact -crossPaths := false - -pomIncludeRepository := { _ => false } - -pomExtra := ( - https://github.com/dcshock/forklift - - - BSD-style - http://www.opensource.org/licenses/bsd-license.php - repo - - - - git@github.com:dcshock/forklift.git - scm:git:git@github.com:dcshock/forklift.git - - - - dcshock - Matt Conroy - http://www.mattconroy.com - - ) diff --git a/plugins/replay/project/plugins.sbt b/plugins/replay/project/plugins.sbt deleted file mode 100644 index 4ce4d9e..0000000 --- a/plugins/replay/project/plugins.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") diff --git a/plugins/replay/src/main/java/forklift/replay/ReplayES.java b/plugins/replay/src/main/java/forklift/replay/ReplayES.java index c4cc227..7e25a9b 100644 --- a/plugins/replay/src/main/java/forklift/replay/ReplayES.java +++ b/plugins/replay/src/main/java/forklift/replay/ReplayES.java @@ -49,7 +49,7 @@ public class ReplayES { private final Consumer consumer; /** - * Constructs a new ReplayES instance using the default transport port (9200) for + * Constructs a new ReplayES instance using the default REST port (9200) for * sending messages to elasticsearch. * * @param clientOnly whether the plugin is only an elasticsearch client, or an @@ -57,6 +57,8 @@ public class ReplayES { * @param hostname the address of the ES host to send messages to * @param clusterName the name of the elasticsearch cluster to send logs to * @param connector the connector to use for queuing messages + * + * @deprecated use {@link #ReplayES(boolean, String, ForkliftConnectorI)} instead */ public ReplayES(boolean clientOnly, String hostname, String clusterName, ForkliftConnectorI connector) { this(clientOnly, hostname, 9200, clusterName, connector); @@ -72,8 +74,38 @@ public ReplayES(boolean clientOnly, String hostname, String clusterName, Forklif * @param port the port number of the ES transport port for the given host * @param clusterName the name of the elasticsearch cluster to send logs to * @param connector the connector to use for queuing messages + * + * @deprecated use {@link #ReplayES(boolean, String, int, ForkliftConnectorI)} instead */ public ReplayES(boolean clientOnly, String hostname, int port, String clusterName, ForkliftConnectorI connector) { + this(clientOnly, hostname, port, connector); + } + + /** + * Constructs a new ReplayES instance using the default REST port (9200) for + * sending messages to elasticsearch. + * + * @param clientOnly whether the plugin is only an elasticsearch client, or an + * embedded elasticsearch node should be started + * @param hostname the address of the ES host to send messages to + * @param connector the connector to use for queuing messages + */ + public ReplayES(boolean clientOnly, String hostname, ForkliftConnectorI connector) { + this(clientOnly, hostname, 9200, connector); + } + + + /** + * Constructs a new ReplayES instance sending messages to elasticsearch based + * on the given parameters. + * + * @param clientOnly whether the plugin is only an elasticsearch client, or an + * embedded elasticsearch node should be started + * @param hostname the address of the ES host to send messages to + * @param port the port number of the ES transport port for the given host + * @param connector the connector to use for queuing messages + */ + public ReplayES(boolean clientOnly, String hostname, int port, ForkliftConnectorI connector) { /* * Setup the connection to the server. If we are only a client we'll not setup a node locally to run. * This will help developers and smaller setups avoid the pain of setting up elastic search. @@ -96,7 +128,7 @@ public ReplayES(boolean clientOnly, String hostname, int port, String clusterNam } } - this.writer = new ReplayESWriter(hostname, port, clusterName); + this.writer = new ReplayESWriter(hostname, port); this.writer.start(); Runtime.getRuntime().addShutdownHook(new Thread() { @@ -144,7 +176,7 @@ public void shutdown() { } catch (InterruptedException ignored) { } - this.writer.shutdown(); + this.writer.close(); } @LifeCycle(value=ProcessStep.Pending, annotation=Replay.class) diff --git a/plugins/replay/src/main/java/forklift/replay/ReplayESWriter.java b/plugins/replay/src/main/java/forklift/replay/ReplayESWriter.java index e6746ef..775c944 100644 --- a/plugins/replay/src/main/java/forklift/replay/ReplayESWriter.java +++ b/plugins/replay/src/main/java/forklift/replay/ReplayESWriter.java @@ -1,94 +1,190 @@ package forklift.replay; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.HttpStatus; +import org.apache.http.entity.ContentType; +import org.apache.http.message.BasicHeader; +import org.apache.http.nio.entity.NStringEntity; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetSocketAddress; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.Closeable; +import java.io.IOException; import java.time.LocalDate; import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.HashMap; -public class ReplayESWriter extends ReplayStoreThread { +public class ReplayESWriter extends ReplayStoreThread implements Closeable { private static final Logger log = LoggerFactory.getLogger(ReplayES.class); + private static final ObjectMapper mapper = new ObjectMapper(); - private final TransportClient client; + private final RestClient restClient; + /** + * Creates a log writer that sends replay information to elasticsearch over REST, + * using the given hostname and port {@code 9200}. + * + * @param hostname the name or address of the host to connect to + */ public ReplayESWriter(String hostname) { - this(hostname, 9300, "elasticsearch"); + this(hostname, 9200); } - public ReplayESWriter(String hostname, int port, String clusterName) { - final Settings settings = Settings.settingsBuilder() - .put("cluster.name", clusterName).build(); - - this.client = TransportClient.builder() - .settings(settings) - .build() - .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(hostname, port))); + /** + * Creates a log writer that sends replay information to elasticsearch over REST, + * using the given hostname and port. + * + * @param hostname the name or address of the host to connect to + * @param port the connection port + */ + public ReplayESWriter(String hostname, int port) { + this.restClient = RestClient.builder(new HttpHost(hostname, port, "http")) + .setRequestConfigCallback(requestConfig -> + requestConfig + .setConnectTimeout(3_000) + .setSocketTimeout(20_000)) + .setDefaultHeaders(new Header[] { + new BasicHeader("Accept", "application/json; charset=utf-8"), + new BasicHeader("Content-Type", "application/json; charset=utf-8") + }) + .setMaxRetryTimeoutMillis(20_000) + .build(); } @Override - protected void poll(ReplayESWriterMsg t) { - final String replayVersion = t.getFields().get("forklift-replay-version"); + protected void poll(ReplayESWriterMsg replayMessage) { + final String replayVersion = replayMessage.getFields().get("forklift-replay-version"); if ("3".equals(replayVersion)) { // latest version - String indexDate = t.getFields().get("first-processed-date"); - if (indexDate == null) - indexDate = LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE); - final String index = "forklift-replay-" + indexDate; + processVersion3Replay(replayMessage); + } else if ("2".equals(replayVersion) || replayVersion == null) { // older versions didn't have the replay version or didn't persist it with the message + processVersion2OrEarlierReplay(replayMessage); + } else { + log.error("Unrecognized replay version: '{}' for message ID '{}' with fields '{}'", + replayVersion, replayMessage.getId(), replayMessage.getFields()); + } + } - try { - // Index the new information. - client.prepareIndex(index, "log") - .setVersion(t.getVersion()).setVersionType(VersionType.EXTERNAL_GTE) - .setId(t.getId()).setSource(t.getFields()).execute().actionGet(); - } catch (VersionConflictEngineException expected) { - log.debug("Newer replay message already exists", expected); + private void processVersion3Replay(final ReplayESWriterMsg replayMessage) { + String indexDate = replayMessage.getFields().get("first-processed-date"); + if (indexDate == null) { + indexDate = LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE); + } + final String index = "forklift-replay-" + indexDate; + + indexReplayMessage(replayMessage, index); + } + + private void processVersion2OrEarlierReplay(ReplayESWriterMsg replayMessage) { + final String index = "forklift-replay-" + LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE); + + for (String existingMessageIndex : searchForIndexesWithId(replayMessage.getId())) { + if (!existingMessageIndex.equals(index)) { + deleteMessageInIndex(replayMessage.getId(), existingMessageIndex); } - } else if ("2".equals(replayVersion) || replayVersion == null) { // older versions didn't have the replay version or didn't persist it with the message - final String index = "forklift-replay-" + LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE); + } + + indexReplayMessage(replayMessage, index); + } + + private void indexReplayMessage(final ReplayESWriterMsg replayMessage, final String index) { + final String id = replayMessage.getId(); + final String endpoint = "/" + index + "/log/" + id; + + final Map queryParams = new HashMap<>(); + queryParams.put("version_type", "external_gte"); + queryParams.put("version", "" + replayMessage.getVersion()); + + final HttpEntity entity; + try { + final String entityContents = mapper.writeValueAsString(replayMessage.getFields()); + entity = new NStringEntity(entityContents, ContentType.APPLICATION_JSON); + } catch (JsonProcessingException e) { + log.error("Could not write replay fields to JSON: (id {}, fields {})", replayMessage.getId(), replayMessage.getFields(), e); + return; + } + + try { + restClient.performRequest("PUT", endpoint, queryParams, entity); + } catch (ResponseException e) { + // conflicts are normal when versioned index requests are submitted in the wrong order + if (e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_CONFLICT) { + return; + } + + log.error("Error indexing replay message (id {}, fields {})", id, replayMessage.getFields(), e); + } catch (IOException e) { + log.error("Error indexing replay message (id {}, fields {})", id, replayMessage.getFields(), e); + } + } + + private Iterable searchForIndexesWithId(final String id) { + final String endpoint = "/forklift-replay*/_search"; + final Map queryParams = new HashMap<>(); + queryParams.put("q", "_id:" + id); + queryParams.put("size", "50"); - // In order to ensure there is only one replay msg for a given id we have to clean the msg id from - // any previously created indexes. + try { + final Response response = restClient.performRequest("GET", endpoint, queryParams); try { - final SearchResponse resp = client.prepareSearch("forklift-replay*").setTypes("log") - .setQuery(QueryBuilders.termQuery("_id", t.getId())) - .setFrom(0).setSize(50).setExplain(false) - .execute() - .actionGet(); - - if (resp != null && resp.getHits() != null && resp.getHits().getHits() != null) { - for (SearchHit hit : resp.getHits().getHits()) { - if (!hit.getIndex().equals(index)) - client.prepareDelete(hit.getIndex(), "log", t.getId()).execute().actionGet(); - } + final JsonNode jsonResponse = mapper.readTree(response.getEntity().getContent()); + final JsonNode hits = jsonResponse.get("hits").get("hits"); + final List indexes = new ArrayList<>(); + + for (final JsonNode hit : hits) { + final String hitIndex = hit.get("_index").asText(); + indexes.add(hitIndex); } + + return indexes; } catch (Exception e) { - log.error("", e); - log.error("Unable to search for old replay logs {}", t.getId()); + log.error("Error parsing elasticsearch response to search for id {}", id, e); } + } catch (IOException e) { + log.error("Error searching for indexes for id {}", id, e); + } - try { - // Index the new information. - client.prepareIndex(index, "log") - .setVersion(t.getVersion()).setVersionType(VersionType.EXTERNAL_GTE) - .setId(t.getId()).setSource(t.getFields()).execute().actionGet(); - } catch (VersionConflictEngineException expected) { - log.debug("Newer replay message already exists", expected); + return Collections.emptyList(); + } + + private void deleteMessageInIndex(final String id, final String index) { + final String endpoint = "/" + index + "/log/" + id; + try { + restClient.performRequest("DELETE", endpoint); + } catch (ResponseException e) { + // sometimes we might try to delete something that was already deleted by something else; + // in which case there is no error + if (e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) { + return; } - } else { - log.error("Unrecognized replay version: '{}' for message ID '{}' with fields '{}' ", replayVersion, t.getId(), t.getFields()); + + log.error("Error deleting message with id {} for index {}", id, index, e); + } catch (IOException e) { + log.error("Error deleting message with id {} for index {}", id, index, e); } } - public void shutdown() { - if (client != null) - client.close(); + @Override + public void close() { + if (restClient != null) { + try { + restClient.close(); + } catch (IOException e) { + log.error("Couldn't shutdown Elasticsearch REST client", e); + } + } } } diff --git a/plugins/retry/build.sbt b/plugins/retry/build.sbt index 481e67b..4be170f 100644 --- a/plugins/retry/build.sbt +++ b/plugins/retry/build.sbt @@ -1,26 +1,6 @@ -organization := "com.github.dcshock" - name := "forklift-retry" -version := "2.2" - -javacOptions ++= Seq("-source", "1.8") - -initialize := { - val _ = initialize.value - if (sys.props("java.specification.version") != "1.8") - sys.error("Java 8 is required for this project.") -} - -resolvers ++= Seq( - "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", - "Maven Central" at "http://repo1.maven.org/maven2", - "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots", - "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public" -) - libraryDependencies ++= Seq( - "com.github.dcshock" % "forklift" % "2.0", "com.fasterxml.jackson.core" % "jackson-databind" % "2.7.3", "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.7.3", "io.searchbox" % "jest" % "2.4.0" @@ -32,42 +12,3 @@ lazy val testDependencies = Seq( ) libraryDependencies ++= testDependencies.map(_ % "test") - -// Remove scala dependency for pure Java libraries -autoScalaLibrary := false - -// Remove the scala version from the generated/published artifact -crossPaths := false - -publishMavenStyle := true - -publishTo := { - val nexus = "https://oss.sonatype.org/" - if (isSnapshot.value) - Some("snapshots" at nexus + "content/repositories/snapshots") - else - Some("releases" at nexus + "service/local/staging/deploy/maven2") -} - -pomIncludeRepository := { _ => false } - -pomExtra := ( - https://github.com/dcshock/forklift - - - BSD-style - http://www.opensource.org/licenses/bsd-license.php - repo - - - - git@github.com:dcshock/forklift.git - scm:git:git@github.com:dcshock/forklift.git - - - - dcshock - Matt Conroy - http://www.mattconroy.com - - ) diff --git a/plugins/retry/project/plugins.sbt b/plugins/retry/project/plugins.sbt deleted file mode 100644 index 4ce4d9e..0000000 --- a/plugins/retry/project/plugins.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") diff --git a/plugins/stats/build.sbt b/plugins/stats/build.sbt index 24914a1..23f30bd 100644 --- a/plugins/stats/build.sbt +++ b/plugins/stats/build.sbt @@ -1,63 +1 @@ -organization := "com.github.dcshock" - name := "forklift-stats" - -version := "1.0" - -javacOptions ++= Seq("-source", "1.8") - -initialize := { - val _ = initialize.value - if (sys.props("java.specification.version") != "1.8") - sys.error("Java 8 is required for this project.") -} - -resolvers ++= Seq( - "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", - "Maven Central" at "http://repo1.maven.org/maven2", - "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots", - "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public" -) - -libraryDependencies ++= Seq( - "com.github.dcshock" % "forklift" % "1.0" -) - -// Remove scala dependency for pure Java libraries -autoScalaLibrary := false - -// Remove the scala version from the generated/published artifact -crossPaths := false - -publishMavenStyle := true - -publishTo := { - val nexus = "https://oss.sonatype.org/" - if (isSnapshot.value) - Some("snapshots" at nexus + "content/repositories/snapshots") - else - Some("releases" at nexus + "service/local/staging/deploy/maven2") -} - -pomIncludeRepository := { _ => false } - -pomExtra := ( - https://github.com/dcshock/forklift - - - BSD-style - http://www.opensource.org/licenses/bsd-license.php - repo - - - - git@github.com:dcshock/forklift.git - scm:git:git@github.com:dcshock/forklift.git - - - - dcshock - Matt Conroy - http://www.mattconroy.com - - ) \ No newline at end of file diff --git a/project/.gnupg/gpg.conf b/project/.gnupg/gpg.conf new file mode 100644 index 0000000..942678f --- /dev/null +++ b/project/.gnupg/gpg.conf @@ -0,0 +1,196 @@ +# Options for GnuPG +# Copyright 1998, 1999, 2000, 2001, 2002, 2003, +# 2010 Free Software Foundation, Inc. +# +# This file is free software; as a special exception the author gives +# unlimited permission to copy and/or distribute it, with or without +# modifications, as long as this notice is preserved. +# +# This file is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY, to the extent permitted by law; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# +# Unless you specify which option file to use (with the command line +# option "--options filename"), GnuPG uses the file ~/.gnupg/gpg.conf +# by default. +# +# An options file can contain any long options which are available in +# GnuPG. If the first non white space character of a line is a '#', +# this line is ignored. Empty lines are also ignored. +# +# See the man page for a list of options. + +# Uncomment the following option to get rid of the copyright notice + +#no-greeting + +# If you have more than 1 secret key in your keyring, you may want to +# uncomment the following option and set your preferred keyid. + +#default-key 621CC013 + +# If you do not pass a recipient to gpg, it will ask for one. Using +# this option you can encrypt to a default key. Key validation will +# not be done in this case. The second form uses the default key as +# default recipient. + +#default-recipient some-user-id +#default-recipient-self + +# By default GnuPG creates version 4 signatures for data files as +# specified by OpenPGP. Some earlier (PGP 6, PGP 7) versions of PGP +# require the older version 3 signatures. Setting this option forces +# GnuPG to create version 3 signatures. + +#force-v3-sigs + +# Because some mailers change lines starting with "From " to ">From " +# it is good to handle such lines in a special way when creating +# cleartext signatures; all other PGP versions do it this way too. +# To enable full OpenPGP compliance you may want to use this option. + +#no-escape-from-lines + +# When verifying a signature made from a subkey, ensure that the cross +# certification "back signature" on the subkey is present and valid. +# This protects against a subtle attack against subkeys that can sign. +# Defaults to --no-require-cross-certification. However for new +# installations it should be enabled. + +require-cross-certification + + +# If you do not use the Latin-1 (ISO-8859-1) charset, you should tell +# GnuPG which is the native character set. Please check the man page +# for supported character sets. This character set is only used for +# metadata and not for the actual message which does not undergo any +# translation. Note that future version of GnuPG will change to UTF-8 +# as default character set. + +#charset utf-8 + +# Group names may be defined like this: +# group mynames = paige 0x12345678 joe patti +# +# Any time "mynames" is a recipient (-r or --recipient), it will be +# expanded to the names "paige", "joe", and "patti", and the key ID +# "0x12345678". Note there is only one level of expansion - you +# cannot make an group that points to another group. Note also that +# if there are spaces in the recipient name, this will appear as two +# recipients. In these cases it is better to use the key ID. + +#group mynames = paige 0x12345678 joe patti + +# Some old Windows platforms require 8.3 filenames. If your system +# can handle long filenames, uncomment this. + +#no-mangle-dos-filenames + +# Lock the file only once for the lifetime of a process. If you do +# not define this, the lock will be obtained and released every time +# it is needed - normally this is not needed. + +#lock-once + +# GnuPG can send and receive keys to and from a keyserver. These +# servers can be HKP, email, or LDAP (if GnuPG is built with LDAP +# support). +# +# Example HKP keyservers: +# hkp://keys.gnupg.net +# +# Example LDAP keyservers: +# ldap://pgp.surfnet.nl:11370 +# +# Regular URL syntax applies, and you can set an alternate port +# through the usual method: +# hkp://keyserver.example.net:22742 +# +# If you have problems connecting to a HKP server through a buggy http +# proxy, you can use keyserver option broken-http-proxy (see below), +# but first you should make sure that you have read the man page +# regarding proxies (keyserver option honor-http-proxy) +# +# Most users just set the name and type of their preferred keyserver. +# Note that most servers (with the notable exception of +# ldap://keyserver.pgp.com) synchronize changes with each other. Note +# also that a single server name may actually point to multiple +# servers via DNS round-robin. hkp://keys.gnupg.net is an example of +# such a "server", which spreads the load over a number of physical +# servers. To see the IP address of the server actually used, you may use +# the "--keyserver-options debug". + +keyserver hkp://keys.gnupg.net +#keyserver http://http-keys.gnupg.net +#keyserver mailto:pgp-public-keys@keys.nl.pgp.net + +# Common options for keyserver functions: +# +# include-disabled = when searching, include keys marked as "disabled" +# on the keyserver (not all keyservers support this). +# +# no-include-revoked = when searching, do not include keys marked as +# "revoked" on the keyserver. +# +# verbose = show more information as the keys are fetched. +# Can be used more than once to increase the amount +# of information shown. +# +# use-temp-files = use temporary files instead of a pipe to talk to the +# keyserver. Some platforms (Win32 for one) always +# have this on. +# +# keep-temp-files = do not delete temporary files after using them +# (really only useful for debugging) +# +# honor-http-proxy = if the keyserver uses HTTP, honor the http_proxy +# environment variable +# +# broken-http-proxy = try to work around a buggy HTTP proxy +# +# auto-key-retrieve = automatically fetch keys as needed from the keyserver +# when verifying signatures or when importing keys that +# have been revoked by a revocation key that is not +# present on the keyring. +# +# no-include-attributes = do not include attribute IDs (aka "photo IDs") +# when sending keys to the keyserver. + +#keyserver-options auto-key-retrieve + +# Uncomment this line to display photo user IDs in key listings and +# when a signature from a key with a photo is verified. + +#show-photos + +# Use this program to display photo user IDs +# +# %i is expanded to a temporary file that contains the photo. +# %I is the same as %i, but the file isn't deleted afterwards by GnuPG. +# %k is expanded to the key ID of the key. +# %K is expanded to the long OpenPGP key ID of the key. +# %t is expanded to the extension of the image (e.g. "jpg"). +# %T is expanded to the MIME type of the image (e.g. "image/jpeg"). +# %f is expanded to the fingerprint of the key. +# %% is %, of course. +# +# If %i or %I are not present, then the photo is supplied to the +# viewer on standard input. If your platform supports it, standard +# input is the best way to do this as it avoids the time and effort in +# generating and then cleaning up a secure temp file. +# +# The default program is "xloadimage -fork -quiet -title 'KeyID 0x%k' stdin" +# On Mac OS X and Windows, the default is to use your regular JPEG image +# viewer. +# +# Some other viewers: +# photo-viewer "qiv %i" +# photo-viewer "ee %i" +# photo-viewer "display -title 'KeyID 0x%k'" +# +# This one saves a copy of the photo ID in your home directory: +# photo-viewer "cat > ~/photoid-for-key-%k.%t" +# +# Use your MIME handler to view photos: +# photo-viewer "metamail -q -d -b -c %T -s 'KeyID 0x%k' -f GnuPG" + diff --git a/project/.gnupg/pubring.gpg b/project/.gnupg/pubring.gpg new file mode 100644 index 0000000..a98f74e Binary files /dev/null and b/project/.gnupg/pubring.gpg differ diff --git a/project/.gnupg/pubring.gpg~ b/project/.gnupg/pubring.gpg~ new file mode 100644 index 0000000..a98f74e Binary files /dev/null and b/project/.gnupg/pubring.gpg~ differ diff --git a/project/.gnupg/secring.gpg b/project/.gnupg/secring.gpg new file mode 100644 index 0000000..7367b67 Binary files /dev/null and b/project/.gnupg/secring.gpg differ diff --git a/project/Build.scala b/project/Build.scala deleted file mode 100644 index 44284a8..0000000 --- a/project/Build.scala +++ /dev/null @@ -1,40 +0,0 @@ -import sbt._ -import Keys._ - -object ForkliftBuild extends Build { - lazy val core = Project( - id = "core", - base = file("core") - ) - - lazy val replay = Project( - id = "replay", - base = file("plugins/replay") - ).dependsOn(core) - - lazy val retry = Project( - id = "retry", - base = file("plugins/retry") - ).dependsOn(core) - - lazy val stats = Project( - id = "stats", - base = file("plugins/stats") - ).dependsOn(core) - - lazy val activemq = Project( - id = "activemq", - base = file("connectors/activemq") - ).dependsOn(core) - - lazy val kafka = Project( - id = "kafka", - base = file("connectors/kafka") - ).dependsOn(core) - - lazy val server = Project( - id = "server", - base = file("server") - ).dependsOn(core, activemq, replay, retry, stats) - -} diff --git a/project/plugins.sbt b/project/plugins.sbt index a571e0b..1a0c34b 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,7 +1,5 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.0-M4") - addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0") - -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") - +addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.0") +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0") addSbtPlugin("com.cavorite" % "sbt-avro-1-7" % "1.1.2") diff --git a/server/build.sbt b/server/build.sbt index 5d22512..0d41c21 100644 --- a/server/build.sbt +++ b/server/build.sbt @@ -1,32 +1,8 @@ -organization := "com.github.dcshock" - name := "forklift-server" -version := "2.2" - enablePlugins(JavaAppPackaging) -javacOptions ++= Seq("-source", "1.8") - -initialize := { - val _ = initialize.value - if (sys.props("java.specification.version") != "1.8") - sys.error("Java 8 is required for this project.") -} - -resolvers ++= Seq( - "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", - "Maven Central" at "http://repo1.maven.org/maven2", - "Fuse Snapshots" at "http://repo.fusesource.com/nexus/content/repositories/snapshots", - "Fuse" at "http://repo.fusesource.com/nexus/content/groups/public" -) - libraryDependencies ++= Seq( - "com.github.dcshock" % "forklift" % "2.1", - "com.github.dcshock" % "forklift-activemq" % "2.0", - "com.github.dcshock" % "forklift-replay" % "2.2", - "com.github.dcshock" % "forklift-retry" % "2.2", - "com.github.dcshock" % "forklift-stats" % "1.0", "com.github.dcshock" % "consul-rest-client" % "0.10", "org.apache.activemq" % "activemq-broker" % "5.14.0", "io.searchbox" % "jest" % "2.0.0", @@ -41,44 +17,3 @@ libraryDependencies ++= Seq( "com.novocode" % "junit-interface" % "0.11" % "test", "commons-io" % "commons-io" % "2.4" % "test" ) - -// Remove scala dependency for pure Java libraries -autoScalaLibrary := false - -// Remove the scala version from the generated/published artifact -crossPaths := false - -publishMavenStyle := true - -publishTo := { - val nexus = "https://oss.sonatype.org/" - if (isSnapshot.value) - Some("snapshots" at nexus + "content/repositories/snapshots") - else - Some("releases" at nexus + "service/local/staging/deploy/maven2") -} - -pomIncludeRepository := { _ => false } - -pomExtra := ( - https://github.com/dcshock/forklift - - - BSD-style - http://www.opensource.org/licenses/bsd-license.php - repo - - - - git@github.com:dcshock/forklift.git - scm:git:git@github.com:dcshock/forklift.git - - - - dcshock - Matt Conroy - http://www.mattconroy.com - - ) - -useGpg := true diff --git a/server/src/main/java/forklift/ForkliftServer.java b/server/src/main/java/forklift/ForkliftServer.java index 900f7b3..b85aaa0 100644 --- a/server/src/main/java/forklift/ForkliftServer.java +++ b/server/src/main/java/forklift/ForkliftServer.java @@ -16,7 +16,6 @@ import forklift.retry.RetryHandler; import forklift.stats.StatsCollector; import org.apache.activemq.broker.BrokerService; -import org.apache.http.annotation.ThreadSafe; import java.io.File; import java.io.FileNotFoundException; @@ -41,7 +40,6 @@ * * @author zdavep */ -@ThreadSafe public final class ForkliftServer { private ExecutorService executor = Executors.newSingleThreadExecutor();