Skip to content

Commit

Permalink
misc: cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: jasonyuchen <[email protected]>
  • Loading branch information
JasonYuchen committed Feb 8, 2022
1 parent ef3cf2a commit 40b3a2c
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 53 deletions.
7 changes: 6 additions & 1 deletion 3rd/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
include(FetchContent)
FetchContent_Declare(
googletest
# Specify the commit you depend on and update it regularly.
URL https://github.com/google/googletest/archive/609281088cfefc76f9d0ce82e1ff6c30cc3591e5.zip
)

FetchContent_Declare(
simdjson
URL https://github.com/simdjson/simdjson/archive/refs/tags/v1.0.2.zip
)

FetchContent_MakeAvailable(googletest)
FetchContent_MakeAvailable(simdjson)
20 changes: 13 additions & 7 deletions core/raft.hh
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ class raft {
};

raft(const config& config, log_reader& reader)
: _config(config), _log(_gid, reader, 0) {}
: _config(config), _log(_gid, reader) {}

seastar::future<> handle(protocol::message& m);
seastar::future<> handle(protocol::message&& m);
seastar::future<> handle(protocol::message&& m) { return handle(m); }

private:
friend class peer;
friend std::ostream& operator<<(std::ostream& os, const raft& r);
using role = protocol::raft_role;

// misc
Expand All @@ -44,15 +45,18 @@ class raft {
bool is_witness() const noexcept { return _role == role::witness; }
void must_be(protocol::raft_role role) const;
void must_not_be(protocol::raft_role role) const;
void report_dropped_config_change();
void report_dropped_proposal();
void report_dropped_read_index();
void report_dropped_config_change(protocol::log_entry_ptr e);
void report_dropped_proposal(protocol::message& m);
void report_dropped_read_index(protocol::message& m);
void finalize_message(protocol::message& m);
seastar::future<protocol::message> make_replicate(
uint64_t to, uint64_t next, uint64_t max_bytes);

// send
void send(protocol::message& m);
void send(protocol::message&& m);
void send_timeout_now(uint64_t to);
void send_heartbeat(uint64_t to, protocol::hint ctx, uint64_t match_index);
void send_replicate(uint64_t to, remote& r);
seastar::future<> send_replicate(uint64_t to, remote& r);
void broadcast_heartbeat();
void broadcast_replicate();

Expand Down Expand Up @@ -223,4 +227,6 @@ class raft {
message_handler _handlers[NUM_OF_STATE][NUM_OF_TYPE];
};

std::ostream& operator<<(std::ostream& os, const raft& r);

} // namespace rafter::core
14 changes: 13 additions & 1 deletion core/raft_log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -415,14 +415,26 @@ future<> raft_log::get_entries_to_apply(protocol::log_entry_vector& entries) {
co_return;
}

future<size_t> raft_log::query(
uint64_t start,
protocol::log_entry_vector& entries,
size_t max_bytes) const {
if (start > last_index()) {
co_return max_bytes;
}
co_return co_await query(
{.low = start, .high = last_index()}, entries, max_bytes);
}

future<size_t> raft_log::query(
protocol::hint range,
protocol::log_entry_vector& entries,
size_t max_bytes) const noexcept {
size_t max_bytes) const {
check_range(range);
if (range.low == range.high) {
co_return max_bytes;
}
entries.reserve(range.count());
max_bytes = co_await query_logdb(range, entries, max_bytes);
if (max_bytes > 0) {
max_bytes = co_await query_memory(range, entries, max_bytes);
Expand Down
12 changes: 11 additions & 1 deletion core/raft_log.hh
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,31 @@ class log_reader {
class raft_log {
public:
raft_log(protocol::group_id gid, log_reader& log);
uint64_t committed() const noexcept { return _committed; }
uint64_t processed() const noexcept { return _processed; }
uint64_t first_index() const noexcept;
uint64_t last_index() const noexcept;
seastar::future<uint64_t> term(uint64_t index) const;
seastar::future<uint64_t> last_term() const;
seastar::future<bool> term_index_match(protocol::log_id lid) const;
// the available entries' term range + snapshot's term
protocol::hint term_entry_range() const noexcept;
// the available entries' range
protocol::hint entry_range() const noexcept;
uint64_t first_not_applied_index() const noexcept;
uint64_t apply_index_limit() const noexcept;
bool has_entries_to_apply() const noexcept;
bool has_more_entries_to_apply(uint64_t applied_to) const noexcept;
seastar::future<> get_entries_to_save(protocol::log_entry_vector& entries);
seastar::future<> get_entries_to_apply(protocol::log_entry_vector& entries);
seastar::future<size_t> query(
uint64_t start,
protocol::log_entry_vector& entries,
size_t max_bytes) const;
seastar::future<size_t> query(
protocol::hint range,
protocol::log_entry_vector& entries,
size_t max_bytes) const noexcept;
size_t max_bytes) const;
seastar::future<size_t> query_logdb(
protocol::hint range,
protocol::log_entry_vector& entries,
Expand All @@ -120,6 +128,8 @@ class raft_log {

protocol::group_id _gid;
uint64_t _committed;
// The last index known to be pushed to rsm for execution,
// not the last index confirmed to be executed by rsm
uint64_t _processed;
in_memory_log _in_memory;
log_reader& _logdb;
Expand Down
11 changes: 11 additions & 0 deletions protocol/raft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,15 @@ void utils::assert_continuous(log_entry_span left, log_entry_span right) {
}
}

void utils::fill_metadata_entries(log_entry_vector& entries) {
for (auto& entry : entries) {
if (entry->type != entry_type::config_change) {
log_id lid = entry->lid;
entry = seastar::make_lw_shared<log_entry>();
entry->type = entry_type::metadata;
entry->lid = lid;
}
}
}

} // namespace rafter::protocol
38 changes: 24 additions & 14 deletions protocol/raft.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ enum class raft_role : uint8_t {

const char *name(enum raft_role role);
inline std::ostream &operator<<(std::ostream &os, raft_role role) {
os << name(role);
return os;
return os << name(role);
}

struct group_id {
Expand Down Expand Up @@ -86,10 +85,25 @@ enum class message_type : uint8_t {
num_of_type,
};

inline bool is_request_vote(message_type type) {
using enum message_type;
return type == request_vote || type == request_prevote;
}

inline bool is_request(message_type type) {
using enum message_type;
return type == propose || type == read_index || type == leader_transfer;
}

inline bool is_leader(message_type type) {
using enum message_type;
return type == replicate || type == install_snapshot || type == heartbeat ||
type == timeout_now || type == read_index_resp;
}

const char *name(enum message_type type);
inline std::ostream &operator<<(std::ostream &os, message_type type) {
os << name(type);
return os;
return os << name(type);
}

enum class entry_type : uint8_t {
Expand All @@ -102,8 +116,7 @@ enum class entry_type : uint8_t {

const char *name(enum entry_type type);
inline std::ostream &operator<<(std::ostream &os, entry_type type) {
os << name(type);
return os;
return os << name(type);
}

enum class config_change_type : uint8_t {
Expand All @@ -116,8 +129,7 @@ enum class config_change_type : uint8_t {

const char *name(enum config_change_type type);
inline std::ostream &operator<<(std::ostream &os, config_change_type type) {
os << name(type);
return os;
return os << name(type);
}

enum class state_machine_type : uint8_t {
Expand All @@ -127,8 +139,7 @@ enum class state_machine_type : uint8_t {

const char *name(enum state_machine_type type);
inline std::ostream &operator<<(std::ostream &os, state_machine_type type) {
os << name(type);
return os;
return os << name(type);
}

enum class compression_type : uint8_t {
Expand All @@ -140,8 +151,7 @@ enum class compression_type : uint8_t {

const char *name(enum compression_type type);
inline std::ostream &operator<<(std::ostream &os, compression_type type) {
os << name(type);
return os;
return os << name(type);
}

enum class checksum_type : uint8_t {
Expand All @@ -153,8 +163,7 @@ enum class checksum_type : uint8_t {

const char *name(enum checksum_type type);
inline std::ostream &operator<<(std::ostream &os, checksum_type type) {
os << name(type);
return os;
return os << name(type);
}

struct bootstrap {
Expand Down Expand Up @@ -426,6 +435,7 @@ using update_ptr = seastar::lw_shared_ptr<update>;
class utils {
public:
static void assert_continuous(log_entry_span left, log_entry_span right);
static void fill_metadata_entries(log_entry_vector &entries);
};

} // namespace rafter::protocol
2 changes: 1 addition & 1 deletion test/util_worker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class worker_test : public ::testing::Test {
protected:
void SetUp() override {
base::submit([this]() -> future<> {
_worker = std::make_unique<worker<int>>("test_worker", 10);
_worker = std::make_unique<worker<int>>("test_worker", 10, l);
co_return;
});
}
Expand Down
11 changes: 4 additions & 7 deletions transport/exchanger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ future<rpc::sink<snapshot_chunk_ptr>> exchanger::make_sink_for_snapshot_chunk(
uint64_t, uint64_t, uint64_t, rpc::sink<snapshot_chunk_ptr>)>(
messaging_verb::snapshot);
co_await rpc_handler(*client, cluster_id, from, to, sink);
co_return sink;
co_return std::move(sink);
}

future<> exchanger::notify_unreachable(protocol::group_id target) {
Expand Down Expand Up @@ -147,12 +147,9 @@ bool exchanger::remove_rpc_client(peer_address address) {
}
auto client = std::move(it->second.rpc_client);
_clients.erase(it);
(void)client->stop()
.finally([address, client, exc = shared_from_this()] {
l.debug(
"exchanger::remove_rpc_client: dropped connection to {}", address);
})
.discard_result();
(void)client->stop().finally([address, client, exc = shared_from_this()] {
l.debug("exchanger::remove_rpc_client: dropped connection to {}", address);
});
return true;
}

Expand Down
18 changes: 10 additions & 8 deletions transport/express.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ future<> express::sender::start(snapshot_ptr snapshot) {
try {
auto sink = co_await _exchanger.make_sink_for_snapshot_chunk(
_pair.cluster, _pair.from, _pair.to);
auto deferred = defer([&sink] { (void)sink.close().discard_result(); });
auto deferred = defer([&sink] { (void)sink.close(); });
uint64_t chunk_id = 0;
uint64_t total_chunks = 0;
uint64_t snapshot_chunk_size = _exchanger.config().snapshot_chunk_size;
Expand All @@ -84,23 +84,23 @@ future<> express::sender::start(snapshot_ptr snapshot) {
co_await split_and_send(snapshot, file, total_chunks, chunk_id, sink);
}
co_await sink.flush();
(void)_exchanger.notify_successful({_pair.cluster, _pair.to})
.discard_result();
(void)_exchanger.notify_successful({_pair.cluster, _pair.to});
} catch (util::logic_error& e) {
l.error("express::sender::start: {}", e.what());
} catch (util::closed_error& e) {
l.info("express::sender::start: closed {}", e.what());
} catch (...) {
l.error("express::sender::start: {}", std::current_exception());
(void)_exchanger.notify_unreachable({_pair.cluster, _pair.to})
.discard_result();
(void)_exchanger.notify_unreachable({_pair.cluster, _pair.to});
}
co_return;
}

future<> express::sender::stop() {
if (_task) {
return _task->discard_result();
return _task->handle_exception([](std::exception_ptr e) {
l.warn("express::sender::stop: exception:{}", e);
});
}
return make_ready_future<>();
}
Expand All @@ -114,7 +114,7 @@ future<> express::sender::split_and_send(
const auto& file_path = file ? file->file_path : snapshot->file_path;
auto file_size = file ? file->file_size : snapshot->file_size;
auto f = co_await open_file_dma(file_path, open_flags::ro);
auto defer_close = seastar::defer([&f] { (void)f.close().discard_result(); });
auto defer_close = seastar::defer([&f] { (void)f.close(); });
uint64_t actual_file_size = co_await f.size();
if (file_size != actual_file_size) {
throw util::failed_precondition_error(fmt::format(
Expand Down Expand Up @@ -166,7 +166,9 @@ future<> express::receiver::start(rpc::source<snapshot_chunk_ptr> source) {

future<> express::receiver::stop() {
if (_task) {
return _task->discard_result();
return _task->handle_exception([](std::exception_ptr e) {
l.warn("express::receiver::stop: exception:{}", e);
});
}
return make_ready_future<>();
}
Expand Down
2 changes: 1 addition & 1 deletion util/error.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const char* status_string(enum code e) {
"failed_precondition",
"failed_postcondition",
"invalid_argument",
"invalid_raft_role",
"invalid_raft_state",
"no_data",
"unknown"};
static_assert(
Expand Down
12 changes: 3 additions & 9 deletions util/error.hh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ enum class code : uint8_t {
failed_postcondition,
unknown,
invalid_argument,
invalid_raft_role,
invalid_raft_state,
no_data,
num_of_codes,
};
Expand Down Expand Up @@ -186,16 +186,10 @@ class invalid_argument : public logic_error {
: logic_error("{}: arg:{}, reason:{}", code::invalid_argument, arg, msg) {}
};

class invalid_raft_role : public logic_error {
class invalid_raft_state : public logic_error {
public:
using logic_error::logic_error;
invalid_raft_role() : logic_error(code::invalid_raft_role) {}
invalid_raft_role(protocol::raft_role expect, protocol::raft_role actual)
: logic_error(
"{}: expect:{} actual:{}",
code::invalid_raft_role,
name(expect),
name(actual)) {}
invalid_raft_state() : logic_error(code::invalid_raft_state) {}
};

} // namespace rafter::util
Loading

0 comments on commit 40b3a2c

Please sign in to comment.