[FLINK-11959][table-runtime-blink] Introduce window operator for blink streaming runtime

This closes apache#8058.
KurtYoung committed Mar 30, 2019
1 parent 2d66d4a commit 2ef0280
Showing 48 changed files with 7,372 additions and 19 deletions.

@@ -31,10 +31,9 @@ import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.{AggregateFunction, DeclarativeAggregateFunction}
import org.apache.flink.table.generated.{AggsHandleFunction, GeneratedAggsHandleFunction, GeneratedNamespaceAggsHandleFunction, NamespaceAggsHandleFunction}
import org.apache.flink.table.plan.util.AggregateInfoList
-import org.apache.flink.table.runtime.functions.ExecutionContext

import org.apache.calcite.rex.RexLiteral
import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.table.runtime.context.ExecutionContext

/**
* A code generator for generating [[AggsHandleFunction]].

@@ -33,7 +33,7 @@
/**
* Utility for BaseRow.
*/
-public class BaseRowUtil {
+public class BaseRowTestUtil {

public static String baseRowToString(BaseRow value, BaseRowTypeInfo rowTypeInfo, TimeZone tz) {
return baseRowToString(value, rowTypeInfo, tz, true);

@@ -31,15 +31,14 @@ import org.apache.flink.table.dataview.DataViewSpec
import org.apache.flink.table.functions.AvgAggFunction.{DoubleAvgAggFunction, IntegralAvgAggFunction}
import org.apache.flink.table.generated.AggsHandleFunction
import org.apache.flink.table.plan.util.{AggregateInfo, AggregateInfoList}
-import org.apache.flink.table.runtime.functions.ExecutionContext

import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.tools.FrameworkConfig
import org.junit.{Assert, Test}
import org.powermock.api.mockito.PowerMockito.{mock, when}

import java.lang

+import org.apache.flink.table.runtime.context.ExecutionContext

class AggsHandlerCodeGeneratorTest {

private val typeFactory: FlinkTypeFactory = new FlinkTypeFactory(new FlinkTypeSystem())

@@ -27,7 +27,6 @@ import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv}
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
import org.apache.flink.table.api.{SqlParserException, Table, TableConfig, TableConfigOptions, TableEnvironment, TableImpl}
import org.apache.flink.table.plan.util.FlinkRelOptUtil
-import org.apache.flink.table.runtime.utils.BatchTestBase.PARALLELISM
import org.apache.flink.table.util.DiffRepository
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row

@@ -31,9 +31,9 @@ import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.table.api.{TableConfig, Types}
import org.apache.flink.table.dataformat.BaseRow
-import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSink}
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink}
import org.apache.flink.table.typeutils.BaseRowTypeInfo
-import org.apache.flink.table.util.BaseRowUtil
+import org.apache.flink.table.util.BaseRowTestUtil
import org.apache.flink.types.Row

import _root_.java.util.TimeZone

@@ -136,7 +136,7 @@ final class TestingAppendBaseRowSink(
}

def invoke(value: BaseRow): Unit = localResults +=
-    BaseRowUtil.baseRowToString(value, rowTypeInfo, tz)
+    BaseRowTestUtil.baseRowToString(value, rowTypeInfo, tz)

def getAppendResults: List[String] = getResults


@@ -0,0 +1,160 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.api.window;

import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.MathUtils;

import java.io.IOException;

/**
 * A {@link Window} that represents a count window. For each count window, we assign a unique
 * id. This CountWindow can therefore act as the namespace part of state, so we can attach data
 * to each distinct CountWindow.
 */
public class CountWindow extends Window {

    private final long id;

    public CountWindow(long id) {
        this.id = id;
    }

    /**
     * Gets the id (0-based) of the window.
     */
    public long getId() {
        return id;
    }

    @Override
    public long maxTimestamp() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        CountWindow window = (CountWindow) o;

        return id == window.id;
    }

    @Override
    public int hashCode() {
        return MathUtils.longToIntWithBitMixing(id);
    }

    @Override
    public String toString() {
        return "CountWindow{" +
                "id=" + id + '}';
    }

    @Override
    public int compareTo(Window o) {
        CountWindow that = (CountWindow) o;
        return Long.compare(this.id, that.id);
    }

    // ------------------------------------------------------------------------
    // Serializer
    // ------------------------------------------------------------------------

    /**
     * The serializer used to write the CountWindow type.
     */
    public static class Serializer extends TypeSerializerSingleton<CountWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean isImmutableType() {
            return true;
        }

        @Override
        public CountWindow createInstance() {
            return null;
        }

        @Override
        public CountWindow copy(CountWindow from) {
            return from;
        }

        @Override
        public CountWindow copy(CountWindow from, CountWindow reuse) {
            return from;
        }

        @Override
        public int getLength() {
            // a CountWindow is serialized as a single long (the window id), i.e. 8 bytes
            return 8;
        }

        @Override
        public void serialize(CountWindow record, DataOutputView target) throws IOException {
            target.writeLong(record.id);
        }

        @Override
        public CountWindow deserialize(DataInputView source) throws IOException {
            return new CountWindow(source.readLong());
        }

        @Override
        public CountWindow deserialize(CountWindow reuse, DataInputView source) throws IOException {
            return deserialize(source);
        }

        @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
            target.writeLong(source.readLong());
        }

        // ------------------------------------------------------------------------

        @Override
        public TypeSerializerSnapshot<CountWindow> snapshotConfiguration() {
            return new CountWindow.Serializer.CountWindowSerializerSnapshot();
        }

        /**
         * Serializer configuration snapshot for compatibility and format evolution.
         */
        @SuppressWarnings("WeakerAccess")
        public static final class CountWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<CountWindow> {

            public CountWindowSerializerSnapshot() {
                super(CountWindow.Serializer::new);
            }
        }
    }
}
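
To make the role of the new window type concrete, here is a minimal usage sketch that is not part of the commit: it constructs a CountWindow and round-trips it through the Serializer shown above. It assumes the org.apache.flink.table.api.window package introduced in this diff, uses Flink's DataOutputSerializer and DataInputDeserializer as in-memory DataOutputView/DataInputView implementations, and the class name CountWindowRoundTrip is only for illustration.

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.table.api.window.CountWindow;

import java.io.IOException;

public class CountWindowRoundTrip {

    public static void main(String[] args) throws IOException {
        // Each count window is identified only by its id; the id is what serves
        // as the namespace part of window state.
        CountWindow window = new CountWindow(42L);
        CountWindow.Serializer serializer = new CountWindow.Serializer();

        // Serialize the window: a single long written through a DataOutputView.
        DataOutputSerializer out = new DataOutputSerializer(8);
        serializer.serialize(window, out);

        // Deserialize it again and verify the id is preserved.
        CountWindow copy = serializer.deserialize(new DataInputDeserializer(out.getCopyOfBuffer()));

        System.out.println(copy);                 // CountWindow{id=42}
        System.out.println(window.equals(copy));  // true
    }
}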