From 56d6144ad81d11c201561685def116336a188230 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Sat, 9 Oct 2021 18:59:09 +0800 Subject: [PATCH 1/4] feat: support single replica (#932) --- src/client/replication_ddl_client.cpp | 4 ++-- src/meta/greedy_load_balancer.cpp | 8 ++++++-- src/meta/meta_options.cpp | 16 +++++++++++----- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/client/replication_ddl_client.cpp b/src/client/replication_ddl_client.cpp index ef89208028..72231201a2 100644 --- a/src/client/replication_ddl_client.cpp +++ b/src/client/replication_ddl_client.cpp @@ -128,8 +128,8 @@ dsn::error_code replication_ddl_client::create_app(const std::string &app_name, return ERR_INVALID_PARAMETERS; } - if (replica_count < 2) { - std::cout << "create app " << app_name << " failed: replica_count should >= 2" << std::endl; + if (replica_count < 1) { + std::cout << "create app " << app_name << " failed: replica_count should >= 1" << std::endl; return ERR_INVALID_PARAMETERS; } diff --git a/src/meta/greedy_load_balancer.cpp b/src/meta/greedy_load_balancer.cpp index d9b43e174d..16da837e6b 100644 --- a/src/meta/greedy_load_balancer.cpp +++ b/src/meta/greedy_load_balancer.cpp @@ -47,6 +47,8 @@ DSN_DEFINE_uint32("meta_server", "balance operation count per round for cluster balancer"); DSN_TAG_VARIABLE(balance_op_count_per_round, FT_MUTABLE); +DSN_DECLARE_uint64(min_live_node_count_for_unfreeze); + uint32_t get_partition_count(const node_state &ns, cluster_balance_type type, int32_t app_id) { unsigned count = 0; @@ -795,7 +797,8 @@ void greedy_load_balancer::shortest_path(std::vector &visit, bool greedy_load_balancer::primary_balancer_per_app(const std::shared_ptr &app, bool only_move_primary) { - dassert(t_alive_nodes > 2, "too few alive nodes will lead to freeze"); + dassert(t_alive_nodes >= FLAGS_min_live_node_count_for_unfreeze, + "too few alive nodes will lead to freeze"); ddebug("primary balancer for app(%s:%d)", app->app_name.c_str(), app->app_id); const node_mapper &nodes = *(t_global_view->nodes); @@ -888,7 +891,8 @@ bool greedy_load_balancer::all_replica_infos_collected(const node_state &ns) void greedy_load_balancer::greedy_balancer(const bool balance_checker) { - dassert(t_alive_nodes > 2, "too few nodes will be freezed"); + dassert(t_alive_nodes >= FLAGS_min_live_node_count_for_unfreeze, + "too few nodes will be freezed"); number_nodes(*t_global_view->nodes); for (auto &kv : *(t_global_view->nodes)) { diff --git a/src/meta/meta_options.cpp b/src/meta/meta_options.cpp index 9084416564..81c24aa50b 100644 --- a/src/meta/meta_options.cpp +++ b/src/meta/meta_options.cpp @@ -34,9 +34,19 @@ */ #include "meta_options.h" +#include + namespace dsn { namespace replication { +DSN_DEFINE_uint64("meta_server", + min_live_node_count_for_unfreeze, + 3, + "minimum live node count without which the state is freezed"); +DSN_TAG_VARIABLE(min_live_node_count_for_unfreeze, FT_MUTABLE); +DSN_DEFINE_validator(min_live_node_count_for_unfreeze, + [](uint64_t min_live_node_count) -> bool { return min_live_node_count > 0; }); + std::string meta_options::concat_path_unix_style(const std::string &prefix, const std::string &postfix) { @@ -72,11 +82,7 @@ void meta_options::initialize() "if live_node_count * 100 < total_node_count * node_live_percentage_threshold_for_update, " "then freeze the cluster; default is 65"); - min_live_node_count_for_unfreeze = - dsn_config_get_value_uint64("meta_server", - "min_live_node_count_for_unfreeze", - 3, - "minimum live node count without which the state is freezed"); + min_live_node_count_for_unfreeze = FLAGS_min_live_node_count_for_unfreeze; meta_function_level_on_start = meta_function_level::fl_invalid; const char *level_str = dsn_config_get_value_string( From 289eb4609ae781e3bcc6bbc6bfeae64dfd8fa785 Mon Sep 17 00:00:00 2001 From: Jiashuo Date: Mon, 11 Oct 2021 10:32:10 +0800 Subject: [PATCH 2/4] fix: mutation_log_test failed when compile with debug type (#933) --- src/replica/test/mutation_log_test.cpp | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/replica/test/mutation_log_test.cpp b/src/replica/test/mutation_log_test.cpp index 9f5806df17..945f40448b 100644 --- a/src/replica/test/mutation_log_test.cpp +++ b/src/replica/test/mutation_log_test.cpp @@ -268,10 +268,7 @@ namespace replication { class mutation_log_test : public replica_test_base { public: - gpid _gpid; - -public: - mutation_log_test() : _gpid(_replica->get_gpid()) {} + mutation_log_test() {} void SetUp() override { @@ -503,7 +500,7 @@ TEST_F(mutation_log_test, reset_from) std::vector expected; { // writing logs mutation_log_ptr mlog = - new mutation_log_private(_log_dir, 4, _gpid, _replica.get(), 1024, 512, 10000); + new mutation_log_private(_log_dir, 4, get_gpid(), _replica.get(), 1024, 512, 10000); EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); @@ -522,7 +519,7 @@ TEST_F(mutation_log_test, reset_from) // create another set of logs mutation_log_ptr mlog = - new mutation_log_private(_log_dir, 4, _gpid, _replica.get(), 1024, 512, 10000); + new mutation_log_private(_log_dir, 4, get_gpid(), _replica.get(), 1024, 512, 10000); EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); for (int i = 0; i < 1000; i++) { mutation_ptr mu = create_test_mutation(2000 + i, "hello!"); @@ -553,7 +550,7 @@ TEST_F(mutation_log_test, reset_from_while_writing) std::vector expected; { // writing logs mutation_log_ptr mlog = - new mutation_log_private(_log_dir, 4, _gpid, _replica.get(), 1024, 512, 10000); + new mutation_log_private(_log_dir, 4, get_gpid(), _replica.get(), 1024, 512, 10000); EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); for (int i = 0; i < 10; i++) { @@ -568,7 +565,7 @@ TEST_F(mutation_log_test, reset_from_while_writing) // create another set of logs mutation_log_ptr mlog = - new mutation_log_private(_log_dir, 4, _gpid, _replica.get(), 1024, 512, 10000); + new mutation_log_private(_log_dir, 4, get_gpid(), _replica.get(), 1024, 512, 10000); EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); // given with a large number of mutation to ensure From 86be8c45e02473033681e209ced126179ac3ec2d Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 10 May 2022 17:35:04 +0800 Subject: [PATCH 3/4] feat(bulk_load): support clear last bulk load state rpc --- .../dsn/dist/replication/replication.codes.h | 1 + .../dist/replication/replication_ddl_client.h | 2 + src/client/replication_ddl_client.cpp | 8 ++++ src/common/bulk_load.thrift | 15 +++++++ src/common/bulk_load_common.h | 1 + src/meta/meta_bulk_load_service.cpp | 41 ++++++++++++++++++ src/meta/meta_bulk_load_service.h | 4 ++ src/meta/meta_service.cpp | 19 ++++++++ src/meta/meta_service.h | 1 + src/meta/test/meta_bulk_load_service_test.cpp | 43 +++++++++++++++++++ 10 files changed, 135 insertions(+) diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index ed23ec98e9..c63022d0ff 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -122,6 +122,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_CHILD_STATE, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_START_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BULK_LOAD_STATUS, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE_RPC(RPC_CM_CLEAR_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_START_BACKUP_APP, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BACKUP_STATUS, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_START_MANUAL_COMPACT, TASK_PRIORITY_COMMON) diff --git a/include/dsn/dist/replication/replication_ddl_client.h b/include/dsn/dist/replication/replication_ddl_client.h index 94349ba505..c82868e5ec 100644 --- a/include/dsn/dist/replication/replication_ddl_client.h +++ b/include/dsn/dist/replication/replication_ddl_client.h @@ -195,6 +195,8 @@ class replication_ddl_client error_with query_bulk_load(const std::string &app_name); + error_with clear_bulk_load(const std::string &app_name); + error_code detect_hotkey(const dsn::rpc_address &target, detect_hotkey_request &req, detect_hotkey_response &resp); diff --git a/src/client/replication_ddl_client.cpp b/src/client/replication_ddl_client.cpp index 3a0b794eda..e8f38a2b5a 100644 --- a/src/client/replication_ddl_client.cpp +++ b/src/client/replication_ddl_client.cpp @@ -1610,6 +1610,14 @@ replication_ddl_client::query_bulk_load(const std::string &app_name) return call_rpc_sync(query_bulk_load_rpc(std::move(req), RPC_CM_QUERY_BULK_LOAD_STATUS)); } +error_with +replication_ddl_client::clear_bulk_load(const std::string &app_name) +{ + auto req = make_unique(); + req->app_name = app_name; + return call_rpc_sync(clear_bulk_load_rpc(std::move(req), RPC_CM_CLEAR_BULK_LOAD)); +} + error_code replication_ddl_client::detect_hotkey(const dsn::rpc_address &target, detect_hotkey_request &req, detect_hotkey_response &resp) diff --git a/src/common/bulk_load.thrift b/src/common/bulk_load.thrift index 3e0ea254da..7a2f3d88a3 100644 --- a/src/common/bulk_load.thrift +++ b/src/common/bulk_load.thrift @@ -217,3 +217,18 @@ struct query_bulk_load_response 7:optional string hint_msg; 8:optional bool is_bulk_loading; } + +struct clear_bulk_load_state_request +{ + 1:string app_name; +} + +struct clear_bulk_load_state_response +{ + // Possible error: + // - ERR_APP_NOT_EXIST: app not exist + // - ERR_APP_DROPPED: app has been dropped + // - ERR_INVALID_STATE: app is executing bulk load + 1:dsn.error_code err; + 2:string hint_msg; +} diff --git a/src/common/bulk_load_common.h b/src/common/bulk_load_common.h index 74d45e2744..ccc30a0cce 100644 --- a/src/common/bulk_load_common.h +++ b/src/common/bulk_load_common.h @@ -27,6 +27,7 @@ typedef rpc_holder start_bulk typedef rpc_holder bulk_load_rpc; typedef rpc_holder control_bulk_load_rpc; typedef rpc_holder query_bulk_load_rpc; +typedef rpc_holder clear_bulk_load_rpc; class bulk_load_constant { diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index 49b27f85f8..f49c1c7447 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -1572,6 +1572,47 @@ void bulk_load_service::on_query_bulk_load_status(query_bulk_load_rpc rpc) dsn::enum_to_string(response.app_status)); } +void bulk_load_service::on_clear_bulk_load(clear_bulk_load_rpc rpc) +{ + const auto &request = rpc.request(); + const std::string &app_name = request.app_name; + clear_bulk_load_state_response &response = rpc.response(); + + std::shared_ptr app = get_app(app_name); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + response.err = (app == nullptr) ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED; + response.hint_msg = fmt::format("app({}) is not existed or not available", app_name); + derror_f("{}", response.hint_msg); + return; + } + + if (app->is_bulk_loading) { + response.err = ERR_INVALID_STATE; + response.hint_msg = fmt::format("app({}) is executing bulk load", app_name); + derror_f("{}", response.hint_msg); + return; + } + + do_clear_app_bulk_load_result(app->app_id, rpc); +} + +void bulk_load_service::do_clear_app_bulk_load_result(int32_t app_id, clear_bulk_load_rpc rpc) +{ + FAIL_POINT_INJECT_F("meta_do_clear_app_bulk_load_result", + [rpc](dsn::string_view) { rpc.response().err = ERR_OK; }); + std::string bulk_load_path = get_app_bulk_load_path(app_id); + _meta_svc->get_meta_storage()->delete_node_recursively( + std::move(bulk_load_path), [this, app_id, bulk_load_path, rpc]() { + clear_bulk_load_state_response &response = rpc.response(); + response.err = ERR_OK; + response.hint_msg = + fmt::format("clear app({}) bulk load result succeed, remove bulk load dir succeed", + rpc.request().app_name); + reset_local_bulk_load_states(app_id, rpc.request().app_name, true); + ddebug_f("{}", response.hint_msg); + }); +} + // ThreadPool: THREAD_POOL_META_SERVER void bulk_load_service::create_bulk_load_root_dir() { diff --git a/src/meta/meta_bulk_load_service.h b/src/meta/meta_bulk_load_service.h index 6b335b4c03..861e8b8403 100644 --- a/src/meta/meta_bulk_load_service.h +++ b/src/meta/meta_bulk_load_service.h @@ -122,6 +122,8 @@ class bulk_load_service void on_control_bulk_load(control_bulk_load_rpc rpc); // client -> meta server to query bulk load status void on_query_bulk_load_status(query_bulk_load_rpc rpc); + // client -> meta server to clear bulk load state + void on_clear_bulk_load(clear_bulk_load_rpc rpc); // Called by `sync_apps_from_remote_storage`, check bulk load state consistency // Handle inconsistent conditions below: @@ -146,6 +148,8 @@ class bulk_load_service void do_start_app_bulk_load(std::shared_ptr app, start_bulk_load_rpc rpc); + void do_clear_app_bulk_load_result(int32_t app_id, clear_bulk_load_rpc rpc); + // Called by `partition_bulk_load` and `partition_ingestion` // check partition status before sending partition_bulk_load_request and // partition_ingestion_request diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp index d3d0be5bbe..4666414173 100644 --- a/src/meta/meta_service.cpp +++ b/src/meta/meta_service.cpp @@ -544,6 +544,8 @@ void meta_service::register_rpc_handlers() register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_BULK_LOAD_STATUS, "query_bulk_load_status", &meta_service::on_query_bulk_load_status); + register_rpc_handler_with_rpc_holder( + RPC_CM_CLEAR_BULK_LOAD, "clear_bulk_load", &meta_service::on_clear_bulk_load); register_rpc_handler_with_rpc_holder( RPC_CM_START_BACKUP_APP, "start_backup_app", &meta_service::on_start_backup_app); register_rpc_handler_with_rpc_holder( @@ -1201,6 +1203,23 @@ void meta_service::on_query_bulk_load_status(query_bulk_load_rpc rpc) _bulk_load_svc->on_query_bulk_load_status(std::move(rpc)); } +void meta_service::on_clear_bulk_load(clear_bulk_load_rpc rpc) +{ + if (!check_status(rpc)) { + return; + } + + if (_bulk_load_svc == nullptr) { + derror_f("meta doesn't support bulk load"); + rpc.response().err = ERR_SERVICE_NOT_ACTIVE; + return; + } + tasking::enqueue(LPC_META_STATE_NORMAL, + tracker(), + [this, rpc]() { _bulk_load_svc->on_clear_bulk_load(std::move(rpc)); }, + server_state::sStateHash); +} + void meta_service::on_start_backup_app(start_backup_app_rpc rpc) { if (!check_status(rpc)) { diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h index d83281ab79..464b45402f 100644 --- a/src/meta/meta_service.h +++ b/src/meta/meta_service.h @@ -240,6 +240,7 @@ class meta_service : public serverlet void on_start_bulk_load(start_bulk_load_rpc rpc); void on_control_bulk_load(control_bulk_load_rpc rpc); void on_query_bulk_load_status(query_bulk_load_rpc rpc); + void on_clear_bulk_load(clear_bulk_load_rpc rpc); // manual compaction void on_start_manual_compact(start_manual_compact_rpc rpc); diff --git a/src/meta/test/meta_bulk_load_service_test.cpp b/src/meta/test/meta_bulk_load_service_test.cpp index 0e9eee6856..43f427e8d1 100644 --- a/src/meta/test/meta_bulk_load_service_test.cpp +++ b/src/meta/test/meta_bulk_load_service_test.cpp @@ -101,6 +101,20 @@ class bulk_load_service_test : public meta_test_base return rpc.response().err; } + error_code + clear_bulk_load(int32_t app_id, const std::string &app_name, bulk_load_status::type app_status) + { + bulk_svc()._app_bulk_load_info[app_id].status = app_status; + + auto request = dsn::make_unique(); + request->app_name = app_name; + + clear_bulk_load_rpc rpc(std::move(request), RPC_CM_CLEAR_BULK_LOAD); + bulk_svc().on_clear_bulk_load(rpc); + wait_all(); + return rpc.response().err; + } + void mock_meta_bulk_load_context(int32_t app_id, int32_t in_progress_partition_count, bulk_load_status::type status, @@ -663,6 +677,35 @@ TEST_F(bulk_load_service_test, query_bulk_load_status_success) ASSERT_EQ(query_bulk_load(APP_NAME), ERR_OK); } +/// clear bulk load unit tests +TEST_F(bulk_load_service_test, clear_bulk_load_test) +{ + create_app(APP_NAME); + std::shared_ptr app = find_app(APP_NAME); + mock_meta_bulk_load_context(app->app_id, app->partition_count, bulk_load_status::BLS_INVALID); + fail::setup(); + fail::cfg("meta_do_clear_app_bulk_load_result", "return()"); + + struct clear_test + { + std::string app_name; + bool is_bulk_loading; + bulk_load_status::type app_status; + error_code expected_err; + } tests[] = {{"not_exist_app", false, bulk_load_status::BLS_INVALID, ERR_APP_NOT_EXIST}, + {APP_NAME, true, bulk_load_status::BLS_DOWNLOADING, ERR_INVALID_STATE}, + {APP_NAME, false, bulk_load_status::BLS_SUCCEED, ERR_OK}, + {APP_NAME, false, bulk_load_status::BLS_FAILED, ERR_OK}, + {APP_NAME, false, bulk_load_status::BLS_CANCELED, ERR_OK}}; + + for (auto test : tests) { + app->is_bulk_loading = test.is_bulk_loading; + ASSERT_EQ(clear_bulk_load(app->app_id, test.app_name, test.app_status), test.expected_err); + } + reset_local_bulk_load_states(app->app_id, APP_NAME); + fail::teardown(); +} + /// bulk load process unit tests class bulk_load_process_test : public bulk_load_service_test { From 3d13dcafdb2774574cf2dbb086dc1842a6781596 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Wed, 11 May 2022 10:32:42 +0800 Subject: [PATCH 4/4] format code --- src/common/bulk_load_common.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/bulk_load_common.h b/src/common/bulk_load_common.h index ccc30a0cce..949e2b0c32 100644 --- a/src/common/bulk_load_common.h +++ b/src/common/bulk_load_common.h @@ -27,7 +27,8 @@ typedef rpc_holder start_bulk typedef rpc_holder bulk_load_rpc; typedef rpc_holder control_bulk_load_rpc; typedef rpc_holder query_bulk_load_rpc; -typedef rpc_holder clear_bulk_load_rpc; +typedef rpc_holder + clear_bulk_load_rpc; class bulk_load_constant {