バッチジョブとかを実装する場合、多くのケースでは冪等性を持たせる事が多いと思いますが、中にはジョブの性質上、一定期間内に1度しか起動してはならない場合もあったりします。 (具体的なケースの例はまた別の機会に書きます)
1度しか起動しない仕組みをインフラだけで作るのは結構難しいので、アプリケーション側でもロックを使う等して重複実行に備えたりするのが良いです。ロックと言うと RDB とかの悲観ロックとかを使えると手っ取り早いのですが、 RDB を使わない場合別の仕組みを使ったロック処理を検討しても良さそうです。 (勿論 RDB でロック専用のテーブルとか作っても良いですが)
AWS の DynamoDB を使うと結構簡単に実装できるので、今回は Ruby での実装例を紹介します。
実装方針
DynamoDB の条件付き書き込みの整合性を担保とした実装にします。
因みに DynamoDB の条件付き書き込みを使ったロックの実装例 (Java) は AWS 公式ブログで紹介されていて、それを Go で実装した方のブログもありますので詳しくはそちらをご覧下さい。
今回はこれらの記事で書かれている物よりもっとシンプルな内容にしていきます。
同時並列に実行される可能性がある処理をブロックで包み、ロック取得に成功したプロセスやスレッドのみがブロック内の処理を実行できる、みたいな形にしましょう。
require 'digest'
# user_id に対して何らかの処理をする.
def do_something_with_lock(user_id)
# user_id の文字列表現に対する SHA-1 をキーとする
# 例えば user_id=123 なら key="5747890eb3aae2835312596ca497111b8c858507"
key = Digest::SHA1.hexdigest "do-smething-for-#{user_id}"
# ttl=秒. user_id の処理は60秒間は1回しか実行されない事が保証される
Lock.acquire_lock(key, ttl: 60) do
# do something
end
end
この様な感じのインターフェースにしましょう。さて、今度は Lock#acquire_lock
を実際に DynamoDB を使って実装していきます。
class Lock
class << self
def acquire_lock(key, ttl: 60)
# テーブル名や DynamoDB クライアントなどは再利用の為に静的メソッドではなくインスタンス化した方がいいが今回は割愛
table_name = 'job_locks'
dynamodb = Aws::DynamoDB::Client.new
current_time = Time.now.to_i
expires_at = current_time + ttl
begin
dynamodb.put_item({
table_name: table_name,
item: {
'Key' => key,
'ExpiresAt' => expires_at
},
condition_expression: 'attribute_not_exists(#key) OR #expiresAt <= :now',
expression_attribute_names: {
'#key' => 'Key',
'#expiresAt' => 'ExpiresAt'
},
expression_attribute_values: {
':now' => current_time
}
})
if block_given?
yield
end
rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException
nil
end
end
end
end
condition_expression が示す通り、テーブルに与えられたキーを持つアイテムが存在しないか、既に期限切れでない場合のみにアイテムの作成 or 更新が成功します。(既にレコードが存在していて期限切れの場合は期限が上書きされる)
このオペレーションが複数箇所から同時に実行されたとしても、先述のドキュメントが示す通り1つの実行系だけがアイテムを作成 or 更新でき、失敗した実行系ではコードが示す通り Aws::DynamoDB::Errors::ConditionalCheckFailedException
が raise されます。
今回は非常にシンプルな設計なので、成功した場合のみブロックの処理を実行する形にしています。(引数は無し)
テスト
実際にテストしてみましょう。 AWS の認証を済ませたターミナルを複数開いて、最初のスニペットに貼った Ruby のスクリプトを同時に起動してみます。
すると、以下の通り1つのコンソールでのみ do something!
が出力される筈です。
尚、今回のサンプルはこちらの GitHub リポジトリに載せていますので、興味がある方は実行してみて下さい。
https://github.com/issei-m/dynamodb-locking-test
DynamoDB の TTL について
DynamoDB にはアイテムの TTL を設定する機能が付いています。
以下、実際に job_locks テーブルの ExpiresAt
に TTL を設定した例です:
その他の機能
取得したロックを解放したり、あるいは TTL を延長したい事もあるかと思います。
今回の例で言うと、ロックに成功した場合はキー情報と DynamoDB のアイテム更新 (UpdateItem) や 削除 (DeleteItem) を実行するメソッドを含めたクラスを作ってそのインスタンスをブロックに渡してあげると便利そうです。
繰り返しますが今回はシンプルに DynamoDB を使ったロックの仕組みだけの紹介に留めたいので具体的な実装例は出しましせんが、利用側のコードはこんなイメージです:
Lock.acquire_lock(key, ttl:60) do |lock|
lock.refresh # TTL を延長する (この場合60秒), DynamoDB の UpdateItem を使う
lock.release # ロックを解放する, DynamoDB の DeleteItem を使う
end
また、最初に記載した AWS 公式ブログで紹介されている awslabs/amazon-dynamodb-lock-client は非常にリッチな実装になっているので興味がある方は見てみてはいかがでしょうか。