注:古い記事の為、内容が最新ではない可能性がありますm(_ _)m
こんちわ!マツウラです。
前回に引き続きGAE/GoからBigQuery APIを使用してみます。
今回は、BigQueryにデータのリアルタイム挿入を行うStreaming Insertをやってみます。
参考:Streaming Data Into BigQuery - Google BigQuery — Google Developers
まずは、Streaming isertについてGoogle Developersから解説を引っ張ってきました。
今更必要ないって方は、Google APIs Client Librariesを使用してBigQuery APIを叩く下記サンプルコードへどうぞ。
Streaming Data Into BigQuery
BigQueryではデータを読み込むためjobを実行する代わりに、tabledata().insertAll()メソッドを使用して、データを一度にBigQueryの1レコードにストリーミングする方法を選択できます。
Streaming insertは、load job実行による遅延無しにデータを照会することができます。
ただし、考慮しておくトレードオフがあります。
割り当て制限
- 行の最大サイズ:20KB
- insert毎の行全体のデータサイズ:1MB
- 毎秒ごとの最大行数:テーブルごとに毎秒10,000行。この制限を超えるとquota_exceededエラーが発生。
- リクエストごとの最大行数:500
- 毎秒ごとの最大バイト数:テーブルごとに毎秒10MB。この制限を超えるとquota_exceededエラーが発生。
また、Streaming insertは始めに最大2分のウォームアップ期間があるため、その間ストリーミングデータはアクセスできません。
ウォームアップの後、全ストリーミングデータが追加されている間も即時照会可能です。
数時間の間、非アクティブであると次回insert時にふたたびウォームアップが発生します。
データのコピー、エクスポートが利用可能になるまで90分かかります。
整合性
Streaming insert中のデータ整合性を保つためには各行に挿入されるinsertIdを指定します。
BigQueryは少なくとも1分間はこのIDを記憶します。
仮にこの間に同じIDが指定された行セットをストリーミングすると、BigQueryはベストエフォート方式でデータの重複排除にinsertIdプロパティを使用します。
このIDを使用する主な理由は、特定のエラー状況におけるStreaming insertの状態を判断する方法がないからです。
たとえば、ネットワークエラーやBigQuery内部のエラーがあります。
まれに指定地域のデータセンターが利用不可な状態であると、混乱した地域のデータホストによってデータの重複が発生する可能性があります。
新しい行の挿入は、別地域のデータセンターにルーティングされますが、混乱した地域のデータとの重複排除は出来ないでしょう。
データにより強い一貫性が必要な場合、トランザクションをサポートしている代替サービスとしてGoogle Cloud Datastoreがあります。
BigQueryのユースケース
イベントログを大量に取る
リアルタイムに大量のデータ収集を行う場合、Streaming insertは良い選択です。
一般的には次のような基準があります。
- トランザクショナルではない:大量かつ継続的に行が追加される。アプリがデータの重複が発生する可能性や、一時的にデータを使用できないことを許容することができる。
- 集計分析:単一または限られたレコード選択とは対照的に、クエリが一般的に傾向分析のため実行される。
一例としては、イベントトラッキングがあります。
イベントを追跡するモバイルアプリを持っているとします。
アプリ、またはモバイルサーバーは、独立してユーザーインタラクションやシステムエラーを記録し、BigQueryにストリーミング出来ました。
全体の動向の決定、交流の活発な地域、問題のある地域、リアルタイムのエラー状態監視などを行うためデータの分析を行う、となります。
リアルタイムのダッシュボード、クエリ
特定の状況では、BigQueryにデータをストリーミングすることで、トランザクションデータをリアルタイムで分析することが可能になります。
これには、ストリーミングデータの行の重複とデータ損失の可能性があるので、BigQuery外にトランザクションデータストアを持っているか確認してください。
データの最新のビューを持てるように、トランザクションデータを分析することを保証するため、いくつか予防措置を講じることができます。
- 同一のスキーマをもつ2つのテーブルを作成します。1つ目のテーブルは調整済みデータ用、2つ目のテーブルはリアルタイムの未調整データ用です。
- クライアント側では、レコードのトランザクションデータストアを維持します。
- 2.のレコードについてはinsertAll()リクエストを打ちっ放します。insertAll()リクエストはあて先のテーブルとして1.の未調整データ用テーブルをリアルタイムに指定する必要があります。
- ある程度の間隔で、トランザクションデータストアから調整済みデータを追加し、未調整データ用テーブルを全データ削除します。
- このユースケースでは、両方のテーブルからデータを選択することができます。未調整のデータテーブルは重複やレコードの取りこぼしが含まれる可能性があります。
Streaming insertの実行
それでは、Google APIs Client Librariesを使用してBigQuery APIのStreaming insertを実行してみます。
2014/10/02
api パッケージを更新した所、下記の修正は不要になりました。
その前に、Google APIs Client Librariesのコードを一部変更する必要があります。
// $GOPATH/src/code.google.com/p/google-api-go-client/bigquery/v2/bigquery-gen.go type TableDataInsertAllRequestRows struct { InsertId string `json:"insertId,omitempty"` Json *JsonObject `json:"json,omitempty"` }
これを次のように変更します。
type TableDataInsertAllRequestRows struct { InsertId string `json:"insertId,omitempty"` Json interface{} `json:"json,omitempty"` // こちらを変更。 }
この修正について気になる方は、下記の「Tabledata: insertAllについて」を御覧ください。
上記のライブラリ変更が済んだなら、次のサンプルを実行することができます。
下記サンプルでは、新しく2件のレコードを追加しています。
データを格納する構造体では、jsonタグによるフィールド名と、BigQueryのスキーマが一致するように注意してください。
2014/10/02
api パッケージの更新に伴う、コードの変更点があります。
import ( "appengine" "code.google.com/p/goauth2/appengine/serviceaccount" "code.google.com/p/google-api-go-client/bigquery/v2" "fmt" "net/http" ) const ( PROJECT_ID string = "metal-bus-589" DATASET_ID string = "sample_dataset" TABLE_ID string = "sample_table" ) /* 2014/10/02 変更点 不要になりました。 type Row struct { Kind string `json:"kind"` Name string `json:"name"` Population int `json:"population"` } */ func init() { http.HandleFunc("/streaming_insert", StreamingInsertHandler) } func InsertAllHandler(rw http.ResponseWriter, req *http.Request) { c := appengine.NewContext(req) client, err := serviceaccount.NewClient(c, bigquery.BigqueryScope) if err != nil { fmt.Fprintf(rw, "%s", err.Error()) return } service, err := bigquery.New(client) if err != nil { fmt.Fprintf(rw, "%s", err.Error()) return } // データの作成。 // 2014/10/02 変更点 rows := make([]*bigquery.TableDataInsertAllRequestRows, 2) rows[0] = &bigquery.TableDataInsertAllRequestRows{ // 旧コード Json: &Row{Kind: "country", Name: "Shizuoka", Population: 707183}, Json: map[string]bigquery.JsonValue{ "kind": "country", "name": "Shizuoka", "population": 707183, }, } rows[1] = &bigquery.TableDataInsertAllRequestRows{ // 旧コード Json: &Row{Kind: "country", Name: "Hamamatsu", Population: 791546}, Json: map[string]bigquery.JsonValue{ "kind": "country", "name": "Hamamatsu", "population": 791546, }, } // Streaming insertの実行。 _, err := service.Tabledata.InsertAll(projectId, datasetId, tableId, &bigquery.TableDataInsertAllRequest{ Kind: "bigquery#tableDataInsertAllRequest", Rows: rows, }).Do() if err != nil { fmt.Fprintf(rw, "%s", err.Error()) return } } func parse(r interface{}) (map[string]bigquery.JsonValue, error) { tmp, ok := r.(map[string]interface{}) if !ok { return nil, errors.New(fmt.Sprintf("failed to type assertion: %+v", r)) } result := make(map[string]bigquery.JsonValue) for k, v := range tmp { jv, ok := v.(bigquery.JsonValue) if !ok { return nil, errors.New(fmt.Sprintf("failed to type assertion: %+v", v)) } result[k] = jv } return result, nil }
無事に完了したならば、Google Cloud ConsoleのサイドメニューからBigQueryのページを開いてみましょう。
指定したデータセットのテーブルに、追加したレコードが表示されているはずです。
おまけ
Tabledata: insertAllについて
2014/10/02
api パッケージを更新した所、下記問題は解決されていました。
Google API Client LibrariesのBigQuery APIにはTabledata: insertAllも当然ながらあります。
ただし、Go言語版においては少々問題があります。
上記で修正したファイルを読むと分かりますが、insertAllで送信するデータは次のような構造になっています。
type TableDataInsertAllRequest struct { Kind string `json:"kind,omitempty"` Rows []*TableDataInsertAllRequestRows `json:"rows,omitempty"` } type TableDataInsertAllRequestRows struct { InsertId string `json:"insertId,omitempty"` Json *JsonObject `json:"json,omitempty"` } type JsonObject struct { }
はい。何も定義されてませんね、JsonObjectの中身。
このコードのまま使用すると当然ながらエラーになります。
この問題に関してはissueも上がっているようです。
Issue 52: BigQuery v2 JsonObject is not well defined
そこで、JsonObject構造体を次の形に書き換えました。
type TableDataInsertAllRequestRows struct { InsertId string `json:"insertId,omitempty"` Json interface{} `json:"json,omitempty"` }
これで自由な型を使うことが可能です。
はやめにissueが解決されることを願うばかりです。
BigQueryを扱ってみましたが、Google APIs Clinet Librariesの対応するAPIであればGoogle App Engineから簡単にアクセスすることが出来ます。 お試しあれ。