Runner in the High

技術のことをかくこころみ

2017年を振り返る

 振り返ります

1月

  • 丁度大学の後期の期末試験を受けていた。2016年夏頃(後期)から大学に復帰したので、そのテストを消化してた。都市社会学のレポートを書くのに清澄白河をウロウロして写真を撮った記憶がある。ジェントリフィケーションについて書いた。それから、まだこの頃は教職課程のガイダンスなどがいくつかあって、このあとすべての努力が無駄になるとは知らずに頑張っていた。

  • 初めて渋谷WOMBへ行って、Zomboyを生で見た。Zomboyもスゴかったが、ぎゅうぎゅう詰めすぎてびっくりした。 www.womb.co.jp

2月

  • 大学の先輩と同期と一緒にスノボをしに竜王スキーパークへ行った。中級者コースから初心者コースのふもとのリフト乗り場まで一直線で滑るとめちゃくちゃ距離があって気持ちいので、最高のスキー場だと思う。それから、中級者コースの途中にある滑っていかないと入れないレストランのタコライスがすごくおいしい。ここでタコライスを食べてからタコライスにドハマりしはじめた。 www.ryuoo.com

3月

  • 内定先の開発合宿に参加した。どこに行ったかは忘れたけど、すごく天気がよくて山のたくさんあるところに行った。

  • 一年休学していた僕より一足先に大学の同期達が卒業していくので、サークルの卒業旅行でグアムへ行った。なんてことはなかったが、海で日焼け止めをちゃんと塗ってなかったせいで、全身大やけどを負い、赤く腫れて水ぶくれたような症状に僕を含めて男子三人がなった。死ぬかと思った。夜寝てても、歩いてても、何をしててもヒリヒリ痛い。飛行機はガタガタゆれるし椅子に背もたれがあるしで、痛くて痛くてしょうがない。帰国してから速攻で近くの皮膚科にいくと、コワモテのオネエの先生が優しく診て塗り薬を出してくれた。それからは少しづつ完治していったが、もう南国はこりごりだという気分になった。それでもグアムは基本的に気温も天気も最高で、場所としては申し分ない。でももうビーチで裸にはなりたくない。

  • この辺から内定先でAPJというものが始まる。これは技術職の内定者と総合職の内定者がチームを組んで、ひとつのAndroidアプリを作るというもの。基本的にコードを書かない側の人間であっても、技術に少しでも触れる経験をするというコンセプトらしい。僕らは、位置情報を利用したSNSのようなものを作ることにした。

4月

  • Junction Asiaに参加した。会場は寺田倉庫だった。僕らは4人1組のチームで参加して、顔認証から悪質な訪問販売などをブラックリスト化するインターフォンのようなものを開発したが、惜しくも賞は取れなかった。久しぶりに1泊2日のハッカソンに参加したが、2日目の朝のあのしんどい感じはいつ経験しても慣れない。

  • 1年生のころから少しづつ教職課程の単位を取りづつけていたが、取得単位の関係でこの年の5月ないし6月に予定されていた教育実習にいかせることはできないという通告を教職課程事務局から受ける。というのも、僕が1年間の休学をしていたことによって、卒業に必要な単位と教育実習へ行くのに必要な単位の講義が2017年度の授業予定で重複してしまったからだ。教職課程事務局はすでに僕がとっている単位から、必要科目として認定単位の振替をできるかもしれないとかなんとか言っていたが、それに関してはなんの処置も行われなかった。最終的には、教職課程を辞退する紙にサインをさせられ、それまでに取ってきた教職課程の単位をすべてフイにすることになった。武蔵大学には、休学をしようが留学をしようがまともに4年間で卒業ができるようなキャリアパスをしっかり用意してもらいたいと思う。

5月

  • 卒論のことを考え始める。

  • 3月から始まったAPJの遠隔ミーティングを頻繁にし始める。

  • 高校の頃に軽音楽部にいた後輩が意識の高い学生になっていて、ミクシィインターンをしているというので渋谷で食事をした。話を聞いてみると、軽音楽部に所属していたまた別の後輩が、スタートアップ界隈で活動をしていて、それにいろいろと影響を受けたらしい。人間どこでどう変わるかわからないものだなと思う。

  • この頃からなぜか急に思い立ってStackoverflowでスコアを伸ばそうとし始める。5月の時点で303ポイントまで上げた。

6月

  • 内定バイトを始める。ReactNative+TypeScriptを本格的に触り始める。

  • 卒業論文の題目を決める時期なのだが、こんなタイミングでテーマが決まるはずがなく、適当なテーマで提出する。アメリカのスタートアップカルチャーについて書きたいなと思っていたが、先行研究がなさ過ぎて頓挫する。

  • 2015年夏のインターンの知り合いが東京の大学院に来たということで遊びに行ったが、そのタイミングで新たなプロジェクトがスタートする。

  • 内定先の会社が上場する。東京証券取引所に行って鐘を鳴らす場面を生で見た。

7月

  • 軽井沢に旅行に行った

  • 3月から本格的に始まっていたAPJが終わる。位置情報を利用したSNS的なもの、という感じでそれっぽいものを作ったが、あまりいい評価は得られなかった。我々の他には、社内の技術本を貸し借りするためのアプリや、成分表の写真を撮るだけでハラルフードを見分けられるアプリなど、いろいろなものがでてきてとても良さがあった。

8月

  • 家族で箱根に旅行に行った。小田急箱根ハイランドホテルといういい感じのところだった。コースのディナーを食べたり、ピーター・ティールのZERO TO ONEを読み終えたりした。 www.hakone-highlandhotel.jp

  • IndiegogoでGPD Pocketを買った。結局大学で卒論を書くのに大活躍してくれた。 izumisy-tech.hatenablog.com

9月

  • Meguro.rbでROM.rbについてLT発表をした。初めてで緊張したけど、とてもエキサイティングでいい経験になった。

  • ダンケルクを見た。戦争色が上手く再現されすぎていて本当に戦争イヤだなと思った。死にたくない。ダンケルクに始まった話ではないが、20歳を過ぎたあたりから戦争映画を見ると涙が出てしまうことが多くなった。 wwws.warnerbros.co.jp

10月

  • 右下の親知らずを抜いた。1時間超のドリル採掘作業はほんとにつらかった。抜歯後の痛みはどちらかといえば中が痛むとかそういうのではなく、縫合の糸で歯茎が引っ張られて痛いというのが大部分で、それだけで食事のモチベーションを無くした。ウィダーインゼリーを大量購入していたが、ゼリーを吸う動きも口の中の筋肉を使うからか奥歯の部分の歯茎が疼いてキツかった。

  • 9月末のMeguro.rbを筆頭に、思い立って4回のLT発表をした。 izumisy-tech.hatenablog.com

  • 楽天テクノロジーカンファレンスというデカイLT大会もあったが、ネタが出てこず参加しなかったのは後悔している。

  • Jelly Proという小さいスマホを買った。LINE MOBILEで運用している。 izumisy-tech.hatenablog.com

11月

  • 10月末頃から急にまたStackoverflowの意欲が再燃し、323だったのを551くらいまで上げる。

  • 卒論の提出が12月に控えているというのもあり、6月からやっていた内定バイトを年末までストップすることにする。教授も自分も焦りがでてくる。このあたりから毎日卒論のことが頭の片隅にあってすごくフラストレーションが溜まってくる。

  • 最後の卒論題目提出期間(うちの大学は2度に渡ってテーマを決めるタイミングがある)だったが、結局大したアイデアも出ず、行き当たりばったりなテーマで提出する。

  • 気分で髪を明るくし始めた。

12月

  • なぜか卒論で忙しかったのにStackoverflowで人助けに精をだしてしまう。そのおかげかスコアは順調に551から771まで伸びる。

  • 卒論の提出をした。結局最後の最後はギリギリまで教授にヘルプをしてもらいながらそれっぽいものが完成したので安心した。

  • 卒論提出の直後に今度は左下の親知らずを抜いた。今度は歯茎が引っ張られていたいとかそういうものはなかったが、今度は歯茎の腫れがずっと収まらず、これをかいているいまも奥にしこりがあるような感じがして違和感がある。右の奥歯はほとんど全快したが、歯茎が完成していないせいで食べ物が挟まったりして困っている。早く治らんかな。

  • Slip.itというソーシャル機能の一切ないブックマーキングアプリを作ったので、公開した。 slipit.me

まとめ

なんかまとまりのない一年でしたね。

グアム行ったのが今年だということを完全に忘れてました。来年もスノボ行きたい。

入門ROM.rb 第3回: Repository

さーて、お待ちかねのROM.rb入門の第3回はRepositoryから。

前回の記事はRelationを説明していますので、読んでない人は参考までに。 izumisy-tech.hatenablog.com

Repository

リポジトリと聞くと文脈的にPoEAAやDDDっぽい雰囲気を感じる人が多いのではないかと思う。

どちらも、最も重要な部分はレイヤード・アーキテクチャの考え方で責務を分離することなのではないかと思うが*1、ROM.rbのリポジトリも、公式のコア・コンセプトの説明*2によるとレイヤード・アーキテクチャの考え方と近く、ドメイン・ロジックとデータアクセス・レイヤを粗結合なものにするためのboundary(境界)としての役割を担っている。

An important function of repositories is to act as a boundary between the data access logic and Application Domain logic. This boundary helps to reduce the complexity of rehydrating your entities and keeps a direct dependency on a particular datastore out of your domain.

リポジトリの重要な点は、ドメイン・レイヤからデータアクセス・レイヤの直接的な依存を追い出す(keeps a direct dependency on a particular datastore out of your domain)というところだ。

このコンセプトと対象的なアプローチはActiveRecordパターンだ。ActiveRecordパターンはあえてドメイン・モデルをデータアクセス・レイヤのデータ構造を粗結合にすることによって、インピーダンス・ミスマッチ*3を減らし、高速なアプリケーション開発を可能にしている。

だが一方で、ActiveRecordパターンでは一挙に「モデル」と呼ばれる責務がデータアクセス・レイヤとドメイン・モデルを兼ねてしまうため、どうしてもモデルが肥大化し見通しが悪くなってしまう場合が多い。この点、ROM.rbではActiveRecordと異なり、データアクセス・レイヤとドメイン・レイヤの間にboundary(境界)としてリポジトリを挟み、それぞれの責務を異なるものとして粗結合な設計を出来るようにしようとしている。

Relationとの違い

前回の記事で説明したRelationは、Repositoryがドメインレイヤに近い存在であるのに対して、どちらかといえばインフラストラクチャ・レイヤに近い存在だ。

Relations ... provide APIs for reading the data from various databases, and low-level interfaces for making changes in the databases. Relations are adapter-specific, which means that each adapter provides its own relation specialization, exposing interfaces that make it easy to leverage the features of your database.

つまり、RelationがHTTPやSQLなどのような、アプリケーション本体の外側にあるアダプタ固有(adapter-specific)の操作をAPIとして提供するのに対して、RepositoryはそれらのRelationを更にドメイン・レイヤにふさわしい形でラップし、ドメイン・モデルがRelation(アダプタ)の操作から影響を受けないようにするためのレイヤであるということだ。

それでは実際に使ってみよう。

require "rom"
require "rom/sql"


#
# ## Relationを定義
#
# ROMのRelationは`ROM::Relation[:アダプタ名]`を継承したクラスとして定義する
# associationsブロックの中で`belongs_to`や`has_many`などの関連も定義をする
# ここで定義したRelationは下で出てくるROMの初期化の際にROMへの登録を行う必要がある。
#

module Relations
  class Users < ROM::Relation[:sql]
    schema(:users) do

      # Relationの関連定義はActiveRecordと似ている。
      # has_many, belongs_toやhas_many-throughなどがあるのでドキュメントを参照のこと
      # http://rom-rb.org/learn/sql/associations/

      associations do
        has_many :books
      end

      # schemaブロックの中では関連の定義に加えて以下のように明示的に
      # Relationのカラムを定義できる。この場合、プライマリ・キーは
      # `Types::Serial`である必要がある

      attribute :id, Types::Serial
      attribute :name, Types::String
      attribute :age, Types::Int
    end

    # Repositoryの中で呼び出せるクラスメソッドを以下のように定義できる
    # ROM.rbのSQLアダプタ(rom-sql)は内部でSequalizeを使っているので
    # where句などもその記法に従う

    def by_pk(id)
      where(id: id)
    end

    def adult
      where{age > 20}
    end
  end

  class Books < ROM::Relation[:sql]
    schema(:books) do
      associations do
        belongs_to :user
      end

      # booksの場合にはuserに対してbelongs_toで関連を持っているので
      # その関連に対して`Types::ForeignKey(:users)`のように明示的に
      # 利用する外部キーを指定できる。ない場合には自動で推論される。

      attribute :id, Types::Serial
      attribute :title, Types::Int
      attribute :user_id, Types::ForeignKey(:users)
    end
  end
end


#
# ## Repositoryを定義
#
# Repositoryからは定義されているRelationを触ることができる
#

module Repositories
  class User < ROM::Repository[:users]
    # ROM.rbではRepositoryがRelationに対してどのようなアクセスをできるのか
    # という点を明確に制限できる。ここでは:createをコマンドとして有効化しているが
    # :create以外にも:updateや:deleteがある。
    commands :create

    # 以下のようにクラスメソッドを定義できる。
    # これらのデータはROM::Structによってラップされて返される
    # ここではusersのみを読んでいるが、booksも同様にメソッドのなかでアクセスできる

    def adults
      users.adult
    end

    # `aggregate`メソッドを使うことによって、Relationで定義されている
    # 関連Relationのデータを子要素としてフェッチできる
    # `aggregates`を呼ぶ場合には明示的にRelationを指定しなくてもよさそう。

    def all
      aggregate(:books)
    end

    def by_id(id)
      aggregate(:books).by_pk(id).one
    end
  end
end


#
# ## ROMにRelationを登録
#
# 上で定義した2つのRelationをROMに登録
#

config = ROM::Configuration.new(:sql, "sqlite::memory")
config.register_relation(Relations::Users)
config.register_relation(Relations::Books)
rom = ROM.container(config)

# SQLiteでテーブルを作るマイグレーションを適用

migration = ROM::SQL.migration(rom) do
  change do
    create_table(:users) do
      primary_key :id
      string :name
      integer :age
    end

    create_table(:books) do
      primary_key :id
      foreign_key :user_id, :users
      string :title
    end
  end
end

gateway = rom.gateways[:default]
migration.apply(gateway.connection, :up)


#
# ## Repositoryを使ってみる
#
# RepositoryはRelationとは異なり、ROMへ登録するのではなく、ROMのインスタンスを
# コンストラクタDIすることでRepositoryのインスタンスを作るという形になる。
#

userRepository = Repositories::User.new(rom)
userRepository.create(name: "Justine", age: 10)
userRepository.create(name: "Jessy", age: 18)
userRepository.create(name: "Michael", age: 23)

# テストのためにBooksテーブルにRelation経由でデータを入れてみる
# (外部キー成約があるので、userレコードを作ったあとじゃないとダメ)

books = rom.relations[:books]
books.insert(title: "Good Book", user_id: 1)
books.insert(title: "Nice Book", user_id: 1)

#
# ### Repositoryからレコードを取得
#
# ここでは`by_id`や`adults`などの、Repositories::Userの中で定義した
# クラスメソッドを呼び出すことができる。Relationの結果が常にハッシュなのに対して
# Repositoryによる取得の返り値はROM::Structという構造体の形になる。
# これはデフォルトでtrueになっている`auto_struct`をfalseに指定することで
# 止めることができる
#

users = userRepository.all
users.each { |user| p user }
# => #<ROM::Struct::User id=1 name="Justine" age=10 books=[
#       #<ROM::Struct::Book id=1 title="Good Book" user_id=1>,
#       #<ROM::Struct::Book id=2 title="Nice Book" user_id=1>
#     ]>
#    #<ROM::Struct::User id=2 name="Jessy" age=18 books=[]>
#    #<ROM::Struct::User id=3 name="Michael" age=23 books=[]>

single_user = userRepository.by_id(1)
p single_user
# => #<ROM::Struct::User id=1 name="Justine" age=10 books=[
#       #<ROM::Struct::Book id=1 title="Good Book" user_id=1>, 
#       #<ROM::Struct::Book id=2 title="Nice Book" user_id=1>
#     ]>

users = userRepository.adults
users.each { |user| p user }
# => #<ROM::Struct::User id=3 name="Michael" age=23>

このように、RepositoryはRelationをドメインモデル(ROM::Struct)へと変換するインターフェイスの役割を果たしていることが分かる。

加えて、たとえばRepositoryが返すドメインモデルのインスタンスを独自のクラス定義にマッピングしたくなるかもしれない。その場合にはauto_structをtrueに維持したうえで、ROM::Structを継承した独自のクラスにRepositoryの中でマッピングするよう指定できる。そのやり方に関しては、次回以降説明していこうと思う。

*1:DDDとレイヤードアーキテクチャはイコールではないが、便宜的に。

*2:http://rom-rb.org/4.0/learn/getting-started/core-concepts/#repositories

*3:簡単に言うと、永続化レイヤ(たとえばデータベース)のデータ構造と永続化対象(たとえばドメインモデル)のデータ構造の整合性の差異

入門ROM.rb 第2回: Relation

さて、入門ROM.rbの第2回は、まずRelationから。

ROM.rbとはなんぞやという方は第1回の下の記事からどうぞ。 izumisy-tech.hatenablog.com

入門なのでいきなりSinatraRailsとどう使うか、ではなくワンソースでどういうAPIがあって、どう使うのか、というところから見ていこうと思う。ちなみに、ソースコードにコメントを書いて読み進めながら学んでいけるスタイルをConversational Tutorial*1というらしい。

Relation

ROMにおけるRelationはインフラレイヤの一部で、アダプタ(Gateway)の実装を隠蔽するインターフェースのような役割を果たす。Relationの中では、そのRelationが対象とするアダプタの操作が行える。

それでは実際に使ってみる

require "rom"
require "rom/sql"

#
# ## Relation
#
# RelationはROM.rbにおいてアダプタ固有の操作をラップする永続化レイヤの責務を扱う
# SQLiteやMySQL固有の操作はAdapterに実装されており、それらに対するインターフェースを提供する
# (余談ではあるが、rom-sqlは中のSQLビルダにSequelを使っている)

module Relations
  class Users < ROM::Relation[:sql]

    # schemaをROM::Relationの継承クラスの中で使うことでデータベースのスキーマを定義できる
    # MySQLなどのデータベースを使っていれば、`schema(infer: true)`の一文でスキーマ推論を有効化できる

    schema(:users) do
      attribute :id, Types::Int
      attribute :name, Types::String
      attribute :age, Types::Int
      attribute :has_car, Types::Bool
    end

    # インスタンスメソッドを定義することで、Relationが持つメソッドを追加できる
    # ここでは、Relationが対象としているアダプタ(例えばこのコードの場合はSQLite)
    # に定義された固有の操作が行える。whereはSQLiteアダプタの中で定義されている。

    def all
      where()
    end

    def has_car
      where(has_car: true)
    end

    # デフォルトではすべてのカラムが返り値のレコードに含まれて返されるが
    # それを変更したければdatasetを定義することができる。
    # この例の場合はhas_carを意図的にレコードに含めないようにしている。

    dataset do
      select(:id, :name, :age)
    end

  end
end

# ROMを初期化して定義したRelationを登録する。
# auto_registrationというディレクトリをまるごと登録対象にできるメソッドがあるので
# ちゃんとしたプロジェクトであればそちらをつかうのがよい。

config = ROM::Configuration.new(:sql, "sqlite::memory")
config.register_relation(Relations::Users)
rom = ROM.container(config)

# データベースのテーブルがないのでマイグレーションをする
# こちらもちゃんとしたRakeタスクをROMが用意してくれているので
# まともなプロジェクトではこのようにマイグレーションを書くことはない。

migration = ROM::SQL.migration(rom) do
  change do
    create_table(:users) do
      primary_key :id
      string :name
      integer :age
      boolean :has_car, default: false
    end
  end
end

gateway = rom.gateways[:default]
migration.apply(gateway.connection, :up)

# 
# ## ROMに登録されたRelationを操作してみる。
#
# Relationを経由すればinsertなどのSQLite固有のメソッドも使えるが、できればRelationのメソッド
# としてラップしたほうがよい。固有のメソッド以外にもoneやallなどのビルトインメソッドもある。
# RelationはROMによってシングルトンのオブジェクトにされるので、`rom.relations[:クラス名]`でアクセスできる。

users = rom.relations[:users]

users.insert(name: "Bob", age: 22, has_car: true)
users.insert(name: "Alice", age: 23)

p users.all.to_a # [{:id=>1, :name=>"Bob", :age=>22}, {:id=>2, :name=>"Alice", :age=>23}]

p users.has_car.to_a # [{:id=>1, :name=>"Bob", :age=>22}]

 上のコメントでも触れているように、RelationというのはAdapterのラッパなので、ROM::Relation継承クラスではできるだけアダプタだけが知っている操作を隠蔽するような実装にしていくのがよい。そうすることで、アプリケーションが外部のインフラレイヤと粗結合になり、変更に強くすることができる。

 たとえば、サンプルではinsertメソッドを直接Relationから読んでしまっているが、例えばHTTPアダプタなら必ずしもinsertではなくpostメソッドのようなものが生えているかもしれない。ROMを使うアプリケーションにとっては、インフラレイヤのアダプタがデータベースなのかWebAPIなのか、という点は興味の対象にはならないので、実際のアダプタの操作を抽象化して隠蔽するメソッドがRelationに生えているほうが好ましいと言える。

*1:たとえばROMの作者が書いているチュートリアルブログもこの形式になっている: https://www.icelab.com.au/notes/a-conversational-introduction-to-rom-rb

Pumaのスレッドプールの実装を読んでみる

D言語アプリケーションサーバを書いていて、そういえば普段良く使っているRubyのPumaだと、どういう実装をされているんだろう? というのが気になったので少し読んでみる。

server.rb

まず読み始めるのはPuma::Serverクラスから。 クラスのインスタンス化の際にinitializeの中でスレッド数の下限と上限を変数で保持している。

    ...

    def initialize(app, events=Events.stdio, options={})
      ...

      @min_threads = 0     # デフォルトの最小スレッド数
      @max_threads = 16    # デフォルトの最大スレッド数
      @auto_trim_time = 30
      @reaping_time = 1

      @thread = nil
      @thread_pool = nil

      ...
    end
   
    ...

その数値を元にrunメソッドでPuma::ThreadPoolクラスをインスタンス化し、ワーカスレッドを起動。ブロックに渡ってくる処理は一旦割愛。

最後に、handle_serversでリクエストをワーカスレッドに引き渡す処理を始める。

    ...

    def run(background=true)
      ...

      @thread_pool = ThreadPool.new(@min_threads,
                                    @max_threads,
                                    IOBuffer) do |client, buffer|
        ...
      end

      ...

      if background
        @thread = Thread.new { handle_servers }
        return @thread
      else
        handle_servers
      end
    end

    ...

thread_pool.rb

上で使われているPuma::ThreadPoolクラスのインスタンス化の際には何が行われているのかというと、Mutexで同期をとった上で最小ワーカスレッド数で指定されている数だけスレッドを立ち上げている。

  ...

  def initialize(min, max, *extra, &block)
    ...

    @mutex = Mutex.new

    @todo = [] # ワーカへの処理を積むキュー

    @spawned = 0 # 起動しているワーカの数
    @waiting = 0 # todoを待っているワーカの数

    ...

    @min = Integer(min) # min_threads
    @max = Integer(max) # max_threads

    ...

    @workers = [] # spawn_threadで起動されたワーカのアレイ

    ...

    @mutex.synchronize do
      @min.times { spawn_thread } # ワーカスレッドを起動
    end

    ...
  end

  ...

spawn_threadはちょっと長いが、中で行われているのは、whileでのスレッドの処理ループだ。

initializeの中で初期化された@todoというインスタンス変数にワーカへの処理が積まれていくので、ワーカが起動している間(continueがtrue)は、todoに積まれているキューをshiftして、スレッドに渡されているブロックで実行している。また、todoが空っぽの際はThread::ConditionVariableでスレッドをストップさせている。

また、作られたワーカスレッドは@workersに追加される。

    ...

    def spawn_thread
      @spawned += 1

      th = Thread.new(@spawned) do |spawned|
        ...

        todo  = @todo

        ...

        while true
          work = nil
 
          ...

          mutex.synchronize do
            while todo.empty? # ワーカが処理するものは何もない
              ...

              @waiting += 1
              not_full.signal
              not_empty.wait mutex # todoが入ってくるまでスレッドを止める
              @waiting -= 1
            end

            work = todo.shift if continue # todoからひとつもらってworkにいれる
          end

          ...

          begin
            block.call(work, *extra) # workの処理をスレッドに引き渡す。
          rescue Exception => e
            ...
          end
        end

        ...
      end

      @workers << th # 作ったワーカスレッドを追加

      th
    end

    ...

Puma::ThreadPoolには<<のオペレータが定義されていて、インスタンスtodoを積み、スレッドを動かせるようになっている。

また、積む際に待っているワーカがいなければ、再び上で触れたspawn_threadを実行して再び上限までワーカスレッドを起動しようとする。

  ...

  # Add +work+ to the todo list for a Thread to pickup and process.
  def <<(work)
    @mutex.synchronize do
      ...

      @todo << work

      if @waiting < @todo.size and @spawned < @max
        spawn_thread
      end

      @not_empty.signal # 空ではなくなったことを通知して、スレッドを動かす
    end
  end

  ...

ちなみに、この<<オペレータは一番最初に触れたPuma::Serverクラスの中にあるhandle_serversメソッドの中で使われていて、ソケットで受けたリクエストからPuma::Cilentクラスのインスタンスを生成し、その処理をスレッドプールに任せるために使われている模様。

    ...
   
    def handle_servers
      begin
        ...

        pool = @thread_pool

        ...

        while @status == :run
          begin
            ios = IO.select sockets
            ios.first.each do |sock|
              if sock == check
                ...
              else
                begin
                  if io = sock.accept_nonblock
                    client = Client.new io, @binder.env(sock)
                    ...

                    pool << client # スレッドプールへtodoを積む
                    pool.wait_until_not_full
                  end
                rescue SystemCallError
                  ...
                rescue Errno::ECONNABORTED
                  ...
                end
              end
            end
          rescue Object => e
            ...
          end
        end

        ...
      rescue Exception => e
        ...
      ensure
        ...
      end

      ...
    end

    ...

そういえば、起動されたワーカが必要なくなったときの処理はどうしているんだろう? と思ったら、trimというメソッドがPuma::ThreadPoolクラスの中にあるのを見つけた。このメソッドの中では待ちワーカがひとつ以上あり、かつワーカの数が下限を下回らない範囲で、ワーカスレッドを終了させるキュー(trim_requested)を加算していく。

    ...

    # If too many threads are in the pool, tell one to finish go ahead
    # and exit. If +force+ is true, then a trim request is requested
    # even if all threads are being utilized.
    #
    def trim(force=false)
      @mutex.synchronize do
        if (force or @waiting > 0) and @spawned - @trim_requested > @min
          @trim_requested += 1
          @not_empty.signal
        end
      end
    end

    ...

@trim_requestedが加算されていくと、todoが空っぽの際にワーカスレッドが自ら終了していく。この処理は上で触れたspawn_threadの中のワーカスレッドの処理ループの中で行われている。

    ...    

    def spawn_thread
      @spawned += 1

      th = Thread.new(@spawned) do |spawned|
        ...

        todo  = @todo

        ...

        while true
          ...

          continue = true # ワーカスレッドを起動し続けるかどうか

          mutex.synchronize do
            while todo.empty?
              if @trim_requested > 0 # 終了がリクエストされている
                @trim_requested -= 1
                continue = false # ワーカスレッドの終了をセット
                not_full.signal
                break
              end

              ...
            end

            ...
          end

          break unless continue # ワーカスレッドの処理ループを抜ける。

          ...
        end

        mutex.synchronize do
          @spawned -= 1      # 動いているワーカの数をひとつ減らす
          @workers.delete th # ワーカのアレイから削除
        end
      end

      @workers << th

      th
    end

    ...

Puma::Serverクラスのrunメソッドは、ワーカスレッドを起動するのと同じタイミングで、デフォルト30秒のインターバル(@auto_trim_timeで指定されている)毎にtrimメソッドを呼び出すPuma::ThreadPool::AutoTrimクラスのインスタンスを準備する(auto_trim!

    def initialize(app, events=Events.stdio, options={})
      ...

      @min_threads = 0
      @max_threads = 16
      @auto_trim_time = 30 # auto_trim!のインターバル
      @reaping_time = 1

      ...
    end

    ...
   
    def run(background=true)
      ...

      @thread_pool = ThreadPool.new(@min_threads,
                                    @max_threads,
                                    IOBuffer) do |client, buffer|
        ...
      end

      ...

      if @auto_trim_time # インターバルが指定されていれば、auto_trimを始める。
        @thread_pool.auto_trim!(@auto_trim_time)
      end
      
      ...
    end

Puma::ThreadPool::AutoTrimクラスは@timeoutで指定されたインターバル毎に、Puma::ThreadPooltrimメソッドを呼び出して余ったワーカスレッドの終了を試みる。

    ...

    class AutoTrim
      def initialize(pool, timeout)
        @pool = pool
        @timeout = timeout
        @running = false
      end

      def start!
        @running = true

        @thread = Thread.new do
          while @running
            @pool.trim
            sleep @timeout
          end
        end
      end

      ...
    end

    def auto_trim!(timeout=30)
      @auto_trim = AutoTrim.new(self, timeout)
      @auto_trim.start!
    end

    ...

Pumaでは、Puma::ThreadPoolクラスが、ワーカスレッドと、ワーカへの処理のキューを管理していて、キューが予約されるタイミングでワーカを増やしたり、減らしたりしているということが分かった。

今回は、スレッドプールの部分を中心に読んだだけなので、実装にリクエストをどう処理しているのか、クラスタリングの実装はどうなっているのか、あたりはまだしっかり読んでいない。けれども、既存のアプリケーションサーバの実装を読んでみることで、自分で1から作っていく際にどういう実装にするのがベターかというプラクティスを探るきっかけになりそうだな〜と思う◎

D言語でforkを使う

core.sys.posix.unistdの中にある。使い方はforkそのまま。

import std.stdio;
import core.thread;
import core.stdc.stdlib;
import core.sys.posix.unistd;

void main() {
  auto pid = core.sys.posix.unistd.fork();

  if (pid == 0) {
    writeln("child working...");
    Thread.sleep(dur!("seconds")(10));
    _Exit(0);
  } else {
    writeln("parent working...");
    Thread.sleep(dur!("seconds")(20));
    exit(0);
  }
}

ちなみに親プロセスを終了するときはexit(0)で、子プロセスを終了するときには_Exit(0)で抜ける(どちらもcore.stdc.stdlibの中で定義されている)。この2つの関数の違いは、標準入出力のデータを終了時にフラッシュするかしないか、というところらしい。

exit()と_exit()の違い | Siguniang's Blog