[Java] I/O에서 New I/O로의 흐름

Java I/O

New I/O(이하 nio)이전의 io는 모두 blocking으로 동작한다. Socket socket = serverSocket.accept();, in = socket.getInputStream(); len = in.read(buff); 모두 동작 중에 blocking이 된다. 이 구조에서는 성능 문제가 있어 client 마다 thread를 생성해서 1 대 1 대응을 했다. 이러한 모델에서는 client가 많아지면 관리해야하는 thread가 많아져서 성능 향상의 한계점이 명확하다.

이를 일차적으로 해결하는 방법이 Polling이다. Socket socket = serverSocket.accept();으로 생성되는 client와 연결되는 socket을 socketList에 저장하고, 각 thread가 특정 수의 socketList에 대응하는 형태를 가진다. 여기에 더해 thread pool을 적용하면 이전보다는 성능은 향상되지만 여전히 문제점이 있다. 바로 in = socket.getInputStream(); len = in.read(buff);에서 read()가 blocking된다는 것이다. 이를 위해서 java 1.4에서 non-blocking read가 추가되었다.

Java New I/O

Selector::accept만 non-blocking

논블럭킹 모드인 경우

  • 클라이언트의 연결 요청이 없을 경우 ServerSocketChannel.accept() 메소드는 곧바로 null을 리턴한다.
  • 채널로부터 읽어올 데이터가 없는 경우 SocketChannel.read() 메소드는 곧바로 리턴되며, 인자로 전달한 ByteBuffer에는 어떤 내용도 입력되지 않는다.

socketChannel을 non-block으로 사용하기 위해서는 아래와 같이 socketChannel.configureBlocking(false);를 지정해주면 된다.

1
2
3
4
5
6
7
8
while(true) {
    SocketChannel socketChannel = serverChannel.accept();
    // 소켓채널 논블럭킹 모드 지정
    socketChannel.configureBlocking(false); 
    socketList.addSocket(socketChannel);
    ...
    ...
}

이렇게하면 socketList에 대응하는 thread에서 read를 해도 block 되지 않는다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
while(true) {
    Thread.sleep(100); // 0.1초간 대기
    
    for (int i = 0 ; i < socketList.size() ; i++) {
        // 클라이언트의 요청을 차례대로 처리
        SocketChannel socket = socketList.getSocket(i);
        
        buffer.clear();
        socket.read(buffer); // 블럭킹 되지 않음
        if (buffer.position() > 0) {
            ... // 소켓에서 읽어온 데이터 처리
        }
    }
}

그런데 위 코드에서 SocketChannel socketChannel = serverChannel.accept();가 non-block으로 수행되어 임의의 시간에 접속하는 client에 빠르게 대응할 수는 있지만, while loop을 돌면서 cpu time을 소모하기 때문에 불필요한 cpu 사용량이 증가한다. 이를 막기 위해서 non-block을 지원하는 channel에서 Selector를 등록할 수 있는 메서드 register를 지원한다.

먼저 객체를 생성할 때 #.open()를 호출하는 nio의 특성에 따라 Selector selector = Selector.open();을 수행한다. 그리고 channel이 생성되면 이 selector에 channel을 등록한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ServerSocketChannel ssc = null;
  ..
  try {
    ssc = ServerSocketChannel.open();
    ssc.blockingConfigure(false);
    
    Selector selector = Selector.open();
    
    ssc.register(selector, SelectionKey.OP_ACCEPT, null);
    
    ...
  } catch(..) {
    ...
  }

selector에 등록 후 이벤트가 발생한 채널을 찾아서 로직을 수행하는 과정은 아래와 같이 진행된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 1. Selector 생성   Selector selector = Selector.open();   
// 2. Selector를 등록할 수 있는 채널 생성
ServerSocketChannel channel = ServerSocketChannel.open();
...
// 3. 채널에 Selector 등록
channel.register(selector, SelectionKey.OP_ACCEPT, null);
...

while(true) {
// 4. selector를 이용하여 채널의 이벤트 대기
  int readyKey = selector.readyOps();

// 5. readyKey가 0 이상이면 이벤트가 발생한 것으로 처리
  if (readyKey > 0) {

// 6. selector로부터 채널에서 발생한 이벤트와 관련된 SelectionKey Set 구함
      Set selectionKeySet = selector.selectedKeys();

// 7. Set에서 각 SelectionKey를 차례대로 읽어와
      Iterator iter = selectionKeySet.iterator();
      while(iter.hasNext()) {
        SelectionKey selectionKey = (SelectionKey)iter.next();
        
// 8. SelectionKey로부터 채널을 구함
        ServerSocketChannel relatedChannel =
            (ServerSocketChannel)selectionKey.channel();
        
// 9. 채널을 사용하여 알맞은 작업 수행
        ...
      }
  }
}

accept는 해결되었지만 여전히 문제점이 존재한다. 아래에서 등록하는 selector는 ServerSocketChannel이 등록되는 accept selector이다. 위 코드 3번에서 accept selector에 ServerSocketChannel을 등록 후, 4~8번을 동해서 client와 연결된 socketchannel을 구할 수 있다. 그런데 9번 채널을 사용하여 알맞은 작업을 수행하느 과정을 일반적으로 다른 accept를 수행하는 Thread와 다른 thread에서 수행된다. 이 때 thread는 cpu의 core만큼 생성되서 thread pool을 이룬다. thread pool 속의 단일 thread들은 각자가 관리하는 socketchannel list가 있다. 그리고 이들 channel에 데이터가 도착했을 때 return되는 read selector를 가지고 있어야 한다. 바로 이 시점에서 accpet selector에는 register된 이후 read selector에는 등록되기 이전에 데이터가 들어오면 read selector는 이를 알 수가 없어 계속 block 상태에 존재한다. 따라서 accept selector에 register 이후 read selector를 return시켜주는 read-selector.wakeup()이 필요하다.

간단히 설명하면

  1. ServerSocketChannel thread에서 accept 이후 socketchannel을 channelQueue에 등록
  2. ClientProcessor.processSocketChannelQueue에서 ClientProcessor.readSelector에 register
  3. ClientProcessor.processRequest의 readSelector가 이벤트를 기다리고 있는 시점에서 1번이 수행되면 새로운 socketchannel은 readSelector에 등록되지 못함
  4. 따라서 1번에서 channelQueue에 socketchannel을 add하고 ClientProcessor.readSelector를 wakeup 시켜줘야함.
  5. 이를 위해서 ServerSocketChannel thread에서 channelQueue를 생성할 때 ClientProcessor가 this.channelQueue.setReadSelector(readSelector);를 할 수 있도록 구현해둠.
  6. 그러면 ServerSocketChannel thread는 channelQueue에 channel add 이후 readSelector.wakeup();를 호출
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class ClientProcessor extends Thread {
    private SocketChannelQueue channelQueqe
    private Selector readSelector;
    ... 

    public ClientProcessor(SocketChannelQueue channelQueue, File rootDirectory)
    throws IOException {
        this.channelQueue = channelQueue;
        ...
        readSelector = Selector.open();
        this.channelQueue.setReadSelector(readSelector);
    }

    public void run() {
        while(true) {
          try {
              processSocketChannelQueue();
              int numKeys = readSelector.select();
              if (numKeys > 0) {
                processRequest();
              }
          } catch (IOException e) {
          }
        }
    }
    
    private void processSocketChannelQueue() throws IOException {
        SocketChannel socketChannel = null;
        while ( (socketChannel = channelQueue.getFirst()) != null) {
          socketChannel.configureBlocking(false);
          socketChannel.register( readSelector, SelectionKey.OP_READ, new StringBuffer());
        }
    }
    
    private void processRequest() {
        Iterator iter = readSelector.selectedKeys().iterator();
        while( iter.hasNext() ) {
          SelectionKey key = (SelectionKey)iter.next();
          iter.remove();
          
          SocketChannel socketChannel = (SocketChannel)key.channel();
          
          try {
              socketChannel.read(readBuffer);
              readBuffer.flip();
              String result = iso8859decoder.decode(readBuffer).toString();
              StringBuffer requestString = (StringBuffer)key.attachment();
              requestString.append(result);
                
              readBuffer.clear();
              
              if(result.endsWith("\n\n") || result.endsWith("\r\n\r\n")) {
                completeRequest(requestString.toString(), socketChannel);
              }
          } catch (IOException e) {
              // 에러 발생
          }
        }
    }

AsynchronousServerSocketChannel::데이터 처리되는 부분도 non-blocking

Usage Example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final AsynchronousServerSocketChannel listener =
    AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(5000));

listener.accept(null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
    public void completed(AsynchronousSocketChannel ch, Void att) {
        // accept the next connection
        listener.accept(null, this);

        // handle this connection
        handle(ch);
    }
    public void failed(Throwable exc, Void att) {
        ...
    }
});

Tags: ,

Categories:

Updated:

Leave a comment