Skip to content

Commit

Permalink
support percent display show when dataX execution
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Apr 15, 2024
1 parent 59f1edb commit 37ed518
Show file tree
Hide file tree
Showing 9 changed files with 877 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Lists;
import com.qlangtech.tis.assemble.FullbuildPhase;
import com.qlangtech.tis.exec.ExecutePhaseRange;
import com.qlangtech.tis.exec.impl.DefaultChainContext;
import com.qlangtech.tis.exec.impl.TrackableExecuteInterceptor;
import com.qlangtech.tis.fullbuild.phasestatus.PhaseStatusCollection;
import com.qlangtech.tis.log.RealtimeLoggerCollectorAppender;
Expand All @@ -31,6 +32,7 @@
import com.qlangtech.tis.rpc.grpc.log.stream.PExecuteState;
import com.qlangtech.tis.rpc.grpc.log.stream.PMonotorTarget;
import com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection;
import com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget;
import com.qlangtech.tis.trigger.jst.MonotorTarget;
import com.qlangtech.tis.trigger.socket.LogType;
import com.qlangtech.tis.utils.Utils;
Expand All @@ -39,6 +41,7 @@
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -93,6 +96,21 @@ public void run() {

public static RegisterMonitorEventHook registerMonitorEventHook = new RegisterMonitorEventHook();

@Override
public void loadPhaseStatusFromLatest(PSynResTarget request, StreamObserver<PPhaseStatusCollection> responseObserver) {
// super.loadPhaseStatusFromLatest(request, responseObserver);
boolean pipeline = request.getPipeline();
String targetResName = request.getName();
if (!pipeline) {
// 目前代码中还没有支持workflow资源的响应
throw new NotImplementedException("targetResName:" + targetResName + " must be pipeline");
}

PhaseStatusCollection statusCollection = DefaultChainContext.loadPhaseStatusFromLatest(targetResName);
responseObserver.onNext(statusCollection != null ? LogCollectorClient.convertPP(statusCollection) : null);
responseObserver.onCompleted();
}

/**
* 监听执行日志,详细信息
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@
import com.qlangtech.tis.fullbuild.phasestatus.impl.IndexBackFlowPhaseStatus;
import com.qlangtech.tis.fullbuild.phasestatus.impl.JoinPhaseStatus;
import com.qlangtech.tis.rpc.grpc.log.common.JoinTaskStatus;
import com.qlangtech.tis.rpc.grpc.log.stream.*;
import com.qlangtech.tis.rpc.grpc.log.stream.LogCollectorGrpc;
import com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatus;
import com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam;
import com.qlangtech.tis.rpc.grpc.log.stream.PDumpPhaseStatus;
import com.qlangtech.tis.rpc.grpc.log.stream.PExecuteState;
import com.qlangtech.tis.rpc.grpc.log.stream.PIndexBackFlowPhaseStatus;
import com.qlangtech.tis.rpc.grpc.log.stream.PJoinPhaseStatus;
import com.qlangtech.tis.rpc.grpc.log.stream.PMonotorTarget;
import com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection;
import com.qlangtech.tis.trigger.jst.ILogListener;
import com.qlangtech.tis.trigger.jst.MonotorTarget;
import com.qlangtech.tis.trigger.socket.ExecuteState;
Expand All @@ -43,6 +51,7 @@
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Objects;

/**
* @author 百岁([email protected]
Expand Down Expand Up @@ -330,8 +339,7 @@ public java.util.Iterator<com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusColl

public static PhaseStatusCollection convert(PPhaseStatusCollection stat, ExecutePhaseRange executePhaseRange) {


PDumpPhaseStatus dumpPhase = stat.getDumpPhase();
PDumpPhaseStatus dumpPhase = Objects.requireNonNull(stat, "param stat can not be null").getDumpPhase();
PJoinPhaseStatus joinPhase = stat.getJoinPhase();
PBuildPhaseStatus buildPhase = stat.getBuildPhase();
PIndexBackFlowPhaseStatus backflow = stat.getIndexBackFlowPhaseStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,37 @@ com.qlangtech.tis.rpc.grpc.log.common.Empty> getInitTaskMethod() {
return getInitTaskMethod;
}

private static volatile io.grpc.MethodDescriptor<com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget,
com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> getLoadPhaseStatusFromLatestMethod;

@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "LoadPhaseStatusFromLatest",
requestType = com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.class,
responseType = com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget,
com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> getLoadPhaseStatusFromLatestMethod() {
io.grpc.MethodDescriptor<com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget, com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> getLoadPhaseStatusFromLatestMethod;
if ((getLoadPhaseStatusFromLatestMethod = LogCollectorGrpc.getLoadPhaseStatusFromLatestMethod) == null) {
synchronized (LogCollectorGrpc.class) {
if ((getLoadPhaseStatusFromLatestMethod = LogCollectorGrpc.getLoadPhaseStatusFromLatestMethod) == null) {
LogCollectorGrpc.getLoadPhaseStatusFromLatestMethod = getLoadPhaseStatusFromLatestMethod =
io.grpc.MethodDescriptor.<com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget, com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "LoadPhaseStatusFromLatest"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection.getDefaultInstance()))
.setSchemaDescriptor(new LogCollectorMethodDescriptorSupplier("LoadPhaseStatusFromLatest"))
.build();
}
}
}
return getLoadPhaseStatusFromLatestMethod;
}

/**
* Creates a new async stub that supports all call types for the service
*/
Expand Down Expand Up @@ -198,6 +229,17 @@ public void initTask(com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollectio
asyncUnimplementedUnaryCall(getInitTaskMethod(), responseObserver);
}

/**
* <pre>
**
*取得最近一次成功的同步任务状态
* </pre>
*/
public void loadPhaseStatusFromLatest(com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget request,
io.grpc.stub.StreamObserver<com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> responseObserver) {
asyncUnimplementedUnaryCall(getLoadPhaseStatusFromLatestMethod(), responseObserver);
}

@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
Expand All @@ -221,6 +263,13 @@ public void initTask(com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollectio
com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection,
com.qlangtech.tis.rpc.grpc.log.common.Empty>(
this, METHODID_INIT_TASK)))
.addMethod(
getLoadPhaseStatusFromLatestMethod(),
asyncUnaryCall(
new MethodHandlers<
com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget,
com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection>(
this, METHODID_LOAD_PHASE_STATUS_FROM_LATEST)))
.build();
}
}
Expand Down Expand Up @@ -271,6 +320,18 @@ public void initTask(com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollectio
asyncUnaryCall(
getChannel().newCall(getInitTaskMethod(), getCallOptions()), request, responseObserver);
}

/**
* <pre>
**
*取得最近一次成功的同步任务状态
* </pre>
*/
public void loadPhaseStatusFromLatest(com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget request,
io.grpc.stub.StreamObserver<com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> responseObserver) {
asyncUnaryCall(
getChannel().newCall(getLoadPhaseStatusFromLatestMethod(), getCallOptions()), request, responseObserver);
}
}

/**
Expand Down Expand Up @@ -307,6 +368,17 @@ public com.qlangtech.tis.rpc.grpc.log.common.Empty initTask(com.qlangtech.tis.rp
return blockingUnaryCall(
getChannel(), getInitTaskMethod(), getCallOptions(), request);
}

/**
* <pre>
**
*取得最近一次成功的同步任务状态
* </pre>
*/
public com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection loadPhaseStatusFromLatest(com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget request) {
return blockingUnaryCall(
getChannel(), getLoadPhaseStatusFromLatestMethod(), getCallOptions(), request);
}
}

/**
Expand All @@ -333,11 +405,24 @@ public com.google.common.util.concurrent.ListenableFuture<com.qlangtech.tis.rpc.
return futureUnaryCall(
getChannel().newCall(getInitTaskMethod(), getCallOptions()), request);
}

/**
* <pre>
**
*取得最近一次成功的同步任务状态
* </pre>
*/
public com.google.common.util.concurrent.ListenableFuture<com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection> loadPhaseStatusFromLatest(
com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget request) {
return futureUnaryCall(
getChannel().newCall(getLoadPhaseStatusFromLatestMethod(), getCallOptions()), request);
}
}

private static final int METHODID_BUILD_PHRASE_STATUS = 0;
private static final int METHODID_INIT_TASK = 1;
private static final int METHODID_REGISTER_MONITOR_EVENT = 2;
private static final int METHODID_LOAD_PHASE_STATUS_FROM_LATEST = 2;
private static final int METHODID_REGISTER_MONITOR_EVENT = 3;

private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
Expand All @@ -364,6 +449,10 @@ public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserv
serviceImpl.initTask((com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection) request,
(io.grpc.stub.StreamObserver<com.qlangtech.tis.rpc.grpc.log.common.Empty>) responseObserver);
break;
case METHODID_LOAD_PHASE_STATUS_FROM_LATEST:
serviceImpl.loadPhaseStatusFromLatest((com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget) request,
(io.grpc.stub.StreamObserver<com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection>) responseObserver);
break;
default:
throw new AssertionError();
}
Expand Down Expand Up @@ -431,6 +520,7 @@ public static io.grpc.ServiceDescriptor getServiceDescriptor() {
.addMethod(getRegisterMonitorEventMethod())
.addMethod(getBuildPhraseStatusMethod())
.addMethod(getInitTaskMethod())
.addMethod(getLoadPhaseStatusFromLatestMethod())
.build();
}
}
Expand Down

0 comments on commit 37ed518

Please sign in to comment.