From 95d6de58112e7c2e8c38ade18c89170b69813bde Mon Sep 17 00:00:00 2001 From: Willy Lulciuc Date: Mon, 22 Aug 2022 17:42:28 -0700 Subject: [PATCH] Add `--metadata` option to seed backend with ol events (#2082) * Add --metadata option to seed backend with ol events Signed-off-by: wslulciuc * Fix javadocs Signed-off-by: wslulciuc Signed-off-by: wslulciuc --- .../main/java/marquez/cli/SeedCommand.java | 1511 ++--------------- build.gradle | 2 +- docker-compose.seed.yml | 4 +- docker/metadata.json | 1213 +++++++++++++ docker/seed.sh | 2 +- 5 files changed, 1331 insertions(+), 1401 deletions(-) create mode 100644 docker/metadata.json diff --git a/api/src/main/java/marquez/cli/SeedCommand.java b/api/src/main/java/marquez/cli/SeedCommand.java index 48f4597f8c..4e1b1d131d 100644 --- a/api/src/main/java/marquez/cli/SeedCommand.java +++ b/api/src/main/java/marquez/cli/SeedCommand.java @@ -5,1438 +5,155 @@ package marquez.cli; -import static com.google.common.base.Preconditions.checkArgument; -import static marquez.client.models.JobType.BATCH; -import static marquez.common.base.MorePreconditions.checkNotBlank; +import static marquez.common.Utils.newObjectMapper; +import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import io.dropwizard.cli.ConfiguredCommand; import io.dropwizard.setup.Bootstrap; -import java.net.URL; -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Random; -import javax.annotation.Nullable; -import lombok.EqualsAndHashCode; -import lombok.Getter; +import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineageClient; +import io.openlineage.client.transports.HttpTransport; +import java.nio.file.Paths; import lombok.NonNull; -import lombok.ToString; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import marquez.MarquezConfig; -import marquez.client.MarquezClient; -import marquez.client.Utils; -import marquez.client.models.Dataset; -import marquez.client.models.DatasetId; -import marquez.client.models.DatasetMeta; -import marquez.client.models.DbTableMeta; -import marquez.client.models.Field; -import marquez.client.models.Job; -import marquez.client.models.JobMeta; -import marquez.client.models.Namespace; -import marquez.client.models.NamespaceMeta; -import marquez.client.models.Run; -import marquez.client.models.RunMeta; -import marquez.client.models.Source; -import marquez.client.models.SourceMeta; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; /** - * A command to seed the HTTP API with source, dataset, and job metadata. You can override the - * default {@code host} and {@code port} using the command-line arguments {@code --host} and {@code - * --port}. This command is meant to be used to explore the features of Marquez. For example, - * lineage graph, dataset schemas, job run history, etc. + * A command to seed the HTTP API with source, dataset, and job metadata using OpenLineage. The {@code seed} command is meant to be used to + * explore the features of Marquez. For example, lineage graph analysis, dataset lifecycle + * management, job run history, etc. + * + *

Note: You must specify {@code metadata} using the command-line argument {@code + * --metadata}. Metadata must be defined as a Json file containing an array of {@code OpenLineage} + * events. * *

Usage

* - * For example, to override the {@code port}: + * For example, to override the {@code url}: + * + *
{@code
+ * java -jar marquez-api.jar seed --url http://localhost:5000 --metadata metadata.json marquez.yml
+ * }
+ * + *

where, {@code metadata.json} contains metadata for run {@code + * d46e465b-d358-4d32-83d4-df660ff614dd}: * *

{@code
- * java -jar marquez-api.jar seed --port 5001 marquez.yml
+ * [
+ *   {
+ *     "eventType": "START",
+ *     "eventTime": "2020-02-22T22:42:42.000Z",
+ *     "run": {
+ *       "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
+ *     },
+ *     "job": {
+ *       "namespace": "my-namespace",
+ *       "name": "my-job"
+ *     },
+ *     "inputs": [{
+ *       "namespace": "my-namespace",
+ *       "name": "my-input"
+ *     }],
+ *     "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.11.0/client/java"
+ *   },
+ *   {
+ *     "eventType": "COMPLETE",
+ *     "eventTime": "2020-02-22T22:48:12.000Z",
+ *     "run": {
+ *       "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
+ *     },
+ *     "job": {
+ *       "namespace": "my-namespace",
+ *       "name": "my-job"
+ *     },
+ *     "outputs": [{
+ *       "namespace": "my-namespace",
+ *       "name": "my-output",
+ *       "facets": {
+ *         "schema": {
+ *           "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.11.0/client/java",
+ *           "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json",
+ *           "fields": [
+ *             { "name": "a", "type": "VARCHAR"},
+ *             { "name": "b", "type": "VARCHAR"}
+ *           ]
+ *         }
+ *       }
+ *     }],
+ *     "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.11.0/client/java"
+ *   }
+ * ]
  * }
* - * Note that all metadata is defined within this class and requires a running instance of Marquez. + *

Note: The {@code seed} command requires a running instance of Marquez. */ @Slf4j public final class SeedCommand extends ConfiguredCommand { - static final String DEFAULT_MARQUEZ_HOST = "localhost"; - static final int DEFAULT_MARQUEZ_PORT = 8080; - - public static final String NAMESPACE_NAME = "food_delivery"; - static final String SOURCE_NAME = "analytics_db"; + /* Default URL for HTTP backend. */ + private static final String DEFAULT_OL_URL = "http://localhost:8080"; - static final int LINEAGE_GRAPH_24_HOUR_WINDOW = 24; - static final int RUN_TIME_IN_SEC_MIN = 120; - static final int RUN_TIME_IN_SEC_MAX = 240; + /* Args for seed command. */ + private static final String CMD_ARG_OL_URL = "url"; + private static final String CMD_ARG_OL_METADATA = "metadata"; + /* Define seed command. */ public SeedCommand() { - super("seed", "seeds the HTTP API with metadata"); + super("seed", "seeds the HTTP API server with metadata"); } + /* Configure seed command. */ @Override - public void configure(@NonNull final net.sourceforge.argparse4j.inf.Subparser subparser) { + public void configure(@NonNull final Subparser subparser) { super.configure(subparser); subparser - .addArgument("--host") - .dest("host") + .addArgument("--url") + .dest("url") .type(String.class) .required(false) - .setDefault(DEFAULT_MARQUEZ_HOST) - .help("the HTTP API server host"); + .setDefault(DEFAULT_OL_URL) + .help("the HTTP API server url"); subparser - .addArgument("--port") - .dest("port") - .type(Integer.class) - .required(false) - .setDefault(DEFAULT_MARQUEZ_PORT) - .help("the HTTP API server port"); + .addArgument("--metadata") + .dest("metadata") + .type(String.class) + .required(true) + .help("the path to the metadata file (ex: path/to/metadata.json)"); } @Override protected void run( - @NonNull final Bootstrap bootstrap, - @NonNull final net.sourceforge.argparse4j.inf.Namespace namespace, - @NonNull final MarquezConfig config) { - final String host = namespace.getString("host"); - final int port = namespace.getInt("port"); - - final URL baseUrl = Utils.toUrl(String.format("http://%s:%d", host, port)); - final MarquezClient client = MarquezClient.builder().baseUrl(baseUrl).build(); - seedApiWithMeta(client, LINEAGE_GRAPH_24_HOUR_WINDOW); + @NonNull Bootstrap bootstrap, + @NonNull Namespace namespace, + @NonNull MarquezConfig config) { + final String olUrl = namespace.getString(CMD_ARG_OL_URL); + final String olMetadata = namespace.getString(CMD_ARG_OL_METADATA); + // Use HTTP transport. + final OpenLineageClient olClient = + OpenLineageClient.builder().transport(HttpTransport.builder().uri(olUrl).build()).build(); + log.info("Connected to '{}'... attempting to seed with metadata!", olUrl); + // Load, then emit events. + final ImmutableList olEvents = loadMetadata(olMetadata); + log.info("Emitting '{}' events to: '{}'", olEvents.size(), olUrl); + int olEventsEmitted = 0; // Keep count of events emitted. + for (final OpenLineage.RunEvent olEvent : olEvents) { + olClient.emit(olEvent); + olEventsEmitted++; + } + log.info("Successfully emitted '{}' events!", olEventsEmitted); } - public void seedApiWithMeta(@NonNull MarquezClient client, int additionalIterations) { - // (1) Create namespace - final NamespaceMeta namespaceMeta = - NamespaceMeta.builder() - .ownerName("owner@food.com") - .description("Food delivery example!") - .build(); - final Namespace newNamespace = client.createNamespace(NAMESPACE_NAME, namespaceMeta); - log.info("Created namespace: {}", newNamespace); - - // (2) Create source - final SourceMeta sourceMeta = - SourceMeta.builder() - .type("POSTGRESQL") - .connectionUrl("jdbc:postgres://localhost:3306/deliveries") - .description("Contains all food delivery orders.") - .build(); - final Source newSource = client.createSource(SOURCE_NAME, sourceMeta); - log.info("Created source: {}", newSource); - - // (3) Seed dataset meta - DATASET_META.forEach( - (datasetName, datasetMeta) -> { - final Dataset newDataset = client.createDataset(NAMESPACE_NAME, datasetName, datasetMeta); - log.info("Created dataset: {}", newDataset); - }); - - // (4) Seed job meta - JOB_META.forEach( - (jobName, jobMeta) -> { - final Job newJob = client.createJob(NAMESPACE_NAME, jobName, jobMeta); - log.info("Created job: {}", newJob); - }); - - // (5) Define run start times for each graph level - final Instant startTimesGraphLevel0 = Instant.now(); - final Instant startTimesGraphLevel1 = startTimesGraphLevel0.plusSeconds(RUN_TIME_IN_SEC_MAX); - final Instant startTimesGraphLevel2 = startTimesGraphLevel1.plusSeconds(RUN_TIME_IN_SEC_MAX); - final Instant startTimesGraphLevel3 = startTimesGraphLevel2.plusSeconds(RUN_TIME_IN_SEC_MAX); - final Instant startTimesGraphLevel4 = startTimesGraphLevel3.plusSeconds(RUN_TIME_IN_SEC_MAX); - - final Instant[] startTimesByGraphLevel = { - startTimesGraphLevel0, - startTimesGraphLevel1, - startTimesGraphLevel2, - startTimesGraphLevel3, - startTimesGraphLevel4 - }; - - // (6) Seed run meta for jobs using graph level start times - for (int hourOfDay = additionalIterations; hourOfDay >= 0; hourOfDay--) { - for (final Map.Entry entry : JOB_META.entrySet()) { - final String jobName = entry.getKey(); - final JobMeta jobMeta = entry.getValue(); - - if (hourOfDay >= ACTIVE_RUN_META.get(jobName).size()) { - continue; - } - - // On code change, create a new job version - final ActiveRunMeta activeRunMeta = ACTIVE_RUN_META.get(jobName).get(hourOfDay); - activeRunMeta - .getCodeChange() - .ifPresent( - codeChange -> { - client.createJob( - NAMESPACE_NAME, - jobName, - JobMeta.builder() - .type(jobMeta.getType()) - .inputs(jobMeta.getInputs()) - .outputs(jobMeta.getOutputs()) - .location(codeChange.getToUrl()) - .context(jobMeta.getContext()) - .description(jobMeta.getDescription().orElse(null)) - .build()); - }); - - // Set run start and end times - final Instant runStartedAt = - startTimesByGraphLevel[activeRunMeta.getLevelInGraph()].minus( - Duration.ofHours(hourOfDay)); - final Instant runEndedAt = runStartedAt.plusSeconds(secondsToAdd()); - - // Create run - final RunMeta runMeta = - RunMeta.builder() - .nominalStartTime(runStartedAt.truncatedTo(ChronoUnit.MINUTES)) - .nominalEndTime(runEndedAt.truncatedTo(ChronoUnit.MINUTES)) - .build(); - final Run run = client.createRun(NAMESPACE_NAME, jobName, runMeta); - log.info("Created run for job '{}': {}", jobName, run); - - // Start run - final Run running = client.markRunAsRunning(run.getId(), runStartedAt); - log.info("Marked run for job '{}' as 'RUNNING': {}", jobName, running); - - // Complete or fail run - if (activeRunMeta.isMarkFailed()) { - final Run failed = client.markRunAsFailed(run.getId(), runEndedAt); - log.info("Marked run for job '{}' as 'FAILED': {}", jobName, failed); - } else if (activeRunMeta.isMarkRunning()) { - for (final DatasetId output : jobMeta.getOutputs()) { - final DatasetMeta datasetMeta = DATASET_META.get(output.getName()); - final DbTableMeta.DbTableMetaBuilder dbTableMetaWithChange = - DbTableMeta.builder() - .physicalName(datasetMeta.getPhysicalName()) - .sourceName(datasetMeta.getSourceName()) - .description(datasetMeta.getDescription().orElse(null)) - .runId(run.getId()); - - if (activeRunMeta.hasSchemaChange()) { - activeRunMeta - .schemaChangeFor(output.getName()) - .ifPresent( - schemaChange -> - dbTableMetaWithChange.fields( - fieldsWithChange(datasetMeta.getFields(), schemaChange))); - } else { - dbTableMetaWithChange.fields(datasetMeta.getFields()); - } - final Dataset modifiedDataset = - client.createDataset( - NAMESPACE_NAME, output.getName(), dbTableMetaWithChange.build()); - log.info( - "Dataset '{}' modified by job '{}' on run '{}': {}", - output.getName(), - jobName, - run.getId(), - modifiedDataset); - } - - final Run completed = client.markRunAsCompleted(run.getId(), runEndedAt); - log.info("Marked run for job '{}' as 'COMPLETED': {}", jobName, completed); - } - } - } - } - - int secondsToAdd() { - return new Random().nextInt((RUN_TIME_IN_SEC_MAX - RUN_TIME_IN_SEC_MIN) + 1) - + RUN_TIME_IN_SEC_MIN; - } - - List fieldsWithChange( - @NonNull List fields, @NonNull ActiveRunMeta.SchemaChange change) { - final ImmutableList.Builder fieldsWithChange = ImmutableList.builder(); - for (final Field field : fields) { - fieldsWithChange.add( - (change.getFieldName().equals(field.getName())) - ? Field.builder() - .name(field.getName()) - .type(change.getToType()) - .tags(field.getTags()) - .description(field.getDescription().orElse(null)) - .build() - : field); - } - return fieldsWithChange.build(); - } - - static final ImmutableMap DATASET_META = - new ImmutableMap.Builder() - .put( - "public.orders", - DbTableMeta.builder() - .physicalName("public.orders") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the order.") - .build(), - Field.builder() - .name("placed_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was placed.") - .build(), - Field.builder() - .name("menu_item_id") - .type("INTEGER") - .description("The ID of the menu item related to the order.") - .build(), - Field.builder() - .name("quantity") - .type("INTEGER") - .description("The number of the item in the order.") - .build(), - Field.builder() - .name("discount_id") - .type("INTEGER") - .description("The unique ID of the discount applied to the order.") - .build(), - Field.builder() - .name("comment") - .type("VARCHAR") - .description("The comment of the order.") - .build())) - .description("A table for orders.") - .build()) - .put( - "public.menus", - DbTableMeta.builder() - .physicalName("public.menus") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the menu.") - .build(), - Field.builder() - .name("name") - .type("VARCHAR") - .description("The name of the menu.") - .build(), - Field.builder() - .name("restaurant_id") - .type("INTEGER") - .description("The ID of the restaurant related to the menu.") - .build(), - Field.builder() - .name("description") - .type("TEXT") - .description("The description of the menu.") - .build())) - .description("A table for menus.") - .build()) - .put( - "public.categories", - DbTableMeta.builder() - .physicalName("public.categories") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the category.") - .build(), - Field.builder() - .name("name") - .type("VARCHAR") - .description("The name of the category.") - .build(), - Field.builder() - .name("menu_id") - .type("INTEGER") - .description("The ID of the menu related to the category.") - .build(), - Field.builder() - .name("description") - .type("TEXT") - .description("The description of the category.") - .build())) - .description("A table for categories.") - .build()) - .put( - "public.menu_items", - DbTableMeta.builder() - .physicalName("public.menu_items") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the menu item.") - .build(), - Field.builder() - .name("name") - .type("VARCHAR") - .description("The name of the menu item.") - .build(), - Field.builder() - .name("price") - .type("INTEGER") - .description("The price of the menu item.") - .build(), - Field.builder() - .name("category_id") - .type("INTEGER") - .description("The ID of the category related to the item.") - .build(), - Field.builder() - .name("description") - .type("TEXT") - .description("The description of the menu item.") - .build())) - .description("A table for menu items.") - .build()) - .put( - "public.restaurants", - DbTableMeta.builder() - .physicalName("public.restaurants") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the restaurant.") - .build(), - Field.builder() - .name("created_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the restaurant was created.") - .build(), - Field.builder() - .name("updated_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the restaurant was updated.") - .build(), - Field.builder() - .name("name") - .type("VARCHAR") - .description("The name of the restaurant.") - .build(), - Field.builder() - .name("email") - .type("VARCHAR") - .tags(Sets.newHashSet("PII")) - .description("The email address of the customer.") - .build(), - Field.builder() - .name("address") - .type("VARCHAR") - .description("The address of the restaurant.") - .build(), - Field.builder() - .name("phone") - .type("VARCHAR") - .description("The phone number of the restaurant.") - .build(), - Field.builder() - .name("city_id") - .type("INTEGER") - .description("The ID of the city related to the restaurant.") - .build(), - Field.builder() - .name("business_hours_id") - .type("INTEGER") - .description( - "The ID of the business hours related to the restaurant.") - .build(), - Field.builder() - .name("description") - .type("TEXT") - .description("The description of the restaurant.") - .build())) - .description("A table for customers.") - .build()) - .put( - "public.customers", - DbTableMeta.builder() - .physicalName("public.customers") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the customer.") - .build(), - Field.builder() - .name("created_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the customer was created.") - .build(), - Field.builder() - .name("updated_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the customer was updated.") - .build(), - Field.builder() - .name("name") - .type("VARCHAR") - .description("The name of the customer.") - .build(), - Field.builder() - .name("email") - .type("VARCHAR") - .tags(Sets.newHashSet("PII")) - .description("The email address of the customer.") - .build(), - Field.builder() - .name("address") - .type("VARCHAR") - .description("The address of the customer.") - .build(), - Field.builder() - .name("phone") - .type("VARCHAR") - .description("The phone number of the customer.") - .build(), - Field.builder() - .name("city_id") - .type("INTEGER") - .description("The ID of the city related to the customer.") - .build())) - .description("A table for customers.") - .build()) - .put( - "public.order_status", - DbTableMeta.builder() - .physicalName("public.order_status") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the order status.") - .build(), - Field.builder() - .name("transitioned_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order status was transitioned.") - .build(), - Field.builder() - .name("status") - .type("VARCHAR") - .description("The status of the order status.") - .build(), - Field.builder() - .name("order_id") - .type("INTEGER") - .description("The ID of the order related to the order status.") - .build(), - Field.builder() - .name("customer_id") - .type("INTEGER") - .description("The ID of the customer related to the order status.") - .build(), - Field.builder() - .name("restaurant_id") - .type("INTEGER") - .description("The ID of the restaurant related to the order status.") - .build(), - Field.builder() - .name("driver_id") - .type("INTEGER") - .description("The ID of the driver related to the order status.") - .build())) - .description("A table for order status.") - .build()) - .put( - "public.orders_7_days", - DbTableMeta.builder() - .physicalName("public.orders_7_days") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("order_id") - .type("INTEGER") - .description("The ID of the order.") - .build(), - Field.builder() - .name("placed_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was placed.") - .build(), - Field.builder() - .name("menu_id") - .type("VARCHAR") - .description("The ID of the menu related to the order.") - .build(), - Field.builder() - .name("menu_item_id") - .type("INTEGER") - .description("The ID of the menu item related to the order.") - .build(), - Field.builder() - .name("category_id") - .type("INTEGER") - .description("The ID of category related to the order.") - .build(), - Field.builder() - .name("discount_id") - .type("INTEGER") - .description("The ID of the discount applied to the order.") - .build(), - Field.builder() - .name("city_id") - .type("INTEGER") - .description("The ID of the city related to the order.") - .build())) - .description("A table for weekly orders.") - .build()) - .put( - "public.drivers", - DbTableMeta.builder() - .physicalName("public.drivers") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the driver.") - .build(), - Field.builder() - .name("created_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the driver was created.") - .build(), - Field.builder() - .name("updated_at") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the driver was updated.") - .build(), - Field.builder() - .name("name") - .type("VARCHAR") - .description("The name of the driver.") - .build(), - Field.builder() - .name("email") - .type("VARCHAR") - .description("The email of the driver.") - .build(), - Field.builder() - .name("phone") - .type("VARCHAR") - .description("The phone number of the driver.") - .build(), - Field.builder() - .name("car_make") - .type("VARCHAR") - .description("The make of the car.") - .build(), - Field.builder() - .name("car_model") - .type("VARCHAR") - .description("The model of the car.") - .build(), - Field.builder() - .name("car_year") - .type("VARCHAR") - .description("The year of the car.") - .build(), - Field.builder() - .name("car_color") - .type("VARCHAR") - .description("The color of the car.") - .build(), - Field.builder() - .name("car_license_plate") - .type("VARCHAR") - .description("The license plate number of the car.") - .build())) - .description("A table for drivers.") - .build()) - .put( - "public.delivery_7_days", - DbTableMeta.builder() - .physicalName("public.delivery_7_days") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("order_id") - .type("INTEGER") - .description("The ID of the order.") - .build(), - Field.builder() - .name("order_placed_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was placed.") - .build(), - Field.builder() - .name("order_dispatched_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was dispatched.") - .build(), - Field.builder() - .name("order_delivered_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was delivered.") - .build(), - Field.builder() - .name("customer_email") - .type("VARCHAR") - .description("The email of the customer.") - .build(), - Field.builder() - .name("menu_id") - .type("INTEGER") - .description("The ID of the menu related to the order.") - .build(), - Field.builder() - .name("menu_item_id") - .type("INTEGER") - .description("The ID of the menu item related to the order.") - .build(), - Field.builder() - .name("category_id") - .type("INTEGER") - .description("The ID of category related to the order.") - .build(), - Field.builder() - .name("discount_id") - .type("INTEGER") - .description("The ID of the discount applied to the order.") - .build(), - Field.builder() - .name("city_id") - .type("INTEGER") - .description("The ID of the city related to the order.") - .build(), - Field.builder() - .name("driver_id") - .type("INTEGER") - .description("The ID of the driver related to the order.") - .build())) - .description("A table for weekly deliveries.") - .build()) - .put( - "public.discounts", - DbTableMeta.builder() - .physicalName("public.discounts") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("id") - .type("INTEGER") - .description("The unique ID of the discount.") - .build(), - Field.builder() - .name("amount_off") - .type("INTEGER") - .description("The amount of the discount.") - .build(), - Field.builder() - .name("customer_email") - .type("VARCHAR") - .description("The email of the customer.") - .build(), - Field.builder() - .name("starts_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the discount starts.") - .build(), - Field.builder() - .name("ends_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the discount ends.") - .build())) - .description("A table for discounts.") - .build()) - .put( - "public.top_delivery_times", - DbTableMeta.builder() - .physicalName("public.top_delivery_times") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("order_id") - .type("INTEGER") - .description("The ID of the order.") - .build(), - Field.builder() - .name("order_placed_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was placed.") - .build(), - Field.builder() - .name("order_dispatched_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was dispatched.") - .build(), - Field.builder() - .name("order_delivered_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was delivered.") - .build(), - Field.builder() - .name("order_delivered_time") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the total time of delivery.") - .build(), - Field.builder() - .name("customer_email") - .type("VARCHAR") - .description("The email of the customer.") - .build(), - Field.builder() - .name("restaurant_id") - .type("INTEGER") - .description("The ID of the restaurant related to the order.") - .build(), - Field.builder() - .name("driver_id") - .type("INTEGER") - .description("The ID of the driver related to the order.") - .build())) - .description("A table for top deliveries.") - .build()) - .put( - "public.popular_orders_day_of_week", - DbTableMeta.builder() - .physicalName("public.popular_orders_day_of_week") - .sourceName(SOURCE_NAME) - .fields( - Lists.newArrayList( - Field.builder() - .name("order_day_of_week") - .type("VARCHAR") - .description("The day of week of the order.") - .build(), - Field.builder() - .name("order_placed_on") - .type("TIMESTAMP") - .description( - "An ISO-8601 timestamp representing the date/time the order was placed.") - .build(), - Field.builder() - .name("orders_placed") - .type("INTEGER") - .description("The number of orders placed on day of week.") - .build())) - .description("A table for popular orders by day of week.") - .build()) - .build(); - - static final ImmutableMap JOB_META = - new ImmutableMap.Builder() - .put( - "test.job_with_no_inputs_or_outputs", - JobMeta.builder() - .type(BATCH) - .description("An job with no inputs or outputs.") - .build()) - .put( - "test.job_with_no_runs", - JobMeta.builder().type(BATCH).description("An job with no runs.").build()) - .put( - "example.etl_orders", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.orders") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_orders.py") - .description("Loads newly placed orders weekly.") - .context( - sql( - "INSERT INTO orders (id, placed_on, menu_item_id, quantity, discount_id, comment)\n " - + "SELECT id, placed_on, menu_item_id, quantity, discount_id, comment\n " - + "FROM tmp_orders;")) - .build()) - .put( - "example.etl_menus", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.menus") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_menus.py") - .description("Loads newly added restaurant menus daily.") - .context( - sql( - "INSERT INTO menus (id, name, restaurant_id, description)\n " - + "SELECT id, name, restaurant_id, description\n " - + "FROM tmp_menus;")) - .build()) - .put( - "example.etl_categories", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.categories") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_categories.py") - .description("Loads newly added menus categories daily.") - .context( - sql( - "INSERT INTO categories (id, name, menu_id, description)\n " - + "SELECT id, name, menu_id, description\n " - + "FROM tmp_categories;")) - .build()) - .put( - "example.etl_menu_items", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.menu_items") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_menu_items.py") - .description("Loads newly added restaurant menu items daily.") - .context( - sql( - "INSERT INTO menu_items (id, name, price, category_id, description)\n " - + "SELECT id, name, price, category_id, description\n " - + "FROM tmp_menu_items;")) - .build()) - .put( - "example.etl_restaurants", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.restaurants") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_restaurants.py") - .description("Loads newly registered restaurants daily.") - .context( - sql( - "INSERT INTO restaurants (id, created_at, updated_at, name, email, address, phone, city_id, business_hours_id, description)\n " - + "SELECT id, created_at, updated_at, name, email, address, phone, city_id, business_hours_id, description\n " - + "FROM tmp_restaurants;")) - .build()) - .put( - "example.etl_customers", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.customers") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_customers.py") - .description("Loads newly registered customers daily.") - .context( - sql( - "INSERT INTO customers (id, created_at, updated_at, name, email, phone, city_id)\n " - + "SELECT id, created_at, updated_at, name, email, phone, city_id\n " - + "FROM tmp_customers;")) - .build()) - .put( - "example.etl_order_status", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.order_status") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_order_status.py") - .description("Loads order statues updates daily.") - .context( - sql( - "INSERT INTO order_status (id, transitioned_at, status, order_id, customer_id, restaurant_id, driver_id)\n " - + "SELECT id, transitioned_at, status, order_id, customer_id, restaurant_id, driver_id\n " - + "FROM tmp_order_status;")) - .build()) - .put( - "example.etl_drivers", - JobMeta.builder() - .type(BATCH) - .outputs(NAMESPACE_NAME, "public.drivers") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_drivers.py") - .description("Loads newly registered drivers daily.") - .context( - sql( - "INSERT INTO drivers (id, created_at, updated_at, name, email, phone, car_make, car_model, car_year, car_color, car_license_plate)\n " - + "SELECT id, created_at, updated_at, name, email, phone, car_make, car_model, car_year, car_color, car_license_plate\n " - + "FROM tmp_drivers;")) - .build()) - .put( - "example.etl_orders_7_days", - JobMeta.builder() - .type(BATCH) - .inputs( - NAMESPACE_NAME, - "public.menus", - "public.menu_items", - "public.orders", - "public.categories") - .outputs(NAMESPACE_NAME, "public.orders_7_days") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/etl_orders_7_days.py") - .description("Loads newly placed orders weekly.") - .context( - sql( - "INSERT INTO orders_7_days (order_id, placed_on, discount_id, menu_id, restaurant_id, menu_item_id, category_id)\n " - + "SELECT o.id AS order_id, o.placed_on, o.discount_id, m.id AS menu_id, m.restaurant_id, mi.id AS menu_item_id, c.id AS category_id\n" - + " FROM orders AS o\n" - + " INNER JOIN menu_items AS mi\n" - + " ON menu_items.id = o.menu_item_id\n" - + " INNER JOIN categories AS c\n" - + " ON c.id = mi.category_id\n" - + " INNER JOIN menu AS m\n" - + " ON m.id = c.menu_id\n" - + " WHERE o.placed_on >= NOW() - interval '7 days';")) - .build()) - .put( - "example.etl_delivery_7_days", - JobMeta.builder() - .type(BATCH) - .inputs( - NAMESPACE_NAME, - "public.orders_7_days", - "public.customers", - "public.order_status", - "public.drivers", - "public.restaurants") - .outputs(NAMESPACE_NAME, "public.delivery_7_days") - .location( - "https://github.com/example/jobs/blob/4d0b5d374261fdaf60a1fc588dd8f0d124b0e87f/etl_delivery_7_days.py") - .description("Loads new deliveries for the week.") - .context( - sql( - "INSERT INTO delivery (order_id, order_placed_on, order_dispatched_on, order_delivered_on, customer_email,\n" - + " customer_address, discount_id, menu_id, restaurant_id, restaurant_address, menu_item_id, category_id, driver_id)\n" - + " SELECT o.order_id, o.placed_on AS order_placed_on,\n" - + " (SELECT transitioned_at FROM order_status WHERE order_id == o.order_id AND status = 'DISPATCHED') AS order_dispatched_on,\n" - + " (SELECT transitioned_at FROM order_status WHERE order_id == o.order_id AND status = 'DELIVERED') AS order_delivered_on,\n" - + " c.email AS customer_email, c.address AS customer_address, o.discount_id, o.menu_id, o.restaurant_id,\n" - + " r.address, o.menu_item_id, o.category_id, d.id AS driver_id\n" - + " FROM orders_7_days AS o\n" - + " INNER JOIN order_status AS os\n" - + " ON os.order_id = o.order_id\n" - + " INNER JOIN customers AS c\n" - + " ON c.id = os.customer_id\n" - + " INNER JOIN restaurants AS r\n" - + " ON r.id = os.restaurant_id\n" - + " INNER JOIN drivers AS d\n" - + " ON d.id = os.driver_id\n" - + " WHERE os.transitioned_at >= NOW() - interval '7 days';")) - .build()) - .put( - "example.delivery_times_7_days", - JobMeta.builder() - .type(BATCH) - .inputs(NAMESPACE_NAME, "public.delivery_7_days") - .outputs(NAMESPACE_NAME, "public.top_delivery_times", "public.discounts") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/delivery_times_7_days.py") - .description("Determine weekly top delivery times by restaurant.") - .context( - sql( - "INSERT INTO top_delivery_times (order_id, order_placed_on, order_dispatched_on, order_delivered_on, order_delivery_time,\n" - + " customer_email, restaurant_id, driver_id)\n" - + " SELECT order_id, order_placed_on, order_delivered_on, DATEDIFF(minute, order_placed_on, order_delivered_on) AS order_delivery_time,\n" - + " customer_email, restaurant_id, driver_id\n" - + " FROM delivery_7_days\n" - + "GROUP BY restaurant_id\n" - + "ORDER BY order_delivery_time DESC\n" - + " LIMIT 1;")) - .build()) - .put( - "example.email_discounts", - JobMeta.builder() - .type(BATCH) - .inputs(NAMESPACE_NAME, "public.customers", "public.discounts") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/email_discounts.py") - .description("Email discounts to customers that have experienced order delays.") - .context(sql("SELECT * FROM discounts;")) - .build()) - .put( - "example.orders_popular_day_of_week", - JobMeta.builder() - .type(BATCH) - .inputs(NAMESPACE_NAME, "public.customers", "public.top_delivery_times") - .outputs(NAMESPACE_NAME, "public.popular_orders_day_of_week") - .location( - "https://github.com/example/jobs/blob/2294bc15eb49071f38425dc927e48655530a2f2e/orders_popular_day_of_week.py") - .description("Determines the popular day of week orders are placed.") - .context( - sql( - "INSERT INTO popular_orders_day_of_week (order_day_of_week, order_placed_on, orders_placed)\n" - + " SELECT order_day_of_week, order_placed_on, COUNT(*)\n" - + " FROM top_delivery_times;")) - .build()) - .build(); - - private static Map sql(final String sql) { - return ImmutableMap.of("sql", sql); - } - - @EqualsAndHashCode - @ToString - static final class ActiveRunMeta { - @Getter private final int levelInGraph; - @Getter private final boolean markFailed; - @Getter private final boolean markRunning; - @Nullable private final CodeChange codeChange; - @Getter private final ImmutableSet schemaChanges; - - public ActiveRunMeta( - final int levelInGraph, - final boolean markFailed, - final boolean markRunning, - @Nullable final CodeChange codeChange, - @NonNull final ImmutableSet schemaChanges) { - this.levelInGraph = levelInGraph; - this.markFailed = markFailed; - this.markRunning = markRunning; - this.codeChange = codeChange; - this.schemaChanges = schemaChanges; - } - - public Optional getCodeChange() { - return Optional.ofNullable(codeChange); - } - - public boolean hasChanges() { - return !hasCodeChange() && !hasSchemaChange(); - } - - public boolean hasCodeChange() { - return (codeChange != null); - } - - public boolean hasSchemaChange() { - return !schemaChanges.isEmpty(); - } - - public Optional schemaChangeFor(String datasetName) { - checkArgument(hasSchemaChange()); - for (SchemaChange schemaChange : schemaChanges) { - if (schemaChange.getDatasetName().equals(checkNotBlank(datasetName))) { - return Optional.of(schemaChange); - } - } - return Optional.empty(); - } - - interface Change {} - - @EqualsAndHashCode - @ToString - static class CodeChange implements Change { - @Getter String jobName; - @Nullable URL fromUrl; - @Getter URL toUrl; - - public CodeChange( - @NonNull final String jobName, @Nullable final URL fromUrl, @NonNull final URL toUrl) { - this.jobName = jobName; - this.fromUrl = fromUrl; - this.toUrl = toUrl; - } - - public Optional getFromUrl() { - return Optional.ofNullable(fromUrl); - } - - public static Builder builder() { - return new Builder(); - } - - static final class Builder { - private String jobName; - private URL fromUrl; - private URL toUrl; - - public Builder jobName(@NonNull String jobName) { - this.jobName = jobName; - return this; - } - - public Builder fromUrl(@NonNull String fromUrlString) { - return fromUrl(Utils.toUrl(fromUrlString)); - } - - public Builder fromUrl(@NonNull URL fromUrl) { - this.fromUrl = fromUrl; - return this; - } - - public Builder toUrl(@NonNull String toUrl) { - return toUrl(Utils.toUrl(toUrl)); - } - - public Builder toUrl(@NonNull URL toUrl) { - this.toUrl = toUrl; - return this; - } - - public CodeChange build() { - return new CodeChange(jobName, fromUrl, toUrl); - } - } - } - - @EqualsAndHashCode - @ToString - static class SchemaChange implements Change { - @Getter private String datasetName; - @Getter private String fieldName; - @Getter private String fromType; - @Getter private String toType; - - public SchemaChange( - @NonNull final String datasetName, - @NonNull final String fieldName, - @NonNull final String fromType, - @NonNull final String toType) { - this.datasetName = datasetName; - this.fieldName = fieldName; - this.fromType = fromType; - this.toType = toType; - } - - public static Builder builder() { - return new Builder(); - } - - static final class Builder { - private String datasetName; - private String fieldName; - private String fromType; - private String toType; - - public Builder datasetName(@NonNull String datasetName) { - this.datasetName = datasetName; - return this; - } - - public Builder fieldName(@NonNull String fieldName) { - this.fieldName = fieldName; - return this; - } - - public Builder fromType(@NonNull String fromType) { - this.fromType = fromType; - return this; - } - - public Builder toType(@NonNull String toType) { - this.toType = toType; - return this; - } - - public SchemaChange build() { - return new SchemaChange(datasetName, fieldName, fromType, toType); - } - } - } - - public static ImmutableList successes( - final int levelInLineageGraph, final int numOfSuccesses) { - final ImmutableList.Builder activeRuns = ImmutableList.builder(); - for (int i = 0; i < numOfSuccesses; i++) { - activeRuns.add(ActiveRunMeta.builder().levelInLineageGraph(levelInLineageGraph).build()); - } - return activeRuns.build(); - } - - public static ImmutableList failures( - final int levelInLineageGraph, final int numOfFailures) { - final ImmutableList.Builder activeRuns = ImmutableList.builder(); - for (int i = 0; i < numOfFailures; i++) { - activeRuns.add( - ActiveRunMeta.builder().levelInLineageGraph(levelInLineageGraph).markFailed().build()); - } - return activeRuns.build(); - } - - public static ImmutableList running( - final int levelInLineageGraph, final int numOfRunning) { - final ImmutableList.Builder activeRuns = ImmutableList.builder(); - for (int i = 0; i < numOfRunning; i++) { - activeRuns.add( - ActiveRunMeta.builder().levelInLineageGraph(levelInLineageGraph).markRunning().build()); - } - return activeRuns.build(); - } - - public static ImmutableList randomize( - final int levelInLineageGraph, final int numOfRandom) { - final ImmutableList.Builder activeRuns = ImmutableList.builder(); - for (int i = 0; i < numOfRandom; i++) { - if (new Random().nextBoolean()) { - activeRuns.add(successes(levelInLineageGraph, 1).get(0)); - } else { - activeRuns.add(failures(levelInLineageGraph, 1).get(0)); - } - } - return activeRuns.build(); - } - - public static ActiveRunMeta successesWith( - int levelInLineageGraph, @Nullable final SchemaChange... schemaChanges) { - return successesWith(levelInLineageGraph, null, schemaChanges); - } - - public static ActiveRunMeta successesWith( - final int levelInLineageGraph, - @Nullable final CodeChange codeChange, - @Nullable final SchemaChange... schemaChanges) { - return ActiveRunMeta.builder() - .levelInLineageGraph(levelInLineageGraph) - .codeChange(codeChange) - .schemaChanges(ImmutableSet.copyOf(schemaChanges)) - .build(); - } - - public static Builder builder() { - return new Builder(); - } - - static final class Builder { - private int levelInLineageGraph; - private boolean markFailed; - private boolean markRunning; - private CodeChange codeChange; - private ImmutableSet schemaChanges; - - private Builder() { - this.markFailed = false; - this.markRunning = true; - this.schemaChanges = ImmutableSet.of(); - } - - public Builder levelInLineageGraph(int levelInLineageGraph) { - this.levelInLineageGraph = levelInLineageGraph; - return this; - } - - public Builder markFailed() { - this.markFailed = true; - return this; - } - - public Builder markRunning() { - this.markRunning = true; - return this; - } - - public Builder codeChange(@NonNull CodeChange codeChange) { - this.codeChange = codeChange; - return this; - } - - public Builder schemaChanges(@NonNull ImmutableSet schemaChanges) { - this.schemaChanges = schemaChanges; - return this; - } - - public ActiveRunMeta build() { - return new ActiveRunMeta( - levelInLineageGraph, markFailed, markRunning, codeChange, schemaChanges); - } - } + /* Returns {@link OpenLineage.RunEvent}s contained within the provided metadata file. */ + @SneakyThrows + private ImmutableList loadMetadata(@NonNull String olMetadata) { + log.info("Loading metadata from: '{}'", olMetadata); + return newObjectMapper() + .readValue( + Paths.get(olMetadata).toFile(), + new TypeReference>() {}); } - - static final LinkedHashMap> ACTIVE_RUN_META = - Maps.newLinkedHashMap( - new ImmutableMap.Builder>() - .put("test.job_with_no_inputs_or_outputs", ActiveRunMeta.failures(0, 2)) - .put("test.job_with_no_runs", ActiveRunMeta.successes(0, 0)) - .put( - "example.etl_categories", - ActiveRunMeta.successes(0, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put( - "example.etl_menu_items", - ActiveRunMeta.successes(0, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put("example.etl_menus", ActiveRunMeta.successes(0, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put( - "example.etl_orders", - ImmutableList.copyOf( - Iterables.concat( - ActiveRunMeta.running(0, 1), - ActiveRunMeta.randomize(0, LINEAGE_GRAPH_24_HOUR_WINDOW - 1)))) - .put( - "example.etl_customers", ActiveRunMeta.successes(1, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put("example.etl_drivers", ActiveRunMeta.successes(1, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put( - "example.etl_order_status", - ActiveRunMeta.successes(1, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put( - "example.etl_orders_7_days", - ImmutableList.copyOf( - Iterables.concat( - ActiveRunMeta.running(1, 1), - ActiveRunMeta.successes(1, 1), - ActiveRunMeta.randomize(1, LINEAGE_GRAPH_24_HOUR_WINDOW - 2)))) - .put( - "example.etl_restaurants", - ActiveRunMeta.successes(1, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put( - "example.etl_delivery_7_days", - ImmutableList.copyOf( - Iterables.concat( - ImmutableList.of( - ActiveRunMeta.successesWith( - 2, - ActiveRunMeta.CodeChange.builder() - .jobName("example.etl_delivery_7_days") - .fromUrl( - JOB_META - .get("example.etl_delivery_7_days") - .getLocation() - .orElse(null)) - .toUrl( - "https://github.com/example/jobs/blob/c87f2a40553cfa4ae7178083a068bf1d0c6ca3a8/etl_delivery_7_days.py") - .build(), - ActiveRunMeta.SchemaChange.builder() - .datasetName("public.delivery_7_days") - .fieldName("discount_id") - .fromType("INTEGER") - .toType("VARCHAR") - .build())), - ActiveRunMeta.successes(2, LINEAGE_GRAPH_24_HOUR_WINDOW - 1)))) - .put( - "example.delivery_times_7_days", - ImmutableList.copyOf( - Iterables.concat( - ActiveRunMeta.failures(3, 1), - ActiveRunMeta.successes(3, LINEAGE_GRAPH_24_HOUR_WINDOW - 2), - ActiveRunMeta.failures(3, 1)))) - .put( - "example.email_discounts", - ActiveRunMeta.successes(4, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .put( - "example.orders_popular_day_of_week", - ActiveRunMeta.successes(4, LINEAGE_GRAPH_24_HOUR_WINDOW)) - .build()); } diff --git a/build.gradle b/build.gradle index 275fbab702..996f91d8ba 100644 --- a/build.gradle +++ b/build.gradle @@ -59,7 +59,7 @@ subprojects { junit5Version = '5.8.2' lombokVersion = '1.18.24' mockitoVersion = '4.5.1' - openlineageVersion = '0.8.1' + openlineageVersion = '0.13.0' slf4jVersion = '1.7.36' postgresqlVersion = '42.3.5' isReleaseVersion = !version.endsWith('SNAPSHOT') diff --git a/docker-compose.seed.yml b/docker-compose.seed.yml index 725a3446f5..3aa9ea373a 100644 --- a/docker-compose.seed.yml +++ b/docker-compose.seed.yml @@ -4,11 +4,11 @@ services: image: "marquezproject/marquez:${TAG}" container_name: seed-marquez-with-metadata environment: - - MARQUEZ_HOST=api - - MARQUEZ_PORT=${API_PORT} + - MARQUEZ_URL=http://api:${API_PORT} volumes: - ./docker/wait-for-it.sh:/usr/src/app/wait-for-it.sh - ./docker/seed.sh:/usr/src/app/seed.sh + - ./docker/metadata.json:/usr/src/app/metadata.json links: - "db:postgres" depends_on: diff --git a/docker/metadata.json b/docker/metadata.json new file mode 100644 index 0000000000..538775030a --- /dev/null +++ b/docker/metadata.json @@ -0,0 +1,1213 @@ +[ + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "d46e465b-d358-4d32-83d4-df660ff614dd" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_menus", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO menus (id, name, restaurant_id, description)\n SELECT id, name, restaurant_id, description\n FROM tmp_menus;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly added restaurant menus daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.menus", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "tags": [], + "description": "The unique ID of the menu." + }, + { + "name": "name", + "type": "VARCHAR", + "tags": [], + "description": "The name of the menu." + }, + { + "name": "restaurant_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the restaurant related to the menu." + }, + { + "name": "description", + "type": "TEXT", + "tags": [], + "description": "The description of the menu." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:45:52.000Z", + "run": { + "runId": "d46e465b-d358-4d32-83d4-df660ff614dd" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_menus" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "6f0c13a5-f29b-46a5-90c1-0ffbebbbd1aa" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_categories", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO categories (id, name, menu_id, description)\n SELECT id, name, menu_id, description\n FROM tmp_categories;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly added menus categories daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.categories", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "description": "The unique ID of the category." + }, + { + "name": "name", + "type": "VARCHAR", + "description": "The name of the category." + }, + { + "name": "menu_id", + "type": "INTEGER", + "description": "The ID of the menu related to the category." + }, + { + "name": "description", + "type": "TEXT", + "description": "The description of the category." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:52.000Z", + "run": { + "runId": "6f0c13a5-f29b-46a5-90c1-0ffbebbbd1aa" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_categories" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "e05901b1-3a06-4b98-8d9c-aaf188c9a28c" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_menu_items", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO menu_items (id, name, price, category_id, description)\n SELECT id, name, price, category_id, description\n FROM tmp_menu_items;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly added restaurant menu items daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.menu_items", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "description": "The ID of the menu item." + }, + { + "name": "name", + "type": "VARCHAR", + "description": "The name of the menu item." + }, + { + "name": "price", + "type": "VARCHAR", + "description": "The price of the menu item." + }, + { + "name": "category_id", + "type": "VARCHAR", + "description": "The ID of the category related to the item." + }, + { + "name": "description", + "type": "TEXT", + "description": "The description of the menu item." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:54.000Z", + "run": { + "runId": "e05901b1-3a06-4b98-8d9c-aaf188c9a28c" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_menu_items" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "a43a8523-349f-4296-807f-3354ac491990" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_orders", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO orders (id, placed_on, menu_item_id, quantity, discount_id, comment)\n SELECT id, placed_on, menu_item_id, quantity, discount_id, comment\n FROM tmp_orders;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly placed orders daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.orders", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "description": "The unique ID of the order." + }, + { + "name": "placed_on", + "type": "TIMESTAMP", + "description": "An ISO-8601 timestamp representing the date/time the order was placed." + }, + { + "name": "menu_item_id", + "type": "INTEGER", + "description": "The ID of the menu item related to the order." + }, + { + "name": "quantity", + "type": "INTEGER", + "description": "The number of the item in the order." + }, + { + "name": "discount_id", + "type": "INTEGER", + "description": "The unique ID of the discount applied to the order." + }, + { + "name": "comment", + "type": "TEXT", + "description": "The comment of the order." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:52.000Z", + "run": { + "runId": "a43a8523-349f-4296-807f-3354ac491990" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_orders" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "ffba2c14-4170-48da-bec3-ab5fd4ec9a3f" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_orders_7_days", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO orders_7_days (order_id, placed_on, discount_id, restaurant_id, menu_id, menu_item_id, category_id)\n SELECT o.id AS order_id, o.placed_on, o.discount_id, m.restaurant_id, m.id AS menu_id, mi.id AS menu_item_id, c.id AS category_id\n FROM orders AS o\n INNER JOIN menu_items AS mi\n ON menu_items.id = o.menu_item_id\n INNER JOIN categories AS c\n ON c.id = mi.category_id\n INNER JOIN menu AS m\n ON m.id = c.menu_id\n WHERE o.placed_on >= NOW() - interval '7 days';" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly placed orders weekly." + } + } + }, + "inputs": [ + { + "namespace": "food_delivery", + "name": "public.menus" + }, + { + "namespace": "food_delivery", + "name": "public.menu_items" + }, + { + "namespace": "food_delivery", + "name": "public.orders" + }, + { + "namespace": "food_delivery", + "name": "public.categories" + } + ], + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.orders_7_days", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "order_id", + "type": "INTEGER", + "description": "The ID of the order." + }, + { + "name": "placed_on", + "type": "TIMESTAMP", + "description": "An ISO-8601 timestamp representing the date/time the order was placed." + }, + { + "name": "discount_id", + "type": "INTEGER", + "description": "The ID of the discount applied to the order." + }, + { + "name": "restaurant_id", + "type": "INTEGER", + "description": "The ID of the restaurant related to the order." + }, + { + "name": "menu_id", + "type": "INTEGER", + "description": "The ID of the menu related to the order." + }, + { + "name": "menu_item_id", + "type": "INTEGER", + "description": "The ID of the menu item related to the order." + }, + { + "name": "category_id", + "type": "INTEGER", + "description": "The ID of category related to the order." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:02.000Z", + "run": { + "runId": "ffba2c14-4170-48da-bec3-ab5fd4ec9a3f" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_orders_7_days" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "182a9eaf-881a-4d49-860c-f7e260b8bf60" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_customers", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO customers (id, created_at, updated_at, name, email, phone, city_id)\n SELECT id, created_at, updated_at, name, email, phone, city_id\n FROM tmp_customers;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly registered customers daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.customers", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "description": "The unique ID of the customer." + }, + { + "name": "created_at", + "type": "TIMESTAMP", + "description": "An ISO-8601 timestamp representing the date/time the customer was created." + }, + { + "name": "updated_at", + "type": "TIMESTAMP", + "description": "An ISO-8601 timestamp representing the date/time the customer was updated." + }, + { + "name": "name", + "type": "VARCHAR", + "description": "The name of the customer." + }, + { + "name": "email", + "type": "VARCHAR", + "description": "The email address of the customer." + }, + { + "name": "address", + "type": "VARCHAR", + "description": "The address of the customer." + }, + { + "name": "phone", + "type": "VARCHAR", + "description": "The phone number of the customer." + }, + { + "name": "city_id", + "type": "INTEGER", + "description": "The ID of the city related to the customer." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:55.000Z", + "run": { + "runId": "182a9eaf-881a-4d49-860c-f7e260b8bf60" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_customers" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "b7098939-87f0-4207-878f-dfd8e8804d8a" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_order_status", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO order_status (id, transitioned_at, status, order_id, customer_id, restaurant_id, driver_id)\n SELECT id, transitioned_at, status, order_id, customer_id, restaurant_id, driver_id\n FROM tmp_order_status;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads order statues updates daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.order_status", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "description": "The unique ID of the order status." + }, + { + "name": "transitioned_at", + "type": "TIMESTAMP", + "description": "An ISO-8601 timestamp representing the date/time the order status was transitioned." + }, + { + "name": "status", + "type": "VARCHAR", + "description": "The status of the order." + }, + { + "name": "order_id", + "type": "INTEGER", + "description": "The ID of the order related to the order status." + }, + { + "name": "customer_id", + "type": "INTEGER", + "description": "The ID of the customer related to the order status." + }, + { + "name": "restaurant_id", + "type": "INTEGER", + "description": "The ID of the restaurant related to the order status." + }, + { + "name": "driver_id", + "type": "INTEGER", + "description": "The ID of the driver related to the order status." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:52.000Z", + "run": { + "runId": "b7098939-87f0-4207-878f-dfd8e8804d8a" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_order_status" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "9f3db1c5-5e9a-4280-8184-18aca4592c77" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_drivers", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO drivers (id, created_at, updated_at, name, email, phone, car_make, car_model, car_year, car_color, car_license_plate)\n SELECT id, created_at, updated_at, name, email, phone, car_make, car_model, car_year, car_color, car_license_plate\n FROM tmp_drivers;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly registered drivers daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.drivers", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "tags": [], + "description": "The unique ID of the driver." + }, + { + "name": "created_at", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the driver was created." + }, + { + "name": "updated_at", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the driver was updated." + }, + { + "name": "name", + "type": "VARCHAR", + "tags": [], + "description": "The name of the driver." + }, + { + "name": "email", + "type": "VARCHAR", + "tags": [], + "description": "The email of the driver." + }, + { + "name": "phone", + "type": "VARCHAR", + "tags": [], + "description": "The phone number of the driver." + }, + { + "name": "car_make", + "type": "VARCHAR", + "tags": [], + "description": "The make of the car." + }, + { + "name": "car_model", + "type": "VARCHAR", + "tags": [], + "description": "The model of the car." + }, + { + "name": "car_year", + "type": "VARCHAR", + "tags": [], + "description": "The year of the car." + }, + { + "name": "car_color", + "type": "VARCHAR", + "tags": [], + "description": "The color of the car." + }, + { + "name": "car_license_plate", + "type": "VARCHAR", + "tags": [], + "description": "The license plate number of the car." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:52.000Z", + "run": { + "runId": "9f3db1c5-5e9a-4280-8184-18aca4592c77" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_drivers" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "8ddfb1d9-415f-4850-bcd6-01d02f011abe" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_restaurants", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO restaurants (id, created_at, updated_at, name, email, address, phone, city_id, business_hours_id, description)\n SELECT id, created_at, updated_at, name, email, address, phone, city_id, business_hours_id, description\n FROM tmp_restaurants;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly registered restaurants daily." + } + } + }, + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.restaurants", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "tags": [], + "description": "The unique ID of the restaurant." + }, + { + "name": "created_at", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the restaurant was created." + }, + { + "name": "updated_at", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the restaurant was updated." + }, + { + "name": "name", + "type": "VARCHAR", + "tags": [], + "description": "The name of the restaurant." + }, + { + "name": "email", + "type": "VARCHAR", + "tags": [], + "description": "The email address of the restaurant." + }, + { + "name": "address", + "type": "VARCHAR", + "tags": [], + "description": "The address of the restaurant." + }, + { + "name": "phone", + "type": "VARCHAR", + "tags": [], + "description": "The phone number of the restaurant." + }, + { + "name": "city_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the city related to the restaurant." + }, + { + "name": "business_hours_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the business hours related to the restaurant." + }, + { + "name": "description", + "type": "TEXT", + "tags": [], + "description": "The description of the restaurant." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:44:56.000Z", + "run": { + "runId": "8ddfb1d9-415f-4850-bcd6-01d02f011abe" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_restaurants" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "d5a2a4c4-fc78-428d-ae85-08c942ed8371" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_delivery_7_days", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO delivery_7_days (order_id, order_placed_on, order_dispatched_on, order_delivered_on, customer_email,\n customer_address, discount_id, menu_id, restaurant_id, restaurant_address, menu_item_id, category_id, driver_id)\n SELECT o.order_id, o.placed_on AS order_placed_on,\n (SELECT transitioned_at FROM order_status WHERE order_id == o.order_id AND status = 'DISPATCHED') AS order_dispatched_on,\n (SELECT transitioned_at FROM order_status WHERE order_id == o.order_id AND status = 'DELIVERED') AS order_delivered_on,\n c.email AS customer_email, c.address AS customer_address, o.discount_id, o.menu_id, o.restaurant_id,\n r.address, o.menu_item_id, o.category_id, d.id AS driver_id\n FROM orders_7_days AS o\n INNER JOIN order_status AS os\n ON os.order_id = o.order_id\n INNER JOIN customers AS c\n ON c.id = os.customer_id\n INNER JOIN restaurants AS r\n ON r.id = os.restaurant_id\n INNER JOIN drivers AS d\n ON d.id = os.driver_id\n WHERE os.transitioned_at >= NOW() - interval '7 days';" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads new deliveries for the week." + } + } + }, + "inputs": [ + { + "namespace": "food_delivery", + "name": "public.orders_7_days" + }, + { + "namespace": "food_delivery", + "name": "public.customers" + }, + { + "namespace": "food_delivery", + "name": "public.order_status" + }, + { + "namespace": "food_delivery", + "name": "public.drivers" + }, + { + "namespace": "food_delivery", + "name": "public.restaurants" + } + ], + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.delivery_7_days", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "order_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the order." + }, + { + "name": "order_placed_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was placed." + }, + { + "name": "order_dispatched_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was dispatched." + }, + { + "name": "order_delivered_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was delivered." + }, + { + "name": "customer_email", + "type": "VARCHAR", + "tags": [], + "description": "The email address of the customer." + }, + { + "name": "menu_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the menu related to the order." + }, + { + "name": "menu_item_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the menu item related to the order." + }, + { + "name": "category_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of category related to the order." + }, + { + "name": "discount_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the discount applied to the order" + }, + { + "name": "city_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the city related to the order." + }, + { + "name": "driver_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the driver related to the order." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:48:12.000Z", + "run": { + "runId": "d5a2a4c4-fc78-428d-ae85-08c942ed8371" + }, + "job": { + "namespace": "food_delivery", + "name": "etl_delivery_7_days" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "bd41a42a-bf18-4b74-9bb7-cd62637823d8" + }, + "job": { + "namespace": "food_delivery", + "name": "delivery_times_7_days", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO top_delivery_times (order_id, order_placed_on, order_dispatched_on, order_delivered_on, order_delivery_time,\n customer_email, restaurant_id, driver_id)\n SELECT order_id, order_placed_on, order_delivered_on, DATEDIFF(minute, order_placed_on, order_delivered_on) AS order_delivery_time,\n customer_email, restaurant_id, driver_id\n FROM delivery_7_days\n GROUP BY restaurant_id\n ORDER BY order_delivery_time DESC\n LIMIT 1;\nINSERT INTO discounts (amount_off, customer_email, starts_on, ends_on)\n SELECT customer_email, DATEDIFF(minute, order_placed_on, order_delivered_on) AS order_delivery_time,\n CASE WHEN order_delivery_time >= 60 THEN 15\n ELSE 5\n END AS amount_off,\n NOW() AS starts_on,\n NOW() + interval '7 days' AS ends_on\n FROM top_delivery_times\n WHERE order_delivery_time >= 45;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Determine weekly top delivery times by restaurant." + } + } + }, + "inputs": [ + { + "namespace": "food_delivery", + "name": "public.delivery_7_days" + } + ], + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.top_delivery_times", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "order_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the order." + }, + { + "name": "order_placed_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was placed." + }, + { + "name": "order_dispatched_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was dispatched." + }, + { + "name": "order_delivered_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was delivered." + }, + { + "name": "order_delivered_time", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the total time of delivery." + }, + { + "name": "customer_email", + "type": "VARCHAR", + "tags": [], + "description": "The email address of the customer." + }, + { + "name": "restaurant_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the restaurant related to the order." + }, + { + "name": "driver_id", + "type": "INTEGER", + "tags": [], + "description": "The ID of the driver related to the order." + } + ] + } + } + }, + { + "namespace": "food_delivery", + "name": "public.discounts", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "tags": [], + "description": "The unique ID of the discount." + }, + { + "name": "amount_off", + "type": "INTEGER", + "tags": [], + "description": "The amount of the discount." + }, + { + "name": "customer_email", + "type": "VARCHAR", + "tags": [], + "description": "The email address of the customer." + }, + { + "name": "starts_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the discount starts." + }, + { + "name": "ends_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the discount ends." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:58:02.000Z", + "run": { + "runId": "bd41a42a-bf18-4b74-9bb7-cd62637823d8" + }, + "job": { + "namespace": "food_delivery", + "name": "delivery_times_7_days" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "adc8507c-595e-4d76-9dac-be2bf0ffe1ee" + }, + "job": { + "namespace": "food_delivery", + "name": "orders_popular_day_of_week", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "INSERT INTO popular_orders_day_of_week (order_day_of_week, order_placed_on, orders_placed)\n SELECT EXTRACT(DOW FROM t.order_placed_on) AS order_day_of_week, t.order_placed_on, COUNT(*) AS orders_placed\n FROM top_delivery_times AS t\n INNER JOIN customers AS c\n ON d.customer_email = c.email\n GROUP BY t.order_day_of_week\n ORDER BY t.orders_placed DESC\n LIMIT 1;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Determines the popular day of week orders are placed." + } + } + }, + "inputs": [ + { + "namespace": "food_delivery", + "name": "public.top_delivery_times" + }, + { + "namespace": "food_delivery", + "name": "public.customers" + } + ], + "outputs": [ + { + "namespace": "food_delivery", + "name": "public.popular_orders_day_of_week", + "facets": { + "schema": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", + "fields": [ + { + "name": "order_day_of_week", + "type": "INTEGER", + "tags": [], + "description": "The day of week of the order." + }, + { + "name": "order_placed_on", + "type": "TIMESTAMP", + "tags": [], + "description": "An ISO-8601 timestamp representing the date/time the order was placed." + }, + { + "name": "orders_placed", + "type": "INTEGER", + "tags": [], + "description": "The number of orders placed on day of week." + } + ] + } + } + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:46:12.000Z", + "run": { + "runId": "adc8507c-595e-4d76-9dac-be2bf0ffe1ee" + }, + "job": { + "namespace": "food_delivery", + "name": "orders_popular_day_of_week" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "START", + "eventTime": "2020-02-22T22:42:42.000Z", + "run": { + "runId": "3ab25429-cf9c-4d1d-9166-1e1946f9d3c3" + }, + "job": { + "namespace": "food_delivery", + "name": "email_discounts", + "facets": { + "sql": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json", + "query": "SELECT d.id, d.amount_off, d.customer_email, d.starts_on, d.ends_on\n FROM discounts AS d\n INNER JOIN customers AS c\n ON d.customer_email = c.email;" + }, + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Email discounts to customers that have experienced order delays." + } + } + }, + "inputs": [ + { + "namespace": "food_delivery", + "name": "public.discounts" + }, + { + "namespace": "food_delivery", + "name": "public.customers" + } + ], + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + }, + { + "eventType": "COMPLETE", + "eventTime": "2020-02-22T22:56:01.000Z", + "run": { + "runId": "3ab25429-cf9c-4d1d-9166-1e1946f9d3c3" + }, + "job": { + "namespace": "food_delivery", + "name": "email_discounts" + }, + "producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json" + } +] diff --git a/docker/seed.sh b/docker/seed.sh index 69c1c5f5fa..25134e4d92 100755 --- a/docker/seed.sh +++ b/docker/seed.sh @@ -12,4 +12,4 @@ if [[ -z "${MARQUEZ_CONFIG}" ]]; then echo "WARNING 'MARQUEZ_CONFIG' not set, using development configuration." fi -java -jar marquez-api-*.jar seed --host "${MARQUEZ_HOST:-localhost}" --port "${MARQUEZ_PORT:-5000}" "${MARQUEZ_CONFIG}" +java -jar marquez-api-*.jar seed --url "${MARQUEZ_URL:-http://localhost:5000}" --metadata metadata.json "${MARQUEZ_CONFIG}"