Distributed Arcon #212

Open
Max-Meldrum opened this issue Jul 16, 2021 · 9 comments
Labels
domain: deployment Anything to related to Arcon deployment epic feature

Comments

@Max-Meldrum
Member

Support distributed execution of Arcon applications.

@Max-Meldrum Max-Meldrum added epic feature domain: deployment Anything to related to Arcon deployment labels Jul 16, 2021
@Max-Meldrum Max-Meldrum added this to Near-term in Arcon Roadmap Jul 16, 2021
@segeljakt
Member

segeljakt commented Sep 30, 2021

I am wondering how we will take a program written in the Arcon API and execute it in a distributed environment. We must be able to set up, replicate, and deploy the dataflow graph and its operators. I see a couple of questions and constraints to consider.

How to initialise operators?

When, for example, creating a Filter operator and scheduling it to execute in a different process, one option is to serialise the operator instance as a whole and send it to that process to execute. This is hard since, unlike Java, Rust does not run on a virtual machine with portable bytecode. We cannot package code into a .jar file at process X, send it to process Y and execute it. Maybe a more sensible option is to serialise and send the operator's input parameters to that process. In the case of Filter, we initialise it with a function pointer that points to a predicate. The location in memory a function pointer points to depends on both the binary and process which is executing it. Therefore I wonder how we can send and call function pointers safely.

The logical dataflow graph and operator state (including initialisation parameters such as UDFs) are values, and therefore cannot be determined just by looking at the source code of an Arcon / Arc-Script program. The only way to obtain these values is by evaluating the program. Operator definitions, on the other hand, are not values, i.e., they cannot be constructed by evaluating an Arcon program, and are therefore known a priori, before the Arcon program executes.

How to set up and replicate the dataflow graph?

The construction of a logical dataflow graph can be non-deterministic (e.g., users may initialise the dataflow graph depending on local configuration). The construction of the dataflow graph can also be non-idempotent (e.g., users may initialise other functionality or systems, such as Kafka sources/sinks or dashboards, in the same binary as the Arcon program). For this reason I think we need to set up the dataflow graph in a single driver process and then replicate it later.
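
To make the driver idea concrete, here is a minimal sketch, not Arcon's API. The driver evaluates the (possibly non-deterministic) setup code once, keeps the resulting logical graph as a plain value, and ships an encoded copy to the other processes. `LogicalGraph`, `Node`, `example_graph`, and the line-based text encoding are all assumptions made for illustration.

```rust
// Hypothetical types: a logical graph as a plain value that a driver
// can build once and replicate. Nothing here is part of Arcon.
#[derive(Debug, Clone, PartialEq)]
struct Node {
    id: u32,
    operator: String, // operator name; must not contain whitespace
    parallelism: u32,
}

#[derive(Debug, Clone, PartialEq)]
struct LogicalGraph {
    nodes: Vec<Node>,
    edges: Vec<(u32, u32)>,
}

impl LogicalGraph {
    /// Encode the graph as one line per node or edge.
    fn encode(&self) -> String {
        let mut out = String::new();
        for n in &self.nodes {
            out.push_str(&format!("node {} {} {}\n", n.id, n.operator, n.parallelism));
        }
        for (from, to) in &self.edges {
            out.push_str(&format!("edge {} {}\n", from, to));
        }
        out
    }

    /// Rebuild the graph on the receiving side; `None` on malformed input.
    fn decode(text: &str) -> Option<LogicalGraph> {
        let mut graph = LogicalGraph { nodes: Vec::new(), edges: Vec::new() };
        for line in text.lines() {
            let parts: Vec<&str> = line.split_whitespace().collect();
            match parts.as_slice() {
                ["node", id, operator, parallelism] => graph.nodes.push(Node {
                    id: id.parse().ok()?,
                    operator: (*operator).to_string(),
                    parallelism: parallelism.parse().ok()?,
                }),
                ["edge", from, to] => {
                    graph.edges.push((from.parse().ok()?, to.parse().ok()?))
                }
                _ => return None,
            }
        }
        Some(graph)
    }
}

/// The graph the driver would obtain by evaluating the user's program.
fn example_graph() -> LogicalGraph {
    LogicalGraph {
        nodes: vec![
            Node { id: 0, operator: "kafka_source".to_string(), parallelism: 2 },
            Node { id: 1, operator: "map".to_string(), parallelism: 1 },
            Node { id: 2, operator: "console_sink".to_string(), parallelism: 1 },
        ],
        edges: vec![(0, 1), (1, 2)],
    }
}

fn main() {
    let wire = example_graph().encode(); // driver side
    let replica = LogicalGraph::decode(&wire).expect("well-formed graph"); // worker side
    assert_eq!(replica, example_graph());
    print!("{}", wire);
}
```

Because only the driver runs the user's setup code, local configuration and side effects are evaluated exactly once; the other processes only ever see the encoded value.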

@Max-Meldrum
Member Author

So if I understood the outcome of the discussion it was that we compile the same binary (see below).
But through the control plane we glue the different stages of the pipeline together how we want. So, for example, put all Kafka sources on Server 1 and both Map stages on Server 2.

So with this, the initialisation part shouldn't be a problem? All binaries compile the different stages and, based on the decision from the control plane, glue them together through ActorPaths or local ActorRefs.

fn main() {
    let mut app = Application::default()
        .kafka(consumer_conf, JsonSchema::new(), 2, |conf| {
            conf.set_arcon_time(ArconTime::Event);
            conf.set_timestamp_extractor(|x: &u64| *x);
        })
        .map(|x| x + 10)
        .map(|x| x + 20)
        .to_console()
        .build();

    app.start();
    app.await_termination();
}
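
The "glue" decision described above could look roughly like the following sketch. It assumes a hypothetical placement table handed out by the control plane (stage name to server name); `Link`, `link_for`, and `example_placement` are illustrative names only. `Link::Local` stands in for a local ActorRef and `Link::Remote` for an ActorPath.

```rust
use std::collections::HashMap;

// Hypothetical: how two adjacent stages are connected after placement.
#[derive(Debug, PartialEq)]
enum Link {
    /// Both stages run in the same process (think: local ActorRef).
    Local,
    /// The downstream stage runs elsewhere (think: ActorPath to `host`).
    Remote { host: String },
}

/// Decide how `from` should reach `to`, given the control plane's placement.
/// Returns `None` if either stage has not been placed.
fn link_for(placement: &HashMap<&str, &str>, from: &str, to: &str) -> Option<Link> {
    let src = placement.get(from)?;
    let dst = placement.get(to)?;
    if src == dst {
        Some(Link::Local)
    } else {
        Some(Link::Remote { host: (*dst).to_string() })
    }
}

/// Placement from the example in the text: sources on one server,
/// both Map stages on another.
fn example_placement() -> HashMap<&'static str, &'static str> {
    let mut placement = HashMap::new();
    placement.insert("kafka_source", "server1");
    placement.insert("map_1", "server2");
    placement.insert("map_2", "server2");
    placement
}

fn main() {
    let placement = example_placement();
    println!("{:?}", link_for(&placement, "kafka_source", "map_1"));
    println!("{:?}", link_for(&placement, "map_1", "map_2"));
}
```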

@Max-Meldrum
Member Author

I don't fully understand the non-deterministic part. When are users gonna create different dataflows based on an input parameter? Then can't we assume that the input parameter would be the same for all binaries? Is this something we should have to worry about?

@segeljakt
Member

segeljakt commented Sep 30, 2021

> So if I understood the outcome of the discussion it was that we compile the same binary (see below). But through the control plane we glue the different stages of the pipeline together how we want. So, for example, put all Kafka sources on Server 1 and both Map stages on Server 2.

Does it mean every machine is given the same binary, but only the control plane executes the main function to set up and distribute the pipeline?

> I don't fully understand the non-deterministic part. When are users gonna create different dataflows based on an input parameter? Then can't we assume that the input parameter would be the same for all binaries? Is this something we should have to worry about?

I think users will want to have their own configuration files to control how the pipeline is set up. In my view it would save users a lot of headaches if we could create a canonical pipeline in a single driver process and replicate/distribute it later. 🤔 Is there a reason we do it the way we do?

@Max-Meldrum
Member Author

Max-Meldrum commented Sep 30, 2021

> > So if I understood the outcome of the discussion it was that we compile the same binary (see below). But through the control plane we glue the different stages of the pipeline together how we want. So, for example, put all Kafka sources on Server 1 and both Map stages on Server 2.

> Does it mean every machine is given the same binary, but only the control plane executes the main function to set up and distribute the pipeline?

> > I don't fully understand the non-deterministic part. When are users gonna create different dataflows based on an input parameter? Then can't we assume that the input parameter would be the same for all binaries? Is this something we should have to worry about?

> I think users will want to have their own configuration files to control how the pipeline is set up. In my view it would save users a lot of headaches if we could create a canonical pipeline in a single driver process and replicate/distribute it later. 🤔 Is there a reason we do it the way we do?

I mean in the long run yes we want a driver that can deploy onto different cloud resources. But imo it's a step-by-step process. The first iteration of distributed Arcon is not gonna be the final one.

Step 1 may be to manually start the same binary on different machines with a configuration that connects to the control plane to coordinate the setup.

Then there might be steps 2, 3, 4, and 5 until we reach a final version. I don't think it's a good idea to try to jump directly to, or focus on, step 5.

Edit: yes same binary at least to start off with. Seemed to be the easiest approach from the ones we discussed.

@segeljakt
Member

segeljakt commented Sep 30, 2021

> I mean in the long run yes we want a driver that can deploy onto different cloud resources. But imo it's a step-by-step process. The first iteration of distributed Arcon is not gonna be the final one.

> Step 1 may be to manually start the same binary on different machines with a configuration that connects to the control plane to coordinate the setup.

> Then there might be steps 2, 3, 4, and 5 until we reach a final version. I don't think it's a good idea to try to jump directly to, or focus on, step 5.

I agree with you that we should go step by step. I'm also interested in whether we can model the optimal solution so we don't build ourselves into a corner 👀 but we do not need to implement it yet. One idea which came to mind from Arc-Script's point of view is that I could generate separate code for the driver and managers. For example, I could generate:

fn main() {
    match std::env::var("ARCON").unwrap().as_str() {
        "DRIVER" => {
            let mut app = Application::default()
                .kafka(consumer_conf, JsonSchema::new(), 2, |conf| {
                    conf.set_arcon_time(ArconTime::Event);
                    conf.set_timestamp_extractor(|x: &u64| *x);
                })
                .filter(|x| x % 2 == 0)
                .map(|x| x + 20)
                .to_console()
                .build();

            app.start();
            app.await_termination();
        },
        "NETWORK_MANAGER" => {
            let mut network_manager = NetworkManager::default()
                // Register operators which can be created by a network manager
                // These operators are standard so they could be included automatically
                .register_operator::<arcon::operator::function::Map>()
                .register_operator::<arcon::operator::function::Filter>();
            network_manager.start();
            network_manager.await_termination();
        }
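        // The role comes from the environment, so an unknown value is possible.
        other => panic!("unexpected ARCON role: {}", other),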
    }
}

The user machine could run the driver and the others could run the network managers. Do you think this could be of help or is it not so useful? 🤔
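
A rough sketch of what a `register_operator`-style registry might do internally: map an operator name to a constructor that is compiled into every binary, so the driver only has to send the name plus a serialisable parameter. The `Operator` trait, `Registry`, and the two toy operators below are assumptions for illustration, not Arcon types.

```rust
use std::collections::HashMap;

// Hypothetical operator interface; Arcon's real trait looks different.
trait Operator {
    fn process(&mut self, input: u64) -> Option<u64>;
}

struct AddConst(u64);
impl Operator for AddConst {
    fn process(&mut self, input: u64) -> Option<u64> {
        Some(input + self.0)
    }
}

struct KeepMultiplesOf(u64);
impl Operator for KeepMultiplesOf {
    fn process(&mut self, input: u64) -> Option<u64> {
        // Guard against a zero divisor; such an operator drops everything.
        if self.0 != 0 && input % self.0 == 0 { Some(input) } else { None }
    }
}

// Constructors are ordinary functions compiled into every binary, so
// only the operator name and its parameter need to cross the network.
type Constructor = fn(u64) -> Box<dyn Operator>;

fn make_add_const(param: u64) -> Box<dyn Operator> {
    Box::new(AddConst(param))
}

fn make_keep_multiples_of(param: u64) -> Box<dyn Operator> {
    Box::new(KeepMultiplesOf(param))
}

#[derive(Default)]
struct Registry {
    constructors: HashMap<String, Constructor>,
}

impl Registry {
    fn register(mut self, name: &str, constructor: Constructor) -> Self {
        self.constructors.insert(name.to_string(), constructor);
        self
    }

    /// Build an operator from a (name, parameter) pair received from the driver.
    fn build(&self, name: &str, param: u64) -> Option<Box<dyn Operator>> {
        self.constructors.get(name).map(|constructor| constructor(param))
    }
}

fn standard_registry() -> Registry {
    Registry::default()
        .register("add_const", make_add_const)
        .register("keep_multiples_of", make_keep_multiples_of)
}

fn main() {
    let registry = standard_registry();
    let mut map = registry.build("add_const", 20).expect("registered operator");
    println!("{:?}", map.process(1)); // Some(21)
}
```

This only works for operators whose parameters are plain data. A user-defined closure still needs one of the function-pointer workarounds discussed in this thread.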

@segeljakt
Member

segeljakt commented Sep 30, 2021

> The location in memory a function pointer points to depends on both the binary and process which is executing it. Therefore I wonder how we can send and call function pointers safely.

About the thing I wrote above, we would need to get around ASLR (https://en.wikipedia.org/wiki/Address_space_layout_randomization). It might be possible if we either disable it or add a type tag to function pointers.

@Max-Meldrum
Member Author

> The user machine could run the driver and the others could run the network managers. Do you think this could be of help or is it not so useful? 🤔

If I understand correctly, the approach you mean is similar to Flink's where there are general TaskManagers that can receive and execute any "dynamic" operator?

@segeljakt
Member

segeljakt commented Oct 1, 2021

> If I understand correctly, the approach you mean is similar to Flink's where there are general TaskManagers that can receive and execute any "dynamic" operator?

Yesss, I am unsure if it is possible to send instances of operators over the network to TaskManagers, but maybe it's possible to send the information necessary to construct an operator at a TaskManager. Maybe it is also possible to send enough information for setting up a whole dataflow graph 🤔

I experimented with Kompact to set up a distributed system with a driver in one process and a manager in another (https://github.com/segeljakt/kompact-distributed-test). The manager connects to the driver and then sets up 5 pipelined workers. Each worker can receive input on an input port, apply a map function, and output data on an output port. The driver can send data to the manager. The manager then uses the workers to compute a result. The network looks like this:

+------------+     +----------------------------------+
| (Machine1) |     | (Machine2)                       |
|            |     |       +---> Worker1 --> Worker2  |
|            |     |       |                    |     |
|          (ActorPath)     |                    v     |
|  Driver <===========> Manager              Worker3  |
|            |     |       ^                    |     |
|            |     |       |(Channel)           v     |
|            |     |       +---- Worker5 <-- Worker4  |
|            |     |                                  |
+------------+     +----------------------------------+
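
For readers without Kompact at hand, the manager-side pipeline in the diagram can be approximated with the standard library alone. This is a single-process stand-in, not the code from the linked repository: threads replace components, `std::sync::mpsc` channels replace ports, and `spawn_worker` / `run_pipeline` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// One worker: read from its input channel, apply the UDF, forward the result.
fn spawn_worker(
    input: Receiver<u64>,
    output: Sender<u64>,
    udf: fn(u64) -> u64,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for item in input {
            // Stop early if the downstream worker has gone away.
            if output.send(udf(item)).is_err() {
                break;
            }
        }
    })
}

/// The manager: chain `stages` workers and push `data` through them.
fn run_pipeline(stages: usize, udf: fn(u64) -> u64, data: Vec<u64>) -> Vec<u64> {
    let (first_tx, mut prev_rx) = channel::<u64>();
    let mut handles = Vec::new();
    for _ in 0..stages {
        let (tx, rx) = channel::<u64>();
        handles.push(spawn_worker(prev_rx, tx, udf));
        prev_rx = rx;
    }
    for item in data {
        first_tx.send(item).expect("first stage is alive");
    }
    // Closing the first channel lets every worker finish in turn.
    drop(first_tx);
    let results: Vec<u64> = prev_rx.iter().collect();
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    results
}

fn add_one(x: u64) -> u64 {
    x + 1
}

fn main() {
    // Five pipelined workers, as in the diagram.
    println!("{:?}", run_pipeline(5, add_one, vec![1, 2, 3])); // [6, 7, 8]
}
```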

Both machines run the same binary. The thing I wanted to test is whether you can send a function pointer from Machine1 to Machine2. The driver here passes a function which each worker executes when mapping. I tried two different approaches:

  1. Transmute a function pointer of the UDF into a usize, send it over the network, and then transmute it back.
  2. Encode the function as a tag, send it over the network, and then decode it manually. In other words, I store the tag as an enum which has a variant for each function. Given a tag, I can match on it to find out which function in the program it corresponds to.

The first approach works, but only when disabling ASLR. This is a security risk but it gets the job done.

echo 0 | sudo tee /proc/sys/kernel/randomize_va_space
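
A minimal sketch of the first approach, with the "network" reduced to a byte array. `Udf`, `encode`, and `decode` are illustrative names. The round trip below stays inside one process, where it is sound; across processes it relies on both sides running the identical binary loaded at the identical address, which is why ASLR has to be off.

```rust
use std::mem;

type Udf = fn(u64) -> u64;

fn add_ten(x: u64) -> u64 {
    x + 10
}

/// Turn the function pointer into bytes that could be sent over the wire.
fn encode(f: Udf) -> [u8; 8] {
    (f as usize as u64).to_le_bytes()
}

/// Turn received bytes back into a callable function pointer.
///
/// # Safety
/// The bytes must hold the address of a function with signature `Udf`
/// that is valid in *this* process. Between processes that only holds
/// for the same binary mapped at the same address (ASLR disabled).
unsafe fn decode(bytes: [u8; 8]) -> Udf {
    let addr = u64::from_le_bytes(bytes) as usize;
    unsafe { mem::transmute::<*const (), Udf>(addr as *const ()) }
}

fn main() {
    let wire = encode(add_ten);
    // Same process, so the address is still valid here.
    let f = unsafe { decode(wire) };
    println!("{}", f(1)); // 11
}
```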

The second approach works but requires storing a tag alongside any function pointer. This is very tedious, so I think it is only a feasible solution if we are using code generation.
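
The second approach could be sketched as follows, assuming a hypothetical `UdfTag` enum with one variant per UDF in the program. Only the tag byte crosses the network, and the receiver resolves it against its own copy of the code, so no addresses are shared and ASLR can stay enabled.

```rust
fn add_ten(x: u64) -> u64 {
    x + 10
}

fn add_twenty(x: u64) -> u64 {
    x + 20
}

/// One variant per UDF in the program. With code generation this enum
/// and the two `match` blocks below would be emitted automatically.
#[derive(Debug, Clone, Copy, PartialEq)]
enum UdfTag {
    AddTen = 0,
    AddTwenty = 1,
}

impl UdfTag {
    /// What actually goes over the network.
    fn to_byte(self) -> u8 {
        self as u8
    }

    /// Decode on the receiving side; unknown tags are rejected.
    fn from_byte(byte: u8) -> Option<UdfTag> {
        match byte {
            0 => Some(UdfTag::AddTen),
            1 => Some(UdfTag::AddTwenty),
            _ => None,
        }
    }

    /// Look up the local function this tag stands for.
    fn resolve(self) -> fn(u64) -> u64 {
        match self {
            UdfTag::AddTen => add_ten,
            UdfTag::AddTwenty => add_twenty,
        }
    }
}

fn main() {
    let wire = UdfTag::AddTwenty.to_byte(); // sender
    let tag = UdfTag::from_byte(wire).expect("known tag"); // receiver
    println!("{}", tag.resolve()(1)); // 21
}
```

Every new UDF needs a new variant and two new match arms, which is the tedium mentioned above and the reason code generation makes this approach practical.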

A good thing is both approaches are probably independent of Arcon. I could for example send a u64 which in reality represents a function pointer without Arcon knowing.
