Skip to content

Commit

Permalink
fix powerjob server deploy in TIS
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Mar 12, 2024
1 parent 7c29748 commit ea7711c
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,8 @@ public void visit(FlinkJobDeploymentDetails details) {

return new ILaunchingOrchestrate() {
@Override
public List<ExecuteStep<FlinkJobDeployDTO, SubJobExec<FlinkJobDeployDTO>>> getExecuteSteps() {
List<ExecuteStep<FlinkJobDeployDTO, SubJobExec<FlinkJobDeployDTO>>> launchSteps = Lists.newArrayList();
public List<ExecuteStep<FlinkJobDeployDTO>> getExecuteSteps() {
List<ExecuteStep<FlinkJobDeployDTO>> launchSteps = Lists.newArrayList();
for (SubJobResName rcRes : flinkDeployRes) {
launchSteps.add(new ExecuteStep(rcRes, null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package com.qlangtech.tis.datax.job;

import com.qlangtech.tis.datax.job.JobResName.IJobExec;
import org.apache.commons.collections.CollectionUtils;

import java.util.List;
Expand All @@ -33,7 +32,7 @@
**/
public interface ILaunchingOrchestrate<T> {

public List<ExecuteStep<T, ?>> getExecuteSteps();
public List<ExecuteStep<T>> getExecuteSteps();

public default ExecuteSteps createExecuteSteps(Object owner) {
return new ExecuteSteps(owner, this.getExecuteSteps().stream().collect(Collectors.toList()));
Expand All @@ -56,30 +55,30 @@ public ExecuteSteps(Object owner, List<ExecuteStep> executeSteps) {
}
}

public class ExecuteStep<T, EXEC extends IJobExec<T>> extends SubJobMilestone {
private final JobResName<T, EXEC> subJob;
public class ExecuteStep<T> extends SubJobMilestone {
private final JobResName<T> subJob;

public ExecuteStep(JobResName<T, EXEC> resName, String describe) {
public ExecuteStep(JobResName<T> resName, String describe) {
this(resName, describe, false, false);
}

public ExecuteStep(JobResName<T, EXEC> name, String describe, boolean complete, boolean success) {
public ExecuteStep(JobResName<T> name, String describe, boolean complete, boolean success) {
super(name.getName(), describe, complete, success);
this.subJob = name;
}

public JobResName<T, EXEC> getSubJob() {
public JobResName<T> getSubJob() {
return this.subJob;
}

public ExecuteStep<T, EXEC> copy(SubJobMilestone subJobStone) {
ExecuteStep<T, EXEC> step = new ExecuteStep<T, EXEC>(subJob
public ExecuteStep<T> copy(SubJobMilestone subJobStone) {
ExecuteStep<T> step = new ExecuteStep<T>(subJob
, this.getDescribe(), subJobStone.isComplete(), subJobStone.isSuccess());
return step;
}

public ExecuteStep<T, EXEC> copy() {
return new ExecuteStep<T, EXEC>(subJob, this.getDescribe(), this.isComplete(), this.isSuccess());
public ExecuteStep<T> copy() {
return new ExecuteStep<T>(subJob, this.getDescribe(), this.isComplete(), this.isSuccess());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,21 @@

import com.qlangtech.tis.coredefine.module.action.TargetResName;
import com.qlangtech.tis.datax.TimeFormat;
import com.qlangtech.tis.datax.job.JobResName.IJobExec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Consumer;

/**
* @author: 百岁([email protected]
* @create: 2023-12-29 10:01
**/
public abstract class JobResName<T, EXEC extends IJobExec<T>> extends TargetResName {
private final EXEC jobExec;
public abstract class JobResName<T> extends TargetResName {
// private final EXEC jobExec;
private static final Logger logger = LoggerFactory.getLogger(JobResName.class);

public JobResName(String name, EXEC jobExec) {
public JobResName(String name //, EXEC jobExec
) {
super(name);
this.jobExec = jobExec;
// this.jobExec = jobExec;
}


Expand Down Expand Up @@ -70,7 +68,7 @@ public final void execSubJob(T t) throws Exception {
boolean success = false;
try {
sse.info(this.getName(), TimeFormat.getCurrentTimeStamp(), "〇〇 start to publish " + this.getResourceType() + "'" + this.getName() + "'");
this.execute(sse, t, this.jobExec);
this.execute(sse, t);
success = true;
sse.info(this.getName(), TimeFormat.getCurrentTimeStamp(), "✔✔ successful to publish " + this.getResourceType() + "'" + this.getName() + "'");
} catch (Exception e) {
Expand All @@ -84,7 +82,7 @@ public final void execSubJob(T t) throws Exception {
}
}

protected abstract void execute(SSERunnable sse, T t, EXEC jobExec) throws Exception;
protected abstract void execute(SSERunnable sse, T t) throws Exception;
// if (jobExec instanceof SubJobExec) {
// ((OwnerJobExec) jobExec).accept(t);
// } else {
Expand All @@ -94,15 +92,12 @@ public final void execSubJob(T t) throws Exception {

protected abstract String getResourceType();

public interface IJobExec<T> {

}

public interface OwnerJobExec<T, RESULT> extends IJobExec<T> {
public interface OwnerJobExec<T, RESULT> {
public RESULT accept(T t) throws Exception;
}

public interface SubJobExec<T> extends IJobExec<T> {
public interface SubJobExec<T> {
public void accept(T t) throws Exception;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,23 @@

package com.qlangtech.tis.datax.job;

import com.qlangtech.tis.datax.job.JobResName.OwnerJobExec;

import java.util.Objects;

/**
* @author: 百岁([email protected]
* @create: 2024-03-11 13:18
**/
public abstract class OwnerJobResName<T, RESULT> extends JobResName<T, OwnerJobExec<T, RESULT>> {
public abstract class OwnerJobResName<T, RESULT> extends JobResName<T> {

private final OwnerJobExec<T, RESULT> jobExec;

public OwnerJobResName(String name, OwnerJobExec<T, RESULT> jobExec) {
super(name, jobExec);
super(name);
this.jobExec = jobExec;
}

@Override
protected final void execute(SSERunnable sse, T t, OwnerJobExec<T, RESULT> jobExec) throws Exception {
protected final void execute(SSERunnable sse, T t) throws Exception {
RESULT result = jobExec.accept(t);
if (result != null) {
sse.setContextAttr(SSEExecuteOwner.class, new SSEExecuteOwner(result));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@

package com.qlangtech.tis.datax.job;

import com.qlangtech.tis.datax.job.JobResName.SubJobExec;

/**
* @author: 百岁([email protected]
* @create: 2024-03-11 12:37
**/
public abstract class SubJobResName<T> extends JobResName<T, SubJobExec<T>> {
public abstract class SubJobResName<T> extends JobResName<T> {

private final SubJobExec<T> jobExec;

SubJobResName(String name, SubJobExec<T> jobExec) {
super(name, jobExec);
protected SubJobResName(String name, SubJobExec<T> jobExec) {
super(name);
this.jobExec = jobExec;
}

@Override
protected void execute(SSERunnable sse, T t, SubJobExec<T> jobExec) throws Exception {
protected void execute(SSERunnable sse, T t) throws Exception {
jobExec.accept(t);
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http:https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.log;
Expand All @@ -22,47 +22,58 @@
import ch.qos.logback.classic.PatternLayout;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import com.qlangtech.tis.datax.job.SSERunnable;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;

import java.io.StringReader;

/**
* @author: 百岁([email protected]
* @create: 2024-02-09 10:00
**/
public class WaittingProcessCollectorAppender extends UnsynchronizedAppenderBase<ch.qos.logback.classic.spi.LoggingEvent> {
private PatternLayout layout;
private PatternLayout layout;

@Override
public void start() {
this.layout = new PatternLayout();
this.layout.setPattern("%msg%n");
this.layout.setContext(this.getContext());
super.start();
this.layout.start();
}

@Override
public void start() {
this.layout = new PatternLayout();
this.layout.setPattern("%msg%n");
this.layout.setContext(this.getContext());
super.start();
this.layout.start();
}
@Override
protected void append(ch.qos.logback.classic.spi.LoggingEvent e) {
// System.out.println(e.getClass());

@Override
protected void append(ch.qos.logback.classic.spi.LoggingEvent e) {
// System.out.println(e.getClass());
// try (BufferedReader msgReader = new BufferedReader(new StringReader(e.getFormattedMessage()))) {

// try (BufferedReader msgReader = new BufferedReader(new StringReader(e.getFormattedMessage()))) {
Level level = null;
if (SSERunnable.sseAware()) {
SSERunnable sse = SSERunnable.getLocal();
level = e.getLevel();
if (level.isGreaterOrEqual(Level.ERROR)) {

Level level = null;
if (SSERunnable.sseAware()) {
SSERunnable sse = SSERunnable.getLocal();
level = e.getLevel();
if (level.isGreaterOrEqual(Level.ERROR)) {
sse.error(null, e.getTimeStamp(), this.layout.doLayout(e));
return;
}
try (LineIterator lines = IOUtils.lineIterator(new StringReader(this.layout.doLayout(e)))) {
while (lines.hasNext()) {
sse.error(null, e.getTimeStamp(), lines.next());
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
return;
}

if (level.isGreaterOrEqual(Level.DEBUG)) {
sse.info(null, e.getTimeStamp(), e.getFormattedMessage());
return;
}
if (level.isGreaterOrEqual(Level.DEBUG)) {
sse.info(null, e.getTimeStamp(), e.getFormattedMessage());
return;
}


throw new IllegalStateException("unhandler error level:" + level + " msg:" + e.getFormattedMessage());
throw new IllegalStateException("unhandler error level:" + level + " msg:" + e.getFormattedMessage());
}
}
}


}

0 comments on commit ea7711c

Please sign in to comment.