Skip to content

Commit

Permalink
modify powerjob deleteion process
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Mar 24, 2024
1 parent 97c7719 commit 3db705c
Show file tree
Hide file tree
Showing 21 changed files with 237 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
public class TargetResName extends ResName {


public static final TargetResName K8S_DATAX_INSTANCE_NAME = new TargetResName("datax-worker");

public String getStreamSourceHandlerClass() {
return "com.qlangtech.tis.realtime.transfer." + this.getName() + "." + UnderlineUtils.getJavaName(this.getName()) + "SourceHandle";
}
Expand Down
2 changes: 1 addition & 1 deletion tis-console.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ spec:
volumeMounts:
- name: tis-console-pvc
mountPath: "/opt/data"
image: registry.cn-hangzhou.aliyuncs.com/tis/tis-console:4.0.1
image: registry.cn-hangzhou.aliyuncs.com/tis/tis-console:4.0.0.2
ports:
- name: tis-8080
containerPort: 8080
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.qlangtech.tis.extension.IPropertyType;
import com.qlangtech.tis.extension.util.MultiItemsViewType;
import com.qlangtech.tis.extension.util.PluginExtraProps;
import com.qlangtech.tis.fullbuild.IFullBuildContext;
import com.qlangtech.tis.lang.TisException;
import com.qlangtech.tis.manage.IAppSource;
import com.qlangtech.tis.manage.PermissionConstant;
Expand Down Expand Up @@ -358,7 +359,6 @@ public void doTestLaunchDataxWorker(Context context) throws IOException {
}

public static final String KEY_USING_POWERJOB_USE_EXIST_CLUSTER = "usingPowderJobUseExistCluster";
public static final String KEY_TARGET_NAME = "targetName";

@Func(value = PermissionConstant.DATAX_MANAGE)
public void doApplyPodNumber(Context context) throws Exception {
Expand Down Expand Up @@ -393,7 +393,7 @@ writer, new ExecuteSteps(dataxJobWorker, executeSteps), () -> {

/**
* 启动过程中出错,需要重启启动
*
*remove_datax_worker
* @param context
*/
@Func(value = PermissionConstant.DATAX_MANAGE)
Expand Down Expand Up @@ -454,9 +454,9 @@ public void doGetDataxWorkerConfig(Context context) throws Exception {
if (lt.workerCptType == K8SWorkerCptType.Server) {

DataXJobWorker pjServer
= DataXJobWorker.getJobWorker(DataXJobWorker.K8S_DATAX_INSTANCE_NAME, Optional.of(K8SWorkerCptType.Server));
= DataXJobWorker.getJobWorker(TargetResName.K8S_DATAX_INSTANCE_NAME, Optional.of(K8SWorkerCptType.Server));
DataXJobWorker pjWorker
= DataXJobWorker.getJobWorker(DataXJobWorker.K8S_DATAX_INSTANCE_NAME, Optional.of(K8SWorkerCptType.Worker));
= DataXJobWorker.getJobWorker(TargetResName.K8S_DATAX_INSTANCE_NAME, Optional.of(K8SWorkerCptType.Worker));
dataXWorker.put("powderJobServerRCSpec"
, IncrUtils.serializeSpec(IncrSpecResult.create(pjServer.getReplicasSpec(), pjServer.getHpa())));
dataXWorker.put("powderJobWorkerRCSpec"
Expand Down Expand Up @@ -645,7 +645,7 @@ public void doWorkerDesc(Context context) {

@Func(value = PermissionConstant.DATAX_MANAGE, sideEffect = false)
public void doGetDataxWorkerMeta(Context context) {
getJobWoker(context, DataXJobWorker.K8S_DATAX_INSTANCE_NAME);
getJobWoker(context, TargetResName.K8S_DATAX_INSTANCE_NAME);
}

@Func(value = PermissionConstant.DATAX_MANAGE, sideEffect = false)
Expand Down Expand Up @@ -677,8 +677,7 @@ private void getJobWoker(Context context, TargetResName targetName, Optional<Ser
}
DataXJobWorker jobWorker = DataXJobWorker.getJobWorker(targetName, launchToken.map((t) -> t.getWorkerCptType()));
boolean disableRcdeployment = this.getBoolean("disableRcdeployment");
jobWorkerStatus.setState(jobWorker.inService() ? IFlinkIncrJobStatus.State.RUNNING :
IFlinkIncrJobStatus.State.NONE);
jobWorkerStatus.setState((jobWorker != null && jobWorker.inService()) ? IFlinkIncrJobStatus.State.RUNNING : IFlinkIncrJobStatus.State.NONE);
if (jobWorkerStatus.getState() == IFlinkIncrJobStatus.State.RUNNING && !disableRcdeployment) {
jobWorkerStatus.setPayloads(jobWorker.getPayloadInfo());
jobWorkerStatus.setRcDeployments(jobWorker.getRCDeployments());
Expand All @@ -691,7 +690,7 @@ private TargetResName getK8SJobWorkerTargetName() {
}

private TargetResName getK8SJobWorkerTargetName(boolean validate) {
final String targetName = this.getString(KEY_TARGET_NAME);
final String targetName = this.getString(IFullBuildContext.KEY_TARGET_NAME);
if (validate) {
DataXJobWorker.validateTargetName(targetName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.runtime.module.action.BasicModule;
import com.qlangtech.tis.runtime.module.misc.BasicRundata;
import com.qlangtech.tis.runtime.module.misc.IMessageHandler;
import com.qlangtech.tis.util.AttrValMap;
import com.qlangtech.tis.util.DescriptorsJSON;
Expand Down Expand Up @@ -906,7 +907,9 @@ public void doSavePluginConfig(Context context) throws Exception {

if (forwardParams != null) {
this.getRequest().setAttribute(ItemsSaveResult.KEY_ITEMS_SAVE_RESULT, describables);
getRundata().forwardTo(forwardParams[0], forwardParams[1], forwardParams[2]);
// getRundata().forwardTo(forwardParams[0], forwardParams[1], forwardParams[2]);

BasicRundata.forward(getRundata(), forwardParams);
return;
}

Expand All @@ -923,7 +926,7 @@ public static List<ItemsSaveResult> getItemsSaveResultInRequest(HttpServletReque
return (List<ItemsSaveResult>) request.getAttribute(ItemsSaveResult.KEY_ITEMS_SAVE_RESULT);
}

private String[] getActionForwardParam(JSONObject postData) {
protected String[] getActionForwardParam(JSONObject postData) {
String serverForward = postData.getString("serverForward");
String[] forwardParams = null;
if (StringUtils.isNotEmpty(serverForward)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private TISK8sDelegate(String indexName) {
if (StringUtils.isEmpty(indexName)) {
throw new IllegalArgumentException("param indexName can not be null");
}
if (DataXJobWorker.K8S_DATAX_INSTANCE_NAME.getName().equals(indexName)
if (TargetResName.K8S_DATAX_INSTANCE_NAME.getName().equals(indexName)
|| DataXJobWorker.K8S_FLINK_CLUSTER_NAME.match(indexName)
) {
// DataXJobWorker.getFlinkClusterWorker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.pubhook.common.RunEnvironment;
import com.qlangtech.tis.runtime.module.misc.BasicRundata;
import com.qlangtech.tis.runtime.module.misc.DefaultMessageHandler;
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
Expand Down Expand Up @@ -374,6 +375,10 @@ public ITISCoordinator getSolrZkClient() {

private static Rundata createRundata() {
return new Rundata() {
@Override
public String getStringParam(String key) {
return getRequest().getParameter(key);
}

@Override
public HttpServletRequest getRequest() {
Expand Down Expand Up @@ -560,19 +565,11 @@ protected SchemaAction.CreateAppResult createNewApp(Context context, Application
return result;
}

public static interface Rundata {
public interface Rundata extends BasicRundata {

public HttpServletRequest getRequest();

public default void forwardTo(String target) {
throw new UnsupportedOperationException(target);
}

public void forwardTo(String namespace, String target, String method);

public void setLayout(String layout);

public void redirectTo(String target);
// public Context getContext();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http:https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.qlangtech.tis.runtime.module.action;

Expand All @@ -22,6 +22,7 @@
import com.qlangtech.tis.ibatis.BasicCriteria;
import com.qlangtech.tis.lang.ILogErrorDetail;
import com.qlangtech.tis.lang.TisException;
import com.qlangtech.tis.lang.TisException.ErrorCode;
import com.qlangtech.tis.manage.PermissionConstant;
import com.qlangtech.tis.manage.biz.dal.dao.IOperationLogDAO;
import com.qlangtech.tis.manage.biz.dal.pojo.OperationLogCriteria;
Expand All @@ -43,6 +44,11 @@ public class OperationLogAction extends BasicModule {

private IOperationLogDAO operationLogDAO;

public void doExceptionRestore(Context context) {
ErrorCode errorCode = TisException.parse(this.getString("errorCode"));
errorCode.execForward(this.getRundata());
}

/**
* @param context
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class SysInitializeAction extends BasicModule {

private static Boolean _isSysInitialized;



/**
* 当使用TIS作为docker容器启动,本地data目录作为容器卷,初始状态是空的,需要将空的卷初始化
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.qlangtech.tis;

import com.alibaba.fastjson.JSONObject;
import com.qlangtech.tis.coredefine.module.action.TriggerBuildResult;
import com.qlangtech.tis.manage.common.*;
import com.qlangtech.tis.manage.common.valve.AjaxValve;
import com.qlangtech.tis.manage.spring.EnvironmentBindService;
Expand All @@ -40,7 +41,7 @@ public class BasicActionTestCase extends StrutsSpringTestCase implements TISEasy


protected void setCollection(String collection) {
request.addHeader("appname", collection);
request.addHeader(TriggerBuildResult.KEY_APPNAME, collection);
DefaultFilter.AppAndRuntime app = new DefaultFilter.AppAndRuntime();
app.setAppName(collection);
DefaultFilter.setAppAndRuntime(app);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.alibaba.fastjson.JSONObject;
import com.opensymphony.xwork2.ActionProxy;
import com.qlangtech.tis.BasicActionTestCase;
import com.qlangtech.tis.datax.job.DataXJobWorker;
import com.qlangtech.tis.fullbuild.IFullBuildContext;
import com.qlangtech.tis.manage.common.valve.AjaxValve;
import com.qlangtech.tis.trigger.util.JsonUtil;

Expand All @@ -49,7 +49,7 @@ public void testDoLaunchDataxWorker() throws Exception {
request.setParameter("event_submit_do_launch_datax_worker", "y");
request.setParameter("action", "datax_action");
request.setParameter(DataxAction.KEY_USING_POWERJOB_USE_EXIST_CLUSTER, "false");
request.setParameter(DataxAction.KEY_TARGET_NAME, DataXJobWorker.K8S_DATAX_INSTANCE_NAME.getName());
request.setParameter(IFullBuildContext.KEY_TARGET_NAME, TargetResName.K8S_DATAX_INSTANCE_NAME.getName());
//JSONObject content = new JSONObject();

//content.put(CollectionAction.KEY_INDEX_NAME, TEST_TABLE_EMPLOYEES_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ public interface IFullBuildContext {
// public static final File parent = new File(
// "D:\\j2ee_solution\\eclipse-standard-kepler-SR2-win32-x86_64\\workspace\\tis-saturn\\tis-sql-parser\\src\\main\\resources\\totalpaytest");
String NAME_DATAFLOW_DIR = "df";
String KEY_TARGET_NAME = "targetName";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http:https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.lang;

import com.qlangtech.tis.lang.TisException.ErrorCode;

import java.util.Collections;
import java.util.Map;

/**
* @author: 百岁([email protected]
* @create: 2024-03-23 17:10
**/
public class ErrorValue {
private final ErrorCode code;
private final Map<String, Object> payload;

public static ErrorValue create(ErrorCode code, Map<String, Object> payload) {
return new ErrorValue(code, payload);
}

public static ErrorValue create(ErrorCode code, String key, Object val) {
return create(code, Collections.singletonMap(key, val));
}

private ErrorValue(ErrorCode code, Map<String, Object> payload) {
this.code = code;
this.payload = payload;
}

public ErrorCode getCode() {
return code;
}

public Map<String, Object> getPayload() {
return payload;
}
}
Loading

0 comments on commit 3db705c

Please sign in to comment.