Skip to content

Latest commit

 

History

History
 
 

otsstreamreader

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 
 
 

TableStore增量数据导出通道:TableStoreStreamReader

快速介绍

TableStoreStreamReader插件主要用于TableStore的增量数据导出,增量数据可以看作操作日志,除了数据本身外还附有操作信息。

与全量导出插件不同,增量导出插件只有多版本模式,同时不支持指定列。这是与增量导出的原理有关的,导出的格式下面有详细介绍。

使用插件前必须确保表上已经开启Stream功能,可以在建表的时候指定开启,或者使用SDK的UpdateTable接口开启。

开启Stream的方法:
SyncClient client = new SyncClient("", "", "", "");
1. 建表的时候开启:
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24代表增量数据保留24小时
client.createTable(createTableRequest);

2. 如果建表时未开启,可以通过UpdateTable开启:
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); 
client.updateTable(updateTableRequest);

实现原理

首先用户使用SDK的UpdateTable功能,指定开启Stream并设置过期时间,即开启了增量功能。

开启后,TableStore服务端就会将用户的操作日志额外保存起来, 每个分区有一个有序的操作日志队列,每条操作日志会在一定时间后被垃圾回收,这个时间即用户指定的过期时间。

TableStore的SDK提供了几个Stream相关的API用于将这部分操作日志读取出来,增量插件也是通过TableStore SDK的接口获取到增量数据的,并将 增量数据转化为多个6元组的形式(pk, colName, version, colValue, opType, sequenceInfo)导入到ODPS中。

Reader的配置模版:

"reader": {
    "name" : "otsstreamreader",
    "parameter" : {
        "endpoint" : "",
        "accessId" : "",
        "accessKey" : "",
        "instanceName" : "",
        //dataTable即需要导出数据的表。
        "dataTable" : "",  
        //statusTable是Reader用于保存状态的表,若该表不存在,Reader会自动创建该表。
        //一次离线导出任务完成后,用户不应删除该表,该表中记录的状态可用于下次导出任务中。
        "statusTable" : "TableStoreStreamReaderStatusTable",  
        //增量数据的时间范围(左闭右开)的左边界。
        "startTimestampMillis" : "",
        //增量数据的时间范围(左闭右开)的右边界。
        "endTimestampMillis" : "",
        //采云间调度只支持天级别,所以提供该配置,作用与startTimestampMillis和endTimestampMillis类似。
        "date": "",
        //是否导出时序信息。
        "isExportSequenceInfo": true,
        //从TableStore中读增量数据时,每次请求的最大重试次数,默认为30。
        "maxRetries" : 30
    }
}

参数说明

名称 说明 类型 必选
endpoint TableStoreServer的Endpoint地址。 String
accessId 用于访问TableStore服务的accessId。 String
accessKey 用于访问TableStore服务的accessKey。 String
instanceName TableStore的实例名称。 String
dataTable 需要导出增量数据的表的名称。该表需要开启Stream,可以在建表时开启,或者使用UpdateTable接口开启。 String
statusTable Reader插件用于记录状态的表的名称,这些状态可用于减少对非目标范围内的数据的扫描,从而加快导出速度。
1. 用户不需要创建该表,只需要给出一个表名。Reader插件会尝试在用户的instance下创建该表,若该表不存在即创建新表,若该表已存在,会判断该表的Meta是否与期望一致,若不一致会抛出异常。
2. 在一次导出完成之后,用户不应删除该表,该表的状态可用于下次导出任务。
3. 该表会开启TTL,数据自动过期,因此可认为其数据量很小。
4. 针对同一个instance下的多个不同的dataTable的Reader配置,可以使用同一个statusTable,记录的状态信息互不影响。
综上,用户配置一个类似TableStoreStreamReaderStatusTable之类的名称即可,注意不要与业务相关的表重名。
String
startTimestampMillis 增量数据的时间范围(左闭右开)的左边界,单位毫秒。
1. Reader插件会从statusTable中找对应startTimestampMillis的位点,从该点开始读取开始导出数据。
2. 若statusTable中找不到对应的位点,则从系统保留的增量数据的第一条开始读取,并跳过写入时间小于startTimestampMillis的数据。
Long
endTimestampMillis 增量数据的时间范围(左闭右开)的右边界,单位毫秒。
1. Reader插件从startTimestampMillis位置开始导出数据后,当遇到第一条时间戳大于等于endTimestampMillis的数据时,结束导出数据,导出完成。
2. 当读取完当前全部的增量数据时,结束读取,即使未达到endTimestampMillis。
Long
date 日期格式为yyyyMMdd,如20151111,表示导出该日的数据。
若没有指定date,则必须指定startTimestampMillis和endTimestampMillis,反之也成立。
String
isExportSequenceInfo 是否导出时序信息,时序信息包含了数据的写入时间等。默认该值为false,即不导出。 Boolean
maxRetries 从TableStore中读增量数据时,每次请求的最大重试次数,默认为30,重试之间有间隔,30次重试总时间约为5分钟,一般无需更改。 Int

导出的数据格式

首先,在TableStore多版本模型下,表中的数据组织为“行-列-版本”三级的模式, 一行可以有任意列,列名也并非固定的,每一列可以含有多个版本,每个版本都有一个特定的时间戳(版本号)。

用户可以通过TableStore的API进行一系列读写操作, TableStore通过记录用户最近对表的一系列写操作(或称为数据更改操作)来实现记录增量数据的目的, 所以也可以把增量数据看作一批操作记录。

TableStore有三类数据更改操作:PutRow、UpdateRow、DeleteRow。

  • PutRow的语义是写入一行,若该行已存在即覆盖该行。

  • UpdateRow的语义是更新一行,对原行其他数据不做更改, 更新可能包括新增或覆盖(若对应列的对应版本已存在)一些列值、删除某一列的全部版本、删除某一列的某个版本。

  • DeleteRow的语义是删除一行。

TableStore会根据每种操作生成对应的增量数据记录,Reader插件会读出这些记录,并导出成Datax的数据格式。

同时,由于TableStore具有动态列、多版本的特性,所以Reader插件导出的一行不对应TableStore中的一行,而是对应TableStore中的一列的一个版本。 即TableStore中的一行可能会导出很多行,每行包含主键值、该列的列名、该列下该版本的时间戳(版本号)、该版本的值、操作类型。若设置isExportSequenceInfo为true,还会包括时序信息。

转换为Datax的数据格式后,我们定义了四种操作类型,分别为:

  • U(UPDATE): 写入一列的一个版本

  • DO(DELETE_ONE_VERSION): 删除某一列的某个版本

  • DA(DELETE_ALL_VERSION): 删除某一列的全部版本,此时需要根据主键和列名,将对应列的全部版本删除

  • DR(DELETE_ROW): 删除某一行,此时需要根据主键,将该行数据全部删除

举例如下,假设该表有两个主键列,主键列名分别为pkName1, pkName2:

pkName1 pkName2 columnName timestamp columnValue opType
pk1_V1 pk2_V1 col_a 1441803688001 col_val1 U
pk1_V1 pk2_V1 col_a 1441803688002 col_val2 U
pk1_V1 pk2_V1 col_b 1441803688003 col_val3 U
pk1_V2 pk2_V2 col_a 1441803688000 DO
pk1_V2 pk2_V2 col_b DA
pk1_V3 pk2_V3 DR
pk1_V3 pk2_V3 col_a 1441803688005 col_val1 U

假设导出的数据如上,共7行,对应TableStore表内的3行,主键分别是(pk1_V1,pk2_V1), (pk1_V2, pk2_V2), (pk1_V3, pk2_V3)。

对于主键为(pk1_V1, pk2_V1)的一行,包含三个操作,分别是写入col_a列的两个版本和col_b列的一个版本。

对于主键为(pk1_V2, pk2_V2)的一行,包含两个操作,分别是删除col_a列的一个版本、删除col_b列的全部版本。

对于主键为(pk1_V3, pk2_V3)的一行,包含两个操作,分别是删除整行、写入col_a列的一个版本。