Skip to content

Commit

Permalink
Merge pull request #16 from li-xiao-shuang/feature_update_code
Browse files Browse the repository at this point in the history
基于流 构建  BranchBarrier
  • Loading branch information
li-xiao-shuang committed Nov 23, 2021
2 parents aa6f1cd + caed412 commit 965439d
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 58 deletions.
30 changes: 19 additions & 11 deletions src/main/java/barrier/BranchBarrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@

package barrier;

import com.alibaba.fastjson.JSONObject;
import common.constant.ParamFieldConstant;
import common.utils.StreamUtil;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
Expand All @@ -39,34 +42,39 @@
@NoArgsConstructor
@Slf4j
public class BranchBarrier {

/**
* 事务类型
*/
private String transType;

/**
* 全局事务id
*/
private String gid;

/**
* 分支id
*/
private String branchId;

/**
* 操作
*/
private String op;

/**
* 屏障id
*/
private int barrierId;


public BranchBarrier(BarrierParam barrierParam) {


public BranchBarrier(InputStream inputStream) throws Exception {
byte[] bytes = StreamUtil.copyToByteArray(inputStream);
BarrierParam barrierParam = JSONObject.parseObject(bytes, BarrierParam.class);
if (Objects.isNull(barrierParam)) {
throw new Exception("read InputStream null");
}
if (barrierParam.getTrans_type().length > 0) {
this.transType = barrierParam.getTrans_type()[0];
}
Expand All @@ -80,7 +88,7 @@ public BranchBarrier(BarrierParam barrierParam) {
this.op = barrierParam.getOp()[0];
}
}

/**
* connection 由使用方自行管理,创建、回收。
*
Expand All @@ -105,7 +113,7 @@ public void call(Connection connection, Consumer<BranchBarrier> consumer) throws
connection.setAutoCommit(true);
}
}

private boolean insertBarrier(Connection connection) throws SQLException {
if (Objects.isNull(connection)) {
return false;
Expand All @@ -120,7 +128,7 @@ private boolean insertBarrier(Connection connection) throws SQLException {
preparedStatement.setString(4, op);
preparedStatement.setString(5, String.format("%02d", barrierId));
preparedStatement.setString(6, op);

if (preparedStatement.executeUpdate() == 0) {
return false;
}
Expand Down
33 changes: 21 additions & 12 deletions src/main/java/client/DtmClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,21 @@
import com.alibaba.fastjson.JSONObject;
import common.model.DtmServerInfo;
import common.utils.HttpUtil;
import okhttp3.Response;
import saga.Saga;
import tcc.Tcc;
import xa.Xa;

public class DtmClient {
import java.util.Objects;

public class DtmClient {

private String ipPort;

public DtmClient(String ipPort) {
this.ipPort = ipPort;
}

/**
* 创建TCC事务
*
Expand All @@ -49,7 +52,7 @@ public DtmClient(String ipPort) {
public Tcc newTcc(String gid) throws Exception {
return new Tcc(ipPort, gid);
}

/**
* 创建XA事务
*
Expand All @@ -59,7 +62,7 @@ public Tcc newTcc(String gid) throws Exception {
public Xa newXA() throws Exception {
return new Xa();
}

/**
* 创建Saga事务
*
Expand All @@ -68,8 +71,8 @@ public Xa newXA() throws Exception {
public Saga newSaga() {
return new Saga();
}


/**
* 生成全局事务id
*
Expand All @@ -78,20 +81,26 @@ public Saga newSaga() {
*/
public String genGid() throws Exception {
DtmServerInfo dtmServerInfo = new DtmServerInfo(ipPort);
JSONObject jsonObject;
JSONObject jsonObject = null;
try {
String content = HttpUtil.get(dtmServerInfo.newGid());
jsonObject = JSONObject.parseObject(content);
Response response = HttpUtil.get(dtmServerInfo.newGid());
if (Objects.nonNull(response.body())) {
String result = response.body().string();
jsonObject = JSONObject.parseObject(result);
}
} catch (Exception e) {
throw new Exception("Can’t get gid, please check the dtm server.");
}
if (Objects.isNull(jsonObject)) {
throw new Exception("Can’t get gid, please check the dtm server.");
}
Object code = jsonObject.get("code");
if (null != code && (int) code > 0) {
Object message = jsonObject.get("message");
throw new Exception(message.toString());
}
return jsonObject.get("gid").toString();
}


}
12 changes: 4 additions & 8 deletions src/main/java/common/utils/HttpUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,9 @@ public class HttpUtil {
* @return
* @throws IOException
*/
public static String get(String url) throws IOException {
public static Response get(String url) throws IOException {
Request request = new Request.Builder().url(url).get().build();
try (Response response = CLIENT.newCall(request).execute()) {
return Objects.requireNonNull(response.body()).string();
}
return CLIENT.newCall(request).execute();
}

/**
Expand All @@ -63,11 +61,9 @@ public static String get(String url) throws IOException {
* @return
* @throws IOException
*/
public static String post(String url, String json) throws IOException {
public static Response post(String url, String json) throws IOException {
RequestBody body = RequestBody.create(MEDIA_TYPE, json);
Request request = new Request.Builder().url(url).post(body).build();
try (Response response = CLIENT.newCall(request).execute()) {
return Objects.requireNonNull(response.body()).string();
}
return CLIENT.newCall(request).execute();
}
}
60 changes: 60 additions & 0 deletions src/main/java/common/utils/StreamUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* MIT License
*
* Copyright (c) 2021 yedf
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package common.utils;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class StreamUtil {

public static byte[] copyToByteArray(InputStream in) throws IOException {
if (in == null) {
return new byte[0];
} else {
ByteArrayOutputStream out = new ByteArrayOutputStream(4096);
copy(in, out);
return out.toByteArray();
}
}

private static int copy(InputStream in, OutputStream out) throws IOException {
int byteCount = 0;
if (null == in) {
return byteCount;
}
if (null == out) {
return byteCount;
}
int bytesRead;
for (byte[] buffer = new byte[4096]; (bytesRead = in.read(buffer)) != -1; byteCount += bytesRead) {
out.write(buffer, 0, bytesRead);
}
out.flush();
return byteCount;
}

}
51 changes: 24 additions & 27 deletions src/main/java/tcc/Tcc.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import common.constant.ParamFieldConstant;
import common.enums.TransTypeEnum;
import common.model.TransBase;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,43 +43,43 @@
import java.util.function.Function;

public class Tcc {

private static final String DEFAULT_STATUS = "prepared";

private static final String OP = "try";

private static final String FAIL_RESULT = "FAILURE";

Logger log = LoggerFactory.getLogger(Tcc.class);


/**
* 事务信息
*/
private TransBase transBase;

/**
* server 信息
*/
private DtmServerInfo dtmServerInfo;

/**
* id 生成器
*/
private BranchIdGeneratorUtil branchIdGeneratorUtil;

public Tcc(String ipPort, String gid) {
this.dtmServerInfo = new DtmServerInfo(ipPort);
this.branchIdGeneratorUtil = new BranchIdGeneratorUtil("");
this.transBase = new TransBase(TransTypeEnum.TCC, gid, false);
}

public String tccGlobalTransaction(Function<Tcc, Boolean> function) throws IOException {
HashMap<String, Object> paramMap = new HashMap<>(Constant.DEFAULT_INITIAL_CAPACITY);
paramMap.put(ParamFieldConstant.GID, transBase.getGid());
paramMap.put(ParamFieldConstant.TRANS_TYPE, TransTypeEnum.TCC.getValue());
String response = HttpUtil.post(dtmServerInfo.prepare(), JSONObject.toJSONString(paramMap));
if (this.checkResult(response)) {
Response post = HttpUtil.post(dtmServerInfo.prepare(), JSONObject.toJSONString(paramMap));
if (this.checkResult(post.body().string())) {
if (function.apply(this)) {
HttpUtil.post(dtmServerInfo.submit(), JSONObject.toJSONString(paramMap));
} else {
Expand All @@ -87,9 +88,8 @@ public String tccGlobalTransaction(Function<Tcc, Boolean> function) throws IOExc
}
return transBase.getGid();
}

public boolean callBranch(Object body, String tryUrl, String confirmUrl, String cancelUrl) throws Exception {
// TODO: 2021/11/22 修改返回值为response

public Response callBranch(Object body, String tryUrl, String confirmUrl, String cancelUrl) throws Exception {
String branchId = branchIdGeneratorUtil.genBranchId();
HashMap<String, Object> registerParam = new HashMap<>(Constant.DEFAULT_INITIAL_CAPACITY);
registerParam.put(ParamFieldConstant.GID, transBase.getGid());
Expand All @@ -100,26 +100,23 @@ public boolean callBranch(Object body, String tryUrl, String confirmUrl, String
registerParam.put(ParamFieldConstant.TRY, tryUrl);
registerParam.put(ParamFieldConstant.CONFIRM, confirmUrl);
registerParam.put(ParamFieldConstant.CANCEL, cancelUrl);

String registerResponse = HttpUtil
Response registerResponse = HttpUtil
.post(dtmServerInfo.registerTccBranch(), JSONObject.toJSONString(registerParam));

if (this.checkResult(registerResponse)) {
if (this.checkResult(registerResponse.body().string())) {
HashMap<String, Object> tryParam = new HashMap<>(Constant.DEFAULT_INITIAL_CAPACITY);
tryParam.put(ParamFieldConstant.GID, Collections.singletonList(transBase.getGid()));
tryParam.put(ParamFieldConstant.TRANS_TYPE, Collections.singletonList(TransTypeEnum.TCC.getValue()));
tryParam.put(ParamFieldConstant.BRANCH_ID, Collections.singletonList(branchId));
tryParam.put(ParamFieldConstant.OP, Collections.singletonList(OP));

String tryResponse = HttpUtil.post(tryUrl, JSONObject.toJSONString(tryParam));

return this.checkResult(tryResponse);

return HttpUtil.post(tryUrl, JSONObject.toJSONString(tryParam));
}
return false;
return null;
}


// TODO: 2021/11/23 需要修改判断dtmResult


public boolean checkResult(String response) {
if (StringUtils.isBlank(response)) {
return false;
Expand Down

0 comments on commit 965439d

Please sign in to comment.