The Round

合同会社ナイツオの開発ブログ

[PR] 5分から相談できるGCP™ 開発コンサル!→こちら

GAE/Go TaskQueueについて その2~Pullキュー~

注:古い記事の為、内容が最新ではない可能性がありますm(_ _)m

こんにちわ!マツウラです。
前回のPushキューに引き続き、今回はPullキューについてです。
公式ドキュメントを参考に見てゆこうと思います。

参考:Go — Google Developers Using Pull Queues in Go

Pullキューの使い方 in Go

Pullキューではタスクを実行するため独自のシステムを設計できます。
タスクをAppEngineアプリのみならず、外部のシステムに実行させることもできます。(タスクキューREST APIを用いて)。
タスクの実行者は指定した期間タスクを貸してもらい、その後処理を行い期間が過ぎる前にそれらを削除します。

Pullキューの使用にはPushキューで自動化されているいくつかの機能をアプリで処理する必要があります。

  • アプリケーションが処理量に基づきワーカー数をスケールする必要があります。
    もしアプリケーションがスケールされないと、処理するタスクがないときにリソースが無駄になります。
    逆に処理するタスクが非常に多いと待ち時間が長くなり危険です。
  • アプリケーションが処理のあとで明示的にタスクを削除する必要があります。
    PushキューではAppEngineがタスクを削除してくれました。
    もしアプリケーションがPullキュータスクを削除しないと別のワーカーがタスクを再処理するかもしれません。
    そのため、タスクが冪等性を保証しないとリソースを浪費したりエラーが起こるリスクがあります。

Pullキューはqueue.yamlでの設定が必要です。
定義の詳細についてはPullキューの定義を参照してください。

Pullキューの概要

PullキューはAppEngineのタスク処理システムの外部でタスクを処理することを可能にします。
実行者がAppEngineアプリの一部ならappengine/taskqueueからAPIを用いてタスクを操作することができます。

プロセスは次のように動作します。

  1. 実行者はTask Queue APIまたはTask Queue REST APIのいずれかを介してタスクを借り受けます。
  2. AppEngineが実行者にタスクデータを送信します。
  3. 実行者がタスクを処理します。
    タスクはリース期限が切れる前ならば、失敗しても再び借り受けできます。
    これはリトライとしてカウントされ、リトライ数の最大値を設定することができます。
  4. 一度タスクが成功すると、実行者はそのタスクを削除しなければなりません。
  5. 実行者は処理量に応じてインスタンスをスケーリングする責任があります。

AppEngineでタスクをPullする

作業を始める前にqueue.yamlを設定して下さい。

Pullキューにタスクを追加する

Pullキューにタスクを追加するには、queue.yamlで定義したキュー名を使用してキューを取得し、PULLにMethodを設定します。
次はpull-queueと名付けたPullキューにタスクをエンキューする例です。

func PullHandler(w http.ResponseWriter, r *http.Request) {
    c := appengine.NewContext(r)

    t := &taskqueue.Task{
        Payload: []byte("Hello world"),
        Method:  "PULL",
    }

    _, err := taskqueue.Add(c, t, "pull-queue")
    if err != nil {
        c.Errorf("%v", err)
        return
    }
}
queue:
- name: pull-queue
  mode: pull

タスクのリース

Important: キューが供給可能になるより早くにタスクのリースを試みているかどうかポーリングループで検出する必要があります。
この障害が起きると、taskqueue.Leaseからエラーが返されます。

これらのエラーを処理し、taskqueue.Leaseを呼び出してからリトライの間隔を空け、後でもう一度やり直さなければなりません。
この問題を回避するには、taskqueue.Leaseを呼ぶ際にはRPCの期限を長く設定するよう検討してください。
リースリクエストがタスクの空リストを返した際もリトライの間隔を空ける必要があるので注意してください。

一度Pullキューにタスクを追加すると、taskqueue.Leaseを用いて1つ以上のタスクをリース可能です。
taskqueue.Lease経由で使用可能になるtaskqueue.Addを用いて、つい先ごろ追加されたタスクは、使用可能になるのに少し時間がかかるかもしれません。
リースリクエストの際には、リースするタスク数と何秒間リースするのかを指定します(タスク数は最大で1000、期間は最大で1週間)。
リース期間は最も遅いタスクが完了するまで十分な時間を保証する必要があります。
これを解決するために、taskqueue.ModifyLeaseを用いてリース期間を延長することが可能です。

タスクをリースすると他のワーカーが処理することは不可になり、リース期限まで利用不可のままです。
個々のタスクをリースする場合、APIはキューの先頭からタスクを選択します。
また、タスクが利用できない場合、空のリストが返されます。

Note: taskqueue.LeaseはPullキューでのみ動作します。
そのためPushキューで追加されたタスクをリースしようとすると、AppEngineはエラーを返します。
queue.yamlの定義を変更することでPushキューをPullキューに変更することができます。
詳細についてはPullキューの定義を参照してください。

次のサンプルコードはPullキューから1時間でタスクを100個リースしています。

c := appengine.NewContext(r)
tasks, err := taskqueue.Lease(c, 100, "pull-queue", 3600)

タスクのタグ付け(Experimental)

Note: 未だ試験的な機能です。互換性のない変更が行われる可能性があるので注意してください。

タスクのすべてが同じではありません。
そこでタスクにタグを付け、リースするタスクをタグから選択することが可能です。
タグはフィルタとして機能します。

_, err := taskqueue.Add(c, &taskqueue.Task{Payload: []byte("parse1"), Method: "PULL", Tag: "parse"}, "pull-queue")
_, err = taskqueue.Add(c, &taskqueue.Task{Payload: []byte("parse2"), Method: "PULL", Tag: "parse"}, "pull-queue")
_, err = taskqueue.Add(c, &taskqueue.Task{Payload: []byte("render1"), Method: "PULL", Tag: "render"}, "pull-queue")
_, err = taskqueue.Add(c, &taskqueue.Task{Payload: []byte("render2"), Method: "PULL", Tag: "render"}, "pull-queue")

// renderタスクのみリースする
tasks, err := taskqueue.LeaseByTag(c, 100, "pull-queue", 3600, "render")

tasks, err = taskqueue.LeaseByTag(c, 100, "pull-queue", 3600, "")

2回目のLeaseByTagではタグ名が省略されています。
これはETAが最も古いタスクに付けられたタグと同名のタグが付いたタスクを100個までリースしています。

タスクの削除

一般的に、一度ワーカーがタスクを完了するとキューからタスクを削除する必要があります。
もしワーカーが処理を終えた後でキューにタスクが残っている場合は、ワーカーが失敗したかもしれません。
この場合は別のワーカーによってタスクを処理する必要があります。

削除はtaskqueue.DeleteMulti に taskqueue.Leaseなどで返されるタスクのリストを渡すことで行います。

tasks, err := taskqueue.Lease(c, 100, "pull-queue", 3600)
// ここでタスク関連の処理を行う。

taskqueue.DeleteMulti(c, tasks, "pull-queue")

これでPullキューは終わりです。
実際に外部からタスクを処理する場合は、Task Queue Rest APIを使用することになります。
APIを外部から使用するためのライブラリの一覧はこちらです。

次回はTaskQueueのdelayパッケージについてです。よろしくおねがいします。