From 09ddbc5850f81b982b68b08873ea7fd92ab90711 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Bult=C3=A9?= Date: Mon, 28 Nov 2022 16:24:52 +0100 Subject: [PATCH 1/7] =?UTF-8?q?You=20should=20make=20atomic=20commits=20?= =?UTF-8?q?=E2=84=A2=EF=B8=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 1 + udata_hydra/cli.py | 13 +++ udata_hydra/config.py | 4 +- udata_hydra/crawl.py | 137 +++++++++++++++++--------------- udata_hydra/datalake_service.py | 63 +++++++++------ udata_hydra/utils/http.py | 20 +++-- 6 files changed, 142 insertions(+), 96 deletions(-) diff --git a/README.md b/README.md index 98756267..1b831cc6 100644 --- a/README.md +++ b/README.md @@ -222,6 +222,7 @@ MINIO_BUCKET=benchmark-de MINIO_PWD=sample_pwd MINIO_FOLDER=data SENTRY_DSN=https://{my-sentry-dsn} +WEBHOOK_ENABLED=True ``` The webhook integration sends HTTP messages to `udata` when resources are stored, analyzed or checked to fill resources extras. diff --git a/udata_hydra/cli.py b/udata_hydra/cli.py index 092f9c1d..b3676e12 100644 --- a/udata_hydra/cli.py +++ b/udata_hydra/cli.py @@ -11,6 +11,7 @@ from progressist import ProgressBar from udata_hydra import config +from udata_hydra.crawl import check_url as crawl_check_url from udata_hydra.logger import setup_logging @@ -139,6 +140,18 @@ async def check_url(url, method="get"): log.error(e) +@cli +async def check_resource(resource_id, method="get"): + """Trigger a complete check for a given resource_id""" + q = "SELECT * FROM catalog WHERE resource_id = $1" + res = await context["conn"].fetch(q, resource_id) + if not res: + log.error("Resource not found in catalog") + return + async with aiohttp.ClientSession(timeout=None) as session: + await crawl_check_url(res[0], session, method=method) + + @cli async def csv_sample(size=1000, download=False, max_size="100M"): """Get a csv sample from latest checks diff --git a/udata_hydra/config.py b/udata_hydra/config.py index 79dc7b2d..c81c1c83 100644 --- a/udata_hydra/config.py +++ b/udata_hydra/config.py @@ -1,5 +1,7 @@ import os +from distutils.util import strtobool + from dotenv import load_dotenv # This is a helper for development purpose which will load a @@ -43,7 +45,7 @@ MAX_FILESIZE_ALLOWED = 104857600 # -- Webhook integration config -- # -ENABLE_WEBHOOK = True +WEBHOOK_ENABLED = bool(strtobool(os.getenv("WEBHOOK_ENABLED", "True"))) UDATA_URI = os.environ.get("UDATA_URI") UDATA_URI_API_KEY = os.environ.get("UDATA_URI_API_KEY") diff --git a/udata_hydra/crawl.py b/udata_hydra/crawl.py index 90943e4a..c9d08983 100644 --- a/udata_hydra/crawl.py +++ b/udata_hydra/crawl.py @@ -54,61 +54,68 @@ async def update_check_and_catalog(check_data: dict) -> None: q = f""" SELECT * FROM catalog JOIN checks ON catalog.last_check = checks.id - WHERE catalog.resource_id = '{check_data["resource_id"]}'; + WHERE catalog.resource_id = '{check_data["resource_id"]}' + AND catalog.deleted = FALSE; """ last_checks = await connection.fetch(q) + # does not make sense to have multiple last_checks for one resource_id + # TODO: this should be unit tested but keeping it here for now + assert len(last_checks) <= 1, f"Got {len(last_checks)} checks instead of 1" + if len(last_checks) == 0: # In case we are doing our first check for given URL rows = await connection.fetch( f""" SELECT resource_id, dataset_id, priority, initialization FROM catalog - WHERE resource_id = '{check_data["resource_id"]}'; + WHERE resource_id = '{check_data["resource_id"]}' + AND deleted = FALSE; """ ) - last_checks = [ - { - "resource_id": row[0], - "dataset_id": row[1], - "priority": row[2], - "initialization": row[3], - "status": None, - "timeout": None, - } - for row in rows - ] - - # There could be multiple resources pointing to the same URL - for last_check in last_checks: - if config.ENABLE_WEBHOOK: - is_first_check = last_check["status"] is None - status_has_changed = ( - "status" in check_data - and check_data["status"] != last_check["status"] - ) - status_no_longer_available = ( - "status" not in check_data - and last_check["status"] is not None - ) - timeout_has_changed = ( - check_data["timeout"] != last_check["timeout"] - ) - - if ( - is_first_check - or status_has_changed - or status_no_longer_available - or timeout_has_changed - ): - log.debug("Sending message to udata...") - document = dict() - document["check:status"] = check_data["status"] if status_has_changed else last_check["status"] - document["check:timeout"] = check_data["timeout"] - document["check:check_date"] = str(datetime.now()) - await send(dataset_id=last_check["dataset_id"], - resource_id=last_check["resource_id"], - document=document) + last_check = { + "resource_id": rows[0]["resource_id"], + "dataset_id": rows[0]["dataset_id"], + "priority": rows[0]["priority"], + "initialization": rows[0]["initialization"], + "status": None, + "timeout": None, + } + else: + last_check = last_checks[0] + + is_first_check = last_check["status"] is None + status_has_changed = ( + "status" in check_data + and check_data["status"] != last_check["status"] + ) + status_no_longer_available = ( + "status" not in check_data + and last_check["status"] is not None + ) + timeout_has_changed = ( + check_data["timeout"] != last_check["timeout"] + ) + + criterions = { + "is_first_check": is_first_check, + "status_has_changed": status_has_changed, + "status_no_longer_available": status_no_longer_available, + "timeout_has_changed": timeout_has_changed, + } + log.debug("crawl.py -k update_checks_and_catalog:criterions %s", json.dumps(criterions, indent=4)) + if any(criterions.values()): + document = dict() + document["check:status"] = check_data["status"] if status_has_changed else last_check["status"] + document["check:timeout"] = check_data["timeout"] + document["check:check_date"] = str(datetime.now()) + await send( + dataset_id=last_check["dataset_id"], + resource_id=last_check["resource_id"], + document=document + ) + else: + log.debug("Not sending check infos to udata, criterions not met") log.debug("Updating priority...") await connection.execute( @@ -206,26 +213,30 @@ async def check_url(row, session, sleep=0, method="get"): ) as resp: end = time.time() resp.raise_for_status() + data = { + "resource_id": row["resource_id"], + "url": row["url"], + "domain": domain, + "status": resp.status, + "headers": convert_headers(resp.headers), + "timeout": False, + "response_time": end - start, + } - # Download resource, store on Minio if CSV and produce resource.analysed message - res = await process_resource(row["url"], row["dataset_id"], str(row["resource_id"]), resp) - - await update_check_and_catalog( - { - "resource_id": row["resource_id"], - "url": row["url"], - "domain": domain, - "status": resp.status, - "headers": convert_headers(resp.headers), - "timeout": False, - "response_time": end - start, - "error": res["error"], - "checksum": res["checksum"], - "filesize": res["filesize"], - "mime_type": res["mime_type"] - } - ) - return STATUS_OK + # Download resource, store on Minio if CSV and produce resource.analysed message + res = await process_resource( + row["url"], row["dataset_id"], str(row["resource_id"]), session, data["headers"] + ) + data.update({ + "error": res["error"], + "checksum": res["checksum"], + "filesize": res["filesize"], + "mime_type": res["mime_type"], + }) + + await update_check_and_catalog(data) + + return STATUS_OK except asyncio.exceptions.TimeoutError: await update_check_and_catalog( { diff --git a/udata_hydra/datalake_service.py b/udata_hydra/datalake_service.py index 8e4ba264..11b3f026 100644 --- a/udata_hydra/datalake_service.py +++ b/udata_hydra/datalake_service.py @@ -5,11 +5,10 @@ import tempfile from typing import BinaryIO +import aiohttp import magic import pandas as pd -from aiohttp import ClientResponse - from udata_hydra import config, context from udata_hydra.utils.http import send from udata_hydra.utils.json import is_json_file @@ -20,7 +19,7 @@ log = logging.getLogger("udata-hydra") -async def download_resource(url: str, response: ClientResponse) -> BinaryIO: +async def download_resource(url: str, session: aiohttp.ClientSession, headers: dict) -> BinaryIO: """ Attempts downloading a resource from a given url. Returns the downloaded file object. @@ -28,31 +27,37 @@ async def download_resource(url: str, response: ClientResponse) -> BinaryIO: """ tmp_file = tempfile.NamedTemporaryFile(delete=False) - if float(response.headers.get("content-length", -1)) > float( - config.MAX_FILESIZE_ALLOWED - ): + if float(headers.get("content-length", -1)) > float(config.MAX_FILESIZE_ALLOWED): raise IOError("File too large to download") chunk_size = 1024 i = 0 - async for chunk in response.content.iter_chunked(chunk_size): - if i * chunk_size < float(config.MAX_FILESIZE_ALLOWED): - tmp_file.write(chunk) - else: - tmp_file.close() - log.error(f"File {url} is too big, skipping") - raise IOError("File too large to download") - i += 1 + # TODO: move to queue, can be long, also add setting for this + timeout = aiohttp.ClientTimeout(total=5*60) + async with session.get(url, timeout=timeout, allow_redirects=True) as response: + async for chunk in response.content.iter_chunked(chunk_size): + if i * chunk_size < float(config.MAX_FILESIZE_ALLOWED): + tmp_file.write(chunk) + else: + tmp_file.close() + log.error(f"File {url} is too big, skipping") + raise IOError("File too large to download") + i += 1 tmp_file.close() return tmp_file -async def process_resource(url: str, dataset_id: str, resource_id: str, response: ClientResponse) -> None: - log.debug( - "Processing task for resource {} in dataset {}".format( - resource_id, dataset_id - ) - ) +# TODO: we're sending analysis info to udata every time a resource is crawled +# although we only send "meta" check info only when they change +# maybe introduce conditionnal webhook trigger here too +async def process_resource( + url: str, + dataset_id: str, + resource_id: str, + session: aiohttp.ClientSession, + headers: dict +) -> None: + log.debug(f"Processing task for resource {resource_id} in dataset {dataset_id}") tmp_file = None mime_type = None @@ -61,8 +66,7 @@ async def process_resource(url: str, dataset_id: str, resource_id: str, response error = None try: - tmp_file = await download_resource(url, response) - + tmp_file = await download_resource(url, session, headers) # Get file size filesize = os.path.getsize(tmp_file.name) @@ -70,6 +74,7 @@ async def process_resource(url: str, dataset_id: str, resource_id: str, response sha1 = await compute_checksum_from_file(tmp_file.name) # Check resource MIME type + # FIXME: this never seems to output text/csv, maybe override it later mime_type = magic.from_file(tmp_file.name, mime=True) if mime_type in ["text/plain", "text/csv", "application/csv"] and not is_json_file(tmp_file.name): # Save resource only if CSV @@ -109,7 +114,7 @@ async def process_resource(url: str, dataset_id: str, resource_id: str, response resource_id=resource_id, document=document) except ValueError: - log.debug( + log.warning( f"Resource {resource_id} in dataset {dataset_id} is not a CSV" ) @@ -131,6 +136,12 @@ async def process_resource(url: str, dataset_id: str, resource_id: str, response await send(dataset_id=dataset_id, resource_id=resource_id, document=document) + return { + "checksum": sha1, + "error": error, + "filesize": filesize, + "mime_type": mime_type + } except IOError: error = "File too large to download" document = { @@ -141,15 +152,15 @@ async def process_resource(url: str, dataset_id: str, resource_id: str, response await send(dataset_id=dataset_id, resource_id=resource_id, document=document) - finally: - if tmp_file: - os.remove(tmp_file.name) return { "checksum": sha1, "error": error, "filesize": filesize, "mime_type": mime_type } + finally: + if tmp_file: + os.remove(tmp_file.name) async def has_checksum_been_modified(resource_id, new_checksum): diff --git a/udata_hydra/utils/http.py b/udata_hydra/utils/http.py index d9543334..7742e89b 100644 --- a/udata_hydra/utils/http.py +++ b/udata_hydra/utils/http.py @@ -1,23 +1,31 @@ -import aiohttp +import json import logging +import aiohttp + from udata_hydra import config log = logging.getLogger("udata-hydra") async def send(dataset_id: str, resource_id: str, document: dict) -> None: + # Extras in udata can't be None + document = {k: document[k] for k in document if document[k] is not None} + log.debug(f"Sending payload to udata {dataset_id}/{resource_id}: {json.dumps(document, indent=4)}") + + if not config.WEBHOOK_ENABLED: + log.debug("Webhook disabled, skipping send") + return + if not config.UDATA_URI or not config.UDATA_URI_API_KEY: - log.error("Missing udata URI and API key to send http query") + log.error("UDATA_* config is not set, not sending callbacks") return + uri = f"{config.UDATA_URI}/datasets/{dataset_id}/resources/{resource_id}/extras/" headers = {"content-type": "application/json", "X-API-KEY": config.UDATA_URI_API_KEY} - # Extras in udata can't be None - document = {key: document[key] for key in document if document[key] is not None} - async with aiohttp.ClientSession() as session: async with session.put(uri, json=document, headers=headers) as resp: body = await resp.text() if not resp.status == 200: - log.error(f"udata reponded with a {resp.status} and content: {body}") + log.error(f"udata responded with a {resp.status} and content: {body}") From 4710c724019ef5eccdebb467de63669d82a938ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Bult=C3=A9?= Date: Tue, 29 Nov 2022 09:54:01 +0100 Subject: [PATCH 2/7] Fix tests --- tests/test_crawler.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/test_crawler.py b/tests/test_crawler.py index 6af833dd..ddd4c77a 100644 --- a/tests/test_crawler.py +++ b/tests/test_crawler.py @@ -27,7 +27,7 @@ nest_asyncio.apply() -async def mock_download_resource(url, response): +async def mock_download_resource(url, session, headers): tmp_file = tempfile.NamedTemporaryFile(delete=False) tmp_file.write(SIMPLE_CSV_CONTENT.encode("utf-8")) tmp_file.close() @@ -157,8 +157,11 @@ async def test_process_resource(setup_catalog, mocker): rurl = "https://example.com/resource-1" mocker.patch("udata_hydra.datalake_service.download_resource", mock_download_resource) + # disable webhook, tested in following test + mocker.patch("udata_hydra.config.WEBHOOK_ENABLED", False) - result = await process_resource(rurl, "dataset_id", "resource_id", response=None) + resource_id = "c4e3a9fb-4415-488e-ba57-d05269b27adf" + result = await process_resource(rurl, "dataset_id", resource_id, None, {}) assert result["error"] is None assert result["checksum"] == hashlib.sha1(SIMPLE_CSV_CONTENT.encode("utf-8")).hexdigest() @@ -173,9 +176,9 @@ async def test_process_resource_send_udata(setup_catalog, mocker, rmock): mocker.patch("udata_hydra.config.UDATA_URI_API_KEY", "my-api-key") mocker.patch("udata_hydra.datalake_service.download_resource", mock_download_resource) - rmock.get(udata_url, status=200) + rmock.put(udata_url, status=200) - await process_resource(rurl, "dataset_id", resource_id, response=None) + await process_resource(rurl, "dataset_id", resource_id, None, {}) assert ("PUT", URL(udata_url)) in rmock.requests req = rmock.requests[("PUT", URL(udata_url))] From f673d210da2b1d0391411b1f48dbc9cc60c97382 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Bult=C3=A9?= Date: Tue, 29 Nov 2022 16:29:29 +0100 Subject: [PATCH 3/7] fix tests --- tests/test_crawler.py | 8 ++ udata_hydra/crawl.py | 141 +++++++++++++++----------------- udata_hydra/datalake_service.py | 4 +- 3 files changed, 78 insertions(+), 75 deletions(-) diff --git a/tests/test_crawler.py b/tests/test_crawler.py index ddd4c77a..b3c73771 100644 --- a/tests/test_crawler.py +++ b/tests/test_crawler.py @@ -58,6 +58,14 @@ async def test_catalog(setup_catalog, db): ], ) async def test_crawl(setup_catalog, rmock, event_loop, db, resource, mocker, produce_mock): + # disable process_resource, tested elsewhere (and would call rmock twice) + mocker.patch("udata_hydra.crawl.process_resource").return_value = { + "error": None, + "checksum": None, + "filesize": None, + "mime_type": None + } + status, timeout, exception = resource rurl = "https://example.com/resource-1" rmock.get( diff --git a/udata_hydra/crawl.py b/udata_hydra/crawl.py index c9d08983..177e92f0 100644 --- a/udata_hydra/crawl.py +++ b/udata_hydra/crawl.py @@ -54,68 +54,61 @@ async def update_check_and_catalog(check_data: dict) -> None: q = f""" SELECT * FROM catalog JOIN checks ON catalog.last_check = checks.id - WHERE catalog.resource_id = '{check_data["resource_id"]}' - AND catalog.deleted = FALSE; + WHERE catalog.resource_id = '{check_data["resource_id"]}'; """ last_checks = await connection.fetch(q) - # does not make sense to have multiple last_checks for one resource_id - # TODO: this should be unit tested but keeping it here for now - assert len(last_checks) <= 1, f"Got {len(last_checks)} checks instead of 1" - if len(last_checks) == 0: # In case we are doing our first check for given URL rows = await connection.fetch( f""" SELECT resource_id, dataset_id, priority, initialization FROM catalog - WHERE resource_id = '{check_data["resource_id"]}' - AND deleted = FALSE; + WHERE resource_id = '{check_data["resource_id"]}'; """ ) - last_check = { - "resource_id": rows[0]["resource_id"], - "dataset_id": rows[0]["dataset_id"], - "priority": rows[0]["priority"], - "initialization": rows[0]["initialization"], - "status": None, - "timeout": None, - } - else: - last_check = last_checks[0] - - is_first_check = last_check["status"] is None - status_has_changed = ( - "status" in check_data - and check_data["status"] != last_check["status"] - ) - status_no_longer_available = ( - "status" not in check_data - and last_check["status"] is not None - ) - timeout_has_changed = ( - check_data["timeout"] != last_check["timeout"] - ) - - criterions = { - "is_first_check": is_first_check, - "status_has_changed": status_has_changed, - "status_no_longer_available": status_no_longer_available, - "timeout_has_changed": timeout_has_changed, - } - log.debug("crawl.py -k update_checks_and_catalog:criterions %s", json.dumps(criterions, indent=4)) - if any(criterions.values()): - document = dict() - document["check:status"] = check_data["status"] if status_has_changed else last_check["status"] - document["check:timeout"] = check_data["timeout"] - document["check:check_date"] = str(datetime.now()) - await send( - dataset_id=last_check["dataset_id"], - resource_id=last_check["resource_id"], - document=document - ) - else: - log.debug("Not sending check infos to udata, criterions not met") + last_checks = [ + { + "resource_id": row[0], + "dataset_id": row[1], + "priority": row[2], + "initialization": row[3], + "status": None, + "timeout": None, + } + for row in rows + ] + + # There could be multiple resources pointing to the same URL + for last_check in last_checks: + if config.WEBHOOK_ENABLED: + is_first_check = last_check["status"] is None + status_has_changed = ( + "status" in check_data + and check_data["status"] != last_check["status"] + ) + status_no_longer_available = ( + "status" not in check_data + and last_check["status"] is not None + ) + timeout_has_changed = ( + check_data["timeout"] != last_check["timeout"] + ) + + if ( + is_first_check + or status_has_changed + or status_no_longer_available + or timeout_has_changed + ): + log.debug("Sending message to udata...") + document = dict() + document["check:status"] = check_data["status"] if status_has_changed else last_check["status"] + document["check:timeout"] = check_data["timeout"] + document["check:check_date"] = str(datetime.now()) + await send(dataset_id=last_check["dataset_id"], + resource_id=last_check["resource_id"], + document=document) log.debug("Updating priority...") await connection.execute( @@ -213,30 +206,32 @@ async def check_url(row, session, sleep=0, method="get"): ) as resp: end = time.time() resp.raise_for_status() - data = { - "resource_id": row["resource_id"], - "url": row["url"], - "domain": domain, - "status": resp.status, - "headers": convert_headers(resp.headers), - "timeout": False, - "response_time": end - start, - } - # Download resource, store on Minio if CSV and produce resource.analysed message - res = await process_resource( - row["url"], row["dataset_id"], str(row["resource_id"]), session, data["headers"] - ) - data.update({ - "error": res["error"], - "checksum": res["checksum"], - "filesize": res["filesize"], - "mime_type": res["mime_type"], - }) - - await update_check_and_catalog(data) + # Download resource, store on Minio if CSV and produce resource.analysed message + res = await process_resource( + row["url"], + row["dataset_id"], + str(row["resource_id"]), + session, + resp.headers + ) - return STATUS_OK + await update_check_and_catalog( + { + "resource_id": row["resource_id"], + "url": row["url"], + "domain": domain, + "status": resp.status, + "headers": convert_headers(resp.headers), + "timeout": False, + "response_time": end - start, + "error": res["error"], + "checksum": res["checksum"], + "filesize": res["filesize"], + "mime_type": res["mime_type"] + } + ) + return STATUS_OK except asyncio.exceptions.TimeoutError: await update_check_and_catalog( { diff --git a/udata_hydra/datalake_service.py b/udata_hydra/datalake_service.py index 11b3f026..174b86c9 100644 --- a/udata_hydra/datalake_service.py +++ b/udata_hydra/datalake_service.py @@ -3,7 +3,7 @@ import logging import os import tempfile -from typing import BinaryIO +from typing import BinaryIO, Union import aiohttp import magic @@ -56,7 +56,7 @@ async def process_resource( resource_id: str, session: aiohttp.ClientSession, headers: dict -) -> None: +) -> Union[dict, None]: log.debug(f"Processing task for resource {resource_id} in dataset {dataset_id}") tmp_file = None From b3afd54ea23c041e27ab5521a893f2c249fb8169 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Bult=C3=A9?= Date: Tue, 29 Nov 2022 17:51:47 +0100 Subject: [PATCH 4/7] refactor and test find_delimiter and detect_encoding --- tests/test_utils.py | 23 +++++++++++++++++++++++ udata_hydra/datalake_service.py | 13 ++----------- udata_hydra/utils/csv.py | 26 ++++++++++++++++---------- 3 files changed, 41 insertions(+), 21 deletions(-) create mode 100644 tests/test_utils.py diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 00000000..8b37a4f8 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,23 @@ +import pytest + +from udata_hydra.utils.csv import find_delimiter, detect_encoding + + +def test_detect_encoding(): + with pytest.raises(FileNotFoundError): + detect_encoding("not-a-real-file-hopefully.txt") + encoding = detect_encoding("tests/catalog.csv") + assert encoding is not None + encoding = detect_encoding("docs/screenshot.png") + assert encoding is None + + +def test_find_delimiter(): + with pytest.raises(FileNotFoundError): + find_delimiter("not-a-real-file-hopefully.txt") + delimiter = find_delimiter("tests/catalog.csv") + assert delimiter == ";" + delimiter = find_delimiter("docs/screenshot.png") + assert delimiter is None + delimiter = find_delimiter("tests/catalog.csv", encoding="nimp") + assert delimiter is None diff --git a/udata_hydra/datalake_service.py b/udata_hydra/datalake_service.py index 174b86c9..8aa25443 100644 --- a/udata_hydra/datalake_service.py +++ b/udata_hydra/datalake_service.py @@ -80,19 +80,10 @@ async def process_resource( # Save resource only if CSV try: # Try to detect encoding from suspected csv file. If fail, set up to utf8 (most common) - with open(tmp_file.name, mode="rb") as f: - try: - encoding = detect_encoding(f) - # FIXME: catch exception more precisely - except Exception: - encoding = "utf-8" + encoding = detect_encoding(tmp_file.name) or "utf-8" # Try to detect delimiter from suspected csv file. If fail, set up to None # (pandas will use python engine and try to guess separator itself) - try: - delimiter = find_delimiter(tmp_file.name) - # FIXME: catch exception more precisely - except Exception: - delimiter = None + delimiter = find_delimiter(tmp_file.name) # Try to read first 1000 rows with pandas pd.read_csv(tmp_file.name, sep=delimiter, encoding=encoding, nrows=1000) diff --git a/udata_hydra/utils/csv.py b/udata_hydra/utils/csv.py index c580bc36..27c624fa 100644 --- a/udata_hydra/utils/csv.py +++ b/udata_hydra/utils/csv.py @@ -2,20 +2,26 @@ from cchardet import UniversalDetector -def find_delimiter(filename): +def find_delimiter(filename, encoding="utf-8"): """Find delimiter for eventual csv file""" + delimiter = None sniffer = csv.Sniffer() - with open(filename) as fp: - delimiter = sniffer.sniff(fp.read(5000), delimiters=";,|\t").delimiter + try: + with open(filename, encoding=encoding) as fp: + delimiter = sniffer.sniff(fp.read(5000), delimiters=";,|\t").delimiter + # unknown encoding, utf-8 but not, sniffer failed + except (LookupError, UnicodeDecodeError, csv.Error): + pass return delimiter -def detect_encoding(the_file): - """Detects file encoding using chardet based on N first lines""" +def detect_encoding(filename): + """Detects file encoding using cchardet""" detector = UniversalDetector() - for line in the_file.readlines(): - detector.feed(line) - if detector.done: - break + with open(filename, mode="rb") as f: + for line in f.readlines(): + detector.feed(line) + if detector.done: + break detector.close() - return detector.result["encoding"] + return detector.result.get("encoding") From 4850750e78d63ef77c1146639b5ef20328633f6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Bult=C3=A9?= Date: Wed, 30 Nov 2022 12:13:25 +0100 Subject: [PATCH 5/7] More meat, more tests --- tests/conftest.py | 11 ++++ tests/test_crawler.py | 108 ++++++++++++++++++++++++++++++++++---- udata_hydra/crawl.py | 104 +++++++++++++++++++----------------- udata_hydra/utils/http.py | 13 +++-- 4 files changed, 173 insertions(+), 63 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 994b41f7..7bbba243 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -56,6 +56,17 @@ def produce_mock(mocker): mocker.patch("udata_hydra.datalake_service.send") +@pytest.fixture +def analysis_mock(mocker): + """Disable process_resource while crawling""" + mocker.patch("udata_hydra.crawl.process_resource").return_value = { + "error": None, + "checksum": None, + "filesize": None, + "mime_type": None + } + + @pytest.fixture def rmock(): # passthrough for local requests (aiohttp TestServer) diff --git a/tests/test_crawler.py b/tests/test_crawler.py index b3c73771..f27a5420 100644 --- a/tests/test_crawler.py +++ b/tests/test_crawler.py @@ -10,6 +10,7 @@ from aiohttp.client_exceptions import ClientError from asyncio.exceptions import TimeoutError +from minicli import run from yarl import URL from udata_hydra import config @@ -21,6 +22,9 @@ SIMPLE_CSV_CONTENT = """code_insee,number 95211,102 36522,48""" +resource_id = "c4e3a9fb-4415-488e-ba57-d05269b27adf" +dataset_id = "601ddcfc85a59c3a45c2435a" + pytestmark = pytest.mark.asyncio # allows nested async to test async with async :mindblown: @@ -45,6 +49,98 @@ async def test_catalog(setup_catalog, db): assert resource["dataset_id"] == "601ddcfc85a59c3a45c2435a" +async def test_catalog_deleted(setup_catalog, db, rmock): + res = await db.fetch("SELECT id FROM catalog WHERE deleted = FALSE") + assert len(res) == 1 + with open("tests/catalog.csv", "rb") as cfile: + catalog_content = cfile.readlines() + catalog = "https://example.com/catalog" + # feed empty catalog, should delete the previously loaded resource + rmock.get(catalog, status=200, body=catalog_content[0]) + run("load_catalog", url=catalog) + res = await db.fetch("SELECT id FROM catalog WHERE deleted = TRUE") + assert len(res) == 1 + res = await db.fetch("SELECT id FROM catalog") + assert len(res) == 1 + + +async def test_catalog_deleted_with_checked_resource(setup_catalog, db, rmock, event_loop, mocker, analysis_mock): + mocker.patch("udata_hydra.config.WEBHOOK_ENABLED", False) + + rurl = "https://example.com/resource-1" + rmock.get(rurl) + event_loop.run_until_complete(crawl(iterations=1)) + + res = await db.fetch("SELECT id FROM catalog WHERE deleted = FALSE and last_check IS NOT NULL") + assert len(res) == 1 + + with open("tests/catalog.csv", "rb") as cfile: + catalog_content = cfile.readlines() + catalog = "https://example.com/catalog" + # feed empty catalog, should delete the previously loaded resource + rmock.get(catalog, status=200, body=catalog_content[0]) + run("load_catalog", url=catalog) + res = await db.fetch("SELECT id FROM catalog WHERE deleted = TRUE") + assert len(res) == 1 + res = await db.fetch("SELECT id FROM catalog") + assert len(res) == 1 + + +async def test_catalog_deleted_with_new_url(setup_catalog, db, rmock, event_loop, mocker, analysis_mock): + # load a new catalog with a new URL for this resource + with open("tests/catalog.csv", "r") as cfile: + catalog_content = cfile.readlines() + catalog_content[-1] = catalog_content[-1].replace("resource-1", "resource-2") + catalog_content = "\n".join(catalog_content) + catalog = "https://example.com/catalog" + rmock.get(catalog, status=200, body=catalog_content.encode("utf-8")) + run("load_catalog", url=catalog) + + # check catalog coherence + res = await db.fetch("SELECT id FROM catalog WHERE deleted = TRUE") + assert len(res) == 1 + res = await db.fetch("SELECT id FROM catalog WHERE resource_id = $1", resource_id) + assert len(res) == 2 + + # check that the crawler does not crawl the deleted resource + # check that udata is called only once + # (rmock will raise an error if a mock is called more than once) + # udata is not called for analysis results since it's mocked, only for checks + udata_url = f"{config.UDATA_URI}/datasets/{dataset_id}/resources/{resource_id}/extras/" + mocker.patch("udata_hydra.config.UDATA_URI_API_KEY", "my-api-key") + rurl_1 = "https://example.com/resource-1" + rurl_2 = "https://example.com/resource-2" + rmock.get(rurl_1) + rmock.get(rurl_2) + rmock.put(udata_url, status=200) + event_loop.run_until_complete(crawl(iterations=1)) + assert ("GET", URL(rurl_1)) not in rmock.requests + assert ("GET", URL(rurl_2)) in rmock.requests + assert ("PUT", URL(udata_url)) in rmock.requests + + +async def test_udata_connection_error_500(setup_catalog, mocker, analysis_mock, rmock, event_loop): + udata_url = f"{config.UDATA_URI}/datasets/{dataset_id}/resources/{resource_id}/extras/" + mocker.patch("udata_hydra.config.UDATA_URI_API_KEY", "my-api-key") + rurl = "https://example.com/resource-1" + rmock.get(rurl) + rmock.put(udata_url, status=500) + event_loop.run_until_complete(crawl(iterations=1)) + assert ("GET", URL(rurl)) in rmock.requests + assert ("PUT", URL(udata_url)) in rmock.requests + + +async def test_udata_connection_error_exception(setup_catalog, mocker, analysis_mock, rmock, event_loop): + udata_url = f"{config.UDATA_URI}/datasets/{dataset_id}/resources/{resource_id}/extras/" + mocker.patch("udata_hydra.config.UDATA_URI_API_KEY", "my-api-key") + rurl = "https://example.com/resource-1" + rmock.get(rurl) + rmock.put(udata_url, exception=ClientError("client error")) + event_loop.run_until_complete(crawl(iterations=1)) + assert ("GET", URL(rurl)) in rmock.requests + assert ("PUT", URL(udata_url)) in rmock.requests + + @pytest.mark.parametrize( "resource", [ @@ -57,15 +153,7 @@ async def test_catalog(setup_catalog, db): (None, True, TimeoutError), ], ) -async def test_crawl(setup_catalog, rmock, event_loop, db, resource, mocker, produce_mock): - # disable process_resource, tested elsewhere (and would call rmock twice) - mocker.patch("udata_hydra.crawl.process_resource").return_value = { - "error": None, - "checksum": None, - "filesize": None, - "mime_type": None - } - +async def test_crawl(setup_catalog, rmock, event_loop, db, resource, produce_mock, analysis_mock): status, timeout, exception = resource rurl = "https://example.com/resource-1" rmock.get( @@ -168,7 +256,6 @@ async def test_process_resource(setup_catalog, mocker): # disable webhook, tested in following test mocker.patch("udata_hydra.config.WEBHOOK_ENABLED", False) - resource_id = "c4e3a9fb-4415-488e-ba57-d05269b27adf" result = await process_resource(rurl, "dataset_id", resource_id, None, {}) assert result["error"] is None @@ -179,7 +266,6 @@ async def test_process_resource(setup_catalog, mocker): async def test_process_resource_send_udata(setup_catalog, mocker, rmock): rurl = "https://example.com/resource-1" - resource_id = "c4e3a9fb-4415-488e-ba57-d05269b27adf" udata_url = f"{config.UDATA_URI}/datasets/dataset_id/resources/{resource_id}/extras/" mocker.patch("udata_hydra.config.UDATA_URI_API_KEY", "my-api-key") diff --git a/udata_hydra/crawl.py b/udata_hydra/crawl.py index 177e92f0..425b40ae 100644 --- a/udata_hydra/crawl.py +++ b/udata_hydra/crawl.py @@ -44,6 +44,43 @@ async def insert_check(data: dict): return last_check["id"] +async def send_check_data(check_data, last_check): + is_first_check = last_check["status"] is None + status_has_changed = ( + "status" in check_data + and check_data["status"] != last_check["status"] + ) + status_no_longer_available = ( + "status" not in check_data + and last_check["status"] is not None + ) + timeout_has_changed = ( + check_data["timeout"] != last_check["timeout"] + ) + + criterions = { + "is_first_check": is_first_check, + "status_has_changed": status_has_changed, + "status_no_longer_available": status_no_longer_available, + "timeout_has_changed": timeout_has_changed, + } + log.debug("crawl.py::update_checks_and_catalog:::criterions %s", json.dumps(criterions, indent=4)) + if any(criterions.values()): + document = { + "check:status": check_data["status"] if status_has_changed else last_check["status"], + "check:timeout": check_data["timeout"], + "check:check_date": str(datetime.now()), + } + await send( + dataset_id=last_check["dataset_id"], + resource_id=last_check["resource_id"], + document=document + ) + else: + log.debug("Not sending check infos to udata, criterions not met") + + +# TODO: we should handle the case when multiple resources point to the same URL and update them all async def update_check_and_catalog(check_data: dict) -> None: """Update the catalog and checks tables""" context.monitor().set_status("Updating checks and catalog...") @@ -54,61 +91,32 @@ async def update_check_and_catalog(check_data: dict) -> None: q = f""" SELECT * FROM catalog JOIN checks ON catalog.last_check = checks.id - WHERE catalog.resource_id = '{check_data["resource_id"]}'; + WHERE catalog.resource_id = '{check_data["resource_id"]}' + AND catalog.deleted = FALSE; """ - last_checks = await connection.fetch(q) + last_check = await connection.fetchrow(q) - if len(last_checks) == 0: - # In case we are doing our first check for given URL - rows = await connection.fetch( + # In case we are doing our first check for given resource + if not last_check: + row = await connection.fetchrow( f""" SELECT resource_id, dataset_id, priority, initialization FROM catalog - WHERE resource_id = '{check_data["resource_id"]}'; + WHERE resource_id = '{check_data["resource_id"]}' + AND deleted = FALSE; """ ) - last_checks = [ - { - "resource_id": row[0], - "dataset_id": row[1], - "priority": row[2], - "initialization": row[3], - "status": None, - "timeout": None, - } - for row in rows - ] - - # There could be multiple resources pointing to the same URL - for last_check in last_checks: - if config.WEBHOOK_ENABLED: - is_first_check = last_check["status"] is None - status_has_changed = ( - "status" in check_data - and check_data["status"] != last_check["status"] - ) - status_no_longer_available = ( - "status" not in check_data - and last_check["status"] is not None - ) - timeout_has_changed = ( - check_data["timeout"] != last_check["timeout"] - ) - - if ( - is_first_check - or status_has_changed - or status_no_longer_available - or timeout_has_changed - ): - log.debug("Sending message to udata...") - document = dict() - document["check:status"] = check_data["status"] if status_has_changed else last_check["status"] - document["check:timeout"] = check_data["timeout"] - document["check:check_date"] = str(datetime.now()) - await send(dataset_id=last_check["dataset_id"], - resource_id=last_check["resource_id"], - document=document) + last_check = { + "resource_id": row["resource_id"], + "dataset_id": row["dataset_id"], + "priority": row["priority"], + "initialization": row["initialization"], + "status": None, + "timeout": None, + } + + if config.WEBHOOK_ENABLED: + await send_check_data(check_data, last_check) log.debug("Updating priority...") await connection.execute( diff --git a/udata_hydra/utils/http.py b/udata_hydra/utils/http.py index 7742e89b..b0f456d5 100644 --- a/udata_hydra/utils/http.py +++ b/udata_hydra/utils/http.py @@ -25,7 +25,12 @@ async def send(dataset_id: str, resource_id: str, document: dict) -> None: headers = {"content-type": "application/json", "X-API-KEY": config.UDATA_URI_API_KEY} async with aiohttp.ClientSession() as session: - async with session.put(uri, json=document, headers=headers) as resp: - body = await resp.text() - if not resp.status == 200: - log.error(f"udata responded with a {resp.status} and content: {body}") + # /!\ we don't want a connection error to udata to bubble up to the crawler + # TODO: this would be a lot more sane if we used a queue + try: + async with session.put(uri, json=document, headers=headers) as resp: + body = await resp.text() + if not resp.status == 200: + log.error(f"udata responded with a {resp.status} and content: {body}") + except aiohttp.ClientError as e: + log.error("Error while contacting udata", exc_info=e) From 1497f493db8a3865ee95a1ef43659fa0613eb384 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Bult=C3=A9?= Date: Wed, 30 Nov 2022 12:36:40 +0100 Subject: [PATCH 6/7] use str2bool --- poetry.lock | 13 ++++++++++++- pyproject.toml | 1 + udata_hydra/config.py | 4 ++-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/poetry.lock b/poetry.lock index f4fc2fb2..871cb7e1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -761,6 +761,14 @@ category = "dev" optional = false python-versions = ">=3.7" +[[package]] +name = "str2bool" +version = "1.1" +description = "Convert string to boolean" +category = "main" +optional = false +python-versions = "*" + [[package]] name = "tomli" version = "2.0.1" @@ -824,7 +832,7 @@ multidict = ">=4.0" [metadata] lock-version = "1.1" python-versions = "^3.9" -content-hash = "221f93503abaacf47cd932f4e233df6a4bb648876c76fe9fff794932aa4cb3ce" +content-hash = "b5153591b4ad6822930998f36b68d1cb9de38cd223b812095a0895dc902a5a45" [metadata.files] aiocontextvars = [ @@ -1380,6 +1388,9 @@ sniffio = [ {file = "sniffio-1.3.0-py3-none-any.whl", hash = "sha256:eecefdce1e5bbfb7ad2eeaabf7c1eeb404d7757c379bd1f7e5cce9d8bf425384"}, {file = "sniffio-1.3.0.tar.gz", hash = "sha256:e60305c5e5d314f5389259b7f22aaa33d8f7dee49763119234af3755c55b9101"}, ] +str2bool = [ + {file = "str2bool-1.1.zip", hash = "sha256:dbc3c917dca831904bce8568f6fb1f91435fcffc2ec4a46d62c9aa08d7cf77c3"}, +] tomli = [ {file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"}, {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, diff --git a/pyproject.toml b/pyproject.toml index 354dd0f7..df6579fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ requests = "^2.27.1" sentry-sdk = "^1.11.1" aiocontextvars = "^0.2.2" coloredlogs = "^15.0.1" +str2bool = "^1.1" [tool.poetry.dev-dependencies] flake8 = "^4.0.1" diff --git a/udata_hydra/config.py b/udata_hydra/config.py index c81c1c83..815c69a3 100644 --- a/udata_hydra/config.py +++ b/udata_hydra/config.py @@ -1,6 +1,6 @@ import os -from distutils.util import strtobool +from str2bool import str2bool from dotenv import load_dotenv @@ -45,7 +45,7 @@ MAX_FILESIZE_ALLOWED = 104857600 # -- Webhook integration config -- # -WEBHOOK_ENABLED = bool(strtobool(os.getenv("WEBHOOK_ENABLED", "True"))) +WEBHOOK_ENABLED = bool(str2bool(os.getenv("WEBHOOK_ENABLED", "True"))) UDATA_URI = os.environ.get("UDATA_URI") UDATA_URI_API_KEY = os.environ.get("UDATA_URI_API_KEY") From 147e60d30b17ad39be15af772e13f16ae0e784dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Bult=C3=A9?= Date: Mon, 5 Dec 2022 07:03:08 +0100 Subject: [PATCH 7/7] review fixes --- tests/test_crawler.py | 3 +-- udata_hydra/crawl.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/test_crawler.py b/tests/test_crawler.py index f27a5420..2c59cad8 100644 --- a/tests/test_crawler.py +++ b/tests/test_crawler.py @@ -104,7 +104,6 @@ async def test_catalog_deleted_with_new_url(setup_catalog, db, rmock, event_loop # check that the crawler does not crawl the deleted resource # check that udata is called only once - # (rmock will raise an error if a mock is called more than once) # udata is not called for analysis results since it's mocked, only for checks udata_url = f"{config.UDATA_URI}/datasets/{dataset_id}/resources/{resource_id}/extras/" mocker.patch("udata_hydra.config.UDATA_URI_API_KEY", "my-api-key") @@ -116,7 +115,7 @@ async def test_catalog_deleted_with_new_url(setup_catalog, db, rmock, event_loop event_loop.run_until_complete(crawl(iterations=1)) assert ("GET", URL(rurl_1)) not in rmock.requests assert ("GET", URL(rurl_2)) in rmock.requests - assert ("PUT", URL(udata_url)) in rmock.requests + assert len(rmock.requests[("PUT", URL(udata_url))]) == 1 async def test_udata_connection_error_500(setup_catalog, mocker, analysis_mock, rmock, event_loop): diff --git a/udata_hydra/crawl.py b/udata_hydra/crawl.py index 425b40ae..c972bb05 100644 --- a/udata_hydra/crawl.py +++ b/udata_hydra/crawl.py @@ -69,7 +69,7 @@ async def send_check_data(check_data, last_check): document = { "check:status": check_data["status"] if status_has_changed else last_check["status"], "check:timeout": check_data["timeout"], - "check:check_date": str(datetime.now()), + "check:check_date": str(datetime.utcnow()), } await send( dataset_id=last_check["dataset_id"],