Skip to content

Commit

Permalink
perf(platform): acquire single database connection for data streams
Browse files Browse the repository at this point in the history
  • Loading branch information
azasypkin committed Mar 31, 2024
1 parent 354c84d commit 2fee287
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 3 deletions.
3 changes: 2 additions & 1 deletion src/notifications/database_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ impl Database {
let page_limit = page_size as i64;
try_stream! {
let mut last_id = 0;
let mut conn = self.pool.acquire().await?;
loop {
let raw_notification_ids = query!(
r#"SELECT id FROM notifications WHERE scheduled_at <= $1 AND id > $2 ORDER BY scheduled_at, id LIMIT $3;"#,
scheduled_before_or_at,
last_id,
page_limit
).fetch_all(&self.pool).await?;
).fetch_all(&mut *conn).await?;

let is_last_page = raw_notification_ids.len() < page_size;
for raw_notification_id in raw_notification_ids {
Expand Down
3 changes: 2 additions & 1 deletion src/scheduler/database_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ WHERE id = $1
let page_limit = page_size as i64;
try_stream! {
let mut last_id = Uuid::nil();
let mut conn = self.pool.acquire().await?;
loop {
let jobs = query_as!(RawSchedulerJobStoredData,
r#"SELECT * FROM scheduler_jobs WHERE id > $1 ORDER BY id LIMIT $2;"#,
last_id, page_limit
)
.fetch_all(&self.pool)
.fetch_all(&mut *conn)
.await?;

let is_last_page = jobs.len() < page_size;
Expand Down
3 changes: 2 additions & 1 deletion src/utils/web_scraping/database_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ ORDER BY created_at
try_stream! {
let mut last_created_at = OffsetDateTime::UNIX_EPOCH;
let kind = Vec::try_from(Tag::KIND)?;
let mut conn = self.pool.acquire().await?;
loop {
let records = query!(
r#"
Expand All @@ -371,7 +372,7 @@ LIMIT $3;
"#,
kind, last_created_at, page_limit
)
.fetch_all(self.pool)
.fetch_all(&mut *conn)
.await?;

let is_last_page = records.len() < page_size;
Expand Down

0 comments on commit 2fee287

Please sign in to comment.