A SaaS Solution for Neutrino Event Reconstruction using the Skymap Scanner
SkyDriver automates the entire scanning of an event: starting all servers and workers, transferring all needed data, and finally, all tear-down. SkyDriver also includes a database for storing scan requests, progress reports, and results. The computational engine for a scan is the Skymap Scanner. The main interface is a REST server with several routes and methods.
One of many workflows may be:
- Request a scan (POST @
/scan
) - Monitor the scanning status (GET @
/scan/SCAN_ID/status
) - Check for progress updates (GET @
/scan/SCAN_ID/manifest
) - Check for partial results (GET @
/scan/SCAN_ID/result
) - Get a final result (GET @
/scan/SCAN_ID/result
) - Make plots
Another workflow:
- Find a scan id for a particular run and event (GET @
/scans/find
) - Get the scan's manifest and result (GET @
/scan/SCAN_ID
)
Users interface with SkyDriver via REST calls, so first, you will need to get a connection. This example uses wipac-rest-tools:
from rest_tools.client import RestClient, SavedDeviceGrantAuth
def get_rest_client() -> RestClient:
"""Get REST client for talking to SkyDriver.
This will present a QR code in the terminal for initial validation.
"""
# NOTE: If your script will not be interactive (like a cron job),
# then you need to first run your script manually to validate using
# the QR code in the terminal.
return SavedDeviceGrantAuth(
"https://skydriver.icecube.aq",
token_url="https://keycloak.icecube.wisc.edu/auth/realms/IceCube",
filename="device-refresh-token",
client_id="skydriver-external",
retries=0,
)
rc = get_rest_client()
Now, you can make all the REST calls needed:
rc.request_seq(method, path, args_dict)
To request a new scan (see POST @ /scan
):
manifest = rc.request_seq("POST", "/scan", {"docker_tag": ...})
print(json.dumps(manifest))
To see your scan's status (see GET @ /scan/SCAN_ID/status
):
status = rc.request_seq("GET", f"/scan/{scan_id}/status")
print(json.dumps(status))
Refer to the REST API section for comprehensive documentation detailing the available interactions with SkyDriver.
Also, see Using a Scan Result Outside of SkyDriver.
Documentation for the public-facing routes and method
Launch a new scan of an event
Argument | Type | Required/Default | Description |
---|---|---|---|
"docker_tag" |
str | [REQUIRED] | the docker tag of the Skymap Scanner image (must be in CVMFS). Ex: v3.1.4 , v3.5 , v3 , latest , eqscan-6207146 (branch-based tag) |
"cluster" |
dict or list | [REQUIRED] | the worker cluster(s) to use along with the number of workers for each: Example: {"sub-2": 1234} . NOTE: To request a schedd more than once, provide a list of 2-lists instead (Ex: [ ["sub-2", 56], ["sub-2", 1234] ] ) |
"reco_algo" |
bool | [REQUIRED] | which reco algorithm to use (see Skymap Scanner) |
"event_i3live_json" |
dict or str | [REQUIRED] | Realtime's JSON event format |
"nsides" |
dict | [REQUIRED] | the nside progression to use (see Skymap Scanner) |
"real_or_simulated_event" |
str | [REQUIRED] | whether this event is real or simulated. Ex: real , simulated |
"max_pixel_reco_time" |
int | [REQUIRED] | the max amount of time (seconds) each pixel's reco should take (accurate values will evict pixels from slow workers thereby re-delivering to faster workers -- slow workers are unavoidable due to non-deterministic errors) |
"max_worker_runtime" |
int | default: 4*60*60 |
the max amount of time (second) each client worker can work for (larger values are needed as the event size increases AND the workforce size decreases) |
"skyscan_mq_client_timeout_wait_for_first_message" |
int | default: image's default value | how long a client can wait for its first message (pixel) before giving up and exiting |
"scanner_server_memory" |
str | default: 1024M |
how much memory for the scanner server to request |
"worker_memory" |
str | default: 8G |
how much memory per client worker to request |
"worker_disk" |
str | default: 1G |
how much disk per client worker to request |
"debug_mode" |
str or list | default: None | what debug mode(s) to use: "client-logs" collects the scanner clients' stderr/stdout including icetray logs (scans are limited in # of workers) |
"predictive_scanning_threshold" |
float | default: 1.0 |
the predictive scanning threshold [0.1, 1.0] (see Skymap Scanner) |
"priority" |
int | default: 0 |
the relative priority of this scan -- higher values indicate higher priority. NOTE: Values >= 10 are reserved for Realtime alert scans (these scan requests are not throttled). Also, see HTCondor jobs |
"classifiers" |
dict[str, str | bool | float | int] |
default: {} |
a user-defined collection of labels, attributes, etc. -- this is constrained in size and is intended for user-defined metadata only |
"manifest_projection" |
list | default: all fields but these | which Manifest fields to include in the response (include * to include all fields) |
- Creates and starts a new Skymap Scanner instance spread across many client workers
- The new scanner will send updates routinely and when the scan completes (see GET (manifest) and GET (result))
dict - Manifest
Retrieve the manifest of a scan
Argument | Type | Required/Default | Description |
---|---|---|---|
"include_deleted" |
bool | default: False |
Not normally needed -- True prevents a 404 error if the scan was deleted (aborted) |
None
dict - Manifest
Retrieve the result of a scan
Argument | Type | Required/Default | Description |
---|---|---|---|
"include_deleted" |
bool | default: False |
Not normally needed -- True prevents a 404 error if the scan was deleted (aborted) |
None
dict - Result
Retrieve the manifest and result of a scan
Argument | Type | Required/Default | Description |
---|---|---|---|
"include_deleted" |
bool | default: False |
Not normally needed -- True prevents a 404 error if the scan was deleted (aborted) |
None
{
"manifest": Manifest dict,
"result": Result dict,
}
Abort a scan and/or mark scan (manifest and result) as "deleted"
Argument | Type | Required/Default | Description |
---|---|---|---|
"delete_completed_scan" |
bool | default: False |
whether to mark a completed scan as "deleted" -- this is not needed for aborting an ongoing scan |
"manifest_projection" |
list | default: all fields but these | which Manifest fields to include in the response (include * to include all fields) |
- The Skymap Scanner instance is stopped and removed
- The scan's manifest and result are marked as "deleted" in the database
{
"manifest": Manifest dict,
"result": Result dict,
}
Retrieve scan manifests corresponding to a specific search query
Argument | Type | Required/Default | Description |
---|---|---|---|
"filter" |
dict | [REQUIRED] | a MongoDB-syntax filter for Manifest |
"include_deleted" |
bool | default: False |
whether to include deleted scans (overwritten by filter 's is_deleted ) |
"manifest_projection" |
list | default: all fields but these | which Manifest fields to include in the response (include * to include all fields) |
One simple "filter"
may be:
{
"filter": {
"event_metadata.run_id": 123456789,
"event_metadata.event_id": 987654321,
"event_metadata.is_real_event": True,
}
}
See https://www.mongodb.com/docs/manual/tutorial/query-documents/ for more complex queries.
None
{
"manifests": list[Manifest dict],
}
- See Manifest
Retrieve entire backlog list
None
None
{
"entries": [
{
"scan_id": str,
"timestamp": float,
"pending_timestamp": float
},
...
]
}
Retrieve the status of a scan
Argument | Type | Required/Default | Description |
---|---|---|---|
"include_pod_statuses" |
bool | False |
whether to include the k8s pod statuses for the clientmanager & central server -- expends additional resources |
None
{
"scan_state": str, # a short human-readable code
"is_deleted": bool,
"scan_complete": bool, # workforce is done
"pods": { # field is included only if `include_pod_statuses == True`
"pod_status": dict, # a large k8s status object
"pod_status_message": str, # a human-readable message explaining the pod status retrieval
}
"clusters": list, # same as Manifest's clusters field
}
There are several codes for scan_state
:
- Successful state
SCAN_FINISHED_SUCCESSFULLY
- Non-finished scan states (in reverse order of occurrence)
IN_PROGRESS__PARTIAL_RESULT_GENERATED
IN_PROGRESS__WAITING_ON_FIRST_PIXEL_RECO
PENDING__WAITING_ON_CLUSTER_STARTUP
orPENDING__WAITING_ON_SCANNER_SERVER_STARTUP
PENDING__PRESTARTUP
- The above non-finished states have equivalents in the case that the scan failed and/or aborted
STOPPED__PARTIAL_RESULT_GENERATED
STOPPED__WAITING_ON_FIRST_PIXEL_RECO
STOPPED__WAITING_ON_CLUSTER_STARTUP
orSTOPPED__WAITING_ON_SCANNER_SERVER_STARTUP
STOPPED__PRESTARTUP
- NOTE: a failed scan my not have an above code automatically, and may need a
DELETE
request to get the code. Until then, it will retain an non-finished state code.
Retrieve the logs of a scan's pod: central server & client starter(s)
None
None
{
"pod_container_logs": str | list[ dict[str,str] ], # list
"pod_container_logs_message": str, # a human-readable message explaining the log retrieval
}
A dictionary containing non-physics metadata on a scan
Pseudo-code:
{
scan_id: str,
timestamp: float,
is_deleted: bool,
event_i3live_json_dict: dict,
scanner_server_args: str,
priority: int,
classifiers: dict[str, str | bool | float | int]
event_i3live_json_dict__hash: str, # a deterministic hash of the event json
ewms_task: {
tms_args: list[str],
env_vars: dict[str, dict[str, Any]],
clusters: [ # 2 types: condor & k8s -- different 'location' sub-fields
{
orchestrator: 'condor',
location: {
collector: str,
schedd: str,
},
cluster_id: int,
n_workers: int,
starter_info: dict,
statuses: {
'Completed': { # condor job status
'FatalError': int, # pilot status value -> # of jobs
'Done': int, # pilot status value -> # of jobs
...
},
'Running': {
'Tasking': int,
...
}
...
},
top_task_errors: dict[str, int], # error message -> # of jobs
},
...
{
orchestrator: 'k8s',
location: {
host: str,
namespace: str,
},
cluster_id: int,
n_workers: int,
starter_info: dict,
},
...
],
# signifies scanner is done (server and worker cluster(s))
complete: bool,
},
# found/created during first few seconds of scanning
event_metadata: {
run_id: int,
event_id: int,
event_type: str,
mjd: float,
is_real_event: bool, # as opposed to simulation
},
scan_metadata: dict | None,
# updated during scanning, multiple times (initially will be 'None')
progress: {
summary: str,
epilogue: str,
tallies: dict,
processing_stats: {
start: dict,
runtime: dict,
rate: dict,
end: str,
finished: bool,
predictions: dict,
},
predictive_scanning_threshold: float,
last_updated: str,
},
# timestamp of any update to manifest -- also see `progress.last_updated`
last_updated: float,
}
Some routes/methods respond with the scan's manifest. This is a large dictionary, so by default, all but GET @ /scan/SCAN_ID/manifest
exclude these fields:
event_i3live_json_dict
A dictionary containing the scan result
Pseudo-code:
{
scan_id: str,
skyscan_result: dict, # serialized version of 'skyreader.SkyScanResult'
is_final: bool, # is this result the final result?
}
See skyreader's plot_skydriver_scan_result.py
Also, see skyreader's plot_skydriver_scan_result.py