リモート開発メインのソフトウェア開発企業のエンジニアブログです

Goroutines と Channels を使ってジョブを並行化した時のメモ

先日、Go で書かれたとあるバッチジョブのパフォーマンス改善として処理の並行化をするにあたり、今回初めて使った Goroutines, Channels で色々とハマりどころがあったので備忘録用にここにまとめておきます。

尚、今回は並行化を施す前のサンプルから、最終的な実装まで順を追ってコードの実装例を交えて書きますので少々長くなります。最終的な実装の部分だけ見たい人は https://github.com/issei-m/go-concurrency-test にコードを公開しているのでこちらをご覧下さい。

対象読者

Go 初学者、あるいは Goroutines や Channels を使って初めて並行処理を実装しようとしている方。

また、 Goroutines や Channels に関しては公式ドキュメントを含め質の高い情報が豊富にあるので、本記事はそれぞれ詳細な部分は極力省いて実践的な内容としました。似た様な実装をしようとしている方の参考になれば幸いです。

今回並行処理を実装した元のバッチジョブのサンプル

ざっくり書くと以下の様な感じです。(色々端折ってますが詳しくは GitHub リポジトリの方をご覧ください)

var bufferSize = 20

func ProcessItems(items []item.Item) {
	failed := false
	processedCount := 0

	for _, targetItem := range items {
		result, err := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く
		if err != nil {
			failed = true
			break // 1個でも失敗したら処理を抜ける
		}
        
		logger.Info(result) // 処理成功時は結果をロガーで出力する

		processedCount++

		if processedCount%bufferSize == 0 {
			// バッファをクリアする処理
		}
	}

	if !failed && processedCount%bufferSize > 0 {
		// バッファをクリアする処理
	}
}

処理内容をまとめると、

  • 渡された items []item.Item から項目を1個ずつ取り出して item.ProcessItem(targetItem item.Item) (string, error) に渡す
  • 処理に成功した場合は処理結果である1番目の戻り値をロガーで記録する
  • 途中で1回でも失敗した場合、以降の処理を全てキャンセルする
    • 既に成功した部分についてはロールバックする必要はない
  • ループ内の処理で別途何かのバッファリングを行っており、20件に1回クリアする必要がある

と言った形になります。item.ProcessItem(targetItem) は内部で重いブロッキング I/O 処理を実行している為、逐次処理では効率が悪く、性能試験でボトルネックとなっていた為、この部分を goroutine を使って並行化する運びとなりました。

並行化の流れ

ここから実際に Goroutines と Channels を使って実装を行っていきますが、冒頭にも書いた通りいくつかの段階に分けて途中のコードを記載していくので、最終実装だけ見たい人は GitHub リポジトリをご覧下さい。

では早速 Goroutine を使って行きましょう。今回は item.ProcessItem() 関数の実行を並行に行いたいので、これを安直に実装するとこんな感じになります:

var wg sync.WaitGroup
for _, targetItem := range items {
    wg.Add(1)
    go func(targetItem item.Item) {
        defer wg.Done()
        result, _ := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く
        
        logger.Info(result) // 処理成功時は結果をロガーで出力する
    }(targetItem)
}

wg.Wait() // 全ての Goroutine で wg.Done() が呼ばれるまで処理がブロックされる

これで全ての item.ProcessItem(targetItem) の処理は並行に実行されます。

Goroutine によって並行に行われる item.ProcessItem() の実行を、メインスレッド (因みにメインスレッドも Goroutine の1つです) は待ってくれないので、全ての Goroutine の処理が終わるまでメインスレッドの実行をブロックする必要があります。そうしないとプログラムが直ちに終了してしまう為です。

この時点では sync.WaitGroup を使っています。wg.Add は内部のカウンタをインクリメントし、 wg.Done はデクリメントを行います。 wg.Wait はそのカウンタが0になるまで処理をブロックしますので、この様にする事で全ての Goroutine の処理が終わるのを待つ事ができます。

さて、この時点では上記サンプルはまだ不完全で、以下の問題があります。

  • items の数だけ を無制限に作っている
  • item.ProcessItem(targetItem) のエラー処理やバッファリングが未実装

次回以降の節では Channels を使ってこれらを順に対応していきます。

並行数の制御

先程の例では items の数だけ Goroutine が作られてました。Goroutines そのものは軽量なので大量に作っても結構問題ないのですが、中で実行している処理のスループットを抑えたい等 (API のレートリミットや、外部ミドルウェアへの負荷等) の理由で同時処理数を制限したくなる事があります。

並行処理数を制限する方法は主にセマフォを使って for range items のループを適宜止めるか、予め指定した数の Goroutine を 非同期処理の worker として事前にスポーンしておく 等があると思いますが、今回は結果処理の兼ね合いがあるので後者で実装します。

concurrency := 20 // 同時処理数は20個までとする

chItems := make(chan item.Item) // メインスレッドからワーカーに `item.Item` を受け渡す Channel

var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        // close(chItems) が呼ばれるまで targetItem の取り出しが (Goroutine 間で均等に) 行われる
        for targetItem := range chItems {
            result, _ := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く
        
            logger.Info(result) // 処理成功時は結果をロガーで出力する
        }
    }()
}

for _, targetItem := range items {
    chItems <- targetItem
}
close(chItems) // 全て送り出したら close する. これをしないと `for targetItem := range chItems` が正しく動作しない

wg.Wait() // 全ての Goroutine で wg.Done() が呼ばれるまで処理がブロックされる

concurrency の数だけ予め Goroutine をワーカーとしてスポーンしておきます。内部では items の中身を処理しますが、 Go では Goroutine 間で安全な値の受け渡しには通常 Channels を使います。この実装では Channel である chItems から受け取った targetItem を逐次処理していくと言った内容になっています。また、この時点では chItems に値は入っていないので、全ての Goroutine はバックグラウンドで chItems に値が送信されるまで待機している事になります。

次の for range では全ての targetItem を chItems に送出します。この時点で、各ワーカーでは随時 chItems からの値の受信が始まり、非同期に処理が行われていきます。
また、全ての chItems の受信者間 (ワーカー間) で受信は均等にロードバランスされます。

全ての送出が終わった時点 (ワーカーとは非同期で動くのですぐ終わります) で close(chItems) をしてこれ以上処理する item.Item が無い事を通知します。こうする事で、やがて各ワーカーの chItems の受信が随時終了し、ワーカーも終了していきます。

なお、 close を忘れると、ワーカーの for targetItem := range chItems は終わらずブロックされます。メインスレッドではワーカーの終了を sync.WaitGroup で待機していますが、ワーカーは永久に終わらないので deadlock となり、エラーになるので注意しましょう。

エラー処理

ここで一番最初のスニペットを見てみましょう。

result, err := item.ProcessItem(targetItem) // 重いブロッキング I/O が中で動く
if err != nil {
    //
}
logger.Info(result)

item.ProcessItem(targetItem) は処理成功した場合は1番目の戻り値に結果を、失敗した場合は2番目の戻り値にエラーをセットして返す多値返却の関数です。

さて、この関数は Goroutine で処理しますが、エラー検出時の処理やバッファリングを行う為、結果をメインスレッドに送り返す必要があります。ここでも Channels を使って実際に結果を返してみましょう。

// 関数の戻り値が多値な為、1つの構造体にまとめる
type taskResult struct {
	result string
	err    error
}

// ...

concurrency := 20 // 同時処理数は20個までとする

chItems := make(chan item.Item)     // メインスレッドからワーカーに `item.Item` を受け渡す Channel
chResults := make(chan *taskResult) // ワーカーからメインスレッドに `item.ProcessItem` の結果を返す Channel

var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()

        // close(chItems) が呼ばれるまで targetItem の取り出しが (Goroutine 間で均等に) 行われる
        for targetItem := range chItems {
            r, err := item.ProcessItem(targetItem)        // 重いブロッキング I/O が中で動く
            chResults <- &taskResult{result: r, err: err} // 結果を構造体に入れ、 `chResults` に送出
        }
    }()
}

for _, targetItem := range items {
    chItems <- targetItem
}
close(chItems) // 全て送り出したら close する. これをしないと `for targetItem := range chItems` が正しく動作しない

wg.Wait() // 全ての Goroutine で wg.Done() が呼ばれるまで処理がブロックされる

但しこのままでは正常に動きません。Channel への値の送出は、その時点でこれを受信する別の Goroutine が動いていないと処理がブロックされる為です。ワーカー内の chResults <- &taskResult{result: r, err: err} は、一連の処理で chResults の受信者が不在な為1発でブロックされ、 deadlock と見なされエラーになります。

そこで、次の様にコードを改修します。ちょっと修正量が多いですがご容赦下さい。

concurrency := 20 // 同時処理数は20個までとする

chItems := make(chan item.Item)     // メインスレッドからワーカーに `item.Item` を受け渡す Channel
chResults := make(chan *taskResult) // ワーカーからメインスレッドに `item.ProcessItem` の結果を返す Channel

numWorkers := int32(concurrency) // sync.WaitGroup の代わりにカウンタを使う
for i := 0; i < concurrency; i++ {
    go func() {
        defer atomic.AddInt32(&numWorkers, -1) // Goroutine が終わる度にカウンタをデクリメントする. 並行処理中に安全にに処理する為 `atomic.AddInt32` を使う.

        // close(chItems) が呼ばれるまで targetItem の取り出しが (Goroutine 間で均等に) 行われる
        for targetItem := range chItems {
            r, err := item.ProcessItem(targetItem)        // 重いブロッキング I/O が中で動く
            chResults <- &taskResult{result: r, err: err} // 結果を構造体に入れ、 `chResults` に送出
        }
    }()
}

// `targetItem` の送出は別の Goroutine 内で行う
go func() {
    for _, targetItem := range items {
        chItems <- targetItem
    }
    close(chItems)
}()

// ワーカーが全て終了するまで `chResults` から値を取り出し続ける
for numWorkers > 0 {
    result := <-chResults
    if result.err != nil {
        // TODO: エラー処理
    } else {
        logger.Info(result.result) // 処理成功時は結果をロガーで出力する
        
        // TODO: バッファリング処理
    }
}

これで動作する様になります。今回2つポイントがあります。

まずは、chItems への送出自体を別の Goroutine に分けた事です。理由としては先程説明した chResults <- と同様に、chItems <- による送出もこれを受信する別の Goroutine が動いていないとブロックされ deadlock になってしまう為です。
この場合、 <-chResults による結果の受信処理、あるいは chItems <- への targetItem の送出処理のいずれかを別の Goroutine で処理する必要がありますが、今回はメインスレッドでは結果処理とそれによる後続処理の制御を行いたい為、送出側を別 Goroutine にしています。

次にワーカーの終了待ちを sync.WaitGroup から単純なカウンタに置き換えた事です。これもメインスレッドでは単にワーカーの処理を待ち続けながら別の処理も行う為そうしています。尚、カウンタのデクリメントは競合を防ぐ為、 atomic.AddInt32 を使います。

さて、ここまで来たら後は TODO の部分を実装するだけです。内容は次の通りでした:

  • 途中で1回でも失敗した場合、以降の処理を全てキャンセルする
    • 既に成功した部分についてはロールバックする必要はない
  • ループ内の処理で別途何かのバッファリングを行っており、20件に1回クリアする必要がある

バッファのクリアは大した事はないですね。問題はエラー処理です。エラーを検出した時点でワーカーを含めた全体の処理を止める必要があり、これには context.WithCancel を使うのが簡単です。最後に、バッファリングとエラー処理を実装してみましょう。

concurrency := 20 // 同時処理数は20個までとする

chItems := make(chan item.Item)     // メインスレッドからワーカーに `item.Item` を受け渡す Channel
chResults := make(chan *taskResult) // ワーカーからメインスレッドに `item.ProcessItem` の結果を返す Channel

ctx, cancel := context.WithCancel(context.Background()) // 既に使っている `context.Context` があればそれを指定する。今回は無いので `context.Background()` を新たに作って指定
defer cancel()

numWorkers := int32(concurrency) // sync.WaitGroup の代わりにカウンタを使う
for i := 0; i < concurrency; i++ {
    go func() {
        defer atomic.AddInt32(&numWorkers, -1) // Goroutine が終わる度にカウンタをデクリメントする. 並行処理中に安全にに処理する為 `atomic.AddInt32` を使う.

        // close(chItems) が呼ばれるまで targetItem の取り出しが (Goroutine 間で均等に) 行われる
        for targetItem := range chItems {
            r, err := item.ProcessItem(targetItem)        // 重いブロッキング I/O が中で動く
            chResults <- &taskResult{result: r, err: err} // 結果を構造体に入れ、 `chResults` に送出
        }
    }()
}

// `targetItem` の送出は別の Goroutine 内で行う
go func() {
    defer close(chItems) // Goroutine 終了時に確実に close する

    for _, targetItem := range items {
        select {
        case <-ctx.Done():
            // cancel() が実行された後は余計な `item.ProcessItem` を実行したくないので、送出を中止して Goroutine を終了する
            return
        default:
            chItems <- targetItem
        }
    }
}()

failed := false
processedCount := 0

// ワーカーが全て終了するまで `chResults` から値を取り出し続ける
for numWorkers > 0 {
    select {
    case result := <-chResults:
        if failed {
            break
        }

        // エラーを検出したら `cancel()` を実行し、他の Goroutine にそれを通知し、終了する.
        // 但しメイン Goroutine のこの時点では終了せず全てのワーカーが閉じられるまで待つ.
        // (途中でループから抜けると、ワーカー内で `chResults <-` で送出している部分がブロックされてしまい、 Goroutine が終了せずリークしてしまう)
        if result.err != nil {
            logger.Error(fmt.Sprintf("Error detected: %e", result.err))
            cancel()
            failed = true
            break
        }

        logger.Info(result.result) // 処理成功時は結果をロガーで出力する

        processedCount++

        if processedCount%20 == 0 {
            logger.Info("Flush buffer!")
        }
    default:
        // `default` が無いと、 `chResults` deadlock になるので注意
    }
}

if !failed && processedCount%20 > 0 {
    logger.Info("Flush buffer!")
}

ここでのポイントは、コメントにもある通り、エラーを検出しても全てのワーカーが終了するまでループを抜けないと言う事です。途中で抜けてしまうと chResults を受信する箇所が無くなってしまい、ワーカーの chResults <- 部分が永久にブロックされる事で Goroutine が終了せず、リークしてしまうからです。エラーを検出したらフラグを立てて以降は <-chResults の結果を破棄しながらワーカーの終了を待ちます。キャンセル後は chItems への送出も止まるので、すぐにワーカーは終了するでしょう。

後は Goroutine を使わないバージョンで実装してたとおり、一定の件数毎にバッファのクリア処理をしています。当然ここでもエラーが発生する場合があると思いますが、その場合は item.ProcessItem() のエラー処理と同様の以下の処理を書いてあげれば OK です:

// ...

if processedCount%20 == 0 {
    // 20件毎にバッファをクリアする. エラーが発生した時は他と同様以降の処理を止める
    if err := バッファのクリア処理; err != nil {
        logger.Error(fmt.Sprintf("Error detected: %e", err))
        cancel()
        failed = true
    }
}

// ...

まとめ

以上、 Goroutines と Channels を使った並行処理についてでした。 特に Channels に関しては正しく使わないと deadlock を起こしたり、 Goroutine がいつまでも消えずにリークし続けたりとハマりどころが多いので注意が必要です。

また、冒頭でお知らせしたリポジトリでは今回の実装を少しだけキレイにした物を公開しています。 go run main.go --concurrency 20 --verbose の様に --verbose を付けて並行実行をすると、ワーカーの状態等が逐次出力されるので内容を理解しやすいです。

go run main.go –concurrency 20 –verbose の出力結果

参考

← 前の投稿

OpenCVで画像処理を試す

次の投稿 →

SQL WINDOW関数を用いた重複レコードの排除

コメントを残す