fix(ingest): ensure upgrade checks run async (datahub-project#5383)
shirshanka authored and maggiehays committed Aug 1, 2022
1 parent f2411be commit 2b0ef2e
Showing 7 changed files with 222 additions and 133 deletions.
7 changes: 4 additions & 3 deletions metadata-ingestion/setup.py
```diff
@@ -60,6 +60,7 @@ def get_long_description():
     "types-Deprecated",
     "humanfriendly",
     "packaging",
+    "aiohttp<4",
 }
 
 kafka_common = {
@@ -266,7 +267,7 @@ def get_long_description():
     "redshift": sql_common | redshift_common,
     "redshift-usage": sql_common | usage_common | redshift_common,
     "sagemaker": aws_common,
-    "salesforce":{"simple-salesforce"},
+    "salesforce": {"simple-salesforce"},
     "snowflake": snowflake_common,
     "snowflake-usage": snowflake_common
     | usage_common
@@ -347,6 +348,7 @@ def get_long_description():
             "bigquery-usage",
             "clickhouse",
             "clickhouse-usage",
+            "delta-lake",
             "druid",
             "elasticsearch",
             "ldap",
@@ -363,7 +365,6 @@
             "redshift",
             "redshift-usage",
             "data-lake",
-            "delta-lake",
             "s3",
             "tableau",
             "trino",
@@ -432,14 +433,14 @@ def get_long_description():
             "feast-legacy",
             "hana",
             "hive",
+            "kafka-connect",
             "ldap",
             "mongodb",
             "mssql",
             "mysql",
             "mariadb",
             "snowflake",
             "redash",
-            "kafka-connect",
             "vertica",
         ]
         for dependency in plugins[plugin]
```
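Two of the four additions above are ordering fixes (moving "delta-lake" and "kafka-connect" into alphabetical position in the plugin lists) plus a spacing fix to the "salesforce" entry; the substantive change is the `aiohttp<4` pin, presumably pulled in because the upgrade check now makes its HTTP calls asynchronously. For context, `setup.py` composes its per-source extras by set union — roughly this shape (a simplified sketch; the pins and most set names here are illustrative, only `plugins` and the union pattern appear in the diff):

```python
# Simplified sketch of the extras-composition pattern in metadata-ingestion/setup.py.
# Version pins and most set names are illustrative, not the real file's contents.
from typing import Dict, Set

base_requirements: Set[str] = {
    "humanfriendly",
    "packaging",
    "aiohttp<4",  # added by this commit for the async upgrade check (assumption)
}

sql_common: Set[str] = {"sqlalchemy"}
usage_common: Set[str] = {"sqlparse"}
redshift_common: Set[str] = sql_common | {"sqlalchemy-redshift"}

# Each source is an extra; composite sources just union the shared sets.
plugins: Dict[str, Set[str]] = {
    "redshift": sql_common | redshift_common,
    "redshift-usage": sql_common | usage_common | redshift_common,
    "salesforce": {"simple-salesforce"},
}

# Handed to setuptools roughly as:
#   setup(..., extras_require={name: list(deps) for name, deps in plugins.items()})
```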
78 changes: 54 additions & 24 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
```diff
@@ -1,4 +1,6 @@
+import asyncio
 import csv
+import functools
 import json
 import logging
 import os
@@ -82,7 +84,6 @@ def ingest() -> None:
     help="Suppress display of variable values in logs by suppressing elaborate stacktrace (stackprinter) during ingestion failures",
 )
 @click.pass_context
-@upgrade.check_upgrade
 @telemetry.with_telemetry
 @memory_leak_detector.with_leak_detection
 def run(
@@ -96,6 +97,56 @@
 ) -> None:
     """Ingest metadata into DataHub."""
 
+    def run_pipeline_to_completion(pipeline: Pipeline) -> int:
+        logger.info("Starting metadata ingestion")
+        try:
+            pipeline.run()
+        except Exception as e:
+            logger.info(
+                f"Source ({pipeline.config.source.type}) report:\n{pipeline.source.get_report().as_string()}"
+            )
+            logger.info(
+                f"Sink ({pipeline.config.sink.type}) report:\n{pipeline.sink.get_report().as_string()}"
+            )
+            # We don't want to log sensitive information in variables if the pipeline fails due to
+            # an unexpected error. Disable printing sensitive info to logs if ingestion is running
+            # with the `--suppress-error-logs` flag.
+            if suppress_error_logs:
+                raise SensitiveError() from e
+            else:
+                raise e
+        else:
+            logger.info("Finished metadata ingestion")
+            pipeline.log_ingestion_stats()
+            ret = pipeline.pretty_print_summary(warnings_as_failure=strict_warnings)
+            return ret
+
+    async def run_pipeline_async(pipeline: Pipeline) -> int:
+        loop = asyncio.get_running_loop()
+        return await loop.run_in_executor(
+            None, functools.partial(run_pipeline_to_completion, pipeline)
+        )
+
+    async def run_func_check_upgrade(pipeline: Pipeline) -> None:
+        version_stats_future = asyncio.ensure_future(
+            upgrade.retrieve_version_stats(pipeline.ctx.graph)
+        )
+        the_one_future = asyncio.ensure_future(run_pipeline_async(pipeline))
+        ret = await the_one_future
+
+        # the pipeline future has returned
+        if ret == 0:
+            try:
+                # on success, give the version check a brief window to finish
+                version_stats = await asyncio.wait_for(version_stats_future, 0.5)
+                upgrade.maybe_print_upgrade_message(version_stats=version_stats)
+            except Exception as e:
+                logger.debug(
+                    f"Timed out with {e} waiting for version stats to be computed... skipping ahead."
+                )
+
+        sys.exit(ret)
+
     logger.info("DataHub CLI version: %s", datahub_package.nice_version_name())
 
     config_file = pathlib.Path(config)
@@ -112,29 +163,8 @@ def run(
         # in a SensitiveError to prevent detailed variable-level information from being logged.
         raise SensitiveError() from e
 
-    logger.info("Starting metadata ingestion")
-    try:
-        pipeline.run()
-    except Exception as e:
-        logger.info(
-            f"Source ({pipeline.config.source.type}) report:\n{pipeline.source.get_report().as_string()}"
-        )
-        logger.info(
-            f"Sink ({pipeline.config.sink.type}) report:\n{pipeline.sink.get_report().as_string()}"
-        )
-        # We dont want to log sensitive information in variables if the pipeline fails due to
-        # an unexpected error. Disable printing sensitive info to logs if ingestion is running
-        # with `--suppress-error-logs` flag.
-        if suppress_error_logs:
-            raise SensitiveError() from e
-        else:
-            raise e
-    else:
-        logger.info("Finished metadata pipeline")
-        pipeline.log_ingestion_stats()
-        ret = pipeline.pretty_print_summary(warnings_as_failure=strict_warnings)
-        upgrade.maybe_print_upgrade_message(pipeline.ctx.graph)
-        sys.exit(ret)
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(run_func_check_upgrade(pipeline))
 
 
 def get_runs_url(gms_host: str) -> str:
```
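This is the heart of the fix: instead of running the upgrade check synchronously through the removed `@upgrade.check_upgrade` decorator, the blocking pipeline run is pushed onto an executor thread while `upgrade.retrieve_version_stats` runs concurrently on the event loop, and on success the CLI waits at most 0.5 s for the version stats before exiting. A self-contained sketch of the same pattern (the `run_pipeline` and `fetch_version_stats` bodies are stand-ins, not DataHub code):

```python
import asyncio
import functools
import time


def run_pipeline(name: str) -> int:
    """Stand-in for the blocking pipeline run; returns an exit code."""
    time.sleep(2)  # simulate blocking ingestion work
    print(f"pipeline {name} finished")
    return 0


async def fetch_version_stats() -> str:
    """Stand-in for the async upgrade check (presumably an aiohttp call in DataHub)."""
    await asyncio.sleep(0.1)
    return "a newer CLI version is available"


async def main() -> int:
    loop = asyncio.get_running_loop()
    # Start the version check immediately; it runs on the event loop
    # while the blocking work runs in the default thread-pool executor.
    stats_task = asyncio.ensure_future(fetch_version_stats())
    ret = await loop.run_in_executor(None, functools.partial(run_pipeline, "demo"))
    if ret == 0:
        try:
            # On success, wait briefly for the check so a slow or unreachable
            # endpoint can never delay exit by more than the timeout.
            stats = await asyncio.wait_for(stats_task, timeout=0.5)
            print(f"upgrade hint: {stats}")
        except Exception:
            pass  # version check not ready in time; skip the hint
    return ret


if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))
```

The useful property of this shape is that the version check can never slow down ingestion itself, and a failed run skips the upgrade hint entirely.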
10 changes: 7 additions & 3 deletions metadata-ingestion/src/datahub/entrypoints.py
```diff
@@ -164,7 +164,8 @@ def main(**kwargs):
         if isinstance(exc, (ConfigurationError, ValidationError)):
             logger.error(exc)
         else:
-            logger.error(
+            # only print stacktraces during debug
+            logger.debug(
                 stackprinter.format(
                     exc,
                     line_wrap=MAX_CONTENT_WIDTH,
@@ -184,11 +185,14 @@ def main(**kwargs):
                     **kwargs,
                 )
             )
+            logger.error(
+                f"Command failed with {exc}. Run with --debug to get full trace"
+            )
         logger.info(
             f"DataHub CLI version: {datahub_package.__version__} at {datahub_package.__file__}"
         )
-        logger.info(
+        logger.debug(
             f"Python version: {sys.version} at {sys.executable} on {platform.platform()}"
         )
-        logger.info(f"GMS config {get_gms_config()}")
+        logger.debug(f"GMS config {get_gms_config()}")
         sys.exit(1)
```
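The entrypoint change demotes the full stack trace and environment details to DEBUG and surfaces one short, actionable ERROR line instead. A minimal sketch of that handler shape (generic `logging`/`traceback` code, not the actual DataHub entrypoint; `run_command` is a hypothetical stand-in):

```python
import logging
import sys
import traceback

logger = logging.getLogger("cli")


def run_command() -> None:
    """Hypothetical command dispatch; fails to demonstrate the handler."""
    raise RuntimeError("boom")


def main(debug: bool = False) -> None:
    logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)
    try:
        run_command()
    except Exception as exc:
        # Full trace only at DEBUG; users see a single actionable line.
        logger.debug(traceback.format_exc())
        logger.error(f"Command failed with {exc}. Run with --debug to get full trace")
        logger.debug(f"Python version: {sys.version}")
        sys.exit(1)


if __name__ == "__main__":
    main(debug="--debug" in sys.argv)
```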
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
```diff
@@ -269,8 +269,10 @@ def run(self) -> None:
                 for record_envelope in self.transform(record_envelopes):
                     if not self.dry_run:
                         self.sink.write_record_async(record_envelope, callback)
+
             except Exception as e:
                 logger.error(f"Failed to extract some records due to: {e}")
+
             extractor.close()
             if not self.dry_run:
                 self.sink.handle_work_unit_end(wu)
```
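The two blank lines added here visually separate the log-and-continue error handling from the per-work-unit cleanup: a failed extraction is reported but does not abort the run, and `extractor.close()` executes either way. A sketch of that control flow (generic names; only `write_record_async` and the log message come from the diff):

```python
import logging

logger = logging.getLogger(__name__)


def process_work_unit(extractor, sink, callback, dry_run: bool = False) -> None:
    """Sketch of the per-work-unit loop: log-and-continue, then cleanup."""
    try:
        for record_envelope in extractor.get_records():
            if not dry_run:
                sink.write_record_async(record_envelope, callback)

    except Exception as e:
        # A bad record batch is reported but does not abort the whole run.
        logger.error(f"Failed to extract some records due to: {e}")

    # Cleanup sits outside the try/except, so it runs whether or not extraction failed.
    extractor.close()
```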
[The remaining 3 changed files are not shown here.]