Skip to content

Commit

Permalink
modify createDataBatchJobManifestCfgAttrs for fitler flink relevant p…
Browse files Browse the repository at this point in the history
…lugins and configs
  • Loading branch information
baisui1981 committed Apr 17, 2024
1 parent e401f09 commit 5b62837
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 79 deletions.
2 changes: 1 addition & 1 deletion tis-console.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ spec:
mountPath: /opt/app/tis-uber/tis-assemble/conf/tis-web-config/
- name: tis-console-pvc
mountPath: "/opt/data"
image: registry.cn-hangzhou.aliyuncs.com/tis/tis-console:4.0.0.29
image: registry.cn-hangzhou.aliyuncs.com/tis/tis-console:4.0.0.32
# command: [ "/bin/sh", "-c", "sleep 1000000" ]
ports:
- name: tis-8080
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public class PowerjobTriggerBuildResult extends TriggerBuildResult {
* 用于Crontab任务传递的参数
*/
private String javaMemorySpec;
/**
* 前一次执行的taskId,初次执行时为空
*/
private Integer previousTaskId;

public PowerjobTriggerBuildResult() {
}
Expand All @@ -45,7 +49,15 @@ public PowerjobTriggerBuildResult(boolean success, JSONObject instanceParams) {
pluginCfgsMetas = Objects.requireNonNull(instanceParams, "instanceParams can not be null") //
.getString(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS);
this.javaMemorySpec = instanceParams.getString(JobParams.KEY_JAVA_MEMORY_SPEC);
this.previousTaskId = instanceParams.getInteger(JobParams.KEY_PREVIOUS_TASK_ID);
}

public Integer getPreviousTaskId() {
return previousTaskId;
}

public void setPreviousTaskId(Integer previousTaskId) {
this.previousTaskId = previousTaskId;
}

public String getJavaMemorySpec() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,22 +108,40 @@ static DefaultExecContext deserializeInstanceParams(JSONObject instanceParams, b

if (resolveCfgsSnapshotConsumer) {

String pluginCfgsMetas = instanceParams.getString(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS);
// String pluginCfgsMetas = instanceParams.getString(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS);
//
// if (StringUtils.isEmpty(pluginCfgsMetas)) {
// throw new IllegalStateException("property:"
// + PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS + " of instanceParams can not be null");
// }
//
// final Base64 base64 = new Base64();
// try (InputStream manifestJar = new ByteArrayInputStream(base64.decode(pluginCfgsMetas))) {
// cfgsSnapshotConsumer.accept(PluginAndCfgsSnapshot.getRepositoryCfgsSnapshot(appName, manifestJar));
// } catch (Exception e) {
// throw new RuntimeException(e);
// }

cfgsSnapshotConsumer.accept(resolveCfgsSnapshotConsumer(instanceParams));
}

if (StringUtils.isEmpty(pluginCfgsMetas)) {
throw new IllegalStateException("property:"
+ PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS + " of instanceParams can not be null");
}
return execChainContext;
}

final Base64 base64 = new Base64();
try (InputStream manifestJar = new ByteArrayInputStream(base64.decode(pluginCfgsMetas))) {
cfgsSnapshotConsumer.accept(PluginAndCfgsSnapshot.getRepositoryCfgsSnapshot(appName, manifestJar));
} catch (Exception e) {
throw new RuntimeException(e);
}
static PluginAndCfgsSnapshot resolveCfgsSnapshotConsumer(JSONObject instanceParams) {
String pluginCfgsMetas = instanceParams.getString(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS);
String appName = instanceParams.getString(JobParams.KEY_COLLECTION);
if (StringUtils.isEmpty(pluginCfgsMetas)) {
throw new IllegalStateException("property:"
+ PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS + " of instanceParams can not be null");
}

return execChainContext;
final Base64 base64 = new Base64();
try (InputStream manifestJar = new ByteArrayInputStream(base64.decode(pluginCfgsMetas))) {
return PluginAndCfgsSnapshot.getRepositoryCfgsSnapshot(appName, manifestJar);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public void putTablePt(IDumpTable table, ITabPartition pt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,31 +149,21 @@ static JSONObject createInstanceParams( //
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
// 将数据通道的依赖插件以及配置信息添加到instanceParams中
PluginAndCfgsSnapshotUtils.writeManifest2OutputStream(outputStream
, PluginAndCfgsSnapshot.createDataBatchJobManifestCfgAttrs((IDataxProcessor) processor, Optional.empty(), Collections.emptyMap()));
, PluginAndCfgsSnapshot.createDataBatchJobManifestCfgAttrs((IDataxProcessor) processor));
final Base64 base64 = new Base64();
return base64.encodeAsString(outputStream.toByteArray());
// instanceParams.put(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS, base64.encodeAsString(outputStream.toByteArray()));
} catch (Exception e) {
throw new RuntimeException(e);
}
}));


// try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
// // 将数据通道的依赖插件以及配置信息添加到instanceParams中
// PluginAndCfgsSnapshotUtils.writeManifest2OutputStream(outputStream
// , PluginAndCfgsSnapshot.createDataBatchJobManifestCfgAttrs(processor, Optional.empty(), Collections.emptyMap()));
// final Base64 base64 = new Base64();
// instanceParams.put(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS, base64.encodeAsString(outputStream.toByteArray()));
// } catch (Exception e) {
// throw new RuntimeException(e);
// }

return instanceParams;
}

public static DefaultExecContext deserializeInstanceParams(JSONObject instanceParams, Consumer<DefaultExecContext> execChainContextConsumer, Consumer<PluginAndCfgsSnapshot> cfgsSnapshotConsumer) {
return DefaultExecContext.deserializeInstanceParams(instanceParams, true, execChainContextConsumer, cfgsSnapshotConsumer);
public static DefaultExecContext deserializeInstanceParams(JSONObject instanceParams
, Consumer<DefaultExecContext> execChainContextConsumer, Consumer<PluginAndCfgsSnapshot> cfgsSnapshotConsumer) {
return DefaultExecContext.deserializeInstanceParams(instanceParams
, true, execChainContextConsumer, cfgsSnapshotConsumer);
}

public static DefaultExecContext deserializeInstanceParams(JSONObject instanceParams) {
Expand Down Expand Up @@ -212,8 +202,6 @@ public AsynSubJob(String jobName) {
}
}

// <T extends IBasicAppSource> T getAppSource();

ITISCoordinator getZkClient();


Expand All @@ -228,10 +216,6 @@ public AsynSubJob(String jobName) {

ITISFileSystem getIndexBuildFileSystem();

// TableDumpFactory getTableDumpFactory();
//
// IndexBuilderTriggerFactory getIndexBuilderFactory();

void rebindLoggingMDCParams();

class TriggerNewTaskParam {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -72,28 +77,33 @@ public static PluginMetas getAppAwarePluginMetas(boolean isDB, String name) {
return getAppAwarePluginMetas(StoreResourceType.parse(isDB), name);
}

public static PluginMetas getAppAwarePluginMetas(StoreResourceType resourceType, String name) {
return getAppAwarePluginMetas(resourceType, name, true);
}

/**
* 取得某个应用下面相关的插件元数据信息用于分布式任务同步用
*
* @return
*/
public static PluginMetas getAppAwarePluginMetas(StoreResourceType resourceType, String name) {
public static PluginMetas getAppAwarePluginMetas(StoreResourceType resourceType, String name, boolean resolveMeta) {
AppKey appKey = new AppKey(null, resourceType, name, (PluginClassCategory) null);
File appDir = getSubPathDir(appKey);
File lastModify = getLastModifyToken(appKey);// new File(appDir, CenterResource.KEY_LAST_MODIFIED_EXTENDION);
long lastModfiyTimeStamp = -1;
Set<PluginMeta> metas = Collections.emptySet();


try {
if (appDir.exists()) {
if (lastModify.exists()) {
lastModfiyTimeStamp = Long.parseLong(FileUtils.readFileToString(lastModify, TisUTF8.get()));
}
Iterator<File> files = FileUtils.iterateFiles(appDir, new String[]{DOMUtil.XML_RESERVED_PREFIX}, true);
metas = ComponentMeta.loadPluginMeta(() -> {
return Lists.newArrayList(files);
});
if (resolveMeta) {
Iterator<File> files = FileUtils.iterateFiles(appDir, new String[]{DOMUtil.XML_RESERVED_PREFIX}, true);
metas = ComponentMeta.loadPluginMeta(() -> {
return Lists.newArrayList(files);
});
}
}
return new PluginMetas(appDir, metas, lastModfiyTimeStamp);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.Manifest;
Expand Down Expand Up @@ -195,7 +195,7 @@ public static PluginAndCfgsSnapshot setLocalPluginAndCfgsSnapshot(PluginAndCfgsS
public final Map<String, Long> globalPluginStoreLastModify;

public final Set<PluginMeta> pluginMetas;
public final Set<IRepositoryResource> repoRes;
private final Set<IRepositoryResource> repoRes;

/**
* 应用相关配置目录的最后更新时间
Expand All @@ -221,10 +221,9 @@ public static void createManifestCfgAttrs2File(File manifestJar, StoreResourceTy
createManifestCfgAttrs2File(manifestJar, resourceType, collection, timestamp, pluginMetasFilter, Collections.emptyMap());
}

public static Manifest createDataBatchJobManifestCfgAttrs(TargetResName collection
, Optional<Predicate<PluginMeta>> pluginMetasFilter, Map<String, String> extraEnvPropss) throws Exception {
public static Manifest createDataBatchJobManifestCfgAttrs(TargetResName collection) throws Exception {
IDataxProcessor processor = DataxProcessor.load(null, StoreResourceType.DataApp, collection.getName());
return createDataBatchJobManifestCfgAttrs(processor, pluginMetasFilter, extraEnvPropss);
return createDataBatchJobManifestCfgAttrs(processor);
}

/**
Expand All @@ -234,19 +233,44 @@ public static Manifest createDataBatchJobManifestCfgAttrs(TargetResName collecti
* @return
* @throws Exception
*/
public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor processor,
Optional<Predicate<PluginMeta>> pluginMetasFilter, Map<String, String> extraEnvPropss) throws Exception {
Objects.requireNonNull(pluginMetasFilter, "pluginMetasFilter can not be null");
Objects.requireNonNull(extraEnvPropss, "extraEnvPropss can not be null");
public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor processor) throws Exception {

if (processor.getResType() != StoreResourceType.DataApp) {
throw new IllegalArgumentException("resType must be " + StoreResourceType.DataApp + " but now is " + processor.getResType());
}

RobustReflectionConverter2.PluginMetas pluginMetas =
RobustReflectionConverter2.PluginMetas.collectMetas((metas) -> {
// 先收集plugmeta,特别是通过dataXWriter的dataSource关联的元数据

processor.getReaders(null).forEach((reader) -> reader.startScanDependency());
processor.getWriter(null).startScanDependency();
IDataxProcessor dataxProcessor = DataxProcessor.load(null, processor.getResType(), processor.identityValue());
dataxProcessor.getReaders(null).forEach((reader) -> {
// reader.getSelectedTabs().forEach((tab) -> tab.getCols());
reader.startScanDependency();
});
dataxProcessor.getWriter(null).startScanDependency();
});
return createManifestCfgAttrs(processor.getResType(), new TargetResName(processor.identityValue()), System.currentTimeMillis()
, extraEnvPropss, pluginMetasFilter, pluginMetas).getValue();
TargetResName resName = new TargetResName(processor.identityValue());
// return createManifestCfgAttrs(resName, System.currentTimeMillis(), Collections.emptyMap(), () -> {
// PluginAndCfgsSnapshot localSnapshot = getLocalPluginAndCfgsSnapshot(processor.getResType(), resName, Optional.empty(), pluginMetas);
// return localSnapshot;
// }).getRight();


return createManifestCfgAttrs(resName, System.currentTimeMillis()
, Collections.emptyMap(), () -> {

KeyedPluginStore.PluginMetas metas
= KeyedPluginStore.getAppAwarePluginMetas(processor.getResType(), resName.getName(), false);

ComponentMeta dataxComponentMeta = new ComponentMeta(Lists.newArrayList(pluginMetas.getRepoResources()));

Map<String, Long> globalPluginStoreLastModify = ComponentMeta.getGlobalPluginStoreLastModifyTimestamp(dataxComponentMeta);

return new PluginAndCfgsSnapshot(resName, globalPluginStoreLastModify
, pluginMetas //
, metas.lastModifyTimestamp, metas);

}).getValue();
}

/**
Expand Down Expand Up @@ -316,6 +340,56 @@ public static Manifest createManifestCfgAttrs(StoreResourceType resourceType, Ta
Map<String, String> extraEnvProps,
Optional<Predicate<PluginMeta>> pluginMetasFilter, IPluginMetasInfo appendPluginMeta) throws Exception {

return createManifestCfgAttrs(collection, timestamp, extraEnvProps, () -> {
PluginAndCfgsSnapshot localSnapshot = getLocalPluginAndCfgsSnapshot(resourceType, collection, pluginMetasFilter, appendPluginMeta);
return localSnapshot;
});

// //=====================================================================
// if (!CenterResource.notFetchFromCenterRepository()) {
// throw new IllegalStateException("must not fetchFromCenterRepository");
// }
//
// Manifest manifest = new Manifest();
// Map<String, Attributes> entries = manifest.getEntries();
// Attributes attrs = new Attributes();
// attrs.put(new Attributes.Name(collection.getName()), String.valueOf(timestamp));
// // 传递App名称
// entries.put(TIS_APP_NAME, attrs);
//
// final Attributes cfgAttrs = new Attributes();
// // 传递Config变量
// Config.getInstance().visitKeyValPair((e) -> {
// if (Config.KEY_TIS_HOST.equals(e.getKey())) {
// // tishost为127.0.0.1会出错
// return;
// }
// addCfgAttrs(cfgAttrs, e);
// });
// for (Map.Entry<String, String> e : extraEnvProps.entrySet()) {
// addCfgAttrs(cfgAttrs, e);
// }
// cfgAttrs.put(new Attributes.Name(convertCfgPropertyKey(Config.KEY_TIS_HOST, true)), Config.getTisHost());
// entries.put(Config.KEY_JAVA_RUNTIME_PROP_ENV_PROPS, cfgAttrs);
//
//
// //"globalPluginStore" "pluginMetas" "appLastModifyTimestamp"
//
//
// PluginAndCfgsSnapshot localSnapshot = getLocalPluginAndCfgsSnapshot(resourceType, collection, pluginMetasFilter,
// appendPluginMeta);
//
// localSnapshot.attachPluginCfgSnapshot2Manifest(manifest);
// return ImmutablePair.of(localSnapshot, manifest);
}


public static Pair<PluginAndCfgsSnapshot, Manifest> //
createManifestCfgAttrs(TargetResName collection,
long timestamp,
Map<String, String> extraEnvProps,
Supplier<PluginAndCfgsSnapshot> localPluginAndCfgsSnapshotCreator) throws Exception {

//=====================================================================
if (!CenterResource.notFetchFromCenterRepository()) {
throw new IllegalStateException("must not fetchFromCenterRepository");
Expand Down Expand Up @@ -343,17 +417,12 @@ public static Manifest createManifestCfgAttrs(StoreResourceType resourceType, Ta
cfgAttrs.put(new Attributes.Name(convertCfgPropertyKey(Config.KEY_TIS_HOST, true)), Config.getTisHost());
entries.put(Config.KEY_JAVA_RUNTIME_PROP_ENV_PROPS, cfgAttrs);


//"globalPluginStore" "pluginMetas" "appLastModifyTimestamp"


PluginAndCfgsSnapshot localSnapshot = getLocalPluginAndCfgsSnapshot(resourceType, collection, pluginMetasFilter,
appendPluginMeta);

PluginAndCfgsSnapshot localSnapshot = localPluginAndCfgsSnapshotCreator.get();
localSnapshot.attachPluginCfgSnapshot2Manifest(manifest);
return ImmutablePair.of(localSnapshot, manifest);
}


private static void addCfgAttrs(Attributes cfgAttrs, Map.Entry<String, String> e) {
cfgAttrs.put(new Attributes.Name(convertCfgPropertyKey(e.getKey(), true)), e.getValue());
}
Expand Down Expand Up @@ -737,13 +806,6 @@ public static File getPluginRootDir() {
for (PluginMeta m : pluginMetas.metas) {
collectAllPluginMeta(m, collector);
}
// globalPluginMetas = null;
// UploadPluginMeta upm = UploadPluginMeta.parse("x:require");
// List<IRepositoryResource> keyedPluginStores = hlist.stream()
// .filter((e) -> !e.isAppNameAware())
// .flatMap((e) -> e.getPluginStore(null, upm).getAll().stream())
// .collect(Collectors.toList());
// ComponentMeta dataxComponentMeta = new ComponentMeta(keyedPluginStores);
Set<PluginMeta> globalPluginMetas = dataxComponentMeta.loadPluginMeta();
for (PluginMeta m : globalPluginMetas) {
collectAllPluginMeta(m, collector);
Expand Down Expand Up @@ -843,15 +905,6 @@ public boolean addAll(Collection<PluginMeta> c) {

public void attachPluginCfgSnapshot2Manifest(Manifest manifest) {
Map<String, Attributes> entries = manifest.getEntries();
// ExtensionList<HeteroEnum> hlist = TIS.get().getExtensionList(HeteroEnum.class);
// List<IRepositoryResource> keyedPluginStores = hlist.stream()
// .filter((e) -> !e.isAppNameAware())
// .map((e) -> e.getPluginStore(null, null))
// .collect(Collectors.toList());
// ComponentMeta dataxComponentMeta = new ComponentMeta(keyedPluginStores);
//Set<XStream2.PluginMeta> globalPluginMetas = dataxComponentMeta.loadPluginMeta();
//Map<String, Long> gPluginStoreLastModify = ComponentMeta.getGlobalPluginStoreLastModifyTimestamp
// (dataxComponentMeta);

StringBuffer globalPluginStore = new StringBuffer();
for (Map.Entry<String, Long> e : globalPluginStoreLastModify.entrySet()) {
Expand Down

0 comments on commit 5b62837

Please sign in to comment.