Skip to content

Commit

Permalink
Merge branch 'master' into addTest
Browse files Browse the repository at this point in the history
  • Loading branch information
liumomo315 committed Nov 15, 2019
2 parents 3358b37 + 5de27d5 commit c6c8fbd
Show file tree
Hide file tree
Showing 290 changed files with 8,958 additions and 3,169 deletions.
6 changes: 5 additions & 1 deletion .test-infra/jenkins/CommonJobProperties.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ class CommonJobProperties {
String commitStatusContext,
String prTriggerPhrase = '',
boolean onlyTriggerPhraseToggle = true,
List<String> triggerPathPatterns = []) {
List<String> triggerPathPatterns = [],
List<String> excludePathPatterns = []) {
context.triggers {
githubPullRequest {
admins(['asfbot'])
Expand All @@ -123,6 +124,9 @@ class CommonJobProperties {
if (!triggerPathPatterns.isEmpty()) {
includedRegions(triggerPathPatterns.join('\n'))
}
if (!excludePathPatterns.isEmpty()) {
excludedRegions(excludePathPatterns)
}

extensions {
commitStatus {
Expand Down
6 changes: 5 additions & 1 deletion .test-infra/jenkins/PrecommitJobBuilder.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class PrecommitJobBuilder {
/** If defined, set of path expressions used to trigger the job on commit. */
List<String> triggerPathPatterns = []

/** If defined, set of path expressions to not trigger the job on commit. */
List<String> excludePathPatterns = []

/** Whether to trigger on new PR commits. Useful to set to false when testing new jobs. */
boolean commitTriggering = true

Expand Down Expand Up @@ -86,7 +89,8 @@ class PrecommitJobBuilder {
githubUiHint(),
'',
false,
triggerPathPatterns)
triggerPathPatterns,
excludePathPatterns)
}
job.with additionalCustomization
}
Expand Down
2 changes: 1 addition & 1 deletion .test-infra/jenkins/job_Dependency_Check.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ job('beam_Dependency_Check') {
steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runBeamDependencyCheck')
tasks('runBeamDependencyCheck')
commonJobProperties.setGradleSwitches(delegate)
switches('-Drevision=release')
}
Expand Down
3 changes: 3 additions & 0 deletions .test-infra/jenkins/job_PreCommit_Java.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ PrecommitJobBuilder builder = new PrecommitJobBuilder(
'^examples/java/.*$',
'^examples/kotlin/.*$',
'^release/.*$',
],
excludePathPatterns: [
'^sdks/java/extensions/sql/.*$'
]
)
builder.build {
Expand Down
1 change: 1 addition & 0 deletions .test-infra/jenkins/job_PreCommit_Python.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ PrecommitJobBuilder builderPytest = new PrecommitJobBuilder(
nameBase: 'Python_pytest',
gradleTask: ':pythonPreCommitPytest',
commitTriggering: false,
timeoutMins: 180,
)
builderPytest.build {
// Publish all test results to Jenkins.
Expand Down
52 changes: 52 additions & 0 deletions .test-infra/jenkins/job_PreCommit_SQL.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import PrecommitJobBuilder

// Defines the SQL precommit Jenkins job: runs the Gradle :sqlPreCommit task
// for pull requests that touch the Java SQL extension, then publishes test
// results, static-analysis findings, and coverage back to Jenkins.
PrecommitJobBuilder builder = new PrecommitJobBuilder(
scope: this,
nameBase: 'SQL',
gradleTask: ':sqlPreCommit',
gradleSwitches: ['-PdisableSpotlessCheck=true'], // spotless checked in job_PreCommit_Spotless
// Only changes under the SQL extension paths trigger this job.
triggerPathPatterns: [
'^sdks/java/extensions/sql.*$',
]
)
builder.build {
publishers {
// Publish every JUnit-format result file the Gradle build produced.
archiveJunit('**/build/test-results/**/*.xml')
recordIssues {
tools {
errorProne()
java()
checkStyle {
pattern('**/build/reports/checkstyle/*.xml')
}
// SpotBugs has no dedicated DSL hook in this plugin version, so it is
// wired in through the raw job-config XML node.
configure { node ->
node / 'spotBugs' << 'io.jenkins.plugins.analysis.warnings.SpotBugs' {
pattern('**/build/reports/spotbugs/*.xml')
}
}
}
// Record analysis issues even when the build itself failed.
enabledForFailure(true)
}
jacocoCodeCoverage {
execPattern('**/build/jacoco/*.exec')
}
}
}
9 changes: 9 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ task javaPreCommit() {
dependsOn ":runners:direct-java:needsRunnerTests"
}

// Precommit aggregation for the Java SQL extension: builds the module and
// everything that depends on it.
task sqlPreCommit() {
dependsOn(':sdks:java:extensions:sql:build', ':sdks:java:extensions:sql:buildDependents')
}

// Precommit aggregation running the Beam ZetaSQL translator's test suite.
task javaPreCommitBeamZetaSQL() {
dependsOn ":sdks:java:extensions:sql:zetasql:test"
}
Expand Down Expand Up @@ -227,22 +232,26 @@ task python2PostCommit() {
dependsOn ":sdks:python:test-suites:direct:py2:directRunnerIT"
dependsOn ":sdks:python:test-suites:direct:py2:hdfsIntegrationTest"
dependsOn ":sdks:python:test-suites:direct:py2:mongodbioIT"
dependsOn ":sdks:python:test-suites:portable:py2:postCommitPy2"
}

// Post-commit aggregation for Python 3.5: Dataflow, Direct, and portable
// runner suites.
task python35PostCommit() {
dependsOn(
':sdks:python:test-suites:dataflow:py35:postCommitIT',
':sdks:python:test-suites:direct:py35:postCommitIT',
':sdks:python:test-suites:portable:py35:postCommitPy35')
}

// Post-commit aggregation for Python 3.6: Dataflow, Direct, and portable
// runner suites.
task python36PostCommit() {
dependsOn(
':sdks:python:test-suites:dataflow:py36:postCommitIT',
':sdks:python:test-suites:direct:py36:postCommitIT',
':sdks:python:test-suites:portable:py36:postCommitPy36')
}

// Post-commit aggregation for Python 3.7: Dataflow, Direct (including the
// HDFS integration test), and portable runner suites.
task python37PostCommit() {
dependsOn(
':sdks:python:test-suites:dataflow:py37:postCommitIT',
':sdks:python:test-suites:direct:py37:postCommitIT',
':sdks:python:test-suites:direct:py37:hdfsIntegrationTest',
':sdks:python:test-suites:portable:py37:postCommitPy37')
}

task portablePythonPreCommit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1899,11 +1899,13 @@ class BeamModulePlugin implements Plugin<Project> {
}
}

def addPortableWordCountTask = { boolean isStreaming ->
project.task('portableWordCount' + (isStreaming ? 'Streaming' : 'Batch')) {
def addPortableWordCountTask = { boolean isStreaming, String runner ->
project.task('portableWordCount' + (runner.equals("PortableRunner") ? "" : runner) + (isStreaming ? 'Streaming' : 'Batch')) {
dependsOn = ['installGcpTest']
mustRunAfter = [
':runners:flink:1.9:job-server-container:docker',
':runners:flink:1.9:job-server:shadowJar',
':runners:spark:job-server:shadowJar',
':sdks:python:container:py2:docker',
':sdks:python:container:py35:docker',
':sdks:python:container:py36:docker',
Expand All @@ -1914,7 +1916,7 @@ class BeamModulePlugin implements Plugin<Project> {
def options = [
"--input=/etc/profile",
"--output=/tmp/py-wordcount-direct",
"--runner=PortableRunner",
"--runner=${runner}",
"--experiments=worker_threads=100",
"--parallelism=2",
"--shutdown_sources_on_final_watermark",
Expand Down Expand Up @@ -1953,8 +1955,11 @@ class BeamModulePlugin implements Plugin<Project> {
}
project.ext.addPortableWordCountTasks = {
->
addPortableWordCountTask(false)
addPortableWordCountTask(true)
addPortableWordCountTask(false, "PortableRunner")
addPortableWordCountTask(true, "PortableRunner")
addPortableWordCountTask(false, "FlinkRunner")
addPortableWordCountTask(true, "FlinkRunner")
addPortableWordCountTask(false, "SparkRunner")
}
}
}
Expand Down
43 changes: 43 additions & 0 deletions model/fn-execution/src/main/proto/beam_fn_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import "beam_runner_api.proto";
import "endpoints.proto";
import "google/protobuf/descriptor.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/wrappers.proto";
import "metrics.proto";

Expand Down Expand Up @@ -203,13 +204,21 @@ message BundleApplication {
}

// An Application that should be scheduled for execution after a delay.
// The scheduled execution time can be expressed either as an absolute
// timestamp or as a relative time delay.
message DelayedBundleApplication {
// Recommended absolute time at which the application should be scheduled
// to execute by the runner. Times in the past may be scheduled to execute
// immediately.
// TODO(BEAM-8536): Migrate usage of absolute time to requested_time_delay.
google.protobuf.Timestamp requested_execution_time = 1;

// (Required) The application that should be scheduled.
BundleApplication application = 2;

// Recommended relative delay after which the application should be
// scheduled to execute by the runner. A delay equal to 0 may be scheduled
// to execute immediately.
// NOTE(review): the original comment says the delay unit "should be
// microsecond", but google.protobuf.Duration already encodes seconds plus
// nanos — confirm the intended granularity with producers of this field.
google.protobuf.Duration requested_time_delay = 3;
}

// A request to process a given bundle.
Expand Down Expand Up @@ -628,6 +637,18 @@ message StateKey {
bytes key = 1;
}

// Represents a request for the values associated with a specified user key
// and window in a PCollection. See
// https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
// details.
//
// Can only be used to perform StateGetRequests on side inputs of the URN
// beam:side_input:multimap:v1.
//
// For a PCollection<KV<K, V>>, the response data stream will be a
// concatenation of all V's associated with the specified key K. See
// https://s.apache.org/beam-fn-api-send-and-receive-data for further
// details.
message MultimapSideInput {
// (Required) The id of the PTransform containing a side input.
string transform_id = 1;
Expand All @@ -652,11 +673,33 @@ message StateKey {
bytes key = 4;
}

// Represents a request for the values associated with a specified window
// in a PCollection. See
// https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
// details.
//
// Can only be used to perform StateGetRequests on side inputs of the URN
// beam:side_input:iterable:v1 and beam:side_input:multimap:v1.
//
// For a PCollection<V>, the response data stream will be a concatenation
// of all V's. See https://s.apache.org/beam-fn-api-send-and-receive-data
// for further details.
message IterableSideInput {
// (Required) The id of the PTransform containing a side input.
string transform_id = 1;
// (Required) The id of the side input being read.
string side_input_id = 2;
// (Required) The window (after mapping the currently executing element's
// window into the side input's window domain) encoded in a nested context.
bytes window = 3;
}

// (Required) One of the following state keys must be set.
oneof type {
Runner runner = 1;
MultimapSideInput multimap_side_input = 2;
BagUserState bag_user_state = 3;
IterableSideInput iterable_side_input = 4;
// TODO: represent a state key for user map state
}
}
Expand Down
23 changes: 23 additions & 0 deletions model/job-management/src/main/proto/beam_job_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,40 @@ message JobMessagesResponse {
// without needing to pass through STARTING.
// The state of a job. States marked (terminal) are final: a job that
// reaches one will not transition to any other state. States marked
// (optional) need not be reported by every runner.
message JobState {
enum Enum {
// The job state reported by a runner cannot be interpreted by the SDK.
UNSPECIFIED = 0;

// The job has not yet started.
STOPPED = 1;

// The job is currently running.
RUNNING = 2;

// The job has successfully completed. (terminal)
DONE = 3;

// The job has failed. (terminal)
FAILED = 4;

// The job has been explicitly cancelled. (terminal)
CANCELLED = 5;

// The job has been updated. (terminal)
UPDATED = 6;

// The job is draining its data. (optional)
DRAINING = 7;

// The job has completed draining its data. (terminal)
DRAINED = 8;

// The job is starting up.
STARTING = 9;

// The job is cancelling. (optional)
CANCELLING = 10;

// The job is in the process of being updated. (optional)
UPDATING = 11;
}
}
Expand Down
9 changes: 9 additions & 0 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,16 @@ message StandardPTransforms {

// Side input access patterns, identified by URN.
message StandardSideInputTypes {
enum Enum {
// Represents a view over a PCollection<V>.
//
// StateGetRequests performed on this side input must use
// StateKey.IterableSideInput.
ITERABLE = 0 [(beam_urn) = "beam:side_input:iterable:v1"];

// Represents a view over a PCollection<KV<K, V>>.
//
// StateGetRequests performed on this side input must use
// StateKey.IterableSideInput or StateKey.MultimapSideInput.
MULTIMAP = 1 [(beam_urn) = "beam:side_input:multimap:v1"];
}
}
Expand Down
4 changes: 2 additions & 2 deletions ownership/JAVA_DEPENDENCY_OWNERS.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ deps:
com.amazonaws:amazon-kinesis-client:
group: com.amazonaws
artifact: amazon-kinesis-client
owners:
owners: aromanenko-dev

com.amazonaws:amazon-kinesis-producer:
group: com.amazonaws
artifact: amazon-kinesis-producer
owners:
owners: aromanenko-dev

com.amazonaws:aws-java-sdk-cloudwatch:
group: com.amazonaws
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.ReadPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -88,7 +89,8 @@ public class Environments {
* See https://beam.apache.org/contribute/docker-images/ for more information on how to build a
* container.
*/
private static final String JAVA_SDK_HARNESS_CONTAINER_URL = "apachebeam/java_sdk";
private static final String JAVA_SDK_HARNESS_CONTAINER_URL =
"apachebeam/java_sdk:" + ReleaseInfo.getReleaseInfo().getVersion();
public static final Environment JAVA_SDK_HARNESS_ENVIRONMENT =
createDockerEnvironment(JAVA_SDK_HARNESS_CONTAINER_URL);

Expand All @@ -114,6 +116,9 @@ public static Environment createOrGetDefaultEnvironment(String type, String conf
}

public static Environment createDockerEnvironment(String dockerImageUrl) {
if (Strings.isNullOrEmpty(dockerImageUrl)) {
return JAVA_SDK_HARNESS_ENVIRONMENT;
}
return Environment.newBuilder()
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER))
.setPayload(
Expand Down
Loading

0 comments on commit c6c8fbd

Please sign in to comment.