Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor crawling / analysis quite a bit #39

Merged
merged 7 commits into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ MINIO_BUCKET=benchmark-de
MINIO_PWD=sample_pwd
MINIO_FOLDER=data
SENTRY_DSN=https://{my-sentry-dsn}
WEBHOOK_ENABLED=True
```

The webhook integration sends HTTP messages to `udata` when resources are stored, analyzed, or checked, in order to fill the resources' extras.
Expand Down
13 changes: 12 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ requests = "^2.27.1"
sentry-sdk = "^1.11.1"
aiocontextvars = "^0.2.2"
coloredlogs = "^15.0.1"
str2bool = "^1.1"

[tool.poetry.dev-dependencies]
flake8 = "^4.0.1"
Expand Down
11 changes: 11 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ def produce_mock(mocker):
mocker.patch("udata_hydra.datalake_service.send")


@pytest.fixture
def analysis_mock(mocker):
    """Stub out ``process_resource`` in the crawl module while crawling.

    The patched callable returns a result dict whose fields are all ``None``.
    """
    blank_result = dict.fromkeys(("error", "checksum", "filesize", "mime_type"))
    patched = mocker.patch("udata_hydra.crawl.process_resource")
    patched.return_value = blank_result


@pytest.fixture
def rmock():
# passthrough for local requests (aiohttp TestServer)
Expand Down
108 changes: 102 additions & 6 deletions tests/test_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from aiohttp.client_exceptions import ClientError
from asyncio.exceptions import TimeoutError
from minicli import run
from yarl import URL

from udata_hydra import config
Expand All @@ -21,13 +22,16 @@
SIMPLE_CSV_CONTENT = """code_insee,number
95211,102
36522,48"""
resource_id = "c4e3a9fb-4415-488e-ba57-d05269b27adf"
dataset_id = "601ddcfc85a59c3a45c2435a"


pytestmark = pytest.mark.asyncio
# allows nested async to test async with async :mindblown:
nest_asyncio.apply()


async def mock_download_resource(url, response):
async def mock_download_resource(url, session, headers):
tmp_file = tempfile.NamedTemporaryFile(delete=False)
tmp_file.write(SIMPLE_CSV_CONTENT.encode("utf-8"))
tmp_file.close()
Expand All @@ -45,6 +49,97 @@ async def test_catalog(setup_catalog, db):
assert resource["dataset_id"] == "601ddcfc85a59c3a45c2435a"


async def test_catalog_deleted(setup_catalog, db, rmock):
    """Loading a header-only catalog flags the known resource as deleted."""
    active_rows = await db.fetch("SELECT id FROM catalog WHERE deleted = FALSE")
    assert len(active_rows) == 1

    with open("tests/catalog.csv", "rb") as cfile:
        header_line = cfile.readlines()[0]

    catalog = "https://example.com/catalog"
    # serving only the header row yields an empty catalog, which should
    # mark the previously loaded resource as deleted
    rmock.get(catalog, status=200, body=header_line)
    run("load_catalog", url=catalog)

    deleted_rows = await db.fetch("SELECT id FROM catalog WHERE deleted = TRUE")
    assert len(deleted_rows) == 1
    # the row is flagged, not removed: the table still holds exactly one entry
    all_rows = await db.fetch("SELECT id FROM catalog")
    assert len(all_rows) == 1


async def test_catalog_deleted_with_checked_resource(
    setup_catalog, db, rmock, event_loop, mocker, analysis_mock
):
    """A resource that has already been checked is still flagged as deleted."""
    mocker.patch("udata_hydra.config.WEBHOOK_ENABLED", False)

    # crawl once so the resource gets a last_check value
    resource_url = "https://example.com/resource-1"
    rmock.get(resource_url)
    event_loop.run_until_complete(crawl(iterations=1))

    checked_rows = await db.fetch("SELECT id FROM catalog WHERE deleted = FALSE and last_check IS NOT NULL")
    assert len(checked_rows) == 1

    with open("tests/catalog.csv", "rb") as cfile:
        header_line = cfile.readlines()[0]

    catalog = "https://example.com/catalog"
    # serving only the header row yields an empty catalog, which should
    # mark the previously loaded resource as deleted
    rmock.get(catalog, status=200, body=header_line)
    run("load_catalog", url=catalog)

    deleted_rows = await db.fetch("SELECT id FROM catalog WHERE deleted = TRUE")
    assert len(deleted_rows) == 1
    # the row is flagged, not removed: the table still holds exactly one entry
    all_rows = await db.fetch("SELECT id FROM catalog")
    assert len(all_rows) == 1


async def test_catalog_deleted_with_new_url(
    setup_catalog, db, rmock, event_loop, mocker, analysis_mock
):
    """A changed resource URL deletes the old row and only the new URL is crawled.

    Reloads the catalog with ``resource-1`` replaced by ``resource-2`` on its
    last line, checks the catalog table, then crawls once and checks which
    URLs were requested and how many times udata was called.
    """
    # load a new catalog with a new URL for this resource
    with open("tests/catalog.csv", "r") as cfile:
        catalog_lines = cfile.readlines()
    catalog_lines[-1] = catalog_lines[-1].replace("resource-1", "resource-2")
    # readlines() keeps each line's trailing newline, so join on the empty
    # string; joining on "\n" would insert a blank line between every row
    catalog_content = "".join(catalog_lines)
    catalog = "https://example.com/catalog"
    rmock.get(catalog, status=200, body=catalog_content.encode("utf-8"))
    run("load_catalog", url=catalog)

    # check catalog coherence: one row flagged deleted, two rows in total
    # for this resource_id
    res = await db.fetch("SELECT id FROM catalog WHERE deleted = TRUE")
    assert len(res) == 1
    res = await db.fetch("SELECT id FROM catalog WHERE resource_id = $1", resource_id)
    assert len(res) == 2

    # check that the crawler does not crawl the deleted resource
    # check that udata is called only once
    # udata is not called for analysis results since it's mocked, only for checks
    udata_url = f"{config.UDATA_URI}/datasets/{dataset_id}/resources/{resource_id}/extras/"
    mocker.patch("udata_hydra.config.UDATA_URI_API_KEY", "my-api-key")
    rurl_1 = "https://example.com/resource-1"
    rurl_2 = "https://example.com/resource-2"
    rmock.get(rurl_1)
    rmock.get(rurl_2)
    rmock.put(udata_url, status=200)
    event_loop.run_until_complete(crawl(iterations=1))
    assert ("GET", URL(rurl_1)) not in rmock.requests
    assert ("GET", URL(rurl_2)) in rmock.requests
    assert len(rmock.requests[("PUT", URL(udata_url))]) == 1


async def test_udata_connection_error_500(setup_catalog, mocker, analysis_mock, rmock, event_loop):
    """The crawl runs to completion when the udata PUT answers with a 500."""
    mocker.patch("udata_hydra.config.UDATA_URI_API_KEY", "my-api-key")
    resource_url = "https://example.com/resource-1"
    extras_url = f"{config.UDATA_URI}/datasets/{dataset_id}/resources/{resource_id}/extras/"
    rmock.get(resource_url)
    rmock.put(extras_url, status=500)

    event_loop.run_until_complete(crawl(iterations=1))

    # both the resource check and the udata call must have been attempted
    issued = rmock.requests
    assert ("GET", URL(resource_url)) in issued
    assert ("PUT", URL(extras_url)) in issued


async def test_udata_connection_error_exception(setup_catalog, mocker, analysis_mock, rmock, event_loop):
    """The crawl runs to completion when the udata PUT raises a ClientError."""
    mocker.patch("udata_hydra.config.UDATA_URI_API_KEY", "my-api-key")
    resource_url = "https://example.com/resource-1"
    extras_url = f"{config.UDATA_URI}/datasets/{dataset_id}/resources/{resource_id}/extras/"
    rmock.get(resource_url)
    rmock.put(extras_url, exception=ClientError("client error"))

    event_loop.run_until_complete(crawl(iterations=1))

    # both the resource check and the udata call must have been attempted
    issued = rmock.requests
    assert ("GET", URL(resource_url)) in issued
    assert ("PUT", URL(extras_url)) in issued


@pytest.mark.parametrize(
"resource",
[
Expand All @@ -57,7 +152,7 @@ async def test_catalog(setup_catalog, db):
(None, True, TimeoutError),
],
)
async def test_crawl(setup_catalog, rmock, event_loop, db, resource, mocker, produce_mock):
async def test_crawl(setup_catalog, rmock, event_loop, db, resource, produce_mock, analysis_mock):
status, timeout, exception = resource
rurl = "https://example.com/resource-1"
rmock.get(
Expand Down Expand Up @@ -157,8 +252,10 @@ async def test_process_resource(setup_catalog, mocker):
rurl = "https://example.com/resource-1"

mocker.patch("udata_hydra.datalake_service.download_resource", mock_download_resource)
# disable webhook, tested in following test
mocker.patch("udata_hydra.config.WEBHOOK_ENABLED", False)

result = await process_resource(rurl, "dataset_id", "resource_id", response=None)
result = await process_resource(rurl, "dataset_id", resource_id, None, {})

assert result["error"] is None
assert result["checksum"] == hashlib.sha1(SIMPLE_CSV_CONTENT.encode("utf-8")).hexdigest()
Expand All @@ -168,14 +265,13 @@ async def test_process_resource(setup_catalog, mocker):

async def test_process_resource_send_udata(setup_catalog, mocker, rmock):
rurl = "https://example.com/resource-1"
resource_id = "c4e3a9fb-4415-488e-ba57-d05269b27adf"
udata_url = f"{config.UDATA_URI}/datasets/dataset_id/resources/{resource_id}/extras/"

mocker.patch("udata_hydra.config.UDATA_URI_API_KEY", "my-api-key")
mocker.patch("udata_hydra.datalake_service.download_resource", mock_download_resource)
rmock.get(udata_url, status=200)
rmock.put(udata_url, status=200)

await process_resource(rurl, "dataset_id", resource_id, response=None)
await process_resource(rurl, "dataset_id", resource_id, None, {})

assert ("PUT", URL(udata_url)) in rmock.requests
req = rmock.requests[("PUT", URL(udata_url))]
Expand Down
23 changes: 23 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pytest

from udata_hydra.utils.csv import find_delimiter, detect_encoding


def test_detect_encoding():
    """detect_encoding raises on a missing file and returns None for a PNG."""
    missing_path = "not-a-real-file-hopefully.txt"
    with pytest.raises(FileNotFoundError):
        detect_encoding(missing_path)
    # a text file yields some encoding, a binary image yields none
    assert detect_encoding("tests/catalog.csv") is not None
    assert detect_encoding("docs/screenshot.png") is None


def test_find_delimiter():
    """find_delimiter raises on a missing file and returns None when it cannot decide."""
    missing_path = "not-a-real-file-hopefully.txt"
    with pytest.raises(FileNotFoundError):
        find_delimiter(missing_path)
    # the test catalog is semicolon-separated
    assert find_delimiter("tests/catalog.csv") == ";"
    # neither a binary file nor an unknown encoding name yields a delimiter
    assert find_delimiter("docs/screenshot.png") is None
    assert find_delimiter("tests/catalog.csv", encoding="nimp") is None
13 changes: 13 additions & 0 deletions udata_hydra/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from progressist import ProgressBar

from udata_hydra import config
from udata_hydra.crawl import check_url as crawl_check_url
from udata_hydra.logger import setup_logging


Expand Down Expand Up @@ -139,6 +140,18 @@ async def check_url(url, method="get"):
log.error(e)


@cli
async def check_resource(resource_id, method="get"):
    """Trigger a complete check for a given resource_id"""
    rows = await context["conn"].fetch(
        "SELECT * FROM catalog WHERE resource_id = $1", resource_id
    )
    if rows:
        # NOTE(review): only the first matching row is checked; if several
        # catalog rows can share a resource_id, confirm this is the intended one
        async with aiohttp.ClientSession(timeout=None) as session:
            await crawl_check_url(rows[0], session, method=method)
    else:
        log.error("Resource not found in catalog")


@cli
async def csv_sample(size=1000, download=False, max_size="100M"):
"""Get a csv sample from latest checks
Expand Down
4 changes: 3 additions & 1 deletion udata_hydra/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os

from str2bool import str2bool

from dotenv import load_dotenv

# This is a helper for development purpose which will load a
Expand Down Expand Up @@ -43,7 +45,7 @@
MAX_FILESIZE_ALLOWED = 104857600

# -- Webhook integration config -- #
ENABLE_WEBHOOK = True
WEBHOOK_ENABLED = bool(str2bool(os.getenv("WEBHOOK_ENABLED", "True")))
UDATA_URI = os.environ.get("UDATA_URI")
UDATA_URI_API_KEY = os.environ.get("UDATA_URI_API_KEY")

Expand Down
Loading