forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-24865][CEP] Support MATCH_RECOGNIZE in Batch mode
This closes apache#18408
- Loading branch information
1 parent
5610151
commit d244350
Showing
18 changed files
with
2,217 additions
and
890 deletions.
There are no files selected for viewing
57 changes: 57 additions & 0 deletions
57
...er/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http:https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.planner.plan.nodes.exec.batch; | ||
|
||
import org.apache.flink.configuration.ReadableConfig; | ||
import org.apache.flink.table.data.RowData; | ||
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; | ||
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; | ||
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; | ||
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator; | ||
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch; | ||
import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec; | ||
import org.apache.flink.table.types.logical.RowType; | ||
|
||
import java.util.Collections; | ||
|
||
/** Batch {@link ExecNode} which matches along with MATCH_RECOGNIZE. */ | ||
public class BatchExecMatch extends CommonExecMatch | ||
implements BatchExecNode<RowData>, MultipleTransformationTranslator<RowData> { | ||
|
||
public BatchExecMatch( | ||
ReadableConfig tableConfig, | ||
MatchSpec matchSpec, | ||
InputProperty inputProperty, | ||
RowType outputType, | ||
String description) { | ||
super( | ||
ExecNodeContext.newNodeId(), | ||
ExecNodeContext.newContext(BatchExecMatch.class), | ||
ExecNodeContext.newPersistedConfig(BatchExecMatch.class, tableConfig), | ||
matchSpec, | ||
Collections.singletonList(inputProperty), | ||
outputType, | ||
description); | ||
} | ||
|
||
@Override | ||
public boolean isProcTime(RowType inputRowType) { | ||
return true; | ||
} | ||
} |
432 changes: 432 additions & 0 deletions
432
.../src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
Large diffs are not rendered by default.
Oops, something went wrong.
387 changes: 13 additions & 374 deletions
387
.../src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
Large diffs are not rendered by default.
Oops, something went wrong.
65 changes: 65 additions & 0 deletions
65
...ain/java/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http:https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.planner.plan.nodes.physical.batch; | ||
|
||
import org.apache.flink.table.planner.calcite.FlinkTypeFactory; | ||
import org.apache.flink.table.planner.plan.logical.MatchRecognize; | ||
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; | ||
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; | ||
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMatch; | ||
import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalMatch; | ||
import org.apache.flink.table.planner.plan.utils.MatchUtil; | ||
|
||
import org.apache.calcite.plan.RelOptCluster; | ||
import org.apache.calcite.plan.RelTraitSet; | ||
import org.apache.calcite.rel.RelNode; | ||
import org.apache.calcite.rel.type.RelDataType; | ||
|
||
import java.util.List; | ||
|
||
import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig; | ||
|
||
/** Batch physical RelNode which matches along with MATCH_RECOGNIZE. */ | ||
public class BatchPhysicalMatch extends CommonPhysicalMatch implements BatchPhysicalRel { | ||
|
||
public BatchPhysicalMatch( | ||
RelOptCluster cluster, | ||
RelTraitSet traitSet, | ||
RelNode inputNode, | ||
MatchRecognize logicalMatch, | ||
RelDataType outputRowType) { | ||
super(cluster, traitSet, inputNode, logicalMatch, outputRowType); | ||
} | ||
|
||
@Override | ||
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { | ||
return new BatchPhysicalMatch( | ||
getCluster(), traitSet, inputs.get(0), getLogicalMatch(), deriveRowType()); | ||
} | ||
|
||
@Override | ||
public ExecNode<?> translateToExecNode() { | ||
return new BatchExecMatch( | ||
unwrapTableConfig(this), | ||
MatchUtil.createMatchSpec(getLogicalMatch()), | ||
InputProperty.DEFAULT, | ||
FlinkTypeFactory.toLogicalRowType(getRowType()), | ||
getRelDetailedDescription()); | ||
} | ||
} |
101 changes: 101 additions & 0 deletions
101
...n/java/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalMatch.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http:https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.planner.plan.nodes.physical.common; | ||
|
||
import org.apache.flink.table.api.TableException; | ||
import org.apache.flink.table.planner.plan.logical.MatchRecognize; | ||
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel; | ||
import org.apache.flink.table.planner.plan.utils.PythonUtil; | ||
import org.apache.flink.table.planner.plan.utils.RelExplainUtil; | ||
|
||
import org.apache.calcite.plan.RelOptCluster; | ||
import org.apache.calcite.plan.RelTraitSet; | ||
import org.apache.calcite.rel.RelNode; | ||
import org.apache.calcite.rel.RelWriter; | ||
import org.apache.calcite.rel.SingleRel; | ||
import org.apache.calcite.rel.type.RelDataType; | ||
|
||
import scala.collection.JavaConverters; | ||
import scala.collection.Seq; | ||
|
||
/** Base physical RelNode which matches along with MATCH_RECOGNIZE. */ | ||
public abstract class CommonPhysicalMatch extends SingleRel implements FlinkPhysicalRel { | ||
|
||
private final MatchRecognize logicalMatch; | ||
private final RelDataType outputRowType; | ||
|
||
public CommonPhysicalMatch( | ||
RelOptCluster cluster, | ||
RelTraitSet traitSet, | ||
RelNode inputNode, | ||
MatchRecognize logicalMatch, | ||
RelDataType outputRowType) { | ||
super(cluster, traitSet, inputNode); | ||
if (logicalMatch.measures().values().stream() | ||
.anyMatch(m -> PythonUtil.containsPythonCall(m, null)) | ||
|| logicalMatch.patternDefinitions().values().stream() | ||
.anyMatch(p -> PythonUtil.containsPythonCall(p, null))) { | ||
throw new TableException("Python Function can not be used in MATCH_RECOGNIZE for now."); | ||
} | ||
this.logicalMatch = logicalMatch; | ||
this.outputRowType = outputRowType; | ||
} | ||
|
||
@Override | ||
protected RelDataType deriveRowType() { | ||
return outputRowType; | ||
} | ||
|
||
@Override | ||
public RelWriter explainTerms(RelWriter pw) { | ||
RelDataType inputRowType = getInput().getRowType(); | ||
Seq<String> fieldNames = | ||
JavaConverters.asScalaBufferConverter(inputRowType.getFieldNames()).asScala(); | ||
return super.explainTerms(pw) | ||
.itemIf( | ||
"partitionBy", | ||
RelExplainUtil.fieldToString( | ||
logicalMatch.partitionKeys().toArray(), inputRowType), | ||
!logicalMatch.partitionKeys().isEmpty()) | ||
.itemIf( | ||
"orderBy", | ||
RelExplainUtil.collationToString(logicalMatch.orderKeys(), inputRowType), | ||
!logicalMatch.orderKeys().getFieldCollations().isEmpty()) | ||
.itemIf( | ||
"measures", | ||
RelExplainUtil.measuresDefineToString( | ||
logicalMatch.measures(), | ||
fieldNames.toList(), | ||
this::getExpressionString, | ||
convertToExpressionDetail(pw.getDetailLevel())), | ||
!logicalMatch.measures().isEmpty()) | ||
.item("rowsPerMatch", RelExplainUtil.rowsPerMatchToString(logicalMatch.allRows())) | ||
.item("after", RelExplainUtil.afterMatchToString(logicalMatch.after(), fieldNames)) | ||
.item("pattern", logicalMatch.pattern().toString()) | ||
.itemIf( | ||
"subset", | ||
RelExplainUtil.subsetToString(logicalMatch.subsets()), | ||
!logicalMatch.subsets().isEmpty()) | ||
.item("define", logicalMatch.patternDefinitions()); | ||
} | ||
|
||
public MatchRecognize getLogicalMatch() { | ||
return logicalMatch; | ||
} | ||
} |
63 changes: 63 additions & 0 deletions
63
...java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalMatchRule.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http:https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.planner.plan.rules.physical.batch; | ||
|
||
import org.apache.flink.table.planner.plan.logical.MatchRecognize; | ||
import org.apache.flink.table.planner.plan.nodes.FlinkConventions; | ||
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch; | ||
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalMatch; | ||
import org.apache.flink.table.planner.plan.rules.physical.common.CommonPhysicalMatchRule; | ||
|
||
import org.apache.calcite.plan.RelOptCluster; | ||
import org.apache.calcite.plan.RelOptRule; | ||
import org.apache.calcite.plan.RelTraitSet; | ||
import org.apache.calcite.rel.RelNode; | ||
import org.apache.calcite.rel.type.RelDataType; | ||
|
||
/** | ||
* The physical rule is responsible for convert {@link FlinkLogicalMatch} to {@link | ||
* BatchPhysicalMatch}. | ||
*/ | ||
public class BatchPhysicalMatchRule extends CommonPhysicalMatchRule { | ||
|
||
public static final RelOptRule INSTANCE = new BatchPhysicalMatchRule(); | ||
|
||
private BatchPhysicalMatchRule() { | ||
super( | ||
FlinkLogicalMatch.class, | ||
FlinkConventions.LOGICAL(), | ||
FlinkConventions.BATCH_PHYSICAL(), | ||
"BatchPhysicalMatchRule"); | ||
} | ||
|
||
@Override | ||
public RelNode convert(RelNode rel) { | ||
return super.convert(rel, FlinkConventions.BATCH_PHYSICAL()); | ||
} | ||
|
||
@Override | ||
protected RelNode convertToPhysicalMatch( | ||
RelOptCluster cluster, | ||
RelTraitSet traitSet, | ||
RelNode convertInput, | ||
MatchRecognize matchRecognize, | ||
RelDataType rowType) { | ||
return new BatchPhysicalMatch(cluster, traitSet, convertInput, matchRecognize, rowType); | ||
} | ||
} |
Oops, something went wrong.