From db47b40028c4ddcc15c93c354d0789b03dd65dcc Mon Sep 17 00:00:00 2001 From: Michael Robinson Date: Mon, 20 Mar 2023 13:51:08 -0400 Subject: [PATCH 01/16] Prepare next development version 0.33.0-SNAPSHOT Signed-off-by: Michael Robinson --- clients/python/marquez_client/__init__.py | 2 +- clients/python/setup.cfg | 2 +- clients/python/setup.py | 2 +- gradle.properties | 2 +- spec/openapi.yml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/clients/python/marquez_client/__init__.py b/clients/python/marquez_client/__init__.py index e37efdabec..1455f556ec 100644 --- a/clients/python/marquez_client/__init__.py +++ b/clients/python/marquez_client/__init__.py @@ -4,7 +4,7 @@ # -*- coding: utf-8 -*- __author__ = """Marquez Project""" -__version__ = "0.32.0" +__version__ = "0.33.0" from marquez_client.client import MarquezClient # noqa: F401 from marquez_client.clients import Clients # noqa: F401 diff --git a/clients/python/setup.cfg b/clients/python/setup.cfg index d9baaed7f0..54c722d179 100644 --- a/clients/python/setup.cfg +++ b/clients/python/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.32.0 +current_version = 0.33.0 commit = False tag = False parse = (?P\d+)\.(?P\d+)\.(?P\d+)(?P.*) diff --git a/clients/python/setup.py b/clients/python/setup.py index 2219ecc477..dd333cd068 100644 --- a/clients/python/setup.py +++ b/clients/python/setup.py @@ -24,7 +24,7 @@ setup( name="marquez-python", - version="0.32.0", + version="0.33.0", description="Marquez Python Client", long_description=readme, long_description_content_type="text/markdown", diff --git a/gradle.properties b/gradle.properties index b68cc23842..1a1a51f494 100644 --- a/gradle.properties +++ b/gradle.properties @@ -7,4 +7,4 @@ org.gradle.jvmargs=--add-exports jdk.compiler/com.sun.tools.javac.api=ALL-UNNAME --add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED \ --add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED -version=0.32.0 +version=0.33.0-SNAPSHOT diff 
--git a/spec/openapi.yml b/spec/openapi.yml index 1c39859d91..ffb0255774 100644 --- a/spec/openapi.yml +++ b/spec/openapi.yml @@ -3,7 +3,7 @@ openapi: 3.0.2 info: title: Marquez - version: 0.32.0 + version: 0.33.0-SNAPSHOT description: Marquez is an open source **metadata service** for the **collection**, **aggregation**, and **visualization** of a data ecosystem's metadata. license: From ffc2ac753cb69fe4340e058d20143f5771846883 Mon Sep 17 00:00:00 2001 From: Michael Collado <40346148+collado-mike@users.noreply.github.com> Date: Tue, 28 Mar 2023 10:04:54 -0700 Subject: [PATCH 02/16] Fix job update SQL to correctly use simple_name for job updates (#2457) Signed-off-by: Michael Collado --- .../db/migration/R__1_Jobs_view_and_rewrite_function.sql | 2 +- api/src/test/java/marquez/OpenLineageIntegrationTest.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql index 9445681743..3c8c8e741f 100644 --- a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql +++ b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql @@ -62,7 +62,7 @@ BEGIN DO UPDATE SET updated_at = now(), parent_job_uuid = COALESCE(jobs.parent_job_uuid, EXCLUDED.parent_job_uuid), simple_name = CASE - WHEN EXCLUDED.parent_job_uuid IS NOT NULL THEN EXCLUDED.name + WHEN EXCLUDED.parent_job_uuid IS NOT NULL THEN EXCLUDED.simple_name ELSE jobs.name END, type = EXCLUDED.type, diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index 76da1a26a4..7861f342ee 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -545,7 +545,8 @@ public void testOpenLineageJobHierarchyAirflowAddParentForExistingJob() 
assertThat(job) .isNotNull() .hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name)) - .hasFieldOrPropertyWithValue("parentJobName", dagName); + .hasFieldOrPropertyWithValue("parentJobName", dagName) + .hasFieldOrPropertyWithValue("simpleName", task1Name); Job parentJob = client.getJob(NAMESPACE_NAME, dagName); assertThat(parentJob) From 60581e37282c68a75b9f7bf1deb740d90eebedf0 Mon Sep 17 00:00:00 2001 From: wslulciuc Date: Tue, 28 Mar 2023 10:25:48 -0700 Subject: [PATCH 03/16] Fix for `0.32.0` release Signed-off-by: wslulciuc --- .circleci/api-load-test.sh | 2 +- chart/values.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.circleci/api-load-test.sh b/.circleci/api-load-test.sh index 69bf930d64..805c1aa457 100755 --- a/.circleci/api-load-test.sh +++ b/.circleci/api-load-test.sh @@ -14,7 +14,7 @@ set -e # Build version of Marquez -readonly MARQUEZ_VERSION="0.32.0-SNAPSHOT" +readonly MARQUEZ_VERSION="0.33.0-SNAPSHOT" # Fully qualified path to marquez.jar readonly MARQUEZ_JAR="api/build/libs/marquez-api-${MARQUEZ_VERSION}.jar" diff --git a/chart/values.yaml b/chart/values.yaml index 79d8394c44..4d871e7529 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -107,7 +107,7 @@ postgresql: ## @param image.tag PostgreSQL image tag (immutable tags are recommended) ## image: - tag: 0.32.0 + tag: 12.1.0 ## Authentication parameters ## ref: https://github.com/bitnami/bitnami-docker-postgresql/blob/master/README.md#setting-the-root-password-on-first-run ## ref: https://github.com/bitnami/bitnami-docker-postgresql/blob/master/README.md#creating-a-database-on-first-run From e7abc0ab1fa7d969137942296e892869566b8203 Mon Sep 17 00:00:00 2001 From: Michael Collado <40346148+collado-mike@users.noreply.github.com> Date: Thu, 30 Mar 2023 09:39:41 -0700 Subject: [PATCH 04/16] Update SQL in backfill script for facet tables to improve performance on large installations (#2461) Signed-off-by: Michael Collado --- 
.../db/migrations/V57_1__BackfillFacets.java | 176 +++++++----------- .../migrations/V57_1__BackfillFacetsTest.java | 38 ---- 2 files changed, 68 insertions(+), 146 deletions(-) diff --git a/api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java b/api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java index 52a40fe784..bc76e591fd 100644 --- a/api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java +++ b/api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java @@ -5,15 +5,12 @@ package marquez.db.migrations; -import java.time.Instant; -import java.util.Optional; -import java.util.UUID; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import marquez.db.Columns; import org.flywaydb.core.api.MigrationVersion; import org.flywaydb.core.api.migration.Context; import org.flywaydb.core.api.migration.JavaMigration; +import org.jdbi.v3.core.Handle; import org.jdbi.v3.core.Jdbi; @Slf4j @@ -23,72 +20,62 @@ public class V57_1__BackfillFacets implements JavaMigration { private static int BASIC_MIGRATION_LIMIT = 100000; - private static final String GET_CURRENT_LOCK_SQL = + private static final String CREATE_TEMP_EVENT_RUNS_TABLE = """ - SELECT * FROM facet_migration_lock - ORDER BY created_at ASC, run_uuid ASC - LIMIT 1 - """; + CREATE TEMP TABLE lineage_event_runs AS + SELECT DISTINCT ON (run_uuid) run_uuid, + COALESCE(created_at, event_time) AS created_at + FROM lineage_events + """; - private static final String GET_FINISHING_LOCK_SQL = + private static final String CREATE_INDEX_EVENT_RUNS_TABLE = """ - SELECT run_uuid, created_at FROM lineage_events - ORDER BY - COALESCE(created_at, event_time) ASC, - run_uuid ASC - LIMIT 1 - """; - - private static final String GET_INITIAL_LOCK_SQL = - """ - SELECT - run_uuid, - COALESCE(created_at, event_time, NOW()) + INTERVAL '1 MILLISECONDS' as created_at - FROM lineage_events ORDER BY COALESCE(created_at, event_time) DESC, run_uuid DESC LIMIT 1 - """; + CREATE INDEX ON 
lineage_event_runs (created_at DESC) INCLUDE (run_uuid) + """; private static final String COUNT_LINEAGE_EVENTS_SQL = """ - SELECT count(*) as cnt FROM lineage_events - """; + SELECT COUNT(*) FROM lineage_events; + """; - private static final String COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL = + private static final String ESTIMATE_COUNT_LINEAGE_EVENTS_SQL = """ - SELECT count(*) as cnt FROM lineage_events e - WHERE - COALESCE(e.created_at, e.event_time) < :createdAt - OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid) + SELECT reltuples AS cnt FROM pg_class WHERE relname = 'lineage_events'; """; private String getBackFillFacetsSQL() { return String.format( """ - WITH events_chunk AS ( - SELECT e.* FROM lineage_events e - WHERE - COALESCE(e.created_at, e.event_time) < :createdAt - OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid) - ORDER BY COALESCE(e.created_at, e.event_time) DESC, e.run_uuid DESC - LIMIT :chunkSize - ), - insert_datasets AS ( - INSERT INTO dataset_facets %s - ), - insert_runs AS ( - INSERT INTO run_facets %s - ), - insert_jobs AS ( - INSERT INTO job_facets %s - ) - INSERT INTO facet_migration_lock - SELECT events_chunk.created_at, events_chunk.run_uuid - FROM events_chunk - ORDER BY - COALESCE(events_chunk.created_at, events_chunk.event_time) ASC, - events_chunk.run_uuid ASC - LIMIT 1 - RETURNING created_at, run_uuid; - """, + WITH queued_runs AS ( + SELECT created_at, run_uuid + FROM lineage_event_runs + ORDER BY created_at DESC, run_uuid + LIMIT :chunkSize + ), + processed_runs AS ( + DELETE FROM lineage_event_runs + USING queued_runs qe + WHERE lineage_event_runs.run_uuid=qe.run_uuid + RETURNING lineage_event_runs.run_uuid + ), + events_chunk AS ( + SELECT e.* + FROM lineage_events e + WHERE run_uuid IN (SELECT run_uuid FROM processed_runs) + ), + insert_datasets AS ( + INSERT INTO dataset_facets %s + ), + insert_runs AS ( + INSERT INTO run_facets %s + ), + insert_jobs AS ( + INSERT INTO 
job_facets %s + ) + INSERT INTO facet_migration_lock + SELECT events_chunk.created_at, events_chunk.run_uuid + FROM events_chunk + """, V56_1__FacetViews.getDatasetFacetsDefinitionSQL("events_chunk"), V56_1__FacetViews.getRunFacetsDefinitionSQL("events_chunk"), V56_1__FacetViews.getJobFacetsDefinitionSQL("events_chunk")); @@ -140,7 +127,9 @@ public void migrate(Context context) throws Exception { jdbi = Jdbi.create(context.getConnection()); } - if (getLock(GET_INITIAL_LOCK_SQL).isEmpty()) { + int estimatedEventsCount = estimateCountLineageEvents(); + log.info("Estimating {} events in lineage_events table", estimatedEventsCount); + if (estimatedEventsCount == 0 && countLineageEvents() == 0) { // lineage_events table is empty -> no need to run migration // anyway. we need to create lock to mark that no data requires migration execute("INSERT INTO facet_migration_lock VALUES (NOW(), null)"); @@ -148,9 +137,7 @@ public void migrate(Context context) throws Exception { createTargetViews(); return; } - Optional lastExpectedLock = getLock(GET_FINISHING_LOCK_SQL); - - if (!manual && countLineageEvents() >= BASIC_MIGRATION_LIMIT) { + if (!manual && estimatedEventsCount >= BASIC_MIGRATION_LIMIT) { log.warn( """ ================================================== @@ -168,14 +155,21 @@ public void migrate(Context context) throws Exception { return; } - log.info("Configured chunkSize is {}", getChunkSize()); - MigrationLock lock = getLock(GET_CURRENT_LOCK_SQL).orElse(getLock(GET_INITIAL_LOCK_SQL).get()); - while (!lock.equals(lastExpectedLock.get())) { - lock = backFillChunk(lock); - log.info( - "Migrating chunk finished. 
Still having {} records to migrate.", - countLineageEventsToProcess(lock)); - } + jdbi.withHandle( + h -> { + h.createUpdate(CREATE_TEMP_EVENT_RUNS_TABLE).execute(); + h.createUpdate(CREATE_INDEX_EVENT_RUNS_TABLE).execute(); + log.info("Configured chunkSize is {}", getChunkSize()); + boolean doMigration = true; + while (doMigration) { + int results = backFillChunk(h); + log.info("Migrating chunk finished processing {} records.", results); + if (results < 1) { + doMigration = false; + } + } + return null; + }); createTargetViews(); log.info("All records migrated"); @@ -195,51 +189,17 @@ private void execute(String sql) { jdbi.inTransaction(handle -> handle.execute(sql)); } - private MigrationLock backFillChunk(MigrationLock lock) { + private int backFillChunk(Handle h) { String backFillQuery = getBackFillFacetsSQL(); - return jdbi.withHandle( - h -> - h.createQuery(backFillQuery) - .bind("chunkSize", getChunkSize()) - .bind("createdAt", lock.created_at) - .bind("runUuid", lock.run_uuid) - .map( - rs -> - new MigrationLock( - rs.getColumn(Columns.RUN_UUID, UUID.class), - rs.getColumn(Columns.CREATED_AT, Instant.class))) - .one()); + return h.createUpdate(backFillQuery).bind("chunkSize", getChunkSize()).execute(); } - private Optional getLock(String sql) { + private int estimateCountLineageEvents() { return jdbi.withHandle( - h -> - h.createQuery(sql) - .map( - rs -> - new MigrationLock( - rs.getColumn(Columns.RUN_UUID, UUID.class), - rs.getColumn(Columns.CREATED_AT, Instant.class))) - .findFirst()); + h -> h.createQuery(ESTIMATE_COUNT_LINEAGE_EVENTS_SQL).mapTo(Integer.class).one()); } private int countLineageEvents() { - return jdbi.withHandle( - h -> - h.createQuery(COUNT_LINEAGE_EVENTS_SQL) - .map(rs -> rs.getColumn("cnt", Integer.class)) - .one()); + return jdbi.withHandle(h -> h.createQuery(COUNT_LINEAGE_EVENTS_SQL).mapTo(Integer.class).one()); } - - private int countLineageEventsToProcess(MigrationLock lock) { - return jdbi.withHandle( - h -> - 
h.createQuery(COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL) - .bind("createdAt", lock.created_at) - .bind("runUuid", lock.run_uuid) - .map(rs -> rs.getColumn("cnt", Integer.class)) - .one()); - } - - private record MigrationLock(UUID run_uuid, Instant created_at) {} } diff --git a/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java b/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java index 0118f8ad36..b5f98d12e4 100644 --- a/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java +++ b/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java @@ -165,44 +165,6 @@ public void testMigrateForMultipleChunks() throws Exception { } } - @Test - public void testWhenCurrentLockIsAvailable() throws Exception { - FacetTestUtils.createLineageWithFacets(openLineageDao); - FacetTestUtils.createLineageWithFacets(openLineageDao); - lineageRow = - FacetTestUtils.createLineageWithFacets( - openLineageDao); // point migration_lock to only match the latest lineage event - - jdbi.withHandle( - h -> - h.execute( - """ - INSERT INTO facet_migration_lock - SELECT created_at, run_uuid FROM lineage_events - ORDER by created_at DESC LIMIT 1 - """)); // last lineage row should be skipped - - jdbi.withHandle( - h -> - h.execute( - """ - INSERT INTO facet_migration_lock - SELECT created_at, run_uuid FROM lineage_events - ORDER by created_at DESC LIMIT 1 OFFSET 1 - """)); // middle lineage row should be skipped - - try (MockedStatic jdbiMockedStatic = Mockito.mockStatic(Jdbi.class)) { - when(Jdbi.create(connection)).thenReturn(jdbi); - subject.setChunkSize(1); - - // clear migration lock and dataset_facets table - jdbi.inTransaction(handle -> handle.execute("DELETE FROM dataset_facets")); - subject.migrate(flywayContext); - - assertThat(countDatasetFacets(jdbi)).isEqualTo(15); - } - } - @Test public void testMigrateForLineageWithNoDatasets() throws Exception { LineageEvent.JobFacet jobFacet = From 3b0cde360cddd92f2987728df161ab8f914dac3f 
Mon Sep 17 00:00:00 2001 From: Michael Robinson <68482867+merobi-hub@users.noreply.github.com> Date: Fri, 31 Mar 2023 14:30:10 -0400 Subject: [PATCH 05/16] update committers list (#2465) Signed-off-by: Michael Robinson --- COMMITTERS.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/COMMITTERS.md b/COMMITTERS.md index b7b57bed6d..5f0c08eb0a 100644 --- a/COMMITTERS.md +++ b/COMMITTERS.md @@ -22,6 +22,9 @@ They take responsibility for guiding new pull requests into the main branch. | Michael Collado | [@collado-mike](https://github.com/collado-mike) | | Kevin Mellott | [@KevinMellott91](https://github.com/KevinMellott91) | | Michael Robinson | [@merobi-hub](https://github.com/merobi-hub) | +| Ross Turk | [@rossturk](https://github.com/rossturk) | +| Minkyu Park | [@fm100](https://github.com/fm100) | +| Pawel Leszczynski | [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) | ## Emeritus From da3863c4e3c06fbf56cada08d918b9f8a93c60d1 Mon Sep 17 00:00:00 2001 From: Michael Collado <40346148+collado-mike@users.noreply.github.com> Date: Fri, 31 Mar 2023 14:28:52 -0700 Subject: [PATCH 06/16] Update v61 migration to handle duplicate job names before unique constraint (#2464) Signed-off-by: Michael Collado --- .../migration/V61__unique_job_fqn_index.sql | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/api/src/main/resources/marquez/db/migration/V61__unique_job_fqn_index.sql b/api/src/main/resources/marquez/db/migration/V61__unique_job_fqn_index.sql index e8c155a298..317db55e73 100644 --- a/api/src/main/resources/marquez/db/migration/V61__unique_job_fqn_index.sql +++ b/api/src/main/resources/marquez/db/migration/V61__unique_job_fqn_index.sql @@ -1,10 +1,45 @@ +WITH fqn AS ( + SELECT f.uuid, + f.job_fqn AS name, + f.namespace_name, + j.name AS simple_name, + j.parent_job_uuid, + f.parent_job_name::text, + j.symlink_target_uuid + FROM jobs_fqn f, + jobs j + WHERE j.uuid = f.uuid +) UPDATE jobs SET 
symlink_target_uuid=q.target_uuid FROM ( SELECT j.uuid, j.namespace_name, j.name, j.simple_name, jv.uuid AS target_uuid, jv.simple_name - FROM jobs_view j INNER JOIN jobs_view jv ON j.namespace_name=jv.namespace_name AND j.name=jv.name AND j.simple_name != jv.simple_name + FROM fqn j + INNER JOIN fqn jv ON j.namespace_name=jv.namespace_name AND j.name=jv.name AND j.simple_name != jv.simple_name WHERE j.symlink_target_uuid IS NULL - AND jv.symlink_target_uuid IS NULL - AND j.parent_job_uuid IS NULL) q + AND jv.symlink_target_uuid IS NULL + AND j.parent_job_uuid IS NULL) q +WHERE jobs.uuid=q.uuid; + +WITH fqn AS ( + SELECT f.uuid, + f.job_fqn AS name, + f.namespace_name, + j.name AS simple_name, + j.parent_job_uuid, + f.parent_job_name::text, + j.symlink_target_uuid, + j.created_at + FROM jobs_fqn f, + jobs j + WHERE j.uuid = f.uuid +) +UPDATE jobs SET name=(q.simple_name || '_' || q.row) +FROM ( + SELECT j.uuid, j.namespace_name, j.name, j.simple_name, jv.uuid AS target_uuid, + row_number() over (PARTITION BY j.namespace_name, j.name ORDER BY j.created_at) AS row + FROM fqn j + INNER JOIN fqn jv ON j.namespace_name=jv.namespace_name AND j.name=jv.name AND j.symlink_target_uuid=jv.uuid +) q WHERE jobs.uuid=q.uuid; ALTER TABLE jobs RENAME COLUMN name TO simple_name; @@ -30,4 +65,4 @@ WHERE jobs.uuid=f.uuid; ALTER TABLE jobs ALTER COLUMN name SET NOT NULL; ALTER TABLE jobs DROP CONSTRAINT unique_jobs_namespace_uuid_name_parent; -ALTER TABLE jobs ADD CONSTRAINT unique_jobs_namespace_uuid_name_parent UNIQUE (namespace_uuid, name); \ No newline at end of file +ALTER TABLE jobs ADD CONSTRAINT unique_jobs_namespace_uuid_name_parent UNIQUE (namespace_uuid, name); From 289fa3eef967c8f7915b074325bb6f8f55480030 Mon Sep 17 00:00:00 2001 From: Michael Robinson <68482867+merobi-hub@users.noreply.github.com> Date: Mon, 3 Apr 2023 05:19:55 -0400 Subject: [PATCH 07/16] add missing special characters to committer name (#2466) Signed-off-by: Michael Robinson --- COMMITTERS.md | 2 +- 1 
file changed, 1 insertion(+), 1 deletion(-) diff --git a/COMMITTERS.md b/COMMITTERS.md index 5f0c08eb0a..35b38266a9 100644 --- a/COMMITTERS.md +++ b/COMMITTERS.md @@ -24,7 +24,7 @@ They take responsibility for guiding new pull requests into the main branch. | Michael Robinson | [@merobi-hub](https://github.com/merobi-hub) | | Ross Turk | [@rossturk](https://github.com/rossturk) | | Minkyu Park | [@fm100](https://github.com/fm100) | -| Pawel Leszczynski | [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) | +| Paweł Leszczyński | [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) | ## Emeritus From caca9a08ef588f3e4c613f06751f1b571aaf9850 Mon Sep 17 00:00:00 2001 From: Michael Collado <40346148+collado-mike@users.noreply.github.com> Date: Wed, 5 Apr 2023 15:56:11 -0700 Subject: [PATCH 08/16] Made improvements to lineage query performance (#2472) Signed-off-by: Michael Collado --- api/src/main/java/marquez/db/LineageDao.java | 70 +++++++++++--------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index 04c86b7701..f71f24e562 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -38,37 +38,45 @@ public interface LineageDao { */ @SqlQuery( """ - WITH RECURSIVE - job_io AS ( - SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid, - ARRAY_AGG(DISTINCT j.uuid) AS ids, - ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs, - ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs - FROM jobs j - LEFT JOIN jobs_view s On s.uuid=j.symlink_target_uuid - LEFT JOIN job_versions v on v.uuid=COALESCE(s.current_version_uuid, j.current_version_uuid) - LEFT JOIN job_versions_io_mapping io ON io.job_version_uuid=v.uuid - GROUP BY COALESCE(j.symlink_target_uuid, j.uuid) - ), - lineage(job_uuid, inputs, outputs) AS ( - SELECT 
COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid, - COALESCE(inputs, Array[]::uuid[]) AS inputs, - COALESCE(outputs, Array[]::uuid[]) AS outputs, - 0 AS depth - FROM jobs_view j - INNER JOIN job_io io ON j.uuid=ANY(io.ids) - WHERE io.ids && ARRAY[]::uuid[] - UNION - SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1 - FROM job_io io, - lineage l - WHERE io.job_uuid != l.job_uuid AND - array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs) - AND depth < :depth) - SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context - FROM lineage l2 - INNER JOIN jobs_view j ON j.uuid=l2.job_uuid - LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid; + WITH RECURSIVE + -- Find the current version of a job or its symlink target if the target has no + -- current_version_uuid. This ensures that we don't lose lineage for a job after it is + -- symlinked to another job but before that target job has run successfully. + job_current_version AS ( + SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid, + COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid + FROM jobs j + LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid + WHERE s.current_version_uuid IS NULL + ), + job_io AS ( + SELECT j.job_uuid, + ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs, + ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs + FROM job_versions_io_mapping io + INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid + GROUP BY j.job_uuid + ), + lineage(job_uuid, inputs, outputs) AS ( + SELECT v.job_uuid AS job_uuid, + COALESCE(inputs, Array[]::uuid[]) AS inputs, + COALESCE(outputs, Array[]::uuid[]) AS outputs, + 0 AS depth + FROM jobs j + INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid + LEFT JOIN job_io io ON io.job_uuid=v.job_uuid + WHERE j.uuid IN () OR 
j.symlink_target_uuid IN () + UNION + SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1 + FROM job_io io, + lineage l + WHERE io.job_uuid != l.job_uuid AND + array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs) + AND depth < :depth) + SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context + FROM lineage l2 + INNER JOIN jobs_view j ON j.uuid=l2.job_uuid + LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid; """) Set getLineage(@BindList Set jobIds, int depth); From 4bf01b565679cc16705564ce6f35f4786f449201 Mon Sep 17 00:00:00 2001 From: Vlad S Date: Thu, 6 Apr 2023 23:02:09 +0200 Subject: [PATCH 09/16] Change color for selected node and edges (#2458) Signed-off-by: tito12 --- web/src/components/lineage/components/edge/Edge.tsx | 4 ++-- web/src/components/lineage/components/node/Node.tsx | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/web/src/components/lineage/components/edge/Edge.tsx b/web/src/components/lineage/components/edge/Edge.tsx index cb5e3b3114..983e0e2992 100644 --- a/web/src/components/lineage/components/edge/Edge.tsx +++ b/web/src/components/lineage/components/edge/Edge.tsx @@ -52,7 +52,7 @@ class Edge extends React.Component { data={edge.points} x={(d, index) => (index === 0 ? d.x + 20 : d.x - 25)} y={d => d.y} - stroke={edge.isSelected ? theme.palette.common.white : theme.palette.secondary.main} + stroke={edge.isSelected ? theme.palette.primary.main : theme.palette.secondary.main} strokeWidth={1} opacity={1} shapeRendering='geometricPrecision' @@ -66,7 +66,7 @@ class Edge extends React.Component { y={edge.y - ICON_SIZE / 2} width={ICON_SIZE} height={ICON_SIZE} - color={edge.isSelected ? theme.palette.common.white : theme.palette.secondary.main} + color={edge.isSelected ? 
theme.palette.primary.main : theme.palette.secondary.main} /> ))} diff --git a/web/src/components/lineage/components/node/Node.tsx b/web/src/components/lineage/components/node/Node.tsx index f76c3b494e..3c4c85a401 100644 --- a/web/src/components/lineage/components/node/Node.tsx +++ b/web/src/components/lineage/components/node/Node.tsx @@ -62,7 +62,7 @@ class Node extends React.Component { style={{ cursor: 'pointer' }} r={RADIUS} fill={isSelected ? theme.palette.secondary.main : theme.palette.common.white} - stroke={isSelected ? theme.palette.common.white : theme.palette.secondary.main} + stroke={isSelected ? theme.palette.primary.main : theme.palette.secondary.main} strokeWidth={BORDER / 2} cx={node.x} cy={node.y} @@ -76,7 +76,7 @@ class Node extends React.Component { height={ICON_SIZE} x={node.x - ICON_SIZE / 2} y={node.y - ICON_SIZE / 2} - color={isSelected ? theme.palette.common.white : theme.palette.secondary.main} + color={isSelected ? theme.palette.primary.main : theme.palette.secondary.main} /> ) : ( @@ -86,7 +86,7 @@ class Node extends React.Component { x={node.x - RADIUS} y={node.y - RADIUS} fill={isSelected ? theme.palette.secondary.main : theme.palette.common.white} - stroke={isSelected ? theme.palette.common.white : theme.palette.secondary.main} + stroke={isSelected ? theme.palette.primary.main: theme.palette.secondary.main} strokeWidth={BORDER / 2} width={RADIUS * 2} height={RADIUS * 2} @@ -109,7 +109,7 @@ class Node extends React.Component { height={ICON_SIZE} x={node.x - ICON_SIZE / 2} y={node.y - ICON_SIZE / 2} - color={isSelected ? theme.palette.common.white : theme.palette.secondary.main} + color={isSelected ? 
theme.palette.primary.main : theme.palette.secondary.main} /> )} From 2a416757176d553e3c872af3d5091d5e09889218 Mon Sep 17 00:00:00 2001 From: "pawel.leszczynski" Date: Tue, 11 Apr 2023 11:56:59 +0200 Subject: [PATCH 10/16] implement inputFacets & outputFacets (#2417) Signed-off-by: Pawel Leszczynski --- CHANGELOG.md | 6 + .../common/models/InputDatasetVersion.java | 34 +++++ .../common/models/OutputDatasetVersion.java | 34 +++++ api/src/main/java/marquez/db/Columns.java | 1 + api/src/main/java/marquez/db/DatasetDao.java | 4 +- .../java/marquez/db/DatasetFacetsDao.java | 52 ++++++++ .../java/marquez/db/DatasetVersionDao.java | 49 ++++--- .../main/java/marquez/db/OpenLineageDao.java | 24 ++++ api/src/main/java/marquez/db/RunDao.java | 95 ++++++++----- .../java/marquez/db/mappers/RunMapper.java | 126 ++++++++++++++++-- .../marquez/service/models/LineageEvent.java | 60 +++++++++ .../main/java/marquez/service/models/Run.java | 28 ++-- .../java/marquez/FlowIntegrationTest.java | 30 +++-- .../java/marquez/db/DatasetFacetsDaoTest.java | 66 +++++++++ .../test/java/marquez/db/LineageDaoTest.java | 2 +- .../java/marquez/db/OpenLineageDaoTest.java | 70 ++++++++++ api/src/test/java/marquez/db/RunDaoTest.java | 11 +- .../marquez/service/LineageServiceTest.java | 17 ++- .../client/models/InputDatasetVersion.java | 26 ++++ .../client/models/OutputDatasetVersion.java | 26 ++++ .../main/java/marquez/client/models/Run.java | 12 +- .../marquez/client/MarquezClientTest.java | 35 ++++- .../marquez/client/models/JsonGenerator.java | 5 + .../marquez/client/models/ModelGenerator.java | 26 +++- spec/openapi.yml | 42 +++++- 25 files changed, 781 insertions(+), 100 deletions(-) create mode 100644 api/src/main/java/marquez/common/models/InputDatasetVersion.java create mode 100644 api/src/main/java/marquez/common/models/OutputDatasetVersion.java create mode 100644 clients/java/src/main/java/marquez/client/models/InputDatasetVersion.java create mode 100644 
clients/java/src/main/java/marquez/client/models/OutputDatasetVersion.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 79276c4ec1..a52f96e6d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ ## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.32.0...HEAD) +### Added + +* Support `inputFacets` and `outputFacets` from Openlineage specificatio [`#2417`](https://github.com/MarquezProject/marquez/pull/2417) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) + *Adds the ability to store `inputFacets` / `outputFacets` which are sent within datasets.* + *Expose them through Marquez API as a member of `Run` resource.* + ## [0.32.0](https://github.com/MarquezProject/marquez/compare/0.31.0...0.32.0) - 2023-03-20 ### Fixed diff --git a/api/src/main/java/marquez/common/models/InputDatasetVersion.java b/api/src/main/java/marquez/common/models/InputDatasetVersion.java new file mode 100644 index 0000000000..f054db44c3 --- /dev/null +++ b/api/src/main/java/marquez/common/models/InputDatasetVersion.java @@ -0,0 +1,34 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.common.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; + +/** + * Class used to store dataset version and `inputFacets` which are assigned to datasets within + * OpenLineage spec, but are exposed within Marquez api as a part of {@link + * marquez.service.models.Run} + */ +@EqualsAndHashCode +@ToString +@Getter +public class InputDatasetVersion { + + private final DatasetVersionId datasetVersionId; + private final ImmutableMap facets; + + public InputDatasetVersion( + @JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId, + @JsonProperty("facets") @NonNull ImmutableMap facets) { + this.datasetVersionId = 
datasetVersionId; + this.facets = facets; + } +} diff --git a/api/src/main/java/marquez/common/models/OutputDatasetVersion.java b/api/src/main/java/marquez/common/models/OutputDatasetVersion.java new file mode 100644 index 0000000000..a31a004dff --- /dev/null +++ b/api/src/main/java/marquez/common/models/OutputDatasetVersion.java @@ -0,0 +1,34 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.common.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; + +/** + * Class used to store dataset version and `outputFacets` which are assigned to datasets within + * OpenLineage spec, but are exposed within Marquez api as a part of {@link + * marquez.service.models.Run} + */ +@EqualsAndHashCode +@ToString +@Getter +public class OutputDatasetVersion { + + private final DatasetVersionId datasetVersionId; + private final ImmutableMap facets; + + public OutputDatasetVersion( + @JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId, + @JsonProperty("facets") @NonNull ImmutableMap facets) { + this.datasetVersionId = datasetVersionId; + this.facets = facets; + } +} diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java index fca22cff50..8abc5ab162 100644 --- a/api/src/main/java/marquez/db/Columns.java +++ b/api/src/main/java/marquez/db/Columns.java @@ -55,6 +55,7 @@ private Columns() {} public static final String NAMESPACE_NAME = "namespace_name"; public static final String DATASET_NAME = "dataset_name"; public static final String FACETS = "facets"; + public static final String DATASET_FACETS = "dataset_facets"; public static final String TAGS = "tags"; public static final String IS_HIDDEN = "is_hidden"; diff --git a/api/src/main/java/marquez/db/DatasetDao.java 
b/api/src/main/java/marquez/db/DatasetDao.java index 64da665b3f..04919cd8fd 100644 --- a/api/src/main/java/marquez/db/DatasetDao.java +++ b/api/src/main/java/marquez/db/DatasetDao.java @@ -86,7 +86,7 @@ LEFT JOIN ( df.dataset_version_uuid, JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets FROM dataset_facets_view AS df - WHERE df.facet IS NOT NULL + WHERE df.facet IS NOT NULL AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown') GROUP BY df.dataset_version_uuid ) f ON f.dataset_version_uuid = d.current_version_uuid WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks) @@ -134,7 +134,7 @@ LEFT JOIN ( df.dataset_version_uuid, JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets FROM dataset_facets_view AS df - WHERE df.facet IS NOT NULL + WHERE df.facet IS NOT NULL AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown') GROUP BY df.dataset_version_uuid ) f ON f.dataset_version_uuid = d.current_version_uuid WHERE d.namespace_name = :namespaceName diff --git a/api/src/main/java/marquez/db/DatasetFacetsDao.java b/api/src/main/java/marquez/db/DatasetFacetsDao.java index 28361aaa70..679a9bfaa3 100644 --- a/api/src/main/java/marquez/db/DatasetFacetsDao.java +++ b/api/src/main/java/marquez/db/DatasetFacetsDao.java @@ -149,6 +149,58 @@ default void insertDatasetFacetsFor( FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName)))); } + default void insertInputDatasetFacetsFor( + @NonNull UUID datasetUuid, + @NonNull UUID datasetVersionUuid, + @NonNull UUID runUuid, + @NonNull Instant lineageEventTime, + @NonNull String lineageEventType, + @NonNull LineageEvent.InputDatasetFacets inputFacets) { + final Instant now = Instant.now(); + + JsonNode jsonNode = Utils.getMapper().valueToTree(inputFacets); + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false) + .forEach( + fieldName -> + insertDatasetFacet( + now, + datasetUuid, + datasetVersionUuid, + runUuid, + 
lineageEventTime, + lineageEventType, + Type.INPUT, + fieldName, + FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName)))); + } + + default void insertOutputDatasetFacetsFor( + @NonNull UUID datasetUuid, + @NonNull UUID datasetVersionUuid, + @NonNull UUID runUuid, + @NonNull Instant lineageEventTime, + @NonNull String lineageEventType, + @NonNull LineageEvent.OutputDatasetFacets outputFacets) { + final Instant now = Instant.now(); + + JsonNode jsonNode = Utils.getMapper().valueToTree(outputFacets); + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false) + .forEach( + fieldName -> + insertDatasetFacet( + now, + datasetUuid, + datasetVersionUuid, + runUuid, + lineageEventTime, + lineageEventType, + Type.OUTPUT, + fieldName, + FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName)))); + } + record DatasetFacetRow( Instant createdAt, UUID datasetUuid, diff --git a/api/src/main/java/marquez/db/DatasetVersionDao.java b/api/src/main/java/marquez/db/DatasetVersionDao.java index f08b2f3903..7b3b544a32 100644 --- a/api/src/main/java/marquez/db/DatasetVersionDao.java +++ b/api/src/main/java/marquez/db/DatasetVersionDao.java @@ -156,10 +156,19 @@ default void updateDatasetVersionMetric( @SqlQuery( """ + WITH selected_dataset_versions AS ( + SELECT dv.* + FROM dataset_versions dv + WHERE dv.version = :version + ), selected_dataset_version_facets AS ( + SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet + FROM selected_dataset_versions dv + LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid + ) SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location, t.tags, f.facets - FROM dataset_versions dv + FROM selected_dataset_versions dv LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid LEFT JOIN stream_versions AS 
sv ON sv.dataset_version_uuid = dv.uuid LEFT JOIN ( @@ -169,21 +178,28 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = dv.dataset_uuid LEFT JOIN ( - SELECT dvf.dataset_version_uuid, - JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets - FROM dataset_facets_view dvf - GROUP BY dataset_version_uuid - ) f ON f.dataset_version_uuid = dv.uuid - WHERE dv.version = :version - """) + SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets + FROM selected_dataset_version_facets dvf + WHERE dvf.run_uuid = dvf.run_uuid + GROUP BY dvf.uuid + ) f ON f.dataset_uuid = dv.uuid""") Optional findBy(UUID version); @SqlQuery( """ + WITH selected_dataset_versions AS ( + SELECT dv.* + FROM dataset_versions dv + WHERE dv.uuid = :uuid + ), selected_dataset_version_facets AS ( + SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet + FROM selected_dataset_versions dv + LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown') + ) SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location, t.tags, f.facets - FROM dataset_versions dv + FROM selected_dataset_versions dv LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid LEFT JOIN ( @@ -192,14 +208,12 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = dv.dataset_uuid - LEFT JOIN ( - SELECT dvf.dataset_version_uuid, - JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets - FROM dataset_facets_view dvf - GROUP BY dataset_version_uuid - ) f ON f.dataset_version_uuid = dv.uuid - WHERE dv.uuid = 
:uuid - """) + LEFT JOIN ( + SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets + FROM selected_dataset_version_facets dvf + WHERE dvf.run_uuid = dvf.run_uuid + GROUP BY dvf.uuid + ) f ON f.dataset_uuid = dv.uuid""") Optional findByUuid(UUID uuid); default Optional findByWithRun(UUID version) { @@ -246,6 +260,7 @@ LEFT JOIN ( SELECT dvf.dataset_version_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets FROM dataset_facets_view dvf + WHERE (type ILIKE 'dataset' OR type ILIKE 'unknown') GROUP BY dataset_version_uuid ) f ON f.dataset_version_uuid = dv.uuid WHERE dv.namespace_name = :namespaceName diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 7be42d0dca..5f1b30c545 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -279,6 +279,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper now, event.getEventType(), facets)); + + // InputFacets ... + Optional.ofNullable(dataset.getInputFacets()) + .ifPresent( + facets -> + datasetFacetsDao.insertInputDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + event.getEventType(), + facets)); } } bag.setInputs(Optional.ofNullable(datasetInputs)); @@ -314,6 +326,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper now, event.getEventType(), facets)); + + // OutputFacets ... 
+ Optional.ofNullable(dataset.getOutputFacets()) + .ifPresent( + facets -> + datasetFacetsDao.insertOutputDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + event.getEventType(), + facets)); } } diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index 0437b9605a..57a708bd64 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -74,33 +74,51 @@ public interface RunDao extends BaseDao { void updateEndState(UUID rowUuid, Instant transitionedAt, UUID endRunStateUuid); String BASE_FIND_RUN_SQL = - "SELECT r.*, ra.args, f.facets,\n" - + "jv.version AS job_version,\n" - + "ri.input_versions, ro.output_versions\n" - + "FROM runs_view AS r\n" - + "LEFT OUTER JOIN\n" - + "(\n" - + " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n" - + " FROM run_facets_view rf\n" - + " GROUP BY rf.run_uuid\n" - + ") AS f ON r.uuid=f.run_uuid\n" - + "LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n" - + "LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n" - + "LEFT OUTER JOIN (\n" - + " SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,\n" - + " 'name', dv.dataset_name,\n" - + " 'version', dv.version)) AS input_versions\n" - + " FROM runs_input_mapping im\n" - + " INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n" - + " GROUP BY im.run_uuid\n" - + ") ri ON ri.run_uuid=r.uuid\n" - + "LEFT OUTER JOIN (\n" - + " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n" - + " 'name', dataset_name,\n" - + " 'version', version)) AS output_versions\n" - + " FROM dataset_versions\n" - + " GROUP BY run_uuid\n" - + ") ro ON ro.run_uuid=r.uuid\n"; + """ + SELECT r.*, ra.args, f.facets, + jv.version AS job_version, + ri.input_versions, ro.output_versions, df.dataset_facets + FROM runs_view AS r + LEFT OUTER JOIN + ( + 
SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets + FROM run_facets_view rf + GROUP BY rf.run_uuid + ) AS f ON r.uuid=f.run_uuid + LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid + LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid + LEFT OUTER JOIN ( + SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name, + 'name', dv.dataset_name, + 'version', dv.version, + 'dataset_version_uuid', uuid)) AS input_versions + FROM runs_input_mapping im + INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid + GROUP BY im.run_uuid + ) ri ON ri.run_uuid=r.uuid + LEFT OUTER JOIN ( + SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name, + 'name', dataset_name, + 'version', version, + 'dataset_version_uuid', uuid + )) AS output_versions + FROM dataset_versions + GROUP BY run_uuid + ) ro ON ro.run_uuid=r.uuid + LEFT OUTER JOIN ( + SELECT + run_uuid, + JSON_AGG(json_build_object( + 'dataset_version_uuid', dataset_version_uuid, + 'name', name, + 'type', type, + 'facet', facet + ) ORDER BY created_at ASC) as dataset_facets + FROM dataset_facets_view + WHERE (type ILIKE 'output' OR type ILIKE 'input') + GROUP BY run_uuid + ) AS df ON r.uuid = df.run_uuid + """; @SqlQuery(BASE_FIND_RUN_SQL + "WHERE r.uuid = :runUuid") Optional findRunByUuid(UUID runUuid); @@ -123,7 +141,7 @@ public interface RunDao extends BaseDao { """ SELECT r.*, ra.args, f.facets, j.namespace_name, j.name, jv.version AS job_version, - ri.input_versions, ro.output_versions + ri.input_versions, ro.output_versions, df.dataset_facets FROM runs_view AS r INNER JOIN jobs_view j ON r.job_uuid=j.uuid LEFT JOIN LATERAL @@ -138,7 +156,9 @@ SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS fac LEFT OUTER JOIN ( SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name, 'name', dv.dataset_name, - 'version', dv.version)) AS input_versions + 'version', dv.version, + 
'dataset_version_uuid', uuid + )) AS input_versions FROM runs_input_mapping im INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid GROUP BY im.run_uuid @@ -146,10 +166,25 @@ SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name, LEFT OUTER JOIN ( SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name, 'name', dataset_name, - 'version', version)) AS output_versions + 'version', version, + 'dataset_version_uuid', uuid + )) AS output_versions FROM dataset_versions GROUP BY run_uuid ) ro ON ro.run_uuid=r.uuid + LEFT OUTER JOIN ( + SELECT + run_uuid, + JSON_AGG(json_build_object( + 'dataset_version_uuid', dataset_version_uuid, + 'name', name, + 'type', type, + 'facet', facet + ) ORDER BY created_at ASC) as dataset_facets + FROM dataset_facets_view + WHERE (type ILIKE 'output' OR type ILIKE 'input') + GROUP BY run_uuid + ) AS df ON r.uuid = df.run_uuid WHERE j.namespace_name=:namespace AND (j.name=:jobName OR :jobName = ANY(j.aliases)) ORDER BY STARTED_AT DESC NULLS LAST LIMIT :limit OFFSET :offset diff --git a/api/src/main/java/marquez/db/mappers/RunMapper.java b/api/src/main/java/marquez/db/mappers/RunMapper.java index 0dad020ade..9f9354fa4b 100644 --- a/api/src/main/java/marquez/db/mappers/RunMapper.java +++ b/api/src/main/java/marquez/db/mappers/RunMapper.java @@ -6,6 +6,7 @@ package marquez.db.mappers; import static java.time.temporal.ChronoUnit.MILLIS; +import static java.util.stream.Collectors.toList; import static marquez.common.models.RunState.NEW; import static marquez.db.Columns.stringOrNull; import static marquez.db.Columns.stringOrThrow; @@ -15,7 +16,10 @@ import static marquez.db.Columns.uuidOrThrow; import static marquez.db.mappers.MapperUtils.toFacetsOrNull; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import 
com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.sql.ResultSet; @@ -26,19 +30,30 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import marquez.common.Utils; +import marquez.common.models.DatasetName; import marquez.common.models.DatasetVersionId; +import marquez.common.models.InputDatasetVersion; +import marquez.common.models.NamespaceName; +import marquez.common.models.OutputDatasetVersion; import marquez.common.models.RunId; import marquez.common.models.RunState; import marquez.db.Columns; import marquez.service.models.Run; import org.jdbi.v3.core.mapper.RowMapper; import org.jdbi.v3.core.statement.StatementContext; +import org.postgresql.util.PGobject; +@Slf4j public final class RunMapper implements RowMapper { private final String columnPrefix; + private static final ObjectMapper MAPPER = Utils.getMapper(); + public RunMapper() { this(""); } @@ -56,6 +71,14 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context) Optional durationMs = Optional.ofNullable(timestampOrNull(results, columnPrefix + Columns.ENDED_AT)) .flatMap(endedAt -> startedAt.map(s -> s.until(endedAt, MILLIS))); + List inputDatasetVersions = + columnNames.contains(columnPrefix + Columns.INPUT_VERSIONS) + ? toQueryDatasetVersion(results, columnPrefix + Columns.INPUT_VERSIONS) + : ImmutableList.of(); + List outputDatasetVersions = + columnNames.contains(columnPrefix + Columns.OUTPUT_VERSIONS) + ? 
toQueryDatasetVersion(results, columnPrefix + Columns.OUTPUT_VERSIONS) + : ImmutableList.of(); return new Run( RunId.of(uuidOrThrow(results, columnPrefix + Columns.ROW_UUID)), timestampOrThrow(results, columnPrefix + Columns.CREATED_AT), @@ -77,21 +100,18 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context) stringOrThrow(results, columnPrefix + Columns.JOB_NAME), uuidOrNull(results, columnPrefix + Columns.JOB_VERSION), stringOrNull(results, columnPrefix + Columns.LOCATION), - columnNames.contains(columnPrefix + Columns.INPUT_VERSIONS) - ? toDatasetVersion(results, columnPrefix + Columns.INPUT_VERSIONS) - : ImmutableList.of(), - columnNames.contains(columnPrefix + Columns.OUTPUT_VERSIONS) - ? toDatasetVersion(results, columnPrefix + Columns.OUTPUT_VERSIONS) - : ImmutableList.of(), + toInputDatasetVersions(results, inputDatasetVersions, true), + toOutputDatasetVersions(results, outputDatasetVersions, false), toFacetsOrNull(results, columnPrefix + Columns.FACETS)); } - private List toDatasetVersion(ResultSet rs, String column) throws SQLException { + private List toQueryDatasetVersion(ResultSet rs, String column) + throws SQLException { String dsString = rs.getString(column); if (dsString == null) { return Collections.emptyList(); } - return Utils.fromJson(dsString, new TypeReference>() {}); + return Utils.fromJson(dsString, new TypeReference>() {}); } private Map toArgsOrNull(ResultSet results, String argsColumn) @@ -105,4 +125,94 @@ private Map toArgsOrNull(ResultSet results, String argsColumn) } return Utils.fromJson(args, new TypeReference>() {}); } + + private List toInputDatasetVersions( + ResultSet rs, List datasetVersionIds, boolean input) + throws SQLException { + ImmutableList queryFacets = getQueryDatasetFacets(rs); + try { + return datasetVersionIds.stream() + .map( + version -> + new InputDatasetVersion( + version.toDatasetVersionId(), getFacetsMap(input, queryFacets, version))) + .collect(toList()); + } catch 
(IllegalStateException e) { + return Collections.emptyList(); + } + } + + private List toOutputDatasetVersions( + ResultSet rs, List datasetVersionIds, boolean input) + throws SQLException { + ImmutableList queryFacets = getQueryDatasetFacets(rs); + try { + return datasetVersionIds.stream() + .map( + version -> + new OutputDatasetVersion( + version.toDatasetVersionId(), getFacetsMap(input, queryFacets, version))) + .collect(toList()); + } catch (IllegalStateException e) { + return Collections.emptyList(); + } + } + + private ImmutableMap getFacetsMap( + boolean input, + ImmutableList queryDatasetFacets, + QueryDatasetVersion queryDatasetVersion) { + return ImmutableMap.copyOf( + queryDatasetFacets.stream() + .filter(rf -> rf.type.equalsIgnoreCase(input ? "input" : "output")) + .filter(rf -> rf.datasetVersionUUID.equals(queryDatasetVersion.datasetVersionUUID)) + .collect( + Collectors.toMap( + QueryDatasetFacet::name, + facet -> + Utils.getMapper() + .convertValue( + Utils.getMapper().valueToTree(facet.facet).get(facet.name), + Object.class), + (a1, a2) -> a2 // in case of duplicates, choose more recent + ))); + } + + private ImmutableList getQueryDatasetFacets(ResultSet resultSet) + throws SQLException { + String column = columnPrefix + Columns.DATASET_FACETS; + ImmutableList queryDatasetFacets = ImmutableList.of(); + if (Columns.exists(resultSet, column) && resultSet.getObject(column) != null) { + try { + queryDatasetFacets = + MAPPER.readValue( + ((PGobject) resultSet.getObject(column)).getValue(), + new TypeReference>() {}); + } catch (JsonProcessingException e) { + log.error(String.format("Could not read dataset from job row %s", column), e); + } + } + return queryDatasetFacets; + } + + record QueryDatasetFacet( + @JsonProperty("dataset_version_uuid") String datasetVersionUUID, + String name, + String type, + Object facet) {} + + record QueryDatasetVersion( + String namespace, + String name, + UUID version, + // field required to merge input versions with input 
dataset facets + @JsonProperty("dataset_version_uuid") String datasetVersionUUID) { + public DatasetVersionId toDatasetVersionId() { + return DatasetVersionId.builder() + .name(DatasetName.of(name)) + .namespace(NamespaceName.of(namespace)) + .version(version) + .build(); + } + } } diff --git a/api/src/main/java/marquez/service/models/LineageEvent.java b/api/src/main/java/marquez/service/models/LineageEvent.java index 6ae0b2419c..9478a7bdbd 100644 --- a/api/src/main/java/marquez/service/models/LineageEvent.java +++ b/api/src/main/java/marquez/service/models/LineageEvent.java @@ -309,6 +309,22 @@ public static class Dataset extends BaseJsonModel { @NotNull private String namespace; @NotNull private String name; @Valid private DatasetFacets facets; + @Valid private InputDatasetFacets inputFacets; + @Valid private OutputDatasetFacets outputFacets; + + /** + * Constructor with three args added manually to support dozens of existing usages created + * before adding inputFacets and outputFacets, as Lombok does not provide SomeArgsConstructor. 
+ * + * @param namespace + * @param name + * @param facets + */ + public Dataset(String namespace, String name, DatasetFacets facets) { + this.namespace = namespace; + this.name = name; + this.facets = facets; + } } @Builder @@ -561,4 +577,48 @@ public static class ColumnLineageInputField extends BaseJsonModel { @NotNull private String name; @NotNull private String field; } + + @Builder + @AllArgsConstructor + @NoArgsConstructor + @Setter + @Getter + @Valid + @ToString + public static class InputDatasetFacets { + + @Builder.Default @JsonIgnore private Map additional = new LinkedHashMap<>(); + + @JsonAnySetter + public void setInputFacet(String key, Object value) { + additional.put(key, value); + } + + @JsonAnyGetter + public Map getAdditionalFacets() { + return additional; + } + } + + @Builder + @AllArgsConstructor + @NoArgsConstructor + @Setter + @Getter + @Valid + @ToString + public static class OutputDatasetFacets { + + @Builder.Default @JsonIgnore private Map additional = new LinkedHashMap<>(); + + @JsonAnySetter + public void setOutputFacet(String key, Object value) { + additional.put(key, value); + } + + @JsonAnyGetter + public Map getAdditionalFacets() { + return additional; + } + } } diff --git a/api/src/main/java/marquez/service/models/Run.java b/api/src/main/java/marquez/service/models/Run.java index 99231772f9..29e4f61b67 100644 --- a/api/src/main/java/marquez/service/models/Run.java +++ b/api/src/main/java/marquez/service/models/Run.java @@ -25,10 +25,11 @@ import lombok.ToString; import lombok.extern.slf4j.Slf4j; import marquez.api.models.JobVersion; -import marquez.common.models.DatasetVersionId; +import marquez.common.models.InputDatasetVersion; import marquez.common.models.JobName; import marquez.common.models.JobVersionId; import marquez.common.models.NamespaceName; +import marquez.common.models.OutputDatasetVersion; import marquez.common.models.RunId; import marquez.common.models.RunState; @@ -58,9 +59,10 @@ public final class Run { private final 
String jobName; private final UUID jobVersion; private final String location; - @Getter private final List inputVersions; - @Getter private final List outputVersions; + @Getter private final List inputDatasetVersions; + @Getter private final List outputDatasetVersions; @Getter private final ImmutableMap facets; + ; public Run( @NonNull final RunId id, @@ -77,8 +79,8 @@ public Run( String jobName, UUID jobVersion, String location, - List inputVersions, - List outputVersions, + List inputDatasetVersions, + List outputDatasetFacets, @Nullable final ImmutableMap facets) { this.id = id; this.createdAt = createdAt; @@ -94,8 +96,8 @@ public Run( this.jobName = jobName; this.jobVersion = jobVersion; this.location = location; - this.inputVersions = inputVersions; - this.outputVersions = outputVersions; + this.inputDatasetVersions = inputDatasetVersions; + this.outputDatasetVersions = outputDatasetFacets; this.facets = (facets == null) ? ImmutableMap.of() : facets; } @@ -161,12 +163,16 @@ public static class Builder { private Map args; private JobVersionId jobVersion; private String location; - private List inputVersions; - private List outputVersions; @JsonInclude(JsonInclude.Include.NON_NULL) private ImmutableMap facets; + @JsonInclude(JsonInclude.Include.NON_NULL) + private List inputDatasetVersions; + + @JsonInclude(JsonInclude.Include.NON_NULL) + private List outputDatasetVersions; + public Run build() { return new Run( id, @@ -183,8 +189,8 @@ public Run build() { jobVersion.getName().getValue(), jobVersion.getVersion(), location, - inputVersions, - outputVersions, + inputDatasetVersions, + outputDatasetVersions, facets); } } diff --git a/api/src/test/java/marquez/FlowIntegrationTest.java b/api/src/test/java/marquez/FlowIntegrationTest.java index b4e7ff631d..5ea0d47850 100644 --- a/api/src/test/java/marquez/FlowIntegrationTest.java +++ b/api/src/test/java/marquez/FlowIntegrationTest.java @@ -109,29 +109,41 @@ public void 
testOutputVersionShouldBeOnlyOneCreatedViaJobAndDatasetApi() throws client.markRunAs(createdRun.getId(), RunState.COMPLETED); Map body = getRunResponse(createdRun); - assertThat(((List>) body.get("outputVersions"))).size().isEqualTo(1); + assertThat(((List>) body.get("outputDatasetVersions"))).size().isEqualTo(1); assertInputDatasetVersionDiffersFromOutput(body); } private void assertInputDatasetVersionDiffersFromOutput(Map body) throws IOException { - List> inputDatasetVersionIds = - ((List>) body.get("inputVersions")); - assertThat(inputDatasetVersionIds.stream().map(Map::entrySet).collect(Collectors.toList())) + List>> inputDatasetVersionIds = + ((List>>) body.get("inputDatasetVersions")); + assertThat( + inputDatasetVersionIds.stream() + .map(e -> e.get("datasetVersionId")) + .map(Map::entrySet) + .collect(Collectors.toList())) .allMatch(e -> e.contains(entry("namespace", NAMESPACE_NAME))) .allMatch(e -> e.contains(entry("name", DATASET_NAME))); - List> outputDatasetVersionIds = - ((List>) body.get("outputVersions")); - assertThat(outputDatasetVersionIds.stream().map(Map::entrySet).collect(Collectors.toList())) + List>> outputDatasetVersionIds = + ((List>>) body.get("outputDatasetVersions")); + assertThat( + outputDatasetVersionIds.stream() + .map(e -> e.get("datasetVersionId")) + .map(Map::entrySet) + .collect(Collectors.toList())) .allMatch(e -> e.contains(entry("namespace", NAMESPACE_NAME))) .allMatch(e -> e.contains(entry("name", DATASET_NAME))); List inputVersions = - inputDatasetVersionIds.stream().map(it -> it.get("version")).collect(Collectors.toList()); + inputDatasetVersionIds.stream() + .map(it -> it.get("datasetVersionId").get("version")) + .collect(Collectors.toList()); List outputVersions = - outputDatasetVersionIds.stream().map(it -> it.get("version")).collect(Collectors.toList()); + outputDatasetVersionIds.stream() + .map(it -> it.get("datasetVersionId").get("version")) + .collect(Collectors.toList()); 
assertThat(Collections.disjoint(inputVersions, outputVersions)).isTrue(); } diff --git a/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java b/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java index fd408a2477..6a4e9e0ed3 100644 --- a/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java @@ -9,6 +9,7 @@ import static marquez.db.LineageTestUtils.SCHEMA_URL; import static org.assertj.core.api.Assertions.assertThat; +import com.google.common.collect.ImmutableMap; import java.time.Instant; import java.util.Arrays; import java.util.Collections; @@ -18,6 +19,7 @@ import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.Dataset; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -304,6 +306,70 @@ public void testInsertDatasetFacetsForUnknownTypeFacet() { assertThat(facet.facet().toString()).isEqualTo("{\"custom-output\": \"{whatever}\"}"); } + @Test + public void testInsertOutputDatasetFacetsFor() { + LineageEvent.JobFacet jobFacet = + new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job_" + UUID.randomUUID(), + "COMPLETE", + jobFacet, + Collections.emptyList(), + Arrays.asList( + new Dataset( + "namespace", + "dataset_output", + null, + null, + LineageEvent.OutputDatasetFacets.builder() + .additional( + ImmutableMap.of( + "outputFacet1", "{some-facet1}", + "outputFacet2", "{some-facet2}")) + .build())), + null); + + assertThat(getDatasetFacet(lineageRow, "outputFacet1").facet().toString()) + .isEqualTo("{\"outputFacet1\": \"{some-facet1}\"}"); + assertThat(getDatasetFacet(lineageRow, "outputFacet2").facet().toString()) + .isEqualTo("{\"outputFacet2\": \"{some-facet2}\"}"); + } + + @Test + public 
void testInsertInputDatasetFacetsFor() { + LineageEvent.JobFacet jobFacet = + new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job_" + UUID.randomUUID(), + "COMPLETE", + jobFacet, + Arrays.asList( + new Dataset( + "namespace", + "dataset_output", + null, + LineageEvent.InputDatasetFacets.builder() + .additional( + ImmutableMap.of( + "inputFacet1", "{some-facet1}", + "inputFacet2", "{some-facet2}")) + .build(), + null)), + Collections.emptyList(), + null); + + assertThat(getDatasetFacet(lineageRow, "inputFacet1").facet().toString()) + .isEqualTo("{\"inputFacet1\": \"{some-facet1}\"}"); + assertThat(getDatasetFacet(lineageRow, "inputFacet2").facet().toString()) + .isEqualTo("{\"inputFacet2\": \"{some-facet2}\"}"); + } + private UpdateLineageRow createLineageRowWithInputDataset( LineageEvent.DatasetFacets.DatasetFacetsBuilder inputDatasetFacetsbuilder) { LineageEvent.JobFacet jobFacet = diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index e42b1667d5..354ab495dc 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -841,7 +841,7 @@ public void testGetCurrentRunsWithFacetsGetsLatestRun() { // assert that run_args, input/output versions, and run facets are fetched from the dao. 
for (Run run : currentRuns) { assertThat(run.getArgs()).hasSize(2); - assertThat(run.getOutputVersions()).hasSize(1); + assertThat(run.getOutputDatasetVersions()).hasSize(1); assertThat(run.getFacets()).hasSize(1); } } diff --git a/api/src/test/java/marquez/db/OpenLineageDaoTest.java b/api/src/test/java/marquez/db/OpenLineageDaoTest.java index fd68d1ec9a..d0ddf7253b 100644 --- a/api/src/test/java/marquez/db/OpenLineageDaoTest.java +++ b/api/src/test/java/marquez/db/OpenLineageDaoTest.java @@ -14,6 +14,9 @@ import java.util.Collections; import java.util.List; import java.util.UUID; +import marquez.common.models.DatasetName; +import marquez.common.models.DatasetVersionId; +import marquez.common.models.NamespaceName; import marquez.db.models.UpdateLineageRow; import marquez.db.models.UpdateLineageRow.DatasetRecord; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; @@ -23,12 +26,14 @@ import marquez.service.models.LineageEvent.JobFacet; import marquez.service.models.LineageEvent.SchemaDatasetFacet; import marquez.service.models.LineageEvent.SchemaField; +import marquez.service.models.Run; import org.assertj.core.api.InstanceOfAssertFactories; import org.assertj.core.groups.Tuple; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @ExtendWith(MarquezJdbiExternalPostgresExtension.class) class OpenLineageDaoTest { @@ -48,6 +53,7 @@ class OpenLineageDaoTest { private static DatasetSymlinkDao symlinkDao; private static NamespaceDao namespaceDao; private static DatasetFieldDao datasetFieldDao; + private static RunDao runDao; private final DatasetFacets datasetFacets = LineageTestUtils.newDatasetFacet( new SchemaField("name", "STRING", "my name"), new SchemaField("age", "INT", "my age")); @@ -58,6 +64,7 @@ public static void setUpOnce(Jdbi jdbi) { symlinkDao = 
jdbi.onDemand(DatasetSymlinkDao.class); namespaceDao = jdbi.onDemand(NamespaceDao.class); datasetFieldDao = jdbi.onDemand(DatasetFieldDao.class); + runDao = jdbi.onDemand(RunDao.class); } /** When reading a dataset, the version is assumed to be the version last written */ @@ -508,6 +515,69 @@ void testGetOpenLineageEvents() { .contains(LineageTestUtils.NAMESPACE, WRITE_JOB_NAME); } + @Test + void testInputOutputDatasetFacets() { + JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + dao, + WRITE_JOB_NAME, + "COMPLETE", + jobFacet, + Arrays.asList( + new Dataset( + "namespace", + "dataset_input", + null, + LineageEvent.InputDatasetFacets.builder() + .additional( + ImmutableMap.of( + "inputFacet1", "{some-facet1}", + "inputFacet2", "{some-facet2}")) + .build(), + null)), + Arrays.asList( + new Dataset( + "namespace", + "dataset_output", + null, + null, + LineageEvent.OutputDatasetFacets.builder() + .additional( + ImmutableMap.of( + "outputFacet1", "{some-facet1}", + "outputFacet2", "{some-facet2}")) + .build()))); + + Run run = runDao.findRunByUuid(lineageRow.getRun().getUuid()).get(); + + assertThat(run.getInputDatasetVersions()).hasSize(1); + assertThat(run.getInputDatasetVersions().get(0).getDatasetVersionId()) + .isEqualTo( + new DatasetVersionId( + NamespaceName.of("namespace"), + DatasetName.of("dataset_input"), + lineageRow.getInputs().get().get(0).getDatasetVersionRow().getVersion())); + assertThat(run.getInputDatasetVersions().get(0).getFacets()) + .containsAllEntriesOf( + ImmutableMap.of( + "inputFacet1", "{some-facet1}", + "inputFacet2", "{some-facet2}")); + + assertThat(run.getOutputDatasetVersions()).hasSize(1); + assertThat(run.getOutputDatasetVersions().get(0).getDatasetVersionId()) + .isEqualTo( + new DatasetVersionId( + NamespaceName.of("namespace"), + DatasetName.of("dataset_output"), + 
lineageRow.getOutputs().get().get(0).getDatasetVersionRow().getVersion())); + assertThat(run.getOutputDatasetVersions().get(0).getFacets()) + .containsAllEntriesOf( + ImmutableMap.of( + "outputFacet1", "{some-facet1}", + "outputFacet2", "{some-facet2}")); + } + private Dataset getInputDataset() { return new Dataset( INPUT_NAMESPACE, diff --git a/api/src/test/java/marquez/db/RunDaoTest.java b/api/src/test/java/marquez/db/RunDaoTest.java index 3095aebf0f..102c2c63ce 100644 --- a/api/src/test/java/marquez/db/RunDaoTest.java +++ b/api/src/test/java/marquez/db/RunDaoTest.java @@ -25,7 +25,9 @@ import marquez.api.JdbiUtils; import marquez.common.models.DatasetId; import marquez.common.models.DatasetVersionId; +import marquez.common.models.InputDatasetVersion; import marquez.common.models.NamespaceName; +import marquez.common.models.OutputDatasetVersion; import marquez.common.models.RunId; import marquez.common.models.RunState; import marquez.db.models.ExtendedRunRow; @@ -87,16 +89,21 @@ public void getRun() { assertThat(run) .isPresent() .get() - .extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) + .extracting( + Run::getInputDatasetVersions, InstanceOfAssertFactories.list(InputDatasetVersion.class)) .hasSize(jobMeta.getInputs().size()) + .map(InputDatasetVersion::getDatasetVersionId) .map(DatasetVersionId::getName) .containsAll( jobMeta.getInputs().stream().map(DatasetId::getName).collect(Collectors.toSet())); assertThat(run) .get() - .extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) + .extracting( + Run::getOutputDatasetVersions, + InstanceOfAssertFactories.list(OutputDatasetVersion.class)) .hasSize(jobMeta.getOutputs().size()) + .map(OutputDatasetVersion::getDatasetVersionId) .map(DatasetVersionId::getName) .containsAll( jobMeta.getOutputs().stream().map(DatasetId::getName).collect(Collectors.toSet())); diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java 
b/api/src/test/java/marquez/service/LineageServiceTest.java index b1a3bc8213..7f6828dfa0 100644 --- a/api/src/test/java/marquez/service/LineageServiceTest.java +++ b/api/src/test/java/marquez/service/LineageServiceTest.java @@ -16,10 +16,11 @@ import java.util.Optional; import marquez.api.JdbiUtils; import marquez.common.models.DatasetName; -import marquez.common.models.DatasetVersionId; +import marquez.common.models.InputDatasetVersion; import marquez.common.models.JobId; import marquez.common.models.JobName; import marquez.common.models.NamespaceName; +import marquez.common.models.OutputDatasetVersion; import marquez.db.DatasetDao; import marquez.db.JobDao; import marquez.db.LineageDao; @@ -158,10 +159,13 @@ public void testLineage() { .get(); runAssert.extracting(r -> r.getId().getValue()).isEqualTo(secondRun.getRun().getUuid()); runAssert - .extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) + .extracting( + Run::getInputDatasetVersions, InstanceOfAssertFactories.list(InputDatasetVersion.class)) .hasSize(0); runAssert - .extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) + .extracting( + Run::getOutputDatasetVersions, + InstanceOfAssertFactories.list(OutputDatasetVersion.class)) .hasSize(1); // check the output edges for the commonDataset node @@ -267,10 +271,13 @@ public void testLineageWithDeletedDataset() { .get(); runAssert.extracting(r -> r.getId().getValue()).isEqualTo(secondRun.getRun().getUuid()); runAssert - .extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) + .extracting( + Run::getInputDatasetVersions, InstanceOfAssertFactories.list(InputDatasetVersion.class)) .hasSize(0); runAssert - .extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) + .extracting( + Run::getOutputDatasetVersions, + InstanceOfAssertFactories.list(InputDatasetVersion.class)) .hasSize(1); // check the output edges for the 
commonDataset node diff --git a/clients/java/src/main/java/marquez/client/models/InputDatasetVersion.java b/clients/java/src/main/java/marquez/client/models/InputDatasetVersion.java new file mode 100644 index 0000000000..6aa0526949 --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/InputDatasetVersion.java @@ -0,0 +1,26 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import lombok.NonNull; +import lombok.Value; + +/** Class to contain inputFacets. */ +@Value +public class InputDatasetVersion { + + private final DatasetVersionId datasetVersionId; + private final ImmutableMap facets; + + public InputDatasetVersion( + @JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId, + @JsonProperty("facets") @NonNull ImmutableMap facets) { + this.datasetVersionId = datasetVersionId; + this.facets = facets; + } +} diff --git a/clients/java/src/main/java/marquez/client/models/OutputDatasetVersion.java b/clients/java/src/main/java/marquez/client/models/OutputDatasetVersion.java new file mode 100644 index 0000000000..3c3f61c75a --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/OutputDatasetVersion.java @@ -0,0 +1,26 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import lombok.NonNull; +import lombok.Value; + +/** Class to contain outputFacets. 
*/ +@Value +public class OutputDatasetVersion { + + private final DatasetVersionId datasetVersionId; + private final ImmutableMap facets; + + public OutputDatasetVersion( + @JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId, + @JsonProperty("facets") @NonNull ImmutableMap facets) { + this.datasetVersionId = datasetVersionId; + this.facets = facets; + } +} diff --git a/clients/java/src/main/java/marquez/client/models/Run.java b/clients/java/src/main/java/marquez/client/models/Run.java index e2e8d43ec8..24c57936ad 100644 --- a/clients/java/src/main/java/marquez/client/models/Run.java +++ b/clients/java/src/main/java/marquez/client/models/Run.java @@ -8,6 +8,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableMap; import java.time.Instant; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; @@ -27,6 +29,8 @@ public final class Run extends RunMeta { @Nullable private final Long durationMs; @Nullable private final Instant endedAt; @Getter private final Map facets; + @Getter private final List inputDatasetVersions; + @Getter private final List outputDatasetVersions; public Run( @NonNull final String id, @@ -39,7 +43,9 @@ public Run( @Nullable final Instant endedAt, @Nullable final Long durationMs, @Nullable final Map args, - @Nullable final Map facets) { + @Nullable final Map facets, + @Nullable final List inputDatasetVersions, + @Nullable final List outputDatasetVersions) { super(id, nominalStartTime, nominalEndTime, args); this.createdAt = createdAt; this.updatedAt = updatedAt; @@ -48,6 +54,10 @@ public Run( this.durationMs = durationMs; this.endedAt = endedAt; this.facets = (facets == null) ? ImmutableMap.of() : ImmutableMap.copyOf(facets); + this.inputDatasetVersions = + (inputDatasetVersions == null) ? 
Collections.emptyList() : inputDatasetVersions; + this.outputDatasetVersions = + (outputDatasetVersions == null) ? Collections.emptyList() : outputDatasetVersions; } public Optional getStartedAt() { diff --git a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index ff95f3bdbd..a24c98315b 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -14,11 +14,13 @@ import static marquez.client.models.ModelGenerator.newDatasetPhysicalName; import static marquez.client.models.ModelGenerator.newDescription; import static marquez.client.models.ModelGenerator.newFields; +import static marquez.client.models.ModelGenerator.newInputDatasetVersion; import static marquez.client.models.ModelGenerator.newInputs; import static marquez.client.models.ModelGenerator.newJobIdWith; import static marquez.client.models.ModelGenerator.newJobType; import static marquez.client.models.ModelGenerator.newLocation; import static marquez.client.models.ModelGenerator.newNamespaceName; +import static marquez.client.models.ModelGenerator.newOutputDatasetVersion; import static marquez.client.models.ModelGenerator.newOutputs; import static marquez.client.models.ModelGenerator.newOwnerName; import static marquez.client.models.ModelGenerator.newRunArgs; @@ -78,6 +80,7 @@ import marquez.client.models.DbTableVersion; import marquez.client.models.Edge; import marquez.client.models.Field; +import marquez.client.models.InputDatasetVersion; import marquez.client.models.Job; import marquez.client.models.JobId; import marquez.client.models.JobMeta; @@ -89,6 +92,7 @@ import marquez.client.models.Node; import marquez.client.models.NodeId; import marquez.client.models.NodeType; +import marquez.client.models.OutputDatasetVersion; import marquez.client.models.Run; import marquez.client.models.RunMeta; import marquez.client.models.RunState; 
@@ -261,6 +265,13 @@ public class MarquezClientTest { private static final Instant ENDED_AT = START_AT.plusMillis(1000L); private static final long DURATION = START_AT.until(ENDED_AT, MILLIS); private static final Map RUN_ARGS = newRunArgs(); + + private static final List INPUT_RUN_DATASET_FACETS = + Collections.singletonList(newInputDatasetVersion()); + + private static final List OUTPUT_RUN_DATASET_FACETS = + Collections.singletonList(newOutputDatasetVersion()); + private static final Run NEW = new Run( newRunId(), @@ -273,7 +284,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_RUN_DATASET_FACETS, + OUTPUT_RUN_DATASET_FACETS); private static final Run RUNNING = new Run( newRunId(), @@ -286,7 +299,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_RUN_DATASET_FACETS, + OUTPUT_RUN_DATASET_FACETS); private static final Run COMPLETED = new Run( newRunId(), @@ -299,7 +314,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_RUN_DATASET_FACETS, + OUTPUT_RUN_DATASET_FACETS); private static final Run ABORTED = new Run( newRunId(), @@ -312,7 +329,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_RUN_DATASET_FACETS, + OUTPUT_RUN_DATASET_FACETS); private static final Run FAILED = new Run( newRunId(), @@ -325,7 +344,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_RUN_DATASET_FACETS, + OUTPUT_RUN_DATASET_FACETS); private static final String RUN_ID = newRunId(); private static final Job JOB_WITH_LATEST_RUN = @@ -353,7 +374,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null), + null, + INPUT_RUN_DATASET_FACETS, + OUTPUT_RUN_DATASET_FACETS), null, null); diff --git a/clients/java/src/test/java/marquez/client/models/JsonGenerator.java b/clients/java/src/test/java/marquez/client/models/JsonGenerator.java index ef1b7e1c27..8a248c7cd5 100644 --- 
a/clients/java/src/test/java/marquez/client/models/JsonGenerator.java +++ b/clients/java/src/test/java/marquez/client/models/JsonGenerator.java @@ -310,12 +310,17 @@ private static ObjectNode toObj(final Run run) { .put("id", run.getId()) .put("createdAt", ISO_INSTANT.format(run.getCreatedAt())) .put("updatedAt", ISO_INSTANT.format(run.getUpdatedAt())); + final ArrayNode inputDatasetVersions = MAPPER.valueToTree(run.getInputDatasetVersions()); + final ArrayNode outputDatasetVersions = MAPPER.valueToTree(run.getOutputDatasetVersions()); + obj.put("nominalStartTime", run.getNominalStartTime().map(ISO_INSTANT::format).orElse(null)); obj.put("nominalEndTime", run.getNominalEndTime().map(ISO_INSTANT::format).orElse(null)); obj.put("state", run.getState().name()); obj.put("startedAt", run.getStartedAt().map(ISO_INSTANT::format).orElse(null)); obj.put("endedAt", run.getEndedAt().map(ISO_INSTANT::format).orElse(null)); obj.put("durationMs", run.getDurationMs().orElse(null)); + obj.putArray("inputDatasetVersions").addAll(inputDatasetVersions); + obj.putArray("outputDatasetVersions").addAll(outputDatasetVersions); final ObjectNode runArgs = MAPPER.createObjectNode(); run.getArgs().forEach(runArgs::put); diff --git a/clients/java/src/test/java/marquez/client/models/ModelGenerator.java b/clients/java/src/test/java/marquez/client/models/ModelGenerator.java index 5a18683d3d..3bfa758374 100644 --- a/clients/java/src/test/java/marquez/client/models/ModelGenerator.java +++ b/clients/java/src/test/java/marquez/client/models/ModelGenerator.java @@ -238,7 +238,19 @@ public static List newRuns(final int limit) { public static Run newRun() { final Instant now = newTimestamp(); return new Run( - newRunId(), now, now, now, now, RunState.NEW, null, null, null, newRunArgs(), null); + newRunId(), + now, + now, + now, + now, + RunState.NEW, + null, + null, + null, + newRunArgs(), + null, + null, + null); } public static String newOwnerName() { @@ -394,4 +406,16 @@ public static Map.Entry 
newFacetProducer() { public static Map.Entry newFacetSchemaURL() { return new AbstractMap.SimpleImmutableEntry<>("_schemaURL", "test_schemaURL" + newId()); } + + public static InputDatasetVersion newInputDatasetVersion() { + return new InputDatasetVersion( + new DatasetVersionId(newNamespaceName(), newDatasetName(), UUID.randomUUID()), + ImmutableMap.of("datasetFacet", "{some-facet1}")); + } + + public static OutputDatasetVersion newOutputDatasetVersion() { + return new OutputDatasetVersion( + new DatasetVersionId(newNamespaceName(), newDatasetName(), UUID.randomUUID()), + ImmutableMap.of("datasetFacet", "{some-facet1}")); + } } diff --git a/spec/openapi.yml b/spec/openapi.yml index ffb0255774..ab8d0b225a 100644 --- a/spec/openapi.yml +++ b/spec/openapi.yml @@ -475,7 +475,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/IncompleteRun' + $ref: '#/components/schemas/Run' /jobs/runs/{id}/facets: parameters: @@ -1734,20 +1734,20 @@ components: Run: type: object - allOf: + anyOf: - $ref: '#/components/schemas/IncompleteRun' - type: object properties: jobVersion: $ref: '#/components/schemas/JobVersionId' - inputVersions: + inputDatasetVersions: type: array items: - $ref: '#/components/schemas/DatasetVersionId' - outputVersions: + $ref: '#/components/schemas/InputDatasetVersion' + outputDatasetVersions: type: array items: - $ref: '#/components/schemas/DatasetVersionId' + $ref: '#/components/schemas/OutputDatasetVersion' context: description: A key/value pair that must be of type `string`. A context can be used for getting additional details about the job. type: object @@ -1955,4 +1955,32 @@ components: description: The ID associated with the run modifying the table. 
type: string facets: - $ref: '#/components/schemas/CustomFacet' \ No newline at end of file + $ref: '#/components/schemas/CustomFacet' + + InputDatasetVersion: + type: object + properties: + datasetVersionId: + $ref: '#/components/schemas/DatasetVersionId' + facets: + type: object + additionalProperties: + type: string + description: Dataset facets in run context, like `inputFacets`. + required: + - datasetVersionId + - facets + + OutputDatasetVersion: + type: object + properties: + datasetVersionId: + $ref: '#/components/schemas/DatasetVersionId' + facets: + type: object + additionalProperties: + type: string + description: Dataset facets in run context, like `outputFacets`. + required: + - datasetVersionId + - facets \ No newline at end of file From cf0ba3e6c33ac3313ffb7dca0106e975f889b2ab Mon Sep 17 00:00:00 2001 From: Michael Robinson <68482867+merobi-hub@users.noreply.github.com> Date: Tue, 11 Apr 2023 08:33:22 -0400 Subject: [PATCH 11/16] make get_changes modify changelog directly and update readme in dev (#2473) Signed-off-by: Michael Robinson --- dev/README.md | 5 ++++- dev/get_changes.sh | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dev/README.md b/dev/README.md index c6a4067957..7e9cb49207 100644 --- a/dev/README.md +++ b/dev/README.md @@ -4,10 +4,11 @@ The `get_changes.sh` script uses a fork of saadmk11/changelog-ci to get all merged changes between two specified releases. To get all changes since the latest release, set `END_RELEASE_VERSION` to the planned next release. -The changes will appear in this directory in a new file, CHANGES.md. +The changes will appear at the top of CHANGELOG.md. #### Requirements +Python 3.10 or newer is required. See the requirements.txt file for required dependencies. The script also requires that the following environment variables be set: @@ -18,6 +19,8 @@ The script also requires that the following environment variables be set: For example: `export END_RELEASE_VERSION=0.21.0`. 
+Use the planned next release for the end release version. + For instructions on creating a GitHub personal access token to use the GitHub API, see: https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token. diff --git a/dev/get_changes.sh b/dev/get_changes.sh index 1c79edcd7b..c75f7e7013 100755 --- a/dev/get_changes.sh +++ b/dev/get_changes.sh @@ -3,7 +3,7 @@ # Copyright 2018-2023 contributors to the Marquez project # SPDX-License-Identifier: Apache-2.0 -export INPUT_CHANGELOG_FILENAME=CHANGES.md +export INPUT_CHANGELOG_FILENAME=../CHANGELOG.md export GITHUB_REPOSITORY=MarquezProject/marquez git clone --branch add-testing-script --single-branch git@github.com:merobi-hub/changelog-ci.git From a972c048d581962cbb77e7390cc7475dc0102f44 Mon Sep 17 00:00:00 2001 From: Perttu Salonen Date: Mon, 17 Apr 2023 11:32:52 +0300 Subject: [PATCH 12/16] Handle null `job.latestRun` before `runStateColor` (#2467) * Handle null `job.latestRun` before `runStateColor` Signed-off-by: Perttu Salonen * generate default color with `NEW` `runState` Signed-off-by: Perttu Salonen * added changelog description Signed-off-by: Perttu Salonen --------- Signed-off-by: Perttu Salonen --- CHANGELOG.md | 4 ++++ web/src/routes/jobs/Jobs.tsx | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a52f96e6d7..eb6fcb1bb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.32.0...HEAD) +### Fixed + +* UI: better handling of null job latestRun for Jobs page [#2467](https://github.com/MarquezProject/marquez/pull/2467) [@perttus](https://github.com/perttus) + ### Added * Support `inputFacets` and `outputFacets` from Openlineage specificatio [`#2417`](https://github.com/MarquezProject/marquez/pull/2417) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) diff --git a/web/src/routes/jobs/Jobs.tsx 
b/web/src/routes/jobs/Jobs.tsx index 74704d3bab..e97ffcfed3 100644 --- a/web/src/routes/jobs/Jobs.tsx +++ b/web/src/routes/jobs/Jobs.tsx @@ -118,8 +118,8 @@ class Jobs extends React.Component { From 7f80746ff4e9b894c8eb1a52ca579ab8886ad4c8 Mon Sep 17 00:00:00 2001 From: Perttu Salonen Date: Mon, 17 Apr 2023 12:02:33 +0300 Subject: [PATCH 13/16] Handle null run.jobVersion in DatasetInfo (#2471) * handle null run.jobVersion Signed-off-by: Perttu Salonen * updated changelog Signed-off-by: Perttu Salonen --------- Signed-off-by: Perttu Salonen --- CHANGELOG.md | 1 + web/src/components/datasets/DatasetInfo.tsx | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb6fcb1bb1..69473f48ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Fixed +* UI: Handle null `run.jobVersion` in `DatasetInfo.tsx` to fix rendering issues. * UI: better handling of null job latestRun for Jobs page [#2467](https://github.com/MarquezProject/marquez/pull/2467) [@perttus](https://github.com/perttus) ### Added diff --git a/web/src/components/datasets/DatasetInfo.tsx b/web/src/components/datasets/DatasetInfo.tsx index cd75be1a5e..e789eb88ea 100644 --- a/web/src/components/datasets/DatasetInfo.tsx +++ b/web/src/components/datasets/DatasetInfo.tsx @@ -119,7 +119,7 @@ const DatasetInfo: FunctionComponent = props => { {stopWatchDuration(run.durationMs)} - {run.jobVersion.name} + {run.jobVersion && run.jobVersion.name} {} From 0906d8223e55ed4175c60b6a5c239ef06de3d0cf Mon Sep 17 00:00:00 2001 From: Michael Robinson <68482867+merobi-hub@users.noreply.github.com> Date: Mon, 17 Apr 2023 15:03:24 -0700 Subject: [PATCH 14/16] add latest changes to changelog in advance of 0.33.0 release (#2474) * add latest changes to changelog in advance of 0.33.0 release Signed-off-by: Michael Robinson * changelog updates before 0.33.0 release --------- Signed-off-by: Michael Robinson Co-authored-by: Harel Shein --- CHANGELOG.md | 31 
++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69473f48ff..0866f0590f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,17 +1,30 @@ # Changelog -## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.32.0...HEAD) +## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.33.0...HEAD) + +## [0.33.0](https://github.com/MarquezProject/marquez/compare/0.32.0...0.33.0) - 2023-04-17 + +### Added + +* API: support `inputFacets` and `outputFacets` from Openlineage specification [`#2417`](https://github.com/MarquezProject/marquez/pull/2417) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) + *Adds the ability to store `inputFacets` / `outputFacets` sent within datasets, exposing them through the Marquez API as part of the `Run` resource.* ### Fixed +* API: fix job update SQL to correctly use `simple_name` for job updates [`#2457`](https://github.com/MarquezProject/marquez/pull/2457) by [collado-mike](https://github.com/collado-mike) + *Fixes a bug in the job update logic stemming from use of the FQN rather than the `simple_name` and updates the relevant test.* +* API: update SQL in backfill script for facet tables to improve performance [`#2461`](https://github.com/MarquezProject/marquez/pull/2461) by [collado-mike](https://github.com/collado-mike) + *Dramatically improves migration performance by making the backfill script fetch events by `run_uuid` via a new temp table for tracking and sorting runs.* +* API: update v61 migration to handle duplicate job names before unique constraint [`#2464`](https://github.com/MarquezProject/marquez/pull/2464) by [collado-mike](https://github.com/collado-mike) + *To fix a bug in the case of duplicate job FQNs, this renames jobs that have been symlinked to point to newer versions of themselves so that the job FQN doesn't conflict and the unique constraint (without regard to parent job) can be applied. 
Note: Any installations that have already applied this migration will not see any new operations on their data, but installations that have duplicates will need this fix for the migration to complete successfully.* +* API: make improvements to lineage query performance [`#2472`](https://github.com/MarquezProject/marquez/pull/2472) by [collado-mike](https://github.com/collado-mike) + *Dramatically lessens the lineage query performance regression caused by removal of the `jobs_fqn` table in [`#2448`](https://github.com/MarquezProject/marquez/pull/2448).* +* UI: change color for selected node and edges on graph [`#2458`](https://github.com/MarquezProject/marquez/pull/2458) by [tito12](https://github.com/tito12) + *Improves the visibility of the selected node and edges by increasing the contrast with the background.* * UI: Handle null `run.jobVersion` in `DatasetInfo.tsx` to fix rendering issues. + *In some cases Marquez UI fails to render DatasetInfo, this addresses that issue.* * UI: better handling of null job latestRun for Jobs page [#2467](https://github.com/MarquezProject/marquez/pull/2467) [@perttus](https://github.com/perttus) - -### Added - -* Support `inputFacets` and `outputFacets` from Openlineage specificatio [`#2417`](https://github.com/MarquezProject/marquez/pull/2417) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) - *Adds the ability to store `inputFacets` / `outputFacets` which are sent within datasets.* - *Expose them through Marquez API as a member of `Run` resource.* + *Fixes a bug where Jobs view fails to load where some jobs don't have latestRun.* ## [0.32.0](https://github.com/MarquezProject/marquez/compare/0.31.0...0.32.0) - 2023-03-20 @@ -21,7 +34,7 @@ *Improves database query performance when accessing dataset facets by rewriting SQL queries in `DatasetDao` and `DatasetVersionDao`.* * Chart: fix communication between the UI and the API [`#2430`](https://github.com/MarquezProject/marquez/pull/2430) 
[@thomas-delrue](https://github.com/thomas-delrue) *Defines the value for `MARQUEZ_PORT` as .Values.marquez.port (80) in the Helm Chart so the Marquez Web component can communicate with the API.* -* UI: always render `MqCode` [#2454](https://github.com/MarquezProject/marquez/pull/2454) [@JDarDagran](https://github.com/JDarDagran) +* UI: always render `MqCode` [`#2454`](https://github.com/MarquezProject/marquez/pull/2454) [@JDarDagran](https://github.com/JDarDagran) *Fixes rendering of `DatasetInfo` and `RunInfo` pages when no `SqlJobFacet` exists.* ### Removed @@ -63,7 +76,7 @@ *Adds a JSON preview of column-level lineage of a selected dataset to the UI.* * UI: Add soft delete option to UI [`#2343`](https://github.com/MarquezProject/marquez/pull/2343) [@tito12](https://github.com/tito12) *Adds option to soft delete a data record with a dialog component and double confirmation.* -* API: split `lineage_events` table to `dataset_facets`, `run_facets`, and `job_facets` tables. [`2350`](https://github.com/MarquezProject/marquez/pull/2350), [`2355`](https://github.com/MarquezProject/marquez/pull/2355), [`2359`](https://github.com/MarquezProject/marquez/pull/2359) +* API: split `lineage_events` table to `dataset_facets`, `run_facets`, and `job_facets` tables. 
[`#2350`](https://github.com/MarquezProject/marquez/pull/2350), [`2355`](https://github.com/MarquezProject/marquez/pull/2355), [`2359`](https://github.com/MarquezProject/marquez/pull/2359) [@wslulciuc](https://github.com/wslulciuc,), [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) *Performance improvement storing and querying facets.* *Migration procedure requires manual steps if database has more than 100K lineage events.* From 31b3bef01f35c81cdc5026e053d95662ee67317c Mon Sep 17 00:00:00 2001 From: Michael Robinson <68482867+merobi-hub@users.noreply.github.com> Date: Wed, 19 Apr 2023 08:48:52 -0700 Subject: [PATCH 15/16] edit changelog in advance of 0.33.0 release (#2476) * add latest changes to changelog in advance of 0.33.0 release Signed-off-by: Michael Robinson * editing continued Signed-off-by: Michael Robinson --------- Signed-off-by: Michael Robinson --- CHANGELOG.md | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0866f0590f..9ed43f64cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.33.0...HEAD) -## [0.33.0](https://github.com/MarquezProject/marquez/compare/0.32.0...0.33.0) - 2023-04-17 +## [0.33.0](https://github.com/MarquezProject/marquez/compare/0.32.0...0.33.0) - 2023-04-19 ### Added @@ -11,20 +11,20 @@ ### Fixed -* API: fix job update SQL to correctly use `simple_name` for job updates [`#2457`](https://github.com/MarquezProject/marquez/pull/2457) by [collado-mike](https://github.com/collado-mike) +* API: fix job update SQL to correctly use `simple_name` for job updates [`#2457`](https://github.com/MarquezProject/marquez/pull/2457) [collado-mike](https://github.com/collado-mike) *Fixes a bug in the job update logic stemming from use of the FQN rather than the `simple_name` and updates the relevant test.* -* API: update SQL in backfill script for facet tables to improve performance 
[`#2461`](https://github.com/MarquezProject/marquez/pull/2461) by [collado-mike](https://github.com/collado-mike) +* API: update SQL in backfill script for facet tables to improve performance [`#2461`](https://github.com/MarquezProject/marquez/pull/2461) [collado-mike](https://github.com/collado-mike) *Dramatically improves migration performance by making the backfill script fetch events by `run_uuid` via a new temp table for tracking and sorting runs.* -* API: update v61 migration to handle duplicate job names before unique constraint [`#2464`](https://github.com/MarquezProject/marquez/pull/2464) by [collado-mike](https://github.com/collado-mike) +* API: update v61 migration to handle duplicate job names before unique constraint [`#2464`](https://github.com/MarquezProject/marquez/pull/2464) [collado-mike](https://github.com/collado-mike) *To fix a bug in the case of duplicate job FQNs, this renames jobs that have been symlinked to point to newer versions of themselves so that the job FQN doesn't conflict and the unique constraint (without regard to parent job) can be applied. 
Note: Any installations that have already applied this migration will not see any new operations on their data, but installations that have duplicates will need this fix for the migration to complete successfully.* -* API: make improvements to lineage query performance [`#2472`](https://github.com/MarquezProject/marquez/pull/2472) by [collado-mike](https://github.com/collado-mike) +* API: make improvements to lineage query performance [`#2472`](https://github.com/MarquezProject/marquez/pull/2472) [collado-mike](https://github.com/collado-mike) *Dramatically lessens the lineage query performance regression caused by removal of the `jobs_fqn` table in [`#2448`](https://github.com/MarquezProject/marquez/pull/2448).* -* UI: change color for selected node and edges on graph [`#2458`](https://github.com/MarquezProject/marquez/pull/2458) by [tito12](https://github.com/tito12) +* UI: change color for selected node and edges on graph [`#2458`](https://github.com/MarquezProject/marquez/pull/2458) [tito12](https://github.com/tito12) *Improves the visibility of the selected node and edges by increasing the contrast with the background.* -* UI: Handle null `run.jobVersion` in `DatasetInfo.tsx` to fix rendering issues. 
- *In some cases Marquez UI fails to render DatasetInfo, this addresses that issue.* -* UI: better handling of null job latestRun for Jobs page [#2467](https://github.com/MarquezProject/marquez/pull/2467) [@perttus](https://github.com/perttus) - *Fixes a bug where Jobs view fails to load where some jobs don't have latestRun.* +* UI: handle null `run.jobVersion` in `DatasetInfo.tsx` to fix rendering issues [#2471](https://github.com/MarquezProject/marquez/pull/2471) [@perttus](https://github.com/perttus) + *Fixes an issue causing the UI to fail to render `DatasetInfo`.* +* UI: better handling of null `latestRun` for Jobs page [#2467](https://github.com/MarquezProject/marquez/pull/2467) [@perttus](https://github.com/perttus) + *Fixes a bug causing the Jobs view to fail when `latestRun` is null.* ## [0.32.0](https://github.com/MarquezProject/marquez/compare/0.31.0...0.32.0) - 2023-03-20 From 2eea85666b3e12357d338a1f3315f2db96ff4a81 Mon Sep 17 00:00:00 2001 From: Michael Robinson Date: Wed, 19 Apr 2023 11:50:38 -0400 Subject: [PATCH 16/16] Prepare for release 0.33.0 Signed-off-by: Michael Robinson --- .circleci/db-migration.sh | 2 +- .env.example | 2 +- chart/Chart.yaml | 2 +- chart/values.yaml | 6 +++--- clients/java/README.md | 4 ++-- docker/up.sh | 4 ++-- docs/openapi.html | 10 +++++----- gradle.properties | 2 +- spec/openapi.yml | 2 +- 9 files changed, 17 insertions(+), 17 deletions(-) diff --git a/.circleci/db-migration.sh b/.circleci/db-migration.sh index e2bdd90f4e..587cddae1c 100755 --- a/.circleci/db-migration.sh +++ b/.circleci/db-migration.sh @@ -13,7 +13,7 @@ # Version of PostgreSQL readonly POSTGRES_VERSION="12.1" # Version of Marquez -readonly MARQUEZ_VERSION=0.32.0 +readonly MARQUEZ_VERSION=0.33.0 # Build version of Marquez readonly MARQUEZ_BUILD_VERSION="$(git log --pretty=format:'%h' -n 1)" # SHA1 diff --git a/.env.example b/.env.example index cda03f7f74..18d63d3811 100644 --- a/.env.example +++ b/.env.example @@ -1,4 +1,4 @@ API_PORT=5000 
API_ADMIN_PORT=5001 WEB_PORT=3000 -TAG=0.32.0 +TAG=0.33.0 diff --git a/chart/Chart.yaml b/chart/Chart.yaml index 70c1e1dea5..7c359dafd3 100644 --- a/chart/Chart.yaml +++ b/chart/Chart.yaml @@ -29,4 +29,4 @@ name: marquez sources: - https://github.com/MarquezProject/marquez - https://marquezproject.github.io/marquez/ -version: 0.32.0 +version: 0.33.0 diff --git a/chart/values.yaml b/chart/values.yaml index 4d871e7529..8f8b2897ce 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -17,7 +17,7 @@ marquez: image: registry: docker.io repository: marquezproject/marquez - tag: 0.32.0 + tag: 0.33.0 pullPolicy: IfNotPresent ## Name of the existing secret containing credentials for the Marquez installation. ## When this is specified, it will take precedence over the values configured in the 'db' section. @@ -75,7 +75,7 @@ web: image: registry: docker.io repository: marquezproject/marquez-web - tag: 0.32.0 + tag: 0.33.0 pullPolicy: IfNotPresent ## Marquez website will run on this port ## @@ -107,7 +107,7 @@ postgresql: ## @param image.tag PostgreSQL image tag (immutable tags are recommended) ## image: - tag: 12.1.0 + tag: 12.1.0 ## Authentication parameters ## ref: https://github.com/bitnami/bitnami-docker-postgresql/blob/master/README.md#setting-the-root-password-on-first-run ## ref: https://github.com/bitnami/bitnami-docker-postgresql/blob/master/README.md#creating-a-database-on-first-run diff --git a/clients/java/README.md b/clients/java/README.md index 43c77066f5..20bea4faa1 100644 --- a/clients/java/README.md +++ b/clients/java/README.md @@ -10,14 +10,14 @@ Maven: io.github.marquezproject marquez-java - 0.32.0 + 0.33.0 ``` or Gradle: ```groovy -implementation 'io.github.marquezproject:marquez-java:0.32.0 +implementation 'io.github.marquezproject:marquez-java:0.33.0 ``` ## Usage diff --git a/docker/up.sh b/docker/up.sh index d998dbced7..742aba6fd9 100755 --- a/docker/up.sh +++ b/docker/up.sh @@ -8,9 +8,9 @@ set -e # Version of Marquez -readonly VERSION=0.32.0
+readonly VERSION=0.33.0 # Build version of Marquez -readonly BUILD_VERSION=0.32.0 +readonly BUILD_VERSION=0.33.0 title() { echo -e "\033[1m${1}\033[0m" diff --git a/docs/openapi.html b/docs/openapi.html index ede8cd760f..ca4db31b76 100644 --- a/docs/openapi.html +++ b/docs/openapi.html @@ -2174,7 +2174,7 @@ 55.627 l 55.6165,55.627 -231.245496,231.24803 c -127.185,127.1864 -231.5279,231.248 -231.873,231.248 -0.3451,0 -104.688, -104.0616 -231.873,-231.248 z - " fill="currentColor">

Marquez (0.32.0)

Download OpenAPI specification:Download

License: Apache 2.0

Marquez is an open source metadata service for the collection, aggregation, and visualization of a data ecosystem's metadata.

+ " fill="currentColor">

Marquez (0.33.0)

Download OpenAPI specification:Download

License: Apache 2.0

Marquez is an open source metadata service for the collection, aggregation, and visualization of a data ecosystem's metadata.

Namespaces

Create a namespace

Creates a new namespace object. A namespace enables the contextual grouping of related jobs and datasets. Namespaces must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), dashes (-), colons (:), slashes (/), or dots (.). A namespace is case-insensitive with a maximum length of 1024 characters. Note jobs and datasets will be unique within a namespace, but not across namespaces.

path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

Request Body schema: application/json
ownerName
required
string

The owner of the namespace.

@@ -2321,7 +2321,7 @@
args
object

The arguments of the run.

Responses

Request samples

Content type
application/json
{
  • "args": {
    }
}

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "COMPLETED",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": "2019-05-09T20:05:46.815920Z",
  • "durationMs": 4250894125,
  • "args": {
    },
  • "context": {
    },
  • "facets": { }
}

List all runs

Returns a list of runs for a job.

+
http://localhost:5000/api/v1/namespaces/{namespace}/jobs/{job}/runs

Request samples

Content type
application/json
{
  • "args": {
    }
}

Response samples

Content type
application/json
Example
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "RUNNING",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": null,
  • "durationMs": null,
  • "args": {
    },
  • "facets": { }
}

List all runs

Returns a list of runs for a job.

path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

job
required
string <= 1024 characters
Example: my-job

The name of the job.

query Parameters
limit
integer
Default: 100
Example: limit=25

The number of results to return from offset

@@ -2332,7 +2332,7 @@
path Parameters
id
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the run.

Responses

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "RUNNING",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": null,
  • "durationMs": null,
  • "args": {
    },
  • "facets": { }
}

Retrieve run or job facets for a run.

Retrieve run or job facets for a run.

+
http://localhost:5000/api/v1/jobs/runs/{id}

Response samples

Content type
application/json
Example
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "RUNNING",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": null,
  • "durationMs": null,
  • "args": {
    },
  • "facets": { }
}

Retrieve run or job facets for a run.

Retrieve run or job facets for a run.

path Parameters
id
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the run.

query Parameters
type
required
string
Enum: "run" "job"

Indicates if should return job or run facets.

Responses

query Parameters
at
string <date-time>

An ISO-8601 timestamp representing the time when the run transitioned.

Responses

Response samples

Content type
application/json
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "COMPLETED",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": "2019-05-09T20:05:46.815920Z",
  • "durationMs": 4250894125,
  • "args": {
    },
  • "context": {
    },
  • "facets": { }
}

Fail a run Deprecated

Marks the run as FAILED.

+
http://localhost:5000/api/v1/jobs/runs/{id}/complete

Response samples

Content type
application/json
Example
{
  • "id": "870492da-ecfb-4be0-91b9-9a89ddd3db90",
  • "createdAt": "2019-05-09T19:49:24.201361Z",
  • "updatedAt": "2019-05-09T19:49:24.201361Z",
  • "nominalStartTime": null,
  • "nominalEndTime": null,
  • "state": "RUNNING",
  • "startedAt": "2019-05-09T15:17:32.690346",
  • "endedAt": null,
  • "durationMs": null,
  • "args": {
    },
  • "facets": { }
}

Fail a run Deprecated

Marks the run as FAILED.

path Parameters
id
required
string <uuid>
Example: ea9badc5-7cb2-49af-9a9f-155771d3a797

The ID of the run.

query Parameters
at
string <date-time>

An ISO-8601 timestamp representing the time when the run transitioned.

Responses

Response samples

Content type
application/json
{
  • "totalCount": 1,
  • "results": [
    ]
}