Проблема с SocketChannel

Я написал приложение, которое подключается к серверу по TCP с помощью SocketChannel, но у меня есть две проблемы:

  1. Первый незначительный - иногда по какой-то неизвестной причине я отправляю сцепленные сообщения и
  2. второе критично - периодически приложение перестает отправлять/принимать сообщения

Есть идеи, что не так?

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SocketSelectorWorker extends Thread {
  private static final transient Logger log = LoggerFactory.getLogger(SocketSelectorWorker.class);
  private ExecutorService executorService = Executors.newFixedThreadPool(3);
  private final Queue<byte[]> messages;
  private Selector selector;

  public SocketSelectorWorker(Queue messages, Selector selector) {
    super();
    this.selector = selector;
    this.session = session;
    this.messages = messages;
  }

  @Override
  public void run() {
    super.run();
    while (isConnectionAlive()) {
      try {
        // Wait for an event
        selector.select();
      } catch (IOException e) {
        log.error("Selector error: {}", e.toString());
        log.debug("Stacktrace: ", e);
        session.closeConnection();
        break;
      }
      handleSelectorkeys(selector.selectedKeys());
    }
    executorService.shutdown();
    log.debug("worker stopped");
  }

  private void handleSelectorkeys(Set<SelectionKey> selectedKeys) {
    for (SelectionKey selKey : selector.selectedKeys()) {
      selector.selectedKeys().remove(selKey);
      try {
        processSelectionKey(selKey);
      } catch (IOException e) {
        // Handle error with channel and unregister
        selKey.cancel();
        log.error("Selector error: {}", e.toString());
        log.debug("Stacktrace: ", e);
      }
    }
  }

  public void processSelectionKey(SelectionKey selKey) throws IOException {

    // Since the ready operations are cumulative,
    // need to check readiness for each operation
    if (selKey.isValid() && selKey.isConnectable()) {
      log.debug("connectable");
      // Get channel with connection request
      SocketChannel sChannel = (SocketChannel) selKey.channel();

      boolean success = sChannel.finishConnect();
      if (!success) {
        // An error occurred; handle it
        log.error("Error on finish");
        // Unregister the channel with this selector
        selKey.cancel();
      }
    }

    if (selKey.isValid() && selKey.isReadable()) {
      log.debug("readable");
      readMessage(selKey);
    }

    if (selKey.isValid() && selKey.isWritable()) {
      log.debug("writable");
      writeMessage(selKey);
    }

    if (selKey.isValid() && selKey.isAcceptable()) {
      log.debug("Acceptable");
    }

  }

  private void writeMessage(SelectionKey selKey) throws IOException {
    byte[] message = messages.poll();
    if (message == null) {
      return;
    }

    // Get channel that's ready for more bytes
    SocketChannel socketChannel = (SocketChannel) selKey.channel();

    // See Writing to a SocketChannel
    // Create a direct buffer to get bytes from socket.
    // Direct buffers should be long-lived and be reused as much as
    // possible.

    ByteBuffer buf = ByteBuffer.allocateDirect(1024);// .allocateDirect(toSend.getBytes().length);

    // try {
    // Fill the buffer with the bytes to write;
    // see Putting Bytes into a ByteBuffer
    // buf.put((byte)0xFF);

    buf.clear();
    buf.put(new byte[] { 0x02 });
    buf.put(message);
    buf.put(new byte[] { 0x03 });
    // Prepare the buffer for reading by the socket
    buf.flip();

    // Write bytes
    int numBytesWritten = socketChannel.write(buf);
    log.debug("Written: {}", numBytesWritten);

    while (buf.hasRemaining()) {
      numBytesWritten = socketChannel.write(buf);
      log.debug("Written remining: {}", numBytesWritten);
    }
  }

  private void readMessage(SelectionKey selKey) throws IOException {
    // Get channel with bytes to read
    SocketChannel socketChannel = (SocketChannel) selKey.channel();

    // See Reading from a SocketChannel
    // Create a direct buffer to get bytes from socket.
    // Direct buffers should be long-lived and be reused as much as
    // possible.
    ByteBuffer buf = ByteBuffer.allocateDirect(2048);
    Charset charset = Charset.forName("UTF-8");// Charset.forName("ISO-8859-1");
    CharsetDecoder decoder = charset.newDecoder();

    // try {
    // Clear the buffer and read bytes from socket
    buf.clear();
    int numBytesRead = socketChannel.read(buf);

    if (numBytesRead == -1) {
      // No more bytes can be read from the channel
      // socketChannel.close();
      return;
    }
    log.debug("Read bytes: {}", numBytesRead);
    // To read the bytes, flip the buffer
    buf.flip();

    String result = decoder.decode(buf).toString();

    log.debug("Read string: {}", result);
    //processMessage(result.getBytes());
  }

}

person Diyko    schedule 27.01.2015    source источник


Ответы (1)


  1. TCP не имеет границ сообщений. Это протокол байтового потока. Любые границы сообщений зависят от вас.
  2. Вы неправильно обрабатываете клавиши выбора. Вы должны удалить через итератор во время итерации, а не через набор. Это означает, что вы не можете использовать расширенный цикл for. Вероятно, вы пропускаете ключи.

  3. Когда вы получите -1 от read(), вы должны закрыть канал.

  4. Когда вы получаете IOException, недостаточно просто отменить ключ. Вы должны закрыть канал, который NB автоматически отменяет ключ.

person user207421    schedule 27.01.2015