Skip to content

Commit

Permalink
transport: pipeline snapshot loading and sending
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonYuchen committed Jan 22, 2022
1 parent ff40b7a commit 3084c31
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 31 deletions.
67 changes: 41 additions & 26 deletions transport/express.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,69 +26,83 @@ future<> express::close() {
if (_split_task) {
co_await _split_task->discard_result();
}
co_await _sink.flush();
co_await _sink.flush().discard_result();
co_await _sink.close();
co_return;
}

future<> express::send(protocol::snapshot_ptr snapshot) {
// TODO(jyc): handle split failure
_split_task = split(std::move(snapshot));
co_return;
}

future<> express::split(
const protocol::snapshot& snapshot,
const std::string& file_path,
uint64_t start_chunk_id,
protocol::snapshot_file_ptr file,
std::vector<protocol::snapshot_chunk_ptr>& chunks) {
protocol::snapshot_ptr snapshot,
uint64_t& chunk_id,
uint64_t total_chunks,
protocol::snapshot_file_ptr file) {
const auto& file_path = file ? file->file_path : snapshot->file_path;
auto file_size = file ? file->file_size : snapshot->file_size;
auto f = co_await open_file_dma(file_path, open_flags::ro);
auto defer_close = seastar::defer([&f] { (void)f.close().discard_result(); });
uint64_t file_size = co_await f.size();
auto defer_close = defer([&f] { (void)f.close().discard_result(); });
auto actual_file_size = co_await f.size();
if (file_size != actual_file_size) {
throw util::failed_precondition_error(fmt::format(
"inconsistent file size, expect:{}, actual:{}",
file_size,
actual_file_size));
}
if (file_size == 0) {
throw util::out_of_range_error(fmt::format("empty file:{}", file_path));
}
auto fstream = make_file_input_stream(f);
uint64_t snapshot_chunk_size = _exchanger.config().snapshot_chunk_size;
uint64_t file_chunk_count = (file_size - 1) / snapshot_chunk_size + 1;
for (uint64_t i = 0; i < file_chunk_count; ++i) {
auto& c = chunks.emplace_back(make_lw_shared<protocol::snapshot_chunk>());
c->group_id = snapshot.group_id;
c->log_id = snapshot.log_id;
auto c = make_lw_shared<protocol::snapshot_chunk>();
c->group_id = snapshot->group_id;
c->log_id = snapshot->log_id;
c->from = _local.node;
c->id = start_chunk_id + i;
c->id = chunk_id++;
c->count = total_chunks;
c->size = (i == file_chunk_count - 1) ? file_size % snapshot_chunk_size
: snapshot_chunk_size;
auto buf = co_await fstream.read_exactly(c->size);
c->data = std::string(buf.get(), buf.size());
c->membership = snapshot.membership;
c->membership = snapshot->membership;
c->file_path = file_path;
c->file_size = file_size;
c->file_chunk_id = i;
c->file_chunk_count = file_chunk_count;
c->file_info = file;
c->on_disk_index = snapshot.on_disk_index;
c->witness = snapshot.witness;
c->on_disk_index = snapshot->on_disk_index;
c->witness = snapshot->witness;
co_await _worker.push_eventually(std::move(c));
}
co_return;
}

future<> express::split(protocol::snapshot_ptr snapshot) {
// TODO(jyc): split the snapshot into chunks and push to sink
uint64_t chunk_id = 0;
std::vector<protocol::snapshot_chunk_ptr> chunks;
co_await split(*snapshot, snapshot->file_path, chunk_id, {}, chunks);
chunk_id += chunks.size();
uint64_t total_chunks = 0;
uint64_t snapshot_chunk_size = _exchanger.config().snapshot_chunk_size;
total_chunks += (snapshot->file_size - 1) / snapshot_chunk_size + 1;
for (auto file : snapshot->files) {
// TODO(jyc): validation size?
co_await split(*snapshot, file->file_path, chunk_id, file, chunks);
chunk_id += chunks.size();
total_chunks += (file->file_size - 1) / snapshot_chunk_size + 1;
}
try {
co_await split(snapshot, chunk_id, total_chunks, {});
for (auto file : snapshot->files) {
co_await split(snapshot, chunk_id, total_chunks, file);
}
} catch (util::closed_error& ex) {
l.info("express::split: {}", ex);
} catch (...) {
// TODO(jyc): remove express when encountered an unexpected error ?
// e.g. disk error
l.error("express::split: {}", std::current_exception());
}
// TODO(jyc): use file size to determine the total number of chunks at first
std::for_each(chunks.begin(), chunks.end(), [n = chunks.size()](auto& chunk) {
chunk->count = n;
});
}

future<> express::main(
Expand All @@ -103,6 +117,7 @@ future<> express::main(
}
co_await _sink(chunk);
if (chunk->id == chunk->count) {
co_await _sink.flush();
(void)_exchanger.notify_successful(_target).discard_result();
}
}
Expand Down
9 changes: 4 additions & 5 deletions transport/express.hh
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ class express : seastar::enable_lw_shared_from_this<express> {

private:
seastar::future<> split(
const protocol::snapshot& snapshot,
const std::string& file_path,
uint64_t start_chunk_id,
protocol::snapshot_file_ptr file,
std::vector<protocol::snapshot_chunk_ptr>& chunks);
protocol::snapshot_ptr snapshot,
uint64_t& chunk_id,
uint64_t total_chunks,
protocol::snapshot_file_ptr file);
seastar::future<> split(protocol::snapshot_ptr snapshot);
seastar::future<> main(
std::vector<protocol::snapshot_chunk_ptr>& chunks, bool& open);
Expand Down

0 comments on commit 3084c31

Please sign in to comment.