リモート開発メインのソフトウェア開発企業のエンジニアブログです

Kinesis Data Firehose の PutRecordBatch のリトライ方法

AWS の Kinesis Data Firehose の PutRecordBatch を使うにあたって、失敗したレコードのリトライに関する日本語の記事があまり多くなかったので今回書いてみました。

使用する言語は Go です。

PutRecordBatch のリトライの必要性

PutRecordBatch (公式) や Amazon Kinesis Data Firehose Quota を見ると、 Delivery Stream には多くの制限がある事が分かります。以下はその一例です:

  • (東京リージョン) Direct PUT を使う場合、100,000レコード/秒, 1,000リクエスト/秒, 1 MiB/秒と言う Quota がデフォルトで設定されている
    • AWS にリクエストする事で緩和する事もできる
  • PutRecordBatch リクエストは1度に500レコードまで, また1レコードの Base64 エンコード前の最大サイズは 1MiB まで、またリクエスト全体で 4MB までと言う制限がある
    • これは緩和不可

この様な制限があり、実際に Delivery Stream に PutRecordBatch で大量にレコードを送信していると、たまにスロットリングが発生します。

AWS 管理コンソールより。レコードがスロットリングされているのが分かる

送信するデータ量をプログラムで制御できるのであれば問題無いのですが、そうは言ってもスロットリングには備えておくに越したことはありません。

Moba Pro

リトライ方法

PutRecordBatch API はオールオアナッシングになっていないので、リクエストに含めたレコードの1部だけがスロットリングにより拒否される事があります。従って、Delivery Stream の宛先が同一レコードの投入に対して冪等になっていない限り、成功したものを省いてスロットリングされたレコードのみをリトライする必要があります。

以下は PutRecordBatch API のレスポンス本文の生データ (JSON) の例になります:

{
   "Encrypted": false,
   "FailedPutCount": 2,
   "RequestResponses": [ 
      { 
         "ErrorCode": null,
         "ErrorMessage": null,
         "RecordId": "GXSJnnv4qjz44sJacT6TCPDw2BjCHG5ml1pD4l2..."
      },
      { 
         "ErrorCode": null,
         "ErrorMessage": null,
         "RecordId": "1RMbYROFhKyhVDmS5UOfI70E3YJc5+tNcEwgS1w..."
      },
      { 
         "ErrorCode": null,
         "ErrorMessage": null,
         "RecordId": "fSEJDBDMPWNg9pfmBug1q/Paz/QV2a2VCU2/DPP..."
      },
      { 
         "ErrorCode": "ServiceUnavailableException",
         "ErrorMessage": "Slow down.",
         "RecordId": null
      },
      { 
         "ErrorCode": "ServiceUnavailableException",
         "ErrorMessage": "Slow down.",
         "RecordId": null
      }
   ]
}

上記は、5レコードを PutRecordBatch API に送信した結果ですが、 RequestResponses を見ての通り最後の2件が送信に失敗しています。

PutRecordBatch (公式) にも記載されていますが、成功したレコードは RecordId にレコード毎にユニークな ID が格納されます。逆に失敗したレコードは RecordId は null で、代わりに ErrorCode や ErrorMessage が格納されます。

また、 RequestResponses はリクエスト時に渡したレコードと 同じ順番で 対応する結果が格納されています。従って、エラーになったレコードと同じインデックスを持つレコードだけを、リクエスト時に渡したレコードの中から取り出せば良さそうです。

リトライ実装

方針が決まったので早速実装します。以下は実際のコード例になります:

func putRecordBatchWithRetry(svc *firehose.Firehose, deliveryStream string, records []*firehose.Record, maxRetries int) error {
	for attempt := 1; attempt <= maxRetries; attempt++ {
		println(fmt.Sprintf("Putting %d records to delivery stream %s (attempt %d)", len(records), deliveryStream, attempt))

		output, err := svc.PutRecordBatch(&firehose.PutRecordBatchInput{
			DeliveryStreamName: aws.String(deliveryStream),
			Records:            records,
		})
		if err != nil {
			return fmt.Errorf("failed to request PutRecordBatch API to delivery stream %s with %d records: %w", deliveryStream, len(records), err)
		}

		if *output.FailedPutCount == 0 {
			println(fmt.Sprintf("Put %d records to delivery stream %s (attempt %d)", len(records), deliveryStream, attempt))

			return nil
		}

		// リトライ用のレコードを作り直す
		retryRecords := make([]*firehose.Record, *output.FailedPutCount)
		counter := 0
		for i, r := range output.RequestResponses {
			if r.RecordId != nil {
				continue
			}
			if *r.ErrorCode != "ServiceUnavailableException" {
				return fmt.Errorf("unexpected error: %s", *r.ErrorCode)
			}

			retryRecords[counter] = records[i]
			counter++
		}
		records = retryRecords

		sleepDur := exponentialBackoffAndJitter(attempt)

		println(fmt.Sprintf("Failed to put %d records, we will retry it %v later...", len(records), sleepDur))
		time.Sleep(sleepDur)
	}

	return fmt.Errorf("%d times failed", maxRetries)
}

func exponentialBackoffAndJitter(attempts int) time.Duration {
	const minSleep = 500 * time.Millisecond
	const maxSleep = 5000 * time.Millisecond

	sleepTime := float64(minSleep) * math.Pow(2, float64(attempts-1))
	if sleepTime > float64(maxSleep) {
		sleepTime = float64(maxSleep)
	}

	// Add some jitter to the sleep time
	sleepTime += float64(rand.Int63n(int64(minSleep)))

	return time.Duration(sleepTime)
}

因みに exponential backoff のアルゴリズム部分は ChatGPT に書かせた物を一部修正した物を使いました。便利な世の中です。

これを、以下のコードを使って特に Sleep を入れず間髪を入れずにレコードを大量に投入してみました:

var svc *firehose.Firehose // 公式 doc とかを参照して事前に初期化しておく事.

for i := 0; i < 10; i++ {
	records := make([]*firehose.Record, 500)
	for j := 0; j < 500; j++ {
		records[j] = &firehose.Record{
			Data: []byte(`{"id":"` + fmt.Sprintf("%03d-%03d", i+1, j+1) + `","msg":"` + strings.Repeat("a", 1024) + `"}`),
		}
	}

	if err := putRecordBatchWithRetry(svc, "my-delivery-stream", records, 10); err != nil {
		panic(err)
	}
}

以下がその結果になります:

Putting 500 records to delivery stream my-delivery-stream (attempt 1)
Put 500 records to delivery stream my-delivery-stream (attempt 1)
Putting 500 records to delivery stream my-delivery-stream (attempt 1)
Failed to put 500 records, we will retry it 1.062s later...
Putting 500 records to delivery stream my-delivery-stream (attempt 2)
Put 500 records to delivery stream my-delivery-stream (attempt 2)
Putting 500 records to delivery stream my-delivery-stream (attempt 1)
Failed to put 7 records, we will retry it 1.186s later...
Putting 7 records to delivery stream my-delivery-stream (attempt 2)
Put 7 records to delivery stream my-delivery-stream (attempt 2)
Putting 500 records to delivery stream my-delivery-stream (attempt 1)
Failed to put 500 records, we will retry it 1.236s later...
Putting 500 records to delivery stream my-delivery-stream (attempt 2)
Failed to put 500 records, we will retry it 2.433s later...
Putting 500 records to delivery stream my-delivery-stream (attempt 3)
Put 500 records to delivery stream my-delivery-stream (attempt 3)
Putting 500 records to delivery stream my-delivery-stream (attempt 1)
Put 500 records to delivery stream my-delivery-stream (attempt 1)
Putting 500 records to delivery stream my-delivery-stream (attempt 1)
Failed to put 7 records, we will retry it 1.011s later...
Putting 7 records to delivery stream my-delivery-stream (attempt 2)
Put 7 records to delivery stream my-delivery-stream (attempt 2)
Putting 500 records to delivery stream my-delivery-stream (attempt 1)
Put 500 records to delivery stream my-delivery-stream (attempt 1)
Putting 500 records to delivery stream my-delivery-stream (attempt 1)
Put 500 records to delivery stream my-delivery-stream (attempt 1)
Putting 500 records to delivery stream my-delivery-stream (attempt 1)
Put 500 records to delivery stream my-delivery-stream (attempt 1)
Putting 500 records to delivery stream my-delivery-stream (attempt 1)
Put 500 records to delivery stream my-delivery-stream (attempt 1)

実際にリトライが発生しましたが、全件無事に投入できた様です。
後で宛先のデータを確認してみましたが、5000件重複も過不足もなくデータが連携されていました。

← 前の投稿

Raspberry pi でBME280の値をRealtimeDatabaseに格納してみた

次の投稿 →

【Go言語】Apache Arrowを使ってParquetファイルを書き込む

コメントを残す