S4

S4というYahooが開発したシステムがある。これはいわゆるストリームコンピューティングのための枠組み。データは連続したストリームとして外部から供給され、それに対する演算を行うというもの。S4はこの処理を分散環境、つまり複数の計算ノードで並列に実行することを目的としたシステムだ。

今のところノードの障害に対応できないなど、実用するにはまだまだ厳しそうな点もあるようだが、なかなかおもしろそうなのでちょっと調べてみた。

S4の構造

S4はデータを一連のイベントとして受け取り、それを連続的に処理していく。計算機構としてのS4は、多数のPE(Processing Element)と呼ばれるイベント処理機構が、ストリームで連結されたグラフ構造となる。このグラフの中をさまざまなイベントがデータフロー的に流れることで演算が行われる。

イベント

イベントはデータストリームを構成するひとかたまりのデータで、属性名と属性値のペアで表される。プログラム上はJava Beanとして定義することになっているようだ。

複数の属性のうち一つがキー属性となるが、どの属性がキー属性となるかは、イベントの配送を請け負うDispatcher(後述)の設定で行う。

PE (Processing Element)

PE は、イベントを処理する機構である。S4のプログラミングは一連のPEをプログラミングすることで行う。すべてのイベントはその型とキー属性の値に従って、特定のPEに配送される。
特定のキー属性値に対応するPEがそれまでに作られていなければ、自動的につくられる。というか、プロトタイプからCloneでコピーされる。

PEはActorモデルに基づいているとのこと。つまりPEはイベントを受け取り、そのイベントに従って内部状態を変え、イベントを送出する。

基本的な動作に対しては、S4がPEをデフォルトで提供している。たとえば一般的なJOIN操作などはプログラムする必要は無い。

PEはAbstactPEというクラスを拡張して定義する。processEvent メソッドと output メソッド、getIdメソッドを定義することになる。PEが状態を持つ場合、イベント同様、Beanとして外部からアクセスできるように書くのが流儀らしい。初期化情報は設定ファイルに書いておき、初期化の際にBeanのインターフェイスをとおして状態が設定される。

public class SentenceReceiverPE extends AbstractPE {
  paublic void processEvent(Sentence sentence) {
      ...
  }
  @Override
  public void output() {
      ...
  }
  @Override
  public String getId() {
  	  ...
  }
}

ProcessEvent は、その名の通りイベントを処理するメソッドで、イベントがPEに配送されると呼び出されるメソッドである。
PEは典型的にはイベントを受け取ることによって内部状態を変え、新たなイベントを作って送出する。もちろん送出しなくてもよい。

output メソッドは、イベントとは独立にシステムが呼び出すメソッドで、典型的にはイベントの送出を行う。たとえば、イベントが来る来ないにかかわらず、一定間隔で統計情報を出力して欲しいような場合にこれを用いる。outputメソッドが呼び出されるタイミングは、設定ファイルで制御する。

Dispatcher

Dispatcher は、イベントの配送先を定めるキー属性を決定する。実際の決定は、さらに下位の構造となるPartitionerが定める。Dispatcherは Partitionerのリストで定義されている。

 <bean id="dispatcher" class=".." init-method="..">
    <property name="partitioners">
      <list>
        <ref bean="sentenceSpeechIdPartitioner"/>
        <ref bean="speechIdPartitioner"/>
      </list>
    </property>
    ...
  </bean>

Partitioner はそのPartitioner が適用されるストリームのリストと、分散に用いる属性名、つまり分散のキーとなる属性名で定義する。キー属性は複数指定できるようだが、複数指定したときの挙動は、マニュアルからは読み取れなかった。

  <bean id="sentenceSpeechIdPartitioner" class="..">
    <property name="streamNames">
      <list>
        <value>Sentence</value>
        <value>SentenceJoined</value>
      </list>
    </property>
    <property name="hashKey">
      <list>
        <value>speechId</value>
      </list>
    </property>
    ..
  </bean>

所感

実行モデルとしてはアリだと思うのだけど、いかんせんプログラムが書きにくすぎるだろう。データフローを設計し、PEを書いて、イベントを書いて、ストリームを定義して、Partitionerを設定して、と書いたり設定しなければならないモジュールが多すぎる。

記述モデルとしてではなく、上位言語のバックエンドの実行系だと考えるとよくできてるのかもしれない。評価してみよう。