diff --git a/tis-assemble/src/main/java/com/qlangtech/tis/rpc/server/FullBuildStatCollectorServer.java b/tis-assemble/src/main/java/com/qlangtech/tis/rpc/server/FullBuildStatCollectorServer.java index 5c7ba8d0d..e635c849e 100644 --- a/tis-assemble/src/main/java/com/qlangtech/tis/rpc/server/FullBuildStatCollectorServer.java +++ b/tis-assemble/src/main/java/com/qlangtech/tis/rpc/server/FullBuildStatCollectorServer.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import com.qlangtech.tis.assemble.FullbuildPhase; import com.qlangtech.tis.exec.ExecutePhaseRange; +import com.qlangtech.tis.exec.impl.DefaultChainContext; import com.qlangtech.tis.exec.impl.TrackableExecuteInterceptor; import com.qlangtech.tis.fullbuild.phasestatus.PhaseStatusCollection; import com.qlangtech.tis.log.RealtimeLoggerCollectorAppender; @@ -31,6 +32,7 @@ import com.qlangtech.tis.rpc.grpc.log.stream.PExecuteState; import com.qlangtech.tis.rpc.grpc.log.stream.PMonotorTarget; import com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection; +import com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget; import com.qlangtech.tis.trigger.jst.MonotorTarget; import com.qlangtech.tis.trigger.socket.LogType; import com.qlangtech.tis.utils.Utils; @@ -39,6 +41,7 @@ import io.grpc.Status; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; +import org.apache.commons.lang.NotImplementedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,6 +96,21 @@ public void run() { public static RegisterMonitorEventHook registerMonitorEventHook = new RegisterMonitorEventHook(); + @Override + public void loadPhaseStatusFromLatest(PSynResTarget request, StreamObserver responseObserver) { + // super.loadPhaseStatusFromLatest(request, responseObserver); + boolean pipeline = request.getPipeline(); + String targetResName = request.getName(); + if (!pipeline) { + // 目前代码中还没有支持workflow资源的响应 + throw new NotImplementedException("targetResName:" + targetResName + " must be pipeline"); + } + + PhaseStatusCollection statusCollection = DefaultChainContext.loadPhaseStatusFromLatest(targetResName); + responseObserver.onNext(statusCollection != null ? LogCollectorClient.convertPP(statusCollection) : null); + responseObserver.onCompleted(); + } + /** * 监听执行日志,详细信息 * diff --git a/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/LogCollectorClient.java b/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/LogCollectorClient.java index 587507f20..006740793 100644 --- a/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/LogCollectorClient.java +++ b/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/LogCollectorClient.java @@ -28,7 +28,15 @@ import com.qlangtech.tis.fullbuild.phasestatus.impl.IndexBackFlowPhaseStatus; import com.qlangtech.tis.fullbuild.phasestatus.impl.JoinPhaseStatus; import com.qlangtech.tis.rpc.grpc.log.common.JoinTaskStatus; -import com.qlangtech.tis.rpc.grpc.log.stream.*; +import com.qlangtech.tis.rpc.grpc.log.stream.LogCollectorGrpc; +import com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatus; +import com.qlangtech.tis.rpc.grpc.log.stream.PBuildPhaseStatusParam; +import com.qlangtech.tis.rpc.grpc.log.stream.PDumpPhaseStatus; +import com.qlangtech.tis.rpc.grpc.log.stream.PExecuteState; +import com.qlangtech.tis.rpc.grpc.log.stream.PIndexBackFlowPhaseStatus; +import com.qlangtech.tis.rpc.grpc.log.stream.PJoinPhaseStatus; +import com.qlangtech.tis.rpc.grpc.log.stream.PMonotorTarget; +import com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection; import com.qlangtech.tis.trigger.jst.ILogListener; import com.qlangtech.tis.trigger.jst.MonotorTarget; import com.qlangtech.tis.trigger.socket.ExecuteState; @@ -43,6 +51,7 @@ import org.slf4j.LoggerFactory; import java.util.Map; +import java.util.Objects; /** * @author 百岁(baisui@qlangtech.com) @@ -330,8 +339,7 @@ public java.util.Iterator getInitTaskMethod() { return getInitTaskMethod; } + private static volatile io.grpc.MethodDescriptor getLoadPhaseStatusFromLatestMethod; + + @io.grpc.stub.annotations.RpcMethod( + fullMethodName = SERVICE_NAME + '/' + "LoadPhaseStatusFromLatest", + requestType = com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.class, + responseType = com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection.class, + methodType = io.grpc.MethodDescriptor.MethodType.UNARY) + public static io.grpc.MethodDescriptor getLoadPhaseStatusFromLatestMethod() { + io.grpc.MethodDescriptor getLoadPhaseStatusFromLatestMethod; + if ((getLoadPhaseStatusFromLatestMethod = LogCollectorGrpc.getLoadPhaseStatusFromLatestMethod) == null) { + synchronized (LogCollectorGrpc.class) { + if ((getLoadPhaseStatusFromLatestMethod = LogCollectorGrpc.getLoadPhaseStatusFromLatestMethod) == null) { + LogCollectorGrpc.getLoadPhaseStatusFromLatestMethod = getLoadPhaseStatusFromLatestMethod = + io.grpc.MethodDescriptor.newBuilder() + .setType(io.grpc.MethodDescriptor.MethodType.UNARY) + .setFullMethodName(generateFullMethodName(SERVICE_NAME, "LoadPhaseStatusFromLatest")) + .setSampledToLocalTracing(true) + .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( + com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.getDefaultInstance())) + .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( + com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection.getDefaultInstance())) + .setSchemaDescriptor(new LogCollectorMethodDescriptorSupplier("LoadPhaseStatusFromLatest")) + .build(); + } + } + } + return getLoadPhaseStatusFromLatestMethod; + } + /** * Creates a new async stub that supports all call types for the service */ @@ -198,6 +229,17 @@ public void initTask(com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollectio asyncUnimplementedUnaryCall(getInitTaskMethod(), responseObserver); } + /** + *
+     **
+     *取得最近一次成功的同步任务状态
+     * 
+ */ + public void loadPhaseStatusFromLatest(com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget request, + io.grpc.stub.StreamObserver responseObserver) { + asyncUnimplementedUnaryCall(getLoadPhaseStatusFromLatestMethod(), responseObserver); + } + @java.lang.Override public final io.grpc.ServerServiceDefinition bindService() { return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()) .addMethod( @@ -221,6 +263,13 @@ public void initTask(com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollectio com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection, com.qlangtech.tis.rpc.grpc.log.common.Empty>( this, METHODID_INIT_TASK))) + .addMethod( + getLoadPhaseStatusFromLatestMethod(), + asyncUnaryCall( + new MethodHandlers< + com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget, + com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection>( + this, METHODID_LOAD_PHASE_STATUS_FROM_LATEST))) .build(); } } @@ -271,6 +320,18 @@ public void initTask(com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollectio asyncUnaryCall( getChannel().newCall(getInitTaskMethod(), getCallOptions()), request, responseObserver); } + + /** + *
+     **
+     *取得最近一次成功的同步任务状态
+     * 
+ */ + public void loadPhaseStatusFromLatest(com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget request, + io.grpc.stub.StreamObserver responseObserver) { + asyncUnaryCall( + getChannel().newCall(getLoadPhaseStatusFromLatestMethod(), getCallOptions()), request, responseObserver); + } } /** @@ -307,6 +368,17 @@ public com.qlangtech.tis.rpc.grpc.log.common.Empty initTask(com.qlangtech.tis.rp return blockingUnaryCall( getChannel(), getInitTaskMethod(), getCallOptions(), request); } + + /** + *
+     **
+     *取得最近一次成功的同步任务状态
+     * 
+ */ + public com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection loadPhaseStatusFromLatest(com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget request) { + return blockingUnaryCall( + getChannel(), getLoadPhaseStatusFromLatestMethod(), getCallOptions(), request); + } } /** @@ -333,11 +405,24 @@ public com.google.common.util.concurrent.ListenableFuture + ** + *取得最近一次成功的同步任务状态 + * + */ + public com.google.common.util.concurrent.ListenableFuture loadPhaseStatusFromLatest( + com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget request) { + return futureUnaryCall( + getChannel().newCall(getLoadPhaseStatusFromLatestMethod(), getCallOptions()), request); + } } private static final int METHODID_BUILD_PHRASE_STATUS = 0; private static final int METHODID_INIT_TASK = 1; - private static final int METHODID_REGISTER_MONITOR_EVENT = 2; + private static final int METHODID_LOAD_PHASE_STATUS_FROM_LATEST = 2; + private static final int METHODID_REGISTER_MONITOR_EVENT = 3; private static final class MethodHandlers implements io.grpc.stub.ServerCalls.UnaryMethod, @@ -364,6 +449,10 @@ public void invoke(Req request, io.grpc.stub.StreamObserver responseObserv serviceImpl.initTask((com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection) request, (io.grpc.stub.StreamObserver) responseObserver); break; + case METHODID_LOAD_PHASE_STATUS_FROM_LATEST: + serviceImpl.loadPhaseStatusFromLatest((com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget) request, + (io.grpc.stub.StreamObserver) responseObserver); + break; default: throw new AssertionError(); } @@ -431,6 +520,7 @@ public static io.grpc.ServiceDescriptor getServiceDescriptor() { .addMethod(getRegisterMonitorEventMethod()) .addMethod(getBuildPhraseStatusMethod()) .addMethod(getInitTaskMethod()) + .addMethod(getLoadPhaseStatusFromLatestMethod()) .build(); } } diff --git a/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/stream/LogCollectorProto.java b/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/stream/LogCollectorProto.java index 8600f502b..3bb6af10d 100644 --- a/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/stream/LogCollectorProto.java +++ b/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/stream/LogCollectorProto.java @@ -19,6 +19,11 @@ public static void registerAllExtensions( static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internal_static_stream_PBuildPhaseStatusParam_fieldAccessorTable; + static final com.google.protobuf.Descriptors.Descriptor + internal_static_stream_PSynResTarget_descriptor; + static final + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internal_static_stream_PSynResTarget_fieldAccessorTable; static final com.google.protobuf.Descriptors.Descriptor internal_static_stream_PPhaseStatusCollection_descriptor; static final @@ -85,50 +90,53 @@ public static void registerAllExtensions( java.lang.String[] descriptorData = { "\n\023log-collector.proto\022\006stream\032\020common-ms" + "g.proto\"(\n\026PBuildPhaseStatusParam\022\016\n\006tas" + - "kid\030\001 \001(\004\"\366\001\n\026PPhaseStatusCollection\022+\n\t" + - "dumpPhase\030\001 \001(\0132\030.stream.PDumpPhaseStatu" + - "s\022+\n\tjoinPhase\030\002 \001(\0132\030.stream.PJoinPhase" + - "Status\022-\n\nbuildPhase\030\003 \001(\0132\031.stream.PBui" + - "ldPhaseStatus\022C\n\030indexBackFlowPhaseStatu" + - "s\030\004 \001(\0132!.stream.PIndexBackFlowPhaseStat" + - "us\022\016\n\006taskId\030\005 \001(\r\"\225\001\n\020PDumpPhaseStatus\022" + - "<\n\ntablesDump\030\001 \003(\0132(.stream.PDumpPhaseS" + - "tatus.TablesDumpEntry\032C\n\017TablesDumpEntry" + - "\022\013\n\003key\030\001 \001(\t\022\037\n\005value\030\002 \001(\0132\020.TableDump" + - "Status:\0028\001\"\224\001\n\020PJoinPhaseStatus\022<\n\ntaskS" + - "tatus\030\001 \003(\0132(.stream.PJoinPhaseStatus.Ta" + - "skStatusEntry\032B\n\017TaskStatusEntry\022\013\n\003key\030" + - "\001 \001(\t\022\036\n\005value\030\002 \001(\0132\017.JoinTaskStatus:\0028" + - "\001\"\255\001\n\021PBuildPhaseStatus\022G\n\017nodeBuildStat" + - "us\030\001 \003(\0132..stream.PBuildPhaseStatus.Node" + - "BuildStatusEntry\032O\n\024NodeBuildStatusEntry" + - "\022\013\n\003key\030\001 \001(\t\022&\n\005value\030\002 \001(\0132\027.BuildShar" + - "edPhaseStatus:\0028\001\"\255\001\n\031PIndexBackFlowPhas" + - "eStatus\022G\n\013nodesStatus\030\001 \003(\01322.stream.PI" + - "ndexBackFlowPhaseStatus.NodesStatusEntry" + - "\032G\n\020NodesStatusEntry\022\013\n\003key\030\001 \001(\t\022\"\n\005val" + - "ue\030\002 \001(\0132\023.NodeBackflowStatus:\0028\001\"d\n\016PMo" + - "notorTarget\022\022\n\ncollection\030\001 \001(\t\022\016\n\006taski" + - "d\030\002 \001(\r\022.\n\007logtype\030\003 \001(\0162\035.stream.PExecu" + - "teState.LogType\"\213\003\n\rPExecuteState\0220\n\010inf" + - "oType\030\001 \001(\0162\036.stream.PExecuteState.InfoT" + - "ype\022.\n\007logType\030\002 \001(\0162\035.stream.PExecuteSt" + - "ate.LogType\022\013\n\003msg\030\003 \001(\t\022\014\n\004from\030\004 \001(\t\022\r" + - "\n\005jobId\030\005 \001(\004\022\016\n\006taskId\030\006 \001(\004\022\023\n\013service" + - "Name\030\007 \001(\t\022\021\n\texecState\030\010 \001(\t\022\014\n\004time\030\t " + - "\001(\004\022\021\n\tcomponent\030\n \001(\t\"_\n\007LogType\022\035\n\031INC" + - "R_DEPLOY_STATUS_CHANGE\020\000\022\022\n\016MQ_TAGS_STAT" + - "US\020\001\022\010\n\004FULL\020\002\022\010\n\004INCR\020\003\022\r\n\tINCR_SEND\020\004\"" + - "4\n\010InfoType\022\010\n\004INFO\020\000\022\010\n\004WARN\020\001\022\t\n\005ERROR" + - "\020\002\022\t\n\005FATAL\020\0032\352\001\n\014LogCollector\022K\n\024Regist" + - "erMonitorEvent\022\026.stream.PMonotorTarget\032\025" + - ".stream.PExecuteState\"\000(\0010\001\022W\n\021BuildPhra" + - "seStatus\022\036.stream.PBuildPhaseStatusParam" + - "\032\036.stream.PPhaseStatusCollection\"\0000\001\0224\n\010" + - "InitTask\022\036.stream.PPhaseStatusCollection" + - "\032\006.Empty\"\000BC\n%com.qlangtech.tis.rpc.grpc" + - ".log.streamB\021LogCollectorProtoP\001\242\002\004HLWSb" + - "\006proto3" + "kid\030\001 \001(\004\"/\n\rPSynResTarget\022\014\n\004name\030\001 \001(\t" + + "\022\020\n\010pipeline\030\002 \001(\010\"\366\001\n\026PPhaseStatusColle" + + "ction\022+\n\tdumpPhase\030\001 \001(\0132\030.stream.PDumpP" + + "haseStatus\022+\n\tjoinPhase\030\002 \001(\0132\030.stream.P" + + "JoinPhaseStatus\022-\n\nbuildPhase\030\003 \001(\0132\031.st" + + "ream.PBuildPhaseStatus\022C\n\030indexBackFlowP" + + "haseStatus\030\004 \001(\0132!.stream.PIndexBackFlow" + + "PhaseStatus\022\016\n\006taskId\030\005 \001(\r\"\225\001\n\020PDumpPha" + + "seStatus\022<\n\ntablesDump\030\001 \003(\0132(.stream.PD" + + "umpPhaseStatus.TablesDumpEntry\032C\n\017Tables" + + "DumpEntry\022\013\n\003key\030\001 \001(\t\022\037\n\005value\030\002 \001(\0132\020." + + "TableDumpStatus:\0028\001\"\224\001\n\020PJoinPhaseStatus" + + "\022<\n\ntaskStatus\030\001 \003(\0132(.stream.PJoinPhase" + + "Status.TaskStatusEntry\032B\n\017TaskStatusEntr" + + "y\022\013\n\003key\030\001 \001(\t\022\036\n\005value\030\002 \001(\0132\017.JoinTask" + + "Status:\0028\001\"\255\001\n\021PBuildPhaseStatus\022G\n\017node" + + "BuildStatus\030\001 \003(\0132..stream.PBuildPhaseSt" + + "atus.NodeBuildStatusEntry\032O\n\024NodeBuildSt" + + "atusEntry\022\013\n\003key\030\001 \001(\t\022&\n\005value\030\002 \001(\0132\027." + + "BuildSharedPhaseStatus:\0028\001\"\255\001\n\031PIndexBac" + + "kFlowPhaseStatus\022G\n\013nodesStatus\030\001 \003(\01322." + + "stream.PIndexBackFlowPhaseStatus.NodesSt" + + "atusEntry\032G\n\020NodesStatusEntry\022\013\n\003key\030\001 \001" + + "(\t\022\"\n\005value\030\002 \001(\0132\023.NodeBackflowStatus:\002" + + "8\001\"d\n\016PMonotorTarget\022\022\n\ncollection\030\001 \001(\t" + + "\022\016\n\006taskid\030\002 \001(\r\022.\n\007logtype\030\003 \001(\0162\035.stre" + + "am.PExecuteState.LogType\"\213\003\n\rPExecuteSta" + + "te\0220\n\010infoType\030\001 \001(\0162\036.stream.PExecuteSt" + + "ate.InfoType\022.\n\007logType\030\002 \001(\0162\035.stream.P" + + "ExecuteState.LogType\022\013\n\003msg\030\003 \001(\t\022\014\n\004fro" + + "m\030\004 \001(\t\022\r\n\005jobId\030\005 \001(\004\022\016\n\006taskId\030\006 \001(\004\022\023" + + "\n\013serviceName\030\007 \001(\t\022\021\n\texecState\030\010 \001(\t\022\014" + + "\n\004time\030\t \001(\004\022\021\n\tcomponent\030\n \001(\t\"_\n\007LogTy" + + "pe\022\035\n\031INCR_DEPLOY_STATUS_CHANGE\020\000\022\022\n\016MQ_" + + "TAGS_STATUS\020\001\022\010\n\004FULL\020\002\022\010\n\004INCR\020\003\022\r\n\tINC" + + "R_SEND\020\004\"4\n\010InfoType\022\010\n\004INFO\020\000\022\010\n\004WARN\020\001" + + "\022\t\n\005ERROR\020\002\022\t\n\005FATAL\020\0032\300\002\n\014LogCollector\022" + + "K\n\024RegisterMonitorEvent\022\026.stream.PMonoto" + + "rTarget\032\025.stream.PExecuteState\"\000(\0010\001\022W\n\021" + + "BuildPhraseStatus\022\036.stream.PBuildPhaseSt" + + "atusParam\032\036.stream.PPhaseStatusCollectio" + + "n\"\0000\001\0224\n\010InitTask\022\036.stream.PPhaseStatusC" + + "ollection\032\006.Empty\"\000\022T\n\031LoadPhaseStatusFr" + + "omLatest\022\025.stream.PSynResTarget\032\036.stream" + + ".PPhaseStatusCollection\"\000BC\n%com.qlangte" + + "ch.tis.rpc.grpc.log.streamB\021LogCollector" + + "ProtoP\001\242\002\004HLWSb\006proto3" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { @@ -149,14 +157,20 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_stream_PBuildPhaseStatusParam_descriptor, new java.lang.String[] { "Taskid", }); - internal_static_stream_PPhaseStatusCollection_descriptor = + internal_static_stream_PSynResTarget_descriptor = getDescriptor().getMessageTypes().get(1); + internal_static_stream_PSynResTarget_fieldAccessorTable = new + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( + internal_static_stream_PSynResTarget_descriptor, + new java.lang.String[] { "Name", "Pipeline", }); + internal_static_stream_PPhaseStatusCollection_descriptor = + getDescriptor().getMessageTypes().get(2); internal_static_stream_PPhaseStatusCollection_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_stream_PPhaseStatusCollection_descriptor, new java.lang.String[] { "DumpPhase", "JoinPhase", "BuildPhase", "IndexBackFlowPhaseStatus", "TaskId", }); internal_static_stream_PDumpPhaseStatus_descriptor = - getDescriptor().getMessageTypes().get(2); + getDescriptor().getMessageTypes().get(3); internal_static_stream_PDumpPhaseStatus_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_stream_PDumpPhaseStatus_descriptor, @@ -168,7 +182,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( internal_static_stream_PDumpPhaseStatus_TablesDumpEntry_descriptor, new java.lang.String[] { "Key", "Value", }); internal_static_stream_PJoinPhaseStatus_descriptor = - getDescriptor().getMessageTypes().get(3); + getDescriptor().getMessageTypes().get(4); internal_static_stream_PJoinPhaseStatus_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_stream_PJoinPhaseStatus_descriptor, @@ -180,7 +194,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( internal_static_stream_PJoinPhaseStatus_TaskStatusEntry_descriptor, new java.lang.String[] { "Key", "Value", }); internal_static_stream_PBuildPhaseStatus_descriptor = - getDescriptor().getMessageTypes().get(4); + getDescriptor().getMessageTypes().get(5); internal_static_stream_PBuildPhaseStatus_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_stream_PBuildPhaseStatus_descriptor, @@ -192,7 +206,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( internal_static_stream_PBuildPhaseStatus_NodeBuildStatusEntry_descriptor, new java.lang.String[] { "Key", "Value", }); internal_static_stream_PIndexBackFlowPhaseStatus_descriptor = - getDescriptor().getMessageTypes().get(5); + getDescriptor().getMessageTypes().get(6); internal_static_stream_PIndexBackFlowPhaseStatus_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_stream_PIndexBackFlowPhaseStatus_descriptor, @@ -204,13 +218,13 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( internal_static_stream_PIndexBackFlowPhaseStatus_NodesStatusEntry_descriptor, new java.lang.String[] { "Key", "Value", }); internal_static_stream_PMonotorTarget_descriptor = - getDescriptor().getMessageTypes().get(6); + getDescriptor().getMessageTypes().get(7); internal_static_stream_PMonotorTarget_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_stream_PMonotorTarget_descriptor, new java.lang.String[] { "Collection", "Taskid", "Logtype", }); internal_static_stream_PExecuteState_descriptor = - getDescriptor().getMessageTypes().get(7); + getDescriptor().getMessageTypes().get(8); internal_static_stream_PExecuteState_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_stream_PExecuteState_descriptor, diff --git a/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/stream/PSynResTarget.java b/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/stream/PSynResTarget.java new file mode 100644 index 000000000..bb41204a1 --- /dev/null +++ b/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/stream/PSynResTarget.java @@ -0,0 +1,618 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: log-collector.proto + +package com.qlangtech.tis.rpc.grpc.log.stream; + +/** + * Protobuf type {@code stream.PSynResTarget} + */ +public final class PSynResTarget extends + com.google.protobuf.GeneratedMessageV3 implements + // @@protoc_insertion_point(message_implements:stream.PSynResTarget) + PSynResTargetOrBuilder { +private static final long serialVersionUID = 0L; + // Use PSynResTarget.newBuilder() to construct. + private PSynResTarget(com.google.protobuf.GeneratedMessageV3.Builder builder) { + super(builder); + } + private PSynResTarget() { + name_ = ""; + } + + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private PSynResTarget( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + this(); + if (extensionRegistry == null) { + throw new java.lang.NullPointerException(); + } + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 10: { + java.lang.String s = input.readStringRequireUtf8(); + + name_ = s; + break; + } + case 16: { + + pipeline_ = input.readBool(); + break; + } + default: { + if (!parseUnknownField( + input, unknownFields, extensionRegistry, tag)) { + done = true; + } + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return com.qlangtech.tis.rpc.grpc.log.stream.LogCollectorProto.internal_static_stream_PSynResTarget_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return com.qlangtech.tis.rpc.grpc.log.stream.LogCollectorProto.internal_static_stream_PSynResTarget_fieldAccessorTable + .ensureFieldAccessorsInitialized( + com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.class, com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.Builder.class); + } + + public static final int NAME_FIELD_NUMBER = 1; + private volatile java.lang.Object name_; + /** + * string name = 1; + */ + public java.lang.String getName() { + java.lang.Object ref = name_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + name_ = s; + return s; + } + } + /** + * string name = 1; + */ + public com.google.protobuf.ByteString + getNameBytes() { + java.lang.Object ref = name_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + name_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + public static final int PIPELINE_FIELD_NUMBER = 2; + private boolean pipeline_; + /** + *
+   **
+   * dataX pipeline OR transform workflow
+   * 
+ * + * bool pipeline = 2; + */ + public boolean getPipeline() { + return pipeline_; + } + + private byte memoizedIsInitialized = -1; + @java.lang.Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + memoizedIsInitialized = 1; + return true; + } + + @java.lang.Override + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (!getNameBytes().isEmpty()) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 1, name_); + } + if (pipeline_ != false) { + output.writeBool(2, pipeline_); + } + unknownFields.writeTo(output); + } + + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + if (!getNameBytes().isEmpty()) { + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(1, name_); + } + if (pipeline_ != false) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(2, pipeline_); + } + size += unknownFields.getSerializedSize(); + memoizedSize = size; + return size; + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget)) { + return super.equals(obj); + } + com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget other = (com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget) obj; + + if (!getName() + .equals(other.getName())) return false; + if (getPipeline() + != other.getPipeline()) return false; + if (!unknownFields.equals(other.unknownFields)) return false; + return true; + } + + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptor().hashCode(); + hash = (37 * hash) + NAME_FIELD_NUMBER; + hash = (53 * hash) + getName().hashCode(); + hash = (37 * hash) + PIPELINE_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashBoolean( + getPipeline()); + hash = (29 * hash) + unknownFields.hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget parseFrom( + java.nio.ByteBuffer data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget parseFrom( + java.nio.ByteBuffer data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget parseFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + public static com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input); + } + public static com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + public static com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + + @java.lang.Override + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder() { + return DEFAULT_INSTANCE.toBuilder(); + } + public static Builder newBuilder(com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + @java.lang.Override + public Builder toBuilder() { + return this == DEFAULT_INSTANCE + ? new Builder() : new Builder().mergeFrom(this); + } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code stream.PSynResTarget} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessageV3.Builder implements + // @@protoc_insertion_point(builder_implements:stream.PSynResTarget) + com.qlangtech.tis.rpc.grpc.log.stream.PSynResTargetOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return com.qlangtech.tis.rpc.grpc.log.stream.LogCollectorProto.internal_static_stream_PSynResTarget_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return com.qlangtech.tis.rpc.grpc.log.stream.LogCollectorProto.internal_static_stream_PSynResTarget_fieldAccessorTable + .ensureFieldAccessorsInitialized( + com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.class, com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.Builder.class); + } + + // Construct using com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessageV3 + .alwaysUseFieldBuilders) { + } + } + @java.lang.Override + public Builder clear() { + super.clear(); + name_ = ""; + + pipeline_ = false; + + return this; + } + + @java.lang.Override + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return com.qlangtech.tis.rpc.grpc.log.stream.LogCollectorProto.internal_static_stream_PSynResTarget_descriptor; + } + + @java.lang.Override + public com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget getDefaultInstanceForType() { + return com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.getDefaultInstance(); + } + + @java.lang.Override + public com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget build() { + com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + @java.lang.Override + public com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget buildPartial() { + com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget result = new com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget(this); + result.name_ = name_; + result.pipeline_ = pipeline_; + onBuilt(); + return result; + } + + @java.lang.Override + public Builder clone() { + return super.clone(); + } + @java.lang.Override + public Builder setField( + com.google.protobuf.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.setField(field, value); + } + @java.lang.Override + public Builder clearField( + com.google.protobuf.Descriptors.FieldDescriptor field) { + return super.clearField(field); + } + @java.lang.Override + public Builder clearOneof( + com.google.protobuf.Descriptors.OneofDescriptor oneof) { + return super.clearOneof(oneof); + } + @java.lang.Override + public Builder setRepeatedField( + com.google.protobuf.Descriptors.FieldDescriptor field, + int index, java.lang.Object value) { + return super.setRepeatedField(field, index, value); + } + @java.lang.Override + public Builder addRepeatedField( + com.google.protobuf.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.addRepeatedField(field, value); + } + @java.lang.Override + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget) { + return mergeFrom((com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget other) { + if (other == com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.getDefaultInstance()) return this; + if (!other.getName().isEmpty()) { + name_ = other.name_; + onChanged(); + } + if (other.getPipeline() != false) { + setPipeline(other.getPipeline()); + } + this.mergeUnknownFields(other.unknownFields); + onChanged(); + return this; + } + + @java.lang.Override + public final boolean isInitialized() { + return true; + } + + @java.lang.Override + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget) e.getUnfinishedMessage(); + throw e.unwrapIOException(); + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + + private java.lang.Object name_ = ""; + /** + * string name = 1; + */ + public java.lang.String getName() { + java.lang.Object ref = name_; + if (!(ref instanceof java.lang.String)) { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + name_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * string name = 1; + */ + public com.google.protobuf.ByteString + getNameBytes() { + java.lang.Object ref = name_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + name_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * string name = 1; + */ + public Builder setName( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + + name_ = value; + onChanged(); + return this; + } + /** + * string name = 1; + */ + public Builder clearName() { + + name_ = getDefaultInstance().getName(); + onChanged(); + return this; + } + /** + * string name = 1; + */ + public Builder setNameBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + checkByteStringIsUtf8(value); + + name_ = value; + onChanged(); + return this; + } + + private boolean pipeline_ ; + /** + *
+     **
+     * dataX pipeline OR transform workflow
+     * 
+ * + * bool pipeline = 2; + */ + public boolean getPipeline() { + return pipeline_; + } + /** + *
+     **
+     * dataX pipeline OR transform workflow
+     * 
+ * + * bool pipeline = 2; + */ + public Builder setPipeline(boolean value) { + + pipeline_ = value; + onChanged(); + return this; + } + /** + *
+     **
+     * dataX pipeline OR transform workflow
+     * 
+ * + * bool pipeline = 2; + */ + public Builder clearPipeline() { + + pipeline_ = false; + onChanged(); + return this; + } + @java.lang.Override + public final Builder setUnknownFields( + final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.setUnknownFields(unknownFields); + } + + @java.lang.Override + public final Builder mergeUnknownFields( + final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.mergeUnknownFields(unknownFields); + } + + + // @@protoc_insertion_point(builder_scope:stream.PSynResTarget) + } + + // @@protoc_insertion_point(class_scope:stream.PSynResTarget) + private static final com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget DEFAULT_INSTANCE; + static { + DEFAULT_INSTANCE = new com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget(); + } + + public static com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private static final com.google.protobuf.Parser + PARSER = new com.google.protobuf.AbstractParser() { + @java.lang.Override + public PSynResTarget parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new PSynResTarget(input, extensionRegistry); + } + }; + + public static com.google.protobuf.Parser parser() { + return PARSER; + } + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + @java.lang.Override + public com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + +} + diff --git a/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/stream/PSynResTargetOrBuilder.java b/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/stream/PSynResTargetOrBuilder.java new file mode 100644 index 000000000..8ef7f0292 --- /dev/null +++ b/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/grpc/log/stream/PSynResTargetOrBuilder.java @@ -0,0 +1,29 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: log-collector.proto + +package com.qlangtech.tis.rpc.grpc.log.stream; + +public interface PSynResTargetOrBuilder extends + // @@protoc_insertion_point(interface_extends:stream.PSynResTarget) + com.google.protobuf.MessageOrBuilder { + + /** + * string name = 1; + */ + java.lang.String getName(); + /** + * string name = 1; + */ + com.google.protobuf.ByteString + getNameBytes(); + + /** + *
+   **
+   * dataX pipeline OR transform workflow
+   * 
+ * + * bool pipeline = 2; + */ + boolean getPipeline(); +} diff --git a/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/server/IncrStatusClient.java b/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/server/IncrStatusClient.java index 2ea70fa49..ea18a5fb0 100644 --- a/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/server/IncrStatusClient.java +++ b/tis-hadoop-rpc/src/main/java/com/qlangtech/tis/rpc/server/IncrStatusClient.java @@ -17,6 +17,7 @@ */ package com.qlangtech.tis.rpc.server; +import com.qlangtech.tis.exec.ExecutePhaseRange; import com.qlangtech.tis.fullbuild.phasestatus.JobLog; import com.qlangtech.tis.fullbuild.phasestatus.PhaseStatusCollection; import com.qlangtech.tis.fullbuild.phasestatus.impl.BuildSharedPhaseStatus; @@ -38,6 +39,9 @@ import com.qlangtech.tis.rpc.grpc.log.common.JoinTaskStatus; import com.qlangtech.tis.rpc.grpc.log.common.TableDumpStatus; import com.qlangtech.tis.rpc.grpc.log.stream.LogCollectorGrpc; +import com.qlangtech.tis.rpc.grpc.log.stream.PPhaseStatusCollection; +import com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget; +import com.qlangtech.tis.rpc.grpc.log.stream.PSynResTarget.Builder; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -166,7 +170,15 @@ public void initSynJob(PhaseStatusCollection buildStatus) { @Override public PhaseStatusCollection loadPhaseStatusFromLatest(SynResTarget resTarget) { - return null; + + Builder builder = PSynResTarget.newBuilder(); + builder.setPipeline(resTarget.isPipeline()); + builder.setName(resTarget.getName()); + PPhaseStatusCollection statusCollection = logCollectorBlockingStub.loadPhaseStatusFromLatest(builder.build()); + if (statusCollection == null) { + return null; + } + return LogCollectorClient.convert(statusCollection, ExecutePhaseRange.fullRange()); } @Override diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginAndCfgSnapshotLocalCache.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginAndCfgSnapshotLocalCache.java index 13ac06f2d..fad162dc6 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginAndCfgSnapshotLocalCache.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/PluginAndCfgSnapshotLocalCache.java @@ -19,11 +19,14 @@ package com.qlangtech.tis.plugin; import com.qlangtech.tis.coredefine.module.action.TargetResName; +import com.qlangtech.tis.fullbuild.phasestatus.PhaseStatusCollection; import java.util.Map; import java.util.Optional; +import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import java.util.function.Supplier; /** * @author: 百岁(baisui@qlangtech.com) @@ -31,11 +34,28 @@ **/ public class PluginAndCfgSnapshotLocalCache { private final Map localCache; + /** + * 当前task对应的前一次执行的状态缓存,为这次全量构建,例如allrows 提供百分比支持 + */ + private final Map taskPreviousStatusCache = new WeakHashMap<>(); public PluginAndCfgSnapshotLocalCache() { this.localCache = new ConcurrentHashMap<>(); } + public PhaseStatusCollection getPreviousStatus(Integer currentTaskId, Supplier statSupplier) { + synchronized (taskPreviousStatusCache) { + PhaseStatusCollection status = taskPreviousStatusCache.get(currentTaskId); + if (status == null) { + status = statSupplier.get(); + if (status != null) { + taskPreviousStatusCache.put(currentTaskId, status); + } + } + return status; + } + } + /** * 取得和app相关的资源pluginCfgs快照 * diff --git a/tis-sql-parser/src/main/java/com/qlangtech/tis/powerjob/SelectedTabTriggers.java b/tis-sql-parser/src/main/java/com/qlangtech/tis/powerjob/SelectedTabTriggers.java index 03771108d..151e6e900 100644 --- a/tis-sql-parser/src/main/java/com/qlangtech/tis/powerjob/SelectedTabTriggers.java +++ b/tis-sql-parser/src/main/java/com/qlangtech/tis/powerjob/SelectedTabTriggers.java @@ -30,6 +30,7 @@ import com.qlangtech.tis.offline.DataxUtils; import com.qlangtech.tis.plugin.StoreResourceType; import com.qlangtech.tis.plugin.ds.ISelectedTab; +import com.qlangtech.tis.realtime.yarn.rpc.SynResTarget; import com.qlangtech.tis.trigger.util.JsonUtil; import org.apache.commons.lang.StringUtils; @@ -178,6 +179,17 @@ public SelectedTabTriggersConfig(StoreResourceType resType, String dataXName, St this.dataXName = dataXName; } + public SynResTarget getTargetRes() { + switch (this.getResType()) { + case DataFlow: + return SynResTarget.transform(this.getDataXName()); + case DataApp: + return SynResTarget.pipeline(this.getDataXName()); + default: + throw new IllegalStateException("unsupport resType:" + this.getResType()); + } + } + private String preTrigger; private String postTrigger; private List splitTabsCfg = Lists.newArrayList();