From e8f297696b6ed81c0ec3f43b1cc1c5f55b50c307 Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Wed, 17 Apr 2024 15:42:29 +0800 Subject: [PATCH] pluginStore can be null,shall be set by method setPluginStore ahead --- .../tis/manage/impl/DataFlowAppSource.java | 7 ++++++- .../src/main/java/com/qlangtech/tis/TIS.java | 7 ++++++- .../module/action/ProcessModel.java | 2 +- .../tis/datax/impl/DataxProcessor.java | 10 ++++++++++ .../qlangtech/tis/datax/impl/DataxReader.java | 2 +- .../com/qlangtech/tis/manage/IAppSource.java | 7 +++++++ .../tis/plugin/KeyedPluginStore.java | 11 +++++++++- .../tis/plugin/PluginAndCfgsSnapshot.java | 20 ++++++++++++++++--- .../com/qlangtech/tis/plugin/PluginStore.java | 10 +++++----- .../tis/util/RobustReflectionConverter2.java | 7 +++++++ 10 files changed, 70 insertions(+), 13 deletions(-) diff --git a/tis-dag/src/main/java/com/qlangtech/tis/manage/impl/DataFlowAppSource.java b/tis-dag/src/main/java/com/qlangtech/tis/manage/impl/DataFlowAppSource.java index 47d63a6e7..f1ce811c6 100644 --- a/tis-dag/src/main/java/com/qlangtech/tis/manage/impl/DataFlowAppSource.java +++ b/tis-dag/src/main/java/com/qlangtech/tis/manage/impl/DataFlowAppSource.java @@ -34,10 +34,12 @@ import com.qlangtech.tis.fullbuild.phasestatus.impl.DumpPhaseStatus; import com.qlangtech.tis.fullbuild.phasestatus.impl.JoinPhaseStatus; import com.qlangtech.tis.fullbuild.taskflow.*; +import com.qlangtech.tis.manage.IAppSource; import com.qlangtech.tis.manage.IDataFlowAppSource; import com.qlangtech.tis.manage.ISolrAppSource; import com.qlangtech.tis.manage.common.Config; import com.qlangtech.tis.manage.common.DagTaskUtils; +import com.qlangtech.tis.plugin.PluginStore; import com.qlangtech.tis.plugin.StoreResourceType; import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; @@ -80,8 +82,11 @@ public class DataFlowAppSource implements ISolrAppSource, IDataFlowAppSource { private final IFlatTableBuilder flatTableBuilder; private final IDataSourceFactoryGetter dsGetter; + @Override + public void setPluginStore(PluginStore pluginStore) { - // protected static final ExecutorService executorService = Executors.newCachedThreadPool(); + } +// protected static final ExecutorService executorService = Executors.newCachedThreadPool(); // protected static final ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/TIS.java b/tis-plugin/src/main/java/com/qlangtech/tis/TIS.java index 030959df7..00a13d8be 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/TIS.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/TIS.java @@ -142,7 +142,12 @@ public KeyedPluginStore compute(KeyedPluginStore.Key key) { = new Memoizer>() { @Override public KeyedPluginStore compute(KeyedPluginStore.AppKey key) { - return new KeyedPluginStore(key); + return new KeyedPluginStore(key, new PluginStore.IPluginProcessCallback() { + @Override + public void afterDeserialize(PluginStore pluginStore, IAppSource appSource) { + appSource.setPluginStore(pluginStore); + } + }); } }; diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/coredefine/module/action/ProcessModel.java b/tis-plugin/src/main/java/com/qlangtech/tis/coredefine/module/action/ProcessModel.java index 74f793347..e9860ffcb 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/coredefine/module/action/ProcessModel.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/coredefine/module/action/ProcessModel.java @@ -177,7 +177,7 @@ public IAppSource loadDataXProcessor(IPluginContext pluginContext, String name) // } else { // throw new IllegalStateException("illega type:" + this); // } - + // DataxProcessor. return (IAppSource) DataxProcessor.load(pluginContext, this.resType, name); } } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/impl/DataxProcessor.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/impl/DataxProcessor.java index e6d3def6f..6dfa24c3e 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/impl/DataxProcessor.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/impl/DataxProcessor.java @@ -37,6 +37,7 @@ import com.qlangtech.tis.manage.common.TisUTF8; import com.qlangtech.tis.plugin.IdentityName; import com.qlangtech.tis.plugin.KeyedPluginStore; +import com.qlangtech.tis.plugin.PluginStore; import com.qlangtech.tis.plugin.StoreResourceType; import com.qlangtech.tis.plugin.ds.ISelectedTab; import com.qlangtech.tis.sql.parser.tuple.creator.IStreamIncrGenerateStrategy; @@ -73,11 +74,17 @@ public abstract class DataxProcessor implements IBasicAppSource, IDataxProcessor public static final String DATAX_CREATE_DDL_DIR_NAME = "createDDL"; private List tableMaps; + private transient PluginStore pluginStore; public interface IDataxProcessorGetter { DataxProcessor get(String dataXName); } + @Override + public void setPluginStore(PluginStore pluginStore) { + this.pluginStore = pluginStore; + } + // for TEST public static IDataxProcessorGetter processorGetter; @@ -232,6 +239,9 @@ public void saveCreateTableDDL(IPluginContext pluginCtx , StringBuffer createDDL, String sqlFileName, boolean overWrite) throws IOException { File createDDLDir = this.getDataxCreateDDLDir(pluginCtx); saveCreateTableDDL(createDDL, createDDLDir, sqlFileName, overWrite); + // 主要更新一下最后更新时间,这样在执行powerjob任务可以顺利将更新后的ddl文件同步到powerjob的worker节点上去 + Objects.requireNonNull(this.pluginStore,"pluginStore can be null,shall be set by method setPluginStore ahead") + .writeLastModifyTimeStamp(); } public static void saveCreateTableDDL(StringBuffer createDDL, File createDDLDir, String sqlFileName, boolean overWrite) throws IOException { diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/impl/DataxReader.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/impl/DataxReader.java index 67e132769..7c2a38449 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/impl/DataxReader.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/impl/DataxReader.java @@ -194,7 +194,7 @@ private static TIS.DataXReaderAppKey createDataXReaderKey(IPluginContext pluginC final TIS.DataXReaderAppKey key = new TIS.DataXReaderAppKey(pluginContext, db, appname, new PluginStore.IPluginProcessCallback() { @Override - public void afterDeserialize(final DataxReader reader) { + public void afterDeserialize(PluginStore ps,final DataxReader reader) { List subFieldFormPropertyTypes = reader.getDescriptor().getSubPluginFormPropertyTypes(); diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/manage/IAppSource.java b/tis-plugin/src/main/java/com/qlangtech/tis/manage/IAppSource.java index e527694c7..ff04f259f 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/manage/IAppSource.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/manage/IAppSource.java @@ -25,6 +25,7 @@ import com.qlangtech.tis.extension.Describable; import com.qlangtech.tis.extension.Descriptor; import com.qlangtech.tis.plugin.KeyedPluginStore; +import com.qlangtech.tis.plugin.PluginStore; import com.qlangtech.tis.plugin.StoreResourceType; import com.qlangtech.tis.plugin.StoreResourceTypeGetter; import com.qlangtech.tis.util.IPluginContext; @@ -113,5 +114,11 @@ static T load(IPluginContext pluginContext, String appNam default Descriptor getDescriptor() { return TIS.get().getDescriptor(this.getClass()); } + + /** + * DefaultDataXProcessor中需要调用PluginStore 的writeLastModifyTimeStamp(),在客户主动更新了create table DDL之后,所以需要事先将pluginStore实例注入 + * @param pluginStore + */ + void setPluginStore(PluginStore pluginStore); } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/KeyedPluginStore.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/KeyedPluginStore.java index 98c3ca37b..0a755024b 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/KeyedPluginStore.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/KeyedPluginStore.java @@ -163,11 +163,20 @@ public synchronized SetPluginsResult setPlugins(IPluginContext pluginContext, Op SetPluginsResult updateResult = super.setPlugins(pluginContext, context, dlist, update); if (updateResult.success && updateResult.cfgChanged) { // 本地写入时间戳文件,以备分布式文件同步之用 - updateResult.lastModifyTimeStamp = writeLastModifyTimeStamp(getLastModifyToken(this.key)); + updateResult.lastModifyTimeStamp = writeLastModifyTimeStamp(); } return updateResult; } + /** + * 可以让外部主动地更新一下lastmodify时间戳,例如,更新了ddl文件,这样可以第一时间同步到powerjob的运行环境中去 + * + * @return + */ + public long writeLastModifyTimeStamp() { + return writeLastModifyTimeStamp(getLastModifyToken(this.key)); + } + @Override public File getLastModifyTimeStampFile() { return new File(getSubPathDir(this.key), CenterResource.KEY_LAST_MODIFIED_EXTENDION); diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginAndCfgsSnapshot.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginAndCfgsSnapshot.java index e9732338c..d5ad50ed0 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginAndCfgsSnapshot.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginAndCfgsSnapshot.java @@ -226,6 +226,14 @@ public static Manifest createDataBatchJobManifestCfgAttrs(TargetResName collecti return createDataBatchJobManifestCfgAttrs(processor); } + + public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor processor) throws Exception { + Map extraEnvProps = Collections.emptyMap(); + Optional> pluginMetasFilter = Optional.empty(); + return createDataBatchJobManifestCfgAttrs(processor, extraEnvProps, pluginMetasFilter); + + } + /** * 通过运行时遍历的方式取得 DataX 批量任务对应的Manifest * @@ -233,7 +241,8 @@ public static Manifest createDataBatchJobManifestCfgAttrs(TargetResName collecti * @return * @throws Exception */ - public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor processor) throws Exception { + public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor processor + , Map extraEnvProps, Optional> pluginMetasFilter) throws Exception { if (processor.getResType() != StoreResourceType.DataApp) { throw new IllegalArgumentException("resType must be " + StoreResourceType.DataApp + " but now is " + processor.getResType()); @@ -257,7 +266,7 @@ public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor proces return createManifestCfgAttrs(resName, System.currentTimeMillis() - , Collections.emptyMap(), () -> { + , extraEnvProps, () -> { KeyedPluginStore.PluginMetas metas = KeyedPluginStore.getAppAwarePluginMetas(processor.getResType(), resName.getName(), false); @@ -266,8 +275,13 @@ public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor proces Map globalPluginStoreLastModify = ComponentMeta.getGlobalPluginStoreLastModifyTimestamp(dataxComponentMeta); + PluginMetaSet collector = new PluginMetaSet(Optional.empty()); + for (PluginMeta meta : pluginMetas.getMetas()) { + collectAllPluginMeta(meta, collector); + } + return new PluginAndCfgsSnapshot(resName, globalPluginStoreLastModify - , pluginMetas // + , collector // , metas.lastModifyTimestamp, metas); }).getValue(); diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginStore.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginStore.java index 4ac3ad8b9..5f178769c 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginStore.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginStore.java @@ -86,8 +86,8 @@ public PluginStore(Class pluginClass, XmlFile file, IPluginProcessCallback * * @param */ - public interface IPluginProcessCallback { - void afterDeserialize(T t); + public interface IPluginProcessCallback { + void afterDeserialize(PluginStore pluginStore, T t); } public void cleanPlugins() { @@ -277,7 +277,7 @@ public synchronized SetPluginsResult setPlugins(IPluginContext pluginContext, Op + this.pluginClass.getName() + ", but now is " + instance.getClass().getName()); } for (IPluginProcessCallback callback : pluginCreateCallback) { - callback.afterDeserialize(instance); + callback.afterDeserialize(this, instance); } } @@ -333,7 +333,7 @@ public synchronized SetPluginsResult setPlugins(IPluginContext pluginContext, Op } } - private long writeLastModifyTimeStamp() { + public long writeLastModifyTimeStamp() { File timestamp = getLastModifyTimeStampFile(this.file.getFile()); // String millisecTimeStamp = IParamContext.getCurrentMillisecTimeStamp(); // FileUtils.writeStringToFile(timestamp, millisecTimeStamp, TisUTF8.get()); @@ -418,7 +418,7 @@ private synchronized void load() { if (plugins != null) { plugins.forEach((p) -> { for (IPluginProcessCallback callback : this.pluginCreateCallback) { - callback.afterDeserialize(p); + callback.afterDeserialize(this, p); } }); } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/util/RobustReflectionConverter2.java b/tis-plugin/src/main/java/com/qlangtech/tis/util/RobustReflectionConverter2.java index 07624ecd7..ec9a1ae79 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/util/RobustReflectionConverter2.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/util/RobustReflectionConverter2.java @@ -114,6 +114,13 @@ public void addAll(Collection metas, IRepositoryResource res) { this.repoRes.add(res); } + public void fillWithDependencies() { + List deps = Lists.newArrayList(); + for (PluginMeta meta : this.metas) { + meta.getMetaDependencies(); + } + } + private PluginMetas unCacheableFromPluginStore() { this.cacheable = false; return this;