Skip to content

Commit

Permalink
fix(lineagecounts) Include entities that are filtered out due to sibl…
Browse files Browse the repository at this point in the history
…ing logic in the filtered count of lineage counts (datahub-project#8152)

Co-authored-by: Indy Prentice <[email protected]>
  • Loading branch information
iprentic and Indy Prentice committed May 31, 2023
1 parent 0b0e499 commit 6c445c8
Show file tree
Hide file tree
Showing 4 changed files with 486 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public CompletableFuture<EntityCountResults> get(final DataFetchingEnvironment e
try {
// First, get all counts
Map<String, Long> gmsResult = _entityClient.batchGetTotalEntityCount(
input.getTypes().stream().map(type -> EntityTypeMapper.getName(type)).collect(Collectors.toList()), context.getAuthentication());
input.getTypes().stream().map(EntityTypeMapper::getName).collect(Collectors.toList()), context.getAuthentication());

// bind to a result.
List<EntityCountResult> resultList = gmsResult.entrySet().stream().map(entry -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,77 +122,88 @@ public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDi
offset = Math.max(0, offset - nextEntityLineage.getTotal());
count = Math.max(0, count - nextEntityLineage.getCount() - entityLineage.getCount());

entityLineage.setFiltered(getFiltered(entityLineage) + getFiltered(nextEntityLineage));
entityLineage = nextEntityLineage;
};
}

return ValidationUtils.validateEntityLineageResult(entityLineage, _entityService);
}

private int getFiltered(@Nullable EntityLineageResult entityLineageResult) {
return (entityLineageResult != null && entityLineageResult.getFiltered() != null ? entityLineageResult.getFiltered() : 0);
}

// takes a lineage result and removes any nodes that are siblings of some other node already in the result
private EntityLineageResult filterLineageResultFromSiblings(
@Nonnull final Urn urn,
@Nonnull final Set<Urn> allSiblingsInGroup,
@Nonnull final EntityLineageResult entityLineageResult,
@Nullable final EntityLineageResult existingResult
) {
int numFiltered = 0;

// 1) remove the source entities siblings from this entity's downstreams
final List<LineageRelationship> filteredRelationships = entityLineageResult.getRelationships()
.stream()
.filter(lineageRelationship -> !allSiblingsInGroup.contains(lineageRelationship.getEntity())
|| lineageRelationship.getEntity().equals(urn))
.collect(Collectors.toList());
final Map<Boolean, List<LineageRelationship>> partitionedFilteredRelationships = entityLineageResult.getRelationships()
.stream().collect(Collectors.partitioningBy(
lineageRelationship -> !allSiblingsInGroup.contains(lineageRelationship.getEntity())
|| lineageRelationship.getEntity().equals(urn)));
numFiltered += partitionedFilteredRelationships.get(Boolean.FALSE).size();

final List<LineageRelationship> filteredRelationships = partitionedFilteredRelationships.get(Boolean.TRUE);

// 2) filter out existing lineage to avoid duplicates in our combined result
final Set<Urn> existingUrns = existingResult != null
? existingResult.getRelationships().stream().map(LineageRelationship::getEntity).collect(Collectors.toSet())
: new HashSet<>();
List<LineageRelationship> uniqueFilteredRelationships = filteredRelationships.stream().filter(
lineageRelationship -> !existingUrns.contains(lineageRelationship.getEntity())).collect(Collectors.toList());

// 3) combine this entity's lineage with the lineage we've already seen and remove duplicates
Map<Boolean, List<LineageRelationship>> partitionedUniqueFilteredRelationships = filteredRelationships.stream().collect(
Collectors.partitioningBy(lineageRelationship -> !existingUrns.contains(lineageRelationship.getEntity())));
numFiltered += partitionedUniqueFilteredRelationships.get(Boolean.FALSE).size();

List<LineageRelationship> uniqueFilteredRelationships = partitionedUniqueFilteredRelationships.get(Boolean.TRUE);

// 3) combine this entity's lineage with the lineage we've already seen
final List<LineageRelationship> combinedResults = Stream.concat(
uniqueFilteredRelationships.stream(),
existingResult != null ? existingResult.getRelationships().stream() : ImmutableList.<LineageRelationship>of().stream())
.collect(Collectors.toList());

// 4) fetch the siblings of each lineage result
final Set<Urn> combinedResultUrns = combinedResults.stream().map(result -> result.getEntity()).collect(Collectors.toSet());
final Set<Urn> combinedResultUrns = combinedResults.stream().map(LineageRelationship::getEntity).collect(Collectors.toSet());

final Map<Urn, List<RecordTemplate>> siblingAspects =
_entityService.getLatestAspects(combinedResultUrns, ImmutableSet.of(SIBLINGS_ASPECT_NAME));

// 5) if you are not primary & your sibling is in the results, filter yourself out of the return set
uniqueFilteredRelationships = combinedResults.stream().filter(result -> {
Map<Boolean, List<LineageRelationship>> partitionedFilteredSiblings = combinedResults.stream().collect(Collectors.partitioningBy(result -> {
Optional<RecordTemplate> optionalSiblingsAspect = siblingAspects.get(result.getEntity()).stream().filter(
aspect -> aspect instanceof Siblings
).findAny();

if (!optionalSiblingsAspect.isPresent()) {
if (optionalSiblingsAspect.isEmpty()) {
return true;
}


final Siblings siblingsAspect = (Siblings) optionalSiblingsAspect.get();

if (siblingsAspect.isPrimary()) {
return true;
}

// if you are not primary and your sibling exists in the result set, filter yourself out
if (siblingsAspect.getSiblings().stream().anyMatch(
sibling -> combinedResultUrns.contains(sibling)
)) {
return false;
}

return true;
}).collect(Collectors.toList());

entityLineageResult.setRelationships(new LineageRelationshipArray(uniqueFilteredRelationships));
entityLineageResult.setTotal(entityLineageResult.getTotal() + (existingResult != null ? existingResult.getTotal() : 0));
entityLineageResult.setCount(uniqueFilteredRelationships.size());
return ValidationUtils.validateEntityLineageResult(entityLineageResult, _entityService);
return siblingsAspect.getSiblings().stream().noneMatch(combinedResultUrns::contains);
}));

numFiltered += partitionedFilteredSiblings.get(Boolean.FALSE).size();
uniqueFilteredRelationships = partitionedFilteredSiblings.get(Boolean.TRUE);

EntityLineageResult combinedLineageResult = new EntityLineageResult();
combinedLineageResult.setStart(entityLineageResult.getStart());
combinedLineageResult.setRelationships(new LineageRelationshipArray(uniqueFilteredRelationships));
combinedLineageResult.setTotal(entityLineageResult.getTotal() + (existingResult != null ? existingResult.getTotal() : 0));
combinedLineageResult.setCount(uniqueFilteredRelationships.size());
combinedLineageResult.setFiltered(numFiltered + getFiltered(existingResult) + getFiltered(entityLineageResult));
return ValidationUtils.validateEntityLineageResult(combinedLineageResult, _entityService);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public static EntityLineageResult validateEntityLineageResult(@Nullable final En
.collect(Collectors.toCollection(LineageRelationshipArray::new));

validatedEntityLineageResult.setFiltered(
entityLineageResult.getRelationships().size() - validatedRelationships.size());
(entityLineageResult.hasFiltered() && entityLineageResult.getFiltered() != null ? entityLineageResult.getFiltered() : 0)
+ entityLineageResult.getRelationships().size() - validatedRelationships.size());
validatedEntityLineageResult.setRelationships(validatedRelationships);

return validatedEntityLineageResult;
Expand Down
Loading

0 comments on commit 6c445c8

Please sign in to comment.