From ea3a1a2c5585954f2df0b21b9843f58045f5f10e Mon Sep 17 00:00:00 2001 From: wslulciuc Date: Thu, 25 Aug 2022 19:01:09 -0700 Subject: [PATCH 1/9] Add metadata.json to .gitignore Signed-off-by: wslulciuc --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 736fa4a401..c1caaf0fab 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,9 @@ venv # Marquez configuration marquez.yml +# Metadata +metadata.json + # jenv .java-version From 5c6c128cec8a853452a4a40c4740785bde634ca3 Mon Sep 17 00:00:00 2001 From: wslulciuc Date: Thu, 25 Aug 2022 19:02:51 -0700 Subject: [PATCH 2/9] Add psql conf for pghero Signed-off-by: wslulciuc --- docker/postgresql.conf | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docker/postgresql.conf diff --git a/docker/postgresql.conf b/docker/postgresql.conf new file mode 100644 index 0000000000..730835529e --- /dev/null +++ b/docker/postgresql.conf @@ -0,0 +1,6 @@ +shared_preload_libraries = 'pg_stat_statements' +pg_stat_statements.track = all +pg_stat_statements.max = 10000 +track_activity_query_size = 2048 + +listen_addresses = '*' From 192278afbe36f4fc40e8209839c5e71242bfa03c Mon Sep 17 00:00:00 2001 From: wslulciuc Date: Thu, 25 Aug 2022 19:07:04 -0700 Subject: [PATCH 3/9] Add pghero Signed-off-by: wslulciuc --- docker-compose.dev.yml | 9 +++++++++ docker-compose.yml | 2 ++ 2 files changed, 11 insertions(+) diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index f34107fe26..517bfc06cd 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -8,3 +8,12 @@ services: build: context: ./web dockerfile: Dockerfile + + # To enable query stats with pghero, see https://github.com/ankane/pghero/blob/master/guides/Query-Stats.md#installation + pghero: + image: ankane/pghero + container_name: pghero + ports: + - "8080:8080" + environment: + DATABASE_URL: postgres://marquez:marquez@db:5432 diff --git a/docker-compose.yml b/docker-compose.yml index 08f7415146..307e6bc057 100644 
--- a/docker-compose.yml +++ b/docker-compose.yml @@ -42,7 +42,9 @@ services: - MARQUEZ_USER=marquez - MARQUEZ_PASSWORD=marquez volumes: + - ./docker/postgresql.conf:/etc/postgresql/postgresql.conf - db-init:/docker-entrypoint-initdb.d + command: ["postgres", "-c", "config_file=/etc/postgresql/postgresql.conf"] # Enables SQL statement logging (see: https://www.postgresql.org/docs/12/runtime-config-logging.html#GUC-LOG-STATEMENT) # command: ["postgres", "-c", "log_statement=all"] From 7a0abfd828137b8f90dcc6468230102babcbf14c Mon Sep 17 00:00:00 2001 From: wslulciuc Date: Thu, 25 Aug 2022 23:42:52 -0700 Subject: [PATCH 4/9] Add metadata cmd Signed-off-by: wslulciuc --- api/src/main/java/marquez/MarquezApp.java | 2 + .../java/marquez/cli/MetadataCommand.java | 367 ++++++++++++++++++ 2 files changed, 369 insertions(+) create mode 100644 api/src/main/java/marquez/cli/MetadataCommand.java diff --git a/api/src/main/java/marquez/MarquezApp.java b/api/src/main/java/marquez/MarquezApp.java index 82f6c3e842..7f0894df47 100644 --- a/api/src/main/java/marquez/MarquezApp.java +++ b/api/src/main/java/marquez/MarquezApp.java @@ -27,6 +27,7 @@ import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import marquez.api.filter.JobRedirectFilter; +import marquez.cli.MetadataCommand; import marquez.cli.SeedCommand; import marquez.common.Utils; import marquez.db.DbMigration; @@ -75,6 +76,7 @@ public void initialize(@NonNull Bootstrap bootstrap) { new EnvironmentVariableSubstitutor(ERROR_ON_UNDEFINED))); // Add CLI commands + bootstrap.addCommand(new MetadataCommand()); bootstrap.addCommand(new SeedCommand()); bootstrap.getObjectMapper().disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); diff --git a/api/src/main/java/marquez/cli/MetadataCommand.java b/api/src/main/java/marquez/cli/MetadataCommand.java new file mode 100644 index 0000000000..486cdcd12b --- /dev/null +++ b/api/src/main/java/marquez/cli/MetadataCommand.java @@ -0,0 +1,367 @@ +/* + * Copyright 2018-2022 contributors to 
the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.cli; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.openlineage.client.OpenLineage.RunEvent.EventType.COMPLETE; +import static io.openlineage.client.OpenLineage.RunEvent.EventType.START; + +import com.google.common.collect.ImmutableList; +import io.dropwizard.cli.Command; +import io.dropwizard.setup.Bootstrap; +import io.openlineage.client.OpenLineage; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Stream; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import marquez.common.Utils; +import marquez.common.models.DatasetName; +import marquez.common.models.FieldName; +import marquez.common.models.JobName; +import marquez.common.models.NamespaceName; +import marquez.common.models.RunId; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; + +/** + * A command to generate random source, dataset, and job metadata using OpenLineage. + * + *

Usage

+ * + * For example, the following command will generate {@code metadata.json} with {@code 10} events, + * where each START event will have a size of {@code ~16384} bytes; events will be written to the + * {@code current} directory. You may specify the location of {@code metadata.json} by using the + * command-line argument {@code --output} + * + *
{@code
+ * java -jar marquez-api.jar metadata --runs 10 --bytes-per-event 16384
+ * }
+ */ +@Slf4j +public final class MetadataCommand extends Command { + /* Used to calculate total bytes per event. */ + private static final int BYTES_PER_RUN = 578; + private static final int BYTES_PER_JOB = 58; + private static final int BYTES_PER_FIELD_IN_SCHEMA = 256; + + /* Default I/O and schema fields per event. */ + private static final int DEFAULT_NUM_OF_IO_PER_EVENT = 8; + private static final int DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT = 16; + + /* Default limit. */ + private static final int DEFAULT_RUNS = 25; + + /* Default bytes. */ + private static final int DEFAULT_BYTES_PER_EVENT = + BYTES_PER_RUN + + BYTES_PER_JOB + + ((BYTES_PER_FIELD_IN_SCHEMA * DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT) + * DEFAULT_NUM_OF_IO_PER_EVENT); + + /* Default output. */ + private static final String DEFAULT_OUTPUT = "metadata.json"; + + /* Args for metadata command. */ + private static final String CMD_ARG_METADATA_RUNS = "runs"; + private static final String CMD_ARG_METADATA_BYTES = "bytes-per-event"; + private static final String CMD_ARG_METADATA_OUTPUT = "output"; + + private static final Random RANDOM = new Random(); + private static final ZoneId AMERICA_LOS_ANGELES = ZoneId.of("America/Los_Angeles"); + private static final List FIELD_TYPES = ImmutableList.of("VARCHAR", "TEXT", "INTEGER"); + + private static final String OL_NAMESPACE = newNamespaceName().getValue(); + private static final OpenLineage OL = + new OpenLineage( + URI.create( + "https://github.com/MarquezProject/marquez/blob/main/api/src/main/java/marquez/cli/MetadataCommand.java")); + + /* Define metadata command. */ + public MetadataCommand() { + super("metadata", "generate random metadata using the OpenLineage standard"); + } + + /* Configure metadata command. 
*/ + @Override + public void configure(@NonNull Subparser subparser) { + subparser + .addArgument("--runs") + .dest("runs") + .type(Integer.class) + .required(false) + .setDefault(DEFAULT_RUNS) + .help("limits runs up to N"); + subparser + .addArgument("--bytes-per-event") + .dest("bytes-per-event") + .type(Integer.class) + .required(false) + .setDefault(DEFAULT_BYTES_PER_EVENT) + .help("size (in bytes) per OL event"); + subparser + .addArgument("-o", "--output") + .dest("output") + .type(String.class) + .required(false) + .help("the output metadata file") + .setDefault(DEFAULT_OUTPUT); + } + + @Override + public void run(@NonNull Bootstrap bootstrap, @NonNull Namespace namespace) { + final int runs = namespace.getInt(CMD_ARG_METADATA_RUNS); + final int bytesPerEvent = namespace.getInt(CMD_ARG_METADATA_BYTES); + final String output = namespace.getString(CMD_ARG_METADATA_OUTPUT); + + // Generate, then write events to metadata file. + writeOlEvents(newOlEvents(runs, bytesPerEvent), output); + } + + /** Returns new {@link OpenLineage.RunEvent} objects with random values. */ + private static List newOlEvents( + final int numOfRuns, final int bytesPerEvent) { + System.out.format( + "Generating '%d' runs, each COMPLETE event will have a size of '~%d' (bytes)...\n", + numOfRuns, bytesPerEvent); + final List olRunEvents = + Stream.generate(() -> newOlRunEvents(bytesPerEvent)) + .limit(numOfRuns) + .collect(toImmutableList()); + + final ImmutableList.Builder olEvents = ImmutableList.builder(); + for (final RunEvents startAndComplete : olRunEvents) { + olEvents.add(startAndComplete.start()); // Add START event + olEvents.add(startAndComplete.complete()); // Add COMPLETE event + } + return olEvents.build(); + } + + /** + * Returns new {@link RunEvents} objects. A {@link RunEvents} object contains the {@code START} + * and {@code COMPLETE} event for a given run. 
+ */ + private static RunEvents newOlRunEvents(final int bytesPerEvent) { + // (1) Generate run with an optional parent run, then the job. + final OpenLineage.Run olRun = newRun(hasParentRunOrNot()); + final OpenLineage.Job olJob = newJob(); + + // (2) Generate number of I/O for run. + int numOfInputs = RANDOM.nextInt(DEFAULT_NUM_OF_IO_PER_EVENT); + int numOfOutputs = DEFAULT_NUM_OF_IO_PER_EVENT - numOfInputs; + + // (3) Generate number of schema fields per I/O for run. + final int numOfFieldsInSchemaForInputs = + RANDOM.nextInt(DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT); + final int numOfFieldsInSchemaForOutputs = + DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT - numOfFieldsInSchemaForInputs; + + // (4) Generate an event of N bytes if provided; otherwise use default. + if (bytesPerEvent > DEFAULT_BYTES_PER_EVENT) { + // Bytes per event: + // +------------+-----------+-------------------+ + // | run meta | job meta | I/O meta | + // +------------+-----------+-------------------+ + // |-> 578B <-|-> 78B <-|->(256B x N) x P <-| + // where, N is number of fields per schema, and P is number of I/O per event. + // + // (5) Calculate the total I/O per event to equal the bytes per event. + final int numOfInputsAndOutputsForEvent = + (bytesPerEvent - BYTES_PER_RUN - BYTES_PER_JOB) + / (DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT * BYTES_PER_FIELD_IN_SCHEMA); + + // (6) Update the number of I/O to generate for run based on calculation. 
+ numOfInputs = RANDOM.nextInt(numOfInputsAndOutputsForEvent); + numOfOutputs = numOfInputsAndOutputsForEvent - numOfInputs; + } + return new RunEvents( + OL.newRunEventBuilder() + .eventType(START) + .eventTime(newEventTime()) + .run(olRun) + .job(olJob) + .inputs(newInputs(numOfInputs, numOfFieldsInSchemaForInputs)) + .outputs(newOutputs(numOfOutputs, numOfFieldsInSchemaForOutputs)) + .build(), + OL.newRunEventBuilder() + .eventType(COMPLETE) + .eventTime(newEventTime()) + .run(olRun) + .job(olJob) + .build()); + } + + /** Write {@link OpenLineage.RunEvent}s to the specified {@code output}. */ + private static void writeOlEvents( + @NonNull final List olEvents, @NonNull String output) { + System.out.format("Writing '%d' runs to: '%s'\n", olEvents.size() / 2, output); + FileWriter fileWriter; + PrintWriter printWriter = null; + try { + fileWriter = new FileWriter(output); + printWriter = new PrintWriter(fileWriter); + printWriter.write(Utils.toJson(olEvents)); + } catch (IOException e) { + e.printStackTrace(); + } finally { + if (printWriter != null) { + printWriter.close(); + } + } + } + + /** + * Returns a new {@link OpenLineage.Run} object. A {@code parent} run will be associated with + * {@code child} run if {@code hasParentRun} is {@code true}; otherwise, the {@code child} run + * will not have a {@code parent} run. + */ + private static OpenLineage.Run newRun(final boolean hasParentRun) { + return OL.newRun( + newRunId().getValue(), + OL.newRunFacetsBuilder() + .parent( + hasParentRun + ? OL.newParentRunFacetBuilder().run(newParentRun()).job(newParentJob()).build() + : null) + .nominalTime( + OL.newNominalTimeRunFacetBuilder() + .nominalStartTime(newNominalTime()) + .nominalEndTime(newNominalTime().plusHours(1)) + .build()) + .build()); + } + + /** Returns a new {@link OpenLineage.ParentRunFacetRun} object. 
*/ + private static OpenLineage.ParentRunFacetRun newParentRun() { + return OL.newParentRunFacetRunBuilder().runId(newRunId().getValue()).build(); + } + + /** Returns a new {@link OpenLineage.ParentRunFacetJob} object. */ + private static OpenLineage.ParentRunFacetJob newParentJob() { + return OL.newParentRunFacetJobBuilder() + .namespace(OL_NAMESPACE) + .name(newJobName().getValue()) + .build(); + } + + /** Returns a new {@link OpenLineage.Job} object. */ + static OpenLineage.Job newJob() { + return OL.newJobBuilder().namespace(OL_NAMESPACE).name(newJobName().getValue()).build(); + } + + /** Returns new {@link OpenLineage.InputDataset} objects. */ + private static List newInputs( + final int numOfInputs, final int numOfFields) { + return Stream.generate( + () -> + OL.newInputDatasetBuilder() + .namespace(OL_NAMESPACE) + .name(newDatasetName().getValue()) + .facets( + OL.newDatasetFacetsBuilder().schema(newDatasetSchema(numOfFields)).build()) + .build()) + .limit(numOfInputs) + .collect(toImmutableList()); + } + + /** Returns new {@link OpenLineage.OutputDataset} objects. */ + static List newOutputs(final int numOfOutputs, final int numOfFields) { + return Stream.generate( + () -> + OL.newOutputDatasetBuilder() + .namespace(OL_NAMESPACE) + .name(newDatasetName().getValue()) + .facets( + OL.newDatasetFacetsBuilder().schema(newDatasetSchema(numOfFields)).build()) + .build()) + .limit(numOfOutputs) + .collect(toImmutableList()); + } + + /** Returns a new {@link OpenLineage.SchemaDatasetFacet} object. */ + private static OpenLineage.SchemaDatasetFacet newDatasetSchema(final int numOfFields) { + return OL.newSchemaDatasetFacetBuilder().fields(newFields(numOfFields)).build(); + } + + /** Returns new {@link OpenLineage.SchemaDatasetFacetFields} objects. 
*/ + private static List newFields(final int numOfFields) { + return Stream.generate( + () -> + OL.newSchemaDatasetFacetFieldsBuilder() + .name(newFieldName().getValue()) + .type(newFieldType()) + .description(newDescription()) + .build()) + .limit(numOfFields) + .collect(toImmutableList()); + } + + /** Returns a new {@link NamespaceName} object. */ + private static NamespaceName newNamespaceName() { + return NamespaceName.of("namespace" + newId()); + } + + /** Returns a new {@link RunId} object. */ + private static RunId newRunId() { + return RunId.of(UUID.randomUUID()); + } + + /** Returns a new {@link DatasetName} object. */ + private static DatasetName newDatasetName() { + return DatasetName.of("dataset" + newId()); + } + + /** Returns a new {@link FieldName} object. */ + private static FieldName newFieldName() { + return FieldName.of("field" + newId()); + } + + /** Returns a new field {@code type}. */ + private static String newFieldType() { + return FIELD_TYPES.get(RANDOM.nextInt(FIELD_TYPES.size())); + } + + /** Returns a new {@link JobName} object. */ + private static JobName newJobName() { + return JobName.of("job" + newId()); + } + + /** Returns a new {@code description}. */ + private static String newDescription() { + return "description" + newId(); + } + + /** Returns a new {@code nominal} time. */ + private static ZonedDateTime newNominalTime() { + return Instant.now().atZone(AMERICA_LOS_ANGELES); + } + + /** Returns a new {@code event} time. */ + private static ZonedDateTime newEventTime() { + return Instant.now().atZone(AMERICA_LOS_ANGELES); + } + + /** Returns {@code true} if parent run should be generated; {@code false} otherwise. */ + private static boolean hasParentRunOrNot() { + return RANDOM.nextBoolean(); + } + + private static int newId() { + return RANDOM.nextInt(Integer.MAX_VALUE - 1); + } + + /** ... 
*/ + record RunEvents(@NonNull OpenLineage.RunEvent start, @NonNull OpenLineage.RunEvent complete) {} +} From 5dd275133bd8fd585871f2570401bdd8faa3d545 Mon Sep 17 00:00:00 2001 From: wslulciuc Date: Fri, 26 Aug 2022 00:34:55 -0700 Subject: [PATCH 5/9] Update javadocs Signed-off-by: wslulciuc --- .../java/marquez/cli/MetadataCommand.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/api/src/main/java/marquez/cli/MetadataCommand.java b/api/src/main/java/marquez/cli/MetadataCommand.java index 486cdcd12b..c2ad01d13c 100644 --- a/api/src/main/java/marquez/cli/MetadataCommand.java +++ b/api/src/main/java/marquez/cli/MetadataCommand.java @@ -41,10 +41,10 @@ * *

Usage

* - * For example, the following command will generate {@code metadata.json} with {@code 10} events, - * where each START event will have a size of {@code ~16384} bytes; events will be written to the - * {@code current} directory. You may specify the location of {@code metadata.json} by using the - * command-line argument {@code --output} + * For example, the following command will generate {@code metadata.json} with {@code 10} runs + * ({@code 20} events in total), where each START event will have a size of {@code ~16384} bytes; + * events will be written to {@code metadata.json} in the {@code current} directory. You may specify + * the location of {@code metadata.json} by using the command-line argument {@code --output}. * *
{@code
  * java -jar marquez-api.jar metadata --runs 10 --bytes-per-event 16384
@@ -52,7 +52,7 @@
  */
 @Slf4j
 public final class MetadataCommand extends Command {
-  /* Used to calculate total bytes per event. */
+  /* Used to calculate (approximate) total bytes per event. */
   private static final int BYTES_PER_RUN = 578;
   private static final int BYTES_PER_JOB = 58;
   private static final int BYTES_PER_FIELD_IN_SCHEMA = 256;
@@ -61,7 +61,7 @@ public final class MetadataCommand extends Command {
   private static final int DEFAULT_NUM_OF_IO_PER_EVENT = 8;
   private static final int DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT = 16;
 
-  /* Default limit. */
+  /* Default runs. */
   private static final int DEFAULT_RUNS = 25;
 
   /* Default bytes. */
@@ -79,6 +79,7 @@ public final class MetadataCommand extends Command {
   private static final String CMD_ARG_METADATA_BYTES = "bytes-per-event";
   private static final String CMD_ARG_METADATA_OUTPUT = "output";
 
+  /* Used for event randomization. */
   private static final Random RANDOM = new Random();
   private static final ZoneId AMERICA_LOS_ANGELES = ZoneId.of("America/Los_Angeles");
   private static final List FIELD_TYPES = ImmutableList.of("VARCHAR", "TEXT", "INTEGER");
@@ -103,7 +104,7 @@ public void configure(@NonNull Subparser subparser) {
         .type(Integer.class)
         .required(false)
         .setDefault(DEFAULT_RUNS)
-        .help("limits runs up to N");
+        .help("limits OL runs up to N");
     subparser
         .addArgument("--bytes-per-event")
         .dest("bytes-per-event")
@@ -205,8 +206,8 @@ private static RunEvents newOlRunEvents(final int bytesPerEvent) {
 
   /** Write {@link OpenLineage.RunEvent}s to the specified {@code output}. */
   private static void writeOlEvents(
-      @NonNull final List olEvents, @NonNull String output) {
-    System.out.format("Writing '%d' runs to: '%s'\n", olEvents.size() / 2, output);
+      @NonNull final List olEvents, @NonNull final String output) {
+    System.out.format("Writing '%d' events to: '%s'\n", olEvents.size(), output);
     FileWriter fileWriter;
     PrintWriter printWriter = null;
     try {
@@ -362,6 +363,6 @@ private static int newId() {
     return RANDOM.nextInt(Integer.MAX_VALUE - 1);
   }
 
-  /** ... */
+  /** A container for the {@code START} and {@code COMPLETE} events of a single run. */
   record RunEvents(@NonNull OpenLineage.RunEvent start, @NonNull OpenLineage.RunEvent complete) {}
 }

From f715fe6eeed11d41b83baa8ac29293975948e8c8 Mon Sep 17 00:00:00 2001
From: wslulciuc 
Date: Fri, 26 Aug 2022 00:55:21 -0700
Subject: [PATCH 6/9] Add steps to enable query stats with pghero

Signed-off-by: wslulciuc 
---
 docker-compose.dev.yml | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 517bfc06cd..ab437c6140 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -9,7 +9,9 @@ services:
       context: ./web
       dockerfile: Dockerfile
 
-  # To enable query stats with pghero, see https://github.com/ankane/pghero/blob/master/guides/Query-Stats.md#installation
+  # NOTE: To enable query stats with pghero:
+  #  (1) Log into marquez database as 'superuser'
+  #  (2) As 'superuser', run 'CREATE EXTENSION pg_stat_statements;'
   pghero:
     image: ankane/pghero
     container_name: pghero

From 1142b501bd25f7e18cf072e191ccfebeec632099 Mon Sep 17 00:00:00 2001
From: wslulciuc 
Date: Fri, 26 Aug 2022 00:59:07 -0700
Subject: [PATCH 7/9] Give pghero superuser access

Signed-off-by: wslulciuc 
---
 docker-compose.dev.yml | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index ab437c6140..89162e3c3f 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -9,13 +9,10 @@ services:
       context: ./web
       dockerfile: Dockerfile
 
-  # NOTE: To enable query stats with pghero:
-  #  (1) Log into marquez database as 'superuser'
-  #  (2) As 'superuser', run 'CREATE EXTENSION pg_stat_statements;'
   pghero:
     image: ankane/pghero
     container_name: pghero
     ports:
       - "8080:8080"
     environment:
-      DATABASE_URL: postgres://marquez:marquez@db:5432
+      DATABASE_URL: postgres://postgres:password@db:5432

From f0f65cf4d1bd541722bec4971a6c2e6a84ac0f1b Mon Sep 17 00:00:00 2001
From: wslulciuc 
Date: Fri, 26 Aug 2022 01:19:35 -0700
Subject: [PATCH 8/9] Update cmd arg constant for --bytes-per-event

Signed-off-by: wslulciuc 
---
 api/src/main/java/marquez/cli/MetadataCommand.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/api/src/main/java/marquez/cli/MetadataCommand.java b/api/src/main/java/marquez/cli/MetadataCommand.java
index c2ad01d13c..1efa947bc7 100644
--- a/api/src/main/java/marquez/cli/MetadataCommand.java
+++ b/api/src/main/java/marquez/cli/MetadataCommand.java
@@ -76,7 +76,7 @@ public final class MetadataCommand extends Command {
 
   /* Args for metadata command. */
   private static final String CMD_ARG_METADATA_RUNS = "runs";
-  private static final String CMD_ARG_METADATA_BYTES = "bytes-per-event";
+  private static final String CMD_ARG_METADATA_BYTES_PER_EVENT = "bytes-per-event";
   private static final String CMD_ARG_METADATA_OUTPUT = "output";
 
   /* Used for event randomization. */
@@ -124,7 +124,7 @@ public void configure(@NonNull Subparser subparser) {
   @Override
   public void run(@NonNull Bootstrap bootstrap, @NonNull Namespace namespace) {
     final int runs = namespace.getInt(CMD_ARG_METADATA_RUNS);
-    final int bytesPerEvent = namespace.getInt(CMD_ARG_METADATA_BYTES);
+    final int bytesPerEvent = namespace.getInt(CMD_ARG_METADATA_BYTES_PER_EVENT);
     final String output = namespace.getString(CMD_ARG_METADATA_OUTPUT);
 
     // Generate, then write events to metadata file.

From f6d0536322a6515ea4da734c828b89df89a29565 Mon Sep 17 00:00:00 2001
From: wslulciuc 
Date: Fri, 26 Aug 2022 15:09:47 -0700
Subject: [PATCH 9/9] Simplify newOlEvents()

Signed-off-by: wslulciuc 
---
 .../main/java/marquez/cli/MetadataCommand.java    | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)

diff --git a/api/src/main/java/marquez/cli/MetadataCommand.java b/api/src/main/java/marquez/cli/MetadataCommand.java
index 1efa947bc7..beb0650409 100644
--- a/api/src/main/java/marquez/cli/MetadataCommand.java
+++ b/api/src/main/java/marquez/cli/MetadataCommand.java
@@ -137,17 +137,10 @@ private static List newOlEvents(
     System.out.format(
         "Generating '%d' runs, each COMPLETE event will have a size of '~%d' (bytes)...\n",
         numOfRuns, bytesPerEvent);
-    final List olRunEvents =
-        Stream.generate(() -> newOlRunEvents(bytesPerEvent))
-            .limit(numOfRuns)
-            .collect(toImmutableList());
-
-    final ImmutableList.Builder olEvents = ImmutableList.builder();
-    for (final RunEvents startAndComplete : olRunEvents) {
-      olEvents.add(startAndComplete.start()); // Add START event
-      olEvents.add(startAndComplete.complete()); // Add COMPLETE event
-    }
-    return olEvents.build();
+    return Stream.generate(() -> newOlRunEvents(bytesPerEvent))
+        .limit(numOfRuns)
+        .flatMap(runEvents -> Stream.of(runEvents.start(), runEvents.complete()))
+        .collect(toImmutableList());
   }
 
   /**