Skip to content

Commit

Permalink
Add new date format support for Swifts
Browse files Browse the repository at this point in the history
  • Loading branch information
郭寅兴 committed Mar 7, 2019
1 parent 796a3b5 commit 048cc0f
Show file tree
Hide file tree
Showing 18 changed files with 173 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import akka.actor.{ActorRef, Props}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import edp.rider.common._
import edp.rider.kafka.{CacheMap, KafkaUtils, RiderConsumer}
import edp.rider.kafka.{ KafkaUtils, RiderConsumer}
import edp.rider.module._
import edp.rider.monitor.ElasticSearch
import edp.rider.rest.persistence.entities.User
Expand Down Expand Up @@ -62,8 +62,6 @@ object RiderStarter extends App with RiderLogger {
if (Await.result(modules.userDal.findByFilter(_.email === RiderConfig.riderServer.adminUser), minTimeOut).isEmpty)
Await.result(modules.userDal.insert(User(0, RiderConfig.riderServer.adminUser, RiderConfig.riderServer.adminPwd, RiderConfig.riderServer.adminUser, "admin", RiderConfig.riderServer.defaultLanguage, active = true, currentSec, 1, currentSec, 1)), minTimeOut)

CacheMap.cacheMapInit

if (RiderConfig.monitor.databaseType.equalsIgnoreCase("es"))
ElasticSearch.initial(RiderConfig.es)

Expand Down
180 changes: 0 additions & 180 deletions rider/rider-server/src/main/scala/edp/rider/kafka/CacheMap.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

package edp.rider.kafka

import java.util.Date

import edp.rider.RiderStarter.modules
import edp.rider.common.FlowStatus._
import edp.rider.common.{RiderConfig, RiderLogger}
Expand Down Expand Up @@ -114,6 +116,7 @@ object FeedbackProcess extends RiderLogger {
record.payload_get.map(tuple => {
val umsTsValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "ums_ts_")
val streamIdValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "stream_id")
val flowIdValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "flow_id")
val sinkNamespaceValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "sink_namespace")
val errMaxWaterMarkTsValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "error_max_watermark_ts")
val errMinWaterMarkTsValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "error_min_watermark_ts")
Expand All @@ -123,22 +126,23 @@ object FeedbackProcess extends RiderLogger {
if (UmsFieldType.umsFieldValue(tuple.tuple, fields, "topics") != null)
UmsFieldType.umsFieldValue(tuple.tuple, fields, "topics").toString
else null

FeedbackFlowErr(1, protocolType.toString, umsTsValue.toString, streamIdValue.toString.toLong,
/* FeedbackFlowErr(1, protocolType.toString, umsTsValue.toString, streamIdValue.toString.toLong,
srcNamespace, sinkNamespaceValue.toString, errorCountValue.toString.toInt,
errMaxWaterMarkTsValue.toString, errMinWaterMarkTsValue.toString,
errorInfoValue.toString, topics, curTs)
errorInfoValue.toString, topics, curTs)*/
// FeedbackErr(1,1,streamIdValue.toString.toLong,flowIdValue.toString.toLong,srcNamespace,sinkNamespaceValue,,"sparkx",topics,errorCountValue.toString.toInt,
// errMaxWaterMarkTsValue.toString, errMinWaterMarkTsValue.toString,errorInfoValue,umsTsValue.toString,(new Date()).toString)
})
})
Await.result(modules.feedbackFlowErrDal.insert(insertSeq), minTimeOut)
Await.result(modules.feedbackErrDal.insert(insertSeq), minTimeOut)
} catch {
case ex: Exception =>
records.foreach(record => riderLogger.error(s"-----$record------"))
riderLogger.error(s"process $FEEDBACK_SPARKX_FLOW_ERROR message failed", ex)
}
}

def doStreamBatchError(records: List[Ums]): Unit = {
/* def doStreamBatchError(records: List[Ums]): Unit = {
try {
val insertSeq = records.flatMap(record => {
val protocolType = record.protocol.`type`.toString
Expand All @@ -157,11 +161,11 @@ object FeedbackProcess extends RiderLogger {
statusValue.toString, resultDescValue.toString, topics, curTs)
})
})
Await.result(modules.feedbackStreamErrDal.insert(insertSeq), minTimeOut)
Await.result(modules.feedbackErrDal.insert(insertSeq), minTimeOut)
} catch {
case ex: Exception => riderLogger.error(s"process $FEEDBACK_STREAM_BATCH_ERROR message failed", ex)
}
}
}*/

@Deprecated
def doFeedbackStreamTopicOffset(message: Ums): Unit = {
Expand Down Expand Up @@ -199,35 +203,31 @@ object FeedbackProcess extends RiderLogger {
val fields = record.schema.fields_get
var throughput: Long = 0
record.payload_get.map(tuple => {
val umsTsValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "ums_ts_")
val streamIdValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "stream_id")
val flowIdValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "flow_id")
val batchIdValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "batch_id")
val dataTypeValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "data_type")
val topics = UmsFieldType.umsFieldValue(tuple.tuple, fields, "topics")
val sinkNamespaceValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "sink_namespace").toString
val rddCountValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "rdd_count").toString.toInt
val feedbackTime = UmsFieldType.umsFieldValue(tuple.tuple, fields, "ums_ts_")
//todo 兼容0.6.0及之前版本stream feedback数据
val cdcTsValue =
if (UmsFieldType.umsFieldValue(tuple.tuple, fields, "data_generated_ts") != null)
UmsFieldType.umsFieldValue(tuple.tuple, fields, "data_generated_ts").toString.toLong
else UmsFieldType.umsFieldValue(tuple.tuple, fields, "data_genereated_ts").toString.toLong
val rddTsValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "rdd_generated_ts").toString.toLong
val directiveTsValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "directive_process_start_ts").toString.toLong
val mainDataTsValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "data_process_start_ts").toString.toLong
val mainDataTsValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "data_process_start_ts").toString.toLong
val swiftsTsValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "swifts_start_ts").toString.toLong
val sinkTsValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "sink_start_ts").toString.toLong
val doneTsValue = UmsFieldType.umsFieldValue(tuple.tuple, fields, "done_ts").toString.toLong

val riderSinkNamespace = if (sinkNamespaceValue.toString == "") riderNamespace else namespaceRiderString(sinkNamespaceValue.toString)
val flowName = s"${
riderNamespace
}_$riderSinkNamespace"

val interval_data_process_dataums = (mainDataTsValue.toString.toLong - cdcTsValue.toString.toLong) / 1000
val interval_data_process_rdd = (rddTsValue.toString.toLong - mainDataTsValue.toString.toLong) / 1000
val interval_data_process_swifts = (swiftsTsValue.toString.toLong - mainDataTsValue.toString.toLong) / 1000
val interval_data_process_sink = (sinkTsValue.toString.toLong - mainDataTsValue.toString.toLong) / 1000
val interval_data_process_done = (doneTsValue.toString.toLong - mainDataTsValue.toString.toLong) / 1000

val interval_data_ums_done = (doneTsValue.toString.toLong - cdcTsValue.toString.toLong) / 1000
val interval_rdd_swifts = (swiftsTsValue.toString.toLong - rddTsValue.toString.toLong) / 1000
val interval_rdd_done = (doneTsValue.toString.toLong - rddTsValue.toString.toLong) / 1000
val interval_data_swifts_sink = (sinkTsValue.toString.toLong - swiftsTsValue.toString.toLong) / 1000
val interval_data_sink_done = (doneTsValue.toString.toLong - sinkTsValue.toString.toLong) / 1000
Expand All @@ -237,19 +237,17 @@ object FeedbackProcess extends RiderLogger {
} else throughput = rddCountValue.toString.toInt / interval_rdd_done

val monitorInfo = MonitorInfo(0L, batchIdValue.toString,
string2EsDateString(umsTsValue.toString),
CacheMap.getProjectId(streamIdValue.toString.toLong), streamIdValue.toString.toLong,
CacheMap.getStreamName(streamIdValue.toString.toLong), CacheMap.getFlowId(flowName), flowName,
streamIdValue.toString.toLong,flowIdValue.toString.toLong,
riderNamespace,riderSinkNamespace,dataTypeValue,
rddCountValue.toString.toInt, if (topics == null) "" else topics.toString, throughput,
string2EsDateString(DateUtils.dt2string(cdcTsValue.toString.toLong * 1000, DtFormat.TS_DASH_MICROSEC)),
string2EsDateString(DateUtils.dt2string(rddTsValue.toString.toLong * 1000, DtFormat.TS_DASH_MICROSEC)),
string2EsDateString(DateUtils.dt2string(directiveTsValue.toString.toLong * 1000, DtFormat.TS_DASH_MICROSEC)),
string2EsDateString(DateUtils.dt2string(mainDataTsValue.toString.toLong * 1000, DtFormat.TS_DASH_MICROSEC)),
string2EsDateString(DateUtils.dt2string(swiftsTsValue.toString.toLong * 1000, DtFormat.TS_DASH_MICROSEC)),
string2EsDateString(DateUtils.dt2string(sinkTsValue.toString.toLong * 1000, DtFormat.TS_DASH_MICROSEC)),
string2EsDateString(DateUtils.dt2string(doneTsValue.toString.toLong * 1000, DtFormat.TS_DASH_MICROSEC)),
Interval(interval_data_process_dataums, interval_data_process_rdd, interval_data_process_swifts, interval_data_process_sink, interval_data_process_done,
interval_data_ums_done, interval_rdd_done, interval_data_swifts_sink, interval_data_sink_done))
Interval(interval_data_process_dataums, interval_data_process_rdd, interval_rdd_swifts, interval_data_process_done,
interval_data_swifts_sink, interval_data_sink_done),feedbackTime,new Date())
monitorInfo
})
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class RiderConsumer extends Actor with RiderLogger {
private def processMessage(records: Seq[ConsumerRecord[String, String]]): Unit = {
try {
val sparkxFlowErrorBuffer = new ListBuffer[Ums]
val sparkxStreamErrorBuffer = new ListBuffer[Ums]
// val sparkxStreamErrorBuffer = new ListBuffer[Ums]
val sparkxFlowStatsBuffer = new ListBuffer[Ums]
val flinkxFlowErrorBuffer = new ListBuffer[Ums]
val feedbackFlowStartBuffer = new ListBuffer[Ums]
Expand All @@ -112,19 +112,21 @@ class RiderConsumer extends Actor with RiderLogger {
if (record.key().startsWith(FEEDBACK_SPARKX_FLOW_ERROR.toString)
|| record.key().startsWith(FEEDBACK_FLOW_ERROR.toString)) {
if (ums != null) sparkxFlowErrorBuffer.append(ums)
} else if (record.key().startsWith(FEEDBACK_STREAM_BATCH_ERROR.toString)) {
if (ums != null) sparkxStreamErrorBuffer.append(ums)
} else if (record.key().startsWith(FEEDBACK_SPARKX_FLOW_STATS.toString)) {
if (ums != null) sparkxFlowStatsBuffer.append(ums)
} else if (record.key().startsWith(FEEDBACK_FLINKX_FLOW_ERROR.toString)) {
if (ums != null) flinkxFlowErrorBuffer.append(ums)
} else if(record.key().startsWith(FEEDBACK_DIRECTIVE.toString)) {
if(ums != null) feedbackFlowStartBuffer.append(ums)
}

//else if (record.key().startsWith(FEEDBACK_STREAM_BATCH_ERROR.toString)) {
// if (ums != null) sparkxStreamErrorBuffer.append(ums)
// }
})

doSparkxFlowError(sparkxFlowErrorBuffer.toList)
doStreamBatchError(sparkxStreamErrorBuffer.toList)
//doStreamBatchError(sparkxStreamErrorBuffer.toList)
doSparkxFlowStats(sparkxFlowStatsBuffer.toList)
doFeedbackDirective(feedbackFlowStartBuffer.toList)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ trait PersistenceModule {
val feedbackStreamErrDal: FeedbackStreamErrorDal
val feedbackFlowErrDal: FeedbackFlowErrDal
val feedbackDirectiveDal: BaseDal[FeedbackDirectiveTable, FeedbackDirective]
val feedbackErrDal: FeedbackErrDal
val monitorInfoDal: MonitorInfoDal

val instanceQuery: TableQuery[InstanceTable] = TableQuery[InstanceTable]
Expand Down Expand Up @@ -117,6 +118,7 @@ trait PersistenceModule {
val feedbackStreamErrQuery = TableQuery[FeedbackStreamErrTable]
val feedbackFlowErrQuery = TableQuery[FeedbackFlowErrTable]
val feedbackDirectiveQuery = TableQuery[FeedbackDirectiveTable]
val feedbackErrQuery = TableQuery[FeedbackErrTable]
val monitorInfoQuery = TableQuery[MonitorInfoTable]

}
Expand Down Expand Up @@ -150,5 +152,6 @@ trait PersistenceModuleImpl extends PersistenceModule {
override lazy val feedbackStreamErrDal = new FeedbackStreamErrorDal(feedbackStreamErrQuery, streamDal)
override lazy val feedbackFlowErrDal = new FeedbackFlowErrDal(feedbackFlowErrQuery, streamDal, flowDal)
override lazy val feedbackDirectiveDal = new BaseDalImpl[FeedbackDirectiveTable, FeedbackDirective](feedbackDirectiveQuery)
override lazy val feedbackErrDal= new FeedbackErrDal(feedbackErrQuery)
override lazy val monitorInfoDal = new MonitorInfoDal(monitorInfoQuery, streamDal, flowDal)
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ trait RoutesModuleImpl {

lazy val jobAppService = new JobAppApi(jobDal, projectDal)
lazy val flowAppService = new FlowAppApi(flowDal, streamDal, projectDal)
lazy val monitorAppService = new MonitorAppApi(flowDal, projectDal, streamDal, jobDal, feedbackFlowErrDal, feedbackOffsetDal, monitorInfoDal)
lazy val monitorAppService = new MonitorAppApi(flowDal, projectDal, streamDal, jobDal, feedbackErrDal, feedbackOffsetDal, monitorInfoDal)
lazy val monitorService=new MonitorApi(flowDal,streamDal,monitorInfoDal)

}
Expand Down
Loading

0 comments on commit 048cc0f

Please sign in to comment.