Skip to content

Commit

Permalink
support percent display show when dataX execution
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Apr 15, 2024
1 parent 3ad6010 commit d228180
Show file tree
Hide file tree
Showing 15 changed files with 196 additions and 782 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.collect.Lists;
import com.qlangtech.tis.assemble.FullbuildPhase;
import com.qlangtech.tis.exec.ExecutePhaseRange;
import com.qlangtech.tis.exec.impl.DefaultChainContext;
import com.qlangtech.tis.exec.impl.TrackableExecuteInterceptor;
import com.qlangtech.tis.fullbuild.phasestatus.PhaseStatusCollection;
import com.qlangtech.tis.log.RealtimeLoggerCollectorAppender;
Expand All @@ -32,7 +31,6 @@
import com.qlangtech.tis.rpc.grpc.log.stream.PExecuteState;
import com.qlangtech.tis.rpc.grpc.log.stream.PMonotorTarget;
import com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection;
import com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget;
import com.qlangtech.tis.trigger.jst.MonotorTarget;
import com.qlangtech.tis.trigger.socket.LogType;
import com.qlangtech.tis.utils.Utils;
Expand All @@ -41,7 +39,6 @@
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -97,19 +94,14 @@ public void run() {
public static RegisterMonitorEventHook registerMonitorEventHook = new RegisterMonitorEventHook();

@Override
public void loadPhaseStatusFromLatest(PSynResTarget request, StreamObserver<PPhaseStatusCollection> responseObserver) {
boolean pipeline = request.getPipeline();
String targetResName = request.getName();
if (!pipeline) {
// 目前代码中还没有支持workflow资源的响应
throw new NotImplementedException("targetResName:" + targetResName + " must be pipeline");
}

PhaseStatusCollection statusCollection = DefaultChainContext.loadPhaseStatusFromLatest(targetResName);
public void loadPhaseStatus(PBuildPhaseStatusParam request, StreamObserver<PPhaseStatusCollection> responseObserver) {
PhaseStatusCollection statusCollection = IndexSwapTaskflowLauncher.loadPhaseStatusFromLocal((int) request.getTaskid());
logger.info("taskId:{} load relevant status from local persist is null:{}", request.getTaskid(), statusCollection != null);
responseObserver.onNext((statusCollection != null) ? LogCollectorClient.convertPP(statusCollection) : null);
responseObserver.onCompleted();
}


/**
* 监听执行日志,详细信息
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,23 @@
import com.qlangtech.tis.offline.DataxUtils;
import com.qlangtech.tis.offline.module.action.OfflineDatasourceAction;
import com.qlangtech.tis.order.center.IParamContext;
import com.qlangtech.tis.realtime.yarn.rpc.SynResTarget;
import com.qlangtech.tis.runtime.module.action.BasicModule;
import com.qlangtech.tis.workflow.pojo.WorkFlowBuildHistory;
import com.qlangtech.tis.workflow.pojo.WorkFlowBuildHistoryCriteria;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

/**
* @author 百岁([email protected]
* @date 2017年9月30日
*/
public class FullbuildWorkflowAction extends BasicModule {

private static final Logger logger = LoggerFactory.getLogger(FullbuildWorkflowAction.class);
/**
*
*/
Expand Down Expand Up @@ -110,25 +112,36 @@ public void doCreateNewTask(Context context) {
*/
@Func(value = PermissionConstant.DATAFLOW_MANAGE, sideEffect = false)
public void doGetLatestSuccessWorkflow(Context context) {
String appName = this.getString(IFullBuildContext.KEY_APP_NAME);
if (StringUtils.isEmpty(appName)) {
throw new IllegalArgumentException("param appName can not be null");
}

WorkFlowBuildHistoryCriteria historyCriteria = new WorkFlowBuildHistoryCriteria();
historyCriteria.setOrderByClause("id desc");
historyCriteria.createCriteria()
.andAppNameEqualTo(appName).andStateEqualTo((byte) ExecResult.SUCCESS.getValue());
try {
String appName = this.getString(IFullBuildContext.KEY_APP_NAME);
if (StringUtils.isEmpty(appName)) {
throw new IllegalArgumentException("param appName can not be null");
}

List<WorkFlowBuildHistory> histories
= this.getWorkflowDAOFacade().getWorkFlowBuildHistoryDAO().selectByExample(historyCriteria, 1, 1);

for (WorkFlowBuildHistory buildHistory : histories) {
this.setBizResult(context, buildHistory);
return;
WorkFlowBuildHistory latestSuccessWorkflowHistory = this.getLatestSuccessWorkflowHistory(SynResTarget.pipeline(appName));
if (latestSuccessWorkflowHistory != null) {
this.setBizResult(context, latestSuccessWorkflowHistory);
return;
}
this.addErrorMessage(context, "can not find build history by appname:" + appName);
// WorkFlowBuildHistoryCriteria historyCriteria = new WorkFlowBuildHistoryCriteria();
// historyCriteria.setOrderByClause("id desc");
// historyCriteria.createCriteria()
// .andAppNameEqualTo(appName).andStateEqualTo((byte) ExecResult.SUCCESS.getValue());
//
// List<WorkFlowBuildHistory> histories
// = this.getWorkflowDAOFacade().getWorkFlowBuildHistoryDAO().selectByExample(historyCriteria, 1, 1);
//
// for (WorkFlowBuildHistory buildHistory : histories) {
// this.setBizResult(context, buildHistory);
// return;
// }


} finally {
// logger.info("doGetLatestSuccessWorkflow return ");
}

this.addErrorMessage(context, "can not find build history by appname:" + appName);
}

@Func(value = PermissionConstant.DATAFLOW_MANAGE, sideEffect = false)
Expand Down
9 changes: 1 addition & 8 deletions tis-hadoop-rpc/log-collector.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,14 @@ service LogCollector {
/**
*取得最近一次成功的同步任务状态
*/
rpc LoadPhaseStatusFromLatest (PSynResTarget) returns (PPhaseStatusCollection) {
rpc LoadPhaseStatus (PBuildPhaseStatusParam) returns (PPhaseStatusCollection) {
}
}

message PBuildPhaseStatusParam {
uint64 taskid = 1;
}

message PSynResTarget {
string name = 1;
/**
* dataX pipeline OR transform workflow
*/
bool pipeline = 2;
}

message PPhaseStatusCollection {
PDumpPhaseStatus dumpPhase = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,35 +120,35 @@ com.qlangtech.tis.rpc.grpc.log.common.Empty> getInitTaskMethod() {
return getInitTaskMethod;
}

private static volatile io.grpc.MethodDescriptor<com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget,
com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> getLoadPhaseStatusFromLatestMethod;
private static volatile io.grpc.MethodDescriptor<com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam,
com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> getLoadPhaseStatusMethod;

@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "LoadPhaseStatusFromLatest",
requestType = com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.class,
fullMethodName = SERVICE_NAME + '/' + "LoadPhaseStatus",
requestType = com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam.class,
responseType = com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget,
com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> getLoadPhaseStatusFromLatestMethod() {
io.grpc.MethodDescriptor<com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget, com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> getLoadPhaseStatusFromLatestMethod;
if ((getLoadPhaseStatusFromLatestMethod = LogCollectorGrpc.getLoadPhaseStatusFromLatestMethod) == null) {
public static io.grpc.MethodDescriptor<com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam,
com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> getLoadPhaseStatusMethod() {
io.grpc.MethodDescriptor<com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam, com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> getLoadPhaseStatusMethod;
if ((getLoadPhaseStatusMethod = LogCollectorGrpc.getLoadPhaseStatusMethod) == null) {
synchronized (LogCollectorGrpc.class) {
if ((getLoadPhaseStatusFromLatestMethod = LogCollectorGrpc.getLoadPhaseStatusFromLatestMethod) == null) {
LogCollectorGrpc.getLoadPhaseStatusFromLatestMethod = getLoadPhaseStatusFromLatestMethod =
io.grpc.MethodDescriptor.<com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget, com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection>newBuilder()
if ((getLoadPhaseStatusMethod = LogCollectorGrpc.getLoadPhaseStatusMethod) == null) {
LogCollectorGrpc.getLoadPhaseStatusMethod = getLoadPhaseStatusMethod =
io.grpc.MethodDescriptor.<com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam, com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "LoadPhaseStatusFromLatest"))
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "LoadPhaseStatus"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.getDefaultInstance()))
com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection.getDefaultInstance()))
.setSchemaDescriptor(new LogCollectorMethodDescriptorSupplier("LoadPhaseStatusFromLatest"))
.setSchemaDescriptor(new LogCollectorMethodDescriptorSupplier("LoadPhaseStatus"))
.build();
}
}
}
return getLoadPhaseStatusFromLatestMethod;
return getLoadPhaseStatusMethod;
}

/**
Expand Down Expand Up @@ -235,9 +235,9 @@ public void initTask(com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollectio
*取得最近一次成功的同步任务状态
* </pre>
*/
public void loadPhaseStatusFromLatest(com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget request,
public void loadPhaseStatus(com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam request,
io.grpc.stub.StreamObserver<com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> responseObserver) {
asyncUnimplementedUnaryCall(getLoadPhaseStatusFromLatestMethod(), responseObserver);
asyncUnimplementedUnaryCall(getLoadPhaseStatusMethod(), responseObserver);
}

@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
Expand All @@ -264,12 +264,12 @@ public void loadPhaseStatusFromLatest(com.qlangtech.tis.rpc.grpc.log.stream.PSyn
com.qlangtech.tis.rpc.grpc.log.common.Empty>(
this, METHODID_INIT_TASK)))
.addMethod(
getLoadPhaseStatusFromLatestMethod(),
getLoadPhaseStatusMethod(),
asyncUnaryCall(
new MethodHandlers<
com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget,
com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam,
com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection>(
this, METHODID_LOAD_PHASE_STATUS_FROM_LATEST)))
this, METHODID_LOAD_PHASE_STATUS)))
.build();
}
}
Expand Down Expand Up @@ -327,10 +327,10 @@ public void initTask(com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollectio
*取得最近一次成功的同步任务状态
* </pre>
*/
public void loadPhaseStatusFromLatest(com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget request,
public void loadPhaseStatus(com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam request,
io.grpc.stub.StreamObserver<com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> responseObserver) {
asyncUnaryCall(
getChannel().newCall(getLoadPhaseStatusFromLatestMethod(), getCallOptions()), request, responseObserver);
getChannel().newCall(getLoadPhaseStatusMethod(), getCallOptions()), request, responseObserver);
}
}

Expand Down Expand Up @@ -375,9 +375,9 @@ public com.qlangtech.tis.rpc.grpc.log.common.Empty initTask(com.qlangtech.tis.rp
*取得最近一次成功的同步任务状态
* </pre>
*/
public com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection loadPhaseStatusFromLatest(com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget request) {
public com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection loadPhaseStatus(com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam request) {
return blockingUnaryCall(
getChannel(), getLoadPhaseStatusFromLatestMethod(), getCallOptions(), request);
getChannel(), getLoadPhaseStatusMethod(), getCallOptions(), request);
}
}

Expand Down Expand Up @@ -412,16 +412,16 @@ public com.google.common.util.concurrent.ListenableFuture<com.qlangtech.tis.rpc.
*取得最近一次成功的同步任务状态
* </pre>
*/
public com.google.common.util.concurrent.ListenableFuture<com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> loadPhaseStatusFromLatest(
com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget request) {
public com.google.common.util.concurrent.ListenableFuture<com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> loadPhaseStatus(
com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam request) {
return futureUnaryCall(
getChannel().newCall(getLoadPhaseStatusFromLatestMethod(), getCallOptions()), request);
getChannel().newCall(getLoadPhaseStatusMethod(), getCallOptions()), request);
}
}

private static final int METHODID_BUILD_PHRASE_STATUS = 0;
private static final int METHODID_INIT_TASK = 1;
private static final int METHODID_LOAD_PHASE_STATUS_FROM_LATEST = 2;
private static final int METHODID_LOAD_PHASE_STATUS = 2;
private static final int METHODID_REGISTER_MONITOR_EVENT = 3;

private static final class MethodHandlers<Req, Resp> implements
Expand Down Expand Up @@ -449,8 +449,8 @@ public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserv
serviceImpl.initTask((com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection) request,
(io.grpc.stub.StreamObserver<com.qlangtech.tis.rpc.grpc.log.common.Empty>) responseObserver);
break;
case METHODID_LOAD_PHASE_STATUS_FROM_LATEST:
serviceImpl.loadPhaseStatusFromLatest((com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget) request,
case METHODID_LOAD_PHASE_STATUS:
serviceImpl.loadPhaseStatus((com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam) request,
(io.grpc.stub.StreamObserver<com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection>) responseObserver);
break;
default:
Expand Down Expand Up @@ -520,7 +520,7 @@ public static io.grpc.ServiceDescriptor getServiceDescriptor() {
.addMethod(getRegisterMonitorEventMethod())
.addMethod(getBuildPhraseStatusMethod())
.addMethod(getInitTaskMethod())
.addMethod(getLoadPhaseStatusFromLatestMethod())
.addMethod(getLoadPhaseStatusMethod())
.build();
}
}
Expand Down

0 comments on commit d228180

Please sign in to comment.