Async wallet scanner #23
Conversation
Nice to see this coming along :)
@@ -0,0 +1,131 @@
// Copyright (c) 2022, The Monero Project
These http_client_pool classes can probably go in the src/net folder, yeah?
Agree, I'll throw them in there once it's in a ready-for-use state
//-------------------------------------------------------------------------------------------------------------------
void ClientConnectionPool::release_http_client(size_t http_client_index)
{
    // Make the connection available for use again
Will we ever prune the list of HTTP clients?
I think it makes sense that once the scanner finishes a full pass, only 1 connection should be kept open and the rest closed. Seems like good housekeeping.
In the future, I think this connection pool could be a good class to build multi-daemon functionality with (1 client -> many different daemons), in which case it may make sense to keep connections to different daemons open. That's probably a discussion for another time; just putting the thought out since I know you were interested in pushing that architecture forward.
std::unique_ptr<sp::scanning::LedgerChunk> get_onchain_chunk();

/// stop the current scanning process (should be no-throw no-fail)
void terminate_scanning() override { /* no-op */ }
Are there plans to make it stoppable in the future?
If the wallet is scanning and the user changes their restore height or wants to use a different wallet, that should cleanly terminate the scanner
If stopping from outside is needed, it can be easily implemented with an atomic signal. Not needed for the initial implementation.
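A minimal sketch of that atomic-signal idea (class and member names here are illustrative, not code from this PR):

```
#include <atomic>

// Illustrative only: names and structure are assumptions, not from this PR.
class StoppableScanner
{
public:
    // Called from any thread; no-throw no-fail as the interface requires.
    void terminate_scanning() noexcept
    {
        m_stop_requested.store(true, std::memory_order_relaxed);
    }

protected:
    // The scan loop would poll this between chunks and unwind cleanly when set.
    bool stop_requested() const noexcept
    {
        return m_stop_requested.load(std::memory_order_relaxed);
    }

private:
    std::atomic<bool> m_stop_requested{false};
};
```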
std::list<sp::ContextualBasicRecordVariant> &collected_records) const;

/// abstracted function that gets blocks via RPC request
const std::function<bool(const cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::request&, cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::response&)> &rpc_get_blocks;
Should this be marked `private`?
I'm about to take a deeper pass at class structure and will probably make a lot of changes in line with this comment. Worth re-visiting after making those changes
Probably worth holding off on a deeper review until it's ready, almost there :)
Co-authored-by: jeffro256 <[email protected]>
Useful for tests, similar API to utils/python-rpc/framework/daemon.py
Adds a new functional test for direct wallet2 -> live RPC daemon interactions. This sets up a framework to test pointing the Seraphis wallet lib to a live daemon. Tests sending and scanning:
- a normal transfer
- a sweep single (0 change)
- to a single subaddress
- to 3 subaddresses (i.e. scanning using additional pub keys)

- When pointing to a daemon that does not support returning empty blocks when the client requests too high of a height, the scanner has to be careful to always request blocks below the chain tip, in every request.
- By forcing the reorg avoidance increment on the first pass, we make sure clients always include the reorg avoidance increment when requesting blocks from the daemon, so the client can expect the request for blocks to *always* return an ok height.

Still TODO:
- check complete scanning on all enote types
- hit every branch condition for all enote versions

- Enables concurrent network requests using the epee http client.
- Still TODO for production: 1) close_connections 2) require the pool respect max_connections

- Finds owned enotes by legacy view scanning a chunk of blocks.

- Useful when we want to remove elements of the token queue in an order that is different from insertion order.
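A minimal sketch of that remove-minimum idea on a simplified queue (this is not the real `async::TokenQueue` API, just an illustration):

```
#include <algorithm>
#include <list>
#include <mutex>

// Sketch: remove the smallest element (by operator<) instead of the oldest one.
template <typename Token>
class MiniTokenQueue
{
public:
    void push(Token token)
    {
        std::lock_guard<std::mutex> lock{m_mutex};
        m_tokens.push_back(std::move(token));
    }

    // Pop the minimum element; returns false if the queue is empty.
    bool try_pop_min(Token &token_out)
    {
        std::lock_guard<std::mutex> lock{m_mutex};
        if (m_tokens.empty())
            return false;
        const auto min_it = std::min_element(m_tokens.begin(), m_tokens.end());
        token_out = std::move(*min_it);
        m_tokens.erase(min_it);
        return true;
    }

private:
    std::mutex m_mutex;
    std::list<Token> m_tokens;
};
```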
*How it works*

Assume the user's wallet must start scanning blocks from height 5000.

1. The scanner begins by launching 10 RPC requests in parallel to fetch chunks of blocks as follows:
```
request 0: { start_height: 5000, max_block_count: 20 }
request 1: { start_height: 5020, max_block_count: 20 }
...
request 9: { start_height: 5180, max_block_count: 20 }
```
2. As soon as any single request completes, the wallet immediately parses that chunk.
   - This is all in parallel. For example, as soon as request 7 responds, the wallet immediately begins parsing that chunk in parallel to any other chunks it's already parsing.
3. If a chunk does not include a total of max_block_count blocks, and the chunk is not the tip of the chain, this means there was a "gap" in the chunk request. The scanner launches another parallel RPC request to fill in the gap (a sketch of the gap arithmetic follows below).
   - This gap can occur because the server will **not** return a chunk of blocks greater than 100mb (or 20k txs) via the `/getblocks.bin` RPC endpoint ([`FIND_BLOCKCHAIN_SUPPLEMENT_MAX_SIZE`](https://github.com/monero-project/monero/blob/053ba2cf07649cea8134f8a188685ab7a5365e5c/src/cryptonote_core/blockchain.cpp#L65)).
   - The gap is from `(req.start_height + res.blocks.size())` to `(req.start_height + req.max_block_count)`.
4. As soon as the scanner finishes parsing the chunk, it immediately submits another parallel RPC request.
5. In parallel, the scanner identifies a user's received (and spent) enotes in each chunk.
   - For example, as soon as request 7 responds and the wallet parses it, the wallet scans that chunk in parallel to any other chunks it's already scanning.
6. Once a single chunk is fully scanned locally, the scanner launches a parallel task to fetch and scan the next chunk.
7. Once the scanner reaches the tip of the chain (the terminal chunk), the scanner terminates.

*Some technical highlights*

- The wallet scanner is backwards compatible with existing daemons (though it would need to point to an updated daemon to realize the perf speed-up).
- On error cases such as the daemon going offline, the same wallet errors that wallet2 uses (that the wallet API expects) are propagated up to the higher-level Seraphis lib.
- The implementation uses an http client connection pool (reusing the epee http client) to support parallel network requests ([related](seraphis-migration/wallet3#58)).
- A developer using the scanner can "bring their own blocks/network implementation" to the scanner by providing a callback function of the following type as a param to the async scanner constructor: `std::function<bool(const cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::request, cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::response)>`
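A small sketch of the gap arithmetic from step 3, using simplified stand-ins for the RPC types (the real types are `cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::request`/`response`):

```
#include <cstdint>
#include <optional>

// Simplified stand-ins for the RPC request/response; illustrative only.
struct ChunkRequest { std::uint64_t start_height; std::uint64_t max_block_count; };
struct GapRequest   { std::uint64_t start_height; std::uint64_t max_block_count; };

// If the response returned fewer blocks than requested and we're not at the
// chain tip, there is a "gap" that a follow-up parallel request must fill.
std::optional<GapRequest> compute_gap(const ChunkRequest &req,
    std::uint64_t num_blocks_returned,
    std::uint64_t chain_height)
{
    const std::uint64_t gap_start = req.start_height + num_blocks_returned;
    const std::uint64_t gap_end   = req.start_height + req.max_block_count;

    // No gap: the request was fully satisfied, or we already reached the tip.
    if (gap_start >= gap_end || gap_start >= chain_height)
        return std::nullopt;

    return GapRequest{gap_start, gap_end - gap_start};
}
```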
Opened a separate issue to discuss since I don't think it needs to affect this PR: #41
I still have to go through the `scan_context_async_mock` and maybe play with it in some unit tests to better understand and get used to it. Will do soon.
Awesome work! Thank you!
src/cryptonote_core/blockchain.cpp
Outdated
@@ -2765,16 +2765,19 @@ bool Blockchain::find_blockchain_supplement(const std::list<crypto::hash>& qbloc
// find split point between ours and foreign blockchain (or start at
// blockchain height <req_start_block>), and return up to max_count FULL
// blocks by reference.
bool Blockchain::find_blockchain_supplement(const uint64_t req_start_block, const std::list<crypto::hash>& qblock_ids, std::vector<std::pair<std::pair<cryptonote::blobdata, crypto::hash>, std::vector<std::pair<crypto::hash, cryptonote::blobdata> > > >& blocks, uint64_t& total_height, uint64_t& start_height, bool pruned, bool get_miner_tx_hash, size_t max_block_count, size_t max_tx_count) const
bool Blockchain::find_blockchain_supplement(const uint64_t req_start_block, const std::list<crypto::hash>& qblock_ids, std::vector<std::pair<std::pair<cryptonote::blobdata, crypto::hash>, std::vector<std::pair<crypto::hash, cryptonote::blobdata> > > >& blocks, uint64_t& total_height, crypto::hash& top_hash, uint64_t& start_height, bool pruned, bool get_miner_tx_hash, size_t max_block_count, size_t max_tx_count) const
{
LOG_PRINT_L3("Blockchain::" << __func__);
CRITICAL_REGION_LOCAL(m_blockchain_lock);

// if a specific start height has been requested
Wouldn't it be better to have this whole block like this:
// if a specific start height has been requested
uint64_t top_height;
top_hash = m_db->top_block_hash(&top_height);
if(req_start_block > 0)
{
  // if requested height is higher than our chain, return false -- we can't help
  total_height = top_height + 1;
  if (req_start_block >= total_height)
  {
    return false;
  }
  start_height = req_start_block;
}
else
{
  if(!find_blockchain_supplement(qblock_ids, start_height))
  {
    return false;
  }
}

db_rtxn_guard rtxn_guard(m_db);
total_height = top_height + 1;
So we avoid calling `top_block_hash` twice in some cases?
The `db_rtxn_guard` seems to be there to guarantee a consistent view of the db. Even though the `m_blockchain_lock` is held, which should prevent the db from changing between the 2 db reads for the top block hash, it looks as though the 2 distinct db reads (1 before and 1 after the read txn guard) are there to protect against that scenario anyway (the blockchain changing between the 2 calls).
I would personally prefer to maintain the existing concurrency model and number of db calls for ease of review here.
class t_daemon_rpc_client final
{
private:
  t_rpc_client m_rpc_client;

public:
  t_daemon_rpc_client(
      const std::string &daemon_addr
    , const boost::optional<epee::net_utils::http::login> &daemon_login
    , const epee::net_utils::ssl_options_t ssl_support
    )
    : m_rpc_client{daemon_addr, daemon_login, ssl_support}
  {
  }

  cryptonote::COMMAND_RPC_GET_HEIGHT::response get_height()
  {
    cryptonote::COMMAND_RPC_GET_HEIGHT::request req = AUTO_VAL_INIT(req);
    cryptonote::COMMAND_RPC_GET_HEIGHT::response res = AUTO_VAL_INIT(res);
    CHECK_AND_ASSERT_THROW_MES(m_rpc_client.basic_rpc_request(req, res, "/get_height"), "failed to get height");
    return res;
  }

  cryptonote::COMMAND_RPC_POP_BLOCKS::response pop_blocks(uint64_t nblocks = 1)
  {
    cryptonote::COMMAND_RPC_POP_BLOCKS::request req = AUTO_VAL_INIT(req);
    cryptonote::COMMAND_RPC_POP_BLOCKS::response res = AUTO_VAL_INIT(res);
    req.nblocks = nblocks;
    CHECK_AND_ASSERT_THROW_MES(m_rpc_client.basic_rpc_request(req, res, "/pop_blocks"), "failed to pop blocks");
    return res;
  }

  cryptonote::COMMAND_RPC_GET_TRANSACTIONS::response get_transactions(const std::vector<std::string> &txs_hashes)
  {
    cryptonote::COMMAND_RPC_GET_TRANSACTIONS::request req = AUTO_VAL_INIT(req);
    cryptonote::COMMAND_RPC_GET_TRANSACTIONS::response res = AUTO_VAL_INIT(res);
    req.txs_hashes = txs_hashes;
    req.decode_as_json = false;
    CHECK_AND_ASSERT_THROW_MES(m_rpc_client.basic_rpc_request(req, res, "/get_transactions"), "failed to get transactions");
    return res;
  }

  cryptonote::COMMAND_RPC_FLUSH_TRANSACTION_POOL::response flush_txpool()
  {
    cryptonote::COMMAND_RPC_FLUSH_TRANSACTION_POOL::request req = AUTO_VAL_INIT(req);
    cryptonote::COMMAND_RPC_FLUSH_TRANSACTION_POOL::response res = AUTO_VAL_INIT(res);
    CHECK_AND_ASSERT_THROW_MES(m_rpc_client.basic_json_rpc_request(req, res, "flush_txpool"), "failed to flush txpool");
    return res;
  }

  cryptonote::COMMAND_RPC_GENERATEBLOCKS::response generateblocks(const std::string &address, uint64_t amount_of_blocks)
  {
    cryptonote::COMMAND_RPC_GENERATEBLOCKS::request req = AUTO_VAL_INIT(req);
    cryptonote::COMMAND_RPC_GENERATEBLOCKS::response res = AUTO_VAL_INIT(res);
    req.amount_of_blocks = amount_of_blocks;
    req.wallet_address = address;
    req.prev_block = "";
    req.starting_nonce = 0;
    CHECK_AND_ASSERT_THROW_MES(m_rpc_client.basic_json_rpc_request(req, res, "generateblocks"), "failed to generate blocks");
    return res;
  }

  cryptonote::COMMAND_RPC_GET_LAST_BLOCK_HEADER::response get_last_block_header()
  {
    cryptonote::COMMAND_RPC_GET_LAST_BLOCK_HEADER::request req = AUTO_VAL_INIT(req);
    cryptonote::COMMAND_RPC_GET_LAST_BLOCK_HEADER::response res = AUTO_VAL_INIT(res);
    req.client = "";
    CHECK_AND_ASSERT_THROW_MES(m_rpc_client.basic_json_rpc_request(req, res, "get_last_block_header"), "failed to get last block header");
    return res;
  }
};
Why is this whole block necessary? Does the scanner need to create its own new set of RPC functions to interact with the blockchain?
These are boilerplate helper functions to construct the RPC requests in the functional tests. Grep `daemon.generateblocks` to see how it's used in both the `.py` tests and `wallet_scanner.cpp` as an example.
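For flavor, a hypothetical usage of these helpers in a functional test (the address literal and block counts are placeholders; this presumes the headers the class above already requires):

```
// Hypothetical usage; address string is a placeholder, not a real wallet.
t_daemon_rpc_client daemon{
    "127.0.0.1:18081",
    boost::none,
    epee::net_utils::ssl_support_t::e_ssl_support_disabled};

const uint64_t start_height = daemon.get_height().height;
daemon.generateblocks("44AFFq5kSiGBoZ...placeholder...", 80); // mine 80 blocks to a test address
daemon.pop_blocks(10);                                        // then roll the chain back 10 blocks
const uint64_t end_height = daemon.get_height().height;      // expect start_height + 70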
}
}
//-------------------------------------------------------------------------------------------------------------------
void EnoteFindingContextLegacyMultithreaded::view_scan_chunk(const LegacyUnscannedChunk &legacy_unscanned_chunk,
Nice! How does the performance of these two `view_scan_chunk` implementations compare?
On my machine with 8 threads and a pending chunk queue of size 10, there is no benefit to using this multithreaded enote finding context in the async scanner. My CPU is already maxed out.
This enote finding context should only be useful when the pending chunk queue size is < the number of threads available on the machine. Example scenario: when pointing the scanner to a daemon not running the changes from this PR, it's more efficient to use a pending chunk queue size of 1 together with this multithreaded enote finding context, since the daemon does not yet know about the `max_block_count` request param. Client-side scanning in that case is sped up by a multiple of the number of threads on the machine.
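A sketch of that tuning decision as pseudo-config (the struct, function, and queue-size constant are assumptions drawn from this discussion, not an API in the PR):

```
#include <cstddef>

// Illustrative heuristic only; names and numbers are assumptions.
struct ScannerTuning
{
    std::size_t pending_chunk_queue_size;
    bool use_multithreaded_enote_finding;
};

ScannerTuning tune_scanner(bool daemon_respects_max_block_count)
{
    // Updated daemon: ~10 small parallel chunk fetches already max out the CPU,
    // so the single-threaded finding context per chunk is enough.
    if (daemon_respects_max_block_count)
        return ScannerTuning{10, false};

    // Older daemon: fetch one large chunk at a time, then view-scan it across
    // all available threads to recover the lost parallelism.
    return ScannerTuning{1, true};
}
```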
Part 1: everything except scan_context_async_mock.h/.cpp
It's looking great so far :)
src/cryptonote_core/blockchain.cpp
Outdated
{
LOG_PRINT_L3("Blockchain::" << __func__);
CRITICAL_REGION_LOCAL(m_blockchain_lock);

// if a specific start height has been requested
uint64_t top_height;
I'd move `top_height` to the places where it is set; I found this code pretty confusing at first.
@@ -244,6 +263,7 @@ namespace cryptonote
KV_SERIALIZE(blocks)
KV_SERIALIZE(start_height)
KV_SERIALIZE(current_height)
KV_SERIALIZE_VAL_POD_AS_BLOB_OPT(top_block_hash, crypto::null_hash)
Do you have a way of detecting in the scanner if pointing to a daemon that doesn't return top block hashes?
The `get_version` RPC endpoint returns the major and minor RPC version: https://github.com/monero-project/monero/blob/c8214782fb2a769c57382a999eaf099691c836e7/src/rpc/core_rpc_server.cpp#L2986
Wallets currently hit that endpoint first to determine if a daemon is compatible. Wallets will know which minor version returns top block hashes (and which won't fail the request on high height, and will respect `max_block_count`).
// Check if we have any clients available for use
bool found_unused_client = false;
for (std::size_t i = 0; i < m_http_client_pool.size(); ++i)
{
  if (!m_http_client_pool[i].in_use)
  {
    http_client_index_out = i;
    found_unused_client = true;
    break;
  }
}
Suggested change:
// Check if we have any clients available for use
for (std::size_t i = 0; i < m_http_client_pool.size(); ++i)
{
  if (m_http_client_pool[i].in_use)
    continue;
  m_http_client_pool[i].in_use = true;
  return i;
}
Simpler, less nesting here and after. I am a huge fan of early-out/early-continue patterns like this.
#pragma once
#include "chaingen.h"

struct gen_enote_tx_validation_base : public test_chain_unit_base
Normally I would say to put these test changes in a separate PR since they are unrelated to the async wallet scanner, but it's fine in this case.
Hah, I actually was about to separate it when finalizing the PR and thought heh, I'm just gonna leave it. I initially wanted to use the scanner in these tests, but that would be a bit of a lift here
@@ -146,7 +146,7 @@ def kill():
try:
    print('[TEST STARTED] ' + test)
    sys.stdout.flush()
cmd = [python, srcdir + '/' + test + ".py"]
cmd = [python, srcdir + '/' + test + ".py"] if test != 'wallet_scanner' else [FUNCTIONAL_TESTS_DIRECTORY + '/functional_tests', '--' + test]
Looks a bit jank
return conn_pool.rpc_command<cryptonote::COMMAND_RPC_GET_BLOCKS_FAST>(
    sp::mocks::ClientConnectionPool::http_mode::BIN,
    "/getblocks.bin",
    req,
    res);
This internally locks a mutex, which means you will have some random contention even though the window under lock is small. I'd add a `todo` to the connection pool to get away from the mutex.
How would you design it to move away from the mutex? It seemed the simplest way to implement this `conn_pool.rpc_command` function; otherwise I figure the caller would need to handle synchronization, which seems unnecessary (it's a nice API! :) )
I don't know either, but adding a `todo` will document that the perf could theoretically be improved if someone can figure it out.
// Now that we're done scanning, we can close all open connections and keep 1 open for more RPC calls
conn_pool.close_connections(1);
This is a bit brittle because it assumes this function completely owns all connections in the pool.
Even though this is a test, it's good to demo how to use the API robustly/correctly since this code will be used as a reference.
Hm, I wrote it this way because I think the scanner should own all connections in the pool. If the client owns n open connections already when starting the scanner, the scanner can reuse those existing connections. If the client owns 0 open connections when starting the scanner, and the scanner opens 1 connection, then the client can reuse that connection in the future.
To harden this code, I could write a wrapper "Scanner" class that basically does what this `scan_chain` function does, and makes the `conn_pool` a private member variable of that class, but:
- I still think the scanner should own all connections in the pool for the above reasons.
- I still think it would make sense to call this `conn_pool.close_connections(1)` when the scanner finishes a scan pass (maybe you think the caller is the better place to do that?).

Thoughts?
FWIW the reason I included this unimplemented `close_connections` function call was to demo how to use the API.
> Hm, I wrote it this way because I think the scanner should own all connections in the pool.

I think it's fine to want that. The problem I see in this code is you're passing the pool by reference all the way from the top, which means it could be shared by who-knows-what. IMO the pool owner should be the one calling `close_connections` when it knows connections are no longer needed. So if necessary, pass by value in and out of top-level owners, then pass by ref down into the scanning code (which only makes requests, without calling `close_connections`).
> pass by value in and out of top-level owners

How about I use a `unique_ptr` instead of this? That way it's clear the caller is the owner, plus passing in and out seems a bit like an anti-pattern.

> IMO the pool owner should be the one calling `close_connections` when it knows connections are no longer needed

Which line would you move `close_connections` to?
Ok what you can do is 'claim' the connection pool at the top of this function, which creates a no-move/no-copy RAII wrapper that will remove connections and lock the pool when dropped. Creating the wrapper asserts there are no other claims to the pool (implying there are no connections in use).
Then panic if trying to launch a request on an unclaimed pool.
Basically make pool-use scoped.
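A minimal sketch of that scoped-claim idea, with a hypothetical pool interface (`PoolClaim`, the `claimed` flag, and `close_connections` usage here are illustrative, not code from this PR):

```
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical pool interface; only what the sketch needs.
struct ConnectionPool
{
    std::atomic<bool> claimed{false};
    void close_connections(std::size_t keep_open) { /* prune down to keep_open */ }
};

// No-move/no-copy RAII claim: asserts exclusive use, prunes connections on drop.
class PoolClaim
{
public:
    explicit PoolClaim(ConnectionPool &pool) : m_pool(pool)
    {
        // Panic if someone else already claimed the pool (connections in use).
        const bool was_claimed = m_pool.claimed.exchange(true);
        assert(!was_claimed && "connection pool already claimed");
        (void)was_claimed;
    }
    PoolClaim(const PoolClaim&) = delete;
    PoolClaim& operator=(const PoolClaim&) = delete;
    ~PoolClaim()
    {
        // Dropping the claim prunes to 1 open connection and releases the pool.
        m_pool.close_connections(1);
        m_pool.claimed.store(false);
    }
private:
    ConnectionPool &m_pool;
};
```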
Ok, basically did the above and thought more on it... technically all the tests also assume the daemon and wallets aren't shared / in use by other owners too, but they could be in theory. Separately, I also wanted to restructure the tests into a class so I don't need to pass the daemon / wallet / connection pool through nested calls. So I implemented a wrapper accessor for each of them that checks for ownership of the respective mutex before allowing access.
FWIW it's not really perfect as is, since the underlying daemon / wallet / connection pool can still be accessed without going through the accessors. Maybe you have more thoughts on a better approach still.
Review done
I did not validate all the edge conditions, but overall it looks correct.
private:
  /// config options
  const AsyncScanContextLegacyConfig &m_config;
  std::uint64_t m_max_chunk_size_hint;
std::uint64_t m_max_chunk_size_hint;
Not initialized.
It's set in `start_scanner`. I gave it a default value in the latest.
return std::make_unique<sp::scanning::AsyncLedgerChunk>(m_threadpool,
    std::move(oldest_chunk.pending_context),
    std::move(pending_chunk_data),
    std::vector<rct::key>{rct::zero()});
I just realized `AsyncLedgerChunk` is unsafe because the threadpool reference can become invalid. I'll have to fix this in a follow-up.
rct::key transaction_id;
uint64_t unlock_time;
sp::TxExtra tx_memo;
uint64_t total_enotes_before_tx;
This doesn't capture all the information needed to index legacy enotes. It might be better to make this `std::vector<legacy_output_index_t>`, one for each enote, after #42 is merged. Alternatively, it could be a `std::vector<uint64_t>` with an additional bool `is_rct` field.
Ya, it'll need that PR. I have a TODO in `scan_context_async_mock` for it.
Thank you for the detailed review :) Ready for re-review
// 6. join the tasks
m_threadpool.work_while_waiting(std::move(join_condition));

// 7. set results
for (auto &brpt : basic_records_per_tx)
    chunk_data_out.basic_records_per_tx.emplace(std::move(brpt));
Did you do any testing on whether it performs better with more txs processed per task?
Nay, going to punt that for now
// We're good to go, advance the end scan index
MDEBUG("We're prepared for the end condition, we scanned to " << m_last_scanned_index);
m_end_scan_index = m_last_scanned_index;
m_scanner_ready.store(true, std::memory_order_relaxed); // mark the scanner ready for the end condition
Added a new boolean `m_scanner_finished` to capture this state.
//-------------------------------------------------------------------------------------------------------------------
void AsyncScanContextLegacy::wait_until_pending_queue_clears()
{
    // TODO: implement a clean safe cancel instead of waiting
Fair, re-worded the comment.
In addition to your other suggestions, it probably would take a decent amount of work for a future PR, but we could incorporate the ability to cancel in-progress http requests.
Looking good, minor comments
unscanned_block_out.block_hash = rct::hash2rct(cryptonote::get_block_hash(block));
unscanned_block_out.prev_block_hash = rct::hash2rct(block.prev_id);

chunk_context_out.block_ids.emplace_back(unscanned_block_out.block_hash);
Chunk context block ids should be cleared at the top of this function, since it's an `out` variable. And probably `.reserve` the correct capacity.
Technically it should've been an `inout` variable, not supposed to clear in that function. I moved chunk context handling out to the caller for better clarity + added the `.reserve` in the latest.
@@ -657,25 +697,31 @@ std::unique_ptr<sp::scanning::LedgerChunk> AsyncScanContextLegacy::handle_end_co
//-------------------------------------------------------------------------------------------------------------------
void AsyncScanContextLegacy::wait_until_pending_queue_clears()
I'd rename to `close_and_clear_pending_queue`.
src/seraphis_main/scan_machine.cpp
Outdated
const std::uint64_t num_reorg_avoidance_backoffs,
const std::uint64_t lowest_scannable_index,
const std::uint64_t desired_start_index)
{
    // 1. set reorg avoidance depth
    const std::uint64_t reorg_avoidance_depth{
        get_reorg_avoidance_depth(reorg_avoidance_increment, num_reorg_avoidance_backoffs)
        get_reorg_avoidance_depth(reorg_avoidance_increment, force_reorg_avoidance_increment, num_reorg_avoidance_backoffs)
Suggested change:
get_reorg_avoidance_depth(reorg_avoidance_increment,
    force_reorg_avoidance_increment,
    num_reorg_avoidance_backoffs)

Your font size is too small :p
~AsyncScanContextLegacy()
{
    std::lock_guard<std::mutex> lock{m_async_scan_context_mutex};
    wait_until_pending_queue_clears();
Suggested change:
this->wait_until_pending_queue_clears();
// Older daemons can return more blocks than requested because they did not support a max_block_count req param.
// The scanner expects requested_chunk_size blocks however, so we only care about the blocks up until that point.
// Note the scanner can also return *fewer* blocks than requested if at chain tip or the chunk exceeded max size.
// Warning: fill_gap_if_needed will throw if we don't resize the chunk context before calling it
Suggested change: remove this line.
// Warning: fill_gap_if_needed will throw if we don't resize the chunk context before calling it

Don't think this adds clarity. In this code it's clear `unscanned_chunk_out` needs to be resized in order to insert blocks; it's independent of `fill_gap_if_needed`.
// Use the terminal chunk to update the top block hash if the chunk isn't empty.
// - This is required if the daemon RPC did NOT provide the top block hash (e.g. when pointing to an older
//   daemon), in which case we have to use the last block ID in the terminal chunk to set the top block hash.
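A sketch of the fallback those comments describe, with assumed names and signatures:

```
#include <optional>
#include <vector>
#include "ringct/rctTypes.h"

// Illustrative only: prefer the RPC-provided top hash; otherwise derive it
// from the terminal chunk's last block ID.
void update_top_block_hash(const std::vector<rct::key> &chunk_block_ids,
    const std::optional<rct::key> &rpc_top_block_hash,
    rct::key &top_block_hash_inout)
{
    if (rpc_top_block_hash)
        top_block_hash_inout = *rpc_top_block_hash;
    else if (!chunk_block_ids.empty())
        top_block_hash_inout = chunk_block_ids.back();
    // else: empty terminal chunk and no RPC hash -- leave the cached value alone
}
```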
What if a reorg replaces blocks without adding to chain length? Just wait until the chain gets longer to detect it?
Good spot -- mishandled this circumstance with that round of changes. Check the latest
l_chunk_request = chunk_request,
l_context_stop_flag = context_stop_signal.get_future().share(),
l_data_stop_flag = data_stop_signal.get_future().share(),
l_chunk_context = std::make_shared<std::promise<sp::scanning::ChunkContext>>(std::move(chunk_context_handle)),
l_chunk_data = std::make_shared<std::promise<sp::scanning::ChunkData>>(std::move(chunk_data_handle)),
l_context_join_token = context_join_token,
l_data_join_token = data_join_token
clear_chunk.pending_data.stop_signal.set_value();

// Wait until all work in the pending queue is done, not just contexts
// TODO: wait until every task in the pool has returned
Drain the queue into a vec, then your aggregate wait function just loops the vec once to check all of the inner join conditions (`join_condition_t` is an alias for `std::function<bool()>`).
You still need the outer loop to check the pool for entries added by previously-removed tasks.
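A rough sketch of that drain-and-wait shape, with hypothetical queue/threadpool APIs (`try_pop_all` and the `work_while_waiting` signature are assumptions here; only the `join_condition_t` alias comes from the discussion above):

```
#include <functional>
#include <vector>

using join_condition_t = std::function<bool()>; // alias noted in the review

struct PendingTask { join_condition_t join_condition; };

// Returns true when every drained task reports done.
bool all_joined(const std::vector<PendingTask> &tasks)
{
    for (const auto &task : tasks)
        if (!task.join_condition())
            return false;
    return true;
}

template <typename Queue, typename Threadpool>
void drain_and_wait(Queue &queue, Threadpool &pool)
{
    // Outer loop: previously-removed tasks may force_push new entries,
    // so keep draining until the queue stays empty.
    std::vector<PendingTask> drained;
    while (queue.try_pop_all(drained)) // hypothetical bulk-drain API
    {
        // Aggregate wait: one predicate loops the drained vec per poll,
        // working on pool tasks while waiting.
        pool.work_while_waiting([&]{ return all_joined(drained); });
        drained.clear();
    }
}
```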
I follow this suggestion and will do (also, waiting to work until after sending all the stop signals should speed this up).
AFAIU, checking the inner join conditions here is essentially checking that the join tokens are set to `nullptr`. But the tasks can still have more work to do after setting the join tokens. Example: the last `this->try_launch_next_chunk_task(chunk_is_terminal_chunk);` can execute and attempt to schedule a new task, even after we've drained the pending queue and waited on all inner join conditions. Am I missing something there?
Yeah, checking the join condition needs to synchronize with the end of the task, so that if all tasks are finished and the queue is empty, you know the queue can't gain any more tasks.
I added a comment for fixing this.
~AsyncScanContextLegacy()
{
    std::lock_guard<std::mutex> lock{m_async_scan_context_mutex};
    wait_until_pending_queue_clears();
}
I'd add a comment explaining the safety model here. It only works because all tasks with copies of `this` are tracked in the pending queue. When the pending queue returns empty (after draining and working on all removed tasks), you know that there are no lingering tasks with copies of `this`.
Once this is merged, please open as many PRs to monero core as possible to reduce the file overlap between this branch and core.
// Finished scanning the chunk
chunk_data_ptr_out->set_value(std::move(chunk_data));
data_join_token_out = nullptr;
Yeah you're right, this should not be set to `nullptr` here; the token should be set by dropping it when you exit.
One comment remaining
THROW_WALLET_EXCEPTION_IF(!m_pending_queue_mutex.thread_owns_lock(), tools::error::wallet_internal_error,
    "this thread does not own the pending queue mutex");
Ohh nice idea :)
while (!drained_chunks.empty())
{
    auto &clear_chunk = drained_chunks.back();
    MDEBUG("Waiting to clear onchain chunk starting at " << clear_chunk.chunk_request.start_index);

    // Wait until all work in the pending queue is done, not just contexts
    // TODO: wait until every task in the pool has returned
    // Wait until **data** join condition is set, we're not waiting on just the contexts
    m_threadpool.work_while_waiting(clear_chunk.pending_data.data_join_condition,
        async::DefaultPriorityLevels::MAX);

    clear_chunk_result = m_pending_chunk_queue.force_pop(clear_chunk);
    drained_chunks.pop_back();
}
There can be more tasks in the queue after this because task insertion uses `force_push`. These loops need to be wrapped in another loop that continues until the queue is empty.
The only way `force_pop` can return `async::TokenQueueResult::SHUTTING_DOWN` is if the queue is empty:

monero/src/async/token_queue.h, lines 109 to 110 in c742260:
if (m_queue.size() == 0 && m_is_shutting_down)
    return TokenQueueResult::SHUTTING_DOWN;

Since this function is holding `m_async_scan_context_mutex`, `m_scanner_ready` is false, meaning no new tasks could be scheduled while the pending tasks are all running to completion. And since we've now made sure to work until all pending tasks are done at this point, nothing can theoretically exist that uses `force_push` to insert a task in parallel at this point in this function.
Writing that comment^ made me re-check the "gap fill" edge case, which wasn't checking `m_scanner_ready` before `force_push`. Done. That comment should hold now^
Submitted PR's for the simple modifications to the core monero repo for the async scanner. If daemons run those changes, they'll be fully compatible with the async scanner in this PR.
Implement async wallet scanner.
* scan machine: option to force reorg avoidance increment first pass
* core tests: check conversion tool on all legacy enote version types
* conn pool mock: epee http client connection pool
* enote finding context: IN LegacyUnscannedChunk, OUT ChunkData
* async: function to remove minimum element from token queue
* async scanner: scan via RPC, fetching & scanning parallel chunks
---------
Co-authored-by: jeffro256 <[email protected]>
Ready for review.
I've observed the following speed-ups benchmarking versus wallet2:
Benchmark results can vary widely based on setup (client/node internet speeds and machines). Source for the scanner used to benchmark.
Major TODO's for this PR
- Cleanup the formatting, code structure, code names and commit history.
- Implement backwards compatibility so the scanner will work pointing to nodes running today.
- Add unit tests.
- Try to smooth out the scanner's progress. The scanner completes scanning many chunks in intermittent waves and then sits waiting (uncomfortably) until the next wave of chunks; it doesn't scan in a steady stream. Lowered `max_chunk_size_hint` from 1000 to 20 to fetch chunks 20 blocks at a time: overall time to scan remains the same (potentially slightly faster) and feedback to the user on scanner progress is significantly smoother.

Other TODO's for this PR
- Move "mock" implementations to the "production-ready" sections they belong.
- Complete TODO's in the code.
- As soon as the scanner encounters an error or encounters the terminal chunk, it should immediately cancel any pending tasks it's waiting on and error out quickly.
- Remove the blockchain scanner utility from this PR (and all changes to existing wallet functions for this utility). It's there to help see how the scanner works.

Misc.
There are a few things from this PR I can PR to the Monero repo today in bite-sized PR's:
- Moved wallet errors out of `wallet2` and into `wallet/wallet_errors.h`. I did this so I could reuse wallet2's error logic in this PR, which the higher level wallet API also expects.
- Changes to the `get_blocks.bin` daemon RPC endpoint:
  - Added a `max_block_count` param to the request (not a breaking change).
  - If the client requests a `start_height` that is greater than the chain tip and the request field `fail_on_high_height` is false, then instead of erroring, the server returns empty blocks and the chain height. TODO: make this field optional in the response so it's not a breaking change. (DONE)