Skip to content

Commit

Permalink
deal with record than cannot be decoded and failed to create shard it…
Browse files Browse the repository at this point in the history
…erator at sequence number
  • Loading branch information
svebk committed Oct 9, 2019
1 parent 3182560 commit 2576209
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions cufacesearch/cufacesearch/ingester/kinesis_ingester.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,21 @@ def get_shard_iterator(self, shard_id):
#ShardIteratorType='AFTER_SEQUENCE_NUMBER')
return shard_iterator['ShardIterator']
except Exception as inst:
msg = "[{}.get_shard_iterator] Could not initialize from previous SequenceNumber {}. {}"
# An error occurred (ProvisionedThroughputExceededException) when calling the GetShardIterator operation: Rate exceeded for shard shardId-000000000000 in stream tf-images-sha1 under account 575573423661.
msg = "[{}.get_shard_iterator] Could not initialize at previous SequenceNumber {}. {}"
print(msg.format(self.pp, sqn, inst))
# Could fail with
#botocore.errorfactory.InvalidArgumentException: An error occurred (InvalidArgumentException) when calling the GetShardIterator operation: StartingSequenceNumber 49592949124142737608129152630877400786080442094840709122 used in GetShardIterator on shard shardId-000000000000 in stream test-local-kinesis-caltech101 under account 000000000000 is invalid because it did not come from this stream.
try:
shard_iterator = self.client.get_shard_iterator(StreamName=self.stream_name,
ShardId=shard_id,
StartingSequenceNumber=sqn,
ShardIteratorType='AFTER_SEQUENCE_NUMBER')
return shard_iterator['ShardIterator']
except Exception as inst2:
msg = "[{}.get_shard_iterator] Also failed to initialize after previous SequenceNumber {}. {}"
print(msg.format(self.pp, sqn, inst2))

# Could fail with
#botocore.errorfactory.InvalidArgumentException: An error occurred (InvalidArgumentException) when calling the GetShardIterator operation: StartingSequenceNumber 49592949124142737608129152630877400786080442094840709122 used in GetShardIterator on shard shardId-000000000000 in stream test-local-kinesis-caltech101 under account 000000000000 is invalid because it did not come from this stream.
shard_iterator_type = self.get_required_param('shard_iterator_type')
if self.verbose > 5:
msg = "[{}.get_shard_iterator] Initializing with type {}"
Expand Down Expand Up @@ -266,8 +277,14 @@ def get_msg_json(self):
print(msg.format(self.pp, lag_ms/1000.0))
sleep_count = 0
for rec in records:
rec_json = json.loads(rec['Data'])
sqn = str(rec['SequenceNumber'].decode("utf-8"))
# This could throw a JSONDecodeError: No JSON object could be decoded
try:
rec_json = json.loads(rec['Data'])
except json.JSONDecodeError:
msg = "[{}: WARNING] Could not parse record at SequenceNumber {}"
print(msg.format(self.pp, sqn))
continue
if self.verbose > 5:
#msg = "[{}: log] Found message at SequenceNumber {} in shard {}: {}"
#print(msg.format(self.pp, sqn, sh_id, rec_json))
Expand Down

0 comments on commit 2576209

Please sign in to comment.