Skip to content

Commit

Permalink
add java memory on dataX java launch params
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Apr 13, 2024
1 parent b92a103 commit d57f781
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.qlangtech.tis.coredefine.module.action.Specification;
import org.apache.commons.lang.StringUtils;

import java.util.Optional;

/**
* 发布实例(ReplicationController,RepliaSet,Deployment)时的pod规格
*
Expand Down Expand Up @@ -110,9 +112,9 @@ public void setMemoryLimit(Specification memoryLimit) {
this.memoryLimit = memoryLimit;
}

public String toJavaMemorySpec() {
return "-Xms" + (int) (this.getMemoryRequest().normalizeMemory() * 0.8)
+ "m -Xmx" + (int) (this.getMemoryLimit().normalizeMemory() * 0.8) + "m";
public String toJavaMemorySpec(Optional<Integer> proportion) {
return "-Xms" + (int) (this.getMemoryRequest().normalizeMemory(proportion) * 0.8)
+ "m -Xmx" + (int) (this.getMemoryLimit().normalizeMemory(proportion) * 0.8) + "m";
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http:https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.qlangtech.tis.coredefine.module.action;

import org.apache.commons.lang.StringUtils;

import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -31,49 +32,60 @@
*/
public class Specification {

private static final Pattern p = Pattern.compile("(\\d+)(\\w*)");
private static final Pattern p = Pattern.compile("(\\d+(\\.\\d)?)([mGM]?)");

public static Specification parse(String val) {
Matcher m = p.matcher(val);
if (!m.matches()) {
throw new IllegalArgumentException("val:" + val + " is not match the pattern:" + p);
}
Specification s = new Specification();
s.setVal(Integer.parseInt(m.group(1)));
s.setUnit(m.group(2));
s.setVal(Float.parseFloat(m.group(1)));
s.setUnit(m.group(3));
return s;
}

private int val;
private float val;

private String unit;

public int getVal() {
public float getVal() {
return val;
}

public boolean isUnitEmpty() {
return StringUtils.isEmpty(this.unit);
}

public void setVal(int val) {
public void setVal(float val) {
this.val = val;
}

public int normalizeMemory() {
return this.normalizeMemory(Optional.empty());
}

/**
* 归一化内存规格,单位:兆
*
* @return
*/
public int normalizeMemory() {
int result = 0;
public int normalizeMemory(Optional<Integer> proportion) {
float result = 0;
if ("M".equals(this.getUnit())) {
result = this.getVal();
} else if ("G".equals(this.getUnit())) {
result = this.getVal() * 1024;
} else {
throw new IllegalStateException("invalid memory unit:" + this.getUnit());
}
return result;
final float r = result;
return proportion.map((p) -> {
if (p < 1 || p > 100) {
throw new IllegalArgumentException("proportion:" + p + " is invalid");
}
return (int) (r * p / 100);
}).orElse((int) result);
}

public String literalVal() {
Expand All @@ -83,24 +95,24 @@ public String literalVal() {
public int normalizeCPU() {
// d.setCpuRequest(Specification.parse("300m"));
// d.setCpuLimit(Specification.parse("2"));
int result = 0;
float result = 0;
if ("m".equals(this.getUnit())) {
result = this.getVal();
} else if (this.isUnitEmpty()) {
result = this.getVal() * 1024;
} else {
throw new IllegalStateException("invalid cpu unit:" + this.getUnit());
}
return result;
return (int)result;
}

public boolean memoryBigThan(Specification spec){
Objects.requireNonNull(spec,"param spec can not be null");
public boolean memoryBigThan(Specification spec) {
Objects.requireNonNull(spec, "param spec can not be null");
return this.normalizeMemory() > spec.normalizeMemory();
}

public boolean cpuBigThan(Specification spec){
Objects.requireNonNull(spec,"param spec can not be null");
public boolean cpuBigThan(Specification spec) {
Objects.requireNonNull(spec, "param spec can not be null");
return this.normalizeCPU() > spec.normalizeCPU();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
public interface IJoinTaskContext extends IParamContext, IPipelineExecContext {




public String getJavaMemSpec();


public boolean isDryRun();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http:https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.coredefine.module.action;

import junit.framework.TestCase;

import java.util.Optional;

/**
* @author: 百岁([email protected]
* @create: 2024-04-13 12:56
**/
public class TestSpecification extends TestCase {

public void testNormalizeMemory() {
Specification mem = Specification.parse("1.5G");
assertEquals("G", mem.getUnit());
assertEquals(1536, mem.normalizeMemory());

assertEquals(691, mem.normalizeMemory(Optional.of(45)));

mem = Specification.parse("1G");
assertEquals("G", mem.getUnit());
assertEquals(1024, mem.normalizeMemory());
assertEquals(512, mem.normalizeMemory(Optional.of(50)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1168,14 +1168,17 @@ public void doUpdateDatax(Context context) throws Exception {
ApplicationCriteria appCriteria = new ApplicationCriteria();
appCriteria.createCriteria().andProjectNameEqualTo(dataxName);
this.getApplicationDAO().updateByExampleSelective(dataXApp, appCriteria);
IAppSource.cleanAppSourcePluginStoreCache(null, dataxName);
IAppSource.cleanAppSourcePluginStoreCache(this, dataxName);


DataXJobSubmit.getPowerJobSubmit().ifPresent((submit) -> {
submit.saveJob(this, context, old);
});

IAppSource.cleanAppSourcePluginStoreCache(null, dataxName);
IAppSource.cleanAppSourcePluginStoreCache(this, dataxName);




this.addActionMessage(context, "已经成功更新");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ public static void validateTargetName(String targetName) {
// return getJobWorker(K8S_FLINK_CLUSTER_NAME, Optional.empty());
// }

public static <K8SDataXPowerJobWorker extends DataXJobWorker> K8SDataXPowerJobWorker getK8SDataXPowerJobWorker() {
return (K8SDataXPowerJobWorker)
DataXJobWorker.getJobWorker(TargetResName.K8S_DATAX_INSTANCE_NAME
, Optional.of(DataXJobWorker.K8SWorkerCptType.Worker));
}

public static DataXJobWorker getJobWorker(TargetResName resName) {

//ServerLaunchToken.createFlinkClusterToken()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,13 @@
import com.qlangtech.tis.trigger.util.JsonUtil;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand All @@ -68,6 +66,10 @@ public class DefaultExecContext implements IExecChainContext, IdentityName {
private ITISCoordinator coordinator;
private PhaseStatusCollection latestPhaseStatusCollection;
private StoreResourceType resType;
/**
* java 启动内存参数ms mx
*/
private String javaMemSpec;

public DefaultExecContext(String dataXName, Long triggerTimestamp) {
this.ps = Objects.requireNonNull(triggerTimestamp, "param triggerTimestamp can not be null");
Expand All @@ -89,12 +91,16 @@ static DefaultExecContext deserializeInstanceParams(JSONObject instanceParams, b
JobParams.KEY_TASK_ID + " can not be null," + JsonUtil.toString(instanceParams));
boolean dryRun = instanceParams.getBooleanValue(IFullBuildContext.DRY_RUN);
String appName = instanceParams.getString(JobParams.KEY_COLLECTION);
final String javaMemSpec = instanceParams.getString(JobParams.KEY_JAVA_MEMORY_SPEC);

Long triggerTimestamp = instanceParams.getLong(DataxUtils.EXEC_TIMESTAMP);
DefaultExecContext execChainContext = new DefaultExecContext(appName, triggerTimestamp);
execChainContext.setJavaMemSpec(javaMemSpec);
execChainContext.setCoordinator(ITISCoordinator.create());
execChainContext.setDryRun(dryRun);
execChainContext.setAttribute(JobCommon.KEY_TASK_ID, taskId);


if (resolveCfgsSnapshotConsumer) {

String pluginCfgsMetas = instanceParams.getString(PluginAndCfgsSnapshotUtils.KEY_PLUGIN_CFGS_METAS);
Expand Down Expand Up @@ -139,6 +145,14 @@ public void setDryRun(boolean dryRun) {
this.dryRun = dryRun;
}

@Override
public String getJavaMemSpec() {
return javaMemSpec;
}

public void setJavaMemSpec(String javaMemSpec) {
this.javaMemSpec = javaMemSpec;
}

public void setWorkflowId(Integer workflowId) {
this.workflowId = workflowId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public interface IExecChainContext extends IJoinTaskContext {
= new MessageFormat(Config.getConfigRepositoryHost()
+ "/config/config.ajax?action={0}&event_submit_{1}=true");



static Integer createNewTask(IExecChainContext chainContext) {
return createNewTask(chainContext, TriggerType.MANUAL);
}
Expand Down

0 comments on commit d57f781

Please sign in to comment.