diff --git a/tis-builder-api/src/main/java/com/qlangtech/tis/coredefine/module/action/IRCController.java b/tis-builder-api/src/main/java/com/qlangtech/tis/coredefine/module/action/IRCController.java index 0a4c9c250..0af4fa47d 100644 --- a/tis-builder-api/src/main/java/com/qlangtech/tis/coredefine/module/action/IRCController.java +++ b/tis-builder-api/src/main/java/com/qlangtech/tis/coredefine/module/action/IRCController.java @@ -18,6 +18,7 @@ package com.qlangtech.tis.coredefine.module.action; import com.qlangtech.tis.config.k8s.ReplicasSpec; +import com.qlangtech.tis.datax.job.JobOrchestrateException; import com.qlangtech.tis.plugin.incr.WatchPodLog; import com.qlangtech.tis.trigger.jst.ILogListener; @@ -50,7 +51,7 @@ public interface IRCController { * @param timestamp * @throws Exception */ - void deploy(TargetResName collection, ReplicasSpec incrSpec, long timestamp) throws Exception; + void deploy(TargetResName collection, ReplicasSpec incrSpec, long timestamp) throws JobOrchestrateException; /** * 删除 增量实例 diff --git a/tis-builder-api/src/main/java/com/qlangtech/tis/coredefine/module/action/impl/AdapterRCController.java b/tis-builder-api/src/main/java/com/qlangtech/tis/coredefine/module/action/impl/AdapterRCController.java index 03ec02fbd..7e7128e18 100644 --- a/tis-builder-api/src/main/java/com/qlangtech/tis/coredefine/module/action/impl/AdapterRCController.java +++ b/tis-builder-api/src/main/java/com/qlangtech/tis/coredefine/module/action/impl/AdapterRCController.java @@ -21,6 +21,7 @@ import com.qlangtech.tis.config.k8s.ReplicasSpec; import com.qlangtech.tis.coredefine.module.action.IRCController; import com.qlangtech.tis.coredefine.module.action.TargetResName; +import com.qlangtech.tis.datax.job.JobOrchestrateException; import com.qlangtech.tis.plugin.incr.WatchPodLog; import com.qlangtech.tis.trigger.jst.ILogListener; @@ -30,7 +31,7 @@ **/ public class AdapterRCController implements IRCController { @Override - public void deploy(TargetResName collection, ReplicasSpec incrSpec, long timestamp) throws Exception { + public void deploy(TargetResName collection, ReplicasSpec incrSpec, long timestamp) throws JobOrchestrateException { throw new UnsupportedOperationException(); } diff --git a/tis-builder-api/src/main/java/com/qlangtech/tis/datax/job/JobOrchestrateException.java b/tis-builder-api/src/main/java/com/qlangtech/tis/datax/job/JobOrchestrateException.java new file mode 100644 index 000000000..d2c46caf3 --- /dev/null +++ b/tis-builder-api/src/main/java/com/qlangtech/tis/datax/job/JobOrchestrateException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.datax.job; + +/** + * 编排任务在任务在执行过程中出错,需要中断编排任务 + * + * @author: 百岁(baisui@qlangtech.com) + * @create: 2023-12-29 10:01 + **/ +public class JobOrchestrateException extends Exception { + public JobOrchestrateException(String message) { + super(message); + } + + public JobOrchestrateException(Throwable cause) { + super(cause); + } +} diff --git a/tis-builder-api/src/main/java/com/qlangtech/tis/manage/common/Config.java b/tis-builder-api/src/main/java/com/qlangtech/tis/manage/common/Config.java index cb95b6111..02bd35ad3 100644 --- a/tis-builder-api/src/main/java/com/qlangtech/tis/manage/common/Config.java +++ b/tis-builder-api/src/main/java/com/qlangtech/tis/manage/common/Config.java @@ -530,6 +530,7 @@ protected InputStream getOriginSource() { throw new RuntimeException(e); } } + @Override protected String getPropValue(String key) { return props.getProperty(key); @@ -576,15 +577,21 @@ public final String getString(String key, boolean notEmpty) { private static abstract class LocalResBasedPropertyGetter extends P { @Override protected final String getProp(String key) { - if (KEY_ASSEMBLE_HOST.equals(key) || KEY_TIS_HOST.equals(key)) { + + if (TisAppLaunch.isTestMock()) { + return this.getPropValue(key); + } + + if ((KEY_ASSEMBLE_HOST.equals(key) || KEY_TIS_HOST.equals(key))) { if (!BasicConfig.inDockerContainer()) { - return NetUtils.getHost(); + return NetUtils.getHost(); } } return this.getPropValue(key); } - protected abstract String getPropValue(String key); + + protected abstract String getPropValue(String key); } public static String getGenerateParentPackage() { diff --git a/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/CoreAction.java b/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/CoreAction.java index 3bbc78f1f..62e24df01 100644 --- a/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/CoreAction.java +++ b/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/CoreAction.java @@ -44,7 +44,9 @@ import com.qlangtech.tis.datax.job.ILaunchingOrchestrate; import com.qlangtech.tis.datax.job.ILaunchingOrchestrate.ExecuteStep; import com.qlangtech.tis.datax.job.ILaunchingOrchestrate.ExecuteSteps; +import com.qlangtech.tis.datax.job.JobOrchestrateException; import com.qlangtech.tis.datax.job.JobResName; +import com.qlangtech.tis.datax.job.SSERunnable; import com.qlangtech.tis.datax.job.ServerLaunchToken; import com.qlangtech.tis.datax.job.ServerLaunchToken.FlinkClusterTokenManager; import com.qlangtech.tis.datax.job.SubJobResName; @@ -69,6 +71,7 @@ import com.qlangtech.tis.manage.common.HttpUtils.PostParam; import com.qlangtech.tis.manage.common.ManageUtils; import com.qlangtech.tis.manage.common.RunContext; +import com.qlangtech.tis.manage.common.valve.AjaxValve.ActionExecResult; import com.qlangtech.tis.manage.servlet.DownloadResource; import com.qlangtech.tis.manage.servlet.DownloadServlet; import com.qlangtech.tis.manage.spring.aop.Func; @@ -556,7 +559,7 @@ private static HashMap createDescVals(Descriptor desc) { * @throws Exception */ @Func(value = PermissionConstant.PERMISSION_INCR_PROCESS_CONFIG_EDIT) - public void doCompileAndPackage(Context context) throws Exception { + public void doCompileAndPackage(Context context) { // IBasicAppSource appSource = IAppSource.load(null, this.getCollectionName()); @@ -662,9 +665,10 @@ private ILaunchingOrchestrate getFlinkJobWorkingOrchestrate(T // } // }; - + final String subJobName = "Incr " + getCollectionName() + " Compile And Package"; final SubJobResName compileAndPackage = - JobResName.createSubJob("Incr " + getCollectionName() + " Compile And Package", (dto) -> { + JobResName.createSubJob(subJobName, (dto) -> { + final SSERunnable sse = SSERunnable.getLocal(); long start = System.currentTimeMillis(); /** * ========================================================== @@ -673,7 +677,14 @@ private ILaunchingOrchestrate getFlinkJobWorkingOrchestrate(T */ this.doCompileAndPackage(dto.context); if (dto.hasErrors()) { - return; + +// ActionExecResult r = new ActionExecResult(this.getContext()).invoke(); +// r.getErrorMsgs(); +// for(){ +// +// } + throw new JobOrchestrateException(subJobName +" faild"); + //return; } dto.appendLog("\n compile and package consume:" + (System.currentTimeMillis() - start) + "ms "); }); @@ -726,7 +737,11 @@ public void visit(FlinkJobDeploymentDetails details) { }); if (loop.get()) { log.info("check " + getCollectionName() + " deploy status,tryCount:" + tryCount.get()); - Thread.sleep(3000); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } @@ -781,7 +796,7 @@ private static Map getDependencyDbsMap(BasicModule module, IndexS .collect(Collectors.toMap(DatasourceDb::getId, (r) -> ManageUtils.formatNowYyyyMMddHHmmss(r.getOpTime()))); } - private static IndexStreamCodeGenerator getIndexStreamCodeGenerator(BasicModule module) throws Exception { + private static IndexStreamCodeGenerator getIndexStreamCodeGenerator(BasicModule module) { IBasicAppSource appSource = IAppSource.load(null, module.getCollectionName()); diff --git a/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/TISK8sDelegate.java b/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/TISK8sDelegate.java index a20644b7e..779d23123 100644 --- a/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/TISK8sDelegate.java +++ b/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/TISK8sDelegate.java @@ -24,6 +24,7 @@ import com.qlangtech.tis.coredefine.module.action.impl.FlinkJobDeploymentDetails; import com.qlangtech.tis.coredefine.module.action.impl.RcDeployment; import com.qlangtech.tis.datax.job.DataXJobWorker; +import com.qlangtech.tis.datax.job.JobOrchestrateException; import com.qlangtech.tis.plugin.incr.IncrStreamFactory; import com.qlangtech.tis.plugin.incr.WatchPodLog; import com.qlangtech.tis.runtime.module.misc.IMessageHandler; @@ -137,7 +138,7 @@ public static void main(String[] args) throws Exception { incrK8s.isRCDeployment(true); } - public void deploy(ReplicasSpec incrSpec, final long timestamp) throws Exception { + public void deploy(ReplicasSpec incrSpec, final long timestamp) throws JobOrchestrateException { this.incrSync.deploy(this.indexName, incrSpec, timestamp); } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/JobResName.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/JobResName.java index 09ce432ef..8d5689c08 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/JobResName.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/JobResName.java @@ -50,7 +50,7 @@ public static SubJobResName createSubJob(String jobName, ThrowableConsume final SubJobResName created = new SubJobResName(jobName, new SubJobExec() { @Override - public void accept(T dto) throws Exception { + public void accept(T dto) throws JobOrchestrateException { exec.accept(dto); } }) { @@ -63,7 +63,7 @@ protected String getResourceType() { } - public final void execSubJob(T t) throws Exception { + public final void execSubJob(T t) throws JobOrchestrateException { SSERunnable sse = SSERunnable.getLocal(); boolean success = false; try { @@ -71,7 +71,7 @@ public final void execSubJob(T t) throws Exception { this.execute(sse, t); success = true; sse.info(this.getName(), TimeFormat.getCurrentTimeStamp(), "✔✔ successful to publish " + this.getResourceType() + "'" + this.getName() + "'"); - } catch (Exception e) { + } catch (JobOrchestrateException e) { logger.error(e.getMessage(), e); throw e; } finally { @@ -82,7 +82,7 @@ public final void execSubJob(T t) throws Exception { } } - protected abstract void execute(SSERunnable sse, T t) throws Exception; + protected abstract void execute(SSERunnable sse, T t) throws JobOrchestrateException; // if (jobExec instanceof SubJobExec) { // ((OwnerJobExec) jobExec).accept(t); // } else { @@ -94,16 +94,16 @@ public final void execSubJob(T t) throws Exception { public interface OwnerJobExec { - public RESULT accept(T t) throws Exception; + public RESULT accept(T t) throws JobOrchestrateException; } public interface SubJobExec { - public void accept(T t) throws Exception; + public void accept(T t) throws JobOrchestrateException; } @FunctionalInterface public interface ThrowableConsumer { - void accept(T t) throws Exception; + void accept(T t) throws JobOrchestrateException; } } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/OwnerJobResName.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/OwnerJobResName.java index 1d623cd67..f6fa7988d 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/OwnerJobResName.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/OwnerJobResName.java @@ -34,7 +34,7 @@ public OwnerJobResName(String name, OwnerJobExec jobExec) { } @Override - protected final void execute(SSERunnable sse, T t) throws Exception { + protected final void execute(SSERunnable sse, T t) throws JobOrchestrateException { RESULT result = jobExec.accept(t); if (result != null) { sse.setContextAttr(SSEExecuteOwner.class, new SSEExecuteOwner(result)); diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/PowerjobOrchestrateException.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/PowerjobOrchestrateException.java index 9b1d5c5c0..7d5ad7fab 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/PowerjobOrchestrateException.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/PowerjobOrchestrateException.java @@ -28,7 +28,7 @@ * @author 百岁 (baisui@qlangtech.com) * @date 2023/12/18 */ -public class PowerjobOrchestrateException extends Exception { +public class PowerjobOrchestrateException extends JobOrchestrateException { public PowerjobOrchestrateException(String message) { super(message); } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/SubJobResName.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/SubJobResName.java index 0888f050a..3144c156f 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/SubJobResName.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/SubJobResName.java @@ -32,7 +32,7 @@ protected SubJobResName(String name, SubJobExec jobExec) { } @Override - protected void execute(SSERunnable sse, T t) throws Exception { + protected void execute(SSERunnable sse, T t) throws JobOrchestrateException { jobExec.accept(t); } } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/DataSourceFactoryPluginStore.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/DataSourceFactoryPluginStore.java index 54eb780d7..acff6c955 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/DataSourceFactoryPluginStore.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/DataSourceFactoryPluginStore.java @@ -52,7 +52,7 @@ public DSKey getDSKey() { return (DSKey) this.key; } - public FacadeDataSource createFacadeDataSource() throws Exception { + public FacadeDataSource createFacadeDataSource() { DataSourceFactory plugin = this.getPlugin(); if (plugin == null) { throw new IllegalStateException("dbName:" + key.keyVal + " relevant facade datasource has not been defined,file:" + this.getSerializeFileName()); diff --git a/tis-scala-compiler/src/main/java/com/qlangtech/tis/compiler/streamcode/GenerateDAOAndIncrScript.java b/tis-scala-compiler/src/main/java/com/qlangtech/tis/compiler/streamcode/GenerateDAOAndIncrScript.java index 29ceb5a5e..6ce5cc276 100644 --- a/tis-scala-compiler/src/main/java/com/qlangtech/tis/compiler/streamcode/GenerateDAOAndIncrScript.java +++ b/tis-scala-compiler/src/main/java/com/qlangtech/tis/compiler/streamcode/GenerateDAOAndIncrScript.java @@ -26,6 +26,7 @@ import com.qlangtech.tis.compiler.java.JavaCompilerProcess; import com.qlangtech.tis.coredefine.module.action.IbatorProperties; import com.qlangtech.tis.coredefine.module.action.IndexIncrStatus; +import com.qlangtech.tis.datax.job.JobOrchestrateException; import com.qlangtech.tis.manage.common.Config; import com.qlangtech.tis.manage.common.incr.StreamContextConstant; import com.qlangtech.tis.offline.DbScope; @@ -69,7 +70,7 @@ public GenerateDAOAndIncrScript(IControlMsgHandler msgHandler, IndexStreamCodeGe * @param dependencyDbs Map * @throws Exception */ - public void generate(Context context, IndexIncrStatus incrStatus, boolean compilerAndPackage, Map dependencyDbs) throws Exception { + public void generate(Context context, IndexIncrStatus incrStatus, boolean compilerAndPackage, Map dependencyDbs) { generateDAOScript(context, dependencyDbs); generateIncrScript(context, incrStatus, compilerAndPackage, Collections.unmodifiableMap(indexStreamCodeGenerator.getDbTables()), false); } @@ -139,7 +140,7 @@ public void generateIncrScript(Context context, IndexIncrStatus incrStatus, bool } } - private void generateDAOScript(Context context, Map dependencyDbs) throws Exception { + private void generateDAOScript(Context context, Map dependencyDbs) { final Map> dbNameMap = Collections.unmodifiableMap(indexStreamCodeGenerator.getDbTables()); if (dbNameMap.size() < 1) { throw new IllegalStateException("dbNameMap size can not small than 1"); @@ -208,7 +209,7 @@ protected KoubeiProgressCallback getProgressCallback() { } } catch (Exception e) { // 将文件夹清空 - FileUtils.forceDelete(properties.getDaoDir()); + FileUtils.deleteQuietly(properties.getDaoDir()); throw new RuntimeException("dao path:" + properties.getDaoDir(), e); } } diff --git a/tis-scala-compiler/src/main/java/com/qlangtech/tis/compiler/streamcode/IndexStreamCodeGenerator.java b/tis-scala-compiler/src/main/java/com/qlangtech/tis/compiler/streamcode/IndexStreamCodeGenerator.java index cd89d777b..efd8f10b7 100644 --- a/tis-scala-compiler/src/main/java/com/qlangtech/tis/compiler/streamcode/IndexStreamCodeGenerator.java +++ b/tis-scala-compiler/src/main/java/com/qlangtech/tis/compiler/streamcode/IndexStreamCodeGenerator.java @@ -61,7 +61,7 @@ public class IndexStreamCodeGenerator { public IndexStreamCodeGenerator(String collection, IBasicAppSource streamIncrGenerateStrategy, long incrScriptTimestamp - , IDBTableNamesGetter dbTableNamesGetter) throws Exception { + , IDBTableNamesGetter dbTableNamesGetter) { if (StringUtils.isEmpty(collection)) { throw new IllegalArgumentException("argument collection can not be null"); } @@ -85,7 +85,7 @@ public void deleteScript() { FileUtils.deleteQuietly(streamScriptRootDir); } - private void initialize() throws Exception { + private void initialize() { // FullbuildWorkflowAction.getDataflowTopology(CoreAction.this, this.workFlow); // this.dfTopology = SqlTaskNodeMeta.getSqlDataFlowTopology(this.workflowName); // this.dbTables = getDependencyTables(dfTopology); diff --git a/tis-sql-parser/src/main/resources/com/qlangtech/tis/classtpl/flink_source_handle_rowdata_scala.vm b/tis-sql-parser/src/main/resources/com/qlangtech/tis/classtpl/flink_source_handle_rowdata_scala.vm index f694ba8a7..286c02417 100644 --- a/tis-sql-parser/src/main/resources/com/qlangtech/tis/classtpl/flink_source_handle_rowdata_scala.vm +++ b/tis-sql-parser/src/main/resources/com/qlangtech/tis/classtpl/flink_source_handle_rowdata_scala.vm @@ -15,8 +15,8 @@ class ${config.javaName}SourceHandle extends RowDataFlinkSourceHandle { #foreach($i in $config.dumpTables ) ## ${i.tabName}Stream.addSink(sinkFunction) - val ${i.from}Stream = streamMap.get("${i.to}") - sinkFunction.add2Sink("${i.to}" , ${i.from}Stream) + val ${i.to}Stream = streamMap.get("${i.to}") + sinkFunction.add2Sink("${i.to}" , ${i.to}Stream) #end } diff --git a/tis-web-config/config.properties b/tis-web-config/config.properties index 603901112..c4748bb41 100644 --- a/tis-web-config/config.properties +++ b/tis-web-config/config.properties @@ -30,8 +30,8 @@ tis.datasource.type=derby tis.datasource.dbname=tis_console_db -assemble.host=192.168.28.132 -tis.host=192.168.28.132 +assemble.host=192.168.28.171 +tis.host=192.168.28.171