Skip to content

Commit

Permalink
add tis-console.yaml for cloud native deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Mar 18, 2024
1 parent e1f955a commit 8a68665
Show file tree
Hide file tree
Showing 18 changed files with 245 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,40 +68,10 @@ public static File getPluginLibDir(String pluginName) {
}


// /**
// * 通过在插件中的一个类找到插件的 WEB—INFO/lib 目录
// *
// * @param classInPlugin
// * @return
// */
// public static File getPluginLibDir(Class<?> classInPlugin) {
// if (classInPlugin == null) {
// throw new IllegalArgumentException("classInPlugin can not be null");
// }
// String clazz = classInPlugin.getName();
//
// URL location = classInPlugin.getResource("/" + StringUtils.replace(clazz, ".", "/") + ".class");
//
// if (location != null) {
// final Pattern p = Pattern.compile("^.*file:(.+?" + PLUGIN_LIB_DIR + ").+?!.*$");
// Matcher m = p.matcher(location.toString());
// if (m.find()) {
// // return URLDecoder.decode(, "UTF-8");
// return new File(m.group(1));
// } else {
// throw new IllegalStateException("location is illegal:" + location);
// }
// // throw new ClassNotFoundException("Cannot parse location of '" + location + "'. Probably not loaded from a Jar");
// }
// throw new IllegalStateException("Cannot find class '" + classInPlugin.getName() + " using the classloader");
//
// // return getPluginLibDir(pluginName, true);
// }


public static final String TIS_PUB_PLUGINS_DOC_URL = "http:https://tis.pub/docs/plugin/plugins/#";

public static final String bundlePath = StringUtils.defaultIfEmpty(System.getenv(KEY_ENV_TIS_CFG_BUNDLE_PATH), KEY_DEFAULT_TIS_CFG_BUNDLE_PATH);// ;
public static final String bundlePath
= StringUtils.defaultIfEmpty(System.getenv(KEY_ENV_TIS_CFG_BUNDLE_PATH), KEY_DEFAULT_TIS_CFG_BUNDLE_PATH);// ;
public static final String bundlePathClasspath = bundlePath + ".properties";
public static final String KEY_TIS_DATASOURCE_TYPE = "tis.datasource.type";
public static final String KEY_TIS_DATASOURCE_DBNAME = "tis.datasource.dbname";
Expand Down Expand Up @@ -149,9 +119,6 @@ public static SysDBType parse(String token) {
}
}

// public static final String DB_TYPE_MYSQL = "mysql";
// public static final String DB_TYPE_DERBY = "derby";

public static final String QLANGTECH_PACKAGE = "com.qlangtech";

private static final String GENERATE_PARENT_PACKAGE = QLANGTECH_PACKAGE + ".tis.realtime.transfer";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http:https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.qlangtech.tis.plugin;

Expand All @@ -23,10 +23,12 @@
* @author 百岁([email protected]
* @date 2020/04/13
*/
@FunctionalInterface
public interface IdentityName {

String MSG_ERROR_NAME_DUPLICATE = "名称重复";


// /**
// * 相同类型的插件不能重名
// *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,7 @@ private IRCController getRCController() {
@Func(value = PermissionConstant.PERMISSION_INCR_PROCESS_MANAGE)
public void doGetFlinkClusterList(Context context) throws Exception {
FlinkClusterTokenManager clusters = ServerLaunchToken.createFlinkClusterToken();

List<FlinkClusterPojo> allClusters = clusters.getAllClusters();
this.setBizResult(context, allClusters);
this.setBizResult(context, clusters.getAllFlinkSessionClusters());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,8 @@ public void doGetJobWorkerMeta(Context context) {
@Func(value = PermissionConstant.DATAX_MANAGE, sideEffect = false)
public void doGetFlinkSession(Context context) {
final TargetResName targetName = getK8SJobWorkerTargetName(false);
Optional<ServerLaunchToken> launchToken = Optional.of(ServerLaunchToken.createFlinkClusterToken().token(FlinkClusterType.K8SSession, targetName));
Optional<ServerLaunchToken> launchToken
= Optional.of(ServerLaunchToken.createFlinkClusterToken().token(FlinkClusterType.K8SSession, targetName));
getJobWoker(context, targetName, launchToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,22 @@ public void doGetEndtypeIcons(Context context) throws Exception {
icon.put("name", type.getVal());
icon.put("theme", "fill");
i.setRes(icon, true);
iconsDefs.add(icon);

// if (isRef) {
// icon.put("ref", ((IconReference) i).endType().getVal());
// } else {
// icon.put("icon", i.fillType());
// }

iconsDefs.add(icon);

icon = new JSONObject();
icon.put("name", type.getVal());
icon.put("theme", "outline");
// icon.put("icon", i.outlineType());
i.setRes(icon, false);
iconsDefs.add(icon);

if (i.setRes(icon, false)) {
iconsDefs.add(icon);
}
}
this.setBizResult(context, iconsDefs);
}
Expand Down Expand Up @@ -841,22 +842,6 @@ public void doGetPluginConfigInfo(Context context) throws Exception {
hlist.add(hetero.toJSON());
}

// Map<Class<?>, HeteroList<?>> extendHeteroList = new HashMap<>();
// // 需要将有相同extendpoint 的HeteroList 合并一下,不然保存的时候会有问题
// for (HeteroList<?> h : heteros) {
// hetero = extendHeteroList.get(h.getExtensionPoint());
// if (hetero == null) {
// extendHeteroList.put(h.getExtensionPoint(), h);
// } else {
// hetero.setItems(ListUtils.union(hetero.getItems(), h.getItems()));
// hetero.setDescriptors(ListUtils.union(hetero.getDescriptors(), h.getDescriptors()));
// }
// }

// for (HeteroList<?> h : extendHeteroList.values()) {
// hlist.add(h.toJSON());
// }

pluginDetail.put("plugins", hlist);
this.setBizResult(context, pluginDetail);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,13 @@ public void removeIncrProcess() {
this.incrSync.removeInstance(this.indexName);
this.cleanResource();
} catch (Throwable e) {
// 看一下RC 是否已经没有了如果是没有了 就直接回收资源
if (this.incrSync.getRCDeployment(this.indexName) == null) {
this.cleanResource();
try {
// 看一下RC 是否已经没有了如果是没有了 就直接回收资源
if (this.incrSync.getRCDeployment(this.indexName) == null) {
this.cleanResource();
}
} catch (Exception ex) {
logger.warn(ex.getMessage());
}
throw new RuntimeException(this.indexName.getName(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.qlangtech.tis.runtime.module.misc.DefaultMessageHandler;
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
import com.qlangtech.tis.runtime.module.misc.IMessageHandler;
import com.qlangtech.tis.runtime.module.misc.impl.DefaultFieldErrorHandler;
import com.qlangtech.tis.runtime.module.misc.impl.DefaultFieldErrorHandler.ItemsErrors;
import com.qlangtech.tis.trigger.util.JsonUtil;
import org.apache.struts2.ServletActionContext;
Expand Down Expand Up @@ -167,7 +166,7 @@ private static StringBuffer buildResultStruct(IExecResult actionExecResult, Bool
JSONArray pluginErrs = new JSONArray();
for (List<ItemsErrors> /** item*/
onePluginOfItems : pluginErrorList) {
JSONArray itemErrs = convertItemsErrorList( onePluginOfItems) ;// convertItemsErrorList(onePluginOfItems);
JSONArray itemErrs = convertItemsErrorList(onePluginOfItems);// convertItemsErrorList(onePluginOfItems);
pluginErrs.add(itemErrs);
}
result.append(",\n \"").append(IAjaxResult.KEY_ERROR_FIELDS).append("\":");
Expand Down Expand Up @@ -199,7 +198,6 @@ private static JSONArray convertItemsErrorList(List<ItemsErrors> itemsErrorList)
}



private static void writeJson(HttpServletResponse response, StringBuffer execResult) throws IOException {
// try {
// Thread.sleep(1000);
Expand Down Expand Up @@ -227,7 +225,7 @@ public void addErrorMsg(List<String> msgs) {

private List<String> msgList;

// private List<List<List<DefaultFieldErrorHandler.FieldError>>> pluginErrorList;
// private List<List<List<DefaultFieldErrorHandler.FieldError>>> pluginErrorList;

private List<List<ItemsErrors>> pluginErrorList;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;

Expand All @@ -49,14 +50,24 @@
* @create: 2020-07-23 18:56
*/
public class TisException extends RuntimeException {
private final Optional<ErrorCode> errCode;

/**
* TIS会有专门的错误提示及异常处理流程
*/
public enum ErrorCode {
FLINK_CLUSTER_LOSS_OF_CONTACT,
POWER_JOB_CLUSTER_LOSS_OF_CONTACT
}

public static ErrMsg getErrMsg(Throwable throwable) {
TisException except = find(throwable);
if (except == null) {
Throwable cause = throwable.getCause();
return new ErrMsg(org.apache.commons.lang3.exception.ExceptionUtils.getRootCauseMessage(throwable), cause != null ? cause : throwable);
return new ErrMsg(org.apache.commons.lang3.exception.ExceptionUtils.getRootCauseMessage(throwable)
, cause != null ? cause : throwable, Optional.empty());
} else {
return new ErrMsg(except.getMessage(), except);
return new ErrMsg(except.getMessage(), except, except.errCode);
}
}

Expand All @@ -72,20 +83,26 @@ private static TisException find(Throwable throwable) {
return last;
}

private TisException(String message, Throwable cause) {
private TisException(Optional<ErrorCode> errorCode, String message, Throwable cause) {
super(message, cause);
this.errCode = errorCode;
}

private TisException(String message) {
super(message);
this.errCode = Optional.empty();
}


public static TisException create(String message, Throwable cause) {
return create(null, message, cause);
}


public static TisException create(ErrorCode errorCode, String message, Throwable cause) {
if (cause instanceof TisException) {
return (TisException) cause;
} else {
return new TisException(message, cause);
return new TisException(Optional.ofNullable(errorCode), message, cause);
}
}

Expand All @@ -111,17 +128,23 @@ protected SimpleDateFormat initialValue() {
private long logFileName;
// 异常摘要
private String abstractInfo;
private final Optional<ErrorCode> errCode;

public ErrMsg(String message, Throwable ex) {
public ErrMsg(String message, Throwable ex, Optional<ErrorCode> errCode) {
this.message = message;
this.ex = ex;
this.errCode = Objects.requireNonNull(errCode, "errCode can not be null");
}

@JSONField(serialize = false)
public Throwable getEx() {
return ex;
}

public ErrorCode getErrCode() {
return this.errCode.orElse(null);
}

public String getLogFileName() {
return String.valueOf(this.logFileName);
}
Expand Down Expand Up @@ -197,7 +220,7 @@ public static List<ErrMsg> getErrorLogs() {
List<ErrMsg> result = Lists.newArrayList(Arrays.stream(logs).filter((l) ->
p.matcher(l).matches()
).map((l) -> {
ErrMsg errMsg = new ErrMsg(null, null);
ErrMsg errMsg = new ErrMsg(null, null, Optional.empty());
errMsg.logFileName = Long.parseLong(l);
return errMsg;
}).iterator());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
package com.qlangtech.tis.datax.job;

import com.qlangtech.tis.datax.job.ServerLaunchToken.FlinkClusterType;
import com.qlangtech.tis.plugin.IdentityName;

/**
* @author: 百岁([email protected]
* @create: 2024-01-15 16:47
**/
public class FlinkClusterPojo {
public class FlinkClusterPojo implements IdentityName {

// public final static String JSON_KEY_WEB_INTERFACE_URL = "webInterfaceURL";
// public final static String JSON_KEY_CLUSTER_ID = "clusterId";
Expand All @@ -43,6 +44,11 @@ public class FlinkClusterPojo {
public FlinkClusterType clusterType;
private long createTime;

@Override
public String identityValue() {
return this.getClusterId();
}

public long getCreateTime() {
return createTime;
}
Expand Down
Loading

0 comments on commit 8a68665

Please sign in to comment.