全てのメッセージを時系列にまとめて取得できていたchannels.historyが廃止されました。この仕様変更により、APIを利用しているシステムに結構な改修をしなければならず大変だったというお話です。
新しいAPIは2つに別れていてconversations.history(スレッド内の子メッセージは取得できない)とconversations.replies(スレッド内の子メッセージと親メッセージが取得可能)があります。別れたことにより、なかなか面倒な操作をしなければならない状況になりました。
- https://api.slack.com/methods/conversations.history
- https://api.slack.com/methods/conversations.replies
DBやElasticsearchに全メッセージを貯め込んで2次利用するためのシステムでの改修で悩んだことをつらつらと書いていきます。
考えなければならないこと
- conversations.historyのうちスレッド化されているメッセージとされていないメッセージをチェックする仕組みが必要です。(親メッセージの投稿から時間がかなり経過した後に子メッセージが投稿された場合にも対応する必要があったため)
- Slackは1分間に約50回くらいのAPI呼び出し制限があるので、スレッド化されているメッセージにのみconversations.repliesを実行するようにして、無駄な呼び出しを減らすようにする。
- 全チャンネルを一気に読み込もうとすると上記の制限に引っかかる可能性があるため、呼び出し上限の制御を入れる必要がある。
- conversations.repliesの中の子メッセージはある程度時間が経ってから増える可能性があるため、1回読み込んだ後も何回かは読み込む必要がある。
- conversations.repliesの中の子メッセージが10000件あったりする場合を考慮して、複数回に分けてバッチを実行する仕組みが必要です。(読み込み済みの場所を記録しておいて、次回のバッチではそこから読み込む)
- 他にも色々あるので全ては書ききれませんが、メッセージが削除されたり、更新されたりした場合の考慮なども必要です。
実際の設計
PostgreSQLのテーブルを2つ追加して、Slackのメッセージの読み込み状態を保持することにしました。
1つ目はスレッド化されているメッセージを監視対象にするために使用するテーブル、2つ目はスレッド内の子メッセージの最終読み込み地点を保持するために使用するテーブルです。具体的には以下のような感じです。
スレッド化されている親メッセージをhistoryの中から見つけながら、最後にこの存在チェックをした親メッセージのts(タイムスタンプ)を以下のテーブルにnext_tsとして保持しておく。次回の存在チェックの開始地点をこのnext_tsとすることで、存在チェックを複数回に分けて行い、処理量や負荷を減らす。
CREATE TABLE fetch_status_slack_thread_exists (
organization_id integer NOT NULL,
channel_id varchar(20) NOT NULL,
next_ts text NOT NULL,
fetched_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
primary key (organization_id, channel_id)
);
organization_id | channel_id | next_ts | fetched_at | created_at
-----------------+------------+-------------------+-------------------------+-------------------------
3 | CB9SN1FDW | 1539347416.000100 | 2021-02-18 01:40:13.783 | 2021-02-08 05:35:14.587
16 | G0M6NHYLC | 1612365407.188300 | 2021-02-18 01:40:14.153 | 2021-02-08 06:30:31.54
上記の親メッセージのスレッド存在チェックでスレッドありと判定されたメッセージの情報、状態を以下のテーブルに保存します。1回のバッチでスレッド内の子メッセージを100件読み込む仕様にしたため、それを超える子メッセージがある場合はcurrent_thread_tsに最後に読み込んだメッセージのtsを保持し、次回のバッチで使います。
CREATE TABLE fetch_status_slack_thread (
organization_id integer NOT NULL,
channel_id varchar(20) NOT NULL,
ts text NOT NULL,
current_thread_ts text NOT NULL,
total_fetch_count integer NOT NULL,
status smallint NOT NULL,
conversation_type text NOT NULL,
fetched_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
primary key (organization_id, channel_id, ts)
);
organization_id | channel_id | ts | current_thread_ts | fetched_reply_count | total_fetch_count | status | conversation_type | fetched_at | created_at
-----------------+------------+-------------------+-------------------+---------------------+-------------------+--------+-------------------+-------------------------+-------------------------
3 | CB9SN1FDW | 1612172072.002700 | 1612172481.004000 | 0 | 48 | 2 | Channel | 2021-02-18 04:55:14.354 | 2021-02-08 05:35:12.567
16 | CB9SN1FDW | 1611971508.000100 | 1611971508.000100 | 0 | 98 | 3 | Channel | 2021-02-18 04:50:12.582 | 2021-02-08 05:35:12.593
-----------------
status is '0:Default 1:WithThread 2:Fetching 3:FetchingLatest'
そして、ここからさらにメッセージの更新や削除にも対応しようとするともう少し工夫が必要になる気がします。Slackと取り込みシステムの状態の完全一致を実現させることについては出来なくはないでしょうが、コストに見合わない面も出てきてしまうので、例えば3日以上経過したメッセージの更新削除は行わないなどの妥協が必要だと思います。
まとめ
やはり2つのAPIに別れたことで、API呼び出し制限の問題がさらに実装の複雑さを上げてしまったということは否めないですね。
ただ、既存のシステムの構成を変えずに無理やり、2つのAPI方式に切り替えたことで無理が生じている部分もあるため、エクスポートするバッチをゼロから考え直せば良い実装をすることも可能かもしれませんね。
次期課題としたいと思います!