mcommit's message

ソフトウェア開発の仕事をしているsimotinといいます。記事の内容でご質問やご意見がありましたらお気軽にコメントしてください\^o^/

AWS kinesis GetRecords の正しい呼び出し方

AWSのサービスの1つであるKinesis Streamを使う機会がありました。

APIを使ったデータのアップロードとダウンロードをする際にいくつか気を付けないといけない点があったので挙げておきたいと思います。

Kinesisの特徴

Kinesisの特徴はデータのリアルタイム処理ができることでクラウドにアップロードしたデータは基本的に24時間の間アクセスできます。
RDBと違って永続的に保存されるわけではなく、あくまでストリームに流したリアルタイムなデータに対して何らかの解析を行うというのがサービスの用途のようです。

データをアップロードするAPI

さて、kinesisへのデータのアップロードは、

  1. PutRecord
  2. PutRecords

という2つのAPIによって行います。

両者の違いは文字通り1つのデータか複数データかになります。

ちなみに、PutRecordsはAPIの仕様を読む限り、複数レコードのアップロードのうち、一部のデータのみアップロードに成功し、一部は失敗するということがあるようです。

トランザクション的な機能が無く、またレスポンスの内容からはどこからどこまでが成功でどこからどこまでが失敗なのかが分からないようです。手堅くデータをあげるのであればPutRecordをつかえということなのかもしれません。

データの取得はどうするのか?

アップロードしたデータを取り出す時はどうすればいいかですが、

  1. GetShardIterator

docs.aws.amazon.com

  1. GetRecords

docs.aws.amazon.com

というAPIの呼び出しによりデータの取得が可能です。


さて、このGetShardIteratorとGetRecordsの使い方ですが、単純に

「こっからここまでのデータをくれ」

という感じでの呼び出しにはなりません。

特にGetRecordsの呼び出しは一般的なWebAPIにおけるデータ取得とは少しイメージが異なる形です。

細かい説明はAPIのリファレンスを見て頂ければ分かると思いますが、ここではRubyaws-sdkを使ったPutRecordとGetRecordsを呼び出すような、サンプルプログラムを上げておきたいと思います。
特に、データを取得する処理に関して、GetRecordsを使われる方のご参考になれば幸いです。

require 'aws-sdk'

# AWS の設定用定数
REAGION = 'リージョン名'
ACCESS_KEY_ID = 'アクセスキー'
SECRET_ACCESS_KEY = 'シークレットアクセスキー'
STREAM_NAME = "ストリーム名"

Aws.config.update({
  region: REAGION,
  credentials: Aws::Credentials.new(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
})

kinesis = Aws::Kinesis::Client.new(region: REAGION)

# PutRecordで上げるデータはBase64エンコードする必要がある
# →従って、文字列だけでなくバイナリデータもあげることができる
enc_data = ["123"].pack('m')

resp = kinesis.put_record({
  stream_name: STREAM_NAME,
  data: enc_data, # required
  partition_key: "key-test", # required
})

# shardIdにはputRecordのレスポンスに格納されているShardIdを設定する
# シャード数が1であれば恐らく "shardId-000000000000"になっている
shd_id = resp.shard_id
puts "shard_id:#{shd_id}"
puts "sequence_number:#{resp.sequence_number}"

resp = kinesis.get_shard_iterator({
  stream_name: STREAM_NAME, # required
  shard_id: shd_id,        # required
  shard_iterator_type: "TRIM_HORIZON"
}
)

shd_it = resp.shard_iterator

resp = kinesis.get_records({shard_iterator: shd_it, limit: 10000})

loop do
	resp = kinesis.get_records({shard_iterator: shd_it, limit: 10000})
  resp.records.each do |record|
	puts record
    end
    # millis_behind_latest が0になった場合に終了させる
    # get_records自体はデータがなくても成功する
    break if resp.millis_behind_latest == 0
    break if resp.next_shard_iterator.nil?
    shd_it = resp.next_shard_iterator
end

ポイント

GetRecordsの呼び出しで注意しないといけないポイントがあります。
GetRecordsのレスポンスとして返ってくるデータには必ずしもデータが入っているとは限りません。
NextShardIteratorがある限り呼び出しを行わないといけないのですが、実はNextShardIteratorだけでチェックしてもデータが全てとり終わったのかどうかが分かりません。

全てのデータがとり終わったかどうかは、MillisBehindLatestが0かどうかでチェックする必要があります。
このチェックがないとGetRecordsをいつまで呼び出していいか分からず無限ループにはまってしまいます。

感想

サンプルコードを書いてみたはいいのですが、私の中でkinesisやMQTTのようなサービスやプロトコルのメリットが今一つ理解できていない部分があります。

世の中には永続化すべきデータとそうでないデータがあり、両方合わせると莫大な量になるが、
「とりあえず集めるだけ集めて必要なデータだけを残そう」という思想なのかもしれません。

IoTが広がるにつれてkinesisのようなサービスはこれからどんどん使われるようになるでしょうか。。。


Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版

Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版

Amazon Web Services実践入門 (WEB+DB PRESS plus)

Amazon Web Services実践入門 (WEB+DB PRESS plus)

注意点

Kinesis Streamはデータの通信量による課金とは別に、作成したストリーム単位でも費用が発生するので、ほったらかしにしないよう注意してください。