リモート開発メインのソフトウェア開発企業のエンジニアブログです

CloudWatch Logs + Spark SQL で Rails ActiveJob の実行時間を集計してみた

Rails には主にバックグラウンドで処理を実行できる ActiveJob と言う機能がありますが、先日とあるプロジェクトで、各ジョブの実行時間を集計する必要がありました。幸いデフォルトで ActiveJob はジョブの実行時間をログに出力するのでこのログを元に集計を行いました。ワーカーは AWS Fargate で動作しており、実行ログは CloudWatch Logs に流していたので、このログを Spark SQL を使って集計したのですが、生成 AI のおかげもあり簡単に集計できたのでその時にやった事を共有しておきます。

Spark のコードは Spark SQL で集計 で紹介しています。尚、 Rails のバージョンは 7.1 ですがよほど古い物で無い限りは大体のバージョンで使えると思います。

ログの形式について

まずは当プロジェクトの ActiveJob で出力されるログのフォーマット例を出しておきます。

{
  "time": "2024-06-04T17:09:06.577425+09:00",
  "level": "INFO",
  "msg": "Performed MyJob (Job ID: 26920a57-77de-48f0-889b-d850624a5945) from AmazonSqs(default) in 571.68ms"
}

ご覧の通り、ログのエントリは JSON により構造化されています。実際にはもっと沢山のフィールドがありますが割愛します。”msg” の部分がメッセージ部分にあたり、この内容は ActiveJob により出力されています。尚、後述しますがログの形式は JSON である必要はありません。Rails のデフォルトの設定ではおそらくこんな感じの形式で出力されると思います:

I, [2024-06-04T17:09:06.577425 #12345] INFO -- : Performed MyJob (Job ID: 26920a57-77de-48f0-889b-d850624a5945) from AmazonSqs(default) in 571.68ms

いずれにせよ、このログはジョブが終了する時に出力されますが、末尾に実行時間が記載されているのでこれを集計に使えば良さそうです。

CloudWatch Logs のログを S3 にエクスポートする

これはコマンド一発でできます。以下はコマンド例なのですが、日本時間で 2024-04-01 00:00:00 ~ 2024-06-30 23:59:59 までを対象としています。

aws logs create-export-task \
    --log-group-name "ロググループ名" \
    --log-stream-name-prefix "ストリーム名のプレフィックス(省略可)" \
    --from 1711897200000 \
    --to 1719759599999 \
    --destination "ログの出力先バケット名" \
    --destination-prefix "active-job-worker-logs-20240401-20240630" \
    --region ap-northeast-1

ログの量によっては時間がかかるので待ちましょう。

Spark SQL で集計

Spark は Scala や Python などの言語で大規模なデータの処理を簡単に実装できるフレームワークです。(Spark SQL は Spark の機能の1つ) AWS には Glue と言う、 Spark のコードをサーバーレスで実行できるサービスがあるので今回はそれを使う事にしました。

今回の Spark SQL コードサンプルを Gist に置いておきました。 おそらく大抵のプロジェクトで使えると思います。これ以降、1ずつポイントを抑えていきますので興味のある方はどうぞ。

CloudWatch Logs のエクスポートは余分なタイムスタンプが付与される

CloudWatch Logs のエクスポートは、各ログエントリを行に持つファイルを **/*.gz の形式で出力します。我々の実行ログは JSON に構造化されており、 Spark はデフォルトで gz ファイルを認識できるので、何もしなくても spark.read.json で読み込めそうな気がしますが、実はこれはできません。

org.apache.spark.sql.AnalysisException: Unable to infer schema for JSON. It must be specified manually.

spark.read.json の結果

その理由が、各行エントリが次の様な形式で出力されるからです:

2024-06-04T17:00:06.577Z {"time":"2024-06-04T17:09:06.577425+09:00","level":"INFO","msg":"Performed MyJob (Job ID: 26920a57-77de-48f0-889b-d850624a5945) from AmazonSqs(default) in 571.68ms"}

各行エントリの形式

これは CloudWatch Logs を Management Console で見た場合も同じですが、先頭に CloudWatch Logs に記録されたタイムスタンプが付与されてしまうからです。この為、 Spark で処理する際には自前で行データの加工が必要になります。先述したログの形式が JSON になっていなくても大丈夫とはこの事だったのです。

具体的には Glue コード上の以下の部分で処理しています:

// ログのパターンにマッチする正規表現
val logPattern: Regex = """Performed ([\w:]+) \(Job ID: [\w-]+\) from [\w\(\)]+ in (\d+\.\d+)ms""".r

// ログのエントリをテキストとして読み込む. YOUR-BUCKET の部分を置き換える.
val logDF = spark.read.textFile("s3://YOUR-BUCKET/**/*.gz")

// 読み出したテキストの DF の中からパターンに適合する物のみ抽出し、ジョブ名と実行時間を取り出す
val jobDF = logDF.flatMap { line =>
  logPattern.findFirstMatchIn(line).map { m =>
    (m.group(1), m.group(2).toDouble)
  }
}.toDF("job", "exec_time")

最初に実行時間が記載されている ActiveJob のログにマッチする正規表現を用意しておきます。お使いのシステムに応じて正規表現を調整して下さい。 まず Spark で該当のバケットからテキストとして DF を読み込みます。行エントリの形式がなんであっても型は Dataset[String] になります。この DF に対して1行ずつ正規表現を使ってパターンにマッチする物だけジョブ名と実行時間を取り出し、 Some((String, Double)) として返します。マッチしない物は None なので、 flatMap によって取り除かれます。最終的にこの2つの列に “job”, “exec_time” と名付けた DF に変換します。後は自分の好きな処理を行いましょう。

今回はこの後 jobDF を tempView に移して SQL を実行しています。Glue での実行結果として、標準出力に集計結果が出力されました:

+--------------------------------+-----------+-------------+--------------+----------------+--------------+--------------+--------------+
|job                             |job_count  |avg_exec_time|exec_time_25th|exec_time_median|exec_time_75th|exec_time_90th|exec_time_99th|
+--------------------------------+-----------+-------------+--------------+----------------+--------------+--------------+--------------+
|MyJob1                          |2547843    |5432.7       |3021.56       |4587.92         |7234.18       |9876.43       |18765.32      |
|MyJob2                          |1234576    |1987.65      |287.43        |456.78          |789.12        |1234.56       |54321.09      |
|MyJob3                          |1123359    |3456.78      |1876.54       |2345.67         |3987.65       |5678.90       |19876.54      |
|MyJob4                          |1098728    |45678.90     |23456.78      |34567.89        |56789.01      |78901.23      |156789.01     |
|MyJob5                          |54317      |18765.43     |3456.78       |7890.12         |23456.78      |56789.01      |123456.78     |
|MyJob6                          |45592      |34.56        |15.67         |23.45           |28.90         |37.89         |187.65        |
|MyJob7                          |34486      |567.89       |123.45        |234.56          |456.78        |1234.56       |3456.78       |
|MyJob8                          |23415      |12.34        |5.67          |8.90            |13.45         |18.76         |34.56         |
|MyJob9                          |12307      |789.01       |567.89        |678.90          |890.12        |1234.56       |2345.67       |
|MyJob10                         |8694       |23456.78     |12345.67      |18765.43        |28901.23      |45678.90      |78901.23      |
|MyJob11                         |6523       |56.78        |23.45         |34.56           |45.67         |78.90         |234.56        |
|MyJob12                         |3218       |98.76        |45.67         |67.89           |89.01         |123.45        |345.67        |
+--------------------------------+-----------+-------------+--------------+----------------+--------------+--------------+--------------+

コードの生成について

今回の作ったスクリプトですが、冒頭にも書きましたがほとんど全て GPT-4o が生成したコードを流用しています。コメント欄にプロンプトを書いておいたので興味のある方はご覧ください。(最近 Spark をあまり触っておらず API をほとんど覚えていなかったのでとても助かりました)

まとめ

今回は CloudWatch Logs 上に出力した ActiveJob のログを Spark を使う事で各ジョブの実行時間を集計しました。特に Spark の使い捨てのコードを生成 AI を使って殆ど加工する事なく使えてとても簡単に行う事ができました。生成 AI が普及してだいぶ経ちましたが、この様に使い捨てのコードを書くときはとても重宝しますね。

← 前の投稿

生成 AI にシステム構成図を描かせる

次の投稿 →

Dockerを再インストールしたら「ext4.vhdx」が1/10になった件

コメントを残す