
Add OL facet tables #2152

Closed
wants to merge 21 commits into from

Conversation

wslulciuc
Member

Problem

To access OL facets, we have to run queries directly against the OL events table, which raises significant performance concerns.

Closes: #2076

Solution

Add tables for dataset, job, and run facets.
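
For illustration only (table and column names taken from elsewhere in this PR; :runUuid is a placeholder), the change in query shape this enables:

-- Today: reading run facets means scanning JSON inside the large lineage_events table
SELECT le.event -> 'run' -> 'facets'
  FROM lineage_events le
 WHERE le.run_uuid = :runUuid;

-- With a dedicated facet table, the same lookup becomes a plain indexed read
SELECT rf.facet
  FROM run_facets rf
 WHERE rf.run_uuid = :runUuid;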

Checklist

  • You've signed off your work
  • Your changes are accompanied by tests (if relevant)
  • Your change contains a small diff and is self-contained
  • You've updated any relevant documentation (if relevant)
  • You've updated the CHANGELOG.md with details about your change under the "Unreleased" section (if relevant, depending on the change, this may not be necessary)
  • You've versioned your .sql database schema migration according to Flyway's naming convention (if relevant)
  • You've included a header in any source code files (if relevant)

@boring-cyborg boring-cyborg bot added the api API layer changes label Sep 30, 2022

@pawel-big-lebowski pawel-big-lebowski left a comment


This is going to be a huge improvement in terms of performance.
It will also help with developing features like getting raw events by namespace, which @mobuchowski tackled some time ago (#2070).

My concerns are:

  • If my understanding is correct, the current behavior leads users to lose dataset history (facets written the old way won't be read the new way). If this is acceptable, we should at least add some doc notes. Otherwise, providing a migration (even an optional one) would be great.
  • We're now storing the same facets twice: in lineage_events and in the new facet tables. Is this expected behavior?
  • Wouldn't we want a test that verifies the introduced tables are filled after posting a lineage event? I couldn't spot anything similar in the PR.
  • The Flyway DB migration version file numbering needs to be adjusted (v46 seems to be outdated).

+ " WHERE run_uuid = dv.run_uuid\n"
+ " ) e ON e.run_uuid = dv.run_uuid\n"
+ " ) df ON df.run_uuid = dv.run_uuid\n"
Collaborator

this seems to be missing dataset_uuid = dv.dataset_uuid
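
A sketch of what the fix might look like, assuming the surrounding correlated subquery keeps its current shape (the added predicate is this reviewer's suggestion, not code from the PR):

+ " WHERE run_uuid = dv.run_uuid AND dataset_uuid = dv.dataset_uuid\n"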

@codecov

codecov bot commented Dec 5, 2022

Codecov Report

Merging #2152 (ad0fa19) into main (ae6250a) will increase coverage by 0.41%.
The diff coverage is 81.64%.

@@             Coverage Diff              @@
##               main    #2152      +/-   ##
============================================
+ Coverage     76.72%   77.14%   +0.41%     
- Complexity     1177     1237      +60     
============================================
  Files           222      227       +5     
  Lines          5354     5727     +373     
  Branches        429      465      +36     
============================================
+ Hits           4108     4418     +310     
- Misses          768      801      +33     
- Partials        478      508      +30     
| Impacted Files | Coverage Δ |
|---|---|
| api/src/main/java/marquez/MarquezApp.java | 63.75% <0.00%> (-2.49%) ⬇️ |
| ...src/main/java/marquez/cli/V55MigrationCommand.java | 0.00% <0.00%> (ø) |
| api/src/main/java/marquez/db/DatasetDao.java | 98.64% <ø> (ø) |
| api/src/main/java/marquez/db/JobDao.java | 100.00% <ø> (ø) |
| api/src/main/java/marquez/db/JobVersionDao.java | 91.04% <ø> (ø) |
| api/src/main/java/marquez/db/RunDao.java | 92.50% <ø> (ø) |
| ...ain/java/marquez/db/mappers/DatasetDataMapper.java | 85.18% <ø> (ø) |
| ...rc/main/java/marquez/db/mappers/JobDataMapper.java | 87.50% <ø> (ø) |
| .../java/marquez/db/models/ColumnLineageNodeData.java | 100.00% <ø> (ø) |
| .../src/main/java/marquez/service/LineageService.java | 80.30% <ø> (ø) |

... and 11 more


@pawel-big-lebowski pawel-big-lebowski self-assigned this Dec 12, 2022
@pawel-big-lebowski pawel-big-lebowski force-pushed the feature/ol-facet-tables branch 3 times, most recently from 2e850f8 to f1b5ffc on December 13, 2022 09:41
@pawel-big-lebowski pawel-big-lebowski marked this pull request as draft December 13, 2022 09:48
@pawel-big-lebowski pawel-big-lebowski force-pushed the feature/ol-facet-tables branch 8 times, most recently from 70471fb to 660f093 on December 20, 2022 09:08
@boring-cyborg boring-cyborg bot added the docs label Dec 20, 2022
@pawel-big-lebowski pawel-big-lebowski force-pushed the feature/ol-facet-tables branch 3 times, most recently from 1b0ccd9 to 3830c32 on December 21, 2022 08:53
@mobuchowski
Contributor

metadata.json: do we need this file? It shows one line changed, but also "1 addition, 0 deletions not shown because the diff is too large. Please use a local Git client to view these changes."

private static final String GET_CURRENT_LOCK_SQL =
"""
SELECT * FROM v55_facet_migration_lock ORDER BY created_at ASC, run_uuid ASC LIMIT 1
""";
Contributor

I think one-liners don't need text block syntax, especially when not well formatted 😉
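
For comparison, the same constant without a text block (a sketch of the suggested style):

private static final String GET_CURRENT_LOCK_SQL =
    "SELECT * FROM v55_facet_migration_lock ORDER BY created_at ASC, run_uuid ASC LIMIT 1";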

+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n"
+ " FROM lineage_events le\n"
+ " GROUP BY le.run_uuid\n"
+ " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n"
Contributor

It's risky to use this data in the same version that we're migrating in, since the migration process is manual for some deployments.

This release should IMO still use the lineage_events table and, given reasonable time to migrate to the newer version, we should then use the new tables to optimize queries.

Unless we have a well-documented way to run the migration without impacting a running deployment, so that we don't have to roll back to the previous Marquez version if something fails.

Member Author
@wslulciuc wslulciuc Jan 4, 2023

Thanks @mobuchowski for pointing this out (and I agree).

@pawel-big-lebowski I know this PR / optimization has been dragging (mostly my fault, happy to take the blame). Splitting the changes into separate PRs / releases would reduce any challenges with rollbacks and also give users the option to decide when to apply the migration / upgrade. So, that said, what I propose is:

  1. Open a PR to just introduce the dataset, job, and run facet tables (a write-only mode), meaning the facet tables won't be referenced in any way by our queries, but will be written to so that features can later take advantage of the optimization; lineage events will continue to be written to the lineage_events table as well. This can be released in 0.30.0.
  2. As a follow-up PR, we can update all references to lineage_events in our queries to use the individual facet tables. Within this PR, the migration script (and documentation) to backfill facets from the lineage_events table will also be introduced (and be required to run for users with larger lineage_events tables). This can be released in 0.31.0 or later.

PR #2152 (current) can be step 2 and marked as blocked, and I can cherry-pick any changes into a separate PR for step 1 (also open to other options). Anyways, thanks for the great work here @pawel-big-lebowski. Thoughts?

Collaborator

It may be difficult to split the change into separate PRs. The initial PR was missing tests. I added them within my commits, and they detected some issues with previous commits, like:

  • the runFacetExists method in api/src/main/java/marquez/db/RunFacetsDao.java
  • the fromName method in api/src/main/java/marquez/db/DatasetFacetsDao.java
    (see my latest commit)

So merging only some of the commits shouldn't be the solution. Additionally, one cannot benefit from the facet tables unless they are filled with data. Otherwise, users will experience incomplete results.

This PR does not clean or remove data from the lineage_events table, so we always have the ability to prepare a fix that restores the previous mechanics if we want to.

@wslulciuc Although splitting this PR into multiple PRs could improve readability, I don't see any advantage in splitting this into separate releases (0.30.0 and 0.31.0).

Collaborator

The combination of a manual migration process and the fact that the code stops reading from the lineage_events table introduces a real problem for users. When a user deploys, the tables will be created and all new records will be written into the new facet tables. However, all old data still lives only in the lineage_events table and won't be accessible until after the user executes the migration command. This means that the service will effectively be broken until users choose to execute this migration.

I think splitting the PR so that writes and reads are committed and can be deployed separately makes sense. This gives users the opportunity to push multiple deployments in a way that guarantees no downtime. It also makes this PR muuuch easier to review - readability is not just good for its own sake; it makes it easier to catch issues before they go to production. The migration script itself warrants its own review, I think.

@pawel-big-lebowski pawel-big-lebowski force-pushed the feature/ol-facet-tables branch 3 times, most recently from c2c16f5 to 1748433 on December 22, 2022 12:22
@pawel-big-lebowski pawel-big-lebowski force-pushed the feature/ol-facet-tables branch 3 times, most recently from 76e62b0 to 0322664 on January 2, 2023 08:20
@pawel-big-lebowski pawel-big-lebowski marked this pull request as ready for review January 2, 2023 09:04
Signed-off-by: Pawel Leszczynski <[email protected]>
@pawel-big-lebowski
Collaborator

Database migration in a nutshell:

  • Users with up to 100K lineage_events should not need to worry about this migration.
  • Users with more than 100K lineage_events will need to run an additional migration command.
  • The DB upgrade itself takes milliseconds. This is because the migration procedure creates views over the job, run, and dataset facets: until the target facet tables are loaded, the views are defined over the lineage_events table; once loaded, the views point to the new tables (see the sketch below).
  • After the change, facets are written to two locations: the lineage_events table and the job/run/dataset facet tables.
  • Perf tests were run to verify data migration performance. See results.
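
A rough sketch of the view mechanism described above (the actual definitions live in the V55 migration; column names here are illustrative):

-- Immediately after the upgrade, the view reads from lineage_events
CREATE OR REPLACE VIEW run_facets_view AS
  SELECT le.run_uuid,
         le.event -> 'run' -> 'facets' AS facet,
         le.event_time AS lineage_event_time
    FROM lineage_events le;

Application queries target run_facets_view, so they keep working before, during, and after the backfill.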

@wslulciuc
Member Author

metadata.json: do we need this file? It shows one line changed, but also "1 addition, 0 deletions not shown because the diff is too large. Please use a local Git client to view these changes."

I updated metadata.json under docker/ to include missing facets, but it looks like the other meta file was checked in by mistake and can be removed.

Comment on lines +150 to +162
Optional.ofNullable(datasetFacets.getDocumentation())
    .ifPresent(
        documentation ->
            insertDatasetFacet(
                UUID.randomUUID(),
                now,
                datasetUuid,
                runUuid,
                lineageEventTime,
                lineageEventType,
                Facet.DOCUMENTATION.getType(),
                Facet.DOCUMENTATION.getName(),
                toPgObject(Facet.DOCUMENTATION.asJson(documentation))));
Collaborator

You can replace the copy/paste of this block with something like

writeDatasetFacet(Facet.DOCUMENTATION, datasetFacets.getDocumentation());

where writeDatasetFacet is defined as

void writeDatasetFacet(Facet facet, Object nullableData) {
  Optional.ofNullable(nullableData)
      .ifPresent(
          data ->
              insertDatasetFacet(
                  UUID.randomUUID(),
                  now,
                  datasetUuid,
                  runUuid,
                  lineageEventTime,
                  lineageEventType,
                  facet.getType(),
                  facet.getName(),
                  toPgObject(facet.asJson(data))));
}

Collaborator

I created a separate PR, #2350, which contains the part of this one responsible for creating the new tables and writing new events to them, while not yet reading data from the new tables.

I got rid of the repeated code block in #2350.

Comment on lines +106 to +118
Optional.ofNullable(jobFacet.getDocumentation())
    .ifPresent(
        documentation ->
            insertJobFacet(
                UUID.randomUUID(),
                now,
                jobUuid,
                runUuid,
                lineageEventTime,
                lineageEventType,
                DatasetFacetsDao.Facet.DOCUMENTATION.getName(),
                toPgObject(DatasetFacetsDao.Facet.DOCUMENTATION.asJson(documentation))));
Collaborator

An interface for the various facet enums would help you avoid all this duplication.
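
For example, a minimal sketch of such an interface (hypothetical; the method names mirror the getName/getType/asJson calls already used in this PR):

public interface FacetSpec {
  String getName();

  String getType();

  Object asJson(Object data);
}

Both the dataset and job facet enums could implement it, and a single generic write method could then accept any FacetSpec plus its nullable data.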

Member Author

I agree, it's a bit ugly. Do you mean defining an interface for each enum to implement?

Collaborator

I created a separate PR, #2350, which contains the part of this one responsible for creating the new tables and writing new events to them, while not yet reading data from the new tables.

I addressed this feedback in #2350.

+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n"
+ " FROM lineage_events le\n"
+ " GROUP BY le.run_uuid\n"
+ " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The combination of a manual migration process and the fact that the code stops reading from the lineage_events table introduces a real problem for users. When a user deploys, the tables will be created and all new records will be written into the new facet tables. However, all old data still lives only in the lineage_events table and won't be accessible until after the user executes the migration command. This means that the service will effectively be broken until users choose to execute this migration.

I think splitting the PR so that writes and reads are committed and can be deployed separately makes sense. This gives users the opportunity to push multiple deployments in a way that guarantees no downtime. It also makes this PR muuuch easier to review - readability is not just good for its own sake; it makes it easier to catch issues before they go to production. The migration script itself warrants its own review, I think.

@pawel-big-lebowski
Collaborator

Replying to @collado-mike

The combination of a manual migration process and the fact that the code stops reading from the lineage_events table introduces a real problem for users. When a user deploys, the tables will be created and all new records will be written into the new facet tables. However, all old data still lives only in the lineage_events table and won't be accessible until after the user executes the migration command.

I think there may be some kind of misunderstanding here. For users with more than 100K lineage_events:

  • The run_facets, job_facets, and dataset_facets tables are created during the Flyway migration.
  • The views run_facets_view, job_facets_view, and dataset_facets_view are also created during the Flyway migration, and their definition is built on top of the lineage_events table. Application code makes use of those views.
  • Filling run_facets, job_facets, and dataset_facets with data has to be done as a manual, asynchronous step, but the application is up and running before the command is called and while it executes.
  • Once the manual migration is finished, the views run_facets_view, job_facets_view, and dataset_facets_view are recreated, with their definition based on the run_facets, job_facets, and dataset_facets tables.

This means that the service will effectively be broken until users choose to execute this migration.

No, it won't be, and that's why run_facets_view, job_facets_view, and dataset_facets_view are created. They provide a no-downtime experience during the manual migration procedure.

I think splitting the PR so that writes and reads are committed and can be deployed separately makes sense. This gives users the opportunity to push multiple deployments in a way that guarantees no downtime. It also makes this PR muuuch easier to review - readability is not just good for its own sake; it makes it easier to catch issues before they go to production. The migration script itself warrants its own review, I think.

I agree that splitting the PR into multiple PRs makes sense:

  • PR I: create the run_facets, job_facets, and dataset_facets tables and write new OL events to them; don't read data from the new tables yet.
  • PR II: create run_facets_view, job_facets_view, and dataset_facets_view on top of the lineage_events table, and switch the application logic to use the views.
  • PR III: add the migration command to fill the run_facets, job_facets, and dataset_facets tables, and replace the views from PR II so they point at those tables once the migration is over (see the sketch below).

Releasing PR I, PR II, and PR III in one go does not imply any downtime.
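
A sketch of the PR III view swap (illustrative; the real statements live in the migration): once the backfill finishes, the same view name is repointed at the new table, so callers never change:

CREATE OR REPLACE VIEW run_facets_view AS
  SELECT run_uuid, facet, lineage_event_time
    FROM run_facets;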

* Split `lineage_events` table to `dataset_facets`, `run_facets`, and `job_facets` tables. [`2152`](https://github.com/MarquezProject/marquez/pull/2152)
  [@wslulciuc](https://github.com/wslulciuc), [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
  * Performance improvement with a migration procedure that requires manual steps if the database has more than 100K lineage events.
  * Please read [here](https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V55__readme.md) to get more database migration details.
Member Author

Minor: I would reword as follows:

Note: We highly encourage users to review our migration plan.

@@ -149,6 +150,12 @@ public void registerResources(
}
}

@Override
protected void addDefaultCommands(Bootstrap<MarquezConfig> bootstrap) {
Member Author

To avoid overriding addDefaultCommands(), we can just register the command within MarquezApp.initialize():

@Override
public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
  bootstrap.addCommand(new V55MigrationCommand());
  // continue initialization ...
}

Collaborator

The existing commands are essentially no-arg commands. The migration command needs to be able to access a database connection. That's why I included it here, so that it can get the Application as a constructor param. Within initialize, commands are added although the application is not fully created yet.
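
For context, a minimal sketch of the shape this implies, using Dropwizard's EnvironmentCommand (names other than the framework's are illustrative):

import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.setup.Environment;
import net.sourceforge.argparse4j.inf.Namespace;

public class V55MigrationCommand extends EnvironmentCommand<MarquezConfig> {
  public V55MigrationCommand(Application<MarquezConfig> application) {
    // EnvironmentCommand needs the application instance to bootstrap an Environment,
    // which is what gives the command access to a managed database connection.
    super(application, "v55_migrate", "migrate facets out of lineage_events");
  }

  @Override
  protected void run(Environment environment, Namespace namespace, MarquezConfig config)
      throws Exception {
    // build a DataSource from the config's DataSourceFactory and run the backfill here
  }
}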

* <p>Please refer to @link marquez/db/migration/V55__readme.md for more details.
*/
@Slf4j
public class V55MigrationCommand<MarquezConfig> extends EnvironmentCommand<marquez.MarquezConfig> {
Member Author

Is V55MigrationCommand too specific (but also vague)? We'll certainly have more migrations in the future and one-off commands for each will be hard to manage and not the best user experience. I suggest we name the class DbMigrationsCommand and define v55_migrate as a subcommand. This tutorial is a pretty good reference on how to define subcommands.
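
A rough sketch of what the subcommand dispatch could look like with argparse4j (hypothetical; DbMigrationsCommand and the "migration" argument are this suggestion's names, not code in the PR):

@Override
public void configure(Subparser subparser) {
  super.configure(subparser);
  // each supported migration becomes a choice; future migrations extend the list
  subparser
      .addArgument("migration")
      .choices("v55_migrate")
      .help("name of the database migration to run");
}

The command's run method would then dispatch on namespace.getString("migration").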


@Setter private Integer chunkSize = null;

@Setter private boolean triggeredByCommand = false;
Member Author

Minor: I'd use the flag name manual.

execute(CREATE_DATASET_FACETS_VIEW);
execute(CREATE_JOB_FACETS_VIEW);

if (!triggeredByCommand && countLineageEvents() >= BASIC_MIGRATION_LIMIT) {
Member Author

It's great that we inform the user on how to handle very large tables 💯

* Workaround to register uuid_generate_v4 function to generate uuids. gen_random_uuid() is
* available since Postgres 13
*/
jdbi.withHandle(h -> h.createCall("CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\"").invoke());
Member Author

It's possible that the migration might not be applied by a superuser. Could we generate the uuid using java.util.UUID, then inject the value into the inserts?

Collaborator

That's a hard one. We don't want Java to touch the data: it's good for performance when everything is done in SQL and no data transfers between the backend DB and the Java application are required.

Adding this extension is a workaround for PostgreSQL 12, which we still support. If we stopped supporting PostgreSQL 12, we could use the built-in uuid function (https://www.postgresql.org/docs/current/functions-uuid.html) available since version 13.

Otherwise, we can add an extra condition here to include the extension only for PostgreSQL 12. Either way, I think PostgreSQL 12 users will need a superuser.
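
For reference, the version difference being worked around:

-- PostgreSQL 13+: built in, no extension required
SELECT gen_random_uuid();

-- PostgreSQL 12: no built-in uuid function in core, so an extension is needed;
-- installing one typically requires superuser privileges
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
SELECT uuid_generate_v4();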

Member Author
@wslulciuc wslulciuc Jan 5, 2023

Ok, that's reasonable. Mind making a note in the migration doc about the superuser requirement? Also, yeah, we should upgrade our version requirements for PostgreSQL; I wanted to first have schema validation in place in CI. PR #2326 gets us one step closer to upgrading.

| 10K events | 50K rows | 50K rows | 150K rows | 10sec |
| 100K events | 500K rows | 500K rows | 150K rows | 106sec |
| 500K events | 2.5M rows | 2.5M rows | 7.5M rows | 612sec |
| 1M events | 5M rows | 5M rows | 15M rows | 1473sec |
Member Author

Thanks for this detailed and well-thought-out migration plan @pawel-big-lebowski 💯

@wslulciuc
Member Author

wslulciuc commented Jan 5, 2023

The combination of a manual migration process and the fact that the code stops reading from the lineage_events table introduces a real problem for users.

In step 1 that I outlined above, Marquez would continue to write to the lineage_events table (and simultaneously to the new facet tables), therefore addressing this concern, which is a significant operational advantage. At Astronomer, we'd want this type of control to limit any downtime during the database migration. But, after doing another pass on the PR (and with additional context from @pawel-big-lebowski's comment above), views are used to avoid such downtime and effectively provide the safeguards for users when (manually or automatically) applying the migration.

@pawel-big-lebowski has made some great additions and has helped push this PR to its current state (and close to being merged!). Based on your feedback, @collado-mike, and @pawel-big-lebowski's input, there are enough safeguards to ensure the user has control over how the migration is applied (as outlined in V55__readme.md) while also ensuring Marquez can continue to handle reads / writes. To not completely stop progress, I feel addressing the comments in the PR and cleaning up a few sections of the code would avoid splitting this PR. Yeah, the PR is a bit on the larger side, but manageable, and all additions are relevant to the feature. We can still split up the PR if that's more helpful for reviewing, but I don't think it's required. My earlier comment about separate Marquez releases for this feature can be ignored.

@pawel-big-lebowski
Collaborator
