Runner in the High

技術のことをかくこころみ

Pumaのスレッドプールの実装を読んでみる

D言語アプリケーションサーバを書いていて、そういえば普段良く使っているRubyのPumaだと、どういう実装をされているんだろう? というのが気になったので少し読んでみる。

server.rb

まず読み始めるのはPuma::Serverクラスから。 クラスのインスタンス化の際にinitializeの中でスレッド数の下限と上限を変数で保持している。

    ...

    def initialize(app, events=Events.stdio, options={})
      ...

      @min_threads = 0     # デフォルトの最小スレッド数
      @max_threads = 16    # デフォルトの最大スレッド数
      @auto_trim_time = 30
      @reaping_time = 1

      @thread = nil
      @thread_pool = nil

      ...
    end
   
    ...

その数値を元にrunメソッドでPuma::ThreadPoolクラスをインスタンス化し、ワーカスレッドを起動。ブロックに渡ってくる処理は一旦割愛。

最後に、handle_serversでリクエストをワーカスレッドに引き渡す処理を始める。

    ...

    def run(background=true)
      ...

      @thread_pool = ThreadPool.new(@min_threads,
                                    @max_threads,
                                    IOBuffer) do |client, buffer|
        ...
      end

      ...

      if background
        @thread = Thread.new { handle_servers }
        return @thread
      else
        handle_servers
      end
    end

    ...

thread_pool.rb

上で使われているPuma::ThreadPoolクラスのインスタンス化の際には何が行われているのかというと、Mutexで同期をとった上で最小ワーカスレッド数で指定されている数だけスレッドを立ち上げている。

  ...

  def initialize(min, max, *extra, &block)
    ...

    @mutex = Mutex.new

    @todo = [] # ワーカへの処理を積むキュー

    @spawned = 0 # 起動しているワーカの数
    @waiting = 0 # todoを待っているワーカの数

    ...

    @min = Integer(min) # min_threads
    @max = Integer(max) # max_threads

    ...

    @workers = [] # spawn_threadで起動されたワーカのアレイ

    ...

    @mutex.synchronize do
      @min.times { spawn_thread } # ワーカスレッドを起動
    end

    ...
  end

  ...

spawn_threadはちょっと長いが、中で行われているのは、whileでのスレッドの処理ループだ。

initializeの中で初期化された@todoというインスタンス変数にワーカへの処理が積まれていくので、ワーカが起動している間(continueがtrue)は、todoに積まれているキューをshiftして、スレッドに渡されているブロックで実行している。また、todoが空っぽの際はThread::ConditionVariableでスレッドをストップさせている。

また、作られたワーカスレッドは@workersに追加される。

    ...

    def spawn_thread
      @spawned += 1

      th = Thread.new(@spawned) do |spawned|
        ...

        todo  = @todo

        ...

        while true
          work = nil
 
          ...

          mutex.synchronize do
            while todo.empty? # ワーカが処理するものは何もない
              ...

              @waiting += 1
              not_full.signal
              not_empty.wait mutex # todoが入ってくるまでスレッドを止める
              @waiting -= 1
            end

            work = todo.shift if continue # todoからひとつもらってworkにいれる
          end

          ...

          begin
            block.call(work, *extra) # workの処理をスレッドに引き渡す。
          rescue Exception => e
            ...
          end
        end

        ...
      end

      @workers << th # 作ったワーカスレッドを追加

      th
    end

    ...

Puma::ThreadPoolには<<のオペレータが定義されていて、インスタンスtodoを積み、スレッドを動かせるようになっている。

また、積む際に待っているワーカがいなければ、再び上で触れたspawn_threadを実行して再び上限までワーカスレッドを起動しようとする。

  ...

  # Add +work+ to the todo list for a Thread to pickup and process.
  def <<(work)
    @mutex.synchronize do
      ...

      @todo << work

      if @waiting < @todo.size and @spawned < @max
        spawn_thread
      end

      @not_empty.signal # 空ではなくなったことを通知して、スレッドを動かす
    end
  end

  ...

ちなみに、この<<オペレータは一番最初に触れたPuma::Serverクラスの中にあるhandle_serversメソッドの中で使われていて、ソケットで受けたリクエストからPuma::Cilentクラスのインスタンスを生成し、その処理をスレッドプールに任せるために使われている模様。

    ...
   
    def handle_servers
      begin
        ...

        pool = @thread_pool

        ...

        while @status == :run
          begin
            ios = IO.select sockets
            ios.first.each do |sock|
              if sock == check
                ...
              else
                begin
                  if io = sock.accept_nonblock
                    client = Client.new io, @binder.env(sock)
                    ...

                    pool << client # スレッドプールへtodoを積む
                    pool.wait_until_not_full
                  end
                rescue SystemCallError
                  ...
                rescue Errno::ECONNABORTED
                  ...
                end
              end
            end
          rescue Object => e
            ...
          end
        end

        ...
      rescue Exception => e
        ...
      ensure
        ...
      end

      ...
    end

    ...

そういえば、起動されたワーカが必要なくなったときの処理はどうしているんだろう? と思ったら、trimというメソッドがPuma::ThreadPoolクラスの中にあるのを見つけた。このメソッドの中では待ちワーカがひとつ以上あり、かつワーカの数が下限を下回らない範囲で、ワーカスレッドを終了させるキュー(trim_requested)を加算していく。

    ...

    # If too many threads are in the pool, tell one to finish go ahead
    # and exit. If +force+ is true, then a trim request is requested
    # even if all threads are being utilized.
    #
    def trim(force=false)
      @mutex.synchronize do
        if (force or @waiting > 0) and @spawned - @trim_requested > @min
          @trim_requested += 1
          @not_empty.signal
        end
      end
    end

    ...

@trim_requestedが加算されていくと、todoが空っぽの際にワーカスレッドが自ら終了していく。この処理は上で触れたspawn_threadの中のワーカスレッドの処理ループの中で行われている。

    ...    

    def spawn_thread
      @spawned += 1

      th = Thread.new(@spawned) do |spawned|
        ...

        todo  = @todo

        ...

        while true
          ...

          continue = true # ワーカスレッドを起動し続けるかどうか

          mutex.synchronize do
            while todo.empty?
              if @trim_requested > 0 # 終了がリクエストされている
                @trim_requested -= 1
                continue = false # ワーカスレッドの終了をセット
                not_full.signal
                break
              end

              ...
            end

            ...
          end

          break unless continue # ワーカスレッドの処理ループを抜ける。

          ...
        end

        mutex.synchronize do
          @spawned -= 1      # 動いているワーカの数をひとつ減らす
          @workers.delete th # ワーカのアレイから削除
        end
      end

      @workers << th

      th
    end

    ...

Puma::Serverクラスのrunメソッドは、ワーカスレッドを起動するのと同じタイミングで、デフォルト30秒のインターバル(@auto_trim_timeで指定されている)毎にtrimメソッドを呼び出すPuma::ThreadPool::AutoTrimクラスのインスタンスを準備する(auto_trim!

    def initialize(app, events=Events.stdio, options={})
      ...

      @min_threads = 0
      @max_threads = 16
      @auto_trim_time = 30 # auto_trim!のインターバル
      @reaping_time = 1

      ...
    end

    ...
   
    def run(background=true)
      ...

      @thread_pool = ThreadPool.new(@min_threads,
                                    @max_threads,
                                    IOBuffer) do |client, buffer|
        ...
      end

      ...

      if @auto_trim_time # インターバルが指定されていれば、auto_trimを始める。
        @thread_pool.auto_trim!(@auto_trim_time)
      end
      
      ...
    end

Puma::ThreadPool::AutoTrimクラスは@timeoutで指定されたインターバル毎に、Puma::ThreadPooltrimメソッドを呼び出して余ったワーカスレッドの終了を試みる。

    ...

    class AutoTrim
      def initialize(pool, timeout)
        @pool = pool
        @timeout = timeout
        @running = false
      end

      def start!
        @running = true

        @thread = Thread.new do
          while @running
            @pool.trim
            sleep @timeout
          end
        end
      end

      ...
    end

    def auto_trim!(timeout=30)
      @auto_trim = AutoTrim.new(self, timeout)
      @auto_trim.start!
    end

    ...

Pumaでは、Puma::ThreadPoolクラスが、ワーカスレッドと、ワーカへの処理のキューを管理していて、キューが予約されるタイミングでワーカを増やしたり、減らしたりしているということが分かった。

今回は、スレッドプールの部分を中心に読んだだけなので、実装にリクエストをどう処理しているのか、クラスタリングの実装はどうなっているのか、あたりはまだしっかり読んでいない。けれども、既存のアプリケーションサーバの実装を読んでみることで、自分で1から作っていく際にどういう実装にするのがベターかというプラクティスを探るきっかけになりそうだな〜と思う◎