selectといっても,SQLのSELECTではなく,複数の入力を同時に待つほうのselectのお話.Java言語にはもともとselectの機能は無かった.JavaはThreadが言語仕様に統合されているので,複数の入力に対してはそれぞれThreadを割り当てれば良い,という考え方だった.確かに,10や20の入力ならそれでもいいのだが,1000とかいうオーダになったときに,個々の入力にThreadを割り当てるのはいくら何でもオーバヘッドが大きすぎる.ということで,JDK1.4でNIO(New I/O)というものが導入された.NIOはselectに対処するためだけのものではなくて,データハンドリング時のコピーオーバヘッドを低減するためのBuffer機構もNIOに含まれている.
- Select可能なチャネルが限られている.任意のInputStreamに対してselectできる訳ではない.例えば,Processから取得できるstdoutに対して,selectする方法はない(らしい).
- Selectできるチャネルの取得の仕方が統一されていない.例えば,ファイル読み出しの場合は,FileInputStreamに対して,getChannel()メソッドを実行すれば取得できるのに対して,Socket の場合は,始めからSocketChannelという形でアクセスする.
- Selectorのへの登録インターフェイスがなんかおかしい.
- 登録は,SelectableChannel.register(channel, MODE) のようにチャネルのメソッドで行う.
- 削除は,Selector から取得したSelectionKeyに対して, SelectionKey.cancel()メソッドで行う.
- そもそもselectしたいだけなのに,チャネルを使うとBuffer も使わなければならない.これが通常のInputStream, OutputStreamの世界ととても相性が悪い.
Old I/Oで汎用サーバ
public class OIOServer { static public interface HandlerFactory { Handler create(Socket s) throws IOException; } static public abstract class Handler implements Runnable { InputStream is; OutputStream os; Handler(Socket s) throws IOException{ = s.getInputStream(); this.os = s.getOutputStream(); } } ServerSocket ss; HandlerFactory factory; public OIOServer(int port, HandlerFactory factory) throws IOException{ = new ServerSocket(port); this.factory = factory; } public void start() throws IOException { while (true) (new Thread(factory.create(ss.accept()))).start(); } }
public class OIOEcho { static class EchoHandler extends OIOServer.Handler{ EchoHandler(Socket s) throws IOException { super(s); } public void run() { try { BufferedReader br = new BufferedReader(new InputStreamReader(is)); PrintWriter pw = new PrintWriter(os, true); String line; while ((line = br.readLine()) != null) pw.println(line); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { OIOServer server = new OIOServer(10010, new OIOServer.HandlerFactory(){ public OIOServer.Handler create(final Socket s) throws IOException { return new EchoHandler(s); } }); server.start(); } }
New IOで汎用サーバ
New I/Oで同じことをするものを書いてみる.まずは準備として,SocketChannelをInputStream / OutputStreamとして扱うためのラッパクラスを作る.じつはどこかにあるのかも知れないけど見つけられなかったので.
class ChannelInputStream extends InputStream { SocketChannel channel; ChannelInputStream(SocketChannel channel){ = channel; } public int read() throws IOException { byte [] buf = new byte[1]; try { int readLen = read(buf); if (readLen > 0) return buf[0]; } catch (IOException e) {} return -1; } public int read(byte b[], int off, int len) throws IOException { ByteBuffer buf = ByteBuffer.allocate(len); int readLen =; if (readLen <= 0) return -1; buf.flip(); buf.get(b, off, readLen); return readLen; } } class ChannelOutputStream extends OutputStream { SocketChannel channel; ChannelOutputStream(SocketChannel channel){ = channel; } public void write(int b) throws IOException { byte [] ba = new byte[1]; ba[0] = (byte)(b & 0xff); write(ba, 0, 1); } public void write(byte b[], int off, int len) throws IOException { ByteBuffer buf = ByteBuffer.allocate(len); buf.put(b, off, len); buf.flip(); channel.write(buf); } }
class MySelector { interface SelectListener { // return true if it want to stay registered in the selector boolean handle(SocketChannel channel) throws IOException; } Selector selector; Map<SelectableChannel, SelectListener> map; MySelector() throws IOException{ this.selector =; = new HashMap<SelectableChannel, SelectListener>(); } void accept(int port, SelectListener listner) throws IOException { ServerSocketChannel serverChannel =; serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); serverChannel.register(selector, SelectionKey.OP_ACCEPT); map.put(serverChannel, listner); } void readRegister(SocketChannel c, SelectListener listener) throws IOException { c.register(selector, SelectionKey.OP_READ); map.put(c, listener); } boolean select() throws IOException { int num =; if (selector.selectedKeys().size() == 0) return false; for (SelectionKey key: selector.selectedKeys()) { SocketChannel sc; if (key.isAcceptable()) { sc = ((ServerSocketChannel); if (sc == null) continue; } else { sc = (SocketChannel)(; } SelectListener handler = map.get(; SelectableChannel c =; if (! handler.handle(sc)){ key.cancel(); map.remove(c); } } return true; } }
public class NIOServer { MySelector mySelector; static public interface HandlerFactory { Handler create(final SocketChannel s) throws IOException; } static public abstract class Handler implements MySelector.SelectListener { InputStream is; OutputStream os; Handler(SocketChannel sc) throws IOException{ = new ChannelInputStream(sc); this.os = new ChannelOutputStream(sc); } } public NIOServer(int port, final HandlerFactory factory) throws IOException{ mySelector = new MySelector(); mySelector.accept(port, new MySelector.SelectListener(){ public boolean handle(SocketChannel channel) throws IOException { channel.configureBlocking(false); mySelector.readRegister(channel, factory.create(channel)); return true; } }); } public void start() throws IOException { while ( ; } }
public class NIOEcho { static class EchoHandler extends NIOServer.Handler{ BufferedReader br; PrintWriter pw; EchoHandler(SocketChannel sc) throws IOException { super(sc); br = new BufferedReader(new InputStreamReader(is)); pw = new PrintWriter(new OutputStreamWriter(os), true); } public boolean handle(SocketChannel channel) throws IOException { String line; if ((line = br.readLine()) != null) { pw.println(line); return true; } return false; } } public static void main(String[] args) throws IOException { NIOServer server = new NIOServer(10010, new NIOServer.HandlerFactory(){ public NIOServer.Handler create(final SocketChannel s) throws IOException { return new EchoHandler(s); } }); server.start(); } }