Skip to content

Commit

Permalink
Add graphql endpoint (MarquezProject#901)
Browse files Browse the repository at this point in the history
* Add graphql endpoint

Signed-off-by: henneberger <[email protected]>

* Fix json graphql type

Signed-off-by: henneberger <[email protected]>

* spotless

Signed-off-by: henneberger <[email protected]>

* Add text search

Signed-off-by: henneberger <[email protected]>

* Spotless

Signed-off-by: henneberger <[email protected]>

* Add rough sketch of lineage graphql endpoint

Signed-off-by: henneberger <[email protected]>

* Remove prototype hackaton code from pr

Signed-off-by: henneberger <[email protected]>

* Add simple graphql test

Signed-off-by: henneberger <[email protected]>

* Add graphql to config

Signed-off-by: henneberger <[email protected]>
  • Loading branch information
henneberger committed Feb 3, 2021
1 parent 9824cc0 commit 9da3469
Show file tree
Hide file tree
Showing 18 changed files with 1,714 additions and 0 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ The HTTP API listens on port `5000` for all calls and port `5001` for the admin
You can open http:https://localhost:3000 to begin exploring the web UI.

The graphql playground endpoint can be found at http:https://localhost:5000/graphql-playground with
the graphql endpoint located at http:https://localhost:5000/api/v1-beta/graphql

## Documentation

We invite everyone to help us improve and keep documentation up to date. Documentation is maintained in this repository and can be found under [`docs/`](https://github.com/MarquezProject/marquez/tree/main/docs).
Expand Down
3 changes: 3 additions & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies {
compile "io.dropwizard:dropwizard-jdbi3:${dropwizardVersion}"
compile "io.dropwizard:dropwizard-json-logging:${dropwizardVersion}"
compile "io.dropwizard:dropwizard-http2:${dropwizardVersion}"
compile "io.dropwizard:dropwizard-assets:${dropwizardVersion}"
compile "io.prometheus:simpleclient:${prometheusVersion}"
compile "io.prometheus:simpleclient_dropwizard:${prometheusVersion}"
compile "io.prometheus:simpleclient_hotspot:${prometheusVersion}"
Expand All @@ -47,6 +48,8 @@ dependencies {
compile 'com.google.guava:guava:30.0-jre'
compile 'org.flywaydb:flyway-core:6.3.0'
compile 'org.postgresql:postgresql:42.2.5'
compile 'com.graphql-java:graphql-java:16.1'
compile 'com.graphql-java-kickstart:graphql-java-servlet:11.0.0'
compileOnly "org.projectlombok:lombok:${lombokVersion}"
annotationProcessor "org.projectlombok:lombok:${lombokVersion}"

Expand Down
15 changes: 15 additions & 0 deletions api/src/main/java/marquez/MarquezApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.codahale.metrics.jdbi3.InstrumentedSqlLogger;
import com.fasterxml.jackson.databind.SerializationFeature;
import io.dropwizard.Application;
import io.dropwizard.assets.AssetsBundle;
import io.dropwizard.configuration.EnvironmentVariableSubstitutor;
import io.dropwizard.configuration.SubstitutingSourceProvider;
import io.dropwizard.db.DataSourceFactory;
Expand Down Expand Up @@ -72,6 +73,13 @@ public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
new EnvironmentVariableSubstitutor(ERROR_ON_UNDEFINED)));

bootstrap.getObjectMapper().disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);

bootstrap.addBundle(
new AssetsBundle(
"/assets",
"/graphql-playground",
"graphql-playground/index.htm",
"graphql-playground"));
}

@Override
Expand All @@ -91,6 +99,7 @@ public void run(@NonNull MarquezConfig config, @NonNull Environment env) throws
}
}
registerResources(config, env, source);

registerServlets(env);
}

Expand All @@ -107,6 +116,12 @@ public void registerResources(
final MarquezContext context =
MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build();

if (config.getGraphql().isEnabled()) {
env.servlets()
.addServlet("api/v1-beta/graphql", context.getGraphqlServlet())
.addMapping("/api/v1-beta/graphql", "/api/v1/schema.json");
}

log.debug("Registering resources...");
for (final Object resource : context.getResources()) {
env.jersey().register(resource);
Expand Down
5 changes: 5 additions & 0 deletions api/src/main/java/marquez/MarquezConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public final class MarquezConfig extends Configuration {
private static final boolean DEFAULT_MIGRATE_ON_STARTUP = true;
private static final ImmutableSet<Tag> DEFAULT_TAGS = ImmutableSet.of();
@Getter private final GraphqlConfig graphql = new GraphqlConfig();

@Getter private final boolean migrateOnStartup = DEFAULT_MIGRATE_ON_STARTUP;
@Getter private final ImmutableSet<Tag> tags = DEFAULT_TAGS;
Expand All @@ -38,4 +39,8 @@ public final class MarquezConfig extends Configuration {
@Getter
@JsonProperty("flyway")
private final FlywayFactory flywayFactory = new FlywayFactory();

public static class GraphqlConfig {
@Getter private final boolean enabled = true;
}
}
7 changes: 7 additions & 0 deletions api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import graphql.kickstart.servlet.GraphQLHttpServlet;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
Expand Down Expand Up @@ -30,6 +31,8 @@
import marquez.db.RunStateDao;
import marquez.db.SourceDao;
import marquez.db.TagDao;
import marquez.graphql.GraphqlSchemaBuilder;
import marquez.graphql.MarquezGraphqlServletBuilder;
import marquez.service.DatasetService;
import marquez.service.JobService;
import marquez.service.NamespaceService;
Expand Down Expand Up @@ -81,6 +84,7 @@ public final class MarquezContext {

@Getter private final ImmutableList<Object> resources;
@Getter private final JdbiExceptionExceptionMapper jdbiException;
@Getter private final GraphQLHttpServlet graphqlServlet;

private MarquezContext(
@NonNull final Jdbi jdbi,
Expand Down Expand Up @@ -150,6 +154,9 @@ private MarquezContext(
serviceExceptionMapper,
jdbiException,
openLineageResource);

final MarquezGraphqlServletBuilder servlet = new MarquezGraphqlServletBuilder();
this.graphqlServlet = servlet.getServlet(new GraphqlSchemaBuilder(jdbi));
}

public static Builder builder() {
Expand Down
52 changes: 52 additions & 0 deletions api/src/main/java/marquez/graphql/CustomGraphQLContextBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package marquez.graphql;

import graphql.kickstart.execution.context.DefaultGraphQLContext;
import graphql.kickstart.execution.context.GraphQLContext;
import graphql.kickstart.servlet.context.DefaultGraphQLWebSocketContext;
import graphql.kickstart.servlet.context.GraphQLServletContextBuilder;
import java.util.concurrent.CompletableFuture;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.websocket.Session;
import javax.websocket.server.HandshakeRequest;
import org.dataloader.DataLoader;
import org.dataloader.DataLoaderRegistry;

public class CustomGraphQLContextBuilder implements GraphQLServletContextBuilder {

public CustomGraphQLContextBuilder() {}

@Override
public GraphQLContext build(HttpServletRequest req, HttpServletResponse response) {
return MarquezGraphqlContext.createServletContext(buildDataLoaderRegistry(), null)
.with(req)
.with(response)
.build();
}

@Override
public GraphQLContext build() {
return new DefaultGraphQLContext(buildDataLoaderRegistry(), null);
}

@Override
public GraphQLContext build(Session session, HandshakeRequest request) {
return DefaultGraphQLWebSocketContext.createWebSocketContext(buildDataLoaderRegistry(), null)
.with(session)
.with(request)
.build();
}

private DataLoaderRegistry buildDataLoaderRegistry() {
DataLoaderRegistry dataLoaderRegistry = new DataLoaderRegistry();
dataLoaderRegistry.register(
"datasetLineage",
new DataLoader<Integer, String>(
customerIds ->
CompletableFuture.supplyAsync(
() -> {
return null;
})));
return dataLoaderRegistry;
}
}
159 changes: 159 additions & 0 deletions api/src/main/java/marquez/graphql/GraphqlDaos.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package marquez.graphql;

import java.util.List;
import java.util.UUID;
import marquez.db.JobVersionDao.IoType;
import marquez.graphql.mapper.ObjectMapMapper;
import marquez.graphql.mapper.RowMap;
import org.jdbi.v3.sqlobject.SqlObject;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;

@RegisterRowMapper(ObjectMapMapper.class)
public interface GraphqlDaos extends SqlObject {
/*
* Note: Use must use a non-map type for returning single entries because a type of Map is already
* registered to jdbi.
*/
@SqlQuery("SELECT * FROM datasets WHERE uuid = :uuid ORDER BY updated_at")
RowMap<String, Object> getDataset(UUID uuid);

@SqlQuery("SELECT * FROM datasets")
List<RowMap<String, Object>> getDatasets();

@SqlQuery(
"SELECT datasets.* FROM datasets inner join namespaces on datasets.namespace_uuid = namespaces.uuid "
+ "where namespaces.name = :namespaceName and datasets.name = :name")
RowMap<String, Object> getDatasetByNamespaceAndName(String namespaceName, String name);

@SqlQuery(
"SELECT jobs.* FROM jobs inner join namespaces on jobs.namespace_uuid = namespaces.uuid "
+ "where namespaces.name = :namespaceName and jobs.name = :name")
RowMap<String, Object> getJobByNamespaceAndName(String namespaceName, String name);

@SqlQuery("SELECT * FROM jobs")
List<RowMap<String, Object>> getJobs();

@SqlQuery("SELECT * FROM sources where uuid = :uuid")
RowMap<String, Object> getSource(UUID uuid);

@SqlQuery("SELECT * FROM namespaces where uuid = :uuid")
RowMap<String, Object> getNamespace(UUID uuid);

@SqlQuery("SELECT * FROM dataset_fields where dataset_uuid = :datasetUuid")
List<RowMap<String, Object>> getDatasetField(UUID datasetUuid);

@SqlQuery(
"SELECT f.* FROM dataset_fields f inner join dataset_fields_tag_mapping m on m.dataset_field_uuid = f.uuid where m.tag_uuid = :tagUuid")
List<RowMap<String, Object>> getDatasetFieldsByTagUuid(UUID tagUuid);

@SqlQuery(
"SELECT d.* FROM datasets d inner join datasets_tag_mapping m on m.dataset_uuid = d.uuid where tag_uuid = :uuid")
List<RowMap<String, Object>> getDatasetsByTagUuid(UUID tagUuid);

@SqlQuery("SELECT d.* from datasets d where source_uuid = :sourceUuid")
List<RowMap<String, Object>> getDatasetsBySource(UUID sourceUuid);

@SqlQuery("SELECT * from runs where uuid = :uuid")
RowMap<String, Object> getRun(UUID uuid);

@SqlQuery("SELECT * from runs where run_args_uuid = :runArgsUuid")
List<RowMap<String, Object>> getRunsByRunArgs(UUID runArgsUuid);

@SqlQuery("SELECT * FROM dataset_versions where uuid = :uuid")
RowMap<String, Object> getCurrentDatasetVersion(UUID uuid);

@SqlQuery(
"SELECT dv.* from dataset_versions dv inner join runs_input_mapping m on m.dataset_version_uuid = dv.uuid where m.run_uuid = :runUuid")
List<RowMap<String, Object>> getDatasetVersionInputsByRun(UUID runUuid);

@SqlQuery(
"SELECT r.* from runs r inner join runs_input_mapping m on m.run_uuid = r.uuid where m.dataset_version_uuid = :datasetVersionUuid")
List<RowMap<String, Object>> getRunsByDatasetVersion(UUID datasetVersionUuid);

@SqlQuery(
"SELECT distinct jv.* from runs r inner join runs_input_mapping m on m.run_uuid = r.uuid inner join job_versions jv on jv.uuid = r.job_version_uuid where m.dataset_version_uuid = :datasetVersionUuid")
List<RowMap<String, Object>> getDistinctJobVersionsByDatasetVersion(UUID datasetVersionUuid);

@SqlQuery(
"SELECT distinct jv.* from dataset_versions dv inner join runs r on r.uuid = dv.run_uuid inner join job_versions jv on jv.uuid = r.job_version_uuid where dv.uuid = :datasetVersionUuid")
List<RowMap<String, Object>> getDistinctJobVersionsByDatasetVersionOutput(
UUID datasetVersionUuid);

@SqlQuery("SELECT dv.* from dataset_versions dv where dv.run_uuid = :runUuid")
List<RowMap<String, Object>> getDatasetVersionByRun(UUID runUuid);

@SqlQuery("SELECT * from run_args where uuid = :uuid")
RowMap<String, Object> getRunArgs(UUID uuid);

@SqlQuery(
"SELECT n.* from namespaces n inner join on namespace_ownerships no on no.namespace_uuid = n.uuid where owner_uuid = :ownerUuid")
List<RowMap<String, Object>> getNamespacesByOwner(UUID ownerUuid);

@SqlQuery(
"SELECT * from owners o inner join namespace_ownerships no on o.uuid = no.owner_uuid where namespace_uuid = :namespaceUuid")
List<RowMap<String, Object>> getOwnersByNamespace(UUID namespaceUuid);

@SqlQuery("SELECT * from owners where name = :ownerName")
RowMap<String, Object> getCurrentOwnerByNamespace(String ownerName);

@SqlQuery("SELECT * from job_contexts where uuid = :uuid")
RowMap<String, Object> getJobContext(UUID uuid);

@SqlQuery("SELECT * from datasets where namespace_uuid = :namespaceUuid")
List<RowMap<String, Object>> getDatasetsByNamespace(UUID namespaceUuid);

@SqlQuery(
"SELECT d.* from datasets d inner join job_versions_io_mapping m on m.dataset_uuid = d.uuid where m.job_version_uuid = :jobVersionUuid and io_type = :ioType")
List<RowMap<String, Object>> getIOMappingByJobVersion(UUID jobVersionUuid, IoType ioType);

@SqlQuery(
"SELECT jv.* "
+ " FROM job_versions_io_mapping m "
+ " inner join job_versions jv "
+ " on m.dataset_uuid = jv.uuid"
+ " where m.dataset_uuid = :datasetUuid AND m.io_type = :ioType")
List<RowMap<String, Object>> getJobVersionsByIoMapping(UUID datasetUuid, IoType ioType);

@SqlQuery("SELECT * from job_versions where job_uuid = :jobUuid")
List<RowMap<String, Object>> getJobVersionByJob(UUID jobUuid);

@SqlQuery("SELECT * from job_versions where uuid = :uuid")
RowMap<String, Object> getJobVersion(UUID uuid);

@SqlQuery("SELECT * from dataset_fields where dataset_uuid = :datasetVersionUuid")
List<RowMap<String, Object>> getFields(UUID datasetVersionUuid);

@SqlQuery(
"SELECT dv.* from dataset_versions dv inner join dataset_versions_field_mapping m on dv.uuid = m.dataset_version_uuid where dataset_field_uuid = :datasetFieldUuid")
List<RowMap<String, Object>> getVersionsByDatasetField(UUID datasetFieldUuid);

@SqlQuery("SELECT * FROM dataset_versions where dataset_uuid = :datasetUuid")
List<RowMap<String, Object>> getDatasetVersionsByDataset(UUID datasetUuid);

@SqlQuery("SELECT * FROM namespaces where name = :name")
RowMap<String, Object> getNamespaceByName(String name);

@SqlQuery("SELECT * from jobs where namespace_uuid = :namespaceUuid")
List<RowMap<String, Object>> getJobsByNamespace(UUID namespaceUuid);

@SqlQuery("SELECT * from jobs where uuid = :uuid")
RowMap<String, Object> getJob(UUID uuid);

@SqlQuery("SELECT * from run_states where run_uuid = :runUuid order by transitioned_at desc")
List<RowMap<String, Object>> getRunStateByRun(UUID runUuid);

@SqlQuery("SELECT * from run_states where uuid = :uuid")
RowMap<String, Object> getRunStateByUuid(UUID uuid);

@SqlQuery(
"SELECT t.* FROM datasets_tag_mapping m "
+ " inner join tags t "
+ " on m.tag_uuid = t.uuid"
+ " where dataset_uuid = :datasetUuid")
List<RowMap<String, Object>> getTagsByDatasetTag(UUID datasetUuid);

@SqlQuery(
"SELECT t.* from tags t inner join dataset_fields_tag_mapping m on t.uuid = m.tag_uuid where dataaset_field_uuid = :datasetFieldUuid")
List<RowMap<String, Object>> getTagsByDatasetField(UUID datasetFieldUuid);
}
Loading

0 comments on commit 9da3469

Please sign in to comment.