From caca9a08ef588f3e4c613f06751f1b571aaf9850 Mon Sep 17 00:00:00 2001 From: Michael Collado <40346148+collado-mike@users.noreply.github.com> Date: Wed, 5 Apr 2023 15:56:11 -0700 Subject: [PATCH] Made improvements to lineage query performance (#2472) Signed-off-by: Michael Collado --- api/src/main/java/marquez/db/LineageDao.java | 70 +++++++++++--------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index 04c86b7701..f71f24e562 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -38,37 +38,45 @@ public interface LineageDao { */ @SqlQuery( """ - WITH RECURSIVE - job_io AS ( - SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid, - ARRAY_AGG(DISTINCT j.uuid) AS ids, - ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs, - ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs - FROM jobs j - LEFT JOIN jobs_view s On s.uuid=j.symlink_target_uuid - LEFT JOIN job_versions v on v.uuid=COALESCE(s.current_version_uuid, j.current_version_uuid) - LEFT JOIN job_versions_io_mapping io ON io.job_version_uuid=v.uuid - GROUP BY COALESCE(j.symlink_target_uuid, j.uuid) - ), - lineage(job_uuid, inputs, outputs) AS ( - SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid, - COALESCE(inputs, Array[]::uuid[]) AS inputs, - COALESCE(outputs, Array[]::uuid[]) AS outputs, - 0 AS depth - FROM jobs_view j - INNER JOIN job_io io ON j.uuid=ANY(io.ids) - WHERE io.ids && ARRAY[]::uuid[] - UNION - SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1 - FROM job_io io, - lineage l - WHERE io.job_uuid != l.job_uuid AND - array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs) - AND depth < :depth) - SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context - FROM lineage l2 - INNER JOIN jobs_view j ON j.uuid=l2.job_uuid - LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid; + WITH RECURSIVE + -- Find the current version of a job or its symlink target if the target has no + -- current_version_uuid. This ensures that we don't lose lineage for a job after it is + -- symlinked to another job but before that target job has run successfully. + job_current_version AS ( + SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid, + COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid + FROM jobs j + LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid + WHERE s.current_version_uuid IS NULL + ), + job_io AS ( + SELECT j.job_uuid, + ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs, + ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs + FROM job_versions_io_mapping io + INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid + GROUP BY j.job_uuid + ), + lineage(job_uuid, inputs, outputs) AS ( + SELECT v.job_uuid AS job_uuid, + COALESCE(inputs, Array[]::uuid[]) AS inputs, + COALESCE(outputs, Array[]::uuid[]) AS outputs, + 0 AS depth + FROM jobs j + INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid + LEFT JOIN job_io io ON io.job_uuid=v.job_uuid + WHERE j.uuid IN () OR j.symlink_target_uuid IN () + UNION + SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1 + FROM job_io io, + lineage l + WHERE io.job_uuid != l.job_uuid AND + array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs) + AND depth < :depth) + SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context + FROM lineage l2 + INNER JOIN jobs_view j ON j.uuid=l2.job_uuid + LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid; """) Set getLineage(@BindList Set jobIds, int depth);