Javaのselectは面倒くさい

selectといっても,SQLのSELECTではなく,複数の入力を同時に待つほうのselectのお話.Java言語にはもともとselectの機能は無かった.JavaはThreadが言語仕様に統合されているので,複数の入力に対してはそれぞれThreadを割り当てれば良い,という考え方だった.確かに,10や20の入力ならそれでもいいのだが,1000とかいうオーダになったときに,個々の入力にThreadを割り当てるのはいくら何でもオーバヘッドが大きすぎる.ということで,JDK1.4でNIO(New I/O)というものが導入された.NIOはselectに対処するためだけのものではなくて,データハンドリング時のコピーオーバヘッドを低減するためのBuffer機構もNIOに含まれている.

しかし,NIOのselect周りはちょっとひどいような気がする.

  • Select可能なチャネルが限られている.任意のInputStreamに対してselectできる訳ではない.例えば,Processから取得できるstdoutに対して,selectする方法はない(らしい).
  • Selectできるチャネルの取得の仕方が統一されていない.例えば,ファイル読み出しの場合は,FileInputStreamに対して,getChannel()メソッドを実行すれば取得できるのに対して,Socket の場合は,始めからSocketChannelという形でアクセスする.
  • Selectorのへの登録インターフェイスがなんかおかしい.
    • 登録は,SelectableChannel.register(channel, MODE) のようにチャネルのメソッドで行う.
    • 削除は,Selector から取得したSelectionKeyに対して, SelectionKey.cancel()メソッドで行う.
  • そもそもselectしたいだけなのに,チャネルを使うとBuffer も使わなければならない.これが通常のInputStream, OutputStreamの世界ととても相性が悪い.

Java7ではNIO2なるものが導入されるという話.このあたりがちゃんと解決されたインターフェイスになっているといいのだが.

Old I/Oで汎用サーバ

例題として,一つのポートでacceptして,複数のクライアントに対して並行して機能するサーバを書いてみよう.Handlerクラスを定義してそのファクトリを渡すようにしてある.

public class OIOServer {
  static public interface HandlerFactory {
     Handler create(Socket s) throws IOException;
  }
  static public abstract class Handler implements Runnable {
    InputStream is;
    OutputStream os;
    Handler(Socket s) throws IOException{
      this.is = s.getInputStream();
      this.os = s.getOutputStream();
    }
  }	
  ServerSocket ss;
  HandlerFactory factory;
	
  public OIOServer(int port, HandlerFactory factory) 
  throws IOException{
    this.ss = new ServerSocket(port);
    this.factory = factory;
  }

  public void start() throws IOException {
    while (true)
      (new Thread(factory.create(ss.accept()))).start();
  }	
}

とても簡潔に書ける.使い方はこんな感じ.Echoサーバを作っている.

public class OIOEcho {
  static class EchoHandler extends OIOServer.Handler{
    EchoHandler(Socket s) throws IOException {
      super(s);
    }
    public void run() {
      try {
        BufferedReader br = 
          new BufferedReader(new InputStreamReader(is));
        PrintWriter pw = new PrintWriter(os, true);
        String line;
        while ((line = br.readLine()) != null) 
          pw.println(line);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
  public static void main(String[] args) throws IOException {
    OIOServer server = new OIOServer(10010, 
      new OIOServer.HandlerFactory(){
        public OIOServer.Handler create(final Socket s) 
        throws IOException {
          return new EchoHandler(s);
        }
      });
    server.start();
  }
}

New IOで汎用サーバ

New I/Oで同じことをするものを書いてみる.まずは準備として,SocketChannelをInputStream / OutputStreamとして扱うためのラッパクラスを作る.じつはどこかにあるのかも知れないけど見つけられなかったので.

class ChannelInputStream extends InputStream {
  SocketChannel channel;
  ChannelInputStream(SocketChannel channel){
    this.channel = channel;
  }
	
  public int read() throws IOException {
    byte [] buf = new byte[1];		
    try {
      int readLen = read(buf);
      if (readLen > 0)
        return buf[0];
    } catch (IOException e) {}
    return -1;
  }
  public int read(byte b[], int off, int len) 
  throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(len);
    int readLen = channel.read(buf);
    if (readLen <= 0)
      return -1;
    buf.flip();
    buf.get(b, off, readLen);
      return readLen;
  }
}

class ChannelOutputStream extends OutputStream {
  SocketChannel channel;
  ChannelOutputStream(SocketChannel channel){
    this.channel = channel;
  }
  public void write(int b) throws IOException {
    byte [] ba = new byte[1];
    ba[0] = (byte)(b & 0xff);
    write(ba, 0, 1);
  }
  public void write(byte b[], int off, int len) 
  throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(len);
    buf.put(b, off, len);
    buf.flip();
    channel.write(buf);
  }
}

次に,デフォルトのSelectorは低レベルで使いにくいので,Listenerベースのインターフェイスを持つMySelectorを定義する.

class MySelector {
  interface SelectListener {
    // return true if it want to stay registered in the selector
    boolean handle(SocketChannel channel) throws IOException;
  }

  Selector selector;
  Map<SelectableChannel, SelectListener> map;
  
  MySelector() throws IOException{
    this.selector = Selector.open();
    this.map = new HashMap<SelectableChannel, SelectListener>();
  }
  
  void accept(int port, SelectListener listner) throws IOException {
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(port));
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    map.put(serverChannel, listner);
  }
  
  void readRegister(SocketChannel c, SelectListener listener) 
  throws IOException {
    c.register(selector, SelectionKey.OP_READ);
    map.put(c, listener);
  }
  
  boolean select() throws IOException {
    int num = selector.select();
    if (selector.selectedKeys().size() == 0)
      return false;
    for (SelectionKey key: selector.selectedKeys()) {
      SocketChannel sc;
      if (key.isAcceptable()) { 
        sc = ((ServerSocketChannel)key.channel()).accept();
        if (sc == null) 
          continue;
      } else {
        sc = (SocketChannel)(key.channel());      
      }
      SelectListener handler = map.get(key.channel());
      SelectableChannel c = key.channel();
      if (! handler.handle(sc)){
        key.cancel();
        map.remove(c);
      }
    }
    return true;
  }
}

ここまで準備ができるとあとは簡単.汎用サーバはこんな風に書ける.

public class NIOServer {
  MySelector mySelector;

  static public interface HandlerFactory {
    Handler create(final SocketChannel s) throws IOException;
  }
  static public abstract class Handler implements MySelector.SelectListener {
    InputStream is;
    OutputStream os;
    Handler(SocketChannel sc) throws IOException{
      this.is = new ChannelInputStream(sc);      
      this.os = new ChannelOutputStream(sc);
    }
  }

  public NIOServer(int port, final HandlerFactory factory) 
  throws IOException{
    mySelector = new MySelector();
    mySelector.accept(port, new MySelector.SelectListener(){
      public boolean handle(SocketChannel channel) 
      throws IOException {
        channel.configureBlocking(false);
        mySelector.readRegister(channel, factory.create(channel));
        return true;
      }
    });
  }

  public void start() throws IOException {
    while (mySelector.select()) ;      
  }
}  

これ使ったEchoサーバはこんな感じ.

public class NIOEcho {
  static class EchoHandler extends NIOServer.Handler{
    BufferedReader br;
    PrintWriter pw;
    EchoHandler(SocketChannel sc) throws IOException {
      super(sc);
      br = new BufferedReader(new InputStreamReader(is));
      pw = new PrintWriter(new OutputStreamWriter(os), true);
    }

    public boolean handle(SocketChannel channel)
        throws IOException {
      String line;
      if ((line = br.readLine()) != null) {
        pw.println(line);
        return true;
      }
      return false;
    }
  }
  
  public static void main(String[] args) throws IOException {
    NIOServer server = 
         new NIOServer(10010, new NIOServer.HandlerFactory(){
      public NIOServer.Handler create(final SocketChannel s)    throws IOException {
        return new EchoHandler(s);
      }
    });
    server.start();
  }
}

まとめ

JavaのIOストリーム周りの設計はよくできていると思う.NIOはNIOでそれなりによくできているのだと思うのだが,両方合わせてみると破綻していると言わざるを得ない.両方合わせて完全にリファクタリングしなければならないのではないだろうか.NIO2がそういうものになっているといいんだけど.