Skip to content

Commit

Permalink
[Feature][Transform-V2 Filter] support exclude columns in the filter …
Browse files Browse the repository at this point in the history
…transform (apache#6960)
  • Loading branch information
litiliu authored Jun 13, 2024
1 parent c13a563 commit 4ba6637
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 35 deletions.
35 changes: 28 additions & 7 deletions docs/en/transform-v2/filter.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@ Filter the field.

## Options

| name | type | required | default value |
|--------|-------|----------|---------------|
| fields | array | yes | |
| name | type | required | default value |
|----------------|-------|----------|---------------|
| include_fields | array | no | |
| exclude_fields | array | no | |

### fields [array]
Notice, you must set one and only one of `include_fields` and `exclude_fields` properties

The list of fields that need to be kept. Fields not in the list will be deleted
### include_fields [array]

The list of fields that need to be kept. Fields not in the list will be deleted.

### exclude_fields [array]

The list of fields that need to be deleted. Fields not in the list will be kept.

### common options [string]

Expand All @@ -31,18 +38,32 @@ The data read from source is a table like this:
| Kin Dom | 20 | 123 |
| Joy Dom | 20 | 123 |

We want to delete field `age`, we can add `Filter` Transform like this
we want to keep the field named `name`, `card`, we can add a `Filter` Transform like below:

```
transform {
Filter {
source_table_name = "fake"
result_table_name = "fake1"
include_fields = [name, card]
}
}
```

Or we can delete the field named `age` by adding a `Filter` Transform with `exclude_fields` field set like below:

```
transform {
Filter {
source_table_name = "fake"
result_table_name = "fake1"
fields = [name, card]
exclude_fields = [age]
}
}
```

It is useful when you want to delete a small number of fields from a large table with tons of fields.

Then the data in result table `fake1` will like this

| name | card |
Expand Down
31 changes: 25 additions & 6 deletions docs/zh/transform-v2/filter.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,21 @@

## 属性

| 名称 | 类型 | 是否必须 | 默认值 |
|--------|-------|------|-----|
| fields | array | yes | |
| 名称 | 类型 | 是否必须 | 默认值 |
|----------------|-------|------|-----|
| include_fields | array | no | |
| exclude_fields | array | no | |

### fields [array]
### include_fields [array]

需要保留的字段列表。不在列表中的字段将被删除。

### exclude_fields [array]

需要删除的字段列表。不在列表中的字段将被保留。

注意,`include_fields``exclude_fields` 两个属性中,必须设置一个且只能设置一个

### common options [string]

转换插件的常见参数, 请参考 [Transform Plugin](common-options.md) 了解详情
Expand All @@ -31,14 +38,26 @@
| Kin Dom | 20 | 123 |
| Joy Dom | 20 | 123 |

我们想要删除字段 `age`,我们可以像这样添加 `Filter` 转换
我们想要保留字段 `name`, `card`,我们可以像这样添加 `Filter` 转换:

```
transform {
Filter {
source_table_name = "fake"
result_table_name = "fake1"
include_fields = [name, card]
}
}
```

我们也可以通过删除字段 `age` 来实现, 我们可以添加一个 `Filter` 转换,并设置exclude_fields:

```
transform {
Filter {
source_table_name = "fake"
result_table_name = "fake1"
fields = [name, card]
exclude_fields = [age]
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;

enum TransformCommonErrorCode implements SeaTunnelErrorCode {
public enum TransformCommonErrorCode implements SeaTunnelErrorCode {
INPUT_FIELD_NOT_FOUND(
"TRANSFORM_COMMON-01",
"The input field '<field>' of '<transform>' transform not found in upstream schema"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.seatunnel.transform.filter;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
Expand All @@ -36,21 +38,40 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Slf4j
public class FilterFieldTransform extends AbstractCatalogSupportTransform {
public static final String PLUGIN_NAME = "Filter";
private int[] inputValueIndex;
private final List<String> fields;

private int[] inputValueIndexList;

private final List<String> includeFields;
private final List<String> excludeFields;

public FilterFieldTransform(
@NonNull ReadonlyConfig config, @NonNull CatalogTable catalogTable) {
super(catalogTable);
SeaTunnelRowType seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
fields = config.get(FilterFieldTransformConfig.KEY_FIELDS);
includeFields = config.get(FilterFieldTransformConfig.INCLUDE_FIELDS);
excludeFields = config.get(FilterFieldTransformConfig.EXCLUDE_FIELDS);
// exactly only one should be set
ConfigValidator.of(config)
.validate(
OptionRule.builder()
.exclusive(
FilterFieldTransformConfig.INCLUDE_FIELDS,
FilterFieldTransformConfig.EXCLUDE_FIELDS)
.build());
List<String> canNotFoundFields =
fields.stream()
Stream.concat(
Optional.ofNullable(includeFields).orElse(new ArrayList<>())
.stream(),
Optional.ofNullable(excludeFields).orElse(new ArrayList<>())
.stream())
.filter(field -> seaTunnelRowType.indexOf(field, false) == -1)
.collect(Collectors.toList());

Expand All @@ -67,10 +88,9 @@ public String getPluginName() {

@Override
protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
// todo reuse array container if not remove fields
Object[] values = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
values[i] = inputRow.getField(inputValueIndex[i]);
Object[] values = new Object[inputValueIndexList.length];
for (int i = 0; i < inputValueIndexList.length; i++) {
values[i] = inputRow.getField(inputValueIndexList[i]);
}
SeaTunnelRow outputRow = new SeaTunnelRow(values);
outputRow.setRowKind(inputRow.getRowKind());
Expand All @@ -85,15 +105,34 @@ protected TableSchema transformTableSchema() {
SeaTunnelRowType seaTunnelRowType =
inputCatalogTable.getTableSchema().toPhysicalRowDataType();

inputValueIndex = new int[fields.size()];
ArrayList<String> outputFieldNames = new ArrayList<>();
List<Column> inputColumns = inputCatalogTable.getTableSchema().getColumns();
for (int i = 0; i < fields.size(); i++) {
String field = fields.get(i);
int inputFieldIndex = seaTunnelRowType.indexOf(field);
inputValueIndex[i] = inputFieldIndex;
outputColumns.add(inputColumns.get(inputFieldIndex).copy());
outputFieldNames.add(inputColumns.get(inputFieldIndex).getName());
// include
if (Objects.nonNull(includeFields)) {
inputValueIndexList = new int[includeFields.size()];
for (int i = 0; i < includeFields.size(); i++) {
String fieldName = includeFields.get(i);
int inputFieldIndex = seaTunnelRowType.indexOf(fieldName);
inputValueIndexList[i] = inputFieldIndex;
outputColumns.add(inputColumns.get(inputFieldIndex).copy());
outputFieldNames.add(inputColumns.get(inputFieldIndex).getName());
}
}

// exclude
if (Objects.nonNull(excludeFields)) {
inputValueIndexList = new int[inputColumns.size() - excludeFields.size()];
int index = 0;
for (int i = 0; i < inputColumns.size(); i++) {
// if the field is not in the fields, then add it to the outputColumns
if (!excludeFields.contains(inputColumns.get(i).getName())) {
String fieldName = inputColumns.get(i).getName();
int inputFieldIndex = seaTunnelRowType.indexOf(fieldName);
inputValueIndexList[index++] = inputFieldIndex;
outputColumns.add(inputColumns.get(i).copy());
outputFieldNames.add(inputColumns.get(i).getName());
}
}
}

List<ConstraintKey> outputConstraintKeys =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@
@Getter
@Setter
public class FilterFieldTransformConfig implements Serializable {
public static final Option<List<String>> KEY_FIELDS =
Options.key("fields")
public static final Option<List<String>> INCLUDE_FIELDS =
Options.key("include_fields")
.listType()
.noDefaultValue()
.withDescription(
"The list of fields that need to be kept. Fields not in the list will be deleted");
.withDescription("The list of fields that need to be kept.")
.withFallbackKeys("fields");

public static final Option<List<String>> EXCLUDE_FIELDS =
Options.key("exclude_fields")
.listType()
.noDefaultValue()
.withDescription("The list of fields that need to be deleted");
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ public String factoryIdentifier() {

@Override
public OptionRule optionRule() {
return OptionRule.builder().required(FilterFieldTransformConfig.KEY_FIELDS).build();
return OptionRule.builder()
.exclusive(
FilterFieldTransformConfig.INCLUDE_FIELDS,
FilterFieldTransformConfig.EXCLUDE_FIELDS)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ void testFilterTransformWithError() {
new HashMap<String, Object>() {
{
put(
FilterFieldTransformConfig.KEY_FIELDS.key(),
FilterFieldTransformConfig.INCLUDE_FIELDS.key(),
new ArrayList<String>() {
{
add("age");
Expand Down
Loading

0 comments on commit 4ba6637

Please sign in to comment.