The Round

合同会社ナイツオの開発ブログ

[PR] 5分から相談できるGCP™ 開発コンサル!→こちら

Google App Engine for GoからBigQueryへStreaming Insertしてみる

注:古い記事の為、内容が最新ではない可能性がありますm(_ _)m

こんちわ!マツウラです。
前回に引き続きGAE/GoからBigQuery APIを使用してみます。
今回は、BigQueryにデータのリアルタイム挿入を行うStreaming Insertをやってみます。

参考:Streaming Data Into BigQuery - Google BigQuery — Google Developers

まずは、Streaming isertについてGoogle Developersから解説を引っ張ってきました。
今更必要ないって方は、Google APIs Client Librariesを使用してBigQuery APIを叩く下記サンプルコードへどうぞ。

Streaming Data Into BigQuery

BigQueryではデータを読み込むためjobを実行する代わりに、tabledata().insertAll()メソッドを使用して、データを一度にBigQueryの1レコードにストリーミングする方法を選択できます。
Streaming insertは、load job実行による遅延無しにデータを照会することができます。
ただし、考慮しておくトレードオフがあります。

割り当て制限

  • 行の最大サイズ:20KB
  • insert毎の行全体のデータサイズ:1MB
  • 毎秒ごとの最大行数:テーブルごとに毎秒10,000行。この制限を超えるとquota_exceededエラーが発生。
  • リクエストごとの最大行数:500
  • 毎秒ごとの最大バイト数:テーブルごとに毎秒10MB。この制限を超えるとquota_exceededエラーが発生。

また、Streaming insertは始めに最大2分のウォームアップ期間があるため、その間ストリーミングデータはアクセスできません。

ウォームアップの後、全ストリーミングデータが追加されている間も即時照会可能です。
数時間の間、非アクティブであると次回insert時にふたたびウォームアップが発生します。

データのコピー、エクスポートが利用可能になるまで90分かかります。

整合性

Streaming insert中のデータ整合性を保つためには各行に挿入されるinsertIdを指定します。
BigQueryは少なくとも1分間はこのIDを記憶します。
仮にこの間に同じIDが指定された行セットをストリーミングすると、BigQueryはベストエフォート方式でデータの重複排除にinsertIdプロパティを使用します。

このIDを使用する主な理由は、特定のエラー状況におけるStreaming insertの状態を判断する方法がないからです。
たとえば、ネットワークエラーやBigQuery内部のエラーがあります。
まれに指定地域のデータセンターが利用不可な状態であると、混乱した地域のデータホストによってデータの重複が発生する可能性があります。
新しい行の挿入は、別地域のデータセンターにルーティングされますが、混乱した地域のデータとの重複排除は出来ないでしょう。

データにより強い一貫性が必要な場合、トランザクションをサポートしている代替サービスとしてGoogle Cloud Datastoreがあります。

BigQueryのユースケース

イベントログを大量に取る

リアルタイムに大量のデータ収集を行う場合、Streaming insertは良い選択です。
一般的には次のような基準があります。

  • トランザクショナルではない:大量かつ継続的に行が追加される。アプリがデータの重複が発生する可能性や、一時的にデータを使用できないことを許容することができる。
  • 集計分析:単一または限られたレコード選択とは対照的に、クエリが一般的に傾向分析のため実行される。

一例としては、イベントトラッキングがあります。
イベントを追跡するモバイルアプリを持っているとします。
アプリ、またはモバイルサーバーは、独立してユーザーインタラクションやシステムエラーを記録し、BigQueryにストリーミング出来ました。
全体の動向の決定、交流の活発な地域、問題のある地域、リアルタイムのエラー状態監視などを行うためデータの分析を行う、となります。

リアルタイムのダッシュボード、クエリ

特定の状況では、BigQueryにデータをストリーミングすることで、トランザクションデータをリアルタイムで分析することが可能になります。
これには、ストリーミングデータの行の重複とデータ損失の可能性があるので、BigQuery外にトランザクションデータストアを持っているか確認してください。

データの最新のビューを持てるように、トランザクションデータを分析することを保証するため、いくつか予防措置を講じることができます。

  1. 同一のスキーマをもつ2つのテーブルを作成します。1つ目のテーブルは調整済みデータ用、2つ目のテーブルはリアルタイムの未調整データ用です。
  2. クライアント側では、レコードのトランザクションデータストアを維持します。
  3. 2.のレコードについてはinsertAll()リクエストを打ちっ放します。insertAll()リクエストはあて先のテーブルとして1.の未調整データ用テーブルをリアルタイムに指定する必要があります。
  4. ある程度の間隔で、トランザクションデータストアから調整済みデータを追加し、未調整データ用テーブルを全データ削除します。
  5. このユースケースでは、両方のテーブルからデータを選択することができます。未調整のデータテーブルは重複やレコードの取りこぼしが含まれる可能性があります。

Streaming insertの実行

それでは、Google APIs Client Librariesを使用してBigQuery APIのStreaming insertを実行してみます。


2014/10/02
api パッケージを更新した所、下記の修正は不要になりました。

その前に、Google APIs Client Librariesのコードを一部変更する必要があります。

// $GOPATH/src/code.google.com/p/google-api-go-client/bigquery/v2/bigquery-gen.go

type TableDataInsertAllRequestRows struct {
    InsertId string `json:"insertId,omitempty"`
    Json *JsonObject `json:"json,omitempty"`
}

これを次のように変更します。

type TableDataInsertAllRequestRows struct {
    InsertId string `json:"insertId,omitempty"`
    Json interface{} `json:"json,omitempty"`    // こちらを変更。
}

この修正について気になる方は、下記の「Tabledata: insertAllについて」を御覧ください。


上記のライブラリ変更が済んだなら、次のサンプルを実行することができます。
下記サンプルでは、新しく2件のレコードを追加しています。
データを格納する構造体では、jsonタグによるフィールド名と、BigQueryのスキーマが一致するように注意してください。

2014/10/02
api パッケージの更新に伴う、コードの変更点があります。

import (
    "appengine"
    "code.google.com/p/goauth2/appengine/serviceaccount"
    "code.google.com/p/google-api-go-client/bigquery/v2"
    "fmt"
    "net/http"
)

const (
    PROJECT_ID string = "metal-bus-589"
    DATASET_ID string = "sample_dataset"
    TABLE_ID   string = "sample_table"
)

/* 2014/10/02 変更点 不要になりました。
type Row struct {
   Kind       string `json:"kind"`
   Name       string `json:"name"`
   Population int    `json:"population"`
}
*/

func init() {
    http.HandleFunc("/streaming_insert", StreamingInsertHandler)
}

func InsertAllHandler(rw http.ResponseWriter, req *http.Request) {
    c := appengine.NewContext(req)
    client, err := serviceaccount.NewClient(c, bigquery.BigqueryScope)
    if err != nil {
        fmt.Fprintf(rw, "%s", err.Error())
        return
    }
    service, err := bigquery.New(client)
    if err != nil {
        fmt.Fprintf(rw, "%s", err.Error())
        return
    }

    // データの作成。
    // 2014/10/02 変更点
    rows := make([]*bigquery.TableDataInsertAllRequestRows, 2)
    rows[0] = &bigquery.TableDataInsertAllRequestRows{
        // 旧コード Json: &Row{Kind: "country", Name: "Shizuoka", Population: 707183},
            Json: map[string]bigquery.JsonValue{
                "kind":       "country",
                "name":       "Shizuoka",
                "population": 707183,
            },
    }
    rows[1] = &bigquery.TableDataInsertAllRequestRows{
        // 旧コード Json: &Row{Kind: "country", Name: "Hamamatsu", Population: 791546},
            Json: map[string]bigquery.JsonValue{
                "kind":       "country",
                "name":       "Hamamatsu",
                "population": 791546,
            },
    }
 
    // Streaming insertの実行。 
    _, err := service.Tabledata.InsertAll(projectId, datasetId, tableId, &bigquery.TableDataInsertAllRequest{
        Kind: "bigquery#tableDataInsertAllRequest",
        Rows: rows,
    }).Do()
    if err != nil {
        fmt.Fprintf(rw, "%s", err.Error())
        return
    }
}

func parse(r interface{}) (map[string]bigquery.JsonValue, error) {
    tmp, ok := r.(map[string]interface{})
    if !ok {
        return nil, errors.New(fmt.Sprintf("failed to type assertion: %+v", r))
    }

    result := make(map[string]bigquery.JsonValue)
    for k, v := range tmp {
        jv, ok := v.(bigquery.JsonValue)
        if !ok {
            return nil, errors.New(fmt.Sprintf("failed to type assertion: %+v", v))
        }
        result[k] = jv
    }

    return result, nil
}

無事に完了したならば、Google Cloud ConsoleのサイドメニューからBigQueryのページを開いてみましょう。
指定したデータセットのテーブルに、追加したレコードが表示されているはずです。

おまけ

Tabledata: insertAllについて

2014/10/02
api パッケージを更新した所、下記問題は解決されていました。

Google API Client LibrariesのBigQuery APIにはTabledata: insertAllも当然ながらあります。
ただし、Go言語版においては少々問題があります。

上記で修正したファイルを読むと分かりますが、insertAllで送信するデータは次のような構造になっています。

type TableDataInsertAllRequest struct {
    Kind string `json:"kind,omitempty"`
    Rows []*TableDataInsertAllRequestRows `json:"rows,omitempty"`
}

type TableDataInsertAllRequestRows struct {
    InsertId string `json:"insertId,omitempty"`
    Json *JsonObject `json:"json,omitempty"`
}

type JsonObject struct {
}

はい。何も定義されてませんね、JsonObjectの中身。

このコードのまま使用すると当然ながらエラーになります。
この問題に関してはissueも上がっているようです。
Issue 52: BigQuery v2 JsonObject is not well defined

そこで、JsonObject構造体を次の形に書き換えました。

type TableDataInsertAllRequestRows struct {
    InsertId string `json:"insertId,omitempty"`
    Json interface{} `json:"json,omitempty"`
}

これで自由な型を使うことが可能です。
はやめにissueが解決されることを願うばかりです。


BigQueryを扱ってみましたが、Google APIs Clinet Librariesの対応するAPIであればGoogle App Engineから簡単にアクセスすることが出来ます。 お試しあれ。