diff --git a/tis-console.yaml b/tis-console.yaml index 1c6c96e23..e2c1b0c94 100644 --- a/tis-console.yaml +++ b/tis-console.yaml @@ -153,7 +153,7 @@ spec: mountPath: /opt/app/tis-uber/tis-assemble/conf/tis-web-config/ - name: tis-console-pvc mountPath: "/opt/data" - image: registry.cn-hangzhou.aliyuncs.com/tis/tis-console:4.0.0.29 + image: registry.cn-hangzhou.aliyuncs.com/tis/tis-console:4.0.0.32 # command: [ "/bin/sh", "-c", "sleep 1000000" ] ports: - name: tis-8080 diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/coredefine/module/action/PowerjobTriggerBuildResult.java b/tis-plugin/src/main/java/com/qlangtech/tis/coredefine/module/action/PowerjobTriggerBuildResult.java index bda8b16fa..8c57c8e72 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/coredefine/module/action/PowerjobTriggerBuildResult.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/coredefine/module/action/PowerjobTriggerBuildResult.java @@ -35,6 +35,10 @@ public class PowerjobTriggerBuildResult extends TriggerBuildResult { * 用于Crontab任务传递的参数 */ private String javaMemorySpec; + /** + * 前一次执行的taskId,初次执行时为空 + */ + private Integer previousTaskId; public PowerjobTriggerBuildResult() { } @@ -45,7 +49,15 @@ public PowerjobTriggerBuildResult(boolean success, JSONObject instanceParams) { pluginCfgsMetas = Objects.requireNonNull(instanceParams, "instanceParams can not be null") // .getString(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS); this.javaMemorySpec = instanceParams.getString(JobParams.KEY_JAVA_MEMORY_SPEC); + this.previousTaskId = instanceParams.getInteger(JobParams.KEY_PREVIOUS_TASK_ID); + } + + public Integer getPreviousTaskId() { + return previousTaskId; + } + public void setPreviousTaskId(Integer previousTaskId) { + this.previousTaskId = previousTaskId; } public String getJavaMemorySpec() { diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/exec/DefaultExecContext.java b/tis-plugin/src/main/java/com/qlangtech/tis/exec/DefaultExecContext.java index b59f42064..c1bd4d0f2 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/exec/DefaultExecContext.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/exec/DefaultExecContext.java @@ -108,22 +108,40 @@ static DefaultExecContext deserializeInstanceParams(JSONObject instanceParams, b if (resolveCfgsSnapshotConsumer) { - String pluginCfgsMetas = instanceParams.getString(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS); +// String pluginCfgsMetas = instanceParams.getString(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS); +// +// if (StringUtils.isEmpty(pluginCfgsMetas)) { +// throw new IllegalStateException("property:" +// + PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS + " of instanceParams can not be null"); +// } +// +// final Base64 base64 = new Base64(); +// try (InputStream manifestJar = new ByteArrayInputStream(base64.decode(pluginCfgsMetas))) { +// cfgsSnapshotConsumer.accept(PluginAndCfgsSnapshot.getRepositoryCfgsSnapshot(appName, manifestJar)); +// } catch (Exception e) { +// throw new RuntimeException(e); +// } + + cfgsSnapshotConsumer.accept(resolveCfgsSnapshotConsumer(instanceParams)); + } - if (StringUtils.isEmpty(pluginCfgsMetas)) { - throw new IllegalStateException("property:" - + PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS + " of instanceParams can not be null"); - } + return execChainContext; + } - final Base64 base64 = new Base64(); - try (InputStream manifestJar = new ByteArrayInputStream(base64.decode(pluginCfgsMetas))) { - cfgsSnapshotConsumer.accept(PluginAndCfgsSnapshot.getRepositoryCfgsSnapshot(appName, manifestJar)); - } catch (Exception e) { - throw new RuntimeException(e); - } + static PluginAndCfgsSnapshot resolveCfgsSnapshotConsumer(JSONObject instanceParams) { + String pluginCfgsMetas = instanceParams.getString(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS); + String appName = instanceParams.getString(JobParams.KEY_COLLECTION); + if (StringUtils.isEmpty(pluginCfgsMetas)) { + throw new IllegalStateException("property:" + + PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS + " of instanceParams can not be null"); } - return execChainContext; + final Base64 base64 = new Base64(); + try (InputStream manifestJar = new ByteArrayInputStream(base64.decode(pluginCfgsMetas))) { + return PluginAndCfgsSnapshot.getRepositoryCfgsSnapshot(appName, manifestJar); + } catch (Exception e) { + throw new RuntimeException(e); + } } public void putTablePt(IDumpTable table, ITabPartition pt) { diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/exec/IExecChainContext.java b/tis-plugin/src/main/java/com/qlangtech/tis/exec/IExecChainContext.java index b0449c9cd..b066b7142 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/exec/IExecChainContext.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/exec/IExecChainContext.java @@ -149,7 +149,7 @@ static JSONObject createInstanceParams( // try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { // 将数据通道的依赖插件以及配置信息添加到instanceParams中 PluginAndCfgsSnapshotUtils.writeManifest2OutputStream(outputStream - , PluginAndCfgsSnapshot.createDataBatchJobManifestCfgAttrs((IDataxProcessor) processor, Optional.empty(), Collections.emptyMap())); + , PluginAndCfgsSnapshot.createDataBatchJobManifestCfgAttrs((IDataxProcessor) processor)); final Base64 base64 = new Base64(); return base64.encodeAsString(outputStream.toByteArray()); // instanceParams.put(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS, base64.encodeAsString(outputStream.toByteArray())); @@ -157,23 +157,13 @@ static JSONObject createInstanceParams( // throw new RuntimeException(e); } })); - - -// try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { -// // 将数据通道的依赖插件以及配置信息添加到instanceParams中 -// PluginAndCfgsSnapshotUtils.writeManifest2OutputStream(outputStream -// , PluginAndCfgsSnapshot.createDataBatchJobManifestCfgAttrs(processor, Optional.empty(), Collections.emptyMap())); -// final Base64 base64 = new Base64(); -// instanceParams.put(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS, base64.encodeAsString(outputStream.toByteArray())); -// } catch (Exception e) { -// throw new RuntimeException(e); -// } - return instanceParams; } - public static DefaultExecContext deserializeInstanceParams(JSONObject instanceParams, Consumer execChainContextConsumer, Consumer cfgsSnapshotConsumer) { - return DefaultExecContext.deserializeInstanceParams(instanceParams, true, execChainContextConsumer, cfgsSnapshotConsumer); + public static DefaultExecContext deserializeInstanceParams(JSONObject instanceParams + , Consumer execChainContextConsumer, Consumer cfgsSnapshotConsumer) { + return DefaultExecContext.deserializeInstanceParams(instanceParams + , true, execChainContextConsumer, cfgsSnapshotConsumer); } public static DefaultExecContext deserializeInstanceParams(JSONObject instanceParams) { @@ -212,8 +202,6 @@ public AsynSubJob(String jobName) { } } -// T getAppSource(); - ITISCoordinator getZkClient(); @@ -228,10 +216,6 @@ public AsynSubJob(String jobName) { ITISFileSystem getIndexBuildFileSystem(); -// TableDumpFactory getTableDumpFactory(); -// -// IndexBuilderTriggerFactory getIndexBuilderFactory(); - void rebindLoggingMDCParams(); class TriggerNewTaskParam { diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/KeyedPluginStore.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/KeyedPluginStore.java index d2349f9f7..98c3ca37b 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/KeyedPluginStore.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/KeyedPluginStore.java @@ -34,7 +34,12 @@ import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -72,28 +77,33 @@ public static PluginMetas getAppAwarePluginMetas(boolean isDB, String name) { return getAppAwarePluginMetas(StoreResourceType.parse(isDB), name); } + public static PluginMetas getAppAwarePluginMetas(StoreResourceType resourceType, String name) { + return getAppAwarePluginMetas(resourceType, name, true); + } + /** * 取得某个应用下面相关的插件元数据信息用于分布式任务同步用 * * @return */ - public static PluginMetas getAppAwarePluginMetas(StoreResourceType resourceType, String name) { + public static PluginMetas getAppAwarePluginMetas(StoreResourceType resourceType, String name, boolean resolveMeta) { AppKey appKey = new AppKey(null, resourceType, name, (PluginClassCategory) null); File appDir = getSubPathDir(appKey); File lastModify = getLastModifyToken(appKey);// new File(appDir, CenterResource.KEY_LAST_MODIFIED_EXTENDION); long lastModfiyTimeStamp = -1; Set metas = Collections.emptySet(); - try { if (appDir.exists()) { if (lastModify.exists()) { lastModfiyTimeStamp = Long.parseLong(FileUtils.readFileToString(lastModify, TisUTF8.get())); } - Iterator files = FileUtils.iterateFiles(appDir, new String[]{DOMUtil.XML_RESERVED_PREFIX}, true); - metas = ComponentMeta.loadPluginMeta(() -> { - return Lists.newArrayList(files); - }); + if (resolveMeta) { + Iterator files = FileUtils.iterateFiles(appDir, new String[]{DOMUtil.XML_RESERVED_PREFIX}, true); + metas = ComponentMeta.loadPluginMeta(() -> { + return Lists.newArrayList(files); + }); + } } return new PluginMetas(appDir, metas, lastModfiyTimeStamp); } catch (IOException e) { diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginAndCfgsSnapshot.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginAndCfgsSnapshot.java index 5f3f2994b..e9732338c 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginAndCfgsSnapshot.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginAndCfgsSnapshot.java @@ -64,10 +64,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.jar.Attributes; import java.util.jar.JarInputStream; import java.util.jar.Manifest; @@ -195,7 +195,7 @@ public static PluginAndCfgsSnapshot setLocalPluginAndCfgsSnapshot(PluginAndCfgsS public final Map globalPluginStoreLastModify; public final Set pluginMetas; - public final Set repoRes; + private final Set repoRes; /** * 应用相关配置目录的最后更新时间 @@ -221,10 +221,9 @@ public static void createManifestCfgAttrs2File(File manifestJar, StoreResourceTy createManifestCfgAttrs2File(manifestJar, resourceType, collection, timestamp, pluginMetasFilter, Collections.emptyMap()); } - public static Manifest createDataBatchJobManifestCfgAttrs(TargetResName collection - , Optional> pluginMetasFilter, Map extraEnvPropss) throws Exception { + public static Manifest createDataBatchJobManifestCfgAttrs(TargetResName collection) throws Exception { IDataxProcessor processor = DataxProcessor.load(null, StoreResourceType.DataApp, collection.getName()); - return createDataBatchJobManifestCfgAttrs(processor, pluginMetasFilter, extraEnvPropss); + return createDataBatchJobManifestCfgAttrs(processor); } /** @@ -234,19 +233,44 @@ public static Manifest createDataBatchJobManifestCfgAttrs(TargetResName collecti * @return * @throws Exception */ - public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor processor, - Optional> pluginMetasFilter, Map extraEnvPropss) throws Exception { - Objects.requireNonNull(pluginMetasFilter, "pluginMetasFilter can not be null"); - Objects.requireNonNull(extraEnvPropss, "extraEnvPropss can not be null"); + public static Manifest createDataBatchJobManifestCfgAttrs(IDataxProcessor processor) throws Exception { + + if (processor.getResType() != StoreResourceType.DataApp) { + throw new IllegalArgumentException("resType must be " + StoreResourceType.DataApp + " but now is " + processor.getResType()); + } + RobustReflectionConverter2.PluginMetas pluginMetas = RobustReflectionConverter2.PluginMetas.collectMetas((metas) -> { // 先收集plugmeta,特别是通过dataXWriter的dataSource关联的元数据 - - processor.getReaders(null).forEach((reader) -> reader.startScanDependency()); - processor.getWriter(null).startScanDependency(); + IDataxProcessor dataxProcessor = DataxProcessor.load(null, processor.getResType(), processor.identityValue()); + dataxProcessor.getReaders(null).forEach((reader) -> { + // reader.getSelectedTabs().forEach((tab) -> tab.getCols()); + reader.startScanDependency(); + }); + dataxProcessor.getWriter(null).startScanDependency(); }); - return createManifestCfgAttrs(processor.getResType(), new TargetResName(processor.identityValue()), System.currentTimeMillis() - , extraEnvPropss, pluginMetasFilter, pluginMetas).getValue(); + TargetResName resName = new TargetResName(processor.identityValue()); +// return createManifestCfgAttrs(resName, System.currentTimeMillis(), Collections.emptyMap(), () -> { +// PluginAndCfgsSnapshot localSnapshot = getLocalPluginAndCfgsSnapshot(processor.getResType(), resName, Optional.empty(), pluginMetas); +// return localSnapshot; +// }).getRight(); + + + return createManifestCfgAttrs(resName, System.currentTimeMillis() + , Collections.emptyMap(), () -> { + + KeyedPluginStore.PluginMetas metas + = KeyedPluginStore.getAppAwarePluginMetas(processor.getResType(), resName.getName(), false); + + ComponentMeta dataxComponentMeta = new ComponentMeta(Lists.newArrayList(pluginMetas.getRepoResources())); + + Map globalPluginStoreLastModify = ComponentMeta.getGlobalPluginStoreLastModifyTimestamp(dataxComponentMeta); + + return new PluginAndCfgsSnapshot(resName, globalPluginStoreLastModify + , pluginMetas // + , metas.lastModifyTimestamp, metas); + + }).getValue(); } /** @@ -316,6 +340,56 @@ public static Manifest createManifestCfgAttrs(StoreResourceType resourceType, Ta Map extraEnvProps, Optional> pluginMetasFilter, IPluginMetasInfo appendPluginMeta) throws Exception { + return createManifestCfgAttrs(collection, timestamp, extraEnvProps, () -> { + PluginAndCfgsSnapshot localSnapshot = getLocalPluginAndCfgsSnapshot(resourceType, collection, pluginMetasFilter, appendPluginMeta); + return localSnapshot; + }); + +// //===================================================================== +// if (!CenterResource.notFetchFromCenterRepository()) { +// throw new IllegalStateException("must not fetchFromCenterRepository"); +// } +// +// Manifest manifest = new Manifest(); +// Map entries = manifest.getEntries(); +// Attributes attrs = new Attributes(); +// attrs.put(new Attributes.Name(collection.getName()), String.valueOf(timestamp)); +// // 传递App名称 +// entries.put(TIS_APP_NAME, attrs); +// +// final Attributes cfgAttrs = new Attributes(); +// // 传递Config变量 +// Config.getInstance().visitKeyValPair((e) -> { +// if (Config.KEY_TIS_HOST.equals(e.getKey())) { +// // tishost为127.0.0.1会出错 +// return; +// } +// addCfgAttrs(cfgAttrs, e); +// }); +// for (Map.Entry e : extraEnvProps.entrySet()) { +// addCfgAttrs(cfgAttrs, e); +// } +// cfgAttrs.put(new Attributes.Name(convertCfgPropertyKey(Config.KEY_TIS_HOST, true)), Config.getTisHost()); +// entries.put(Config.KEY_JAVA_RUNTIME_PROP_ENV_PROPS, cfgAttrs); +// +// +// //"globalPluginStore" "pluginMetas" "appLastModifyTimestamp" +// +// +// PluginAndCfgsSnapshot localSnapshot = getLocalPluginAndCfgsSnapshot(resourceType, collection, pluginMetasFilter, +// appendPluginMeta); +// +// localSnapshot.attachPluginCfgSnapshot2Manifest(manifest); +// return ImmutablePair.of(localSnapshot, manifest); + } + + + public static Pair // + createManifestCfgAttrs(TargetResName collection, + long timestamp, + Map extraEnvProps, + Supplier localPluginAndCfgsSnapshotCreator) throws Exception { + //===================================================================== if (!CenterResource.notFetchFromCenterRepository()) { throw new IllegalStateException("must not fetchFromCenterRepository"); @@ -343,17 +417,12 @@ public static Manifest createManifestCfgAttrs(StoreResourceType resourceType, Ta cfgAttrs.put(new Attributes.Name(convertCfgPropertyKey(Config.KEY_TIS_HOST, true)), Config.getTisHost()); entries.put(Config.KEY_JAVA_RUNTIME_PROP_ENV_PROPS, cfgAttrs); - - //"globalPluginStore" "pluginMetas" "appLastModifyTimestamp" - - - PluginAndCfgsSnapshot localSnapshot = getLocalPluginAndCfgsSnapshot(resourceType, collection, pluginMetasFilter, - appendPluginMeta); - + PluginAndCfgsSnapshot localSnapshot = localPluginAndCfgsSnapshotCreator.get(); localSnapshot.attachPluginCfgSnapshot2Manifest(manifest); return ImmutablePair.of(localSnapshot, manifest); } + private static void addCfgAttrs(Attributes cfgAttrs, Map.Entry e) { cfgAttrs.put(new Attributes.Name(convertCfgPropertyKey(e.getKey(), true)), e.getValue()); } @@ -737,13 +806,6 @@ public static File getPluginRootDir() { for (PluginMeta m : pluginMetas.metas) { collectAllPluginMeta(m, collector); } - // globalPluginMetas = null; - // UploadPluginMeta upm = UploadPluginMeta.parse("x:require"); - // List keyedPluginStores = hlist.stream() - // .filter((e) -> !e.isAppNameAware()) - // .flatMap((e) -> e.getPluginStore(null, upm).getAll().stream()) - // .collect(Collectors.toList()); - // ComponentMeta dataxComponentMeta = new ComponentMeta(keyedPluginStores); Set globalPluginMetas = dataxComponentMeta.loadPluginMeta(); for (PluginMeta m : globalPluginMetas) { collectAllPluginMeta(m, collector); @@ -843,15 +905,6 @@ public boolean addAll(Collection c) { public void attachPluginCfgSnapshot2Manifest(Manifest manifest) { Map entries = manifest.getEntries(); - // ExtensionList hlist = TIS.get().getExtensionList(HeteroEnum.class); - // List keyedPluginStores = hlist.stream() - // .filter((e) -> !e.isAppNameAware()) - // .map((e) -> e.getPluginStore(null, null)) - // .collect(Collectors.toList()); - // ComponentMeta dataxComponentMeta = new ComponentMeta(keyedPluginStores); - //Set globalPluginMetas = dataxComponentMeta.loadPluginMeta(); - //Map gPluginStoreLastModify = ComponentMeta.getGlobalPluginStoreLastModifyTimestamp - // (dataxComponentMeta); StringBuffer globalPluginStore = new StringBuffer(); for (Map.Entry e : globalPluginStoreLastModify.entrySet()) { diff --git a/tis-plugin/src/test/java/com/qlangtech/tis/exec/TestIExecChainContext.java b/tis-plugin/src/test/java/com/qlangtech/tis/exec/TestIExecChainContext.java new file mode 100644 index 000000000..cc8c9fd96 --- /dev/null +++ b/tis-plugin/src/test/java/com/qlangtech/tis/exec/TestIExecChainContext.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.exec; + +import com.alibaba.fastjson.JSONObject; +import com.qlangtech.tis.common.utils.Assert; +import com.qlangtech.tis.datax.IDataxProcessor; +import com.qlangtech.tis.datax.impl.DataxProcessor; +import com.qlangtech.tis.manage.common.CenterResource; +import com.qlangtech.tis.plugin.PluginAndCfgsSnapshot; +import junit.framework.TestCase; + +import java.util.Optional; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-04-16 15:36 + **/ +public class TestIExecChainContext extends TestCase { + @Override + public void setUp() throws Exception { + super.setUp(); + CenterResource.setNotFetchFromCenterRepository(); + } + + public void testCreateInstanceParams() { + int taskId = 999; + String pipeline = "mysql5"; + IDataxProcessor processor = DataxProcessor.load(null, pipeline); + JSONObject instanceParams = IExecChainContext.createInstanceParams(taskId, processor, false, Optional.empty()); + Assert.assertNotNull(instanceParams); + + PluginAndCfgsSnapshot snapshot = DefaultExecContext.resolveCfgsSnapshotConsumer(instanceParams); + Assert.assertNotNull(snapshot); + } +} diff --git a/tis-plugin/src/test/java/com/qlangtech/tis/plugin/TestPluginAndCfgsSnapshot.java b/tis-plugin/src/test/java/com/qlangtech/tis/plugin/TestPluginAndCfgsSnapshot.java index aeab5e4f7..304ddb284 100644 --- a/tis-plugin/src/test/java/com/qlangtech/tis/plugin/TestPluginAndCfgsSnapshot.java +++ b/tis-plugin/src/test/java/com/qlangtech/tis/plugin/TestPluginAndCfgsSnapshot.java @@ -52,7 +52,7 @@ public void testCreateDataBatchJobManifestCfgAttrs() throws Exception { String appName = "mysql_hudi"; // String appName = "mysql_doris3"; Manifest manifest = PluginAndCfgsSnapshot.createDataBatchJobManifestCfgAttrs( // - new TargetResName(appName), Optional.empty(), Collections.emptyMap()); + new TargetResName(appName)); Assert.assertNotNull(manifest); }