
Commit

feat(web-scraping): validate subscription dependent tracker properties on save
azasypkin committed Mar 28, 2024
1 parent 21c7948 commit 02bd140
Showing 10 changed files with 1,379 additions and 1,686 deletions.
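
For context on the diffs below: call sites stop passing user identifiers into a global web-scraping API and instead obtain a user-scoped handle via api.web_scraping(&user). A minimal before/after sketch of that call-site change, assuming an async test context with api, user, and a placeholder params: WebPageTrackerCreateParams already in scope (the sketch mirrors the call shapes in the diffs and is not a verbatim excerpt from the repository):

// Before this commit: the web-scraping handle is global and the user id is
// threaded into each call.
let tracker = api
    .web_scraping()
    .create_resources_tracker(user.id, params)
    .await?;

// After this commit: the handle is resolved for a concrete user once, so the
// per-call user id disappears from the web-scraping methods.
let tracker = api
    .web_scraping(&user)
    .create_resources_tracker(params)
    .await?;
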
66 changes: 30 additions & 36 deletions src/scheduler.rs
@@ -219,46 +219,40 @@ mod tests {
         // Create user, trackers and tracker jobs.
         api.users().upsert(user.clone()).await?;
         let resources_tracker = api
-            .web_scraping()
-            .create_resources_tracker(
-                user.id,
-                WebPageTrackerCreateParams {
-                    name: "tracker-one".to_string(),
-                    url: "https://localhost:1234/my/app?q=2".parse()?,
-                    settings: WebPageTrackerSettings {
-                        revisions: 1,
-                        delay: Default::default(),
-                        scripts: Default::default(),
-                        headers: Default::default(),
-                    },
-                    job_config: Some(SchedulerJobConfig {
-                        schedule: "1 2 3 4 5 6 2030".to_string(),
-                        retry_strategy: None,
-                        notifications: true,
-                    }),
-                },
-            )
+            .web_scraping(&user)
+            .create_resources_tracker(WebPageTrackerCreateParams {
+                name: "tracker-one".to_string(),
+                url: "https://localhost:1234/my/app?q=2".parse()?,
+                settings: WebPageTrackerSettings {
+                    revisions: 1,
+                    delay: Default::default(),
+                    scripts: Default::default(),
+                    headers: Default::default(),
+                },
+                job_config: Some(SchedulerJobConfig {
+                    schedule: "1 2 3 4 5 6 2030".to_string(),
+                    retry_strategy: None,
+                    notifications: true,
+                }),
+            })
             .await?;
         let content_tracker = api
-            .web_scraping()
-            .create_content_tracker(
-                user.id,
-                WebPageTrackerCreateParams {
-                    name: "tracker-one".to_string(),
-                    url: "https://localhost:1234/my/app?q=2".parse()?,
-                    settings: WebPageTrackerSettings {
-                        revisions: 1,
-                        delay: Default::default(),
-                        scripts: Default::default(),
-                        headers: Default::default(),
-                    },
-                    job_config: Some(SchedulerJobConfig {
-                        schedule: "1 2 3 4 5 6 2030".to_string(),
-                        retry_strategy: None,
-                        notifications: true,
-                    }),
-                },
-            )
+            .web_scraping(&user)
+            .create_content_tracker(WebPageTrackerCreateParams {
+                name: "tracker-one".to_string(),
+                url: "https://localhost:1234/my/app?q=2".parse()?,
+                settings: WebPageTrackerSettings {
+                    revisions: 1,
+                    delay: Default::default(),
+                    scripts: Default::default(),
+                    headers: Default::default(),
+                },
+                job_config: Some(SchedulerJobConfig {
+                    schedule: "1 2 3 4 5 6 2030".to_string(),
+                    retry_strategy: None,
+                    notifications: true,
+                }),
+            })
             .await?;
         api.web_scraping_system()
             .update_web_page_tracker_job(resources_tracker.id, Some(resources_trigger_job_id))
159 changes: 85 additions & 74 deletions src/scheduler/scheduler_jobs/web_page_trackers_fetch_job.rs
@@ -85,7 +85,6 @@ impl WebPageTrackersFetchJob {
         let pending_trackers = web_scraping_system.get_pending_resources_trackers();
         pin_mut!(pending_trackers);

-        let web_scraping = api.web_scraping();
         while let Some(tracker) = pending_trackers.next().await {
             let Some((tracker, job_id)) =
                 Self::validate_tracker(&api, &scheduler, tracker?).await?
@@ -95,12 +94,21 @@

             // Check if resources has changed, comparing new revision to the latest existing one.
             let fetch_start = Instant::now();
+            let Some(user) = api.users().get(tracker.user_id).await? else {
+                log::error!(
+                    user:serde = UserLogContext::new(tracker.user_id),
+                    util:serde = tracker.log_context();
+                    "Cannot find the user for the tracker."
+                );
+                continue;
+            };

             // Create a new revision and retrieve a diff if any changes from the previous version are
             // detected. If there are any changes and the tracker hasn't opted out of notifications,
             // schedule a notification about the detected changes.
-            let new_revision_with_diff = match web_scraping
-                .create_resources_tracker_revision(tracker.user_id, tracker.id)
+            let new_revision_with_diff = match api
+                .web_scraping(&user)
+                .create_resources_tracker_revision(tracker.id)
                 .await
             {
                 Ok(new_revision_with_diff) => new_revision_with_diff,
@@ -216,7 +224,6 @@
         let pending_trackers = web_scraping_system.get_pending_content_trackers();
         pin_mut!(pending_trackers);

-        let web_scraping = api.web_scraping();
         while let Some(tracker) = pending_trackers.next().await {
             let Some((tracker, job_id)) =
                 Self::validate_tracker(&api, &scheduler, tracker?).await?
@@ -227,8 +234,18 @@
             // Try to create a new revision. If a revision is returned that means that tracker
             // detected changes.
             let fetch_start = Instant::now();
-            let new_revision = match web_scraping
-                .create_content_tracker_revision(tracker.user_id, tracker.id)
+            let Some(user) = api.users().get(tracker.user_id).await? else {
+                log::error!(
+                    user:serde = UserLogContext::new(tracker.user_id),
+                    util:serde = tracker.log_context();
+                    "Cannot find the user for the tracker."
+                );
+                continue;
+            };
+
+            let new_revision = match api
+                .web_scraping(&user)
+                .create_content_tracker_revision(tracker.id)
                 .await
             {
                 Ok(new_revision) => new_revision,
@@ -536,25 +553,22 @@ mod tests {
         )
         .await?;
         let resources_tracker = api
-            .web_scraping()
-            .create_resources_tracker(
-                user.id,
-                WebPageTrackerCreateParams {
-                    name: "tracker".to_string(),
-                    url: "https://localhost:1234/my/app?q=2".parse()?,
-                    settings: WebPageTrackerSettings {
-                        revisions: 0,
-                        delay: Default::default(),
-                        scripts: Default::default(),
-                        headers: Default::default(),
-                    },
-                    job_config: Some(SchedulerJobConfig {
-                        schedule: "0 0 * * * *".to_string(),
-                        retry_strategy: None,
-                        notifications: true,
-                    }),
-                },
-            )
+            .web_scraping(&user)
+            .create_resources_tracker(WebPageTrackerCreateParams {
+                name: "tracker".to_string(),
+                url: "https://localhost:1234/my/app?q=2".parse()?,
+                settings: WebPageTrackerSettings {
+                    revisions: 0,
+                    delay: Default::default(),
+                    scripts: Default::default(),
+                    headers: Default::default(),
+                },
+                job_config: Some(SchedulerJobConfig {
+                    schedule: "0 0 * * * *".to_string(),
+                    retry_strategy: None,
+                    notifications: true,
+                }),
+            })
             .await?;
         api.web_scraping_system()
             .update_web_page_tracker_job(resources_tracker.id, Some(resources_tracker_job_id))
@@ -571,25 +585,22 @@
         )
         .await?;
         let content_tracker = api
-            .web_scraping()
-            .create_content_tracker(
-                user.id,
-                WebPageTrackerCreateParams {
-                    name: "tracker".to_string(),
-                    url: "https://localhost:1234/my/app?q=2".parse()?,
-                    settings: WebPageTrackerSettings {
-                        revisions: 0,
-                        delay: Default::default(),
-                        scripts: Default::default(),
-                        headers: Default::default(),
-                    },
-                    job_config: Some(SchedulerJobConfig {
-                        schedule: "0 0 * * * *".to_string(),
-                        retry_strategy: None,
-                        notifications: true,
-                    }),
-                },
-            )
+            .web_scraping(&user)
+            .create_content_tracker(WebPageTrackerCreateParams {
+                name: "tracker".to_string(),
+                url: "https://localhost:1234/my/app?q=2".parse()?,
+                settings: WebPageTrackerSettings {
+                    revisions: 0,
+                    delay: Default::default(),
+                    scripts: Default::default(),
+                    headers: Default::default(),
+                },
+                job_config: Some(SchedulerJobConfig {
+                    schedule: "0 0 * * * *".to_string(),
+                    retry_strategy: None,
+                    notifications: true,
+                }),
+            })
             .await?;
         api.web_scraping_system()
             .update_web_page_tracker_job(content_tracker.id, Some(content_tracker_job_id))
@@ -747,9 +758,9 @@ mod tests {
         // Start scheduler and wait for a few seconds, then stop it.
         scheduler.start().await?;

-        let web_scraping = api.web_scraping();
+        let web_scraping = api.web_scraping(&user);
         while web_scraping
-            .get_resources_tracker_history(user.id, tracker.id, Default::default())
+            .get_resources_tracker_history(tracker.id, Default::default())
             .await?
             .is_empty()
         {
@@ -762,8 +773,8 @@

         // Check that resources were saved.
         assert_eq!(
-            api.web_scraping()
-                .get_resources_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_resources_tracker_history(tracker.id, Default::default())
                 .await?
                 .into_iter()
                 .map(|rev| (rev.created_at, rev.data.scripts, rev.data.styles))
@@ -916,9 +927,9 @@ mod tests {
         // Start scheduler and wait for a few seconds, then stop it.
         scheduler.start().await?;

-        let web_scraping = api.web_scraping();
+        let web_scraping = api.web_scraping(&user);
         while web_scraping
-            .get_content_tracker_history(user.id, tracker.id, Default::default())
+            .get_content_tracker_history(tracker.id, Default::default())
             .await?
             .is_empty()
         {
@@ -931,8 +942,8 @@

         // Check that content was saved.
         assert_eq!(
-            api.web_scraping()
-                .get_content_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_content_tracker_history(tracker.id, Default::default())
                 .await?
                 .into_iter()
                 .map(|rev| (rev.created_at, rev.data))
@@ -1134,8 +1145,8 @@ mod tests {
         "###);

         assert_eq!(
-            api.web_scraping()
-                .get_resources_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_resources_tracker_history(tracker.id, Default::default())
                 .await?
                 .len(),
             2
@@ -1304,8 +1315,8 @@ mod tests {
         "###);

         assert_eq!(
-            api.web_scraping()
-                .get_resources_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_resources_tracker_history(tracker.id, Default::default())
                 .await?
                 .len(),
             1
@@ -1452,8 +1463,8 @@ mod tests {
         assert!(notification_ids.is_empty());

         assert_eq!(
-            api.web_scraping()
-                .get_resources_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_resources_tracker_history(tracker.id, Default::default())
                 .await?
                 .len(),
             1
@@ -1512,8 +1523,8 @@ mod tests {
         "###);

         assert_eq!(
-            api.web_scraping()
-                .get_resources_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_resources_tracker_history(tracker.id, Default::default())
                 .await?
                 .len(),
             1
@@ -1661,8 +1672,8 @@ mod tests {
         assert!(notification_ids.is_empty());

         assert_eq!(
-            api.web_scraping()
-                .get_resources_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_resources_tracker_history(tracker.id, Default::default())
                 .await?
                 .len(),
             1
@@ -1757,8 +1768,8 @@ mod tests {
         "###);

         assert_eq!(
-            api.web_scraping()
-                .get_resources_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_resources_tracker_history(tracker.id, Default::default())
                 .await?
                 .len(),
             2
@@ -1928,8 +1939,8 @@ mod tests {
         "###);

         assert_eq!(
-            api.web_scraping()
-                .get_content_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_content_tracker_history(tracker.id, Default::default())
                 .await?
                 .len(),
             2
@@ -2096,8 +2107,8 @@ mod tests {
         "###);

         assert_eq!(
-            api.web_scraping()
-                .get_content_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_content_tracker_history(tracker.id, Default::default())
                 .await?
                 .len(),
             1
@@ -2242,8 +2253,8 @@ mod tests {
         assert!(notification_ids.is_empty());

         assert_eq!(
-            api.web_scraping()
-                .get_content_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_content_tracker_history(tracker.id, Default::default())
                 .await?
                 .len(),
             1
@@ -2302,8 +2313,8 @@ mod tests {
         "###);

         assert_eq!(
-            api.web_scraping()
-                .get_content_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_content_tracker_history(tracker.id, Default::default())
                 .await?
                 .len(),
             1
@@ -2449,8 +2460,8 @@ mod tests {
         assert!(notification_ids.is_empty());

         assert_eq!(
-            api.web_scraping()
-                .get_content_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_content_tracker_history(tracker.id, Default::default())
                 .await?
                 .len(),
             1
@@ -2532,8 +2543,8 @@ mod tests {
         "###);

         assert_eq!(
-            api.web_scraping()
-                .get_content_tracker_history(user.id, tracker.id, Default::default())
+            api.web_scraping(&user)
+                .get_content_tracker_history(tracker.id, Default::default())
                 .await?
                 .len(),
             2