Skip to content

Commit

Permalink
transport: refactor snapshot express
Browse files Browse the repository at this point in the history
Signed-off-by: jasonyuchen <[email protected]>
  • Loading branch information
JasonYuchen committed Jan 25, 2022
1 parent 48d4b85 commit fb05c1f
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 115 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ set(SOURCES
storage/stats.hh
transport/exchanger.cc
transport/exchanger.hh
transport/express.cc
transport/express.hh
transport/logger.cc
transport/logger.hh
transport/registry.cc
Expand Down
30 changes: 21 additions & 9 deletions test/transport_exchanger_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,30 @@ RAFTER_TEST_F(exchanger_test, streaming) {
};

co_await _exchanger->invoke_on_all([fiber](exchanger& exc) -> future<> {
exc.register_snapshot_chunk([fiber](auto& info, auto source) -> future<> {
// use this source to make a sink for bidirectional communication
// save this source/sink in other coroutine
l.info("source from {}", info.addr);
(void)fiber(source).handle_exception(
[](std::exception_ptr ep) { ADD_FAILURE() << "exception: " << ep; });
co_return;
});
exc.register_snapshot_chunk(
[fiber](
auto& info,
uint64_t cluster,
uint64_t from,
uint64_t to,
auto source) -> future<> {
// use this source to make a sink for bidirectional communication
// save this source/sink in other coroutine
l.info(
"source from {}, cluster:{}, from:{}, to:{}",
info.addr,
cluster,
from,
to);
(void)fiber(source).handle_exception([](std::exception_ptr ep) {
ADD_FAILURE() << "exception: " << ep;
});
co_return;
});
return make_ready_future<>();
});
auto sink = co_await _exchanger->invoke_on(
0, &exchanger::make_sink_for_snapshot_chunk, _gid);
0, &exchanger::make_sink_for_snapshot_chunk, _gid.cluster, 2, _gid.node);
EXPECT_EQ(state, fiber_state::created);
EXPECT_EQ(sink.get_id().id, source_stream_id);
auto chunk = make_lw_shared<protocol::snapshot_chunk>();
Expand Down
35 changes: 29 additions & 6 deletions transport/exchanger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ using namespace protocol;
using namespace seastar;
using namespace std::chrono_literals;

exchanger::exchanger(const config& config, registry& reg)
exchanger::exchanger(const struct config& config, registry& reg)
: _config(config)
, _registry(reg)
, _express(*this)
, _dropped_messages{0}
, _rpc(std::make_unique<rpc_protocol>(serializer{})) {
_rpc->set_logger(&l);
Expand Down Expand Up @@ -52,6 +53,7 @@ future<> exchanger::shutdown() {
l.info("exchanger::shutdown: closing connection to {}", peer.first);
co_await peer.second.rpc_client->stop();
});
co_await _express.stop();
l.info("exchanger::shutdown: done");
}

Expand All @@ -68,24 +70,45 @@ future<> exchanger::send_message(message_ptr msg) {
messaging_verb::message, *address, std::move(msg));
}

future<> exchanger::send_snapshot(protocol::message_ptr message) {
if (!message->snapshot) {
throw util::invalid_argument("snapshot", "empty snapshot in message");
}
return _express.send(message);
}

future<rpc::sink<snapshot_chunk_ptr>> exchanger::make_sink_for_snapshot_chunk(
group_id gid) {
uint64_t cluster_id, uint64_t from, uint64_t to) {
// if shutting down
auto address = _registry.resolve(gid);
protocol::group_id remote = {.cluster = cluster_id, .node = to};
auto address = _registry.resolve(remote);
if (!address) {
// TODO(jyc): group unreachable
co_return coroutine::make_exception(util::peer_not_found_error(gid));
co_return coroutine::make_exception(util::peer_not_found_error(remote));
}
auto client = get_rpc_client(messaging_verb::message, *address);
auto sink =
co_await client->make_stream_sink<serializer, snapshot_chunk_ptr>();
// register streaming pipeline in the server
auto rpc_handler = _rpc->make_client<void(rpc::sink<snapshot_chunk_ptr>)>(
auto rpc_handler = _rpc->make_client<void(
uint64_t, uint64_t, uint64_t, rpc::sink<snapshot_chunk_ptr>)>(
messaging_verb::snapshot);
co_await rpc_handler(*client, sink);
co_await rpc_handler(*client, cluster_id, from, to, sink);
co_return sink;
}

future<> exchanger::notify_unreachable(protocol::group_id target) {
// TODO(jyc): notify unreachable
l.warn("exchanger::notify_unreachable: {}", target);
co_return;
}

future<> exchanger::notify_successful(protocol::group_id target) {
// TODO(jyc): notify successful
l.info("exchanger::notify_successful: {}", target);
co_return;
}

shared_ptr<exchanger::rpc_protocol_client> exchanger::get_rpc_client(
messaging_verb, peer_address address) {
auto it = _clients.find(address);
Expand Down
20 changes: 12 additions & 8 deletions transport/exchanger.hh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class exchanger
seastar::shared_ptr<rpc_protocol_client> rpc_client;
seastar::rpc::stats stats() const { return rpc_client->get_stats(); }
};
// TODO(jyc): initialize all handlers
seastar::future<> start_listen();
seastar::future<> shutdown();
seastar::future<> stop() { return seastar::make_ready_future<>(); }
Expand All @@ -70,30 +71,33 @@ class exchanger
_rpc->register_handler(messaging_verb::message, std::move(func));
}

seastar::future<> register_message() {
seastar::future<> unregister_message() {
return _rpc->unregister_handler(messaging_verb::message);
}

seastar::future<> send_message(protocol::message_ptr message);
seastar::future<> send_snapshot(protocol::snapshot_ptr snapshot);
seastar::future<> send_snapshot(protocol::message_ptr message);

void register_snapshot_chunk(
std::function<seastar::future<>(
const seastar::rpc::client_info& info,
uint64_t cluster,
uint64_t from,
uint64_t to,
seastar::rpc::source<protocol::snapshot_chunk_ptr> source)>&& func) {
_rpc->register_handler(messaging_verb::snapshot, std::move(func));
}

seastar::future<> register_snapshot_chunk() {
seastar::future<> unregister_snapshot_chunk() {
return _rpc->unregister_handler(messaging_verb::snapshot);
}

// TODO(jyc): use raft group_id to get address from registry
seastar::future<seastar::rpc::sink<protocol::snapshot_chunk_ptr>>
make_sink_for_snapshot_chunk(protocol::group_id gid);
make_sink_for_snapshot_chunk(uint64_t cluster_id, uint64_t from, uint64_t to);

seastar::future<> notify_unreachable(protocol::group_id target) { co_return; }
seastar::future<> notify_successful(protocol::group_id target) { co_return; }
seastar::future<> notify_unreachable(protocol::group_id target);
seastar::future<> notify_successful(protocol::group_id target);

private:
// TODO(jyc): split normal client and streaming client
Expand All @@ -102,14 +106,14 @@ class exchanger

bool remove_rpc_client(peer_address address);

const config& _config;
const struct config& _config;
registry& _registry;
express _express;
bool _shutting_down = false;
uint64_t _dropped_messages[static_cast<int32_t>(messaging_verb::num_of_verb)];
std::unique_ptr<rpc_protocol> _rpc;
std::unique_ptr<rpc_protocol_server> _server;
std::unordered_map<peer_address, peer_info, peer_address::hash> _clients;
std::unordered_map<protocol::group_id, express_ptr> _expresses;
};

} // namespace rafter::transport
167 changes: 107 additions & 60 deletions transport/express.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,107 +10,154 @@

#include "transport/exchanger.hh"
#include "transport/logger.hh"
#include "util/error.hh"

namespace rafter::transport {

using namespace seastar;
using namespace protocol;

future<> express::start() {
_worker.start([this](auto& t, bool& open) { return this->main(t, open); });
future<> express::stop() {
auto s = _senders.begin();
while (s != _senders.end()) {
co_await s->second->stop();
s = _senders.begin();
}
auto r = _receivers.begin();
while (r != _receivers.end()) {
co_await r->second->stop();
r = _receivers.begin();
}
co_return;
}

future<> express::close() {
_open = false;
co_await _worker.close();
if (_split_task) {
co_await _split_task->discard_result();
future<> express::send(message_ptr message) {
auto key = pair{message->cluster, message->from, message->to};
if (_senders.contains(key)) {
l.warn(
"express::send: ongoing at cluster:{}, from:{}, to:{}",
key.cluster,
key.from,
key.to);
co_return;
}
co_await _sink.flush();
co_await _sink.close();
auto s = make_lw_shared<sender>(_exchanger, key);
_senders.emplace(key, s);
s->_task = s->start(message->snapshot).finally([this, key]() {
_senders.erase(key);
});
co_return;
}

future<> express::send(protocol::snapshot_ptr snapshot) {
// TODO(jyc): handle split failure
_split_task = split(std::move(snapshot));
future<> express::receive(pair key, rpc::source<snapshot_chunk_ptr> source) {
if (_receivers.contains(key)) {
l.warn(
"express::receive: ongoing at cluster:{}, from:{}, to:{}",
key.cluster,
key.from,
key.to);
co_return;
}
auto s = make_lw_shared<receiver>(_exchanger, key);
_receivers.emplace(key, s);
s->_task = s->start(std::move(source));
co_return;
}

future<> express::sender::start(snapshot_ptr snapshot) {
// TODO(jyc)
try {
auto sink = co_await _exchanger.make_sink_for_snapshot_chunk(
_pair.cluster, _pair.from, _pair.to);
auto deferred = defer([&sink] { (void)sink.close().discard_result(); });
uint64_t chunk_id = 0;
uint64_t total_chunks = 0;
uint64_t snapshot_chunk_size = _exchanger.config().snapshot_chunk_size;
total_chunks += (snapshot->file_size - 1) / snapshot_chunk_size + 1;
for (auto file : snapshot->files) {
total_chunks += (file->file_size - 1) / snapshot_chunk_size + 1;
}
co_await split_and_send(snapshot, {}, total_chunks, chunk_id, sink);
for (auto file : snapshot->files) {
co_await split_and_send(snapshot, file, total_chunks, chunk_id, sink);
}
co_await sink.flush();
(void)_exchanger.notify_successful({_pair.cluster, _pair.to})
.discard_result();
} catch (util::logic_error& e) {
l.error("express::sender::start: {}", e.what());
} catch (...) {
l.error("express::sender::start: {}", std::current_exception());
(void)_exchanger.notify_unreachable({_pair.cluster, _pair.to})
.discard_result();
}
co_return;
}

future<> express::split(
const protocol::snapshot& snapshot,
const std::string& file_path,
uint64_t start_chunk_id,
future<> express::sender::stop() {
if (_task) {
return _task->discard_result();
}
return make_ready_future<>();
}

future<> express::sender::split_and_send(
protocol::snapshot_ptr snapshot,
protocol::snapshot_file_ptr file,
std::vector<protocol::snapshot_chunk_ptr>& chunks) {
uint64_t total_chunks,
uint64_t& chunk_id,
seastar::rpc::sink<protocol::snapshot_chunk_ptr>& sink) {
const auto& file_path = file ? file->file_path : snapshot->file_path;
auto file_size = file ? file->file_size : snapshot->file_size;
auto f = co_await open_file_dma(file_path, open_flags::ro);
auto defer_close = seastar::defer([&f] { (void)f.close().discard_result(); });
uint64_t file_size = co_await f.size();
uint64_t actual_file_size = co_await f.size();
if (file_size != actual_file_size) {
throw util::failed_precondition_error(fmt::format(
"inconsistent file size, expect:{}, actual:{}",
file_size,
actual_file_size));
}
if (file_size == 0) {
throw util::out_of_range_error(fmt::format("empty file:{}", file_path));
}
auto fstream = make_file_input_stream(f);
uint64_t snapshot_chunk_size = _exchanger.config().snapshot_chunk_size;
uint64_t file_chunk_count = (file_size - 1) / snapshot_chunk_size + 1;
for (uint64_t i = 0; i < file_chunk_count; ++i) {
auto& c = chunks.emplace_back(make_lw_shared<protocol::snapshot_chunk>());
c->group_id = snapshot.group_id;
c->log_id = snapshot.log_id;
c->from = _local.node;
c->id = start_chunk_id + i;
auto c = make_lw_shared<protocol::snapshot_chunk>();
c->group_id = snapshot->group_id;
c->log_id = snapshot->log_id;
c->from = _pair.from;
c->id = chunk_id++;
c->count = total_chunks;
c->size = (i == file_chunk_count - 1) ? file_size % snapshot_chunk_size
: snapshot_chunk_size;
auto buf = co_await fstream.read_exactly(c->size);
c->data = std::string(buf.get(), buf.size());
c->membership = snapshot.membership;
c->membership = snapshot->membership;
c->file_path = file_path;
c->file_size = file_size;
c->file_chunk_id = i;
c->file_chunk_count = file_chunk_count;
c->file_info = file;
c->on_disk_index = snapshot.on_disk_index;
c->witness = snapshot.witness;
c->on_disk_index = snapshot->on_disk_index;
c->witness = snapshot->witness;
co_await sink(c);
}
co_return;
}

future<> express::split(protocol::snapshot_ptr snapshot) {
// TODO(jyc): split the snapshot into chunks and push to sink
uint64_t chunk_id = 0;
std::vector<protocol::snapshot_chunk_ptr> chunks;
co_await split(*snapshot, snapshot->file_path, chunk_id, {}, chunks);
chunk_id += chunks.size();
for (auto file : snapshot->files) {
// TODO(jyc): validation size?
co_await split(*snapshot, file->file_path, chunk_id, file, chunks);
chunk_id += chunks.size();
}
// TODO(jyc): use file size to determine the total number of chunks at first
std::for_each(chunks.begin(), chunks.end(), [n = chunks.size()](auto& chunk) {
chunk->count = n;
});
future<> express::receiver::start(rpc::source<snapshot_chunk_ptr> source) {
// TODO(jyc)
co_return;
}

future<> express::main(
std::vector<protocol::snapshot_chunk_ptr>& chunks, bool& open) {
if (!_open) {
co_return;
}
try {
for (const auto& chunk : chunks) {
if (!open) {
break;
}
co_await _sink(chunk);
if (chunk->id == chunk->count) {
(void)_exchanger.notify_successful(_target).discard_result();
}
}
} catch (... /*TODO(jyc): rpc error, report unreachable*/) {
l.error("express::main: {} {}", _target, std::current_exception());
(void)_exchanger.notify_unreachable(_target).discard_result();
future<> express::receiver::stop() {
if (_task) {
return _task->discard_result();
}
co_return;
return make_ready_future<>();
}

} // namespace rafter::transport
Loading

0 comments on commit fb05c1f

Please sign in to comment.