Rust Advent Calendar 2019 1日目 Rust の非同期プログラミングモデルはランタイム観点だと Go のそれに似ている

この記事は Rust Advent Calendar 2019 の1日目の記事になります.

明日は topecongiro さんの予定です.

TL;DR

去る 11/07 に Rust 1.39.0 がリリースされました. これはユーザー待望の async/await 構文が言語機能として取り込まれた安定版リリースとなります. Advent Calendar 最初の記事としては取り上げないわけにはいきません.

もう既に他の良い記事がたくさん書かれていますが, この記事ではそれらを補完する視点から説明してみようと思います.

async/await 構文自体の解説や言語機能としてどう実装するかという話は多いので,

  • 何故 async/await による非同期モデルがユーザー, 言語実装の両観点から扱い易いのか.
  • runtime で Future を効率的に実行するための工夫.

あたりの入門的な話をしようと思います.

async/await の概念自体は知っている人が対象です.

async/await 知らないよって人は, まずは OPTiM TECH BLOG -- Rustの非同期プログラミングをマスターする を読むのが良いでしょう.

いろいろな非同期プログラミングモデル

お題として, web サーバーで大量のリクエストを捌くことを考えます.

あたりが指標になります.

シングルスレッド (非同期機構なし)

ものすごく単純な場合として, 16コアの CPU を積んだサーバーに web アプリのプロセスがひとつあって, それがシングルスレッドで動いているような場合を考えてみます.

use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:80").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        // 何かの処理をする.
    }
}

引用: https://doc.rust-lang.org/book/ch20-01-single-threaded.html

一度に100リクエストのアクセスが来たとき, このプロセスはそれを順番に処理していきます.

なので, 例えば1リクエストの処理に0.1秒かかるような場合, 100個目のリクエストは約10秒待たされることになります.

きょうびそんなことが許容されることもないでしょう. なので, 非同期に処理をする必要があります.

リクエスト毎にスレッドを立てる

リクエスト毎にスレッドを立てればどうでしょうか.

fn main() {
    let listener = TcpListener::bind("127.0.0.1:80").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

引用: https://doc.rust-lang.org/book/ch20-02-multithreaded.html

16個あるコアを使い切れて, 一番遅いレスポンス(最悪のレイテンシ)は 100 * 0.1 / 16 = 0.625 秒後になります.

これでいいようにも見えますが, 1リクエストあたり 0.001 秒かかる API に 10000 リクエスト来た場合はどうなるでしょうか.

この場合は同時に 10000 スレッド立つことになりますが,

  • このスレッドたちを管理するのは OS です. (よくある) OS は CPU を使いたいスレッドたちが公平に CPU を使えるように, 一定時間ごとに CPU を使えるスレッドを切り替えます. スケジューリングのコストとコンテクストスイッチのコストがかかりますが, これらはそれなりに高価です.
  • 1 OS スレッド毎にそのスレッドが使うスタック領域(メモリ)を確保する必要があります. このスタックのサイズは後から変更できないので, 十分なサイズを割り当てておく必要があります. (スタックについては Performance without the event loop にわかりやすい説明があります.)

このため, スレッド数がコア数よりも大幅に大きい場合は性能が劣化します. この問題を解決するためには, 使用する OS スレッド数を抑える必要があります.

スレッドプール

そういった用途で用いられるのがスレッドプールです.

fn main() {
    let listener = TcpListener::bind("127.0.0.1:80").unwrap();
    let pool = ThreadPool::new(16);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

引用: https://doc.rust-lang.org/book/ch20-02-multithreaded.html

これは最初に説明したシングルスレッドのものをスレッドプールのサイズ分(ここでは16個)並べたものと思うことができます. (Perl, Ruby, Python などでよくある prefork 型の並列化も非同期プログラミングモデルとしてはこれと似たようなものです.)

1リクエストあたり 0.001 秒かかる API に 10000 リクエスト来た場合, 性能劣化もなく, 最悪のレイテンシは 0.625 秒です.

おめでとう! これでコアをフルに使えるようになりました!

と言いたいところですが, これでもまだ不十分です.

fn main() {
    let listener = TcpListener::bind("127.0.0.1:80").unwrap();
    let pool = ThreadPool::new(16);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let x = call_other_api();  // 外部 API 呼び出し. 0.999 秒かかる.
    process_something(x);      // このプロセスでの処理. 0.001 秒しかかからない.
}

例えば, 処理に1秒かかる API の中で外部サービスの API 呼び出しをしていて, その呼び出しに 0.999 秒かかっているとします.

このサーバー自体は殆ど処理をしていないのに, 同時に処理できるリクエストの数は最大でもスレッドプールのサイズ分です.

つまり RPS (Request Per Second; 1秒あたりに処理できるリクエスト数)は

RPS = スレッドプールのサイズ / 処理にかかる時間

になります. 上の例だと秒間 16 リクエストしか捌けません.


別の毛色の違う例としては, エディタなどの GUI アプリケーションで各コンポーネントを並列に動かすような場合です.

スレッドプールのサイズより多くのコンポーネントたちが同時に(人間や外部プロセスとの通信などで)入力待ちをしようとすると, スレッドが足りずにデッドロックします.

まぁ, このケースだと処理の負荷はあまり問題にならないことが多いので, サイズを固定しないスレッドプールを使うという選択肢もありますが.


これらのケースで共通するのは, 「処理のコンテキストを保持したまま, blocking な処理をしたい」ということです.

(blocking な処理というのは, 正確ではないですがラフに言えば「CPU を消費しないが時間がかかる処理」のことです.)

これにはコルーチンが利用できます.

コルーチン

通常の関数は呼び出した後は return されるだけです. それに対し, 呼び出した後に中断で一度処理を戻し, 後で再開できるもののことをコルーチンと言います.

コルーチンには symmetric/asymmetric なものと stackful/stackless のものがあり, 言語やライブラリによって違います.

「処理のコンテキストを保持したまま, blocking な処理をしたい」という文脈で一番簡単なのは asymmetric stackful coroutine です.

実装方法として例えば, 関数呼び出し時にスレッドを立てて戻り値返却/処理再開用の channel を与えて呼び出し, 処理を戻したい場合は戻り値返却用の channel に値を流して処理再開用の channel で待つようにします.

(Go のプログラムを書くときに似たようなパターンが頻出すると思います.)

この「スレッドが自分の stack を持っている」というのが stackful, 「呼び出し元にしか処理を委譲できない」というのが asymmetric です.

逆に symmetric coroutine は呼び出し元以外にも処理を委譲できます. 処理の委譲をするのが「次の処理を行う」というひとつで, 関数を呼び出すのも値を戻すのも同じ動作でできるから symmetric なわけです.

symmetric stackful coroutine に近いモデルが Rust/tokio の blocking API です.

https://qiita.com/legokichi/items/30e577d996851b6b3ba1

blocking 処理のためにそれ用のスレッドプールに自分で入り, 別のスレッドに処理を委譲します.

ただ, これはスレッドプールのサイズ問題を緩和するためのもの(という風に理解している)であり, コンテキストスイッチが発生する問題は残っています.

コルーチンについては他にも色々あるのでここでは詳しくは述べません. (というか自分もそこまで詳しくない.) 詳しく知りたい方は以下のリンク先のリンクから入るのがよいかもしれません.

https://gist.github.com/yutopp/d3a184f3389df819a5b4b99f2da9b774

Goroutine

最近人気の Go は簡単に並列化対応したプログラムを書けると言われています. これは

  • コンパイラが頑張っている: blocking な API 呼び出しを non-blocking にしていたり, 実行しているタスクの切り替えをユーザーから隠蔽している. (ので, 従来的な書き方のまま並列に動作するプログラムが書き易い.)
  • MPG model: M:N モデルの work stealing スケジューラなので少ないスレッドで効率的に処理ができる. (上の Rust/tokio の blocking API の仕組みも内蔵している. 自動でやってくれるのでユーザーは特別なことをしなくてもよい.)
  • netpoller: 頻出するネットワーク通信の API については, epoll/kqueue を利用している. (OS スレッド数を抑える効果もある.)
  • 可変のスタックサイズ

というのが主なポイントです.

(因みに, Go は stackful symmetric coroutine と言えると思います. 自分は Go の runtime のソースコード読んでないので間違っているかもですが.)

上記は以下の記事で大変わかりやすく解説されているため, ここでは解説しません.

じゃあ, goroutine を Rust に輸入すればいいじゃないかと思うかもしれませんが, Rust が重要視しているものと噛み合わないため, そのままでは難しいのだと思います.

  • ユーザーレベルのスタック操作: goroutine はそれぞれスタックを持っており, タスクの切り替えはそれらを切り替えることで実現される. これにはアセンブラによるスタック操作が必要.
    • 古くは Rust にもこの種のものがあったが, なくなった.
  • 可変スタックサイズや自動でのタスク切り替えのためには, ユーザーが書いたプログラム中にコンパイラが別の命令を差し込む必要がある.
    • OS やベアメタル用途も見据える関係上, コードは書かれたものそのまま動作するべき.

Rust での方針: async/await, Future, Pin, runtime, Waker

Rust の方針は, 上記の Go の方法を上手いこと Rust 流に落し込んだものだと見做すことができるでしょう. (断片的にしか追ってないので, それを意図されているかは知りません. ご存知の方がいたら教えてください.)


表層としては,

  • async/await 構文を Future に変換.
  • Future は asymmetric stackless coroutine. 状態機械として実装可能なので低コスト.
  • Future は合成可能なので asymmetric stackful coroutine 概念を作れる.
  • runtime がそれらを管理することで symmetric stackful coroutine のように見做せる.

詳細:

注意ですが, あくまでも後ろのふたつは概念的にそう見做せるという話です. symmetric/asymmetric や stackful/stackless の定義的には厳密には違うと思います.

アセンブリでスタックポインタを切り替えるのを, runtime が Future を poll して, 戻ってきて, 別の Future を poll する, というので実現しているわけですね.


メモリ管理としては,

  • Future は状態機械なのでヒープに確保すればよいはず.
  • 問題となる自己参照のケースは Pin を導入することによって解決.

詳細:


ただ, 愚直に Future を poll するだけだと, 実はできる仕事がないのに CPU を手放せないということになってしまいます. それを解決するのが Waker です.

(雑に言うと) Waker は Future を poll して欲しくなったら叩くスイッチですが, これは Go で blocking な goroutine が再開可能になったときに global queue に入る動作に似ています.

epoll/kqueue を管理する netpoller に対応するのが, mio などのライブラリなわけです. こういったライブラリは Waker のインターフェースを利用してタスクの実行可否を runtime に伝えることになります.

まとめ

  • いろいろな非同期プログラミングモデルがあり, スレッドプールなど単体では不十分.
  • ユーザーからすると, 非同期プログラミングモデルとして stackful coroutine が扱い易い.
  • Go の方法が参考になるが, そのままでは Rust に導入できない.
  • async/await, Pin, Waker などにより, Go のモデルを上手く Rust に導入することができた. (と見做せる.)

一連の話は Rust に導入するという話でしたが, 実はこれは(OS スレッドは触れるがアセンブリで危いことはできない/したくない)他の言語でも可能でしょう.

Common Lisp はそれに該当する言語で, 実際にやってみようと思ってます.

Rust Advent Calendar 2019, 明日は topecongiro さんの予定です.