且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

使用线程处理套接字

更新时间:2023-11-22 17:05:16

我过去通过为每个客户端连接定义一个 MessageHandler 类来解决这个问题,负责入站/出站消息流量。在内部,处理程序使用 BlockingQueue 实现,在其上放置出站消息(由内部工作线程)。 I / O发送方线程不断尝试从队列中读取(如果需要则阻塞)并将检索到的每条消息发送给客户端。

I have solved this problem in the past by defining a "MessageHandler" class per client connection, responsible for inbound / outbound message traffic. Internally the handler uses a BlockingQueue implementation onto which outbound messages are placed (by internal worker threads). The I/O sender thread continually attempts to read from the queue (blocking if required) and sends each message retrieved to the client.

这是一些框架示例代码(未经测试) :

Here's some skeleton example code (untested):

/**
 * Our Message definition.  A message is capable of writing itself to
 * a DataOutputStream.
 */
public interface Message {
  void writeTo(DataOutputStream daos) throws IOException;
}

/**
 * Handler definition.  The handler contains two threads: One for sending
 * and one for receiving messages.  It is initialised with an open socket.
 */    
public class MessageHandler {
  private final DataOutputStream daos;
  private final DataInputStream dais;
  private final Thread sender;
  private final Thread receiver;
  private final BlockingQueue<Message> outboundMessages = new LinkedBlockingQueue<Message>();

  public MessageHandler(Socket skt) throws IOException {
    this.daos = new DataOutputStream(skt.getOutputStream());
    this.dais = new DataInputStream(skt.getInputStream());

    // Create sender and receiver threads responsible for performing the I/O.
    this.sender = new Thread(new Runnable() {
      public void run() {
        while (!Thread.interrupted()) {
          Message msg = outboundMessages.take(); // Will block until a message is available.

          try {
            msg.writeTo(daos);
          } catch(IOException ex) {
            // TODO: Handle exception
          }
        }
      }
    }, String.format("SenderThread-%s", skt.getRemoteSocketAddress()));

    this.receiver = new Thread(new Runnable() {
      public void run() {
        // TODO: Read from DataInputStream and create inbound message.
      }
    }, String.format("ReceiverThread-%s", skt.getRemoteSocketAddress()));

    sender.start();
    receiver.start();
  }

  /**
   * Submits a message to the outbound queue, ready for sending.
   */
  public void sendOutboundMessage(Message msg) {
    outboundMessages.add(msg);
  }

  public void destroy() {
    // TODO: Interrupt and join with threads.  Close streams and socket.
  }
}

请注意,Nikolai在阻止I / O方面是正确的每个连接使用1(或2)个线程不是可扩展的解决方案,通常可以使用Java NIO编写应用程序以解决此问题。但是,实际上,除非您正在编写成千上万个客户端同时连接的企业服务器,否则这不是真正的问题。使用Java NIO编写无错误的可伸缩应用程序困难,当然不是我推荐的。

Note that Nikolai is correct in that blocking I/O using 1 (or 2) threads per connection is not a scalable solution and typically applications might be written using Java NIO to get round this. However, in reality unless you're writing an enterprise server which thousands of clients connect to simultaneously then this isn't really an issue. Writing bug-free scalable applications using Java NIO is difficult and certainly not something I'd recommend.