From ea7711cd97165280595e72669e3232c57a0e14bf Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Tue, 12 Mar 2024 11:55:26 +0800 Subject: [PATCH] fix powerjob server deploy in TIS --- .../coredefine/module/action/CoreAction.java | 4 +- .../tis/datax/job/ILaunchingOrchestrate.java | 21 ++-- .../qlangtech/tis/datax/job/JobResName.java | 23 ++--- .../tis/datax/job/OwnerJobResName.java | 12 ++- .../tis/datax/job/SubJobResName.java | 13 +-- .../log/WaittingProcessCollectorAppender.java | 95 +++++++++++-------- 6 files changed, 88 insertions(+), 80 deletions(-) diff --git a/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/CoreAction.java b/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/CoreAction.java index b38d76c2e..b997c989a 100644 --- a/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/CoreAction.java +++ b/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/CoreAction.java @@ -733,8 +733,8 @@ public void visit(FlinkJobDeploymentDetails details) { return new ILaunchingOrchestrate() { @Override - public List>> getExecuteSteps() { - List>> launchSteps = Lists.newArrayList(); + public List> getExecuteSteps() { + List> launchSteps = Lists.newArrayList(); for (SubJobResName rcRes : flinkDeployRes) { launchSteps.add(new ExecuteStep(rcRes, null)); } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/ILaunchingOrchestrate.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/ILaunchingOrchestrate.java index 808298ed8..3b6365fc8 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/ILaunchingOrchestrate.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/ILaunchingOrchestrate.java @@ -18,7 +18,6 @@ package com.qlangtech.tis.datax.job; -import com.qlangtech.tis.datax.job.JobResName.IJobExec; import org.apache.commons.collections.CollectionUtils; import java.util.List; @@ -33,7 +32,7 @@ **/ public interface ILaunchingOrchestrate { - public List> getExecuteSteps(); + public List> getExecuteSteps(); public default ExecuteSteps createExecuteSteps(Object owner) { return new ExecuteSteps(owner, this.getExecuteSteps().stream().collect(Collectors.toList())); @@ -56,30 +55,30 @@ public ExecuteSteps(Object owner, List executeSteps) { } } - public class ExecuteStep> extends SubJobMilestone { - private final JobResName subJob; + public class ExecuteStep extends SubJobMilestone { + private final JobResName subJob; - public ExecuteStep(JobResName resName, String describe) { + public ExecuteStep(JobResName resName, String describe) { this(resName, describe, false, false); } - public ExecuteStep(JobResName name, String describe, boolean complete, boolean success) { + public ExecuteStep(JobResName name, String describe, boolean complete, boolean success) { super(name.getName(), describe, complete, success); this.subJob = name; } - public JobResName getSubJob() { + public JobResName getSubJob() { return this.subJob; } - public ExecuteStep copy(SubJobMilestone subJobStone) { - ExecuteStep step = new ExecuteStep(subJob + public ExecuteStep copy(SubJobMilestone subJobStone) { + ExecuteStep step = new ExecuteStep(subJob , this.getDescribe(), subJobStone.isComplete(), subJobStone.isSuccess()); return step; } - public ExecuteStep copy() { - return new ExecuteStep(subJob, this.getDescribe(), this.isComplete(), this.isSuccess()); + public ExecuteStep copy() { + return new ExecuteStep(subJob, this.getDescribe(), this.isComplete(), this.isSuccess()); } } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/JobResName.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/JobResName.java index a8f15f141..09ce432ef 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/JobResName.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/JobResName.java @@ -20,23 +20,21 @@ import com.qlangtech.tis.coredefine.module.action.TargetResName; import com.qlangtech.tis.datax.TimeFormat; -import com.qlangtech.tis.datax.job.JobResName.IJobExec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.function.Consumer; - /** * @author: 百岁(baisui@qlangtech.com) * @create: 2023-12-29 10:01 **/ -public abstract class JobResName> extends TargetResName { - private final EXEC jobExec; +public abstract class JobResName extends TargetResName { + // private final EXEC jobExec; private static final Logger logger = LoggerFactory.getLogger(JobResName.class); - public JobResName(String name, EXEC jobExec) { + public JobResName(String name //, EXEC jobExec + ) { super(name); - this.jobExec = jobExec; + // this.jobExec = jobExec; } @@ -70,7 +68,7 @@ public final void execSubJob(T t) throws Exception { boolean success = false; try { sse.info(this.getName(), TimeFormat.getCurrentTimeStamp(), "〇〇 start to publish " + this.getResourceType() + "'" + this.getName() + "'"); - this.execute(sse, t, this.jobExec); + this.execute(sse, t); success = true; sse.info(this.getName(), TimeFormat.getCurrentTimeStamp(), "✔✔ successful to publish " + this.getResourceType() + "'" + this.getName() + "'"); } catch (Exception e) { @@ -84,7 +82,7 @@ public final void execSubJob(T t) throws Exception { } } - protected abstract void execute(SSERunnable sse, T t, EXEC jobExec) throws Exception; + protected abstract void execute(SSERunnable sse, T t) throws Exception; // if (jobExec instanceof SubJobExec) { // ((OwnerJobExec) jobExec).accept(t); // } else { @@ -94,15 +92,12 @@ public final void execSubJob(T t) throws Exception { protected abstract String getResourceType(); - public interface IJobExec { - - } - public interface OwnerJobExec extends IJobExec { + public interface OwnerJobExec { public RESULT accept(T t) throws Exception; } - public interface SubJobExec extends IJobExec { + public interface SubJobExec { public void accept(T t) throws Exception; } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/OwnerJobResName.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/OwnerJobResName.java index f38b5ec97..1d623cd67 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/OwnerJobResName.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/OwnerJobResName.java @@ -18,21 +18,23 @@ package com.qlangtech.tis.datax.job; -import com.qlangtech.tis.datax.job.JobResName.OwnerJobExec; - import java.util.Objects; /** * @author: 百岁(baisui@qlangtech.com) * @create: 2024-03-11 13:18 **/ -public abstract class OwnerJobResName extends JobResName> { +public abstract class OwnerJobResName extends JobResName { + + private final OwnerJobExec jobExec; + public OwnerJobResName(String name, OwnerJobExec jobExec) { - super(name, jobExec); + super(name); + this.jobExec = jobExec; } @Override - protected final void execute(SSERunnable sse, T t, OwnerJobExec jobExec) throws Exception { + protected final void execute(SSERunnable sse, T t) throws Exception { RESULT result = jobExec.accept(t); if (result != null) { sse.setContextAttr(SSEExecuteOwner.class, new SSEExecuteOwner(result)); diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/SubJobResName.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/SubJobResName.java index 343581985..0888f050a 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/SubJobResName.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/SubJobResName.java @@ -18,20 +18,21 @@ package com.qlangtech.tis.datax.job; -import com.qlangtech.tis.datax.job.JobResName.SubJobExec; - /** * @author: 百岁(baisui@qlangtech.com) * @create: 2024-03-11 12:37 **/ -public abstract class SubJobResName extends JobResName> { +public abstract class SubJobResName extends JobResName { + + private final SubJobExec jobExec; - SubJobResName(String name, SubJobExec jobExec) { - super(name, jobExec); + protected SubJobResName(String name, SubJobExec jobExec) { + super(name); + this.jobExec = jobExec; } @Override - protected void execute(SSERunnable sse, T t, SubJobExec jobExec) throws Exception { + protected void execute(SSERunnable sse, T t) throws Exception { jobExec.accept(t); } } diff --git a/tis-web-start/src/main/java/com/qlangtech/tis/log/WaittingProcessCollectorAppender.java b/tis-web-start/src/main/java/com/qlangtech/tis/log/WaittingProcessCollectorAppender.java index 1ee80a800..44a523b06 100644 --- a/tis-web-start/src/main/java/com/qlangtech/tis/log/WaittingProcessCollectorAppender.java +++ b/tis-web-start/src/main/java/com/qlangtech/tis/log/WaittingProcessCollectorAppender.java @@ -1,19 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.qlangtech.tis.log; @@ -22,47 +22,58 @@ import ch.qos.logback.classic.PatternLayout; import ch.qos.logback.core.UnsynchronizedAppenderBase; import com.qlangtech.tis.datax.job.SSERunnable; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.LineIterator; + +import java.io.StringReader; /** * @author: 百岁(baisui@qlangtech.com) * @create: 2024-02-09 10:00 **/ public class WaittingProcessCollectorAppender extends UnsynchronizedAppenderBase { - private PatternLayout layout; + private PatternLayout layout; + + @Override + public void start() { + this.layout = new PatternLayout(); + this.layout.setPattern("%msg%n"); + this.layout.setContext(this.getContext()); + super.start(); + this.layout.start(); + } - @Override - public void start() { - this.layout = new PatternLayout(); - this.layout.setPattern("%msg%n"); - this.layout.setContext(this.getContext()); - super.start(); - this.layout.start(); - } + @Override + protected void append(ch.qos.logback.classic.spi.LoggingEvent e) { + // System.out.println(e.getClass()); - @Override - protected void append(ch.qos.logback.classic.spi.LoggingEvent e) { - // System.out.println(e.getClass()); + // try (BufferedReader msgReader = new BufferedReader(new StringReader(e.getFormattedMessage()))) { - // try (BufferedReader msgReader = new BufferedReader(new StringReader(e.getFormattedMessage()))) { + Level level = null; + if (SSERunnable.sseAware()) { + SSERunnable sse = SSERunnable.getLocal(); + level = e.getLevel(); + if (level.isGreaterOrEqual(Level.ERROR)) { - Level level = null; - if (SSERunnable.sseAware()) { - SSERunnable sse = SSERunnable.getLocal(); - level = e.getLevel(); - if (level.isGreaterOrEqual(Level.ERROR)) { - sse.error(null, e.getTimeStamp(), this.layout.doLayout(e)); - return; - } + try (LineIterator lines = IOUtils.lineIterator(new StringReader(this.layout.doLayout(e)))) { + while (lines.hasNext()) { + sse.error(null, e.getTimeStamp(), lines.next()); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + return; + } - if (level.isGreaterOrEqual(Level.DEBUG)) { - sse.info(null, e.getTimeStamp(), e.getFormattedMessage()); - return; - } + if (level.isGreaterOrEqual(Level.DEBUG)) { + sse.info(null, e.getTimeStamp(), e.getFormattedMessage()); + return; + } - throw new IllegalStateException("unhandler error level:" + level + " msg:" + e.getFormattedMessage()); + throw new IllegalStateException("unhandler error level:" + level + " msg:" + e.getFormattedMessage()); + } } - } }