Skip to content

Commit

Permalink
[Storm-Compatibility] Forward Storm Kryo registrations to Flink
Browse files Browse the repository at this point in the history
This closes apache#1495.
  • Loading branch information
mjsax committed Jan 14, 2016
1 parent d1c93d2 commit be055b7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

import com.esotericsoftware.kryo.Serializer;
import com.google.common.collect.Lists;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
Expand All @@ -48,8 +50,10 @@
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.storm.util.StormConfig;

import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
Expand All @@ -63,13 +67,17 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
* {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with
* Flink's JobManager instead of Storm's Nimbus.
*/
public class FlinkClient {

/** The log used by this client. */
private static final Logger LOG = LoggerFactory.getLogger(FlinkClient.class);

/** The client's configuration */
private final Map<?,?> conf;
/** The jobmanager's host name */
Expand Down Expand Up @@ -163,9 +171,8 @@ public void submitTopology(final String name, final String uploadedJarLocation,
* Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
* uploading a jar file before hand. Jar files are always uploaded directly when a program is submitted.
*/
public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
topology)
throws AlreadyAliveException, InvalidTopologyException {
public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology topology)
throws AlreadyAliveException, InvalidTopologyException {

if (this.getTopologyJobId(name) != null) {
throw new AlreadyAliveException();
Expand All @@ -181,9 +188,11 @@ public void submitTopologyWithOpts(final String name, final String uploadedJarLo
throw new RuntimeException("Problem with jar file " + uploadedJarLocation, e);
}

/* set storm configuration */
if (this.conf != null) {
topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(this.conf));
try {
FlinkClient.addStormConfigToTopology(topology, conf);
} catch(ClassNotFoundException e) {
LOG.error("Could not register class for Kryo serialization.", e);
throw new InvalidTopologyException("Could not register class for Kryo serialization.");
}

final StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
Expand Down Expand Up @@ -325,4 +334,27 @@ private ActorRef getJobManager() throws IOException {
actorSystem, AkkaUtils.getLookupTimeout(configuration));
}

@SuppressWarnings({ "unchecked", "rawtypes" })
static void addStormConfigToTopology(FlinkTopology topology, Map conf) throws ClassNotFoundException {
if (conf != null) {
ExecutionConfig flinkConfig = topology.getExecutionEnvironment().getConfig();

flinkConfig.setGlobalJobParameters(new StormConfig(conf));

// add all registered types to ExecutionConfig
List<?> registeredClasses = (List<?>) conf.get(Config.TOPOLOGY_KRYO_REGISTER);
if (registeredClasses != null) {
for (Object klass : registeredClasses) {
if (klass instanceof String) {
flinkConfig.registerKryoType(Class.forName((String) klass));
} else {
for (Entry<String,String> register : ((Map<String,String>)klass).entrySet()) {
flinkConfig.registerTypeWithKryoSerializer(Class.forName(register.getKey()),
(Class<? extends Serializer<?>>)Class.forName(register.getValue()));
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,33 +73,32 @@ public void submitTopologyWithOpts(final String topologyName, final Map conf, fi
LOG.info("Running Storm topology on FlinkLocalCluster");

boolean submitBlocking = false;
if(conf != null) {
topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(conf));

if (conf != null) {
Object blockingFlag = conf.get(SUBMIT_BLOCKING);
if(blockingFlag != null && blockingFlag instanceof Boolean) {
submitBlocking = ((Boolean)blockingFlag).booleanValue();
}
}

FlinkClient.addStormConfigToTopology(topology, conf);

StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
streamGraph.setJobName(topologyName);

JobGraph jobGraph = streamGraph.getJobGraph();

if (flink == null) {

if (this.flink == null) {
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());

configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

flink = new LocalFlinkMiniCluster(configuration, true);
this.flink = new LocalFlinkMiniCluster(configuration, true);
this.flink.start();
}

if(submitBlocking) {
if (submitBlocking) {
this.flink.submitJobAndWait(jobGraph, false);
} else {
this.flink.submitJobDetached(jobGraph);
Expand Down

0 comments on commit be055b7

Please sign in to comment.