リモート開発メインのソフトウェア開発企業のエンジニアブログです

DynamoDB でロックを実装する

バッチジョブとかを実装する場合、多くのケースでは冪等性を持たせる事が多いと思いますが、中にはジョブの性質上、一定期間内に1度しか起動してはならない場合もあったりします。 (具体的なケースの例はまた別の機会に書きます)
1度しか起動しない仕組みをインフラだけで作るのは結構難しいので、アプリケーション側でもロックを使う等して重複実行に備えたりするのが良いです。ロックと言うと RDB とかの悲観ロックとかを使えると手っ取り早いのですが、 RDB を使わない場合別の仕組みを使ったロック処理を検討しても良さそうです。 (勿論 RDB でロック専用のテーブルとか作っても良いですが)

AWS の DynamoDB を使うと結構簡単に実装できるので、今回は Ruby での実装例を紹介します。

実装方針

DynamoDB の条件付き書き込みの整合性を担保とした実装にします。

因みに DynamoDB の条件付き書き込みを使ったロックの実装例 (Java) は AWS 公式ブログで紹介されていて、それを Go で実装した方のブログもありますので詳しくはそちらをご覧下さい。

今回はこれらの記事で書かれている物よりもっとシンプルな内容にしていきます。
同時並列に実行される可能性がある処理をブロックで包み、ロック取得に成功したプロセスやスレッドのみがブロック内の処理を実行できる、みたいな形にしましょう。

require 'digest'

# user_id に対して何らかの処理をする.
def do_something_with_lock(user_id)
  # user_id の文字列表現に対する SHA-1 をキーとする
  # 例えば user_id=123 なら key="5747890eb3aae2835312596ca497111b8c858507"  
  key = Digest::SHA1.hexdigest "do-smething-for-#{user_id}" 

  # ttl=秒. user_id の処理は60秒間は1回しか実行されない事が保証される
  Lock.acquire_lock(key, ttl: 60) do
    # do something
  end
end

この様な感じのインターフェースにしましょう。さて、今度は Lock#acquire_lock を実際に DynamoDB を使って実装していきます。

class Lock
  class << self
    def acquire_lock(key, ttl: 60)
      # テーブル名や DynamoDB クライアントなどは再利用の為に静的メソッドではなくインスタンス化した方がいいが今回は割愛
      table_name = 'job_locks'
      dynamodb = Aws::DynamoDB::Client.new

      current_time = Time.now.to_i
      expires_at = current_time + ttl

      begin
        dynamodb.put_item({
                            table_name: table_name,
                            item: {
                              'Key' => key,
                              'ExpiresAt' => expires_at
                            },
                            condition_expression: 'attribute_not_exists(#key) OR #expiresAt <= :now',
                            expression_attribute_names: {
                              '#key' => 'Key',
                              '#expiresAt' => 'ExpiresAt'
                            },
                            expression_attribute_values: {
                              ':now' => current_time
                            }
                          })

        if block_given?
          yield
        end
      rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException
        nil
      end
    end
  end
end

condition_expression が示す通り、テーブルに与えられたキーを持つアイテムが存在しないか、既に期限切れでない場合のみにアイテムの作成 or 更新が成功します。(既にレコードが存在していて期限切れの場合は期限が上書きされる)

このオペレーションが複数箇所から同時に実行されたとしても、先述のドキュメントが示す通り1つの実行系だけがアイテムを作成 or 更新でき、失敗した実行系ではコードが示す通り Aws::DynamoDB::Errors::ConditionalCheckFailedException が raise されます。

今回は非常にシンプルな設計なので、成功した場合のみブロックの処理を実行する形にしています。(引数は無し)

Moba Pro

テスト

実際にテストしてみましょう。 AWS の認証を済ませたターミナルを複数開いて、最初のスニペットに貼った Ruby のスクリプトを同時に起動してみます。

すると、以下の通り1つのコンソールでのみ do something! が出力される筈です。

4つ同時にロック取得を試みている。結果、右上のターミナルが取得に成功。

尚、今回のサンプルはこちらの GitHub リポジトリに載せていますので、興味がある方は実行してみて下さい。

https://github.com/issei-m/dynamodb-locking-test

DynamoDB の TTL について

DynamoDB にはアイテムの TTL を設定する機能が付いています。

参考: 仕組み: DynamoDB の有効期限 (TTL)

以下、実際に job_locks テーブルの ExpiresAt に TTL を設定した例です:

ExpiresAt には期限を表す unixtime が整数で入る。その時間を過ぎると DynamoDB によって自動で削除される

その他の機能

取得したロックを解放したり、あるいは TTL を延長したい事もあるかと思います。
今回の例で言うと、ロックに成功した場合はキー情報と DynamoDB のアイテム更新 (UpdateItem) や 削除 (DeleteItem) を実行するメソッドを含めたクラスを作ってそのインスタンスをブロックに渡してあげると便利そうです。

繰り返しますが今回はシンプルに DynamoDB を使ったロックの仕組みだけの紹介に留めたいので具体的な実装例は出しましせんが、利用側のコードはこんなイメージです:

Lock.acquire_lock(key, ttl:60) do |lock|
  lock.refresh # TTL を延長する (この場合60秒), DynamoDB の UpdateItem を使う
    
  lock.release # ロックを解放する, DynamoDB の DeleteItem を使う
end

また、最初に記載した AWS 公式ブログで紹介されている awslabs/amazon-dynamodb-lock-client は非常にリッチな実装になっているので興味がある方は見てみてはいかがでしょうか。

← 前の投稿

Google Pixel Watch でHello Worldを表示してみた

次の投稿 →

docker run/exec 時にキャリッジリターンが混じる件の解決方法

コメントを残す