Skip to content

Commit

Permalink
Revert "Revert "[Job Submission][CPP Worker] introduce CPP job submis…
Browse files Browse the repository at this point in the history
…sion (ray-project#28104)" (ray-project#28607)" (ray-project#28598)

Signed-off-by: 久龙 <[email protected]>
Fix the failed test on macos
  • Loading branch information
SongGuyang committed Sep 20, 2022
1 parent 15883cd commit f2568e1
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 2 deletions.
52 changes: 50 additions & 2 deletions cpp/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ cc_binary(
cc_test(
name = "simple_kv_store",
srcs = glob([
"src/ray/test/examples/*.cc",
"src/ray/test/examples/simple_kv_store.cc",
]),
args = [
"--ray_code_search_path=$(location simple_kv_store.so)",
Expand All @@ -302,7 +302,7 @@ cc_binary(
name = "simple_kv_store.so",
testonly = True,
srcs = glob([
"src/ray/test/examples/*.cc",
"src/ray/test/examples/simple_kv_store.cc",
]),
copts = COPTS,
linkopts = ["-shared"],
Expand All @@ -316,6 +316,39 @@ cc_binary(
],
)

cc_binary(
name = "simple_job",
srcs = [
"src/ray/test/examples/simple_job.cc",
],
copts = COPTS,
data = [
"simple_job.so",
],
linkstatic = True,
tags = ["team:core"],
deps = [
":ray_api",
],
)

cc_binary(
name = "simple_job.so",
srcs = [
"src/ray/test/examples/simple_job.cc",
],
copts = COPTS,
linkopts = ["-shared"],
linkstatic = True,
deps = [
"ray_cpp_lib",
"@boost//:callable_traits",
"@boost//:optional",
"@msgpack",
"@nlohmann_json",
],
)

load("//bazel:python.bzl", "py_test_module_list")

py_test_module_list(
Expand All @@ -331,3 +364,18 @@ py_test_module_list(
],
deps = [],
)

py_test(
name = "test_submit_cpp_job",
size = "medium",
srcs = ["test_submit_cpp_job.py"],
data = [
"simple_job",
"simple_job.so",
],
env = {
"SIMPLE_DRIVER_SO_PATH": "$(location simple_job.so)",
"SIMPLE_DRIVER_MAIN_PATH": "$(location simple_job)",
},
tags = ["team:core"],
)
12 changes: 12 additions & 0 deletions cpp/src/ray/config_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/str_split.h"
#include "nlohmann/json.hpp"

ABSL_FLAG(std::string, ray_address, "", "The address of the Ray cluster to connect to.");

Expand Down Expand Up @@ -83,6 +84,8 @@ ABSL_FLAG(int,
-1,
"The computed hash of the runtime env for this worker.");

using json = nlohmann::json;

namespace ray {
namespace internal {

Expand Down Expand Up @@ -205,6 +208,15 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) {
ray_namespace =
config.ray_namespace.empty() ? GenerateUUIDV4() : config.ray_namespace;
}

auto job_config_json_string = std::getenv("RAY_JOB_CONFIG_JSON_ENV_VAR");
if (job_config_json_string) {
json job_config_json = json::parse(job_config_json_string);
runtime_env = RuntimeEnv::Deserialize(job_config_json.at("runtime_env").dump());
job_config_metadata = job_config_json.at("metadata")
.get<std::unordered_map<std::string, std::string>>();
RAY_CHECK(job_config_json.size() == 2);
}
};

void ConfigInternal::SetBootstrapAddress(std::string_view address) {
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/ray/config_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>

#include "ray/core_worker/common.h"

Expand Down Expand Up @@ -68,6 +69,8 @@ class ConfigInternal {
rpc::JobConfig_ActorLifetime default_actor_lifetime =
rpc::JobConfig_ActorLifetime_NON_DETACHED;

std::unordered_map<std::string, std::string> job_config_metadata;

std::string ray_namespace = "";

static ConfigInternal &Instance() {
Expand Down
71 changes: 71 additions & 0 deletions cpp/src/ray/test/examples/simple_job.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// This is a simple job for test submit cpp job.

#include <ray/api.h>

/// common function
int Plus(int x, int y) { return x + y; }
/// Declare remote function
RAY_REMOTE(Plus);

/// class
class Counter {
public:
int count;

Counter(int init) { count = init; }
/// static factory method
static Counter *FactoryCreate(int init) { return new Counter(init); }

/// non static function
int Add(int x) {
count += x;
return count;
}
};
/// Declare remote function
RAY_REMOTE(Counter::FactoryCreate, &Counter::Add);

int main(int argc, char **argv) {
/// initialization
ray::Init();

/// put and get object
auto object = ray::Put(100);
auto put_get_result = *(ray::Get(object));
std::cout << "put_get_result = " << put_get_result << std::endl;

/// common task
auto task_object = ray::Task(Plus).Remote(1, 2);
int task_result = *(ray::Get(task_object));
std::cout << "task_result = " << task_result << std::endl;

/// actor
ray::ActorHandle<Counter> actor = ray::Actor(Counter::FactoryCreate).Remote(0);
/// actor task
auto actor_object = actor.Task(&Counter::Add).Remote(3);
int actor_task_result = *(ray::Get(actor_object));
std::cout << "actor_task_result = " << actor_task_result << std::endl;
/// actor task with reference argument
auto actor_object2 = actor.Task(&Counter::Add).Remote(task_object);
int actor_task_result2 = *(ray::Get(actor_object2));
std::cout << "actor_task_result2 = " << actor_task_result2 << std::endl;

std::cout << "try to get TEST_KEY: " << std::getenv("TEST_KEY") << std::endl;
/// shutdown
ray::Shutdown();
return 0;
}
6 changes: 6 additions & 0 deletions cpp/src/ray/util/process_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
job_config.mutable_runtime_env_info()->set_serialized_runtime_env(
ConfigInternal::Instance().runtime_env->Serialize());
}
if (ConfigInternal::Instance().job_config_metadata.size()) {
auto metadata_ptr = job_config.mutable_metadata();
for (const auto &it : ConfigInternal::Instance().job_config_metadata) {
(*metadata_ptr)[it.first] = it.second;
}
}
std::string serialized_job_config;
RAY_CHECK(job_config.SerializeToString(&serialized_job_config));
options.serialized_job_config = serialized_job_config;
Expand Down
84 changes: 84 additions & 0 deletions cpp/test_submit_cpp_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import os
import shutil
import sys
import tempfile

import pytest

from ray._private.test_utils import (
format_web_url,
wait_for_condition,
wait_until_server_available,
)
from ray.job_submission import JobStatus, JobSubmissionClient
from ray.tests.conftest import _ray_start


@pytest.fixture(scope="module")
def headers():
return {"Connection": "keep-alive", "Authorization": "TOK:<MY_TOKEN>"}


@pytest.fixture(scope="module")
def job_sdk_client(headers):
with _ray_start(
include_dashboard=True, num_cpus=1, _node_ip_address="0.0.0.0"
) as ctx:
address = ctx.address_info["webui_url"]
assert wait_until_server_available(address)
yield JobSubmissionClient(format_web_url(address), headers=headers)


def _check_job_succeeded(client: JobSubmissionClient, job_id: str) -> bool:
status = client.get_job_status(job_id)
if status == JobStatus.FAILED:
logs = client.get_job_logs(job_id)
raise RuntimeError(f"Job failed\nlogs:\n{logs}")
return status == JobStatus.SUCCEEDED


def test_submit_simple_cpp_job(job_sdk_client):
client = job_sdk_client

simple_job_so_path = os.environ["SIMPLE_DRIVER_SO_PATH"]
simple_job_so_filename = os.path.basename(simple_job_so_path)
simple_job_main_path = os.environ["SIMPLE_DRIVER_MAIN_PATH"]
simple_job_main_filename = os.path.basename(simple_job_main_path)
with tempfile.TemporaryDirectory() as tmp_dir:
working_dir = os.path.join(tmp_dir, "cpp_worker")
os.makedirs(working_dir)
shutil.copy2(
simple_job_so_path, os.path.join(working_dir, simple_job_so_filename)
)
shutil.copy2(
simple_job_main_path,
os.path.join(working_dir, simple_job_main_filename),
)
shutil.copymode(
simple_job_main_path,
os.path.join(working_dir, simple_job_main_filename),
)
entrypoint = (
f"chmod +x {simple_job_main_filename} && ./{simple_job_main_filename}"
)
runtime_env = dict(
working_dir=working_dir,
env_vars={"TEST_KEY": "TEST_VALUE"},
)

job_id = client.submit_job(
entrypoint=entrypoint,
runtime_env=runtime_env,
)

wait_for_condition(
_check_job_succeeded, client=client, job_id=job_id, timeout=120
)

logs = client.get_job_logs(job_id)
print(f"================== logs ================== \n {logs}")
assert "try to get TEST_KEY: TEST_VALUE" in logs


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

0 comments on commit f2568e1

Please sign in to comment.