Skip to content

Commit

Permalink
refactor tis-plugin relevant dependencies from tis-console,tis-assemb…
Browse files Browse the repository at this point in the history
…le to tis-web-start
  • Loading branch information
baisui1981 committed May 27, 2024
1 parent fbdd71c commit 513fd0b
Show file tree
Hide file tree
Showing 16 changed files with 391 additions and 64 deletions.
23 changes: 0 additions & 23 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -275,29 +275,6 @@
<version>3.0</version>
</dependency>

<!-- <dependency>-->
<!-- <groupId>com.qlangtech.tis</groupId>-->
<!-- <artifactId>tis-solrcore-extend</artifactId>-->
<!-- <version>${project.version}</version>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.apache.hadoop</groupId>-->
<!-- <artifactId>hadoop-annotations</artifactId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <groupId>org.springframework</groupId>-->
<!-- <artifactId>spring-core</artifactId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <groupId>org.springframework</groupId>-->
<!-- <artifactId>spring-context</artifactId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <groupId>org.springframework</groupId>-->
<!-- <artifactId>spring-context</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->

<dependency>
<groupId>com.qlangtech.tis</groupId>
Expand Down
4 changes: 4 additions & 0 deletions tis-assemble/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-dag</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
Expand Down Expand Up @@ -117,6 +118,7 @@
<dependency>
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-hadoop-rpc</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
Expand All @@ -128,6 +130,7 @@
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-sql-parser</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
Expand Down Expand Up @@ -175,6 +178,7 @@
<dependency>
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-manage-pojo</artifactId>
<scope>provided</scope>
</dependency>

<!--for flume-ng-node log collector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public static ActionInvocation createExecChain(DefaultChainContext chainContext)


public static ActionInvocation createInvocation(IExecChainContext chainContext, IExecuteInterceptor[] ints) {

final ComponentOrders componentOrders = new ComponentOrders();
AbstractActionInvocation preInvocation = new AbstractActionInvocation();
preInvocation.setContext(chainContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import com.google.common.collect.Maps;
import com.qlangtech.tis.assemble.FullbuildPhase;
import com.qlangtech.tis.cloud.ITISCoordinator;
import com.qlangtech.tis.config.k8s.ReplicasSpec;
import com.qlangtech.tis.coredefine.module.action.Specification;
import com.qlangtech.tis.datax.DataXJobSubmitParams;
import com.qlangtech.tis.datax.IDataXBatchPost;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.IDataxWriter;
Expand Down Expand Up @@ -70,7 +73,8 @@ public class DefaultChainContext implements IExecChainContext {

@Override
public String getJavaMemSpec() {
return null;
DataXJobSubmitParams submitParams = DataXJobSubmitParams.getDftIfEmpty();
return submitParams.getJavaMemorySpec();
}
// @Override
// public TableDumpFactory getTableDumpFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ protected void service(HttpServletRequest req, HttpServletResponse res) throws S
throw new ServletException("clean plugin store cache faild ", e);
}

if (Boolean.parseBoolean(req.getParameter(TIS.KEY_ACTION_CLEAN_TIS))) {
TIS.clean();
logger.info(" clean TIS cache", extendPoint);
return;
}
// if (Boolean.parseBoolean(req.getParameter(TIS.KEY_ACTION_CLEAN_TIS))) {
// TIS.clean();
// logger.info(" clean TIS cache", extendPoint);
// return;
// }


int taskid = Integer.parseInt(req.getParameter(JobCommon.KEY_TASK_ID));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
*/
public class Specification {

private static final Pattern p = Pattern.compile("(\\d+(\\.\\d)?)(([mGM]|Mi)?)");
public static final String MEMORY_UNIT_MEGABYTE = "Mi";

private static final Pattern p = Pattern.compile("(\\d+(\\.\\d)?)(([mGM]|"+MEMORY_UNIT_MEGABYTE+")?)");

public static Specification parse(String val) {
Matcher m = p.matcher(val);
Expand Down Expand Up @@ -72,7 +74,7 @@ public int normalizeMemory() {
*/
public int normalizeMemory(Optional<Integer> proportion) {
float result = 0;
if ("Mi".equals(this.getUnit()) || "M".equals(this.getUnit())) {
if (MEMORY_UNIT_MEGABYTE.equals(this.getUnit()) || "M".equals(this.getUnit())) {
result = this.getVal();
} else if ("G".equals(this.getUnit())) {
result = this.getVal() * 1024;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testNormalizeMemory() {
assertEquals(512, mem.normalizeMemory(Optional.of(50)));

mem = Specification.parse("1500Mi");
assertEquals("Mi", mem.getUnit());
assertEquals(Specification.MEMORY_UNIT_MEGABYTE, mem.getUnit());
assertEquals(1500, mem.normalizeMemory());
assertEquals(750, mem.normalizeMemory(Optional.of(50)));

Expand Down
7 changes: 6 additions & 1 deletion tis-console/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,20 @@
<dependency>
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-hadoop-rpc</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-common-dao</artifactId>

<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-dag</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
Expand Down Expand Up @@ -205,6 +207,7 @@
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-plugin</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
Expand All @@ -221,6 +224,7 @@
<dependency>
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-manage-pojo</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
Expand All @@ -236,6 +240,7 @@
<dependency>
<groupId>com.qlangtech.tis</groupId>
<artifactId>tis-common</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,25 @@ public void afterSaved(PluginItems.PluginItemsSaveEvent event) {
}

private static void notifyPluginUpdate2AssembleNode(String applyParams, String targetResource) {
if (TisAppLaunch.isTestMock()) {
logger.info("skip apply clean " + targetResource + " cache by " + applyParams);
return;
}
long start = System.currentTimeMillis();
try {

URL url = new URL(Config.getAssembleHttpHost() + "/task_status?" + applyParams);
HttpUtils.get(url, new ConfigFileContext.StreamProcess<Void>() {
@Override
public Void p(int status, InputStream stream, Map<String, List<String>> headerFields) {
logger.info("has apply clean " + targetResource + " cache by " + applyParams);

return null;
}
});
} catch (Exception e) {
logger.warn("apply clean " + targetResource + ",consume:" + (System.currentTimeMillis() - start) + "ms, cache " + "faild " + e.getMessage());
}
// if (TisAppLaunch.isTestMock()) {
// logger.info("skip apply clean " + targetResource + " cache by " + applyParams);
// return;
// }
// long start = System.currentTimeMillis();
// try {
//
// URL url = new URL(Config.getAssembleHttpHost() + "/task_status?" + applyParams);
// HttpUtils.get(url, new ConfigFileContext.StreamProcess<Void>() {
// @Override
// public Void p(int status, InputStream stream, Map<String, List<String>> headerFields) {
// logger.info("has apply clean " + targetResource + " cache by " + applyParams);
//
// return null;
// }
// });
// } catch (Exception e) {
// logger.warn("apply clean " + targetResource + ",consume:" + (System.currentTimeMillis() - start) + "ms, cache " + "faild " + e.getMessage());
// }
}

private static class IconsDefs {
Expand Down Expand Up @@ -610,7 +610,7 @@ public void run() {
} catch (InterruptedException e) {
}
// 为了让Assemble等节点的uberClassLoader重新加载一次,需要主动向Assemble等节点发送一个指令
notifyPluginUpdate2AssembleNode(TIS.KEY_ACTION_CLEAN_TIS + "=true", "TIS");
// notifyPluginUpdate2AssembleNode(TIS.KEY_ACTION_CLEAN_TIS + "=true", "TIS");
InstallUtil.proceedToNextStateFrom(InstallState.INITIAL_PLUGINS_INSTALLING);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.google.common.collect.Maps;
import com.qlangtech.tis.assemble.FullbuildPhase;
import com.qlangtech.tis.compiler.streamcode.IDBTableNamesGetter;
import com.qlangtech.tis.datax.DataXJobSubmit;
import com.qlangtech.tis.datax.DataXJobSubmitParams;
import com.qlangtech.tis.datax.IDataxWriter;
import com.qlangtech.tis.exec.ExecuteResult;
import com.qlangtech.tis.exec.IExecChainContext;
Expand Down Expand Up @@ -120,10 +120,11 @@ public DataFlowAppSource(WorkFlow dataflow, IDataxWriter writer) {
}

public static ExecutorService createExecutorService(IExecChainContext execChainContext) {
int nThreads = 1;
final DataXJobSubmitParams submitParams = DataXJobSubmitParams.getDftIfEmpty();
int nThreads = submitParams.pipelineParallelism;
return new ThreadPoolExecutor(
nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(DataXJobSubmit.MAX_TABS_NUM_IN_PER_JOB),
new LinkedBlockingQueue<>(submitParams.maxJobs),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Expand Down
2 changes: 1 addition & 1 deletion tis-plugin/src/main/java/com/qlangtech/tis/TIS.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public class TIS {

public static final String KEY_ALT_SYSTEM_PROP_TIS_PLUGIN_ROOT = "plugin_dir_root";
public static final String KEY_TIS_PLUGIN_ROOT = "plugins";
public static final String KEY_ACTION_CLEAN_TIS = "cleanTis";
// public static final String KEY_ACTION_CLEAN_TIS = "cleanTis";

// public static final String KEY_TIS_INCR_COMPONENT_CONFIG_FILE = "incr_config.xml";
public static final String KEY_TIE_GLOBAL_COMPONENT_CONFIG_FILE = "global_config.xml";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.datax;

/**
* @author: 百岁([email protected]
* @create: 2022-01-01 14:24
**/
public class DataXJobSingleProcessorException extends Exception {
public DataXJobSingleProcessorException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
import com.qlangtech.tis.fullbuild.phasestatus.PhaseStatusCollection;
import com.qlangtech.tis.fullbuild.phasestatus.impl.DumpPhaseStatus;
import com.qlangtech.tis.order.center.IJoinTaskContext;
import com.qlangtech.tis.plugin.annotation.FormField;
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.DBIdentity;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
Expand Down Expand Up @@ -69,6 +66,7 @@ public abstract class DataXJobSubmit {
private static final Logger logger = LoggerFactory.getLogger(DataXJobSubmit.class);
public static final String KEY_DATAX_READERS = "dataX_readers";
public static final int MAX_TABS_NUM_IN_PER_JOB = 40;
public static final int DEFAULT_PARALLELISM_IN_VM = 1;// parallelism

public static Callable<DataXJobSubmit> mockGetter;

Expand All @@ -82,8 +80,8 @@ public static void main(String[] args) throws Exception {
// System.out.println( DataXJobSubmit.class("com/google/common/base/Preconditions.class"));
}

@FormField(ordinal = 0, type = FormFieldType.INT_NUMBER, validate = {Validator.require})
public Integer parallelism;
// @FormField(ordinal = 0, type = FormFieldType.INT_NUMBER, validate = {Validator.require})
// public Integer parallelism;

public static DataXJobSubmit.InstanceType getDataXTriggerType() {

Expand Down Expand Up @@ -165,8 +163,9 @@ public boolean validate(IControlMsgHandler controlMsgHandler, Context context,
@Override
public boolean validate(IControlMsgHandler controlMsgHandler, Context context,
List<DataXCfgGenerator.DataXCfgFile> cfgFileNames) {
if (cfgFileNames.size() > MAX_TABS_NUM_IN_PER_JOB) {
controlMsgHandler.addErrorMessage(context, "单机版,单次表导入不能超过" + MAX_TABS_NUM_IN_PER_JOB +
DataXJobSubmitParams submitParams = DataXJobSubmitParams.getDftIfEmpty();
if (cfgFileNames.size() > submitParams.maxJobs) {
controlMsgHandler.addErrorMessage(context, "单机版,单次表导入不能超过" + submitParams.maxJobs +
"张,如需要导入更多表,请使用分布式K8S DataX执行期");
return false;
}
Expand Down
Loading

0 comments on commit 513fd0b

Please sign in to comment.