Runner in the High

技術のことをかくこころみ

入門ROM-rb: Relation

さて、入門ROM-rbの第2回は、まずRelationから。

ROM-rbとはなんぞやという方は第1回の下の記事からどうぞ。 izumisy-tech.hatenablog.com

入門なのでいきなりSinatraRailsとどう使うか、ではなくワンソースでどういうAPIがあって、どう使うのか、というところから見ていこうと思う。ちなみに、ソースコードにコメントを書いて読み進めながら学んでいけるスタイルをConversational Tutorial*1というらしい。

Relation

ROMにおけるRelationはインフラレイヤの一部で、アダプタ(Gateway)の実装を隠蔽するインターフェースのような役割を果たす。Relationの中では、そのRelationが対象とするアダプタの操作が行える。

それでは実際に使ってみる

require "rom"
require "rom/sql"

#
# ## Relation
#
# RelationはROM.rbにおいてアダプタ固有の操作をラップする永続化レイヤの責務を扱う
# SQLiteやMySQL固有の操作はAdapterに実装されており、それらに対するインターフェースを提供する
# (余談ではあるが、rom-sqlは中のSQLビルダにSequelを使っている)

module Relations
  class Users < ROM::Relation[:sql]

    # schemaをROM::Relationの継承クラスの中で使うことでデータベースのスキーマを定義できる
    # MySQLなどのデータベースを使っていれば、`schema(infer: true)`の一文でスキーマ推論を有効化できる

    schema(:users) do
      attribute :id, Types::Int
      attribute :name, Types::String
      attribute :age, Types::Int
      attribute :has_car, Types::Bool
    end

    # インスタンスメソッドを定義することで、Relationが持つメソッドを追加できる
    # ここでは、Relationが対象としているアダプタ(例えばこのコードの場合はSQLite)
    # に定義された固有の操作が行える。whereはSQLiteアダプタの中で定義されている。

    def all
      where()
    end

    def has_car
      where(has_car: true)
    end

    # デフォルトではすべてのカラムが返り値のレコードに含まれて返されるが
    # それを変更したければdatasetを定義することができる。
    # この例の場合はhas_carを意図的にレコードに含めないようにしている。

    dataset do
      select(:id, :name, :age)
    end

  end
end

# ROMを初期化して定義したRelationを登録する。
# auto_registrationというディレクトリをまるごと登録対象にできるメソッドがあるので
# ちゃんとしたプロジェクトであればそちらをつかうのがよい。

config = ROM::Configuration.new(:sql, "sqlite::memory")
config.register_relation(Relations::Users)
rom = ROM.container(config)

# データベースのテーブルがないのでマイグレーションをする
# こちらもちゃんとしたRakeタスクをROMが用意してくれているので
# まともなプロジェクトではこのようにマイグレーションを書くことはない。

migration = ROM::SQL.migration(rom) do
  change do
    create_table(:users) do
      primary_key :id
      string :name
      integer :age
      boolean :has_car, default: false
    end
  end
end

gateway = rom.gateways[:default]
migration.apply(gateway.connection, :up)

# ROMに登録されたRelationを操作してみる。
# Relationを経由すればinsertなどのSQLite固有のメソッドも使えるが、できればRelationのメソッド
# としてラップしたほうがよい。固有のメソッド以外にもoneやallなどのビルトインメソッドもある。

users = rom.relations[:users]

users.insert(name: "Bob", age: 22, has_car: true)
users.insert(name: "Alice", age: 23)

p users.all.to_a # [{:id=>1, :name=>"Bob", :age=>22}, {:id=>2, :name=>"Alice", :age=>23}]

p users.has_car.to_a # [{:id=>1, :name=>"Bob", :age=>22}]

 上のコメントでも触れているように、RelationというのはAdapterのラッパなので、ROM::Relation継承クラスではできるだけアダプタだけが知っている操作を隠蔽するような実装にしていくのがよい。そうすることで、アプリケーションが外部のインフラレイヤと粗結合になり、変更に強くすることができる。

 たとえば、サンプルではinsertメソッドを直接Relationから読んでしまっているが、例えばHTTPアダプタなら必ずしもinsertではなくpostメソッドのようなものが生えているかもしれない。ROMを使うアプリケーションにとっては、インフラレイヤのアダプタがデータベースなのかWebAPIなのか、という点は興味の対象にはならないので、実際のアダプタの操作を抽象化して隠蔽するメソッドがRelationに生えているほうが好ましいと言える。

*1:たとえばROMの作者が書いているチュートリアルブログもこの形式になっている: https://www.icelab.com.au/notes/a-conversational-introduction-to-rom-rb

echoでyaagを使う

いろいろ省略して以下の部分のコードだけあればechoでyaagが使える。公式のREADMEにサンプルコードがまだないのでコントリビューションできるチャンスかも。

package main

import (
    "github.com/betacraft/yaag/echo"
    "github.com/betacraft/yaag/yaag"

    ...(省略)...
)

func main() {
    yaag.Init(&yaag.Config{On: true, DocTitle: "API", DocPath: "docs/apidoc.html"})

    server := echo.New()
    server.Use(middleware.Yaag())

    ...(省略)...
}

Pumaのスレッドプールの実装を読んでみる

D言語アプリケーションサーバを書いていて、そういえば普段良く使っているRubyのPumaだと、どういう実装をされているんだろう? というのが気になったので少し読んでみる。

server.rb

まず読み始めるのはPuma::Serverクラスから。 クラスのインスタンス化の際にinitializeの中でスレッド数の下限と上限を変数で保持している。

    ...

    def initialize(app, events=Events.stdio, options={})
      ...

      @min_threads = 0     # デフォルトの最小スレッド数
      @max_threads = 16    # デフォルトの最大スレッド数
      @auto_trim_time = 30
      @reaping_time = 1

      @thread = nil
      @thread_pool = nil

      ...
    end
   
    ...

その数値を元にrunメソッドでPuma::ThreadPoolクラスをインスタンス化し、ワーカスレッドを起動。ブロックに渡ってくる処理は一旦割愛。

最後に、handle_serversでリクエストをワーカスレッドに引き渡す処理を始める。

    ...

    def run(background=true)
      ...

      @thread_pool = ThreadPool.new(@min_threads,
                                    @max_threads,
                                    IOBuffer) do |client, buffer|
        ...
      end

      ...

      if background
        @thread = Thread.new { handle_servers }
        return @thread
      else
        handle_servers
      end
    end

    ...

thread_pool.rb

上で使われているPuma::ThreadPoolクラスのインスタンス化の際には何が行われているのかというと、Mutexで同期をとった上で最小ワーカスレッド数で指定されている数だけスレッドを立ち上げている。

  ...

  def initialize(min, max, *extra, &block)
    ...

    @mutex = Mutex.new

    @todo = [] # ワーカへの処理を積むキュー

    @spawned = 0 # 起動しているワーカの数
    @waiting = 0 # todoを待っているワーカの数

    ...

    @min = Integer(min) # min_threads
    @max = Integer(max) # max_threads

    ...

    @workers = [] # spawn_threadで起動されたワーカのアレイ

    ...

    @mutex.synchronize do
      @min.times { spawn_thread } # ワーカスレッドを起動
    end

    ...
  end

  ...

spawn_threadはちょっと長いが、中で行われているのは、whileでのスレッドの処理ループだ。

initializeの中で初期化された@todoというインスタンス変数にワーカへの処理が積まれていくので、ワーカが起動している間(continueがtrue)は、todoに積まれているキューをshiftして、スレッドに渡されているブロックで実行している。また、todoが空っぽの際はThread::ConditionVariableでスレッドをストップさせている。

また、作られたワーカスレッドは@workersに追加される。

    ...

    def spawn_thread
      @spawned += 1

      th = Thread.new(@spawned) do |spawned|
        ...

        todo  = @todo

        ...

        while true
          work = nil
 
          ...

          mutex.synchronize do
            while todo.empty? # ワーカが処理するものは何もない
              ...

              @waiting += 1
              not_full.signal
              not_empty.wait mutex # todoが入ってくるまでスレッドを止める
              @waiting -= 1
            end

            work = todo.shift if continue # todoからひとつもらってworkにいれる
          end

          ...

          begin
            block.call(work, *extra) # workの処理をスレッドに引き渡す。
          rescue Exception => e
            ...
          end
        end

        ...
      end

      @workers << th # 作ったワーカスレッドを追加

      th
    end

    ...

Puma::ThreadPoolには<<のオペレータが定義されていて、インスタンスtodoを積み、スレッドを動かせるようになっている。

また、積む際に待っているワーカがいなければ、再び上で触れたspawn_threadを実行して再び上限までワーカスレッドを起動しようとする。

  ...

  # Add +work+ to the todo list for a Thread to pickup and process.
  def <<(work)
    @mutex.synchronize do
      ...

      @todo << work

      if @waiting < @todo.size and @spawned < @max
        spawn_thread
      end

      @not_empty.signal # 空ではなくなったことを通知して、スレッドを動かす
    end
  end

  ...

ちなみに、この<<オペレータは一番最初に触れたPuma::Serverクラスの中にあるhandle_serversメソッドの中で使われていて、ソケットで受けたリクエストからPuma::Cilentクラスのインスタンスを生成し、その処理をスレッドプールに任せるために使われている模様。

    ...
   
    def handle_servers
      begin
        ...

        pool = @thread_pool

        ...

        while @status == :run
          begin
            ios = IO.select sockets
            ios.first.each do |sock|
              if sock == check
                ...
              else
                begin
                  if io = sock.accept_nonblock
                    client = Client.new io, @binder.env(sock)
                    ...

                    pool << client # スレッドプールへtodoを積む
                    pool.wait_until_not_full
                  end
                rescue SystemCallError
                  ...
                rescue Errno::ECONNABORTED
                  ...
                end
              end
            end
          rescue Object => e
            ...
          end
        end

        ...
      rescue Exception => e
        ...
      ensure
        ...
      end

      ...
    end

    ...

そういえば、起動されたワーカが必要なくなったときの処理はどうしているんだろう? と思ったら、trimというメソッドがPuma::ThreadPoolクラスの中にあるのを見つけた。このメソッドの中では待ちワーカがひとつ以上あり、かつワーカの数が下限を下回らない範囲で、ワーカスレッドを終了させるキュー(trim_requested)を加算していく。

    ...

    # If too many threads are in the pool, tell one to finish go ahead
    # and exit. If +force+ is true, then a trim request is requested
    # even if all threads are being utilized.
    #
    def trim(force=false)
      @mutex.synchronize do
        if (force or @waiting > 0) and @spawned - @trim_requested > @min
          @trim_requested += 1
          @not_empty.signal
        end
      end
    end

    ...

@trim_requestedが加算されていくと、todoが空っぽの際にワーカスレッドが自ら終了していく。この処理は上で触れたspawn_threadの中のワーカスレッドの処理ループの中で行われている。

    ...    

    def spawn_thread
      @spawned += 1

      th = Thread.new(@spawned) do |spawned|
        ...

        todo  = @todo

        ...

        while true
          ...

          continue = true # ワーカスレッドを起動し続けるかどうか

          mutex.synchronize do
            while todo.empty?
              if @trim_requested > 0 # 終了がリクエストされている
                @trim_requested -= 1
                continue = false # ワーカスレッドの終了をセット
                not_full.signal
                break
              end

              ...
            end

            ...
          end

          break unless continue # ワーカスレッドの処理ループを抜ける。

          ...
        end

        mutex.synchronize do
          @spawned -= 1      # 動いているワーカの数をひとつ減らす
          @workers.delete th # ワーカのアレイから削除
        end
      end

      @workers << th

      th
    end

    ...

Puma::Serverクラスのrunメソッドは、ワーカスレッドを起動するのと同じタイミングで、デフォルト30秒のインターバル(@auto_trim_timeで指定されている)毎にtrimメソッドを呼び出すPuma::ThreadPool::AutoTrimクラスのインスタンスを準備する(auto_trim!

    def initialize(app, events=Events.stdio, options={})
      ...

      @min_threads = 0
      @max_threads = 16
      @auto_trim_time = 30 # auto_trim!のインターバル
      @reaping_time = 1

      ...
    end

    ...
   
    def run(background=true)
      ...

      @thread_pool = ThreadPool.new(@min_threads,
                                    @max_threads,
                                    IOBuffer) do |client, buffer|
        ...
      end

      ...

      if @auto_trim_time # インターバルが指定されていれば、auto_trimを始める。
        @thread_pool.auto_trim!(@auto_trim_time)
      end
      
      ...
    end

Puma::ThreadPool::AutoTrimクラスは@timeoutで指定されたインターバル毎に、Puma::ThreadPooltrimメソッドを呼び出して余ったワーカスレッドの終了を試みる。

    ...

    class AutoTrim
      def initialize(pool, timeout)
        @pool = pool
        @timeout = timeout
        @running = false
      end

      def start!
        @running = true

        @thread = Thread.new do
          while @running
            @pool.trim
            sleep @timeout
          end
        end
      end

      ...
    end

    def auto_trim!(timeout=30)
      @auto_trim = AutoTrim.new(self, timeout)
      @auto_trim.start!
    end

    ...

Pumaでは、Puma::ThreadPoolクラスが、ワーカスレッドと、ワーカへの処理のキューを管理していて、キューが予約されるタイミングでワーカを増やしたり、減らしたりしているということが分かった。

今回は、スレッドプールの部分を中心に読んだだけなので、実装にリクエストをどう処理しているのか、クラスタリングの実装はどうなっているのか、あたりはまだしっかり読んでいない。けれども、既存のアプリケーションサーバの実装を読んでみることで、自分で1から作っていく際にどういう実装にするのがベターかというプラクティスを探るきっかけになりそうだな〜と思う◎

D言語でforkを使う

core.sys.posix.unistdの中にある。使い方はforkそのまま。

import std.stdio;
import core.thread;
import core.stdc.stdlib;
import core.sys.posix.unistd;

void main() {
  auto pid = core.sys.posix.unistd.fork();

  if (pid == 0) {
    writeln("child working...");
    Thread.sleep(dur!("seconds")(10));
    _Exit(0);
  } else {
    writeln("parent working...");
    Thread.sleep(dur!("seconds")(20));
    exit(0);
  }
}

ちなみに親プロセスを終了するときはexit(0)で、子プロセスを終了するときには_Exit(0)で抜ける(どちらもcore.stdc.stdlibの中で定義されている)。この2つの関数の違いは、標準入出力のデータを終了時にフラッシュするかしないか、というところらしい。

exit()と_exit()の違い | Siguniang's Blog

D言語でstd.signalsを使ったオブザーバー実装

std.signalsというモジュールを使うとオブザーバー・パターンの実装が簡単にできる。 この例ではひとつのオブザーバーしか登録していないが、複数のオブザーバーを登録した上で一気にシグナルを発行できる。

dlang.org

import std.signals;
import std.stdio;

enum SignalTypes {
  INCREASE,
  DECREASE
};

class SignalObserver {
  private int signalCounter;

  this() {
    signalCounter = 0;
  }

  void watch(SignalTypes type) {
    switch (type) {
      case SignalTypes.INCREASE: signalCounter++; break;
      case SignalTypes.DECREASE: signalCounter--; break;
      default: break;
    }
  }

  @property int counter() const {
    return signalCounter;
  }
}

class SignalTrigger {
  void update(SignalTypes type) {
    emit(type);
  }

  mixin Signal!(SignalTypes);
}

__gshared SignalTrigger trigger = new SignalTrigger();
__gshared SignalObserver observer = new SignalObserver();

void main() {
  trigger.connect(&observer.watch);

  trigger.update(SignalTypes.INCREASE);
  trigger.update(SignalTypes.INCREASE);
  trigger.update(SignalTypes.INCREASE);

  writeln(observer.counter); // 3

  trigger.update(SignalTypes.DECREASE);
  trigger.update(SignalTypes.DECREASE);

  writeln(observer.counter); // 1
}

D言語でCtrl-Cのシグナルハンドラを登録する

whileループからCtrl-Cで抜ける処理を横取りしたいときに。

import std.stdio;
import core.thread;
import core.stdc.stdlib;
import core.sys.posix.signal;

extern (C) {
  void cleanup(int signal) {
    writeln("cleanup");
    exit(1);
  }
}

void main() {
  sigset(SIGINT, &cleanup);

  while (true) {
    writeln("waiting...");
    Thread.sleep(dur!("seconds")(1));
  }
}

ポイントはクリーンナップコードをextern (C) { ... }の中で定義することと、integer型の引数を与えること。 SIGINT以外にもいろいろハンドラが登録できるっぽい。ちなみにちゃんとexitしないと、killコマンドで殺さないといけなくなるので注意。

github.com

10月にしたLTまとめ

 唐突に、この10月にした4つのLTに関して振り返ってみようと思う。

 これまで自分は勉強会などでいわゆるライトニング・トークというものをしたことがなかったが、内定先の師匠と話をしていてちょっと試しにやってみるのも悪くはないなと思い始めたのがきっかけだった。仮になにかテキトウなことをLTの中で話してしまったとしても、まさか命までは取られまい、という思いでえいやとやってみようと思ったのだ。

Meguro.rb #7

 これはちょうど直近で参加できそうなRubyの勉強会を探していて、初めて参加したMeguro.rbでのLT。
 ROMと呼ばれるヘキサゴナル・アーキテクチャで実装されたデータ・マッパー・パターンの永続レイヤのライブラリについて話した。普段は僕もRailsActiveRecordをガンガンに使っているが、ドメインモデルと永続化の責務を分割したいと思うならROMという選択肢も充分にありえるのではないかと思う。ROMの思想を一番よく表しているのが、永続化レイヤをDBだけのものとして考えず、WebAPIへのリクエストなども永続化の責務としてアダプタブルにできるものとして扱う、という点で、レイヤード・アーキテクチャが好きな僕としてはとても感動した。
 これが人生で初めてのLTだったので、割と早めに資料を作ってちょいちょい練習っぽいこともした。何事も初めてであれば練習をしすぎるということはない。参加者のひとたちも、発表が終わったあとに話しかけてくれたりして、初めてのLTとしてはいいスタートを切れたと思う。

WEBエンジニア勉強会 #3

 WEBエンジニア勉強会も、connpassで何かLTができそうな手頃な勉強会がないかなと探していたときに見つけた勉強会のひとつだ。この第三回目は新橋のコワーキングスペースで開催された。
 このスライドは、フロントエンド・アプリケーションにおけるバリデーション・ロジックの責務をどこにおくのか、という問題提起をしてみたくて作ったスライドだ。基本的に、フロントエンドのバリデーションというと、ビュー側に意識が寄ってしまうことが多い。なぜなら、バリデーションというのは、バリデーション自体のロジックと、そのエラーの表示というものがワンセットになっていると考えがちだからだ。だが、アプリケーションの種類問わず、バリデーションというのはビジネス・ルール含むことが多く、基本的に「バリデーションを実行する」という責務はドメイン・レイヤに置き、「バリデーションのエラーを表示する」という責務のみをビューに置くのが最適解なのではないかと僕は考えている。
 そのような趣旨でこのスライドを作ってはみたものの、実際に勉強会に来てくれているひとたちのなかにフロントエンド開発をしている人がさほど多くなかったので、次回からはもう少し勉強会に併せて話す内容を工夫したほうがいいかもな、という反省を得た。

 ちなみにではあるが、上のスライドで登場するvalidatable-recordはnpmで公開してある

www.npmjs.com

izumisy-tech.hatenablog.com

第12回若手Webエンジニア交流会

 知り合いのiOSエンジニアが主催していた勉強会を偶然connpassで見つけたのでここもLT枠で突撃した。仰々しいタイトルだが、僕のスタックオーバーフローのREPは500程度しかない。この勉強会は開会と同時に飲酒開始なので、LTも非常に気持ちよく話せてとてもエキサイトした。発表が終わったあと、すごくおもしろかった!的な声を数人から頂いて、やはり「トピックx話し方」でLTのウケは結構変わるんだな〜とも思った。あと酒を飲むと非常に饒舌になれるので、シャイな自分はあらかじめ酒をちょっと入れておくというイイということを学んだ。
 このスライドはもともと、僕が今年に入ってからスタックオーバーフローでいくつか回答をし始めて、思ったよりもREP稼ぐハードルって高くないんだな!と思ったところからアイデアを得た。スタックオーバーフローというと外国のすごそうなエンジニアがじゃぶじゃぶいるようなイメージだが、実際には我々日本人の英語力であれば全くコミュニティに貢献できること間違いなしだ。僕でもできるからみんなでやろう、という意気込みをぶつけるスライドだ。
 ちなみに参加者の中にJapanese Languageで4000REPくらいもっているという人がいて、ヒエ〜となった。是非そういう人からより具体的なREP稼ぎのノウハウを聞いてみたいものだ。

第33回 PORTもくもく会

 おととしのインターンで知り合った三人でちょっとした開発チームのようなものを組んでいるので、僕含めその三人でLTをしようとこのもくもく会に参加。会場は新宿にあるでっかいビルだった。
 これまでの勉強会と比べて集まっている人たちがほんとに様々で、Vimプラグインをめっちゃ作っている人や、Rubocopのコントリビューター、自分の本を書いている人、などなど思ったより幅広かった。そこで僕がしたLTがchooというウェブ・フロントエンド・フレームワークに関するものだったが、そもそもまたフロントエンドを触っている人がそんなにいないということもあり、ウケが微妙だった。
 このLTでchooを取り上げた理由は、chooはフロントエンド・フレームワークの中でも関数型プログラミングの雰囲気を持つように設計されていて、いわゆるVue.jsやReactで言うところのコンポーネントが、chooにおいては純粋なDOMを返す状態を持たない関数となるように実装できるというところが好みだったからだ。
 フロントエンドにおける「状態」はもはやコンポーネント単位のステートという形で責務分けするのではなく、ビューから切り離された状態というそれ自体の責務になるほうが、スケーラビリティがある。この考え方をすれば、そもそもビューを「ステート→DOM」の単なる変換に専念させられるし、ビューとステートが密結合になりづらくなる。なお、この話関しては別の記事で書いたのでこちらを参照のこと。

izumisy-tech.hatenablog.com

4回LTしてみて

 LTをする前は、まだまだ僕には話せることなんてそんなにないと思っていたけれども、実際にLTに申し込んでスライドを作り始めると、思った以上に話すことがでてきたり、なにより人前でマイクを持って話すのがわりと興奮(?)したりするということが分かった。Twitterで誰かが、「まずLTに申し込む。アイデアはあとからでてくる。」と言っていたのを思い出して、ん〜たしかにそのとおりだった、と思う。
 とはいえ、勉強会によっては参加者の層的にあんまりLTの内容が伝わらなかったりすることがあるので、そういう点でもみんながわかりやすい/共感しやすいLTがこれからはできていけたらいいなという所存。いつかはでっかいカンファレンスとかで発表ができるといいな。