From 3084c310cbc5794797298a00d5333f61ca709384 Mon Sep 17 00:00:00 2001 From: jasonyuchen Date: Sat, 22 Jan 2022 16:19:59 +0800 Subject: [PATCH] transport: pipeline snapshot loading and sending --- transport/express.cc | 67 +++++++++++++++++++++++++++----------------- transport/express.hh | 9 +++--- 2 files changed, 45 insertions(+), 31 deletions(-) diff --git a/transport/express.cc b/transport/express.cc index 97a4fba..a26e08d 100644 --- a/transport/express.cc +++ b/transport/express.cc @@ -26,26 +26,32 @@ future<> express::close() { if (_split_task) { co_await _split_task->discard_result(); } - co_await _sink.flush(); + co_await _sink.flush().discard_result(); co_await _sink.close(); co_return; } future<> express::send(protocol::snapshot_ptr snapshot) { - // TODO(jyc): handle split failure _split_task = split(std::move(snapshot)); co_return; } future<> express::split( - const protocol::snapshot& snapshot, - const std::string& file_path, - uint64_t start_chunk_id, - protocol::snapshot_file_ptr file, - std::vector& chunks) { + protocol::snapshot_ptr snapshot, + uint64_t& chunk_id, + uint64_t total_chunks, + protocol::snapshot_file_ptr file) { + const auto& file_path = file ? file->file_path : snapshot->file_path; + auto file_size = file ? file->file_size : snapshot->file_size; auto f = co_await open_file_dma(file_path, open_flags::ro); - auto defer_close = seastar::defer([&f] { (void)f.close().discard_result(); }); - uint64_t file_size = co_await f.size(); + auto defer_close = defer([&f] { (void)f.close().discard_result(); }); + auto actual_file_size = co_await f.size(); + if (file_size != actual_file_size) { + throw util::failed_precondition_error(fmt::format( + "inconsistent file size, expect:{}, actual:{}", + file_size, + actual_file_size)); + } if (file_size == 0) { throw util::out_of_range_error(fmt::format("empty file:{}", file_path)); } @@ -53,23 +59,25 @@ future<> express::split( uint64_t snapshot_chunk_size = _exchanger.config().snapshot_chunk_size; uint64_t file_chunk_count = (file_size - 1) / snapshot_chunk_size + 1; for (uint64_t i = 0; i < file_chunk_count; ++i) { - auto& c = chunks.emplace_back(make_lw_shared()); - c->group_id = snapshot.group_id; - c->log_id = snapshot.log_id; + auto c = make_lw_shared(); + c->group_id = snapshot->group_id; + c->log_id = snapshot->log_id; c->from = _local.node; - c->id = start_chunk_id + i; + c->id = chunk_id++; + c->count = total_chunks; c->size = (i == file_chunk_count - 1) ? file_size % snapshot_chunk_size : snapshot_chunk_size; auto buf = co_await fstream.read_exactly(c->size); c->data = std::string(buf.get(), buf.size()); - c->membership = snapshot.membership; + c->membership = snapshot->membership; c->file_path = file_path; c->file_size = file_size; c->file_chunk_id = i; c->file_chunk_count = file_chunk_count; c->file_info = file; - c->on_disk_index = snapshot.on_disk_index; - c->witness = snapshot.witness; + c->on_disk_index = snapshot->on_disk_index; + c->witness = snapshot->witness; + co_await _worker.push_eventually(std::move(c)); } co_return; } @@ -77,18 +85,24 @@ future<> express::split( future<> express::split(protocol::snapshot_ptr snapshot) { // TODO(jyc): split the snapshot into chunks and push to sink uint64_t chunk_id = 0; - std::vector chunks; - co_await split(*snapshot, snapshot->file_path, chunk_id, {}, chunks); - chunk_id += chunks.size(); + uint64_t total_chunks = 0; + uint64_t snapshot_chunk_size = _exchanger.config().snapshot_chunk_size; + total_chunks += (snapshot->file_size - 1) / snapshot_chunk_size + 1; for (auto file : snapshot->files) { - // TODO(jyc): validation size? - co_await split(*snapshot, file->file_path, chunk_id, file, chunks); - chunk_id += chunks.size(); + total_chunks += (file->file_size - 1) / snapshot_chunk_size + 1; + } + try { + co_await split(snapshot, chunk_id, total_chunks, {}); + for (auto file : snapshot->files) { + co_await split(snapshot, chunk_id, total_chunks, file); + } + } catch (util::closed_error& ex) { + l.info("express::split: {}", ex); + } catch (...) { + // TODO(jyc): remove express when encountered an unexpected error ? + // e.g. disk error + l.error("express::split: {}", std::current_exception()); } - // TODO(jyc): use file size to determine the total number of chunks at first - std::for_each(chunks.begin(), chunks.end(), [n = chunks.size()](auto& chunk) { - chunk->count = n; - }); } future<> express::main( @@ -103,6 +117,7 @@ future<> express::main( } co_await _sink(chunk); if (chunk->id == chunk->count) { + co_await _sink.flush(); (void)_exchanger.notify_successful(_target).discard_result(); } } diff --git a/transport/express.hh b/transport/express.hh index 9af9947..aa45496 100644 --- a/transport/express.hh +++ b/transport/express.hh @@ -33,11 +33,10 @@ class express : seastar::enable_lw_shared_from_this { private: seastar::future<> split( - const protocol::snapshot& snapshot, - const std::string& file_path, - uint64_t start_chunk_id, - protocol::snapshot_file_ptr file, - std::vector& chunks); + protocol::snapshot_ptr snapshot, + uint64_t& chunk_id, + uint64_t total_chunks, + protocol::snapshot_file_ptr file); seastar::future<> split(protocol::snapshot_ptr snapshot); seastar::future<> main( std::vector& chunks, bool& open);