ForkJoinPoolとblocking

以下は、いつも通り、私のざっくりな調査内容をまとめるものだから、もし、内容に誤りがあったら報告してほしい。(想像も含むので)

前提

目的としては、akkaベースのウェブフレームワーク(sprayかakka-http)で、複数のリクエストを同時に受けた時にサーバのCPUをフル活用するためにはどうすることである。

フル活用とは、あるスレッドがIOなどで寝た場合、他のタスクにCPUを割り当てて、CPUを使う処理を常に全力で実行することである。こうすると、最善では、結果的に全タスクを処理する時間は、全計算を行った時間まで短縮出来る。こういうテクニックは、GPUにおいてメモリ転送と計算をオーバーラップさせてメモリ転送を償却する形でも用いられる。

さて、ウェブフレームワークは何らかのポートにbindして、そこからリクエストを捌く。akkaでは、この時のスレッドの割当方法をdispatcherなりExecutionContextという抽象に隠蔽しており、これは、設定することが可能である。

調査

少なくとも2つの方式を思いつくことが出来る

1。たくさんのスレッドをプールしておいて、来たリクエストを片っ端からスレッドに結びつけて、あとはカーネルのスケジューラにおまかせする 2。少数のスレッドを保持しておき、もしタスクが寝た場合はコンテキストを退避して(Javaは、スケジュールについてはカーネルに一存のはずだから、ユーザランドから利用可能なswapcontextのようなものを使ってるのだろうか?)、新しいタスクをスレッドに結びつける。これによって、少数のスレッドは常にhotであることを期待する。

1は、FixedThreadPoolというものであり、私が開発した性能測定用フレームワークperflex(https://github.com/akiradeveloper/perflex)で、並列数を限定するために使っているものである。この方式については、動作は自明と思うので、これ以上議論しない。

2は、ForkJoinPoolというものであり、akkaはデフォルトとしてこれを採用している。(リンク先は2.0だが、2.4でも同様)

http://doc.akka.io/docs/akka/2.0/scala/dispatchers.html

Every ActorSystem will have a default dispatcher that will be used in case nothing else is configured for an Actor。The default dispatcher can be configured、and is by default a Dispatcher with a “fork-join-executor”、which gives excellent performance in most cases。

ForkJoinPoolは、Java7から利用されたものであり、仕様についてはjavadocを読むのもあるが、日本人のみなさんには、seraphy氏のブログをオススメしたい。ここを読むと、ForkJoinPoolというものが、私が先に述べた方式2に相当することが分かる。

2014-05-04

しかし方式2からでは、fork/joinと名前がついてる意味が分からないだろう。それは、タスクがfork/joinした時に、同じForkJoinPoolの中にタスクがキューされることに由来する。

また、ForkJoinPoolに入れたForkJoinTaskは、そのタスクの中で新たなタスクをforkすると、同じForkJoinPool内にタスクを予約する。 このとき、タスクは既定ではStackのようにタスクが積まれるため、再帰的な計算をさせるのに向いている。 ForkJoinTaskのタスクは他のタスクと連携しあっており、タスク内でforkした子タスクをjoinして結果を待つ場合には、そのスレッドは「待ち」に入るのではなく、タスクが「中断」された状態となり、スレッドは他のタスクの処理に回される(Work-stealing)ようになっている。

さらに抜粋

ForkJoinPoolは、そのタスクの中でjoinすることでタスクを待ち状態にすると、ただちに空いたCPU資源を他のタスクに割り当てるようにスケジューリングする。 もちろん、スケジューラは、joinメソッドの中で、この切り替えを行っている。 では、たとえば単純に「Thread.sleep()」などでタスクを待ち状態にしたら、どうなるであろうか? スケジューラはタスクから何も通知を受けることが無いため、実際にはスレッドは寝ているにもかかわらず、他のスレッドにCPUを割り当てることができない。 これは、synchronized/wait/notifyによるスレッドのブロックや、あるいは同期クラスを使ったスレッドのブロックも同様である。 もし、アクティブなスレッドがすべてブロックしたならば、それらのブロックが解除されるまでタスクは全く実行されなくなってしまうことになる。 したがって、ForkJoinTaskの中では、これらのスレッドを直接ブロックするような同期処理は使うべきではない。 もし、ForkJoinTaskの中でタスクのブロックが必要な場合には、ManagedBlockerを使うことができる。

これが言ってるのは、「ForkJoinPoolが正しく動作するためには、タスクの方で”私は寝ますよ!”と通知することが大事ですよ」ということである。そしてそのための方法が、ManagedBlockerであるというのだ。

これは、名前のとおり、scala.concurrent.blockingと関連がある。それについてさらに調査を続ける。blockingの型は以下

def blocking[T](body:  T): T

結論

  • BlockContextというのがどこかに存在し、ブロックしたい場合はそこに通知がいく
  • 通知はヒントであり、BlockContextが必要と判断すればスレッドを追加する
  • blockingは、BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)である。
  • ForkJoinPool以外のBlockContextは、DefaultBlockContextであり、blockOnメソッドは空であるから、ForkJoinPool以外の時は「何もしない」である。
  • Await.ready、resultは、中でblockingに包んでいる。

scala.concurrent.Await.result() and scala.concurrent.Await.ready() locates an instance of BlockContext by first looking for one provided through BlockContext.withBlockContext()

blockingの動作について、私はこれらのリンク先を読んだ

どうすればよいか?

ForkJoinPoolがデフォルトであることに異論はないが、これは、ユーザ側のコードがBlockContextと正しくやりとりをすることが前提となる。ForkJoinPoolを使うと、コンテキストスイッチが発生するので、その分のコストがある。従って、Javaの基盤側でこの対応をばらまいてくれることはあり得ない。だから、アプリケーションコードの中で対応することになるのだが、これは結構めんどくさい。

従って、まずシンプルに性能を測定したいのであれば、素直に、FixedThreadPoolを設定するのが簡単であるが、カーネルのスケジューラが大変になるのが欠点である。

将来的には、blockingでケアすべきコードを自動的に解析してよしなにしてくれると助かるのだが、そんなことは不可能なのだろうか?


このエントリーをはてなブックマークに追加

See also