From dedfe4821471f5e9a2a18bcc0adf1ce802cb9880 Mon Sep 17 00:00:00 2001 From: "pawel.leszczynski" Date: Thu, 1 Dec 2022 08:30:04 +0100 Subject: [PATCH] point-in-timea for column-level lineage (#2265) Signed-off-by: Pawel Leszczynski Signed-off-by: Pawel Leszczynski --- CHANGELOG.md | 2 + .../marquez/api/ColumnLineageResource.java | 8 +- .../common/models/DatasetFieldVersionId.java | 31 +++ .../marquez/common/models/JobVersionId.java | 5 + .../java/marquez/db/ColumnLineageDao.java | 8 +- .../main/java/marquez/db/DatasetFieldDao.java | 40 ++- .../mappers/ColumnLineageNodeDataMapper.java | 5 +- .../db/mappers/PairUuidInstantMapper.java | 28 ++ .../db/models/ColumnLineageNodeData.java | 3 + .../marquez/db/models/InputFieldNodeData.java | 5 + .../marquez/service/ColumnLineageService.java | 132 ++++++--- .../java/marquez/service/models/NodeId.java | 37 ++- .../api/ColumnLineageResourceTest.java | 20 +- .../java/marquez/db/ColumnLineageDaoTest.java | 199 +++----------- .../marquez/db/ColumnLineageTestUtils.java | 15 ++ .../service/ColumnLineageServiceTest.java | 254 ++++++++---------- .../marquez/service/models/NodeIdTest.java | 30 +++ 17 files changed, 465 insertions(+), 357 deletions(-) create mode 100644 api/src/main/java/marquez/common/models/DatasetFieldVersionId.java create mode 100644 api/src/main/java/marquez/db/mappers/PairUuidInstantMapper.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 8565addf9b..1c38239156 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # Changelog ## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.28.0...HEAD) +* Column-lineage endpoints supports point-in-time requests [`#2265`](https://github.com/MarquezProject/marquez/pull/2265) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) + *Enable requesting `column-lineage` endpoint by a dataset version, job version or dataset field of a specific dataset version.* ## [0.28.0](https://github.com/MarquezProject/marquez/compare/0.27.0...0.28.0) - 2022-11-21 diff --git a/api/src/main/java/marquez/api/ColumnLineageResource.java b/api/src/main/java/marquez/api/ColumnLineageResource.java index efebc06a86..e26699c3ce 100644 --- a/api/src/main/java/marquez/api/ColumnLineageResource.java +++ b/api/src/main/java/marquez/api/ColumnLineageResource.java @@ -10,7 +10,6 @@ import com.codahale.metrics.annotation.ExceptionMetered; import com.codahale.metrics.annotation.ResponseMetered; import com.codahale.metrics.annotation.Timed; -import java.time.Instant; import java.util.concurrent.ExecutionException; import javax.validation.constraints.NotNull; import javax.ws.rs.DefaultValue; @@ -44,7 +43,10 @@ public Response getLineage( @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth, @QueryParam("withDownstream") @DefaultValue("false") boolean withDownstream) throws ExecutionException, InterruptedException { - return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream, Instant.now())) - .build(); + if (nodeId.hasVersion() && withDownstream) { + return Response.status(400, "Node version cannot be specified when withDownstream is true") + .build(); + } + return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream)).build(); } } diff --git a/api/src/main/java/marquez/common/models/DatasetFieldVersionId.java b/api/src/main/java/marquez/common/models/DatasetFieldVersionId.java new file mode 100644 index 0000000000..3eb4a515a5 --- /dev/null +++ b/api/src/main/java/marquez/common/models/DatasetFieldVersionId.java @@ -0,0 +1,31 @@ +/* + * Copyright 2018-2022 
contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.common.models; + +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +/** ID for {@code DatasetField} with a version of {@code Dataset}. */ +@EqualsAndHashCode +@AllArgsConstructor +@ToString +public class DatasetFieldVersionId { + + @Getter private final DatasetId datasetId; + @Getter private final FieldName fieldName; + @Getter private final UUID version; + + public static DatasetFieldVersionId of( + String namespace, String datasetName, String field, UUID version) { + return new DatasetFieldVersionId( + new DatasetId(NamespaceName.of(namespace), DatasetName.of(datasetName)), + FieldName.of(field), + version); + } +} diff --git a/api/src/main/java/marquez/common/models/JobVersionId.java b/api/src/main/java/marquez/common/models/JobVersionId.java index 0f0594f898..a0f9a87473 100644 --- a/api/src/main/java/marquez/common/models/JobVersionId.java +++ b/api/src/main/java/marquez/common/models/JobVersionId.java @@ -19,4 +19,9 @@ public class JobVersionId { @NonNull NamespaceName namespace; @NonNull JobName name; @NonNull UUID version; + + public static JobVersionId of( + final NamespaceName namespaceName, final JobName jobName, final UUID version) { + return new JobVersionId(namespaceName, jobName, version); + } } diff --git a/api/src/main/java/marquez/db/ColumnLineageDao.java b/api/src/main/java/marquez/db/ColumnLineageDao.java index 9e329f76f4..ac39a8e92a 100644 --- a/api/src/main/java/marquez/db/ColumnLineageDao.java +++ b/api/src/main/java/marquez/db/ColumnLineageDao.java @@ -146,7 +146,8 @@ WHERE output_dataset_field_uuid IN () output_fields.dataset_name, output_fields.field_name, output_fields.type, - ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields, + ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(clr.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields, + clr.output_dataset_version_uuid as dataset_version_uuid, clr.transformation_description, clr.transformation_type, clr.created_at, @@ -160,6 +161,7 @@ WHERE output_dataset_field_uuid IN () output_fields.dataset_name, output_fields.field_name, output_fields.type, + clr.output_dataset_version_uuid, clr.transformation_description, clr.transformation_type, clr.created_at, @@ -191,7 +193,8 @@ dataset_fields_view AS ( output_fields.dataset_name, output_fields.field_name, output_fields.type, - ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields, + ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(c.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields, + c.output_dataset_version_uuid as dataset_version_uuid, c.transformation_description, c.transformation_type, c.created_at, @@ -204,6 +207,7 @@ dataset_fields_view AS ( output_fields.dataset_name, output_fields.field_name, output_fields.type, + c.output_dataset_version_uuid, c.transformation_description, c.transformation_type, c.created_at, diff --git a/api/src/main/java/marquez/db/DatasetFieldDao.java b/api/src/main/java/marquez/db/DatasetFieldDao.java index b305fe604d..d2adf16466 100644 --- a/api/src/main/java/marquez/db/DatasetFieldDao.java +++ b/api/src/main/java/marquez/db/DatasetFieldDao.java @@ -17,12 +17,14 @@ import 
marquez.db.mappers.DatasetFieldMapper; import marquez.db.mappers.DatasetFieldRowMapper; import marquez.db.mappers.FieldDataMapper; +import marquez.db.mappers.PairUuidInstantMapper; import marquez.db.models.DatasetFieldRow; import marquez.db.models.DatasetRow; import marquez.db.models.InputFieldData; import marquez.db.models.TagRow; import marquez.service.models.Dataset; import marquez.service.models.DatasetVersion; +import org.apache.commons.lang3.tuple.Pair; import org.jdbi.v3.sqlobject.config.RegisterRowMapper; import org.jdbi.v3.sqlobject.customizer.BindBean; import org.jdbi.v3.sqlobject.statement.SqlBatch; @@ -32,6 +34,7 @@ @RegisterRowMapper(DatasetFieldRowMapper.class) @RegisterRowMapper(DatasetFieldMapper.class) @RegisterRowMapper(FieldDataMapper.class) +@RegisterRowMapper(PairUuidInstantMapper.class) public interface DatasetFieldDao extends BaseDao { @SqlQuery( """ @@ -98,13 +101,26 @@ default Dataset updateTags( @SqlQuery( """ - SELECT df.uuid - FROM dataset_fields df + SELECT df.uuid, max(dv.created_at) + FROM dataset_fields df JOIN datasets_view AS d ON d.uuid = df.dataset_uuid + JOIN dataset_versions_field_mapping AS fm ON fm.dataset_field_uuid = df.uuid + JOIN dataset_versions AS dv ON dv.uuid = fm.dataset_version_uuid WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks) + GROUP BY df.uuid """) List findDatasetFieldsUuids(String namespaceName, String datasetName); + @SqlQuery( + """ + SELECT df.uuid, dv.created_at + FROM dataset_fields df + JOIN dataset_versions_field_mapping AS fm ON fm.dataset_field_uuid = df.uuid + JOIN dataset_versions AS dv ON dv.uuid = :datasetVersion + WHERE fm.dataset_version_uuid = :datasetVersion + """) + List> findDatasetVersionFieldsUuids(UUID datasetVersion); + @SqlQuery( """ WITH latest_run AS ( @@ -121,6 +137,15 @@ WITH latest_run AS ( """) List findFieldsUuidsByJob(String namespaceName, String jobName); + @SqlQuery( + """ + SELECT dataset_fields.uuid, r.created_at + FROM dataset_fields + JOIN dataset_versions ON dataset_versions.dataset_uuid = dataset_fields.dataset_uuid + JOIN runs_view r ON r.job_version_uuid = :jobVersion + """) + List> findFieldsUuidsByJobVersion(UUID jobVersion); + @SqlQuery( """ SELECT df.uuid @@ -131,6 +156,17 @@ WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symli """) Optional findUuid(String namespaceName, String datasetName, String name); + @SqlQuery( + """ + SELECT df.uuid, dv.created_at + FROM dataset_fields df + JOIN datasets_view AS d ON d.uuid = df.dataset_uuid + JOIN dataset_versions AS dv ON dv.uuid = :datasetVersion + JOIN dataset_versions_field_mapping AS fm ON fm.dataset_field_uuid = df.uuid + WHERE fm.dataset_version_uuid = :datasetVersion AND df.name = :fieldName + """) + List> findDatasetVersionFieldsUuids(String fieldName, UUID datasetVersion); + @SqlQuery( "SELECT f.*, " + "ARRAY(SELECT t.name " diff --git a/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java b/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java index 1163a62e2b..cc24e74f76 100644 --- a/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java +++ b/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java @@ -9,12 +9,14 @@ import static marquez.db.Columns.TRANSFORMATION_TYPE; import static marquez.db.Columns.stringOrNull; import static marquez.db.Columns.stringOrThrow; +import static marquez.db.Columns.uuidOrThrow; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; 
import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; +import java.util.UUID; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import marquez.common.Utils; @@ -35,6 +37,7 @@ public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws return new ColumnLineageNodeData( stringOrThrow(results, Columns.NAMESPACE_NAME), stringOrThrow(results, Columns.DATASET_NAME), + uuidOrThrow(results, Columns.DATASET_VERSION_UUID), stringOrThrow(results, Columns.FIELD_NAME), stringOrThrow(results, Columns.TYPE), stringOrNull(results, TRANSFORMATION_DESCRIPTION), @@ -54,7 +57,7 @@ public static ImmutableList toInputFields(ResultSet results, return ImmutableList.copyOf( Arrays.asList(deserializedArray).stream() .map(o -> (String[]) o) - .map(arr -> new InputFieldNodeData(arr[0], arr[1], arr[2])) + .map(arr -> new InputFieldNodeData(arr[0], arr[1], UUID.fromString(arr[2]), arr[3])) .collect(Collectors.toList())); } } diff --git a/api/src/main/java/marquez/db/mappers/PairUuidInstantMapper.java b/api/src/main/java/marquez/db/mappers/PairUuidInstantMapper.java new file mode 100644 index 0000000000..490a260533 --- /dev/null +++ b/api/src/main/java/marquez/db/mappers/PairUuidInstantMapper.java @@ -0,0 +1,28 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.mappers; + +import static marquez.db.Columns.timestampOrNull; +import static marquez.db.Columns.uuidOrThrow; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.util.UUID; +import lombok.NonNull; +import marquez.db.Columns; +import org.apache.commons.lang3.tuple.Pair; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; + +public final class PairUuidInstantMapper implements RowMapper> { + @Override + public Pair map(@NonNull ResultSet results, @NonNull StatementContext context) + throws SQLException { + return Pair.of( + uuidOrThrow(results, Columns.ROW_UUID), timestampOrNull(results, Columns.CREATED_AT)); + } +} diff --git a/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java b/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java index 3c54dc1fe0..43f3137cbe 100644 --- a/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java +++ b/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java @@ -6,6 +6,8 @@ package marquez.db.models; import java.util.List; +import java.util.UUID; +import javax.annotation.Nullable; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NonNull; @@ -15,6 +17,7 @@ public class ColumnLineageNodeData implements NodeData { @NonNull String namespace; @NonNull String dataset; + @Nullable UUID datasetVersion; @NonNull String field; @NonNull String fieldType; String transformationDescription; diff --git a/api/src/main/java/marquez/db/models/InputFieldNodeData.java b/api/src/main/java/marquez/db/models/InputFieldNodeData.java index a068bf271f..493c1d0414 100644 --- a/api/src/main/java/marquez/db/models/InputFieldNodeData.java +++ b/api/src/main/java/marquez/db/models/InputFieldNodeData.java @@ -5,14 +5,19 @@ package marquez.db.models; +import java.util.UUID; +import javax.annotation.Nullable; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NonNull; +import lombok.ToString; @Getter @AllArgsConstructor +@ToString public class InputFieldNodeData { @NonNull String namespace; @NonNull String dataset; + @Nullable UUID datasetVersion; 
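+  // datasetVersion: version of the input dataset this field was read from (may be absent, hence @Nullable)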
@NonNull String field; } diff --git a/api/src/main/java/marquez/service/ColumnLineageService.java b/api/src/main/java/marquez/service/ColumnLineageService.java index ccb0b27f66..180db08c9a 100644 --- a/api/src/main/java/marquez/service/ColumnLineageService.java +++ b/api/src/main/java/marquez/service/ColumnLineageService.java @@ -18,11 +18,15 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import marquez.common.models.DatasetFieldId; +import marquez.common.models.DatasetFieldVersionId; import marquez.common.models.DatasetId; +import marquez.common.models.DatasetVersionId; import marquez.common.models.JobId; +import marquez.common.models.JobVersionId; import marquez.db.ColumnLineageDao; import marquez.db.DatasetFieldDao; import marquez.db.models.ColumnLineageNodeData; +import marquez.db.models.InputFieldNodeData; import marquez.service.models.ColumnLineage; import marquez.service.models.ColumnLineageInputField; import marquez.service.models.Dataset; @@ -41,15 +45,18 @@ public ColumnLineageService(ColumnLineageDao dao, DatasetFieldDao datasetFieldDa this.datasetFieldDao = datasetFieldDao; } - public Lineage lineage(NodeId nodeId, int depth, boolean withDownstream, Instant createdAtUntil) { - List columnNodeUuids = getColumnNodeUuids(nodeId); - if (columnNodeUuids.isEmpty()) { + public Lineage lineage(NodeId nodeId, int depth, boolean withDownstream) { + ColumnNodes columnNodes = getColumnNodes(nodeId); + if (columnNodes.nodeIds.isEmpty()) { throw new NodeIdNotFoundException("Could not find node"); } - return toLineage(getLineage(depth, columnNodeUuids, withDownstream, createdAtUntil)); + + return toLineage( + getLineage(depth, columnNodes.nodeIds, withDownstream, columnNodes.createdAtUntil), + nodeId.hasVersion()); } - private Lineage toLineage(Set lineageNodeData) { + private Lineage toLineage(Set lineageNodeData, boolean includeVersion) { Map graphNodes = new HashMap<>(); Map> inEdges = new HashMap<>(); Map> outEdges = new HashMap<>(); @@ -58,18 +65,10 @@ private Lineage toLineage(Set lineageNodeData) { lineageNodeData.stream() .forEach( columnLineageNodeData -> { - NodeId nodeId = - NodeId.of( - DatasetFieldId.of( - columnLineageNodeData.getNamespace(), - columnLineageNodeData.getDataset(), - columnLineageNodeData.getField())); + NodeId nodeId = toNodeId(columnLineageNodeData, includeVersion); graphNodes.put(nodeId, Node.datasetField().data(columnLineageNodeData).id(nodeId)); columnLineageNodeData.getInputFields().stream() - .map( - i -> - NodeId.of( - DatasetFieldId.of(i.getNamespace(), i.getDataset(), i.getField()))) + .map(i -> toNodeId(i, includeVersion)) .forEach( inputNodeId -> { graphNodes.putIfAbsent(inputNodeId, Node.datasetField().id(inputNodeId)); @@ -110,28 +109,91 @@ private Lineage toLineage(Set lineageNodeData) { graphNodes.values().stream().map(Node.Builder::build).collect(Collectors.toSet()))); } - List getColumnNodeUuids(NodeId nodeId) { - List columnNodeUuids = new ArrayList<>(); - if (nodeId.isDatasetType()) { - DatasetId datasetId = nodeId.asDatasetId(); - columnNodeUuids.addAll( - datasetFieldDao.findDatasetFieldsUuids( - datasetId.getNamespace().getValue(), datasetId.getName().getValue())); + private static NodeId toNodeId(ColumnLineageNodeData node, boolean includeVersion) { + if (!includeVersion) { + return NodeId.of(DatasetFieldId.of(node.getNamespace(), node.getDataset(), node.getField())); + } else { + return NodeId.of( + DatasetFieldVersionId.of( + node.getNamespace(), node.getDataset(), node.getField(), node.getDatasetVersion())); + } 
+ } + + private static NodeId toNodeId(InputFieldNodeData node, boolean includeVersion) { + if (!includeVersion) { + return NodeId.of(DatasetFieldId.of(node.getNamespace(), node.getDataset(), node.getField())); + } else { + return NodeId.of( + DatasetFieldVersionId.of( + node.getNamespace(), node.getDataset(), node.getField(), node.getDatasetVersion())); + } + } + + private ColumnNodes getColumnNodes(NodeId nodeId) { + if (nodeId.isDatasetFieldVersionType()) { + return getColumnNodes(nodeId.asDatasetFieldVersionId()); + } else if (nodeId.isDatasetVersionType()) { + return getColumnNodes(nodeId.asDatasetVersionId()); + } else if (nodeId.isJobVersionType()) { + return getColumnNodes(nodeId.asJobVersionId()); + } else if (nodeId.isDatasetType()) { + return getColumnNodes(nodeId.asDatasetId()); } else if (nodeId.isDatasetFieldType()) { - DatasetFieldId datasetFieldId = nodeId.asDatasetFieldId(); - datasetFieldDao - .findUuid( - datasetFieldId.getDatasetId().getNamespace().getValue(), - datasetFieldId.getDatasetId().getName().getValue(), - datasetFieldId.getFieldName().getValue()) - .ifPresent(uuid -> columnNodeUuids.add(uuid)); + return getColumnNodes(nodeId.asDatasetFieldId()); } else if (nodeId.isJobType()) { - JobId jobId = nodeId.asJobId(); - columnNodeUuids.addAll( - datasetFieldDao.findFieldsUuidsByJob( - jobId.getNamespace().getValue(), jobId.getName().getValue())); + return getColumnNodes(nodeId.asJobId()); } - return columnNodeUuids; + throw new UnsupportedOperationException("Unsupported NodeId: " + nodeId); + } + + private ColumnNodes getColumnNodes(DatasetVersionId datasetVersionId) { + List> fieldsWithInstant = + datasetFieldDao.findDatasetVersionFieldsUuids(datasetVersionId.getVersion()); + return new ColumnNodes( + fieldsWithInstant.stream().map(pair -> pair.getValue()).findAny().orElse(Instant.now()), + fieldsWithInstant.stream().map(pair -> pair.getKey()).collect(Collectors.toList())); + } + + private ColumnNodes getColumnNodes(DatasetFieldVersionId datasetFieldVersionId) { + List> fieldsWithInstant = + datasetFieldDao.findDatasetVersionFieldsUuids( + datasetFieldVersionId.getFieldName().getValue(), datasetFieldVersionId.getVersion()); + return new ColumnNodes( + fieldsWithInstant.stream().map(pair -> pair.getValue()).findAny().orElse(Instant.now()), + fieldsWithInstant.stream().map(pair -> pair.getKey()).collect(Collectors.toList())); + } + + private ColumnNodes getColumnNodes(JobVersionId jobVersionId) { + List> fieldsWithInstant = + datasetFieldDao.findFieldsUuidsByJobVersion(jobVersionId.getVersion()); + return new ColumnNodes( + fieldsWithInstant.stream().map(pair -> pair.getValue()).findAny().orElse(Instant.now()), + fieldsWithInstant.stream().map(pair -> pair.getKey()).collect(Collectors.toList())); + } + + private ColumnNodes getColumnNodes(DatasetId datasetId) { + return new ColumnNodes( + Instant.now(), + datasetFieldDao.findDatasetFieldsUuids( + datasetId.getNamespace().getValue(), datasetId.getName().getValue())); + } + + private ColumnNodes getColumnNodes(DatasetFieldId datasetFieldId) { + ColumnNodes columnNodes = new ColumnNodes(Instant.now(), new ArrayList<>()); + datasetFieldDao + .findUuid( + datasetFieldId.getDatasetId().getNamespace().getValue(), + datasetFieldId.getDatasetId().getName().getValue(), + datasetFieldId.getFieldName().getValue()) + .ifPresent(uuid -> columnNodes.nodeIds.add(uuid)); + return columnNodes; + } + + private ColumnNodes getColumnNodes(JobId jobId) { + return new ColumnNodes( + Instant.now(), + datasetFieldDao.findFieldsUuidsByJob( + 
jobId.getNamespace().getValue(), jobId.getName().getValue())); } public void enrichWithColumnLineage(List datasets) { @@ -180,4 +242,6 @@ public void enrichWithColumnLineage(List datasets) { .filter(dataset -> datasetLineage.containsKey(dataset)) .forEach(dataset -> dataset.setColumnLineage(datasetLineage.get(dataset))); } + + private record ColumnNodes(Instant createdAtUntil, List nodeIds) {} } diff --git a/api/src/main/java/marquez/service/models/NodeId.java b/api/src/main/java/marquez/service/models/NodeId.java index 04cbf2b6ce..6b4de0ed70 100644 --- a/api/src/main/java/marquez/service/models/NodeId.java +++ b/api/src/main/java/marquez/service/models/NodeId.java @@ -21,6 +21,7 @@ import lombok.NonNull; import lombok.ToString; import marquez.common.models.DatasetFieldId; +import marquez.common.models.DatasetFieldVersionId; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; import marquez.common.models.DatasetVersionId; @@ -116,13 +117,24 @@ public static NodeId of(@NonNull DatasetId datasetId) { return NodeId.of(datasetId.getNamespace(), datasetId.getName()); } - public static NodeId of(@NonNull DatasetFieldId datasetFieldIdId) { + public static NodeId of(@NonNull DatasetFieldId datasetFieldId) { return of( ID_JOINER.join( ID_PREFX_DATASET_FIELD, - datasetFieldIdId.getDatasetId().getNamespace().getValue(), - datasetFieldIdId.getDatasetId().getName().getValue(), - datasetFieldIdId.getFieldName().getValue())); + datasetFieldId.getDatasetId().getNamespace().getValue(), + datasetFieldId.getDatasetId().getName().getValue(), + datasetFieldId.getFieldName().getValue())); + } + + public static NodeId of(@NonNull DatasetFieldVersionId datasetFieldVersionId) { + return of( + appendVersionTo( + ID_JOINER.join( + ID_PREFX_DATASET_FIELD, + datasetFieldVersionId.getDatasetId().getNamespace().getValue(), + datasetFieldVersionId.getDatasetId().getName().getValue(), + datasetFieldVersionId.getFieldName().getValue()), + datasetFieldVersionId.getVersion())); } public static NodeId of(@NonNull JobId jobId) { @@ -156,6 +168,11 @@ public boolean isDatasetFieldType() { return value.startsWith(ID_PREFX_DATASET_FIELD); } + @JsonIgnore + public boolean isDatasetFieldVersionType() { + return value.startsWith(ID_PREFX_DATASET_FIELD) && hasVersion(); + } + @JsonIgnore public boolean isDatasetVersionType() { return value.startsWith(ID_PREFX_DATASET) && hasVersion(); @@ -243,12 +260,22 @@ public DatasetId asDatasetId() { @JsonIgnore public DatasetFieldId asDatasetFieldId() { - String[] parts = parts(4, ID_PREFX_DATASET); + String[] parts = parts(4, ID_PREFX_DATASET_FIELD); return new DatasetFieldId( new DatasetId(NamespaceName.of(parts[1]), DatasetName.of(parts[2])), FieldName.of(parts[3])); } + @JsonIgnore + public DatasetFieldVersionId asDatasetFieldVersionId() { + String[] parts = parts(4, ID_PREFX_DATASET_FIELD); + String[] nameAndVersion = parts[3].split(VERSION_DELIM); + return new DatasetFieldVersionId( + new DatasetId(NamespaceName.of(parts[1]), DatasetName.of(parts[2])), + FieldName.of(nameAndVersion[0]), + UUID.fromString(nameAndVersion[1])); + } + @JsonIgnore public JobVersionId asJobVersionId() { String[] parts = parts(3, ID_PREFX_JOB); diff --git a/api/src/test/java/marquez/api/ColumnLineageResourceTest.java b/api/src/test/java/marquez/api/ColumnLineageResourceTest.java index 9ebf944e40..43cab8eeb2 100644 --- a/api/src/test/java/marquez/api/ColumnLineageResourceTest.java +++ b/api/src/test/java/marquez/api/ColumnLineageResourceTest.java @@ -5,6 +5,7 @@ package marquez.api; 
+import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -15,7 +16,6 @@ import com.google.common.collect.ImmutableSortedSet; import io.dropwizard.testing.junit5.DropwizardExtensionsSupport; import io.dropwizard.testing.junit5.ResourceExtension; -import java.time.Instant; import java.util.Map; import marquez.common.Utils; import marquez.service.ColumnLineageService; @@ -40,8 +40,7 @@ public class ColumnLineageResourceTest { ColumnLineageResourceTest.class.getResourceAsStream("/column_lineage/node.json"), new TypeReference<>() {}); LINEAGE = new Lineage(ImmutableSortedSet.of(testNode)); - when(lineageService.lineage(any(NodeId.class), eq(20), eq(false), any(Instant.class))) - .thenReturn(LINEAGE); + when(lineageService.lineage(any(NodeId.class), eq(20), eq(false))).thenReturn(LINEAGE); ServiceFactory serviceFactory = ApiTestUtils.mockServiceFactory(Map.of(ColumnLineageService.class, lineageService)); @@ -75,4 +74,19 @@ public void testGetColumnLineageByDataset() { assertEquals(lineage, LINEAGE); } + + @Test + public void testGetColumnLineageByVersionedNodeWithDownstream() { + assertThat( + UNDER_TEST + .target("/api/v1/column-lineage") + .queryParam( + "nodeId", + "dataset:namespace:commonDataset#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee") + .queryParam("withDownstream", "true") + .request() + .get() + .getStatus()) + .isEqualTo(400); + } } diff --git a/api/src/test/java/marquez/db/ColumnLineageDaoTest.java b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java index 2f2e7c20f7..c84ec5bb14 100644 --- a/api/src/test/java/marquez/db/ColumnLineageDaoTest.java +++ b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java @@ -5,6 +5,7 @@ package marquez.db; +import static marquez.db.ColumnLineageTestUtils.createLineage; import static marquez.db.ColumnLineageTestUtils.getDatasetA; import static marquez.db.ColumnLineageTestUtils.getDatasetB; import static marquez.db.ColumnLineageTestUtils.getDatasetC; @@ -62,6 +63,10 @@ public class ColumnLineageDaoTest { private DatasetVersionRow outputDatasetVersionRow; private LineageEvent.JobFacet jobFacet; + private Dataset dataset_A = getDatasetA(); // dataset_A (col_a, col_b) + private Dataset dataset_B = getDatasetB(); // dataset_B (col_c) depends on (col_a, col_b) + private Dataset dataset_C = getDatasetC(); // dataset_C (col_d) depends on col_c + @BeforeAll public static void setUpOnce(Jdbi jdbi) { openLineageDao = jdbi.onDemand(OpenLineageDao.class); @@ -144,8 +149,6 @@ public void setup() { // insert output dataset field fieldDao.upsert( outputDatasetFieldUuid, now, "output-field", "string", "desc", outputDatasetRow.getUuid()); - - jobFacet = new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); } @AfterEach @@ -228,36 +231,11 @@ void testUpsertOnUpdatePreventsDuplicates() { now.plusSeconds(1000).getEpochSecond(), rows.get(0).getUpdatedAt().getEpochSecond()); } - // dataset_A (col_a, col_b) - // dataset_B (col_c) depends on (col_a, col_b) - // dataset_C (col_d) depends on col_c @Test void testGetLineage() { - Dataset dataset_A = getDatasetA(); - Dataset dataset_B = getDatasetB(); - Dataset dataset_C = getDatasetC(); - - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); - - UpdateLineageRow lineageRow = - LineageTestUtils.createLineageRow( - openLineageDao, - "job2", - "COMPLETE", - 
jobFacet, - Arrays.asList(dataset_B), - Arrays.asList(dataset_C)); - - UpdateLineageRow.DatasetRecord datasetRecord_c = lineageRow.getOutputs().get().get(0); - UUID field_col_d = fieldDao.findUuid(datasetRecord_c.getDatasetRow().getUuid(), "col_d").get(); - Set lineage = - dao.getLineage(20, Collections.singletonList(field_col_d), false, Instant.now()); + createLineage(openLineageDao, dataset_A, dataset_B); + UpdateLineageRow lineageRow = createLineage(openLineageDao, dataset_B, dataset_C); + Set lineage = getColumnLineage(lineageRow, "col_d"); assertEquals(2, lineage.size()); @@ -301,26 +279,8 @@ void testGetLineage() { @Test void testGetLineageWhenNoLineageForColumn() { - Dataset dataset_A = getDatasetA(); - Dataset dataset_B = getDatasetB(); - Dataset dataset_C = getDatasetC(); - - UpdateLineageRow lineageRow = - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); - - LineageTestUtils.createLineageRow( - openLineageDao, - "job2", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_B), - Arrays.asList(dataset_C)); + UpdateLineageRow lineageRow = createLineage(openLineageDao, dataset_A, dataset_B); + createLineage(openLineageDao, dataset_B, dataset_C); UpdateLineageRow.DatasetRecord datasetRecord_a = lineageRow.getInputs().get().get(0); UUID field_col_a = fieldDao.findUuid(datasetRecord_a.getDatasetRow().getUuid(), "col_a").get(); @@ -336,9 +296,6 @@ void testGetLineageWhenNoLineageForColumn() { */ @Test void testGetLineageWithLimitedDepth() { - Dataset dataset_A = getDatasetA(); - Dataset dataset_B = getDatasetB(); - Dataset dataset_C = getDatasetC(); Dataset dataset_D = new Dataset( "namespace", @@ -364,30 +321,9 @@ void testGetLineageWithLimitedDepth() { ""))))) .build()); - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); - - LineageTestUtils.createLineageRow( - openLineageDao, - "job2", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_B), - Arrays.asList(dataset_C)); - - UpdateLineageRow lineageRow = - LineageTestUtils.createLineageRow( - openLineageDao, - "job2", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_C), - Arrays.asList(dataset_D)); + createLineage(openLineageDao, dataset_A, dataset_B); + createLineage(openLineageDao, dataset_B, dataset_C); + UpdateLineageRow lineageRow = createLineage(openLineageDao, dataset_C, dataset_D); UpdateLineageRow.DatasetRecord datasetRecord_d = lineageRow.getOutputs().get().get(0); UUID field_col_e = fieldDao.findUuid(datasetRecord_d.getDatasetRow().getUuid(), "col_e").get(); @@ -429,33 +365,10 @@ void testGetLineageWhenCycleExists() { "description3", "type3"))))) .build()); - Dataset dataset_B = getDatasetB(); - Dataset dataset_C = getDatasetC(); - - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); - - LineageTestUtils.createLineageRow( - openLineageDao, - "job2", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_B), - Arrays.asList(dataset_C)); - - UpdateLineageRow lineageRow = - LineageTestUtils.createLineageRow( - openLineageDao, - "job3", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_C), - Arrays.asList(dataset_A)); + + createLineage(openLineageDao, dataset_A, dataset_B); + createLineage(openLineageDao, dataset_B, dataset_C); + UpdateLineageRow lineageRow = createLineage(openLineageDao, dataset_C, dataset_A); UpdateLineageRow.DatasetRecord 
datasetRecord_a = lineageRow.getOutputs().get().get(0); UpdateLineageRow.DatasetRecord datasetRecord_c = lineageRow.getInputs().get().get(0); @@ -476,9 +389,6 @@ void testGetLineageWhenCycleExists() { */ @Test void testGetLineageWhenTwoJobsWriteToSameDataset() { - Dataset dataset_A = getDatasetA(); - Dataset dataset_B = getDatasetB(); - Dataset dataset_C = getDatasetC(); Dataset dataset_B_another_job = new Dataset( "namespace", @@ -504,29 +414,12 @@ void testGetLineageWhenTwoJobsWriteToSameDataset() { "type1"))))) .build()); - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); - - UpdateLineageRow lineageRow = - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_C), - Arrays.asList(dataset_B_another_job)); - - UpdateLineageRow.DatasetRecord datasetRecord_b = lineageRow.getOutputs().get().get(0); - UUID field_col_c = fieldDao.findUuid(datasetRecord_b.getDatasetRow().getUuid(), "col_c").get(); + createLineage(openLineageDao, dataset_A, dataset_B); + UpdateLineageRow lineageRow = createLineage(openLineageDao, dataset_C, dataset_B_another_job); // assert input fields for col_d contain col_a and col_c List inputFields = - dao.getLineage(20, Collections.singletonList(field_col_c), false, Instant.now()).stream() + getColumnLineage(lineageRow, "col_c").stream() .filter(node -> node.getDataset().equals("dataset_b")) .flatMap(node -> node.getInputFields().stream()) .map(input -> input.getField()) @@ -537,17 +430,7 @@ void testGetLineageWhenTwoJobsWriteToSameDataset() { @Test void testGetLineagePointInTime() { - Dataset dataset_A = getDatasetA(); - Dataset dataset_B = getDatasetB(); - - UpdateLineageRow lineageRow = - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); + UpdateLineageRow lineageRow = createLineage(openLineageDao, dataset_A, dataset_B); UpdateLineageRow.DatasetRecord datasetRecord_b = lineageRow.getOutputs().get().get(0); UUID field_col_b = fieldDao.findUuid(datasetRecord_b.getDatasetRow().getUuid(), "col_c").get(); @@ -576,29 +459,25 @@ void testGetLineagePointInTime() { @Test void testGetLineageWhenJobRunMultipleTimes() { - Dataset dataset_A = getDatasetA(); - Dataset dataset_B = getDatasetB(); - - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); - UpdateLineageRow lineageRow = - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); + createLineage(openLineageDao, dataset_A, dataset_B); + createLineage(openLineageDao, dataset_A, dataset_B); + UpdateLineageRow lineageRow = createLineage(openLineageDao, dataset_A, dataset_B); - UpdateLineageRow.DatasetRecord datasetRecord_b = lineageRow.getOutputs().get().get(0); - UUID field_col_b = fieldDao.findUuid(datasetRecord_b.getDatasetRow().getUuid(), "col_c").get(); + Set columnLineage = getColumnLineage(lineageRow, "col_c"); + assertThat(columnLineage).hasSize(1); - assertThat(dao.getLineage(20, Collections.singletonList(field_col_b), false, Instant.now())) - .hasSize(1); + ColumnLineageNodeData node = columnLineage.stream().findAny().get(); + + assertThat(node.getDatasetVersion()) + .isEqualTo(lineageRow.getOutputs().get().get(0).getDatasetVersionRow().getUuid()); + 
assertThat(node.getInputFields().get(0).getDatasetVersion()) + .isEqualTo(lineageRow.getInputs().get().get(0).getDatasetVersionRow().getUuid()); + } + + private Set getColumnLineage(UpdateLineageRow lineageRow, String field) { + UpdateLineageRow.DatasetRecord datasetRecord = lineageRow.getOutputs().get().get(0); + UUID field_UUID = fieldDao.findUuid(datasetRecord.getDatasetRow().getUuid(), field).get(); + + return dao.getLineage(20, Collections.singletonList(field_UUID), false, Instant.now()); } } diff --git a/api/src/test/java/marquez/db/ColumnLineageTestUtils.java b/api/src/test/java/marquez/db/ColumnLineageTestUtils.java index c6f6e6ff82..861fdf869c 100644 --- a/api/src/test/java/marquez/db/ColumnLineageTestUtils.java +++ b/api/src/test/java/marquez/db/ColumnLineageTestUtils.java @@ -10,7 +10,9 @@ import java.util.Arrays; import java.util.Collections; +import java.util.UUID; import marquez.api.JdbiUtils; +import marquez.db.models.UpdateLineageRow; import marquez.service.models.LineageEvent; import org.jdbi.v3.core.Jdbi; @@ -105,4 +107,17 @@ public static LineageEvent.Dataset getDatasetC() { PRODUCER_URL, SCHEMA_URL, "the source", "http://thesource.com")) .build()); } + + public static UpdateLineageRow createLineage( + OpenLineageDao openLineageDao, LineageEvent.Dataset input, LineageEvent.Dataset output) { + LineageEvent.JobFacet jobFacet = + new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + return LineageTestUtils.createLineageRow( + openLineageDao, + "job_" + UUID.randomUUID(), + "COMPLETE", + jobFacet, + Arrays.asList(input), + Arrays.asList(output)); + } } diff --git a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java index 540b3e6e08..a95ecd483a 100644 --- a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java +++ b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java @@ -5,6 +5,7 @@ package marquez.service; +import static marquez.db.ColumnLineageTestUtils.createLineage; import static marquez.db.ColumnLineageTestUtils.getDatasetA; import static marquez.db.ColumnLineageTestUtils.getDatasetB; import static marquez.db.ColumnLineageTestUtils.getDatasetC; @@ -13,15 +14,18 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Optional; import marquez.common.models.DatasetFieldId; +import marquez.common.models.DatasetFieldVersionId; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; +import marquez.common.models.DatasetVersionId; +import marquez.common.models.FieldName; import marquez.common.models.JobId; import marquez.common.models.JobName; +import marquez.common.models.JobVersionId; import marquez.common.models.NamespaceName; import marquez.db.ColumnLineageDao; import marquez.db.ColumnLineageTestUtils; @@ -31,6 +35,7 @@ import marquez.db.OpenLineageDao; import marquez.db.models.ColumnLineageNodeData; import marquez.db.models.InputFieldNodeData; +import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import marquez.service.models.ColumnLineageInputField; import marquez.service.models.Dataset; @@ -54,6 +59,10 @@ public class ColumnLineageServiceTest { private static ColumnLineageService lineageService; private static LineageEvent.JobFacet jobFacet; + private LineageEvent.Dataset dataset_A = getDatasetA(); + private 
LineageEvent.Dataset dataset_B = getDatasetB(); + private LineageEvent.Dataset dataset_C = getDatasetC(); + @BeforeAll public static void setUpOnce(Jdbi jdbi) { dao = jdbi.onDemand(ColumnLineageDao.class); @@ -71,31 +80,12 @@ public void tearDown(Jdbi jdbi) { @Test public void testLineageByDatasetFieldId() { - LineageEvent.Dataset dataset_A = getDatasetA(); - LineageEvent.Dataset dataset_B = getDatasetB(); - LineageEvent.Dataset dataset_C = getDatasetC(); - - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); - LineageTestUtils.createLineageRow( - openLineageDao, - "job2", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_B), - Arrays.asList(dataset_C)); + createLineage(openLineageDao, dataset_A, dataset_B); + createLineage(openLineageDao, dataset_B, dataset_C); Lineage lineage = lineageService.lineage( - NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), - 20, - false, - Instant.now()); + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, false); assertThat(lineage.getGraph()).hasSize(3); @@ -136,76 +126,47 @@ public void testLineageByDatasetFieldId() { // verify dataset_C not present in the graph assertThat(getNode(lineage, "dataset_c", "col_d")).isEmpty(); + assertThat( + lineage.getGraph().stream() + .filter(node -> node.getId().isDatasetFieldVersionType()) + .findAny()) + .isEmpty(); // none of the graph nodes contains version } @Test public void testLineageByDatasetId() { - LineageEvent.Dataset dataset_A = getDatasetA(); - LineageEvent.Dataset dataset_B = getDatasetB(); - LineageEvent.Dataset dataset_C = getDatasetC(); - - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); - LineageTestUtils.createLineageRow( - openLineageDao, - "job2", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_B), - Arrays.asList(dataset_C)); + createLineage(openLineageDao, dataset_A, dataset_B); + createLineage(openLineageDao, dataset_B, dataset_C); Lineage lineageByField = lineageService.lineage( - NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), - 20, - false, - Instant.now()); + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, false); Lineage lineageByDataset = lineageService.lineage( NodeId.of(new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))), 20, - false, - Instant.now()); + false); // lineage of dataset and column should be equal assertThat(lineageByField).isEqualTo(lineageByDataset); + assertThat( + lineageByDataset.getGraph().stream() + .filter(node -> node.getId().isDatasetFieldVersionType()) + .findAny()) + .isEmpty(); // none of the graph nodes contains version } @Test public void testLineageWhenLineageEmpty() { - LineageEvent.Dataset dataset_A = getDatasetA(); - LineageEvent.Dataset dataset_B = getDatasetB(); - LineageEvent.Dataset dataset_C = getDatasetC(); - - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); - LineageTestUtils.createLineageRow( - openLineageDao, - "job2", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_B), - Arrays.asList(dataset_C)); + createLineage(openLineageDao, dataset_A, dataset_B); + createLineage(openLineageDao, dataset_B, dataset_C); assertThrows( NodeIdNotFoundException.class, () -> lineageService.lineage( - NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_d")), - 20, - false, - 
Instant.now())); + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_d")), 20, false)); assertThrows( NodeIdNotFoundException.class, @@ -214,41 +175,19 @@ public void testLineageWhenLineageEmpty() { NodeId.of( new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_d"))), 20, - false, - Instant.now())); + false)); assertThat( lineageService - .lineage( - NodeId.of(DatasetFieldId.of("namespace", "dataset_a", "col_a")), - 20, - false, - Instant.now()) + .lineage(NodeId.of(DatasetFieldId.of("namespace", "dataset_a", "col_a")), 20, false) .getGraph()) .hasSize(0); } @Test public void testEnrichDatasets() { - LineageEvent.Dataset dataset_A = getDatasetA(); - LineageEvent.Dataset dataset_B = getDatasetB(); - LineageEvent.Dataset dataset_C = getDatasetC(); - - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); - - LineageTestUtils.createLineageRow( - openLineageDao, - "job2", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_B), - Arrays.asList(dataset_C)); + createLineage(openLineageDao, dataset_A, dataset_B); + createLineage(openLineageDao, dataset_B, dataset_C); Dataset dataset_b = datasetDao.findDatasetByName("namespace", "dataset_b").get(); Dataset dataset_c = datasetDao.findDatasetByName("namespace", "dataset_c").get(); @@ -282,32 +221,12 @@ public void testEnrichDatasets() { @Test public void testGetLineageWithDownstream() { - LineageEvent.Dataset dataset_A = getDatasetA(); - LineageEvent.Dataset dataset_B = getDatasetB(); - LineageEvent.Dataset dataset_C = getDatasetC(); - - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); - - LineageTestUtils.createLineageRow( - openLineageDao, - "job2", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_B), - Arrays.asList(dataset_C)); + createLineage(openLineageDao, dataset_A, dataset_B); + createLineage(openLineageDao, dataset_B, dataset_C); Lineage lineage = lineageService.lineage( - NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), - 20, - true, - Instant.now()); + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, true); // assert that get lineage of dataset_B should co also return dataset_A and dataset_C assertThat( @@ -333,24 +252,8 @@ public void testGetLineageWithDownstream() { @Test public void testEnrichDatasetsHasNoDuplicates() { - LineageEvent.Dataset dataset_A = getDatasetA(); - LineageEvent.Dataset dataset_B = getDatasetB(); - - // run job twice - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); - LineageTestUtils.createLineageRow( - openLineageDao, - "job1", - "COMPLETE", - jobFacet, - Arrays.asList(dataset_A), - Arrays.asList(dataset_B)); + createLineage(openLineageDao, dataset_A, dataset_B); + createLineage(openLineageDao, dataset_B, dataset_C); Dataset dataset_b = datasetDao.findDatasetByName("namespace", "dataset_b").get(); lineageService.enrichWithColumnLineage(Arrays.asList(dataset_b)); @@ -359,10 +262,6 @@ public void testEnrichDatasetsHasNoDuplicates() { @Test public void testGetLineageByJob() { - LineageEvent.Dataset dataset_A = getDatasetA(); - LineageEvent.Dataset dataset_B = getDatasetB(); - LineageEvent.Dataset dataset_C = getDatasetC(); - LineageTestUtils.createLineageRow( openLineageDao, "job1", @@ -382,22 +281,83 @@ public void testGetLineageByJob() { // getting lineage by job_1 should 
be the same as getting it by dataset_B assertThat( lineageService.lineage( - NodeId.of(JobId.of(NamespaceName.of("namespace"), JobName.of("job1"))), - 20, - true, - Instant.now())) + NodeId.of(JobId.of(NamespaceName.of("namespace"), JobName.of("job1"))), 20, true)) .isEqualTo( lineageService.lineage( NodeId.of( new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))), 20, - true, - Instant.now())); + true)); + } + + @Test + public void testGetLineagePointInTime() { + createLineage(openLineageDao, dataset_A, dataset_B); + UpdateLineageRow lineageRow = + createLineage(openLineageDao, dataset_A, dataset_B); // we will obtain this version + createLineage(openLineageDao, dataset_A, dataset_B); + + Lineage lineage = + lineageService.lineage( + NodeId.of( + new DatasetVersionId( + NamespaceName.of("namespace"), + DatasetName.of("dataset_b"), + lineageRow.getOutputs().get().get(0).getDatasetVersionRow().getUuid())), + 20, + false); + + assertThat(lineage.getGraph().size()).isEqualTo(3); // col_a, col_b and col_c + assertThat( + getNode(lineage, "dataset_a", "col_b") + .get() + .getId() + .asDatasetFieldVersionId() + .getVersion()) + .isEqualTo(lineageRow.getInputs().get().get(0).getDatasetVersionRow().getUuid()); + assertThat( + getNode(lineage, "dataset_b", "col_c") + .get() + .getId() + .asDatasetFieldVersionId() + .getVersion()) + .isEqualTo(lineageRow.getOutputs().get().get(0).getDatasetVersionRow().getUuid()); + + // assert lineage by field version and by job are the same + assertThat(lineage) + .isEqualTo( + lineageService.lineage( + NodeId.of( + new DatasetFieldVersionId( + new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b")), + FieldName.of("col_c"), + lineageRow.getOutputs().get().get(0).getDatasetVersionRow().getUuid())), + 20, + false)); + + assertThat(lineage) + .isEqualTo( + lineageService.lineage( + NodeId.of( + JobVersionId.of( + NamespaceName.of("namespace"), + JobName.of("job1"), + lineageRow.getJobVersionBag().getJobVersionRow().getUuid())), + 20, + true)); } private Optional getNode(Lineage lineage, String datasetName, String fieldName) { return lineage.getGraph().stream() - .filter(n -> n.getId().asDatasetFieldId().getFieldName().getValue().equals(fieldName)) + .filter( + n -> + n.getId().isDatasetFieldVersionType() + && n.getId() + .asDatasetFieldVersionId() + .getFieldName() + .getValue() + .equals(fieldName) + || n.getId().asDatasetFieldId().getFieldName().getValue().equals(fieldName)) .filter( n -> n.getId() diff --git a/api/src/test/java/marquez/service/models/NodeIdTest.java b/api/src/test/java/marquez/service/models/NodeIdTest.java index f7b15b9300..a65b76dbd1 100644 --- a/api/src/test/java/marquez/service/models/NodeIdTest.java +++ b/api/src/test/java/marquez/service/models/NodeIdTest.java @@ -141,4 +141,34 @@ public void testDatasetField(String namespace, String dataset, String field) { assertEquals(dataset, nodeId.asDatasetFieldId().getDatasetId().getName().getValue()); assertEquals(field, nodeId.asDatasetFieldId().getFieldName().getValue()); } + + @ParameterizedTest(name = "testDatasetField-{index} {argumentsWithNames}") + @CsvSource( + value = { + "my-namespace$my-dataset$colA#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "gs://bucket$/path/to/data$colA#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "gs://bucket$/path/to/data$col_A#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + }, + delimiter = '$') + public void testDatasetFieldVersion(String namespace, String dataset, String field) { + NamespaceName namespaceName = NamespaceName.of(namespace); 
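+    // each CSV 'field' value embeds a version as <fieldName>#<uuid>, so the NodeId built below resolves to a versioned dataset field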
+ FieldName fieldName = FieldName.of(field); + DatasetName datasetName = DatasetName.of(dataset); + DatasetId dsId = new DatasetId(namespaceName, datasetName); + DatasetFieldId dsfId = new DatasetFieldId(dsId, fieldName); + NodeId nodeId = NodeId.of(dsfId); + assertFalse(nodeId.isRunType()); + assertFalse(nodeId.isJobType()); + assertFalse(nodeId.isDatasetType()); + assertTrue(nodeId.hasVersion()); + assertTrue(nodeId.isDatasetFieldVersionType()); + + assertEquals(dsfId, nodeId.asDatasetFieldId()); + assertEquals(nodeId, NodeId.of(nodeId.getValue())); + assertEquals(namespace, nodeId.asDatasetFieldId().getDatasetId().getNamespace().getValue()); + assertEquals(dataset, nodeId.asDatasetFieldId().getDatasetId().getName().getValue()); + assertEquals(field, nodeId.asDatasetFieldId().getFieldName().getValue()); + assertEquals( + field.split(VERSION_DELIM)[1], nodeId.asDatasetFieldVersionId().getVersion().toString()); + } }
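With the changes above, the column-lineage endpoint also accepts versioned node IDs. Below is a minimal sketch of how the new point-in-time query could be exercised from Java against a locally running Marquez API; the host, port, namespace, dataset, field, and version UUID are placeholders rather than values taken from this patch:

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class ColumnLineagePointInTimeExample {
  public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newHttpClient();

    // A dataset-field node pinned to a dataset version, following the
    // datasetField:<namespace>:<dataset>:<field>#<dataset-version-uuid> form
    // produced by NodeId.of(DatasetFieldVersionId); names and UUID are made up.
    String nodeId =
        "datasetField:food_delivery:public.delivery_7_days:order_id"
            + "#aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";

    String url =
        "http://localhost:5000/api/v1/column-lineage?depth=20&nodeId="
            + URLEncoder.encode(nodeId, StandardCharsets.UTF_8);

    HttpResponse<String> response =
        client.send(
            HttpRequest.newBuilder(URI.create(url)).GET().build(),
            HttpResponse.BodyHandlers.ofString());

    // Expect 200 with a graph whose nodes carry dataset versions; note that
    // ColumnLineageResource returns 400 when a versioned nodeId is combined
    // with withDownstream=true.
    System.out.println(response.statusCode());
    System.out.println(response.body());
  }
}

The same request shape works with a versioned dataset node (dataset:<namespace>:<dataset>#<version-uuid>, as used in ColumnLineageResourceTest) or a versioned job node, in which case the service resolves the fields and the point-in-time cutoff from that version rather than from the latest run.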