Tech Racho エンジニアの「?」を「!」に。
  • Ruby / Rails以外の開発一般

利用目的別 Kotlin Flow/Channel まとめ

今ではもう一般的となった感のある Kotlin コルーチンの Flow (と Channel) についての記事になります。

今更かよという感じなんですが、利用を始めて結構経つのに未だに細かい挙動を忘れてドキュメントを見返すことが頻繁にあります。

機能の解説自体は色々なところで見かけるのですが、自分の欲しい情報にたどり着くのに時間がかかることが多いため今回記事としてまとめることにしました。

ここではコルーチンの以下の機能について、利用頻度が高いと思われる内容を利用目的別に記載して行きます。

自分が重要だと思う点についてコンパクトにまとめるのを目的としており、Flow 自体の詳細や各種メソッドの細かい説明は割愛しています。

※公式ドキュメントにはアクティブな collector を subscriber と呼ぶという記載がありますが、この記事内では全て表記を collector で統一しています

🔗SharedFlow

SharedFlow には以下のような特徴があります。

  • ブロードキャスト方式ですべての collector に値を送信する
  • SharedFlow に対する collect が正常に完了することはない
    • つまり、基本的に collect の後ろのコードが実行されることはない (実行中の CoroutineScope をキャンセル&例外キャッチなどしない限り)
  • MutableSharedFlow 生成時に以下の設定を変更可能
    • replay: 1以上の値を指定することで、collect 以前に送信された値を受け取ることが出来る
      • 例えば replay を2に設定し、1->2->3 と3回データを送信後に collect すると、直近の2件である 2 と 3 を順に受け取ることが出来る
      • デフォルトは0のため、collector がいない時に送信した値は全て失われる
    • extraBufferCapacity: replay に加えて追加でバッファリングされる値の数
      • バッファ領域が残っている場合、後述の onBufferOverflow の設定によらず emit は suspend しない
      • デフォルトは0
      • collector がいない時に値を送信した場合、この領域は使用されず replay キャッシュにのみ値を保存&古い値の置き換えが行われる
    • onBufferOverflow: バッファが溢れた場合の動作を BufferOverflow で設定する
      • SUSPEND(デフォルト): バッファがいっぱいの場合 emit が suspend する
        • replay と extraBufferCapacity が共に0の場合、collector がいない場合の emit は suspend しないが、collector が以前に送信した値を処理中だと suspend する
      • DROP_OLDEST: オーバーフロー時にバッファ内の最も古い値を削除して新しい値を追加し、emit は suspend はしない (replay と extraBufferCapacity が共に0の場合は無効)
      • DROP_LATEST: オーバーフロー時はバッファの内容は変化せず (つまり最新の送信値が破棄される)、 emit は suspend はしない (replay と extraBufferCapacity が共に0の場合は無効)

デフォルト設定の場合、以下のような状況で利用します。

  • collector がいない時に送信した値は失われて問題ない
  • collector がいる時に送信した値は基本的に全て処理したい
    • 値の送信頻度より collector の処理時間の方が長い可能性がある場合、emit がなるべく suspend しないように extraBufferCapacity や onBufferOverflow の設定変更が有効か検討する
  • 複数の collector がいる場合、送信した値を全ての collector で受け取りたい

デフォルト設定の動作を図で表すと以下のような形となります。

MutableSharedFlow を SharedFlow として公開する場合は以下のように asSharedFlow を利用します。

private val _events = MutableSharedFlow<String>()
val events = _events.asSharedFlow()

単純に val events: SharedFlow = _events とすることも可能ですが、これだと利用側で MutableSharedFlow にキャストして利用することが可能なため、より安全に公開するには asSharedFlow を利用すべきとなります。

🔗StateFlow

StateFlow は前述の SharedFlow を継承して value を生やしたインタフェースとなります。

その名の通り現在の状態を表すのに適しており、MutableStateFlow は MutableSharedFlow の各種設定をその目的に適した値に固定&処理を最適化したものと考えることができ、以下のような特徴があります。

  • 常に値を1つ持っており、初期値が必須
  • collect 時に最新の値を1件受け取る (replay=1 の挙動)
  • 値の送信 (設定) が value への代入で行える (suspend fun ではない、当然 suspend することはない)
  • 現在値の取得は value の参照で行える (suspend fun ではない)
  • collector が連続して同じ値を受け取ることはない
  • collector の処理中に新たな値が設定された場合、バッファから溢れた途中の値は破棄され、collector は最新の値のみ受け取る
    • 前述の replay=1 に加え、onBufferOverflow=BufferOverflow.DROP_OLDEST の挙動

以下のような状況で利用します。

  • collect 開始直後に必ず値を受け取りたい
  • 最新の値が重要であり、途中の値は失われて問題ない
  • 同じ値が連続で設定された際の通知は不要

動作を図で表すと以下のような形となります。

MutableStateFlow を StateFlow として公開する場合は以下のように asStateFlow を利用します。

private val _state = MutableStateFlow<String>("initial value")
val state = _state.asStateFlow()

キャストではなく asStateFlow を利用すべき理由は SharedFlow と同様。

🔗Channel

Channel は Flow ではなく SendShannelReceiveChannel を継承したインタフェースです。
値を受け取る側も collector ではなく receiver と呼ばれます。

  • 送信した値は1度のみ通知される
    • 複数の receiver が同時に存在しても1つの receiver しか値を受け取らない
  • Flow と違い SendChannel で close、ReceiveChannel で cancel が出来る
    • receiver は for (it in channel) などで受信処理が出来るが、この channel が close されると for 文を抜ける
  • Channel 生成時に以下の設定を変更可能
    • capacity
      • RENDEZVOUS(デフォルト): バッファを持たず、receiver がいない時の send は suspend する
      • BUFFERED: デフォルト容量のバッファを持つ、多分使わないので詳細は割愛
      • UNLIMITED: 容量無制限(利用可能なメモリのみによって制限)のバッファを持ち、send は決して suspend しない
      • CONFLATED: 1つのバッファを持ち、send は決して suspend せず receiver は最新の値のみ受け取る (途中の値は破棄される可能性がある)
      • 正の値(UNLIMITED=Int.MAX_VALUEを除く): 固定容量のバッファを持ち、バッファがいっぱいの時に値を送信しようとすると suspend する
    • onBufferOverflow: バッファが溢れた場合の動作を BufferOverflow で設定する (capacity が UNLIMITED または CONFLATED の場合は無効)
      • 各設定値の説明は MutableSharedFlow の設定と同様なので割愛
      • capacity=1 && onBufferOverflow=DROP_OLDEST の指定は CONFLATED と同じ意味であり、CONFLATED 指定はこの組み合わせのショートカットとなる
    • onUndeliveredElement: 値が送信されたが配信に失敗した場合に通知される関数を指定出来る、詳細は Channel のドキュメントを参照

Flow として公開したい場合は以下のように receiveAsFlow を利用します。

private val _event = Channel<String>()
val event = _event.receiveAsFlow()

🔗capacity に正の値(または UNLIMITED)を指定した場合

主な特徴は前述の通りで、例えば BlockingQueue を利用した処理の置き換えに利用することができます。

ブロッキング操作の変わりに suspend があり、close することも可能です。

直近で利用したことがあるのは以前書いた記事に記載したダウンロードライブラリ内のキュー処理でした。
あとは画像取得のリクエストを BlockingQueue に詰めてスレッドプールで処理したことがありますが、それをコルーチンで作成した場合はそのまま Channel に置き換え出来そうです。

capacity=1 の動作を図で表すと以下のような形となります。

UNLIMITED の場合は以下のような形となります。

🔗capacity に CONFLATED を指定した場合

前述の通り capacity=1 && onBufferOverflow=DROP_OLDEST 設定と同じ意味であり、以下の特徴があります。

  • 最後に送信した値のみバッファリングされる
    • send は決して suspend されない
  • receiver は最後に送信された値のみ受信する

以下のような状況で利用します。

  • receiver がいない時に送信した値も保存しておきたい (receiver が現れた際に配信したい)
  • 最新の値が receiver によって処理される前に新たな値が送信された場合、途中の値は破棄されて問題ない
    • receiver がいる時に送信したとしても、receiver が以前の値を処理中の場合、最後に送信した値以外は破棄される
  • 送信した値は1度だけ処理したい
    • 複数の receiver が同時に存在したとしても、1つの receiver しか値を受け取らない

動作を図で表すと以下のような形となります。

この挙動はいわゆる Single Event 処理に使うことも出来ます。

実際に CONFLATED を利用した例としては以下のような仕様を実現する際にちょうど良かったです。

  • バックグラウンドで処理する WebAPI などのレスポンスに応じてメッセージを表示する
  • 複数回同時に処理が発生したとしてもメッセージは1度だけ表示すれば良い
  • コンテンツ閲覧中はメッセージ表示を抑制したい
    • 例えばコンテンツ閲覧中にバックグラウンドで行っていた処理が終了した場合、コンテンツを閉じた時にメッセージを表示したい

コンテンツ一覧画面とコンテンツ閲覧画面があるとして、コンテンツ一覧画面表示中のみ receiver が存在するようにすると、閲覧画面の裏で何回処理が実行されたとしても一覧画面表示時に1度だけ receiver の処理を行うことが出来ます。

具体的な機能で言うと以下のような感じです。

  • メンテナンス情報の表示
    • 適当なタイミングで取得処理のトリガーを引きユーザーの邪魔にならないタイミングで表示
  • WebAPI がログインセッション切れエラーを返したので再ログインを促すメッセージを表示
    • 3種の WebAPI A B C が同時実行され、それぞれでエラーが発生してもなるべく処理は1回にまとめたい

🔗capacity に RENDEZVOUS を指定した場合

Flow の方と違ってデフォルトであるこの設定で利用したことは無いので利用例の記載は割愛させていただきます。

動作を図で表すと以下のような形となります。

🔗最後に

久々に普通の技術記事を書いた気がします。

今回は図のお絵描きで大分疲れました。でもこれが無いと再確認時にパッと挙動を理解しづらいんですよね。
この図は自分が見返した時に知りたい情報をなるべく詰め込んだつもりです。

基本的には自分のためにまとめた情報を記事として公開した形ですが、同じような思いをしていた方達の役に立つ内容であれば幸いです。


CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。