Skip to content

Commit

Permalink
中继模式回调删除处理
Browse files Browse the repository at this point in the history
支持arm(树莓派)
  • Loading branch information
chenjh committed Jul 21, 2022
1 parent 77a11f2 commit 761da47
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 32 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* 下载完成后进行回调通知,回调失败也当做异常,加入异常队列
* 直到下载完成,并进行回调通知,才从下载队列删除。
* 支持nginx开启文件列表时,按列表递归同步所有子目录以及所有件(可用于同步任何文件,含非m3u8)。
* 支持arm服务器(比如树莓派4b),内存要求小,64内存M也能跑起来

## m3u8sync-call-demo调用服务示例(支持回调)
* 增加同步时可传入回调接口地址。
Expand Down
2 changes: 0 additions & 2 deletions m3u8sync-call-demo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
<description>Sync m3u8 from nginx</description>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<skipTests>true</skipTests>
</properties>
<dependencies>
Expand Down
2 changes: 0 additions & 2 deletions m3u8sync/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
<description>Sync m3u8 from nginx</description>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<skipTests>true</skipTests>
</properties>
<dependencies>
Expand Down
1 change: 1 addition & 0 deletions m3u8sync/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* 直到下载完成,并进行回调通知,才从下载队列删除。
* 支持nginx开启文件列表时,按列表递归同步所有子目录以及所有件。
* 支持多节点中继模式,即可先同步到中间服务器,中间服务器再传到最终服务器。
* 支持arm服务器(比如树莓派4b),内存要求小,64内存M也能跑起来

## 启动命令
java -jar .\m3u8sync-0.0.1-SNAPSHOT.jar --spring.profiles.active=dev
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.ccs.m3u8sync.exceptions.ResultData;
import org.ccs.m3u8sync.utils.CommUtils;
import org.ccs.m3u8sync.vo.CallbackVo;
import org.ccs.m3u8sync.vo.M3u8FileInfoVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
Expand Down Expand Up @@ -49,25 +50,20 @@ private HttpHeaders initPostHeader() {
* @param m3u8Url
* @return
*/
public ResultData addSync(String roomId, String format, String m3u8Url, CallbackVo callback) {
public ResultData addSync(String roomId, String format, String m3u8Url, Integer ifRelayCallDel, CallbackVo callback) {
HttpHeaders requestHeaders = initPostHeader();
requestHeaders.setContentType(MediaType.APPLICATION_JSON);
String nextM3u8SyncUrl = relayConfiguration.getNextM3u8Sync();
String apiUrl = CommUtils.appendUrl(nextM3u8SyncUrl, "downup/add");
apiUrl += "?roomId=" + roomId;
apiUrl += "&ifRelayCallDel=" + ifRelayCallDel;
if (StringUtils.isNotBlank(m3u8Url)) {
apiUrl += "&m3u8Url=" + URLEncoder.encode(m3u8Url);
}
if (StringUtils.isNotBlank(format)) {
apiUrl += "&format=" + URLEncoder.encode(format);
}


//是否自动删除
if (relayConfiguration.isDeleteOnSuccess()) {
callback.setParamUrl(callback.getParamUrl() + "?successDel=" + true);
}

String bodyString = JSONUtil.toJsonStr(callback);
log.info("----addSync--roomId={}, bodyString={}", roomId, bodyString);
HttpEntity<String> httpEntitys = new HttpEntity<>(bodyString, requestHeaders);
Expand All @@ -77,6 +73,33 @@ public ResultData addSync(String roomId, String format, String m3u8Url, Callback
return getResultData(resultRemote);
}

/**
* 通知下一节点,并回调通知本节点
*
* @param roomId
* @param successDel
* @param fileInfo
* @return
*/
public ResultData callbackDel(String roomId, String successDel, M3u8FileInfoVo fileInfo) {
HttpHeaders requestHeaders = initPostHeader();
requestHeaders.setContentType(MediaType.APPLICATION_JSON);
String nextM3u8SyncUrl = relayConfiguration.getNextM3u8Sync();
String apiUrl = CommUtils.appendUrl(nextM3u8SyncUrl, "downup/callbackDel");
if (StringUtils.isNotBlank(successDel)) {
apiUrl += "&successDel=" + URLEncoder.encode(successDel);
}

String bodyString = JSONUtil.toJsonStr(fileInfo);
log.info("----callbackDel--roomId={}, bodyString={}", roomId, bodyString);
HttpEntity<String> httpEntitys = new HttpEntity<>(bodyString, requestHeaders);
ResponseEntity<String> exchanges = restTemplate.postForEntity(apiUrl, httpEntitys, String.class);
String resultRemote = exchanges.getBody();
log.info("----callbackDel--roomId={}, resultRemote={}", roomId, resultRemote);
return getResultData(resultRemote);
}


private ResultData getResultData(String jsonBody) {
JSONObject jsonObject = JSONUtil.parseObj(jsonBody);
String resultCode = jsonObject.getStr("resultCode");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ public class RelayConfiguration {
*/
private String localM3u8Sync;
/**
* 是否开启下载任务
* 是否开启中继模式
*/
private boolean open = false;

/**
* 下载完成通知删除原服务的文件
* 用于中继开始时,通知下节点下载完成,回调callbackDel删除本节点的文件
*/
private boolean deleteOnSuccess=false;
private boolean deleteOnSuccess = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,18 @@ public ResultData one(@RequestParam("roomId") String roomId) {
/**
* 添加m3u8下载任务
*
* @param roomId m3u8房间id
* @param format url中roomId组成的格式
* @param m3u8Url 直接m3u8Url,不用拼接生成
* @param callback 回调接口
* @param roomId m3u8房间id
* @param format url中roomId组成的格式
* @param m3u8Url 直接m3u8Url,不用拼接生成
* @param ifRelayCallDel 用于表示上个接点是中继且希望下载完成后,回调callbackDel接口,以删除上个节点的文件
* @param callback 回调接口
* @return
*/
@PostMapping("add")
public ResultData add(@RequestParam("roomId") String roomId
, @RequestParam(value = "format", required = false, defaultValue = "{roomId}/{roomId}.m3u8") String format
, @RequestParam(value = "m3u8Url", required = false) String m3u8Url
, @RequestParam(value = "ifRelayCallDel", required = false, defaultValue = "0") Integer ifRelayCallDel
, @RequestBody CallbackVo callback) {

//用于快束检测中继模模式下一节点是否正常(报告给上一节点)
Expand All @@ -81,8 +83,8 @@ public ResultData add(@RequestParam("roomId") String roomId
log.warn("----add--roomId={} isBlank", roomId);
throw new ParamNullException("roomId不能为空");
}
if(StringUtils.isNotBlank(format)){
format=URLDecoder.decode(format);
if (StringUtils.isNotBlank(format)) {
format = URLDecoder.decode(format);
}
if (StringUtils.isBlank(m3u8Url)) {
m3u8Url = downUpConfig.getNginxUrl(roomId, format);
Expand Down Expand Up @@ -111,6 +113,7 @@ public ResultData add(@RequestParam("roomId") String roomId
log.info("----add--roomId={} format={} m3u8Url={}", roomId, format, m3u8Url);
DownBean bean = new DownBean(roomId, m3u8Url, new Date(), callback);
bean.setSyncType(SyncType.M3U8.getType());
bean.setIfRelayCallDel(ifRelayCallDel);
downUpService.addTask(roomId, bean);
return ResultData.success();
}
Expand Down Expand Up @@ -236,14 +239,14 @@ public ResultData recover() {
* @param fileInfo
* @return
*/
@PostMapping("callback/{roomId}")
public String callback(@PathVariable(value = "roomId") String roomId, @RequestParam(value = "successDel", required = false) String successDel, @RequestBody M3u8FileInfoVo fileInfo) {
@PostMapping("callbackDel/{roomId}")
public String callbackDel(@PathVariable(value = "roomId") String roomId, @RequestParam(value = "successDel", required = false) String successDel, @RequestBody M3u8FileInfoVo fileInfo) {
//用于快速验证回调接口
if (StringUtils.equals("checkCallback", roomId) && StringUtils.equals("test", fileInfo.getFilePath())) {
log.info("----callback--roomId={} check ok", roomId);
if (StringUtils.equals("checkCallbackDel", roomId) && StringUtils.equals("test", fileInfo.getFilePath())) {
log.info("----callbackDel--roomId={} check ok", roomId);
return "ok";
}
log.info("----callback--roomId={} successDel={} fileInfo={}", roomId, successDel, JSONUtil.toJsonStr(fileInfo));
log.info("----callbackDel--roomId={} successDel={} fileInfo={}", roomId, successDel, JSONUtil.toJsonStr(fileInfo));
if ("true".equals(successDel)) {
downUpService.deleteDown(roomId, fileInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public DownBean(String roomId, String url, Date initTime, CallbackVo callback) {
private String roomId;
private String url;
private String path;
// private String title;
private Date initTime;
private CallbackVo callback;
private Integer size;
Expand All @@ -36,4 +35,5 @@ public DownBean(String roomId, String url, Date initTime, CallbackVo callback) {
* 同步类型(m3u8/file)
*/
private String syncType;
private Integer ifRelayCallDel=0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,16 @@ private boolean relayOnSuccess(DownBean downBean, M3u8FileInfoVo fileInfoVo) {
boolean isSuccess = false;

try {
ResultData resultData = nextM3u8SyncRest.addSync(roomId, null, null, downBean.getCallback());
Integer ifRelayCallDel = 0;
if (relayConfiguration.isDeleteOnSuccess()) {
ifRelayCallDel = 1;
}
ResultData resultData = nextM3u8SyncRest.addSync(roomId, null, null, ifRelayCallDel, downBean.getCallback());
isSuccess = resultData.getSuccess();

if (downBean.getIfRelayCallDel() == 1) {
nextM3u8SyncRest.callbackDel(roomId, "true", fileInfoVo);
}
} catch (HttpException e) {
callbackFailCounter.incrementAndGet();
downBean.setError("callbackErr:" + e.getMessage());
Expand Down Expand Up @@ -531,10 +539,10 @@ public boolean checkRelay(String roomId) {
return true;
}
log.info("----checkRelay--roomId={} nextM3u8Sync={}", roomId, nextM3u8Sync);
CallbackVo callback=new CallbackVo();
CallbackVo callback = new CallbackVo();
callback.setBaseUrl(callbackConfiguration.getBaseUrl());
callback.setParamUrl(callbackConfiguration.getParamUrl());
ResultData resultData = nextM3u8SyncRest.addSync(roomId, "test", "test", callback);
ResultData resultData = nextM3u8SyncRest.addSync(roomId, "test", "test", 0, callback);
String responseContent = (String) resultData.getData();
isOk = isCallBackOk(responseContent);
if (!isOk) {
Expand Down
4 changes: 3 additions & 1 deletion m3u8sync/src/main/resources/application-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ callback:
relay:
#中否开启中继模式,本节点下载成功自动通知下一节点,下一节点下载成功,自动回调删除本节点文件
open: false
#如果是最后的节点,可同时进行回调
callback: false
#用于级联服务,级联下一节点
nextM3u8Sync: https://localhost:9293
localM3u8Sync: https://localhost:${server.port}
#回调成功删除本地文件
#用于中继开始时,通知下节点下载完成,回调callbackDel删除本节点的文件
deleteOnSuccess: true


Expand Down
2 changes: 1 addition & 1 deletion m3u8sync/src/main/resources/application-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ relay:
#用于级联服务,级联下一节点
nextM3u8Sync: https://localhost:9293
localM3u8Sync: https://localhost:${server.port}
#回调成功删除本地文件(down_path+filePath)
#用于中继开始时,通知下节点下载完成,回调callbackDel删除本节点的文件(down_path+filePath)
deleteOnSuccess: true


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void addSync() {
CallbackVo callback = new CallbackVo();
callback.setBaseUrl(callbackConfiguration.getBaseUrl());
callback.setParamUrl(callbackConfiguration.getParamUrl());
ResultData resultData = nextM3u8SyncRest.addSync(DownUpService.CHECK_RELAY, "test", "test", callback);
ResultData resultData = nextM3u8SyncRest.addSync(DownUpService.CHECK_RELAY, "test", "test", 0, callback);
System.out.println(resultData.getResultCode() + " " + resultData.getData());
}
}

0 comments on commit 761da47

Please sign in to comment.