From d57f781c2ac043a563bd75e762bc22b7a4c9adf9 Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Sat, 13 Apr 2024 16:31:09 +0800 Subject: [PATCH] add java memory on dataX java launch params --- .../tis/config/k8s/ReplicasSpec.java | 8 ++- .../module/action/Specification.java | 72 +++++++++++-------- .../tis/order/center/IJoinTaskContext.java | 3 +- .../module/action/TestSpecification.java | 43 +++++++++++ .../coredefine/module/action/DataxAction.java | 9 ++- .../tis/datax/job/DataXJobWorker.java | 6 ++ .../tis/exec/DefaultExecContext.java | 18 ++++- .../qlangtech/tis/exec/IExecChainContext.java | 2 + 8 files changed, 121 insertions(+), 40 deletions(-) create mode 100644 tis-builder-api/src/test/java/com/qlangtech/tis/coredefine/module/action/TestSpecification.java diff --git a/tis-builder-api/src/main/java/com/qlangtech/tis/config/k8s/ReplicasSpec.java b/tis-builder-api/src/main/java/com/qlangtech/tis/config/k8s/ReplicasSpec.java index c8aa54b36..d221eba40 100644 --- a/tis-builder-api/src/main/java/com/qlangtech/tis/config/k8s/ReplicasSpec.java +++ b/tis-builder-api/src/main/java/com/qlangtech/tis/config/k8s/ReplicasSpec.java @@ -20,6 +20,8 @@ import com.qlangtech.tis.coredefine.module.action.Specification; import org.apache.commons.lang.StringUtils; +import java.util.Optional; + /** * 发布实例(ReplicationController,RepliaSet,Deployment)时的pod规格 * @@ -110,9 +112,9 @@ public void setMemoryLimit(Specification memoryLimit) { this.memoryLimit = memoryLimit; } - public String toJavaMemorySpec() { - return "-Xms" + (int) (this.getMemoryRequest().normalizeMemory() * 0.8) - + "m -Xmx" + (int) (this.getMemoryLimit().normalizeMemory() * 0.8) + "m"; + public String toJavaMemorySpec(Optional proportion) { + return "-Xms" + (int) (this.getMemoryRequest().normalizeMemory(proportion) * 0.8) + + "m -Xmx" + (int) (this.getMemoryLimit().normalizeMemory(proportion) * 0.8) + "m"; } public static void main(String[] args) { diff --git a/tis-builder-api/src/main/java/com/qlangtech/tis/coredefine/module/action/Specification.java b/tis-builder-api/src/main/java/com/qlangtech/tis/coredefine/module/action/Specification.java index 5805173d3..d4d260135 100644 --- a/tis-builder-api/src/main/java/com/qlangtech/tis/coredefine/module/action/Specification.java +++ b/tis-builder-api/src/main/java/com/qlangtech/tis/coredefine/module/action/Specification.java @@ -1,25 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.qlangtech.tis.coredefine.module.action; import org.apache.commons.lang.StringUtils; import java.util.Objects; +import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -31,7 +32,7 @@ */ public class Specification { - private static final Pattern p = Pattern.compile("(\\d+)(\\w*)"); + private static final Pattern p = Pattern.compile("(\\d+(\\.\\d)?)([mGM]?)"); public static Specification parse(String val) { Matcher m = p.matcher(val); @@ -39,16 +40,16 @@ public static Specification parse(String val) { throw new IllegalArgumentException("val:" + val + " is not match the pattern:" + p); } Specification s = new Specification(); - s.setVal(Integer.parseInt(m.group(1))); - s.setUnit(m.group(2)); + s.setVal(Float.parseFloat(m.group(1))); + s.setUnit(m.group(3)); return s; } - private int val; + private float val; private String unit; - public int getVal() { + public float getVal() { return val; } @@ -56,16 +57,21 @@ public boolean isUnitEmpty() { return StringUtils.isEmpty(this.unit); } - public void setVal(int val) { + public void setVal(float val) { this.val = val; } + public int normalizeMemory() { + return this.normalizeMemory(Optional.empty()); + } + /** * 归一化内存规格,单位:兆 + * * @return */ - public int normalizeMemory() { - int result = 0; + public int normalizeMemory(Optional proportion) { + float result = 0; if ("M".equals(this.getUnit())) { result = this.getVal(); } else if ("G".equals(this.getUnit())) { @@ -73,7 +79,13 @@ public int normalizeMemory() { } else { throw new IllegalStateException("invalid memory unit:" + this.getUnit()); } - return result; + final float r = result; + return proportion.map((p) -> { + if (p < 1 || p > 100) { + throw new IllegalArgumentException("proportion:" + p + " is invalid"); + } + return (int) (r * p / 100); + }).orElse((int) result); } public String literalVal() { @@ -83,7 +95,7 @@ public String literalVal() { public int normalizeCPU() { // d.setCpuRequest(Specification.parse("300m")); // d.setCpuLimit(Specification.parse("2")); - int result = 0; + float result = 0; if ("m".equals(this.getUnit())) { result = this.getVal(); } else if (this.isUnitEmpty()) { @@ -91,16 +103,16 @@ public int normalizeCPU() { } else { throw new IllegalStateException("invalid cpu unit:" + this.getUnit()); } - return result; + return (int)result; } - public boolean memoryBigThan(Specification spec){ - Objects.requireNonNull(spec,"param spec can not be null"); + public boolean memoryBigThan(Specification spec) { + Objects.requireNonNull(spec, "param spec can not be null"); return this.normalizeMemory() > spec.normalizeMemory(); } - public boolean cpuBigThan(Specification spec){ - Objects.requireNonNull(spec,"param spec can not be null"); + public boolean cpuBigThan(Specification spec) { + Objects.requireNonNull(spec, "param spec can not be null"); return this.normalizeCPU() > spec.normalizeCPU(); } diff --git a/tis-builder-api/src/main/java/com/qlangtech/tis/order/center/IJoinTaskContext.java b/tis-builder-api/src/main/java/com/qlangtech/tis/order/center/IJoinTaskContext.java index 03065807d..d965e3941 100644 --- a/tis-builder-api/src/main/java/com/qlangtech/tis/order/center/IJoinTaskContext.java +++ b/tis-builder-api/src/main/java/com/qlangtech/tis/order/center/IJoinTaskContext.java @@ -29,8 +29,7 @@ public interface IJoinTaskContext extends IParamContext, IPipelineExecContext { - - + public String getJavaMemSpec(); public boolean isDryRun(); diff --git a/tis-builder-api/src/test/java/com/qlangtech/tis/coredefine/module/action/TestSpecification.java b/tis-builder-api/src/test/java/com/qlangtech/tis/coredefine/module/action/TestSpecification.java new file mode 100644 index 000000000..1f0055173 --- /dev/null +++ b/tis-builder-api/src/test/java/com/qlangtech/tis/coredefine/module/action/TestSpecification.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.coredefine.module.action; + +import junit.framework.TestCase; + +import java.util.Optional; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-04-13 12:56 + **/ +public class TestSpecification extends TestCase { + + public void testNormalizeMemory() { + Specification mem = Specification.parse("1.5G"); + assertEquals("G", mem.getUnit()); + assertEquals(1536, mem.normalizeMemory()); + + assertEquals(691, mem.normalizeMemory(Optional.of(45))); + + mem = Specification.parse("1G"); + assertEquals("G", mem.getUnit()); + assertEquals(1024, mem.normalizeMemory()); + assertEquals(512, mem.normalizeMemory(Optional.of(50))); + } +} diff --git a/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/DataxAction.java b/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/DataxAction.java index b81f127f6..26824a5d8 100644 --- a/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/DataxAction.java +++ b/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/DataxAction.java @@ -1168,14 +1168,17 @@ public void doUpdateDatax(Context context) throws Exception { ApplicationCriteria appCriteria = new ApplicationCriteria(); appCriteria.createCriteria().andProjectNameEqualTo(dataxName); this.getApplicationDAO().updateByExampleSelective(dataXApp, appCriteria); - IAppSource.cleanAppSourcePluginStoreCache(null, dataxName); - IAppSource.cleanAppSourcePluginStoreCache(this, dataxName); - DataXJobSubmit.getPowerJobSubmit().ifPresent((submit) -> { submit.saveJob(this, context, old); }); + IAppSource.cleanAppSourcePluginStoreCache(null, dataxName); + IAppSource.cleanAppSourcePluginStoreCache(this, dataxName); + + + + this.addActionMessage(context, "已经成功更新"); } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/DataXJobWorker.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/DataXJobWorker.java index f193c93ab..7e132c48f 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/DataXJobWorker.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/job/DataXJobWorker.java @@ -152,6 +152,12 @@ public static void validateTargetName(String targetName) { // return getJobWorker(K8S_FLINK_CLUSTER_NAME, Optional.empty()); // } + public static K8SDataXPowerJobWorker getK8SDataXPowerJobWorker() { + return (K8SDataXPowerJobWorker) + DataXJobWorker.getJobWorker(TargetResName.K8S_DATAX_INSTANCE_NAME + , Optional.of(DataXJobWorker.K8SWorkerCptType.Worker)); + } + public static DataXJobWorker getJobWorker(TargetResName resName) { //ServerLaunchToken.createFlinkClusterToken() diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/exec/DefaultExecContext.java b/tis-plugin/src/main/java/com/qlangtech/tis/exec/DefaultExecContext.java index 94a92770d..fd9cdcc05 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/exec/DefaultExecContext.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/exec/DefaultExecContext.java @@ -41,7 +41,6 @@ import com.qlangtech.tis.trigger.util.JsonUtil; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.exception.ExceptionUtils; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -49,7 +48,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.function.Consumer; import java.util.function.Supplier; @@ -68,6 +66,10 @@ public class DefaultExecContext implements IExecChainContext, IdentityName { private ITISCoordinator coordinator; private PhaseStatusCollection latestPhaseStatusCollection; private StoreResourceType resType; + /** + * java 启动内存参数ms mx + */ + private String javaMemSpec; public DefaultExecContext(String dataXName, Long triggerTimestamp) { this.ps = Objects.requireNonNull(triggerTimestamp, "param triggerTimestamp can not be null"); @@ -89,12 +91,16 @@ static DefaultExecContext deserializeInstanceParams(JSONObject instanceParams, b JobParams.KEY_TASK_ID + " can not be null," + JsonUtil.toString(instanceParams)); boolean dryRun = instanceParams.getBooleanValue(IFullBuildContext.DRY_RUN); String appName = instanceParams.getString(JobParams.KEY_COLLECTION); + final String javaMemSpec = instanceParams.getString(JobParams.KEY_JAVA_MEMORY_SPEC); + Long triggerTimestamp = instanceParams.getLong(DataxUtils.EXEC_TIMESTAMP); DefaultExecContext execChainContext = new DefaultExecContext(appName, triggerTimestamp); + execChainContext.setJavaMemSpec(javaMemSpec); execChainContext.setCoordinator(ITISCoordinator.create()); execChainContext.setDryRun(dryRun); execChainContext.setAttribute(JobCommon.KEY_TASK_ID, taskId); + if (resolveCfgsSnapshotConsumer) { String pluginCfgsMetas = instanceParams.getString(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS); @@ -139,6 +145,14 @@ public void setDryRun(boolean dryRun) { this.dryRun = dryRun; } + @Override + public String getJavaMemSpec() { + return javaMemSpec; + } + + public void setJavaMemSpec(String javaMemSpec) { + this.javaMemSpec = javaMemSpec; + } public void setWorkflowId(Integer workflowId) { this.workflowId = workflowId; diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/exec/IExecChainContext.java b/tis-plugin/src/main/java/com/qlangtech/tis/exec/IExecChainContext.java index c361c2e2b..31b7c0258 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/exec/IExecChainContext.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/exec/IExecChainContext.java @@ -64,6 +64,8 @@ public interface IExecChainContext extends IJoinTaskContext { = new MessageFormat(Config.getConfigRepositoryHost() + "/config/config.ajax?action={0}&event_submit_{1}=true"); + + static Integer createNewTask(IExecChainContext chainContext) { return createNewTask(chainContext, TriggerType.MANUAL); }