From 9da3469a2f28bdb2b376f0b04028385a992c5ff7 Mon Sep 17 00:00:00 2001 From: henneberger Date: Wed, 3 Feb 2021 11:37:49 -0800 Subject: [PATCH] Add graphql endpoint (#901) * Add graphql endpoint Signed-off-by: henneberger * Fix json graphql type Signed-off-by: henneberger * spotless Signed-off-by: henneberger * Add text search Signed-off-by: henneberger * Spotless Signed-off-by: henneberger * Add rough sketch of lineage graphql endpoint Signed-off-by: henneberger * Remove prototype hackaton code from pr Signed-off-by: henneberger * Add simple graphql test Signed-off-by: henneberger * Add graphql to config Signed-off-by: henneberger --- README.md | 3 + api/build.gradle | 3 + api/src/main/java/marquez/MarquezApp.java | 15 + api/src/main/java/marquez/MarquezConfig.java | 5 + api/src/main/java/marquez/MarquezContext.java | 7 + .../graphql/CustomGraphQLContextBuilder.java | 52 ++ .../java/marquez/graphql/GraphqlDaos.java | 159 ++++++ .../marquez/graphql/GraphqlDataFetchers.java | 385 +++++++++++++ .../marquez/graphql/GraphqlSchemaBuilder.java | 190 ++++++ .../graphql/MarquezGraphqlContext.java | 106 ++++ .../graphql/MarquezGraphqlServletBuilder.java | 22 + .../graphql/mapper/ObjectMapMapper.java | 35 ++ .../java/marquez/graphql/mapper/RowMap.java | 5 + .../assets/graphql-playground/index.htm | 540 ++++++++++++++++++ api/src/main/resources/schema.graphqls | 120 ++++ .../java/marquez/graphql/GraphqlTest.java | 60 ++ marquez.dev.yml | 3 + marquez.example.yml | 4 + 18 files changed, 1714 insertions(+) create mode 100644 api/src/main/java/marquez/graphql/CustomGraphQLContextBuilder.java create mode 100644 api/src/main/java/marquez/graphql/GraphqlDaos.java create mode 100644 api/src/main/java/marquez/graphql/GraphqlDataFetchers.java create mode 100644 api/src/main/java/marquez/graphql/GraphqlSchemaBuilder.java create mode 100644 api/src/main/java/marquez/graphql/MarquezGraphqlContext.java create mode 100644 api/src/main/java/marquez/graphql/MarquezGraphqlServletBuilder.java 
create mode 100644 api/src/main/java/marquez/graphql/mapper/ObjectMapMapper.java create mode 100644 api/src/main/java/marquez/graphql/mapper/RowMap.java create mode 100644 api/src/main/resources/assets/graphql-playground/index.htm create mode 100644 api/src/main/resources/schema.graphqls create mode 100644 api/src/test/java/marquez/graphql/GraphqlTest.java diff --git a/README.md b/README.md index 39c000e3db..f2483142ab 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,9 @@ The HTTP API listens on port `5000` for all calls and port `5001` for the admin You can open http://localhost:3000 to begin exploring the web UI. +The graphql playground endpoint can be found at http://localhost:5000/graphql-playground with +the graphql endpoint located at http://localhost:5000/api/v1-beta/graphql + ## Documentation We invite everyone to help us improve and keep documentation up to date. Documentation is maintained in this repository and can be found under [`docs/`](https://github.com/MarquezProject/marquez/tree/main/docs). 
diff --git a/api/build.gradle b/api/build.gradle index 5b0956ce08..41e43a3a43 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -38,6 +38,7 @@ dependencies { compile "io.dropwizard:dropwizard-jdbi3:${dropwizardVersion}" compile "io.dropwizard:dropwizard-json-logging:${dropwizardVersion}" compile "io.dropwizard:dropwizard-http2:${dropwizardVersion}" + compile "io.dropwizard:dropwizard-assets:${dropwizardVersion}" compile "io.prometheus:simpleclient:${prometheusVersion}" compile "io.prometheus:simpleclient_dropwizard:${prometheusVersion}" compile "io.prometheus:simpleclient_hotspot:${prometheusVersion}" @@ -47,6 +48,8 @@ dependencies { compile 'com.google.guava:guava:30.0-jre' compile 'org.flywaydb:flyway-core:6.3.0' compile 'org.postgresql:postgresql:42.2.5' + compile 'com.graphql-java:graphql-java:16.1' + compile 'com.graphql-java-kickstart:graphql-java-servlet:11.0.0' compileOnly "org.projectlombok:lombok:${lombokVersion}" annotationProcessor "org.projectlombok:lombok:${lombokVersion}" diff --git a/api/src/main/java/marquez/MarquezApp.java b/api/src/main/java/marquez/MarquezApp.java index d40b966a11..88bbfa7fe0 100644 --- a/api/src/main/java/marquez/MarquezApp.java +++ b/api/src/main/java/marquez/MarquezApp.java @@ -17,6 +17,7 @@ import com.codahale.metrics.jdbi3.InstrumentedSqlLogger; import com.fasterxml.jackson.databind.SerializationFeature; import io.dropwizard.Application; +import io.dropwizard.assets.AssetsBundle; import io.dropwizard.configuration.EnvironmentVariableSubstitutor; import io.dropwizard.configuration.SubstitutingSourceProvider; import io.dropwizard.db.DataSourceFactory; @@ -72,6 +73,13 @@ public void initialize(@NonNull Bootstrap bootstrap) { new EnvironmentVariableSubstitutor(ERROR_ON_UNDEFINED))); bootstrap.getObjectMapper().disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + + bootstrap.addBundle( + new AssetsBundle( + "/assets", + "/graphql-playground", + "graphql-playground/index.htm", + "graphql-playground")); } @Override @@ 
-91,6 +99,7 @@ public void run(@NonNull MarquezConfig config, @NonNull Environment env) throws } } registerResources(config, env, source); + registerServlets(env); } @@ -107,6 +116,12 @@ public void registerResources( final MarquezContext context = MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build(); + if (config.getGraphql().isEnabled()) { + env.servlets() + .addServlet("api/v1-beta/graphql", context.getGraphqlServlet()) + .addMapping("/api/v1-beta/graphql", "/api/v1/schema.json"); + } + log.debug("Registering resources..."); for (final Object resource : context.getResources()) { env.jersey().register(resource); diff --git a/api/src/main/java/marquez/MarquezConfig.java b/api/src/main/java/marquez/MarquezConfig.java index d01fe5fbc3..2a2cd66d42 100644 --- a/api/src/main/java/marquez/MarquezConfig.java +++ b/api/src/main/java/marquez/MarquezConfig.java @@ -27,6 +27,7 @@ public final class MarquezConfig extends Configuration { private static final boolean DEFAULT_MIGRATE_ON_STARTUP = true; private static final ImmutableSet DEFAULT_TAGS = ImmutableSet.of(); + @Getter private final GraphqlConfig graphql = new GraphqlConfig(); @Getter private final boolean migrateOnStartup = DEFAULT_MIGRATE_ON_STARTUP; @Getter private final ImmutableSet tags = DEFAULT_TAGS; @@ -38,4 +39,8 @@ public final class MarquezConfig extends Configuration { @Getter @JsonProperty("flyway") private final FlywayFactory flywayFactory = new FlywayFactory(); + + public static class GraphqlConfig { + @Getter private final boolean enabled = true; + } } diff --git a/api/src/main/java/marquez/MarquezContext.java b/api/src/main/java/marquez/MarquezContext.java index 1dd6f082f2..dbceb0a9dc 100644 --- a/api/src/main/java/marquez/MarquezContext.java +++ b/api/src/main/java/marquez/MarquezContext.java @@ -3,6 +3,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import 
graphql.kickstart.servlet.GraphQLHttpServlet; import java.util.ArrayList; import java.util.List; import lombok.Getter; @@ -30,6 +31,8 @@ import marquez.db.RunStateDao; import marquez.db.SourceDao; import marquez.db.TagDao; +import marquez.graphql.GraphqlSchemaBuilder; +import marquez.graphql.MarquezGraphqlServletBuilder; import marquez.service.DatasetService; import marquez.service.JobService; import marquez.service.NamespaceService; @@ -81,6 +84,7 @@ public final class MarquezContext { @Getter private final ImmutableList resources; @Getter private final JdbiExceptionExceptionMapper jdbiException; + @Getter private final GraphQLHttpServlet graphqlServlet; private MarquezContext( @NonNull final Jdbi jdbi, @@ -150,6 +154,9 @@ private MarquezContext( serviceExceptionMapper, jdbiException, openLineageResource); + + final MarquezGraphqlServletBuilder servlet = new MarquezGraphqlServletBuilder(); + this.graphqlServlet = servlet.getServlet(new GraphqlSchemaBuilder(jdbi)); } public static Builder builder() { diff --git a/api/src/main/java/marquez/graphql/CustomGraphQLContextBuilder.java b/api/src/main/java/marquez/graphql/CustomGraphQLContextBuilder.java new file mode 100644 index 0000000000..cc6c77593c --- /dev/null +++ b/api/src/main/java/marquez/graphql/CustomGraphQLContextBuilder.java @@ -0,0 +1,52 @@ +package marquez.graphql; + +import graphql.kickstart.execution.context.DefaultGraphQLContext; +import graphql.kickstart.execution.context.GraphQLContext; +import graphql.kickstart.servlet.context.DefaultGraphQLWebSocketContext; +import graphql.kickstart.servlet.context.GraphQLServletContextBuilder; +import java.util.concurrent.CompletableFuture; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.websocket.Session; +import javax.websocket.server.HandshakeRequest; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderRegistry; + +public class CustomGraphQLContextBuilder implements 
GraphQLServletContextBuilder { + + public CustomGraphQLContextBuilder() {} + + @Override + public GraphQLContext build(HttpServletRequest req, HttpServletResponse response) { + return MarquezGraphqlContext.createServletContext(buildDataLoaderRegistry(), null) + .with(req) + .with(response) + .build(); + } + + @Override + public GraphQLContext build() { + return new DefaultGraphQLContext(buildDataLoaderRegistry(), null); + } + + @Override + public GraphQLContext build(Session session, HandshakeRequest request) { + return DefaultGraphQLWebSocketContext.createWebSocketContext(buildDataLoaderRegistry(), null) + .with(session) + .with(request) + .build(); + } + + private DataLoaderRegistry buildDataLoaderRegistry() { + DataLoaderRegistry dataLoaderRegistry = new DataLoaderRegistry(); + dataLoaderRegistry.register( + "datasetLineage", + new DataLoader( + customerIds -> + CompletableFuture.supplyAsync( + () -> { + return null; + }))); + return dataLoaderRegistry; + } +} diff --git a/api/src/main/java/marquez/graphql/GraphqlDaos.java b/api/src/main/java/marquez/graphql/GraphqlDaos.java new file mode 100644 index 0000000000..d8a53869fc --- /dev/null +++ b/api/src/main/java/marquez/graphql/GraphqlDaos.java @@ -0,0 +1,159 @@ +package marquez.graphql; + +import java.util.List; +import java.util.UUID; +import marquez.db.JobVersionDao.IoType; +import marquez.graphql.mapper.ObjectMapMapper; +import marquez.graphql.mapper.RowMap; +import org.jdbi.v3.sqlobject.SqlObject; +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import org.jdbi.v3.sqlobject.statement.SqlQuery; + +@RegisterRowMapper(ObjectMapMapper.class) +public interface GraphqlDaos extends SqlObject { + /* + * Note: You must use a non-map type for returning single entries because a type of Map is already + * registered to jdbi. 
+ */ + @SqlQuery("SELECT * FROM datasets WHERE uuid = :uuid ORDER BY updated_at") + RowMap getDataset(UUID uuid); + + @SqlQuery("SELECT * FROM datasets") + List> getDatasets(); + + @SqlQuery( + "SELECT datasets.* FROM datasets inner join namespaces on datasets.namespace_uuid = namespaces.uuid " + + "where namespaces.name = :namespaceName and datasets.name = :name") + RowMap getDatasetByNamespaceAndName(String namespaceName, String name); + + @SqlQuery( + "SELECT jobs.* FROM jobs inner join namespaces on jobs.namespace_uuid = namespaces.uuid " + + "where namespaces.name = :namespaceName and jobs.name = :name") + RowMap getJobByNamespaceAndName(String namespaceName, String name); + + @SqlQuery("SELECT * FROM jobs") + List> getJobs(); + + @SqlQuery("SELECT * FROM sources where uuid = :uuid") + RowMap getSource(UUID uuid); + + @SqlQuery("SELECT * FROM namespaces where uuid = :uuid") + RowMap getNamespace(UUID uuid); + + @SqlQuery("SELECT * FROM dataset_fields where dataset_uuid = :datasetUuid") + List> getDatasetField(UUID datasetUuid); + + @SqlQuery( + "SELECT f.* FROM dataset_fields f inner join dataset_fields_tag_mapping m on m.dataset_field_uuid = f.uuid where m.tag_uuid = :tagUuid") + List> getDatasetFieldsByTagUuid(UUID tagUuid); + + @SqlQuery( + "SELECT d.* FROM datasets d inner join datasets_tag_mapping m on m.dataset_uuid = d.uuid where tag_uuid = :uuid") + List> getDatasetsByTagUuid(UUID tagUuid); + + @SqlQuery("SELECT d.* from datasets d where source_uuid = :sourceUuid") + List> getDatasetsBySource(UUID sourceUuid); + + @SqlQuery("SELECT * from runs where uuid = :uuid") + RowMap getRun(UUID uuid); + + @SqlQuery("SELECT * from runs where run_args_uuid = :runArgsUuid") + List> getRunsByRunArgs(UUID runArgsUuid); + + @SqlQuery("SELECT * FROM dataset_versions where uuid = :uuid") + RowMap getCurrentDatasetVersion(UUID uuid); + + @SqlQuery( + "SELECT dv.* from dataset_versions dv inner join runs_input_mapping m on m.dataset_version_uuid = dv.uuid where m.run_uuid 
 = :runUuid") + List> getDatasetVersionInputsByRun(UUID runUuid); + + @SqlQuery( + "SELECT r.* from runs r inner join runs_input_mapping m on m.run_uuid = r.uuid where m.dataset_version_uuid = :datasetVersionUuid") + List> getRunsByDatasetVersion(UUID datasetVersionUuid); + + @SqlQuery( + "SELECT distinct jv.* from runs r inner join runs_input_mapping m on m.run_uuid = r.uuid inner join job_versions jv on jv.uuid = r.job_version_uuid where m.dataset_version_uuid = :datasetVersionUuid") + List> getDistinctJobVersionsByDatasetVersion(UUID datasetVersionUuid); + + @SqlQuery( + "SELECT distinct jv.* from dataset_versions dv inner join runs r on r.uuid = dv.run_uuid inner join job_versions jv on jv.uuid = r.job_version_uuid where dv.uuid = :datasetVersionUuid") + List> getDistinctJobVersionsByDatasetVersionOutput( + UUID datasetVersionUuid); + + @SqlQuery("SELECT dv.* from dataset_versions dv where dv.run_uuid = :runUuid") + List> getDatasetVersionByRun(UUID runUuid); + + @SqlQuery("SELECT * from run_args where uuid = :uuid") + RowMap getRunArgs(UUID uuid); + + @SqlQuery( + "SELECT n.* from namespaces n inner join namespace_ownerships no on no.namespace_uuid = n.uuid where owner_uuid = :ownerUuid") + List> getNamespacesByOwner(UUID ownerUuid); + + @SqlQuery( + "SELECT * from owners o inner join namespace_ownerships no on o.uuid = no.owner_uuid where namespace_uuid = :namespaceUuid") + List> getOwnersByNamespace(UUID namespaceUuid); + + @SqlQuery("SELECT * from owners where name = :ownerName") + RowMap getCurrentOwnerByNamespace(String ownerName); + + @SqlQuery("SELECT * from job_contexts where uuid = :uuid") + RowMap getJobContext(UUID uuid); + + @SqlQuery("SELECT * from datasets where namespace_uuid = :namespaceUuid") + List> getDatasetsByNamespace(UUID namespaceUuid); + + @SqlQuery( + "SELECT d.* from datasets d inner join job_versions_io_mapping m on m.dataset_uuid = d.uuid where m.job_version_uuid = :jobVersionUuid and io_type = :ioType") + List> 
getIOMappingByJobVersion(UUID jobVersionUuid, IoType ioType); + + @SqlQuery( + "SELECT jv.* " + + " FROM job_versions_io_mapping m " + + " inner join job_versions jv " + + " on m.job_version_uuid = jv.uuid" + + " where m.dataset_uuid = :datasetUuid AND m.io_type = :ioType") + List> getJobVersionsByIoMapping(UUID datasetUuid, IoType ioType); + + @SqlQuery("SELECT * from job_versions where job_uuid = :jobUuid") + List> getJobVersionByJob(UUID jobUuid); + + @SqlQuery("SELECT * from job_versions where uuid = :uuid") + RowMap getJobVersion(UUID uuid); + + @SqlQuery("SELECT * from dataset_fields where dataset_uuid = :datasetVersionUuid") + List> getFields(UUID datasetVersionUuid); + + @SqlQuery( + "SELECT dv.* from dataset_versions dv inner join dataset_versions_field_mapping m on dv.uuid = m.dataset_version_uuid where dataset_field_uuid = :datasetFieldUuid") + List> getVersionsByDatasetField(UUID datasetFieldUuid); + + @SqlQuery("SELECT * FROM dataset_versions where dataset_uuid = :datasetUuid") + List> getDatasetVersionsByDataset(UUID datasetUuid); + + @SqlQuery("SELECT * FROM namespaces where name = :name") + RowMap getNamespaceByName(String name); + + @SqlQuery("SELECT * from jobs where namespace_uuid = :namespaceUuid") + List> getJobsByNamespace(UUID namespaceUuid); + + @SqlQuery("SELECT * from jobs where uuid = :uuid") + RowMap getJob(UUID uuid); + + @SqlQuery("SELECT * from run_states where run_uuid = :runUuid order by transitioned_at desc") + List> getRunStateByRun(UUID runUuid); + + @SqlQuery("SELECT * from run_states where uuid = :uuid") + RowMap getRunStateByUuid(UUID uuid); + + @SqlQuery( + "SELECT t.* FROM datasets_tag_mapping m " + + " inner join tags t " + + " on m.tag_uuid = t.uuid" + + " where dataset_uuid = :datasetUuid") + List> getTagsByDatasetTag(UUID datasetUuid); + + @SqlQuery( + "SELECT t.* from tags t inner join dataset_fields_tag_mapping m on t.uuid = m.tag_uuid where dataset_field_uuid = :datasetFieldUuid") + List> getTagsByDatasetField(UUID 
datasetFieldUuid); +} diff --git a/api/src/main/java/marquez/graphql/GraphqlDataFetchers.java b/api/src/main/java/marquez/graphql/GraphqlDataFetchers.java new file mode 100644 index 0000000000..52bb8b3857 --- /dev/null +++ b/api/src/main/java/marquez/graphql/GraphqlDataFetchers.java @@ -0,0 +1,385 @@ +package marquez.graphql; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableMap; +import graphql.schema.DataFetcher; +import java.util.Map; +import java.util.UUID; +import marquez.common.Utils; +import marquez.db.JobVersionDao.IoType; +import org.jdbi.v3.core.Jdbi; + +public class GraphqlDataFetchers { + private GraphqlDaos dao; + + public GraphqlDataFetchers(Jdbi jdbi) { + this.dao = jdbi.onDemand(GraphqlDaos.class); + } + + public DataFetcher getDatasets() { + return dataFetchingEnvironment -> { + return dao.getDatasets(); + }; + } + + public DataFetcher getDatasetByNamespaceAndName() { + return dataFetchingEnvironment -> { + String name = dataFetchingEnvironment.getArgument("name"); + String namespace = dataFetchingEnvironment.getArgument("namespace"); + + return dao.getDatasetByNamespaceAndName(namespace, name); + }; + } + + public DataFetcher getSourcesByDataset() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getSource((UUID) map.get("sourceUuid")); + }; + } + + public DataFetcher getNamespaceByDataset() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getNamespace((UUID) map.get("namespaceUuid")); + }; + } + + public DataFetcher getCurrentVersionByDataset() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getCurrentDatasetVersion((UUID) map.get("currentVersionUuid")); + }; + } + + public DataFetcher getFieldsByDataset() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getDatasetField((UUID) 
map.get("uuid")); + }; + } + + public DataFetcher getJobVersionAsInputByDataset() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getJobVersionsByIoMapping((UUID) map.get("uuid"), IoType.INPUT); + }; + } + + public DataFetcher getVersionAsOutputByDataset() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getJobVersionsByIoMapping((UUID) map.get("uuid"), IoType.OUTPUT); + }; + } + + public DataFetcher getTagsByDataset() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getTagsByDatasetTag((UUID) map.get("uuid")); + }; + } + + public DataFetcher getVersionsByDataset() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getDatasetVersionsByDataset((UUID) map.get("uuid")); + }; + } + + public DataFetcher getDatasetFieldsByTag() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getDatasetFieldsByTagUuid((UUID) map.get("uuid")); + }; + } + + public DataFetcher getDatasetsByTag() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getDatasetsByTagUuid((UUID) map.get("uuid")); + }; + } + + public DataFetcher getDatasetsBySource() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getDatasetsBySource((UUID) map.get("uuid")); + }; + } + + public DataFetcher getRunByRunStateRecord() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getRun((UUID) map.get("runUuid")); + }; + } + + public DataFetcher getRunsByRunArgs() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getRunsByRunArgs((UUID) map.get("uuid")); + }; + } + + public DataFetcher getJobVersionByRun() { + return dataFetchingEnvironment -> { + 
Map map = dataFetchingEnvironment.getSource(); + + return dao.getJobVersion((UUID) map.get("jobVersionUuid")); + }; + } + + public DataFetcher getRunStatesByRun() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getRunStateByRun((UUID) map.get("uuid")); + }; + } + + public DataFetcher getStartStateByRun() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getRunStateByUuid((UUID) map.get("startStateUuid")); + }; + } + + public DataFetcher getEndStateByRun() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getRunStateByUuid((UUID) map.get("endStateUuid")); + }; + } + + public DataFetcher getInputsByRun() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getDatasetVersionInputsByRun((UUID) map.get("uuid")); + }; + } + + public DataFetcher getOutputsByRun() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getDatasetVersionByRun((UUID) map.get("uuid")); + }; + } + + public DataFetcher getRunArgsByRun() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + Map runArgs = dao.getRunArgs((UUID) map.get("runArgsUuid")); + if (runArgs == null) { + return null; + } + + return Utils.fromJson( + (String) runArgs.get("args"), new TypeReference>() {}); + }; + } + + public DataFetcher getNamespacesByOwner() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getNamespacesByOwner((UUID) map.get("uuid")); + }; + } + + public DataFetcher getOwnersByNamespace() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getOwnersByNamespace((UUID) map.get("uuid")); + }; + } + + public DataFetcher getCurrentOwnerByNamespace() { + return dataFetchingEnvironment -> { + Map map = 
dataFetchingEnvironment.getSource(); + + return dao.getCurrentOwnerByNamespace((String) map.get("currentOwnerName")); + }; + } + + public DataFetcher getJobContextByJobVersion() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + Map jobContext = dao.getJobContext((UUID) map.get("jobContextUuid")); + if (jobContext == null) { + return null; + } + return Utils.fromJson( + (String) jobContext.get("context"), new TypeReference>() {}); + }; + } + + public DataFetcher getLatestRunByJobVersion() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + UUID latestRunUuid = (UUID) map.get("latestRunUuid"); + if (latestRunUuid == null) { + return null; + } + return dao.getRun(latestRunUuid); + }; + } + + public DataFetcher getJobByJobVersion() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getJob((UUID) map.get("jobUuid")); + }; + } + + public DataFetcher getInputsByJobVersion() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getIOMappingByJobVersion((UUID) map.get("uuid"), IoType.INPUT); + }; + } + + public DataFetcher getOutputsByJobVersion() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getIOMappingByJobVersion((UUID) map.get("uuid"), IoType.OUTPUT); + }; + } + + public DataFetcher getVersionsByJob() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getJobVersionByJob((UUID) map.get("uuid")); + }; + } + + public DataFetcher getNamespaceByJob() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getNamespace((UUID) map.get("namespaceUuid")); + }; + } + + public DataFetcher getCurrentVersionByJob() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getJobVersion((UUID) 
map.get("currentVersionUuid")); + }; + } + + public DataFetcher getFieldsByDatasetVersion() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getFields((UUID) map.get("uuid")); + }; + } + + public DataFetcher getRunByDatasetVersion() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getRun((UUID) map.get("runUuid")); + }; + } + + public DataFetcher getDatasetByDatasetField() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getDataset((UUID) map.get("uuid")); + }; + } + + public DataFetcher getVersionsByDatasetField() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getVersionsByDatasetField((UUID) map.get("uuid")); + }; + } + + public DataFetcher getTagsByDatasetField() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getTagsByDatasetField((UUID) map.get("uuid")); + }; + } + + public DataFetcher getDatasetByDatasetVersion() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + + return dao.getDataset((UUID) map.get("datasetUuid")); + }; + } + + public DataFetcher getNamespaceByName() { + return dataFetchingEnvironment -> { + String name = dataFetchingEnvironment.getArgument("name"); + + return dao.getNamespaceByName(name); + }; + } + + public DataFetcher getJobsByNamespace() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + if (map.isEmpty()) return null; + + return dao.getJobsByNamespace((UUID) map.get("uuid")); + }; + } + + public DataFetcher getDatasetsByNamespace() { + return dataFetchingEnvironment -> { + Map map = dataFetchingEnvironment.getSource(); + if (map.isEmpty()) { + return null; + } + + return dao.getDatasetsByNamespace((UUID) map.get("uuid")); + }; + } + + public DataFetcher getJobs() { + return 
dataFetchingEnvironment -> { + return dao.getJobs(); + }; + } + + public DataFetcher getJobsByNamespaceAndName() { + return dataFetchingEnvironment -> { + String name = dataFetchingEnvironment.getArgument("name"); + String namespace = dataFetchingEnvironment.getArgument("namespace"); + + return dao.getJobByNamespaceAndName(namespace, name); + }; + } +} diff --git a/api/src/main/java/marquez/graphql/GraphqlSchemaBuilder.java b/api/src/main/java/marquez/graphql/GraphqlSchemaBuilder.java new file mode 100644 index 0000000000..f647f694d0 --- /dev/null +++ b/api/src/main/java/marquez/graphql/GraphqlSchemaBuilder.java @@ -0,0 +1,190 @@ +package marquez.graphql; + +import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; +import com.google.common.io.Resources; +import graphql.schema.Coercing; +import graphql.schema.CoercingParseLiteralException; +import graphql.schema.CoercingParseValueException; +import graphql.schema.CoercingSerializeException; +import graphql.schema.GraphQLScalarType; +import graphql.schema.GraphQLSchema; +import graphql.schema.idl.RuntimeWiring; +import graphql.schema.idl.RuntimeWiring.Builder; +import graphql.schema.idl.SchemaGenerator; +import graphql.schema.idl.SchemaParser; +import graphql.schema.idl.TypeDefinitionRegistry; +import java.net.URL; +import java.time.ZonedDateTime; +import java.util.Map; +import java.util.UUID; +import lombok.SneakyThrows; +import org.jdbi.v3.core.Jdbi; + +public class GraphqlSchemaBuilder { + private final Jdbi jdbi; + + public GraphqlSchemaBuilder(Jdbi jdbi) { + this.jdbi = jdbi; + } + + @SneakyThrows + public GraphQLSchema buildSchema() { + URL url = Resources.getResource("schema.graphqls"); + String sdl = Resources.toString(url, Charsets.UTF_8); + Builder wiring = RuntimeWiring.newRuntimeWiring(); + buildRuntimeWiring(jdbi, wiring); + + TypeDefinitionRegistry typeDefinitionRegistry = new 
SchemaParser().parse(sdl); + + SchemaGenerator schemaGenerator = new SchemaGenerator(); + return schemaGenerator.makeExecutableSchema(typeDefinitionRegistry, wiring.build()); + } + + public void buildRuntimeWiring(Jdbi jdbi, Builder wiring) { + GraphqlDataFetchers dataFetchers = new GraphqlDataFetchers(jdbi); + + wiring + .type( + newTypeWiring("Query") + .dataFetcher("datasets", dataFetchers.getDatasets()) + .dataFetcher("dataset", dataFetchers.getDatasetByNamespaceAndName()) + .dataFetcher("namespace", dataFetchers.getNamespaceByName()) + .dataFetcher("jobs", dataFetchers.getJobs()) + .dataFetcher("job", dataFetchers.getJobsByNamespaceAndName())) + .type( + newTypeWiring("Dataset") + .dataFetcher("source", dataFetchers.getSourcesByDataset()) + .dataFetcher("namespace", dataFetchers.getNamespaceByDataset()) + .dataFetcher("currentVersion", dataFetchers.getCurrentVersionByDataset()) + .dataFetcher("fields", dataFetchers.getFieldsByDataset()) + .dataFetcher("jobVersionAsInput", dataFetchers.getJobVersionAsInputByDataset()) + .dataFetcher("jobVersionAsOutput", dataFetchers.getVersionAsOutputByDataset()) + .dataFetcher("tags", dataFetchers.getTagsByDataset()) + .dataFetcher("versions", dataFetchers.getVersionsByDataset())) + .type( + newTypeWiring("Tag") + .dataFetcher("fields", dataFetchers.getDatasetFieldsByTag()) + .dataFetcher("datasets", dataFetchers.getDatasetsByTag())) + .type(newTypeWiring("Source").dataFetcher("datasets", dataFetchers.getDatasetsBySource())) + .type( + newTypeWiring("RunStateRecord") + .dataFetcher("run", dataFetchers.getRunByRunStateRecord())) + .type( + newTypeWiring("Run") + .dataFetcher("jobVersion", dataFetchers.getJobVersionByRun()) + .dataFetcher("runArgs", dataFetchers.getRunArgsByRun()) + .dataFetcher("states", dataFetchers.getRunStatesByRun()) + .dataFetcher("startState", dataFetchers.getStartStateByRun()) + .dataFetcher("endState", dataFetchers.getEndStateByRun()) + .dataFetcher("inputs", dataFetchers.getInputsByRun()) + 
.dataFetcher("outputs", dataFetchers.getOutputsByRun())) + .type(newTypeWiring("Owner").dataFetcher("namespaces", dataFetchers.getNamespacesByOwner())) + .type( + newTypeWiring("Namespace") + .dataFetcher("owners", dataFetchers.getOwnersByNamespace()) + .dataFetcher("currentOwner", dataFetchers.getCurrentOwnerByNamespace()) + .dataFetcher("jobs", dataFetchers.getJobsByNamespace()) + .dataFetcher("datasets", dataFetchers.getDatasetsByNamespace())) + .type( + newTypeWiring("JobVersion") + .dataFetcher("jobContext", dataFetchers.getJobContextByJobVersion()) + .dataFetcher("latestRun", dataFetchers.getLatestRunByJobVersion()) + .dataFetcher("job", dataFetchers.getJobByJobVersion()) + .dataFetcher("inputs", dataFetchers.getInputsByJobVersion()) + .dataFetcher("outputs", dataFetchers.getOutputsByJobVersion())) + .type( + newTypeWiring("Job") + .dataFetcher("versions", dataFetchers.getVersionsByJob()) + .dataFetcher("namespace", dataFetchers.getNamespaceByJob()) + .dataFetcher("currentVersion", dataFetchers.getCurrentVersionByJob())) + .type( + newTypeWiring("DatasetVersion") + .dataFetcher("fields", dataFetchers.getFieldsByDatasetVersion()) + .dataFetcher("run", dataFetchers.getRunByDatasetVersion()) + .dataFetcher("dataset", dataFetchers.getDatasetByDatasetVersion())) + .type( + newTypeWiring("DatasetField") + .dataFetcher("dataset", dataFetchers.getDatasetByDatasetField()) + .dataFetcher("versions", dataFetchers.getVersionsByDatasetField()) + .dataFetcher("tags", dataFetchers.getTagsByDatasetField())) + .scalar( + GraphQLScalarType.newScalar() + .name("UUID") + .coercing( + new Coercing() { + + @Override + public String serialize(Object dataFetcherResult) + throws CoercingSerializeException { + return dataFetcherResult.toString(); + } + + @Override + public UUID parseValue(Object input) throws CoercingParseValueException { + return UUID.fromString(input.toString()); + } + + @Override + public UUID parseLiteral(Object input) throws CoercingParseLiteralException { + 
return UUID.fromString(input.toString()); + } + }) + .build()) + .scalar( + GraphQLScalarType.newScalar() + .name("Json") + .coercing( + new Coercing() { + ObjectMapper mapper = new ObjectMapper(); + + @Override + @SneakyThrows + public Map serialize(Object dataFetcherResult) + throws CoercingSerializeException { + return (Map) dataFetcherResult; + } + + @Override + @SneakyThrows + public String parseValue(Object input) throws CoercingParseValueException { + return mapper.writeValueAsString(input); + } + + @Override + @SneakyThrows + public String parseLiteral(Object input) + throws CoercingParseLiteralException { + return mapper.writeValueAsString(input); + } + }) + .build()) + .scalar( + GraphQLScalarType.newScalar() + .name("DateTime") + .coercing( + new Coercing() { + + @Override + public String serialize(Object dataFetcherResult) + throws CoercingSerializeException { + return dataFetcherResult.toString(); + } + + @Override + public ZonedDateTime parseValue(Object input) + throws CoercingParseValueException { + return ZonedDateTime.parse(input.toString()); + } + + @Override + public ZonedDateTime parseLiteral(Object input) + throws CoercingParseLiteralException { + return ZonedDateTime.parse(input.toString()); + } + }) + .build()); + } +} diff --git a/api/src/main/java/marquez/graphql/MarquezGraphqlContext.java b/api/src/main/java/marquez/graphql/MarquezGraphqlContext.java new file mode 100644 index 0000000000..43c910286b --- /dev/null +++ b/api/src/main/java/marquez/graphql/MarquezGraphqlContext.java @@ -0,0 +1,106 @@ +package marquez.graphql; + +import graphql.kickstart.execution.context.DefaultGraphQLContext; +import graphql.kickstart.servlet.context.GraphQLServletContext; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import javax.security.auth.Subject; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import 
javax.servlet.http.Part; +import lombok.SneakyThrows; +import org.dataloader.DataLoaderRegistry; + +public class MarquezGraphqlContext extends DefaultGraphQLContext implements GraphQLServletContext { + + private final HttpServletRequest httpServletRequest; + private final HttpServletResponse httpServletResponse; + private final Map lineageMap; + + protected MarquezGraphqlContext( + DataLoaderRegistry dataLoaderRegistry, + Subject subject, + HttpServletRequest httpServletRequest, + HttpServletResponse httpServletResponse) { + super(dataLoaderRegistry, subject); + this.httpServletRequest = httpServletRequest; + this.httpServletResponse = httpServletResponse; + this.lineageMap = new ConcurrentHashMap<>(); + } + + public static Builder createServletContext(DataLoaderRegistry registry, Subject subject) { + return new Builder(registry, subject); + } + + public static Builder createServletContext() { + return new Builder(new DataLoaderRegistry(), null); + } + + public Map getLineageMap() { + return lineageMap; + } + + @Override + public HttpServletRequest getHttpServletRequest() { + return httpServletRequest; + } + + @Override + public HttpServletResponse getHttpServletResponse() { + return httpServletResponse; + } + + @Override + @SneakyThrows + public List getFileParts() { + return httpServletRequest.getParts().stream() + .filter(part -> part.getContentType() != null) + .collect(Collectors.toList()); + } + + @Override + @SneakyThrows + public Map> getParts() { + return httpServletRequest.getParts().stream().collect(Collectors.groupingBy(Part::getName)); + } + + public static class Builder { + + private HttpServletRequest httpServletRequest; + private HttpServletResponse httpServletResponse; + private DataLoaderRegistry dataLoaderRegistry; + private Subject subject; + + private Builder(DataLoaderRegistry dataLoaderRegistry, Subject subject) { + this.dataLoaderRegistry = dataLoaderRegistry; + this.subject = subject; + } + + public MarquezGraphqlContext build() { + return 
new MarquezGraphqlContext( + dataLoaderRegistry, subject, httpServletRequest, httpServletResponse); + } + + public Builder with(HttpServletRequest httpServletRequest) { + this.httpServletRequest = httpServletRequest; + return this; + } + + public Builder with(DataLoaderRegistry dataLoaderRegistry) { + this.dataLoaderRegistry = dataLoaderRegistry; + return this; + } + + public Builder with(Subject subject) { + this.subject = subject; + return this; + } + + public Builder with(HttpServletResponse httpServletResponse) { + this.httpServletResponse = httpServletResponse; + return this; + } + } +} diff --git a/api/src/main/java/marquez/graphql/MarquezGraphqlServletBuilder.java b/api/src/main/java/marquez/graphql/MarquezGraphqlServletBuilder.java new file mode 100644 index 0000000000..d9516ec383 --- /dev/null +++ b/api/src/main/java/marquez/graphql/MarquezGraphqlServletBuilder.java @@ -0,0 +1,22 @@ +package marquez.graphql; + +import graphql.kickstart.execution.GraphQLQueryInvoker; +import graphql.kickstart.servlet.GraphQLConfiguration; +import graphql.kickstart.servlet.GraphQLHttpServlet; +import graphql.schema.GraphQLSchema; + +public class MarquezGraphqlServletBuilder { + public GraphQLHttpServlet getServlet(final GraphqlSchemaBuilder schemaBuilder) { + final GraphQLSchema schema = schemaBuilder.buildSchema(); + + final GraphQLQueryInvoker queryInvoker = GraphQLQueryInvoker.newBuilder().build(); + + final GraphQLConfiguration config = + GraphQLConfiguration.with(schema) + .with(queryInvoker) + .with(new CustomGraphQLContextBuilder()) + .build(); + + return GraphQLHttpServlet.with(config); + } +} diff --git a/api/src/main/java/marquez/graphql/mapper/ObjectMapMapper.java b/api/src/main/java/marquez/graphql/mapper/ObjectMapMapper.java new file mode 100644 index 0000000000..0bdebbcfff --- /dev/null +++ b/api/src/main/java/marquez/graphql/mapper/ObjectMapMapper.java @@ -0,0 +1,35 @@ +package marquez.graphql.mapper; + +import com.google.common.base.CaseFormat; +import 
java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; + +@Slf4j +public class ObjectMapMapper implements RowMapper> { + @Override + public RowMap map(@NonNull ResultSet results, @NonNull StatementContext context) + throws SQLException { + return getData(results); + } + + protected RowMap getData(ResultSet results) { + try { + ResultSetMetaData metaData = results.getMetaData(); + RowMap columns = new RowMap<>(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + columns.put( + CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, metaData.getColumnName(i)), + results.getObject(i)); + } + return columns; + } catch (SQLException e) { + log.error("Unable to get column names", e); + } + return new RowMap<>(); + } +} diff --git a/api/src/main/java/marquez/graphql/mapper/RowMap.java b/api/src/main/java/marquez/graphql/mapper/RowMap.java new file mode 100644 index 0000000000..7ee014c647 --- /dev/null +++ b/api/src/main/java/marquez/graphql/mapper/RowMap.java @@ -0,0 +1,5 @@ +package marquez.graphql.mapper; + +import java.util.HashMap; + +public class RowMap extends HashMap {} diff --git a/api/src/main/resources/assets/graphql-playground/index.htm b/api/src/main/resources/assets/graphql-playground/index.htm new file mode 100644 index 0000000000..e6717261b5 --- /dev/null +++ b/api/src/main/resources/assets/graphql-playground/index.htm @@ -0,0 +1,540 @@ + + + + + + + + GraphQL Playground + + + + + + + + + + +
+ +
Loading + GraphQL Playground +
+
+ +
+ + + \ No newline at end of file diff --git a/api/src/main/resources/schema.graphqls b/api/src/main/resources/schema.graphqls new file mode 100644 index 0000000000..b581e8694d --- /dev/null +++ b/api/src/main/resources/schema.graphqls @@ -0,0 +1,120 @@ +scalar UUID +scalar DateTime +scalar Json + +type Query { + datasets: [Dataset] + dataset(namespace: String!, name: String!): Dataset + namespace(name: String!): Namespace + jobs: [Job] + job(namespace: String!, name: String): Job +} + +type Tag { + name: String + createdAt: DateTime + updatedAt: DateTime + description: String + + fields: [DatasetField] + datasets: [Dataset] +} +type Source { + type: String + name: String + createdAt: DateTime + updatedAt: DateTime + connectionUrl: String + description: String + datasets: [Dataset] +} +type RunStateRecord { + transitionedAt: DateTime + state: String + run: Run +} +type Run { + createdAt: DateTime + updatedAt: DateTime + nominalStartTime: DateTime + nominalEndTime: DateTime + + jobVersion: JobVersion + runArgs: Json + currentState: String + startState: RunStateRecord + endState: RunStateRecord + states: [RunStateRecord] + inputs: [DatasetVersion] + outputs: [DatasetVersion] +} +type Owner { + createdAt: DateTime + name: String + namespaces: [Namespace] +} +type Namespace { + name: String + createdAt: DateTime + updatedAt: DateTime + description: String + owners: [Owner] + currentOwner: Owner + jobs: [Job] + datasets: [Dataset] +} +type JobVersion { + createdAt: DateTime + updatedAt: DateTime + location: String + version: UUID + jobContext: Json + latestRun: Run + job: Job + inputs: [Dataset] + outputs: [Dataset] +} +type Job { + type: String + name: String + createdAt: DateTime + updatedAt: DateTime + description: String + versions: [JobVersion] + namespace: Namespace + currentVersion: JobVersion +} +type DatasetVersion { + createdAt: DateTime + dataset: Dataset + version: UUID + fields: [DatasetField] + run: Run +} +type DatasetField { + type: String + createdAt: 
DateTime + updatedAt: DateTime + name: String + description: String + dataset: Dataset + versions: [DatasetVersion] + tags: [Tag] +} +type Dataset { + type: String + name: String + physicalName: String + createdAt: DateTime + updatedAt: DateTime + lastModifiedAt: DateTime + description: String + source: Source + fields: [DatasetField] + jobVersionAsInput: [JobVersion] + jobVersionAsOutput: [JobVersion] + namespace: Namespace + tags: [Tag] + currentVersion: DatasetVersion + versions: [DatasetVersion] +} \ No newline at end of file diff --git a/api/src/test/java/marquez/graphql/GraphqlTest.java b/api/src/test/java/marquez/graphql/GraphqlTest.java new file mode 100644 index 0000000000..3c10588161 --- /dev/null +++ b/api/src/test/java/marquez/graphql/GraphqlTest.java @@ -0,0 +1,60 @@ +package marquez.graphql; + +import static org.junit.Assert.*; + +import graphql.ExecutionResult; +import graphql.GraphQL; +import io.dropwizard.util.Resources; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import marquez.DataAccessTests; +import marquez.IntegrationTests; +import marquez.JdbiRuleInit; +import marquez.common.Utils; +import marquez.db.OpenLineageDao; +import marquez.service.OpenLineageService; +import marquez.service.models.LineageEvent; +import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.testing.JdbiRule; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({DataAccessTests.class, IntegrationTests.class}) +public class GraphqlTest { + @ClassRule public static final JdbiRule dbRule = JdbiRuleInit.init(); + private static GraphQL graphQL; + + @BeforeClass + public static void setup() throws IOException, ExecutionException, InterruptedException { + Jdbi jdbi = dbRule.getJdbi(); + GraphqlSchemaBuilder schemaBuilder = new GraphqlSchemaBuilder(jdbi); + graphQL = GraphQL.newGraphQL(schemaBuilder.buildSchema()).build(); + 
OpenLineageDao openLineageDao = jdbi.onDemand(OpenLineageDao.class); + LineageEvent lineageEvent = + Utils.newObjectMapper() + .readValue(Resources.getResource("open_lineage/event_simple.json"), LineageEvent.class); + + OpenLineageService service = new OpenLineageService(openLineageDao); + service.createLineageEvent(lineageEvent).get(); + } + + @Test + public void testGraphql() { + ExecutionResult result = + graphQL.execute( + "{" + + " job(namespace: \"my-scheduler-namespace\", name: \"myjob.mytask\") {" + + " name" + + " }" + + "}"); + + assertTrue(result.getErrors().isEmpty()); + Map map = result.getData(); + Map job = (Map) map.get("job"); + + assertEquals("myjob.mytask", job.get("name")); + } +} diff --git a/marquez.dev.yml b/marquez.dev.yml index e2c94454a4..8c786d1190 100644 --- a/marquez.dev.yml +++ b/marquez.dev.yml @@ -14,6 +14,9 @@ db: migrateOnStartup: true +graphql: + enabled: ${GRAPHQL_ENABLED:-true} + logging: level: INFO appenders: diff --git a/marquez.example.yml b/marquez.example.yml index 71567ae1f3..5138536253 100644 --- a/marquez.example.yml +++ b/marquez.example.yml @@ -44,6 +44,10 @@ db: # Enables database migration on startup (default: true) migrateOnStartup: ${MIGRATE_ON_STARTUP:-true} +# Enables the GraphQL endpoint +graphql: + enabled: ${GRAPHQL_ENABLED:-true} + ### LOGGING CONFIG ### # Enables logging configuration overrides (see: https://www.dropwizard.io/en/stable/manual/configuration.html#logging)