Skip to content

Commit

Permalink
pluginStore can be null,shall be set by method setPluginStore ahead
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Apr 17, 2024
1 parent 5b62837 commit e8f2976
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
import com.qlangtech.tis.fullbuild.phasestatus.impl.DumpPhaseStatus;
import com.qlangtech.tis.fullbuild.phasestatus.impl.JoinPhaseStatus;
import com.qlangtech.tis.fullbuild.taskflow.*;
import com.qlangtech.tis.manage.IAppSource;
import com.qlangtech.tis.manage.IDataFlowAppSource;
import com.qlangtech.tis.manage.ISolrAppSource;
import com.qlangtech.tis.manage.common.Config;
import com.qlangtech.tis.manage.common.DagTaskUtils;
import com.qlangtech.tis.plugin.PluginStore;
import com.qlangtech.tis.plugin.StoreResourceType;
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
Expand Down Expand Up @@ -80,8 +82,11 @@ public class DataFlowAppSource implements ISolrAppSource, IDataFlowAppSource {
private final IFlatTableBuilder flatTableBuilder;
private final IDataSourceFactoryGetter dsGetter;

@Override
public void setPluginStore(PluginStore<IAppSource> pluginStore) {

// protected static final ExecutorService executorService = Executors.newCachedThreadPool();
}
// protected static final ExecutorService executorService = Executors.newCachedThreadPool();

// protected static final ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

Expand Down
7 changes: 6 additions & 1 deletion tis-plugin/src/main/java/com/qlangtech/tis/TIS.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ public KeyedPluginStore compute(KeyedPluginStore.Key key) {
= new Memoizer<KeyedPluginStore.AppKey, KeyedPluginStore<IAppSource>>() {
@Override
public KeyedPluginStore<IAppSource> compute(KeyedPluginStore.AppKey key) {
return new KeyedPluginStore(key);
return new KeyedPluginStore(key, new PluginStore.IPluginProcessCallback<IAppSource>() {
@Override
public void afterDeserialize(PluginStore<IAppSource> pluginStore, IAppSource appSource) {
appSource.setPluginStore(pluginStore);
}
});
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public IAppSource loadDataXProcessor(IPluginContext pluginContext, String name)
// } else {
// throw new IllegalStateException("illega type:" + this);
// }

// DataxProcessor.
return (IAppSource) DataxProcessor.load(pluginContext, this.resType, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.qlangtech.tis.manage.common.TisUTF8;
import com.qlangtech.tis.plugin.IdentityName;
import com.qlangtech.tis.plugin.KeyedPluginStore;
import com.qlangtech.tis.plugin.PluginStore;
import com.qlangtech.tis.plugin.StoreResourceType;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.sql.parser.tuple.creator.IStreamIncrGenerateStrategy;
Expand Down Expand Up @@ -73,11 +74,17 @@ public abstract class DataxProcessor implements IBasicAppSource, IDataxProcessor
public static final String DATAX_CREATE_DDL_DIR_NAME = "createDDL";

private List<TableAlias> tableMaps;
private transient PluginStore<IAppSource> pluginStore;

public interface IDataxProcessorGetter {
DataxProcessor get(String dataXName);
}

@Override
public void setPluginStore(PluginStore<IAppSource> pluginStore) {
this.pluginStore = pluginStore;
}

// for TEST
public static IDataxProcessorGetter processorGetter;

Expand Down Expand Up @@ -232,6 +239,9 @@ public void saveCreateTableDDL(IPluginContext pluginCtx
, StringBuffer createDDL, String sqlFileName, boolean overWrite) throws IOException {
File createDDLDir = this.getDataxCreateDDLDir(pluginCtx);
saveCreateTableDDL(createDDL, createDDLDir, sqlFileName, overWrite);
// 主要更新一下最后更新时间,这样在执行powerjob任务可以顺利将更新后的ddl文件同步到powerjob的worker节点上去
Objects.requireNonNull(this.pluginStore,"pluginStore can be null,shall be set by method setPluginStore ahead")
.writeLastModifyTimeStamp();
}

public static void saveCreateTableDDL(StringBuffer createDDL, File createDDLDir, String sqlFileName, boolean overWrite) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private static TIS.DataXReaderAppKey createDataXReaderKey(IPluginContext pluginC
final TIS.DataXReaderAppKey key = new TIS.DataXReaderAppKey(pluginContext, db, appname,
new PluginStore.IPluginProcessCallback<DataxReader>() {
@Override
public void afterDeserialize(final DataxReader reader) {
public void afterDeserialize(PluginStore<DataxReader> ps,final DataxReader reader) {

List<PluginFormProperties> subFieldFormPropertyTypes =
reader.getDescriptor().getSubPluginFormPropertyTypes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.qlangtech.tis.extension.Describable;
import com.qlangtech.tis.extension.Descriptor;
import com.qlangtech.tis.plugin.KeyedPluginStore;
import com.qlangtech.tis.plugin.PluginStore;
import com.qlangtech.tis.plugin.StoreResourceType;
import com.qlangtech.tis.plugin.StoreResourceTypeGetter;
import com.qlangtech.tis.util.IPluginContext;
Expand Down Expand Up @@ -113,5 +114,11 @@ static <T extends IAppSource> T load(IPluginContext pluginContext, String appNam
default Descriptor<IAppSource> getDescriptor() {
return TIS.get().getDescriptor(this.getClass());
}

/**
* DefaultDataXProcessor中需要调用PluginStore 的writeLastModifyTimeStamp(),在客户主动更新了create table DDL之后,所以需要事先将pluginStore实例注入
* @param pluginStore
*/
void setPluginStore(PluginStore<IAppSource> pluginStore);
}

Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,20 @@ public synchronized SetPluginsResult setPlugins(IPluginContext pluginContext, Op
SetPluginsResult updateResult = super.setPlugins(pluginContext, context, dlist, update);
if (updateResult.success && updateResult.cfgChanged) {
// 本地写入时间戳文件,以备分布式文件同步之用
updateResult.lastModifyTimeStamp = writeLastModifyTimeStamp(getLastModifyToken(this.key));
updateResult.lastModifyTimeStamp = writeLastModifyTimeStamp();
}
return updateResult;
}

/**
* 可以让外部主动地更新一下lastmodify时间戳,例如,更新了ddl文件,这样可以第一时间同步到powerjob的运行环境中去
*
* @return
*/
public long writeLastModifyTimeStamp() {
return writeLastModifyTimeStamp(getLastModifyToken(this.key));
}

@Override
public File getLastModifyTimeStampFile() {
return new File(getSubPathDir(this.key), CenterResource.KEY_LAST_MODIFIED_EXTENDION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,23 @@ public static Manifest createDataBatchJobManifestCfgAttrs(TargetResName collecti
return createDataBatchJobManifestCfgAttrs(processor);
}


public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor processor) throws Exception {
Map<String, String> extraEnvProps = Collections.emptyMap();
Optional<Predicate<PluginMeta>> pluginMetasFilter = Optional.empty();
return createDataBatchJobManifestCfgAttrs(processor, extraEnvProps, pluginMetasFilter);

}

/**
* 通过运行时遍历的方式取得 DataX 批量任务对应的Manifest
*
* @param processor
* @return
* @throws Exception
*/
public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor processor) throws Exception {
public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor processor
, Map<String, String> extraEnvProps, Optional<Predicate<PluginMeta>> pluginMetasFilter) throws Exception {

if (processor.getResType() != StoreResourceType.DataApp) {
throw new IllegalArgumentException("resType must be " + StoreResourceType.DataApp + " but now is " + processor.getResType());
Expand All @@ -257,7 +266,7 @@ public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor proces


return createManifestCfgAttrs(resName, System.currentTimeMillis()
, Collections.emptyMap(), () -> {
, extraEnvProps, () -> {

KeyedPluginStore.PluginMetas metas
= KeyedPluginStore.getAppAwarePluginMetas(processor.getResType(), resName.getName(), false);
Expand All @@ -266,8 +275,13 @@ public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor proces

Map<String, Long> globalPluginStoreLastModify = ComponentMeta.getGlobalPluginStoreLastModifyTimestamp(dataxComponentMeta);

PluginMetaSet collector = new PluginMetaSet(Optional.empty());
for (PluginMeta meta : pluginMetas.getMetas()) {
collectAllPluginMeta(meta, collector);
}

return new PluginAndCfgsSnapshot(resName, globalPluginStoreLastModify
, pluginMetas //
, collector //
, metas.lastModifyTimestamp, metas);

}).getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public PluginStore(Class<T> pluginClass, XmlFile file, IPluginProcessCallback<T>
*
* @param <T>
*/
public interface IPluginProcessCallback<T> {
void afterDeserialize(T t);
public interface IPluginProcessCallback<T extends Describable> {
void afterDeserialize(PluginStore<T> pluginStore, T t);
}

public void cleanPlugins() {
Expand Down Expand Up @@ -277,7 +277,7 @@ public synchronized SetPluginsResult setPlugins(IPluginContext pluginContext, Op
+ this.pluginClass.getName() + ", but now is " + instance.getClass().getName());
}
for (IPluginProcessCallback<T> callback : pluginCreateCallback) {
callback.afterDeserialize(instance);
callback.afterDeserialize(this, instance);
}

}
Expand Down Expand Up @@ -333,7 +333,7 @@ public synchronized SetPluginsResult setPlugins(IPluginContext pluginContext, Op
}
}

private long writeLastModifyTimeStamp() {
public long writeLastModifyTimeStamp() {
File timestamp = getLastModifyTimeStampFile(this.file.getFile());
// String millisecTimeStamp = IParamContext.getCurrentMillisecTimeStamp();
// FileUtils.writeStringToFile(timestamp, millisecTimeStamp, TisUTF8.get());
Expand Down Expand Up @@ -418,7 +418,7 @@ private synchronized void load() {
if (plugins != null) {
plugins.forEach((p) -> {
for (IPluginProcessCallback<T> callback : this.pluginCreateCallback) {
callback.afterDeserialize(p);
callback.afterDeserialize(this, p);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ public void addAll(Collection<PluginMeta> metas, IRepositoryResource res) {
this.repoRes.add(res);
}

public void fillWithDependencies() {
List<PluginMeta> deps = Lists.newArrayList();
for (PluginMeta meta : this.metas) {
meta.getMetaDependencies();
}
}

private PluginMetas unCacheableFromPluginStore() {
this.cacheable = false;
return this;
Expand Down

0 comments on commit e8f2976

Please sign in to comment.