Skip to content

Commit

Permalink
feat(db): use db.run_command for faster bulk updates (#604)
Browse files Browse the repository at this point in the history
* feat(inx): use `db.run_command` for faster bulk updates

* Remove

* Clippy
  • Loading branch information
grtlr authored Aug 29, 2022
1 parent 54b376b commit efa5499
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions src/db/collections/outputs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ mod indexer;

use futures::{StreamExt, TryStreamExt};
use mongodb::{
bson::{self, doc},
bson::{self, doc, to_bson, to_document},
error::Error,
options::{IndexOptions, InsertManyOptions, ReplaceOptions},
options::{IndexOptions, InsertManyOptions},
IndexModel,
};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -148,24 +148,28 @@ impl MongoDb {
/// [`OutputMetadata`](crate::types::ledger::OutputMetadata).
#[instrument(skip_all, err, level = "trace")]
pub async fn update_spent_outputs(&self, outputs: impl Iterator<Item = &LedgerSpent>) -> Result<(), Error> {
for output in outputs {
self.update_spent_output(output).await?;
// TODO: Replace `db.run_command` once the `BulkWrite` API lands in the Rust driver.
let update_docs = outputs
.map(|output| {
Ok(doc! {
"q": { "_id": output.output.output_id },
"u": to_document(&OutputDocument::from(output))?,
"upsert": true,
})
})
.collect::<Result<Vec<_>, Error>>()?;

if !update_docs.is_empty() {
let mut command = doc! {
"update": OutputDocument::COLLECTION,
"updates": update_docs,
};
if let Some(ref write_concern) = self.db.write_concern() {
command.insert("writeConcern", to_bson(write_concern)?);
}
let selection_criteria = self.db.selection_criteria().cloned();
let _ = self.db.run_command(command, selection_criteria).await?;
}
Ok(())
}

/// Upserts an [`Output`](crate::types::stardust::block::Output) together with its associated
/// [`OutputMetadata`](crate::types::ledger::OutputMetadata).
pub async fn update_spent_output(&self, output: &LedgerSpent) -> Result<(), Error> {
let output_id = output.output.output_id;
self.db
.collection::<OutputDocument>(OutputDocument::COLLECTION)
.replace_one(
doc! { "_id": output_id },
OutputDocument::from(output),
ReplaceOptions::builder().upsert(true).build(),
)
.await?;

Ok(())
}
Expand All @@ -181,7 +185,6 @@ impl MongoDb {
.insert_many_ignore_duplicates(batch, InsertManyOptions::builder().ordered(false).build())
.await?;
}

Ok(())
}

Expand Down

0 comments on commit efa5499

Please sign in to comment.