AWSのサービスの1つであるKinesis Streamを使う機会がありました。
APIを使ったデータのアップロードとダウンロードをする際にいくつか気を付けないといけない点があったので挙げておきたいと思います。
Kinesisの特徴はデータのリアルタイム処理ができることでクラウドにアップロードしたデータは基本的に24時間の間アクセスできます。
RDBと違って永続的に保存されるわけではなく、あくまでストリームに流したリアルタイムなデータに対して何らかの解析を行うというのがサービスの用途のようです。
データをアップロードするAPI
さて、kinesisへのデータのアップロードは、
- PutRecord
- PutRecords
という2つのAPIによって行います。
両者の違いは文字通り1つのデータか複数データかになります。
ちなみに、PutRecordsはAPIの仕様を読む限り、複数レコードのアップロードのうち、一部のデータのみアップロードに成功し、一部は失敗するということがあるようです。
トランザクション的な機能が無く、またレスポンスの内容からはどこからどこまでが成功でどこからどこまでが失敗なのかが分からないようです。手堅くデータをあげるのであればPutRecordをつかえということなのかもしれません。
データの取得はどうするのか?
アップロードしたデータを取り出す時はどうすればいいかですが、
- GetShardIterator
docs.aws.amazon.com
- GetRecords
docs.aws.amazon.com
というAPIの呼び出しによりデータの取得が可能です。
さて、このGetShardIteratorとGetRecordsの使い方ですが、単純に
「こっからここまでのデータをくれ」
という感じでの呼び出しにはなりません。
特にGetRecordsの呼び出しは一般的なWebAPIにおけるデータ取得とは少しイメージが異なる形です。
細かい説明はAPIのリファレンスを見て頂ければ分かると思いますが、ここではRubyのaws-sdkを使ったPutRecordとGetRecordsを呼び出すような、サンプルプログラムを上げておきたいと思います。
特に、データを取得する処理に関して、GetRecordsを使われる方のご参考になれば幸いです。
require 'aws-sdk'
REAGION = 'リージョン名'
ACCESS_KEY_ID = 'アクセスキー'
SECRET_ACCESS_KEY = 'シークレットアクセスキー'
STREAM_NAME = "ストリーム名"
Aws.config.update({
region: REAGION,
credentials: Aws::Credentials.new(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
})
kinesis = Aws::Kinesis::Client.new(region: REAGION)
enc_data = ["123"].pack('m')
resp = kinesis.put_record({
stream_name: STREAM_NAME,
data: enc_data,
partition_key: "key-test",
})
shd_id = resp.shard_id
puts "shard_id:#{shd_id}"
puts "sequence_number:#{resp.sequence_number}"
resp = kinesis.get_shard_iterator({
stream_name: STREAM_NAME,
shard_id: shd_id,
shard_iterator_type: "TRIM_HORIZON"
}
)
shd_it = resp.shard_iterator
resp = kinesis.get_records({shard_iterator: shd_it, limit: 10000})
loop do
resp = kinesis.get_records({shard_iterator: shd_it, limit: 10000})
resp.records.each do |record|
puts record
end
break if resp.millis_behind_latest == 0
break if resp.next_shard_iterator.nil?
shd_it = resp.next_shard_iterator
end
ポイント
GetRecordsの呼び出しで注意しないといけないポイントがあります。
GetRecordsのレスポンスとして返ってくるデータには必ずしもデータが入っているとは限りません。
NextShardIteratorがある限り呼び出しを行わないといけないのですが、実はNextShardIteratorだけでチェックしてもデータが全てとり終わったのかどうかが分かりません。
全てのデータがとり終わったかどうかは、MillisBehindLatestが0かどうかでチェックする必要があります。
このチェックがないとGetRecordsをいつまで呼び出していいか分からず無限ループにはまってしまいます。
感想
サンプルコードを書いてみたはいいのですが、私の中でkinesisやMQTTのようなサービスやプロトコルのメリットが今一つ理解できていない部分があります。
世の中には永続化すべきデータとそうでないデータがあり、両方合わせると莫大な量になるが、
「とりあえず集めるだけ集めて必要なデータだけを残そう」という思想なのかもしれません。
IoTが広がるにつれてkinesisのようなサービスはこれからどんどん使われるようになるでしょうか。。。
注意点
Kinesis Streamはデータの通信量による課金とは別に、作成したストリーム単位でも費用が発生するので、ほったらかしにしないよう注意してください。