Distributed Arcon #212
Support distributed execution of Arcon applications.
Comments
I am wondering how we will go about taking a program written in the Arcon API and executing it in a distributed environment. We must be able to set up, replicate, and deploy the dataflow graph and operators. I see a couple of questions and constraints to consider.

How to initialise operators? The logical dataflow graph and operator state (including initialisation parameters such as UDFs) are values, and therefore undecidable just by looking at the source code of an Arcon / Arc-Script program. The only way to obtain these values is by evaluating the program. Operator definitions, on the other hand, are not values, i.e., they cannot be constructed by evaluating an Arcon program, and are therefore known a priori to executing it.

How to set up and replicate the dataflow graph? The construction of a logical dataflow graph can be non-deterministic (e.g., users may initialise the dataflow graph depending on local configuration). It can also be non-idempotent (e.g., users may initialise other functionality or systems in the same binary as the Arcon program, such as Kafka sources/sinks or dashboards). For this reason I think we need to set up the dataflow graph in a single driver process and then replicate it later.
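(A minimal sketch, not from the thread, to restate the values-vs-definitions distinction; the Map struct here is a stand-in, not Arcon's actual operator type.)

// Operator *definitions* are plain Rust types: they exist in every
// compiled binary without evaluating anything.
pub struct Map<F> { udf: F }

// The graph and operator state are runtime *values*: the closure below
// only exists once main is evaluated, so no static inspection of the
// source code can recover it.
fn main() {
    let map = Map { udf: |x: i64| x + 10 }; // known only at runtime
    assert_eq!((map.udf)(5), 15);
}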
So if I understood the outcome of the discussion, it was that we compile the same binary (see below). With this, the initialisation part shouldn't be a problem? All binaries will compile the different stages and, based on the decision from the control plane, glue them together through ActorPaths or local ActorRefs (a rough sketch of what that glue could look like follows after the code).

fn main() {
let mut app = Application::default()
.kafka(consumer_conf, JsonSchema::new(), 2, |conf| {
conf.set_arcon_time(ArconTime::Event);
conf.set_timestamp_extractor(|x: &u64| *x);
})
.map(|x| x + 10)
.map(|x| x + 20)
.to_console()
.build();
app.start();
app.await_termination();
}
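(A self-contained toy, not from the discussion, to make the glue idea concrete: the control plane's per-edge decision becomes an enum, with std::sync::mpsc standing in for a local ActorRef and a plain address string for a remote ActorPath. All names here are hypothetical.)

use std::fmt::Debug;
use std::sync::mpsc;

struct LocalRef<M>(mpsc::Sender<M>); // stand-in for a local ActorRef
#[derive(Debug)]
struct RemotePath(String);           // stand-in for a remote ActorPath

// The control plane decides, per edge in the dataflow graph, whether the
// downstream stage lives in this binary or on another machine.
enum Channel<M> {
    Local(LocalRef<M>),
    Remote(RemotePath),
}

impl<M: Debug> Channel<M> {
    fn send(&self, msg: M) {
        match self {
            Channel::Local(r) => { let _ = r.0.send(msg); }
            // a real implementation would serialise the message and ship
            // it over the network to the path
            Channel::Remote(p) => println!("would serialise {:?} for {:?}", msg, p),
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let local = Channel::Local(LocalRef(tx));
    let remote: Channel<u64> = Channel::Remote(RemotePath("tcp://10.0.0.2:4000/map-1".into()));
    local.send(7u64);
    remote.send(7);
    assert_eq!(rx.recv().unwrap(), 7);
}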
I don't fully understand the non-deterministic part. When are users going to create different dataflows based on an input parameter? And can't we assume that the input parameter would be the same for all binaries? Is this something we should worry about?
Does it mean every machine is given the same binary, but only the control plane executes the main function to set up and distribute the pipeline?
I think users will want to have their own configuration files to control how the pipeline is set up. In my view it would solve a lot of headaches for users if we could create a canonical pipeline in a single driver process and replicate/distribute it later. 🤔 Is there a reason we do it the way we do?
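(A hypothetical illustration of that concern; the variable name is made up. If every machine evaluated main itself, a graph built from local configuration could differ between machines, whereas a single driver evaluates it once into one canonical graph.)

use std::env;

fn main() {
    // per-machine setting, e.g. from a local config file or environment
    let parallelism: usize = env::var("MY_SITE_PARALLELISM")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(1);

    // Two machines with different local settings would build structurally
    // different dataflow graphs here.
    println!("building a graph with {} parallel map instances", parallelism);
}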
I mean, in the long run, yes, we want a driver that can deploy onto different cloud resources. But IMO it's a step-by-step process. The first iteration of distributed Arcon is not going to be the final one. Step 1 may be to manually start the same binary on different machines with a configuration that connects to the control plane to coordinate the setup. Then there might be steps 2, 3, 4, and 5 until we reach a final version. I don't think it's a good idea to jump directly to, or focus on, step 5. Edit: yes, the same binary, at least to start off with. It seemed to be the easiest approach of the ones we discussed.
I agree with you that we should go step by step. I'm also interested in whether we can model the optimal solution so we don't build ourselves into a corner 👀 but we do not need to implement it yet. One idea which came to mind from Arc-Script's point of view is that I could generate separate code for the driver and the managers. For example, I could generate:

fn main() {
match std::env::var("ARCON").unwrap().as_str() {
"DRIVER" => {
let mut app = Application::default()
.kafka(consumer_conf, JsonSchema::new(), 2, |conf| {
conf.set_arcon_time(ArconTime::Event);
conf.set_timestamp_extractor(|x: &u64| *x);
})
.filter(|x| x % 2 == 0)
.map(|x| x + 20)
.to_console()
.build();
app.start();
app.await_termination();
},
"NETWORK_MANAGER" => {
let mut network_manager = NetworkManager::default()
// Register operators which can be created by a network manager
// These operators are standard so they could be included automatically
.register_operator::<arcon::operator::function::Map>()
.register_operator::<arcon::operator::function::Filter>();
network_manager.start();
network_manager.await_termination();
}
_ => unreachable!()
}
}

The user machine could run the driver and the others could run the network managers. Do you think this could be of help, or is it not so useful? 🤔
About the idea I wrote above: we would need to get around ASLR (https://en.wikipedia.org/wiki/Address_space_layout_randomization). It might be possible if we either disable it or add a type tag to function pointers.
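(To illustrate the problem in miniature: with ASLR enabled, the default for position-independent binaries, the address printed below differs between two runs of the same binary, so shipping it to another process and calling it there is unsound.)

fn my_map(x: u64) -> u64 { x + 10 }

fn main() {
    let f: fn(u64) -> u64 = my_map; // coerce the fn item to a fn pointer
    let addr = f as usize;          // the pointer's numeric address
    println!("my_map lives at {:#x} in this run", addr);
}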
If I understand correctly, the approach you mean is similar to Flink's, where there are general TaskManagers that can receive and execute any "dynamic" operator?
Yesss. I am unsure if it is possible to send instances of operators over the network to TaskManagers, but maybe it's possible to send the information necessary to construct an operator at a TaskManager. Maybe it is even possible to send enough information to set up a whole dataflow graph 🤔 I experimented with Kompact, setting up a distributed system with a driver in one process and a manager in another (https://github.com/segeljakt/kompact-distributed-test). The manager connects to the driver and then sets up 5 pipelined workers. Each worker can receive input on an input port, apply a map function, and output data on an output port. The driver can send data to the manager, which then uses the workers to compute a result. The network looks like this:
Both machines run the same binary. The thing I wanted to test is whether you could send a function pointer from Machine1 to Machine2. The driver here passes a function which each worker executes when mapping. I tried two different approaches:

The first approach (sending the raw function pointer) works, but only when ASLR is disabled. This is a security risk, but it gets the job done.

The second approach works but requires storing a tag alongside every function pointer. This is very tedious, so I think it is only a feasible solution if we are using code generation. A good thing is that both approaches are probably independent of Arcon. I could for example send a
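(A minimal sketch of the tag-based approach; the registry is hand-written here but is the kind of table code generation could emit. Since both binaries are compiled from the same source they agree on the table, and only the index crosses the network, never an address.)

type MapFn = fn(u64) -> u64;

fn add_ten(x: u64) -> u64 { x + 10 }
fn double(x: u64) -> u64 { x * 2 }

// Both binaries agree on this table because they share the same source.
static REGISTRY: &[MapFn] = &[add_ten, double];

fn main() {
    // "sender" side: ship the tag instead of the pointer
    let tag: usize = 1; // refers to double

    // "receiver" side: resolve the tag against the local registry,
    // which is valid regardless of ASLR
    let f = REGISTRY[tag];
    assert_eq!(f(21), 42);
}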