AWSのサービスの1つであるKinesis Streamを使う機会がありました。
APIを使ったデータのアップロードとダウンロードをする際にいくつか気を付けないといけない点があったので挙げておきたいと思います。
Kinesisの特徴
Kinesisの特徴はデータのリアルタイム処理ができることでクラウドにアップロードしたデータは基本的に24時間の間アクセスできます。
RDBと違って永続的に保存されるわけではなく、あくまでストリームに流したリアルタイムなデータに対して何らかの解析を行うというのがサービスの用途のようです。
データをアップロードするAPI
さて、kinesisへのデータのアップロードは、
- PutRecord
- PutRecords
という2つのAPIによって行います。
両者の違いは文字通り1つのデータか複数データかになります。
ちなみに、PutRecordsはAPIの仕様を読む限り、複数レコードのアップロードのうち、一部のデータのみアップロードに成功し、一部は失敗するということがあるようです。
トランザクション的な機能が無く、またレスポンスの内容からはどこからどこまでが成功でどこからどこまでが失敗なのかが分からないようです。手堅くデータをあげるのであればPutRecordをつかえということなのかもしれません。
データの取得はどうするのか?
アップロードしたデータを取り出す時はどうすればいいかですが、
- GetShardIterator
- GetRecords
というAPIの呼び出しによりデータの取得が可能です。
さて、このGetShardIteratorとGetRecordsの使い方ですが、単純に
「こっからここまでのデータをくれ」
という感じでの呼び出しにはなりません。
特にGetRecordsの呼び出しは一般的なWebAPIにおけるデータ取得とは少しイメージが異なる形です。
細かい説明はAPIのリファレンスを見て頂ければ分かると思いますが、ここではRubyのaws-sdkを使ったPutRecordとGetRecordsを呼び出すような、サンプルプログラムを上げておきたいと思います。
特に、データを取得する処理に関して、GetRecordsを使われる方のご参考になれば幸いです。
require 'aws-sdk' # AWS の設定用定数 REAGION = 'リージョン名' ACCESS_KEY_ID = 'アクセスキー' SECRET_ACCESS_KEY = 'シークレットアクセスキー' STREAM_NAME = "ストリーム名" Aws.config.update({ region: REAGION, credentials: Aws::Credentials.new(ACCESS_KEY_ID, SECRET_ACCESS_KEY) }) kinesis = Aws::Kinesis::Client.new(region: REAGION) # PutRecordで上げるデータはBase64エンコードする必要がある # →従って、文字列だけでなくバイナリデータもあげることができる enc_data = ["123"].pack('m') resp = kinesis.put_record({ stream_name: STREAM_NAME, data: enc_data, # required partition_key: "key-test", # required }) # shardIdにはputRecordのレスポンスに格納されているShardIdを設定する # シャード数が1であれば恐らく "shardId-000000000000"になっている shd_id = resp.shard_id puts "shard_id:#{shd_id}" puts "sequence_number:#{resp.sequence_number}" resp = kinesis.get_shard_iterator({ stream_name: STREAM_NAME, # required shard_id: shd_id, # required shard_iterator_type: "TRIM_HORIZON" } ) shd_it = resp.shard_iterator resp = kinesis.get_records({shard_iterator: shd_it, limit: 10000}) loop do resp = kinesis.get_records({shard_iterator: shd_it, limit: 10000}) resp.records.each do |record| puts record end # millis_behind_latest が0になった場合に終了させる # get_records自体はデータがなくても成功する break if resp.millis_behind_latest == 0 break if resp.next_shard_iterator.nil? shd_it = resp.next_shard_iterator end
ポイント
GetRecordsの呼び出しで注意しないといけないポイントがあります。
GetRecordsのレスポンスとして返ってくるデータには必ずしもデータが入っているとは限りません。
NextShardIteratorがある限り呼び出しを行わないといけないのですが、実はNextShardIteratorだけでチェックしてもデータが全てとり終わったのかどうかが分かりません。
全てのデータがとり終わったかどうかは、MillisBehindLatestが0かどうかでチェックする必要があります。
このチェックがないとGetRecordsをいつまで呼び出していいか分からず無限ループにはまってしまいます。
感想
サンプルコードを書いてみたはいいのですが、私の中でkinesisやMQTTのようなサービスやプロトコルのメリットが今一つ理解できていない部分があります。
世の中には永続化すべきデータとそうでないデータがあり、両方合わせると莫大な量になるが、
「とりあえず集めるだけ集めて必要なデータだけを残そう」という思想なのかもしれません。
IoTが広がるにつれてkinesisのようなサービスはこれからどんどん使われるようになるでしょうか。。。
Amazon Web Services 基礎からのネットワーク&サーバー構築 改訂版
- 作者: 玉川憲,片山暁雄,今井雄太,大澤文孝
- 出版社/メーカー: 日経BP社
- 発売日: 2017/04/13
- メディア: 単行本
- この商品を含むブログを見る
Amazon Web Services実践入門 (WEB+DB PRESS plus)
- 作者: 舘岡守,今井智明,永淵恭子,間瀬哲也,三浦悟,柳瀬任章
- 出版社/メーカー: 技術評論社
- 発売日: 2015/11/10
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (1件) を見る
注意点
Kinesis Streamはデータの通信量による課金とは別に、作成したストリーム単位でも費用が発生するので、ほったらかしにしないよう注意してください。