Skip to content

Commit

Permalink
fix bug for 管道创建流程中更改端类型会出错 #312
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Apr 19, 2024
1 parent 8c598a4 commit f51d0cd
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 113 deletions.
2 changes: 1 addition & 1 deletion tis-console.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ spec:
mountPath: /opt/app/tis-uber/tis-assemble/conf/tis-web-config/
- name: tis-console-pvc
mountPath: "/opt/data"
image: registry.cn-hangzhou.aliyuncs.com/tis/tis-console:4.0.0.35
image: registry.cn-hangzhou.aliyuncs.com/tis/tis-console:4.0.0.36
# command: [ "/bin/sh", "-c", "sleep 1000000" ]
ports:
- name: tis-8080
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.fastjson.JSONObject;
import com.qlangtech.tis.TIS;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.IDataxReader;
import com.qlangtech.tis.datax.impl.DataXBasicProcessMeta;
import com.qlangtech.tis.datax.impl.DataxProcessor;
import com.qlangtech.tis.datax.impl.DataxReader;
Expand All @@ -44,131 +45,136 @@
**/
public enum ProcessModel {

CreateWorkFlow("createWorkFlow" //
, (pluginContext, dataxPipeName, reader, writer) -> {
DataxWriter.BaseDataxWriterDescriptor writerDesc
= (DataxWriter.BaseDataxWriterDescriptor) TIS.get().getDescriptor(writer.getString("impl"));
return getDataXBasicProcessMeta(Optional.empty(), writerDesc);
}
, () -> {
return DataxProcessor.getPluginDescMeta(DataxProcessor.DEFAULT_WORKFLOW_PROCESSOR_NAME);
}, StoreResourceType.DataFlow
) //
, CreateDatax("createDatax", (pluginContext, dataxPipeName, reader, writer) -> {
DataxReader.BaseDataxReaderDescriptor readerDesc
= (DataxReader.BaseDataxReaderDescriptor) TIS.get().getDescriptor(reader.getString("impl"));
DataxWriter.BaseDataxWriterDescriptor writerDesc
= (DataxWriter.BaseDataxWriterDescriptor) TIS.get().getDescriptor(writer.getString("impl"));
DataXBasicProcessMeta processMeta = getDataXBasicProcessMeta(Optional.of(readerDesc), writerDesc);
FileUtils.write(IDataxProcessor.getWriterDescFile(
pluginContext, dataxPipeName), writerDesc.getId(), TisUTF8.get(), false);
return processMeta;
} //
, () -> {
return DataxProcessor.getPluginDescMeta(DataxProcessor.DEFAULT_DATAX_PROCESSOR_NAME);
}, StoreResourceType.DataApp);

private final String val;
public final StoreResourceType resType;
// private final ValdateReaderAndWriter valdateReaderAndWriter;
private final IProcessMetaCreator processMetaCreator;
private final Supplier<Descriptor<IAppSource>> targetProcessDescsGetter;

public static DataXBasicProcessMeta getDataXBasicProcessMeta(
Optional<DataxReader.BaseDataxReaderDescriptor> readerDesc, DataxWriter.BaseDataxWriterDescriptor writerDesc) {
Objects.requireNonNull(readerDesc, "readerDesc can not be null");
Objects.requireNonNull(writerDesc, "writerDesc can not be null");
DataXBasicProcessMeta processMeta = DataXBasicProcessMeta.getDataXBasicProcessMetaByReader(readerDesc);
processMeta.setWriterRDBMS(writerDesc.isRdbms());
processMeta.setWriterSupportMultiTableInReader(writerDesc.isSupportMultiTable());

return processMeta;
}

public static ProcessModel parse(String val) {
if (StringUtils.isEmpty(val)) {
//throw new IllegalArgumentException(" pram val can not be null");
return CreateDatax;
CreateWorkFlow("createWorkFlow" //
, (pluginContext, dataxPipeName, reader, writer) -> {
DataxWriter.BaseDataxWriterDescriptor writerDesc
= (DataxWriter.BaseDataxWriterDescriptor) TIS.get().getDescriptor(writer.getString("impl"));
return getDataXBasicProcessMeta(Optional.empty(), writerDesc);
}
, () -> {
return DataxProcessor.getPluginDescMeta(DataxProcessor.DEFAULT_WORKFLOW_PROCESSOR_NAME);
}, StoreResourceType.DataFlow
) //
, CreateDatax("createDatax", (pluginContext, dataxPipeName, reader, writer) -> {
DataxReader.BaseDataxReaderDescriptor readerDesc
= (DataxReader.BaseDataxReaderDescriptor) TIS.get().getDescriptor(reader.getString("impl"));
DataxWriter.BaseDataxWriterDescriptor writerDesc
= (DataxWriter.BaseDataxWriterDescriptor) TIS.get().getDescriptor(writer.getString("impl"));
DataXBasicProcessMeta processMeta = getDataXBasicProcessMeta(Optional.of(readerDesc), writerDesc);
FileUtils.write(IDataxProcessor.getWriterDescFile(
pluginContext, dataxPipeName), writerDesc.getId(), TisUTF8.get(), false);

// IDataxProcessor processor
// = DataxProcessor.load(null, StoreResourceType.DataApp, dataxPipeName);
// IDataxReader reader1 = processor.getReader(null);

return processMeta;
} //
, () -> {
return DataxProcessor.getPluginDescMeta(DataxProcessor.DEFAULT_DATAX_PROCESSOR_NAME);
}, StoreResourceType.DataApp);

private final String val;
public final StoreResourceType resType;
// private final ValdateReaderAndWriter valdateReaderAndWriter;
private final IProcessMetaCreator processMetaCreator;
private final Supplier<Descriptor<IAppSource>> targetProcessDescsGetter;

public static DataXBasicProcessMeta getDataXBasicProcessMeta(
Optional<DataxReader.BaseDataxReaderDescriptor> readerDesc, DataxWriter.BaseDataxWriterDescriptor writerDesc) {
Objects.requireNonNull(readerDesc, "readerDesc can not be null");
Objects.requireNonNull(writerDesc, "writerDesc can not be null");
DataXBasicProcessMeta processMeta = DataXBasicProcessMeta.getDataXBasicProcessMetaByReader(readerDesc);
processMeta.setWriterRDBMS(writerDesc.isRdbms());
processMeta.setWriterSupportMultiTableInReader(writerDesc.isSupportMultiTable());

return processMeta;
}

public static ProcessModel parse(String val) {
if (StringUtils.isEmpty(val)) {
//throw new IllegalArgumentException(" pram val can not be null");
return CreateDatax;
}

for (ProcessModel m : ProcessModel.values()) {
if (m.val.equals(val)) {
return m;
}
}
return CreateDatax;
// throw new IllegalStateException("illegal val:" + val);
}

for (ProcessModel m : ProcessModel.values()) {
if (m.val.equals(val)) {
return m;
}
/**
* @param val
* @param processMetaCreator
* @param targetProcessDescsGetter
* @param resType
*/
private ProcessModel(String val
, IProcessMetaCreator processMetaCreator
, Supplier<Descriptor<IAppSource>> targetProcessDescsGetter, StoreResourceType resType) {
this.val = val;
// this.valdateReaderAndWriter = valdateReaderAndWriter;
this.processMetaCreator = processMetaCreator;
this.targetProcessDescsGetter = targetProcessDescsGetter;
this.resType = resType;
}
return CreateDatax;
// throw new IllegalStateException("illegal val:" + val);
}

/**
* @param val
* @param processMetaCreator
* @param targetProcessDescsGetter
* @param resType
*/
private ProcessModel(String val
, IProcessMetaCreator processMetaCreator
, Supplier<Descriptor<IAppSource>> targetProcessDescsGetter, StoreResourceType resType) {
this.val = val;
// this.valdateReaderAndWriter = valdateReaderAndWriter;
this.processMetaCreator = processMetaCreator;
this.targetProcessDescsGetter = targetProcessDescsGetter;
this.resType = resType;
}

// public boolean valdateReaderAndWriter(JSONObject reader, JSONObject writer, BasicModule module, Context context) {
// return this.valdateReaderAndWriter.valdateReaderAndWriter(reader, writer, module, context);
// }

public DataXBasicProcessMeta createProcessMeta(IPluginContext pluginContext
, String dataXName, JSONObject reader, JSONObject writer) throws Exception {
return this.processMetaCreator.createProcessMeta(pluginContext, dataXName, reader, writer);
}
public DataXBasicProcessMeta createProcessMeta(IPluginContext pluginContext
, String dataXName, JSONObject reader, JSONObject writer) throws Exception {
return this.processMetaCreator.createProcessMeta(pluginContext, dataXName, reader, writer);
}

public Descriptor<IAppSource> getPluginDescMeta() {
return this.targetProcessDescsGetter.get();
}
public Descriptor<IAppSource> getPluginDescMeta() {
return this.targetProcessDescsGetter.get();
}

public DataxWriter loadWriter(IPluginContext pluginContext, JSONObject writerDesc, String name) {
IDataxProcessor processor = (IDataxProcessor) loadDataXProcessor(pluginContext, name);
public DataxWriter loadWriter(IPluginContext pluginContext, JSONObject writerDesc, String name) {
IDataxProcessor processor = (IDataxProcessor) loadDataXProcessor(pluginContext, name);
// if (this == CreateDatax) {
// DataxReader.load(pluginContext, name);
// }
DataxWriter writer = (DataxWriter) processor.getWriter(pluginContext, false);
final String requestDescId = writerDesc.getString("impl");
if (this == CreateDatax && writer != null && StringUtils.equals(writer.getDescriptor().getId(), requestDescId)) {
DataxReader readerPlugin = DataxReader.load(pluginContext, name);
DataxWriter.BaseDataxWriterDescriptor writerDescriptor = (DataxWriter.BaseDataxWriterDescriptor) writer.getDescriptor();
if (!writerDescriptor.isSupportMultiTable() && readerPlugin.getSelectedTabs().size() > 1) {
// 这种情况是不允许的,例如:elastic这样的writer中对于column的设置比较复杂,需要在writer plugin页面中完成,所以就不能支持在reader中选择多个表了
throw new IllegalStateException("status is not allowed:!writerDescriptor.isSupportMultiTable() && readerPlugin.hasMulitTable()");
}
return writer;
// pluginInfo.put("item", (new DescribableJSON(writer)).getItemJson());
DataxWriter writer = (DataxWriter) processor.getWriter(pluginContext, false);
final String requestDescId = writerDesc.getString("impl");
if (this == CreateDatax && writer != null && StringUtils.equals(writer.getDescriptor().getId(), requestDescId)) {
DataxReader readerPlugin = DataxReader.load(pluginContext, name);
DataxWriter.BaseDataxWriterDescriptor writerDescriptor = (DataxWriter.BaseDataxWriterDescriptor) writer.getDescriptor();
if (!writerDescriptor.isSupportMultiTable() && readerPlugin.getSelectedTabs().size() > 1) {
// 这种情况是不允许的,例如:elastic这样的writer中对于column的设置比较复杂,需要在writer plugin页面中完成,所以就不能支持在reader中选择多个表了
throw new IllegalStateException("status is not allowed:!writerDescriptor.isSupportMultiTable() && readerPlugin.hasMulitTable()");
}
return writer;
// pluginInfo.put("item", (new DescribableJSON(writer)).getItemJson());
}

return writer;
}

return writer;
}

/**
* 在页面UI上编辑流程显示的DataXReader控件,WorkFlow流程中是通过 dataSource 对应了多个DataXReader,在编辑流程中不直接编辑,所以返回为空
*
* @param pluginContext
* @param name
* @return
*/
public Optional<DataxReader> getDataXReader(IPluginContext pluginContext, String name) {
IDataxProcessor processor = (IDataxProcessor) this.loadDataXProcessor(pluginContext, name);
if (this == CreateDatax) {
return Optional.of((DataxReader) processor.getReader(pluginContext));
} else if (this == CreateWorkFlow) {
return Optional.empty();
} else {
throw new IllegalStateException("illegal process model:" + this);
/**
* 在页面UI上编辑流程显示的DataXReader控件,WorkFlow流程中是通过 dataSource 对应了多个DataXReader,在编辑流程中不直接编辑,所以返回为空
*
* @param pluginContext
* @param name
* @return
*/
public Optional<DataxReader> getDataXReader(IPluginContext pluginContext, String name) {
IDataxProcessor processor = (IDataxProcessor) this.loadDataXProcessor(pluginContext, name);
if (this == CreateDatax) {
return Optional.of((DataxReader) processor.getReader(pluginContext));
} else if (this == CreateWorkFlow) {
return Optional.empty();
} else {
throw new IllegalStateException("illegal process model:" + this);
}
}
}

public IAppSource loadDataXProcessor(IPluginContext pluginContext, String name) {
public IAppSource loadDataXProcessor(IPluginContext pluginContext, String name) {
// KeyedPluginStore.StoreResourceType resType = null;
// if (this == CreateDatax) {
// resType = KeyedPluginStore.StoreResourceType.DataApp;
Expand All @@ -177,7 +183,7 @@ public IAppSource loadDataXProcessor(IPluginContext pluginContext, String name)
// } else {
// throw new IllegalStateException("illega type:" + this);
// }
// DataxProcessor.
return (IAppSource) DataxProcessor.load(pluginContext, this.resType, name);
}
// DataxProcessor.
return (IAppSource) DataxProcessor.load(pluginContext, this.resType, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,17 @@ public Collection<IdentityName> getSubFormPropVal(Object instance) {

try {
Object o = subFormField.get(instance);
return (o == null) ? Collections.emptyList() : (Collection<IdentityName>) o;
Collection<IdentityName> subItems = (o == null) ? Collections.emptyList() : (Collection<IdentityName>) o;
// 在pipeline创建阶段,当用户先选择 mysql-> doris 类型的同步,选择完表,然后又回退到端类型选择页面,重新选择了 mysql-> mysql,再进入下一步选择页面节点机会出错
boolean containNotEqualClassForItem = false;
for (IdentityName itme : subItems) {
if (itme.getClass() != this.instClazz) {
containNotEqualClassForItem = true;
}
}
return containNotEqualClassForItem
? subItems.stream().filter((subitem) -> subitem.getClass() == instClazz).collect(Collectors.toList())
: subItems;
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit f51d0cd

Please sign in to comment.