Skip to content

Commit

Permalink
[BEAM-9147] Make VideoIntelligence use PTransform on user-facing API (a…
Browse files Browse the repository at this point in the history
…pache#11464)

[BEAM-9147] Make VideoIntelligence use PTransform on user-facing API
  • Loading branch information
mwalenia committed Apr 29, 2020
1 parent 199570e commit 068d961
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 91 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ conversion to beam schema options. *Remark: Schema aware is still experimental.*
values as strings) into Python native types that are written to Avro
(Python's date, datetime types, decimal, etc). For more information
see https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#avro_conversions.
* Added integration of Java SDK with Google Cloud AI VideoIntelligence service
([BEAM-9147](https://issues.apache.org/jira/browse/BEAM-9147))


## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ml;

import com.google.cloud.videointelligence.v1.Feature;
import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
import com.google.cloud.videointelligence.v1.VideoContext;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.values.KV;

/**
* Implementation of AnnotateVideoFn accepting KVs as contents of input PCollection. Keys are the
* ByteString encoded video contents, values - VideoContext objects.
*/
@Experimental
class AnnotateVideoBytesWithContextFn extends AnnotateVideoFn<KV<ByteString, VideoContext>> {

public AnnotateVideoBytesWithContextFn(List<Feature> featureList) {
super(featureList);
}

/** ProcessElement implementation. */
@Override
public void processElement(ProcessContext context)
throws ExecutionException, InterruptedException {
ByteString element = context.element().getKey();
VideoContext videoContext = context.element().getValue();
List<VideoAnnotationResults> videoAnnotationResults =
getVideoAnnotationResults(null, element, videoContext);
context.output(videoAnnotationResults);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,31 @@
import org.apache.beam.sdk.values.PCollectionView;

/**
* Base class for Video Intelligence transform.
* Base class for DoFns used in VideoIntelligence transforms.
*
* @param <T> Class of input data being passed in - either ByteString - video data encoded into.
* String or String - a GCS URI of the video to be annotated.
*/
@Experimental
public abstract class AnnotateVideo<T> extends DoFn<T, List<VideoAnnotationResults>> {
abstract class AnnotateVideoFn<T> extends DoFn<T, List<VideoAnnotationResults>> {

protected final PCollectionView<Map<T, VideoContext>> contextSideInput;
protected final List<Feature> featureList;
VideoIntelligenceServiceClient videoIntelligenceServiceClient;

public AnnotateVideo(
public AnnotateVideoFn(
PCollectionView<Map<T, VideoContext>> contextSideInput, List<Feature> featureList) {
this.contextSideInput = contextSideInput;
this.featureList = featureList;
}

public AnnotateVideo(List<Feature> featureList) {
public AnnotateVideoFn(List<Feature> featureList) {
contextSideInput = null;
this.featureList = featureList;
}

@StartBundle
public void startBundle() throws IOException {
@Setup
public void setup() throws IOException {
videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ml;

import com.google.cloud.videointelligence.v1.Feature;
import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
import com.google.cloud.videointelligence.v1.VideoContext;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.values.PCollectionView;

/**
* Implementation of AnnotateVideoFn accepting ByteStrings as contents of input PCollection. Videos
* decoded from the ByteStrings are annotated.
*/
@Experimental
class AnnotateVideoFromBytesFn extends AnnotateVideoFn<ByteString> {

public AnnotateVideoFromBytesFn(
PCollectionView<Map<ByteString, VideoContext>> contextSideInput, List<Feature> featureList) {
super(contextSideInput, featureList);
}

/** Implementation of ProcessElement. */
@Override
public void processElement(ProcessContext context)
throws ExecutionException, InterruptedException {
ByteString element = context.element();
VideoContext videoContext = null;
if (contextSideInput != null) {
videoContext = context.sideInput(contextSideInput).get(element);
}
List<VideoAnnotationResults> videoAnnotationResults =
getVideoAnnotationResults(null, element, videoContext);
context.output(videoAnnotationResults);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ml;

import com.google.cloud.videointelligence.v1.Feature;
import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
import com.google.cloud.videointelligence.v1.VideoContext;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.values.PCollectionView;

/**
* Implementation of AnnotateVideoFn accepting Strings as contents of input PCollection. Annotates
* videos found on GCS based on URIs from input PCollection.
*/
@Experimental
class AnnotateVideoFromURIFn extends AnnotateVideoFn<String> {

public AnnotateVideoFromURIFn(
PCollectionView<Map<String, VideoContext>> contextSideInput, List<Feature> featureList) {
super(contextSideInput, featureList);
}

/** ProcessElement implementation. */
@Override
public void processElement(ProcessContext context)
throws ExecutionException, InterruptedException {
String elementURI = context.element();
VideoContext videoContext = null;
if (contextSideInput != null) {
videoContext = context.sideInput(contextSideInput).get(elementURI);
}
List<VideoAnnotationResults> annotationResultsList =
getVideoAnnotationResults(elementURI, null, videoContext);
context.output(annotationResultsList);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ml;

import com.google.cloud.videointelligence.v1.Feature;
import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
import com.google.cloud.videointelligence.v1.VideoContext;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.values.KV;

/**
* Implementation of AnnotateVideoFn accepting KVs as contents of input PCollection. Keys are the
* GCS URIs, values - VideoContext objects.
*/
@Experimental
class AnnotateVideoURIWithContextFn extends AnnotateVideoFn<KV<String, VideoContext>> {

public AnnotateVideoURIWithContextFn(List<Feature> featureList) {
super(featureList);
}

/** ProcessElement implementation. */
@Override
public void processElement(ProcessContext context)
throws ExecutionException, InterruptedException {
String elementURI = context.element().getKey();
VideoContext videoContext = context.element().getValue();
List<VideoAnnotationResults> videoAnnotationResults =
getVideoAnnotationResults(elementURI, null, videoContext);
context.output(videoAnnotationResults);
}
}
Loading

0 comments on commit 068d961

Please sign in to comment.