feat(datahub-client): add java file emitter #5578

Merged
63 changes: 59 additions & 4 deletions metadata-integration/java/as-a-library.md
@@ -14,7 +14,7 @@ Follow the specific instructions for your build system to declare a dependency o
### Gradle
Add the following to your build.gradle.
```gradle
-implementation 'io.acryl:datahub-client:0.0.1'
+implementation 'io.acryl:datahub-client:__version__'
```
### Maven
Add the following to your `pom.xml`.
@@ -23,8 +23,8 @@ Add the following to your `pom.xml`.
<dependency>
<groupId>io.acryl</groupId>
<artifactId>datahub-client</artifactId>
-  <!-- replace with the latest version number -->
-  <version>0.0.1</version>
+  <!-- replace __version__ with the latest version number -->
+  <version>__version__</version>
</dependency>
```

@@ -95,7 +95,7 @@ emitter.emit(mcpw, new Callback() {
});
```

-### Emitter Code
+### REST Emitter Code

If you're interested in looking at the REST emitter code, it is available [here](./datahub-client/src/main/java/datahub/client/rest/RestEmitter.java).

@@ -161,6 +161,61 @@ else {
System.out.println("Kafka service is down.");
}
```
### Kafka Emitter Code

If you're interested in looking at the Kafka emitter code, it is available [here](./datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java).

## File Emitter

The File emitter writes metadata change proposal events (MCPs) into a JSON file that can later be handed off to the Python [File source](docs/generated/ingestion/sources/file.md) for ingestion. It works analogously to the [File sink](../../metadata-ingestion/sink_docs/file.md) in Python. This mechanism can be used when the system producing metadata events doesn't have a direct connection to DataHub's REST server or Kafka brokers. The generated JSON file can be transferred later and ingested into DataHub using the [File source](docs/generated/ingestion/sources/file.md).

### Usage

```java
import datahub.client.file.FileEmitter;
import datahub.client.file.FileEmitterConfig;
import datahub.event.MetadataChangeProposalWrapper;
import com.linkedin.dataset.DatasetProperties;

// ... followed by

// Define output file coordinates
String outputFile = "/my/path/output.json";

// Create File Emitter
FileEmitter emitter = new FileEmitter(FileEmitterConfig.builder().fileName(outputFile).build());

// A couple of sample metadata events
MetadataChangeProposalWrapper mcpwOne = MetadataChangeProposalWrapper.builder()
    .entityType("dataset")
    .entityUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.user-table,PROD)")
    .upsert()
    .aspect(new DatasetProperties().setDescription("This is the canonical User profile dataset"))
    .build();

MetadataChangeProposalWrapper mcpwTwo = MetadataChangeProposalWrapper.builder()
    .entityType("dataset")
    .entityUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.fact-orders-table,PROD)")
    .upsert()
    .aspect(new DatasetProperties().setDescription("This is the canonical Fact table for orders"))
    .build();

MetadataChangeProposalWrapper[] mcpws = { mcpwOne, mcpwTwo };
for (MetadataChangeProposalWrapper mcpw : mcpws) {
  emitter.emit(mcpw);
}
// Calling close() is important to ensure the file gets closed cleanly
emitter.close();
```
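On `close()` the emitter finalizes the file as a single JSON array of MCPs, which is the shape the File source expects. As a quick sanity check after emitting, you can verify that shape using only the JDK. This is a minimal sketch, not part of the emitter's API; the temp file below stands in for a real emitter output file:

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class CheckEmitterOutput {
    public static void main(String[] args) throws Exception {
        // Stand-in for a file produced by FileEmitter: a JSON array of MCPs.
        Path out = Files.createTempFile("output", ".json");
        Files.writeString(out, "[\n    { \"entityType\": \"dataset\" }\n]");

        // The File source expects a top-level JSON array.
        String content = Files.readString(out).trim();
        boolean isArrayShaped = content.startsWith("[") && content.endsWith("]");
        System.out.println(isArrayShaped ? "ok: JSON array" : "unexpected format");
    }
}
```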
### File Emitter Code

If you're interested in looking at the File emitter code, it is available [here](./datahub-client/src/main/java/datahub/client/file/FileEmitter.java).

### Support for S3, GCS etc.

The File emitter currently supports writing only to the local filesystem. If you're interested in adding support for S3, GCS etc., contributions are welcome!

## Other Languages

198 changes: 198 additions & 0 deletions metadata-integration/java/datahub-client/src/main/java/datahub/client/file/FileEmitter.java
@@ -0,0 +1,198 @@
package datahub.client.file;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.util.DefaultIndenter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.data.template.JacksonDataTemplateCodec;
import com.linkedin.mxe.MetadataChangeProposal;

import datahub.client.Callback;
import datahub.client.Emitter;
import datahub.client.MetadataWriteResponse;
import datahub.event.EventFormatter;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.event.UpsertAspectRequest;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FileEmitter implements Emitter {

private final EventFormatter eventFormatter;
private final FileEmitterConfig config;
private final ObjectMapper objectMapper = new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
private final JacksonDataTemplateCodec dataTemplateCodec = new JacksonDataTemplateCodec(objectMapper.getFactory());

private final BufferedWriter writer;
private final Future<MetadataWriteResponse> cachedSuccessFuture;
private final AtomicBoolean closed;
private boolean wroteSomething;
private static final String INDENT_4 = "    ";

/**
* The default constructor
*
* @param config the file emitter configuration
*/
public FileEmitter(FileEmitterConfig config) {

this.config = config;
this.eventFormatter = this.config.getEventFormatter();

DefaultPrettyPrinter pp = new DefaultPrettyPrinter()
.withObjectIndenter(new DefaultIndenter(FileEmitter.INDENT_4, DefaultIndenter.SYS_LF))
.withArrayIndenter(new DefaultIndenter(FileEmitter.INDENT_4, DefaultIndenter.SYS_LF));
this.dataTemplateCodec.setPrettyPrinter(pp);

try {
FileWriter fw = new FileWriter(config.getFileName(), false);
this.writer = new BufferedWriter(fw);
this.writer.append("[");
this.writer.newLine();
this.closed = new AtomicBoolean(false);
} catch (IOException e) {
throw new RuntimeException("Error while creating file", e);
}
this.wroteSomething = false;
log.debug("Emitter created successfully for " + this.config.getFileName());

this.cachedSuccessFuture = new Future<MetadataWriteResponse>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public MetadataWriteResponse get() throws InterruptedException, ExecutionException {
return MetadataWriteResponse.builder().success(true).responseContent("MCP written to File").build();
}

@Override
public MetadataWriteResponse get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return this.get();
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return true;
}
};
}

@Override
public void close() throws IOException {
this.writer.newLine();
this.writer.append("]");
this.writer.close();
this.closed.set(true);
log.debug("Emitter closed for {}", this.config.getFileName());
}

@Override
public Future<MetadataWriteResponse> emit(@SuppressWarnings("rawtypes") MetadataChangeProposalWrapper mcpw,
Callback callback) throws IOException {
return emit(this.eventFormatter.convert(mcpw), callback);
}

@Override
public Future<MetadataWriteResponse> emit(MetadataChangeProposal mcp, Callback callback) throws IOException {
if (this.closed.get()) {
String errorMsg = "File Emitter is already closed.";
log.error(errorMsg);
Future<MetadataWriteResponse> response = createFailureFuture(errorMsg);
if (callback != null) {
callback.onFailure(new Exception(errorMsg));
}
return response;
}
try {
String serializedMCP = this.dataTemplateCodec.mapToString(mcp.data());
if (wroteSomething) {
this.writer.append(",");
this.writer.newLine();
}
this.writer.append(serializedMCP);
wroteSomething = true;
log.debug("MCP written successfully: {}", serializedMCP);
Future<MetadataWriteResponse> response = this.cachedSuccessFuture;
if (callback != null) {
try {
callback.onCompletion(response.get());
} catch (InterruptedException | ExecutionException e) {
log.warn("Callback could not be executed.", e);
}
}
return response;
} catch (Throwable t) {
Future<MetadataWriteResponse> response = createFailureFuture(t.getMessage());
if (callback != null) {
try {
callback.onFailure(t);
} catch (Exception e) {
log.warn("Callback could not be executed.", e);
}
}
return response;
}
}

@Override
public boolean testConnection() throws IOException, ExecutionException, InterruptedException {
throw new UnsupportedOperationException("testConnection not relevant for File Emitter");
}

@Override
public Future<MetadataWriteResponse> emit(List<UpsertAspectRequest> request, Callback callback) throws IOException {
throw new UnsupportedOperationException("UpsertAspectRequest not relevant for File Emitter");
}

private Future<MetadataWriteResponse> createFailureFuture(String message) {
return new Future<MetadataWriteResponse>() {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public MetadataWriteResponse get() throws InterruptedException, ExecutionException {
return MetadataWriteResponse.builder().success(false).responseContent(message).build();
}

@Override
public MetadataWriteResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
return this.get();
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return true;
}

};
}

}
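A side note on the pattern above: the hand-rolled anonymous `Future` instances returned by the emitter are always already complete. Since JDK 8, the same behavior can be expressed more compactly with `CompletableFuture.completedFuture`. A sketch of the equivalent, not part of the PR:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CompletedFutureSketch {
    public static void main(String[] args) throws Exception {
        // An already-completed Future, equivalent to cachedSuccessFuture above:
        Future<String> cached = CompletableFuture.completedFuture("MCP written to File");
        System.out.println(cached.isDone());  // always true
        System.out.println(cached.get());     // returns immediately, no blocking
    }
}
```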
16 changes: 16 additions & 0 deletions metadata-integration/java/datahub-client/src/main/java/datahub/client/file/FileEmitterConfig.java
@@ -0,0 +1,16 @@
package datahub.client.file;

import datahub.event.EventFormatter;
import lombok.Builder;
import lombok.Value;

@Value
@Builder
public class FileEmitterConfig {
@Builder.Default
@lombok.NonNull
private final String fileName = null;
@Builder.Default
private final EventFormatter eventFormatter = new EventFormatter(EventFormatter.Format.PEGASUS_JSON);

}
metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java
@@ -37,7 +37,7 @@ public class KafkaEmitter implements Emitter {
private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;

/**
- * The default constructor, prefer using the `create` factory method.
+ * The default constructor
*
* @param config
* @throws IOException