Request review and merge of Kafka Connector for forklift project #124

Closed
wants to merge 12 commits into from

AFrieze
Collaborator

@AFrieze AFrieze commented Mar 6, 2017

Kafka connector with Confluent schema-registry for forklift. Adapts to the JMS specification and should be able to replace existing ActiveMQ producers and consumers in the majority of scenarios.

@kaiserleib
Contributor

Failing in CI, just FYI

avroRecord.put("forkliftMsg", message);
ProducerRecord record = new ProducerRecord<>(topic, null, avroRecord);
try {
kafkaProducer.send(record);
Collaborator

We should probably have a callback on all of the kafkaProducer.send calls to at least log errors returned from the broker.
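
A sketch of the callback idea, for illustration only. The real client accepts a second argument to `send`, an `org.apache.kafka.clients.producer.Callback` whose `onCompletion(RecordMetadata, Exception)` runs when the broker responds. To stay runnable without a broker, the code below models that shape with JDK-only stand-ins: `SendCallback`, `FakeProducer`, and `LoggingSender` are hypothetical names, not part of Kafka or forklift.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for Kafka's Callback: exception is null on success.
interface SendCallback {
    void onCompletion(String metadata, Exception exception);
}

// Hypothetical stand-in for a producer; it rejects anything sent to the topic "bad".
class FakeProducer {
    void send(String topic, String value, SendCallback callback) {
        if ("bad".equals(topic)) {
            callback.onCompletion(null, new IllegalStateException("broker rejected record"));
        } else {
            callback.onCompletion(topic + "-0@0", null);
        }
    }
}

class LoggingSender {
    private static final Logger LOGGER = Logger.getLogger(LoggingSender.class.getName());
    final List<String> failures = new ArrayList<>(); // topics whose sends failed
    private final FakeProducer producer;

    LoggingSender(FakeProducer producer) {
        this.producer = producer;
    }

    void send(String topic, String value) {
        // Every send gets a callback, so broker-side errors are at least logged.
        producer.send(topic, value, (metadata, exception) -> {
            if (exception != null) {
                LOGGER.log(Level.SEVERE, "Send to topic " + topic + " failed", exception);
                failures.add(topic);
            }
        });
    }
}
```

With the real producer the lambda would have the same two-argument shape; only the metadata type differs.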

version := "0.1"

// target and Xlint cause sbt dist to fail
javacOptions ++= Seq("-source", "1.8")//, "-target", "1.8", "-Xlint")
Collaborator

You can still have linting on just compile with javacOptions in (Compile, compile).

props.put("key.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
props.put("value.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
props.put("schema.registry.url", schemaRegistries);
props.put("specific.avro.reader", false);
Collaborator

I think for most of our cases moving forward we would prefer to read Avro as a SpecificRecord for the stronger type checking. Accessing data "as a map" is usually too flexible.

Also, it seems strange to support writing specific records, but not reading specific records.


static ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule())
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
KafkaController controller;
Collaborator

Is there a reason to have these two variables package-local? Just for testing?

Yeah, I would prefer to see these be protected if the access modifier is just for testing, because of the general confusion around the default modifier.


@Override
public ForkliftProducerI getTopicProducer(String name) {
return new KafkaForkliftProducer(name, this.kafkaProducer);
Collaborator

@Kuroshii Kuroshii Mar 8, 2017

What if the kafkaProducer is null when this is called (i.e. start has not been called yet, or is in the process of being called)? Maybe an IllegalStateException would be appropriate.

But that still seems a little weird because you can somehow get a consumer without starting the connector but you can't get a producer without starting the connector.
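
A minimal sketch of the guard suggested above, assuming a simplified connector. `SketchConnector` is a hypothetical class, and its `String` field only stands in for the real `KafkaProducer`; the asymmetry with consumers raised in the comment is not addressed here.

```java
// Hypothetical, simplified connector used only to show the state check.
class SketchConnector {
    private volatile String kafkaProducer; // null until start() has completed

    void start() {
        kafkaProducer = "producer";
    }

    String getTopicProducer(String name) {
        String producer = kafkaProducer; // read once so the check and the use agree
        if (producer == null) {
            throw new IllegalStateException(
                "start() must complete before getTopicProducer(\"" + name + "\") is called");
        }
        return producer + ":" + name;
    }
}
```

Failing fast this way turns a later NullPointerException inside the producer into an error that names the actual mistake.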

this.controller.start();
}
}
return new KafkaTopicConsumer(name, controller, messageStream);
Collaborator

Design nit: I think it would make more sense to have the controller manage the details of creating a consumer. That way you can put the logic for setting up a consumer to use a controller in the controller rather than having the consumer manage state to keep track of whether the controller is actually completely set up for it. This also has the minor benefit of avoiding exposing the MessageStream object to the connector if it doesn't need to be exposed.

try {
kafkaProducer.send(record);
} catch (SerializationException e) {
throw new ProducerException("Error creating Kafka Message", e);
Collaborator

Copy-pasted exception? I don't think this message makes sense here

}

@Override
public String send(Object message) throws ProducerException {
Collaborator

Minor: this method could use a little style clean up


String mapSchema = "{\"type\":\"record\"," +
"\"name\":\"ForkliftMapMessage\"," +
"\"fields\":[{\"name\":\"forkliftMapMsg\",\"type\":\"string\"}]}";
Collaborator

Avro has a map type; can we use that instead of a string?

Collaborator Author

I didn't think it was supported by our serializer

Collaborator

What do you mean? It's definitely in the avro spec, at least...
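
For reference, a schema that declares the field as an Avro map rather than a string could look like the sketch below. The `{"type":"map","values":"string"}` form comes from the Avro specification; restricting the values to strings is an assumption, as are the names `ForkliftMapSchema` and `MAP_SCHEMA`. Whether the serializer in use accepts it is the open question in this thread.

```java
class ForkliftMapSchema {
    // Same record as in the diff, but the field is a map with string values.
    // Avro map keys are always strings, so only the value type is declared.
    static final String MAP_SCHEMA = "{\"type\":\"record\","
        + "\"name\":\"ForkliftMapMessage\","
        + "\"fields\":[{\"name\":\"forkliftMapMsg\","
        + "\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}";
}
```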

GenericRecord genericRecord = (GenericRecord)record.value();
Object value = genericRecord.get("forkliftMapMsg");
value = value == null ? genericRecord.get("forkliftMsg") : value;
value = value == null ? genericRecord.get("forkliftJsonMsg") : value;
Collaborator

Shouldn't we deserialize these values from our message so that our message's value appears to be what was sent? I don't see anything deserializing the string that was constructed in the producer for maps, for example.

Also, with removing the JMS stuff, this is going to need to find a new home...

Collaborator Author

The string is parsed to the appropriate type based on the annotated Message object. Look at the inject method in Consumer.java

Collaborator

I see, thanks.

}
Map<TopicPartition, OffsetAndMetadata> offsetData = this.acknowlegmentHandler.flushAcknowledged();
if (offsetData.size() > 0) {
String offsetDescription =
Collaborator

I would rather not do this string construction all the time if it only matters for debug logs.

It would be cool if we could just throw lazy before this and call it good, but alas.

Collaborator Author

+1
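
One way to defer the string construction, sketched under assumptions. The diff uses an SLF4J logger, where the usual guard is `if (log.isDebugEnabled()) { ... }` around the construction. To stay runnable with the JDK alone, the sketch below uses `java.util.logging`, whose `Logger.fine(Supplier<String>)` overload only invokes the supplier when FINE is loggable. `OffsetLogging`, `describe`, and the `Map<String, Long>` standing in for `Map<TopicPartition, OffsetAndMetadata>` are hypothetical.

```java
import java.util.Map;
import java.util.logging.Logger;
import java.util.stream.Collectors;

class OffsetLogging {
    private static final Logger LOGGER = Logger.getLogger(OffsetLogging.class.getName());
    static int descriptionsBuilt = 0; // counts how often the string is actually constructed

    // Hypothetical helper: partition name -> offset.
    static String describe(Map<String, Long> offsets) {
        descriptionsBuilt++;
        return offsets.entrySet().stream()
            .map(e -> e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining(", "));
    }

    static void logCommit(Map<String, Long> offsets) {
        // The Supplier overload defers describe() until FINE is known to be loggable.
        LOGGER.fine(() -> "Committing offsets: " + describe(offsets));
    }
}
```

At the default INFO level `describe` is never called, so the cost is only paid when debug-level output is switched on.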

"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.7.3",
"org.apache.kafka" % "kafka-clients" % "0.10.1.1-cp1",
"io.confluent" % "kafka-avro-serializer" % "3.1.1"

Extra newline

lazy val testDependencies = Seq(
"commons-io" % "commons-io" % "2.4" ,
"com.novocode" % "junit-interface" % "0.11",
"org.mockito" % "mockito-core" % "1.9.5"

Weird extra spaces; should be:

"org.mockito" % "mockito-core" % "1.9.5"

if (isSnapshot.value)
Some("snapshots" at nexus + "content/repositories/snapshots")
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")

Extra space, should be:

Some("releases" at nexus + "service/local/staging/deploy/maven2")


publishTo := {
val nexus = "https://oss.sonatype.org/"
if (isSnapshot.value)

Best not to use a single-line if.

val nexus = "https://oss.sonatype.org/"
if (isSnapshot.value)
Some("snapshots" at nexus + "content/repositories/snapshots")
else

Best not to use a single-line else.

* Acknowledged records are those that have started processing but have not yet been committed to the Kafka Broker. This class is threadsafe.
*/
public class AcknowledgedRecordHandler {
private static final Logger log = LoggerFactory.getLogger(AcknowledgedRecordHandler.class);

Should be named LOGGER, per the Java convention for static final variables.

private KafkaProducer<?, ?> kafkaProducer;
private MessageStream messageStream = new MessageStream();

static ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule())

This could be final


@Override
public void start() throws ConnectorException {

Extra newline

this.controller.start();
}

private KafkaController createController() {

Missing javadocs

props.put("value.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
props.put("schema.registry.url", schemaRegistries);
props.put("specific.avro.reader", false);
//props.put("auto.offset.reset", "earliest");

If you don't need this, you should remove it.

} catch (InterruptedException e) {
log.error("KafkaConnector interrupted while stopping");
}
this.kafkaProducer.close();

My guess is that this should be wrapped in a finally.
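
A sketch of what moving the close into a `finally` buys, using hypothetical stand-ins (`ControllerShutdown`, `StopSketch`, and a boolean in place of `kafkaProducer.close()`). With the close placed after the `catch`, an unchecked exception from the shutdown step would skip it; in a `finally` it runs on every path. Restoring the interrupt flag is an addition the diff does not show.

```java
// Hypothetical stand-in for whatever stop() waits on before closing the producer.
interface ControllerShutdown {
    void await() throws InterruptedException;
}

class StopSketch {
    boolean producerClosed = false; // stands in for kafkaProducer.close()

    void stop(ControllerShutdown shutdown) {
        try {
            shutdown.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt so callers can still observe it.
            Thread.currentThread().interrupt();
        } finally {
            // Runs on normal return, on interruption, and when an unchecked exception propagates.
            producerClosed = true;
        }
    }
}
```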

}

@Override
public ForkliftMessage jmsToForklift(Message m) {

No single-character variables please; they make debugging hard.

msg.setMsg(value.toString());
}
} else {
ObjectMapper mapper = new ObjectMapper();

Why don't you use the one that you declared at the top?

GenericRecord genericRecord = (GenericRecord)record.value();
Object value = genericRecord.get("forkliftMapMsg");
value = value == null ? genericRecord.get("forkliftMsg") : value;
value = value == null ? genericRecord.get("forkliftJsonMsg") : value;

Doesn't this line just overwrite the last line? This seems like you are trying to do a weird cascading check, but it won't actually tell you what went wrong if that is the case. Why don't you just use if checks?
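
For context, each ternary in the diff only replaces `value` while it is still null, so the chain behaves as a first-non-null lookup; what it does not do is report the case where all three fields are missing. A sketch of the explicit version follows, assuming a `Map` as a stand-in for Avro's `GenericRecord`. `PayloadLookup` and `firstPresent` are hypothetical names, and throwing on a miss is one possible policy, not the PR's.

```java
import java.util.Map;

class PayloadLookup {
    // Field names from the diff, checked in the same order as the ternary chain.
    private static final String[] FIELDS = {"forkliftMapMsg", "forkliftMsg", "forkliftJsonMsg"};

    // Returns the first non-null field value, or fails with a message naming what was expected.
    static Object firstPresent(Map<String, Object> record) {
        for (String field : FIELDS) {
            Object value = record.get(field);
            if (value != null) {
                return value;
            }
        }
        throw new IllegalArgumentException(
            "Record has none of the expected fields: " + String.join(", ", FIELDS));
    }
}
```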

try {
//inefficient as we map from object to json, then back to the object. This approach works
//without any changes to the forklift core libraries however.
msg.setMsg(mapper.writeValueAsString(record.value()));

Again, why don't you use the ObjectMapper declared at the top?

@AFrieze
Collaborator Author

AFrieze commented Mar 9, 2017

All - Thank you for the feedback. We have decided to keep the Kafka functionality in a Kafka-specific project for now, which lets us make more changes to the core and move away from the JMS dependencies. We may have a larger PR coming for that in the future if that's the way forklift wants to go. All the feedback is very much appreciated.

@AFrieze AFrieze closed this Mar 9, 2017

4 participants