Skip to content

Commit

Permalink
support percent display show when dataX execution
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Apr 16, 2024
1 parent d228180 commit e401f09
Show file tree
Hide file tree
Showing 11 changed files with 337 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import com.qlangtech.tis.exec.ExecutePhaseRange;
import com.qlangtech.tis.exec.ExecuteResult;
import com.qlangtech.tis.exec.impl.DefaultChainContext;
import com.qlangtech.tis.extension.impl.XmlFile;
import com.qlangtech.tis.fullbuild.phasestatus.IFlush2Local;
import com.qlangtech.tis.fullbuild.phasestatus.IFlush2LocalFactory;
import com.qlangtech.tis.fullbuild.phasestatus.PhaseStatusCollection;
import com.qlangtech.tis.fullbuild.phasestatus.impl.BasicPhaseStatus;
import com.qlangtech.tis.fullbuild.phasestatus.impl.BuildPhaseStatus;
Expand All @@ -49,7 +50,6 @@
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

Expand All @@ -69,25 +69,25 @@ public class IndexSwapTaskflowLauncher implements Daemon, ServletContextListener
//private ZkStateReader zkStateReader;

static {
initPhaseStatusStatusWriter();
// initPhaseStatusStatusWriter();
}

public static void initPhaseStatusStatusWriter() {
if (BasicPhaseStatus.statusWriter == null) {
BasicPhaseStatus.statusWriter = new BasicPhaseStatus.IFlush2Local() {
@Override
public void write(File localFile, BasicPhaseStatus status) throws Exception {
XmlFile xmlFile = new XmlFile(localFile);
xmlFile.write(status, Collections.emptySet());
}

@Override
public BasicPhaseStatus loadPhase(File localFile) throws Exception {
XmlFile xmlFile = new XmlFile(localFile);
return (BasicPhaseStatus) xmlFile.read();
}
};
}
// if (BasicPhaseStatus.statusWriter == null) {
// BasicPhaseStatus.statusWriter = new BasicPhaseStatus.IFlush2Local() {
// @Override
// public void write(File localFile, BasicPhaseStatus status) throws Exception {
// XmlFile xmlFile = new XmlFile(localFile);
// xmlFile.write(status, Collections.emptySet());
// }
//
// @Override
// public BasicPhaseStatus loadPhase(File localFile) throws Exception {
// XmlFile xmlFile = new XmlFile(localFile);
// return (BasicPhaseStatus) xmlFile.read();
// }
// };
// }
}


Expand Down Expand Up @@ -334,7 +334,9 @@ public static PhaseStatusCollection loadPhaseStatusFromLocal(int taskid) {
if (result == null) {
result = new PhaseStatusCollection(taskid, ExecutePhaseRange.fullRange());
}
phaseStatus = BasicPhaseStatus.statusWriter.loadPhase(localFile);
IFlush2Local flush2Local = IFlush2LocalFactory.createNew(IndexSwapTaskflowLauncher.class.getClassLoader(), localFile)
.orElseThrow(() -> new IllegalStateException("flush2Local must be present"));
phaseStatus = flush2Local.loadPhase(); // BasicPhaseStatus.statusWriter.loadPhase(localFile);
switch (phase) {
case FullDump:
result.setDumpPhase((DumpPhaseStatus) phaseStatus);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http:https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.order.center.impl;

import com.qlangtech.tis.extension.impl.XmlFile;
import com.qlangtech.tis.fullbuild.phasestatus.IFlush2Local;
import com.qlangtech.tis.fullbuild.phasestatus.impl.BasicPhaseStatus;

import java.io.File;
import java.util.Collections;

/**
* @author: 百岁([email protected]
* @create: 2024-04-15 18:38
**/
public class DefaultFlush2Local implements IFlush2Local {
private final XmlFile xmlFile;

public DefaultFlush2Local(File localFile) {
this.xmlFile = new XmlFile(localFile);
}

@Override
public void write(BasicPhaseStatus status) throws Exception {
xmlFile.write(status, Collections.emptySet());
}

@Override
public BasicPhaseStatus loadPhase() throws Exception {
return (BasicPhaseStatus) xmlFile.read();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http:https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.order.center.impl;

import com.qlangtech.tis.fullbuild.phasestatus.IFlush2Local;
import com.qlangtech.tis.fullbuild.phasestatus.IFlush2LocalFactory;

import java.io.File;

/**
* @author: 百岁([email protected]
* @create: 2024-04-15 18:52
**/
public class DefaultFlush2LocalFactory implements IFlush2LocalFactory {
@Override
public IFlush2Local create(File localFile) {
return new DefaultFlush2Local(localFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,11 @@ public void reportJoinStatus(JoinTaskStatus joinStat, StreamObserver<Empty> resp
JoinPhaseStatus joinPhase = phaseStatusSet.getJoinPhase();
JoinPhaseStatus.JoinTaskStatus joinTskStatus = LogCollectorClient.convert(joinStat);
joinPhase.taskStatus.put(joinStat.getJoinTaskName(), joinTskStatus);
if (phaseStatusSet.isComplete()) {
phaseStatusSet.flushStatus2Local();
}
// if (phaseStatusSet.isComplete()) {
// phaseStatusSet.flushStatus2Local();
// }
joinPhase.isComplete();
phaseStatusSet.flushStatus2Local();
returnEmpty(responseObserver);
}

Expand Down Expand Up @@ -248,7 +250,7 @@ public void reportDumpTableStatus(com.qlangtech.tis.rpc.grpc.log.common.TableDum
log.info("taskid:" + taskid + ",tablename:" + tableDumpStatus.getTableName() + ",read:"
+ tableDumpStatus.getReadRows() + ",all:" + tableDumpStatus.getAllRows());
DumpPhaseStatus dumpPhase = phaseStatusSet.getDumpPhase();
TableDumpStatus dumpStatus = phaseStatusSet.getDumpPhase().getTable(tableDumpStatus.getTableName());
TableDumpStatus dumpStatus = dumpPhase.getTable(tableDumpStatus.getTableName());
// }
if (tableDumpStatus.getComplete()) {
// 成功
Expand All @@ -264,6 +266,7 @@ public void reportDumpTableStatus(com.qlangtech.tis.rpc.grpc.log.common.TableDum
dumpStatus.setWaiting(tableDumpStatus.getWaiting());
}
dumpPhase.isComplete();
phaseStatusSet.flushStatus2Local();
returnEmpty(responseObserver);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http:https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

com.qlangtech.tis.order.center.impl.DefaultFlush2LocalFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http:https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.fullbuild.phasestatus;

import com.qlangtech.tis.assemble.FullbuildPhase;
import com.qlangtech.tis.common.utils.Assert;
import com.qlangtech.tis.exec.ExecutePhaseRange;
import com.qlangtech.tis.fullbuild.phasestatus.impl.BasicPhaseStatus;
import com.qlangtech.tis.fullbuild.phasestatus.impl.DumpPhaseStatus;
import com.qlangtech.tis.fullbuild.phasestatus.impl.DumpPhaseStatus.TableDumpStatus;
import junit.framework.TestCase;

/**
* @author: 百岁([email protected]
* @create: 2024-04-16 09:58
**/
public class TestPhaseStatusCollection extends TestCase {
public void testISComplete() {
Integer taskId = 999;

PhaseStatusCollection statusCollection = new PhaseStatusCollection(taskId
, new ExecutePhaseRange(FullbuildPhase.FullDump, FullbuildPhase.FullDump));

MockFlush2Local flush2Local = new MockFlush2Local();

DumpPhaseStatus dumpPhase = new DumpPhaseStatus(taskId, flush2Local);
TableDumpStatus user = dumpPhase.getTable("user");
user.setAllRows(999);
user.setReadRows(998);
user.setWaiting(false);
user.setFaild(false);
user.setComplete(false);
statusCollection.setDumpPhase(dumpPhase);
Assert.assertFalse("shall be doing", statusCollection.isComplete());

for (int i = 0; i < 100; i++) {
statusCollection.flushStatus2Local();
}

user.setReadRows(999);
user.setComplete(true);
statusCollection.isComplete();
Assert.assertTrue("shall have done", statusCollection.isComplete());
Assert.assertEquals(2, flush2Local.writeCount);


}

private static class MockFlush2Local implements IFlush2Local {
private int writeCount;

@Override
public void write(BasicPhaseStatus status) throws Exception {
this.writeCount++;
}

@Override
public BasicPhaseStatus loadPhase() throws Exception {
throw new UnsupportedOperationException();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.fullbuild.phasestatus;

import com.qlangtech.tis.fullbuild.phasestatus.impl.BasicPhaseStatus;

/**
* @author: 百岁([email protected]
* @create: 2024-04-15 18:49
**/
public interface IFlush2Local {


public void write(BasicPhaseStatus status) throws Exception;

public BasicPhaseStatus loadPhase() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http:https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.fullbuild.phasestatus;

import java.io.File;
import java.util.Optional;
import java.util.ServiceLoader;

/**
* @author: 百岁([email protected]
* @create: 2024-04-15 18:49
**/
public interface IFlush2LocalFactory {

public static Optional<IFlush2Local> createNew(ClassLoader cl, File localFile) {
ServiceLoader<IFlush2LocalFactory> svcLoader = ServiceLoader.load(IFlush2LocalFactory.class, cl);
for (IFlush2LocalFactory factory : svcLoader) {
return Optional.of(factory.create(localFile));
}
// throw new IllegalStateException("can not find any svc loader for :" + IFlush2LocalFactory.class);
return Optional.empty();
}


IFlush2Local create(File localFile);
}
Loading

0 comments on commit e401f09

Please sign in to comment.