且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

Socket单客户端/服务器连接,服务器可以发送多次,客户端只能发送一次

更新时间:2022-02-21 22:15:20

为什么不使用异步套接字?这是一些代码:

Why not use asynchronous sockets? Here is some code:

// Server socket
// Listening TCP socket (server side).
//
// Accepts clients asynchronously, keeps them in 'clients', decodes every
// received frame through MessageBase.Receive into 'messageQueue', and can
// send a message to a single client or broadcast it to all of them.
// All callbacks run on thread-pool threads, so the client list is guarded by
// a private lock and the message queue is locked on the queue instance.
class ControllerSocket : Socket, IDisposable
{
    // Backlog passed to Listen()
    private const int ListenBacklog = 100;

    // Initial size of each client's receive buffer (MessageBase.Receive may grow it)
    private const int InitialBufferSize = 8192;

    // MessageQueue queues messages to be processed by the controller.
    // This class locks on the queue instance while enqueuing; consumers
    // should take the same lock while dequeuing.
    public Queue<MessageBase> messageQueue = null;

    // This is a list of all attached clients
    public List<Socket> clients = new List<Socket>();

    // Guards every access this class makes to 'clients'
    private readonly object clientsLock = new object();

    #region Events
    // Event definitions handled in the controller
    public delegate void SocketConnectedHandler(Socket socket);
    public event SocketConnectedHandler SocketConnected;

    public delegate void DataRecievedHandler(Socket socket, int bytesRead);
    public event DataRecievedHandler DataRecieved;

    public delegate void DataSentHandler(Socket socket, int length);
    public event DataSentHandler DataSent;

    public delegate void SocketDisconnectedHandler();
    public event SocketDisconnectedHandler SocketDisconnected;
    #endregion

    #region Constructor
    // Creates the server socket and binds it to the local host on 'port'.
    // Bind may throw SocketException (e.g. port already in use).
    public ControllerSocket(int port)
        : base(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
    {
        // Create the message queue
        this.messageQueue = new Queue<MessageBase>();

        // Acquire the host address and port, then bind the server socket
        this.Bind(new IPEndPoint(ResolveLocalIPv4Address(), port));
    }
    #endregion

    // Returns the first IPv4 address of the local host.
    // AddressList[0] is frequently an IPv6 address, which cannot be bound to
    // an AddressFamily.InterNetwork socket; loopback is the fallback when the
    // host reports no IPv4 address at all.
    private static IPAddress ResolveLocalIPv4Address()
    {
        IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
        foreach (IPAddress candidate in ipHostInfo.AddressList)
        {
            if (candidate.AddressFamily == AddressFamily.InterNetwork)
                return candidate;
        }

        return IPAddress.Loopback;
    }

    // Starts accepting client connections
    public void StartListening()
    {
        this.Listen(ListenBacklog);
        this.BeginAccept(AcceptCallback, this);
    }

    // Completes an accept: registers the new client, starts reading from it
    // and re-arms the listener for the next connection.
    private void AcceptCallback(IAsyncResult ar)
    {
        Socket listener = (Socket)ar.AsyncState;
        Socket socket = null;

        try
        {
            socket = listener.EndAccept(ar);
        }
        catch (ObjectDisposedException)
        {
            // The listener was closed while the accept was pending: stop accepting
            return;
        }
        catch (SocketException)
        {
            // This particular accept failed; keep listening for the next client
        }

        try
        {
            if (socket != null)
            {
                // Add the connected client to the list
                lock (clientsLock)
                    clients.Add(socket);

                // Notify any event handlers
                SocketConnectedHandler connected = SocketConnected;
                if (connected != null)
                    connected(socket);

                // Create an initial state object to hold buffer and socket details
                StateObject state = new StateObject();
                state.workSocket = socket;
                state.BufferSize = InitialBufferSize;

                // Begin asynchronous read
                socket.BeginReceive(state.Buffer, 0, state.BufferSize, 0,
                    ReadCallback, state);
            }
        }
        catch (SocketException)
        {
            HandleClientDisconnect(socket);
        }
        catch (ObjectDisposedException)
        {
            HandleClientDisconnect(socket);
        }
        finally
        {
            // Listen for more client connections. The socket is already
            // listening, so only the accept itself has to be re-armed.
            try
            {
                listener.BeginAccept(AcceptCallback, listener);
            }
            catch (ObjectDisposedException)
            {
                // Listener closed in the meantime: nothing left to accept
            }
        }
    }

    // Read data from the client
    private void ReadCallback(IAsyncResult ar)
    {
        StateObject state = (StateObject)ar.AsyncState;
        Socket socket = state.workSocket;

        int bytesRead;
        try
        {
            // Always complete the pending receive, even if the socket has
            // dropped; a failed connection surfaces here as an exception.
            bytesRead = socket.EndReceive(ar);
        }
        catch (SocketException)
        {
            HandleClientDisconnect(socket);
            return;
        }
        catch (ObjectDisposedException)
        {
            HandleClientDisconnect(socket);
            return;
        }

        // Zero bytes means the client closed the connection gracefully.
        // Re-arming the receive here would complete immediately, forever.
        if (bytesRead <= 0)
        {
            HandleClientDisconnect(socket);
            return;
        }

        try
        {
            // Deserialize objects
            foreach (MessageBase msg in MessageBase.Receive(socket, bytesRead, state))
            {
                // Add objects to the message queue
                lock (this.messageQueue)
                    messageQueue.Enqueue(msg);
            }
        }
        catch (Exception e)
        {
            // A malformed frame from one client must not take the server down:
            // report it and drop that client.
            Console.WriteLine(e.ToString());
            HandleClientDisconnect(socket);
            return;
        }

        try
        {
            // Notify any event handlers
            DataRecievedHandler received = DataRecieved;
            if (received != null)
                received(socket, bytesRead);

            // Asynchronously read more client data, appending after any
            // partial message MessageBase.Receive left at the buffer start
            socket.BeginReceive(state.Buffer, state.readOffset, state.BufferSize - state.readOffset, 0,
                ReadCallback, state);
        }
        catch (SocketException)
        {
            HandleClientDisconnect(socket);
        }
        catch (ObjectDisposedException)
        {
            HandleClientDisconnect(socket);
        }
    }

    // Send data to a specific client.
    // Serialization errors propagate to the caller; socket failures cause the
    // client to be disconnected and removed.
    public void Send(Socket socket, MessageBase msg)
    {
        // Serialize the message
        byte[] bytes = msg.Serialize();

        try
        {
            if (socket.Connected)
            {
                // Send the message
                socket.BeginSend(bytes, 0, bytes.Length, 0, SendCallback, socket);
            }
            else
            {
                HandleClientDisconnect(socket);
            }
        }
        catch (SocketException)
        {
            HandleClientDisconnect(socket);
        }
        catch (ObjectDisposedException)
        {
            HandleClientDisconnect(socket);
        }
    }

    // Broadcast data to all clients
    public void Broadcast(MessageBase msg)
    {
        try
        {
            // Serialize the message
            byte[] bytes = msg.Serialize();

            // Work on a snapshot: HandleClientDisconnect removes entries from
            // 'clients', which must not happen while the list is enumerated.
            Socket[] snapshot;
            lock (clientsLock)
                snapshot = clients.ToArray();

            // Process all clients
            foreach (Socket client in snapshot)
            {
                try
                {
                    // Send the message
                    if (client.Connected)
                    {
                        client.BeginSend(bytes, 0, bytes.Length, 0, SendCallback, client);
                    }
                    else
                    {
                        HandleClientDisconnect(client);
                    }
                }
                catch (SocketException)
                {
                    HandleClientDisconnect(client);
                }
                catch (ObjectDisposedException)
                {
                    HandleClientDisconnect(client);
                }
            }
        }
        catch (Exception e)
        {
            // Serialization error
            Console.WriteLine(e.ToString());
        }
    }

    // Data sent to a client socket
    private void SendCallback(IAsyncResult ar)
    {
        try
        {
            Socket socket = (Socket)ar.AsyncState;

            int bytesSent;
            try
            {
                // Complete sending the data (every BeginSend needs its EndSend)
                bytesSent = socket.EndSend(ar);
            }
            catch (SocketException)
            {
                HandleClientDisconnect(socket);
                return;
            }
            catch (ObjectDisposedException)
            {
                HandleClientDisconnect(socket);
                return;
            }

            // Notify any attached handlers
            DataSentHandler sent = DataSent;
            if (sent != null)
                sent(socket, bytesSent);
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }

    // Closes a client socket and removes it from the list.
    // Safe to call more than once for the same socket (read, send and accept
    // paths can all notice the same failure); SocketDisconnected is raised
    // only by the call that actually removed the client.
    private void HandleClientDisconnect(Socket client)
    {
        if (client == null)
            return;

        try
        {
            // Client socket may have shutdown unexpectedly
            if (client.Connected)
                client.Shutdown(SocketShutdown.Both);
        }
        catch (SocketException)
        {
            // Connection already broken: nothing to shut down
        }
        catch (ObjectDisposedException)
        {
            // Already closed by another callback
        }

        // Close the socket and remove it from the list
        client.Close();

        bool removed;
        lock (clientsLock)
            removed = clients.Remove(client);

        // Notify any event handlers
        if (removed)
        {
            SocketDisconnectedHandler disconnected = SocketDisconnected;
            if (disconnected != null)
                disconnected();
        }
    }

    // Close all client connections and the listening socket itself
    public new void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    // Hooks into the base class dispose path so that Close(), Dispose() and
    // 'using' all close the attached clients before the listener is released.
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            Socket[] snapshot;
            lock (clientsLock)
            {
                snapshot = clients.ToArray();
                clients.Clear();
            }

            foreach (Socket client in snapshot)
            {
                try
                {
                    if (client.Connected)
                        client.Shutdown(SocketShutdown.Receive);
                }
                catch (SocketException)
                {
                    // Connection already broken
                }
                catch (ObjectDisposedException)
                {
                    // Already closed
                }

                client.Close();
            }
        }

        base.Dispose(disposing);
    }
}

// Client socket

        // Client-side TCP socket.
        //
        // Connects asynchronously to the server on the local host, decodes
        // received frames through MessageBase.Receive into 'messageQueue' and
        // sends messages with BeginSend. When the connection is lost the
        // socket is closed and SocketDisconnected is raised once; a closed
        // socket cannot be connected again, so the owner has to create a new
        // ReceiverSocket to re-establish the link.
        class ReceiverSocket : Socket
        {
            // Delay before retrying a connect attempt that the server refused
            private const int ConnectRetryDelayMilliseconds = 5000;

            // Initial size of the receive buffer (MessageBase.Receive may grow it)
            private const int InitialBufferSize = 8192;

            // MessageQueue queues messages to be processed by the controller.
            // This class locks on the queue instance while enqueuing.
            public Queue<MessageBase> messageQueue = null;

            #region Events
            // Event definitions handled in the controller
            public delegate void SocketConnectedHandler(Socket socket);
            public event SocketConnectedHandler SocketConnected;

            public delegate void DataRecievedHandler(Socket socket, MessageBase msg);
            public event DataRecievedHandler DataRecieved;

            public delegate void DataSentHandler(Socket socket, int length);
            public event DataSentHandler DataSent;

            public delegate void SocketDisconnectedHandler();
            public event SocketDisconnectedHandler SocketDisconnected;

            private IPEndPoint remoteEP = null;
            #endregion

            // Set to 1 by the first Reconnect() call so the teardown and the
            // SocketDisconnected notification happen exactly once
            private int closedFlag = 0;

            #region Constructor
            // Creates the client socket; the server is expected on the local
            // host at 'port'. No connection is made until Connect() is called.
            public ReceiverSocket(int port)
                : base(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
            {
                // Create the message queue
                messageQueue = new Queue<MessageBase>();

                // Acquire the host address and port
                remoteEP = new IPEndPoint(ResolveLocalIPv4Address(), port);
            }
            #endregion

            // Returns the first IPv4 address of the local host.
            // AddressList[0] is frequently an IPv6 address, which an
            // AddressFamily.InterNetwork socket cannot connect to; loopback is
            // the fallback when the host reports no IPv4 address at all.
            private static IPAddress ResolveLocalIPv4Address()
            {
                IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
                foreach (IPAddress candidate in ipHostInfo.AddressList)
                {
                    if (candidate.AddressFamily == AddressFamily.InterNetwork)
                        return candidate;
                }

                return IPAddress.Loopback;
            }

            // Connect to the server
            public void Connect()
            {
                this.BeginConnect(remoteEP, ConnectCallback, this);
            }

            // Completes a connect attempt. On success the first receive is
            // started; if the server is not reachable the attempt is retried
            // after a delay; any other failure closes the socket.
            private void ConnectCallback(IAsyncResult ar)
            {
                try
                {
                    Socket client = (Socket)ar.AsyncState;

                    try
                    {
                        // Every BeginConnect must be completed with EndConnect;
                        // a refused or timed-out attempt surfaces here.
                        client.EndConnect(ar);
                    }
                    catch (SocketException)
                    {
                        // Server not available yet: wait, then try again
                        Thread.Sleep(ConnectRetryDelayMilliseconds);
                        Connect();
                        return;
                    }

                    // Create an initial state object to hold buffer and socket details
                    StateObject state = new StateObject();
                    state.workSocket = client;
                    state.BufferSize = InitialBufferSize;

                    // Notify any event handlers
                    SocketConnectedHandler connected = SocketConnected;
                    if (connected != null)
                        connected(client);

                    // Begin asynchronous read
                    client.BeginReceive(state.Buffer, state.readOffset, state.BufferSize - state.readOffset, 0,
                        ReceiveCallback, state);
                }
                catch (Exception ex)
                {
                    System.Diagnostics.Debug.WriteLine(ex.ToString());

                    // Tear the connection down and notify the owner
                    Reconnect();
                }
            }

            // Read data from the server
            private void ReceiveCallback(IAsyncResult ar)
            {
                StateObject state = (StateObject)ar.AsyncState;
                Socket client = state.workSocket;

                int bytesRead;
                try
                {
                    // Always complete the pending receive; a broken connection
                    // surfaces here as an exception.
                    bytesRead = client.EndReceive(ar);
                }
                catch (SocketException)
                {
                    Reconnect();
                    return;
                }
                catch (ObjectDisposedException)
                {
                    // Socket was closed while the receive was pending
                    Reconnect();
                    return;
                }

                // Zero bytes means the server closed the connection gracefully.
                // Re-arming the receive here would complete immediately, forever.
                if (bytesRead <= 0)
                {
                    Reconnect();
                    return;
                }

                try
                {
                    // Deserialize objects
                    foreach (MessageBase msg in MessageBase.Receive(client, bytesRead, state))
                    {
                        // Add objects to the message queue
                        lock (this.messageQueue)
                            this.messageQueue.Enqueue(msg);
                    }
                }
                catch (Exception ex)
                {
                    // The stream is corrupt and cannot be resynchronised
                    System.Diagnostics.Debug.WriteLine(ex.ToString());
                    Reconnect();
                    return;
                }

                try
                {
                    // Notify any event handlers. The message argument is always
                    // null: handlers take the decoded messages from messageQueue.
                    DataRecievedHandler received = DataRecieved;
                    if (received != null)
                        received(client, null);

                    // Asynchronously read more server data, appending after any
                    // partial message MessageBase.Receive left at the buffer start
                    client.BeginReceive(state.Buffer, state.readOffset, state.BufferSize - state.readOffset, 0,
                        ReceiveCallback, state);
                }
                catch (SocketException)
                {
                    Reconnect();
                }
                catch (ObjectDisposedException)
                {
                    Reconnect();
                }
            }

            // Serializes 'msg' and sends it to the server asynchronously.
            // Never throws: socket failures tear the connection down, any
            // other failure (e.g. serialization) is reported to the debug
            // output and the message is dropped.
            public void Send(MessageBase msg)
            {
                try
                {
                    // Serialize the message
                    byte[] bytes = msg.Serialize();

                    if (this.Connected)
                    {
                        // Send the message
                        this.BeginSend(bytes, 0, bytes.Length, 0, SendCallback, this);
                    }
                    else
                    {
                        Reconnect();
                    }
                }
                catch (SocketException)
                {
                    Reconnect();
                }
                catch (ObjectDisposedException)
                {
                    Reconnect();
                }
                catch (Exception ex)
                {
                    // Best effort by design, but leave a trace instead of
                    // discarding the error silently
                    System.Diagnostics.Debug.WriteLine(ex.ToString());
                }
            }

            // Completes an asynchronous send
            private void SendCallback(IAsyncResult ar)
            {
                Socket client = (Socket)ar.AsyncState;

                try
                {
                    // Complete sending the data (every BeginSend needs its EndSend)
                    int bytesSent = client.EndSend(ar);

                    // Notify any event handlers
                    DataSentHandler sent = DataSent;
                    if (sent != null)
                        sent(client, bytesSent);
                }
                catch (SocketException)
                {
                    Reconnect();
                }
                catch (ObjectDisposedException)
                {
                    Reconnect();
                }
            }

            // Tears the connection down and raises SocketDisconnected.
            // Despite its name this method does not connect again: the socket
            // is closed, and the owner is expected to react to the event.
            // Only the first call has any effect.
            private void Reconnect()
            {
                if (Interlocked.Exchange(ref closedFlag, 1) != 0)
                    return;

                try
                {
                    // Disconnect the original socket. It is closed right
                    // afterwards, so it is not kept for reuse.
                    if (this.Connected)
                        this.Disconnect(false);
                }
                catch (Exception e)
                {
                    // A failed disconnect must not prevent the close below
                    System.Diagnostics.Debug.WriteLine(e.ToString());
                }

                try
                {
                    this.Close();

                    // Notify any event handlers
                    SocketDisconnectedHandler disconnected = SocketDisconnected;
                    if (disconnected != null)
                        disconnected();
                }
                catch (Exception e)
                {
                    System.Diagnostics.Debug.WriteLine(e.ToString());
                }
            }
        }

    // Encapsulates information about the socket and data buffer
public class StateObject
{
    // Backing storage for Buffer / BufferSize; stays null until BufferSize is assigned
    private byte[] data = null;
    private int capacity = 0;

    // Socket the pending asynchronous operation belongs to
    public Socket workSocket = null;

    // Offset in Buffer at which the next socket read stores its data
    public int readOffset = 0;

    public StringBuilder sb = new StringBuilder();

    // The current receive buffer (null until BufferSize has been set)
    public byte[] Buffer
    {
        get { return data; }
    }

    // Size of the receive buffer in bytes.
    // Assigning a value allocates a brand-new, zero-filled buffer of that
    // size; the contents of the previous buffer are not carried over.
    public int BufferSize
    {
        get { return capacity; }

        set
        {
            capacity = value;
            data = new byte[capacity];
        }
    }
}

您需要做的就是插入您自己的消息.

All you need to do is plug in your own messages.

请记住,套接字流可能(并且大多数情况下确实会)包含不完整的消息.因此,最好将消息的长度放在消息最前面的几个字节中发送.您还必须在多次读取之间拼接不完整的消息,并相应地管理读取缓冲区.请查看以下消息基类.

Keep in mind that the socket stream may (and most of the time does) contain partial messages. Therefore it is good practice to send the length of the message as the first bytes of the message. You also have to manage the read buffer accordingly by assembling partial messages between reads. Check out the following message base class.

// Base class for all messages exchanged over the socket.
//
// Wire format of one frame (produced by Serialize, consumed by Receive):
//   bytes 0-3 : payload length (Int32 as written by BitConverter)
//   byte  4   : 1 if the payload is encrypted, 0 otherwise
//   bytes 5-7 : unused padding
//   bytes 8.. : payload = BinaryFormatter output, deflate-compressed and
//               optionally encrypted
//
// SECURITY NOTE(review): BinaryFormatter deserialization of data received
// from the network can execute attacker-controlled code and is obsolete in
// current .NET. It is kept here because the wire format depends on it, but it
// must only be used between mutually trusted endpoints.
public partial class MessageBase
{
    // Virtual Execute method following the Command pattern
    public virtual string Execute(Socket socket) { return string.Empty; }

    // Derived messages return true to have their payload encrypted
    protected virtual bool MustEncrypt
    {
        get { return false; }
    }

    // Binary serialization: returns one complete frame (header + payload)
    public byte[] Serialize()
    {
        const int headerSize = 8;

        byte[] payload;
        using (MemoryStream stream = new MemoryStream())
        {
            // leaveOpen = true: disposing the DeflateStream flushes the final
            // compressed block while the MemoryStream stays readable
            using (DeflateStream ds = new DeflateStream(stream, CompressionMode.Compress, true))
            {
                BinaryFormatter formatter = new BinaryFormatter();
                formatter.Serialize(ds, this);
            }

            payload = stream.ToArray();
        }

        bool encrypt = this.MustEncrypt;
        if (encrypt)
            payload = RijndaelEncrptor.Instance.Encrypt(payload);

        // Create a buffer large enough to hold the (encrypted) message and the header
        byte[] data = new byte[headerSize + payload.Length];

        // Add the message size and the encryption flag
        BitConverter.GetBytes(payload.Length).CopyTo(data, 0);
        BitConverter.GetBytes(encrypt).CopyTo(data, 4);

        // Add the message data
        payload.CopyTo(data, headerSize);
        return data;
    }

    // Deserializes one complete frame that starts at index 0 of 'buffer'.
    // Returns null when the payload cannot be turned into a message.
    // Throws if 'buffer' is null or shorter than the header.
    public static MessageBase Deserialize(byte[] buffer)
    {
        int length = BitConverter.ToInt32(buffer, 0);
        bool mustDecrypt = BitConverter.ToBoolean(buffer, 4);

        try
        {
            return MessageBase.Deserialize(buffer, 8, length, mustDecrypt);
        }
        catch (Exception)
        {
            // Deliberate best effort: a truncated, corrupt or undecryptable
            // frame is reported to the caller as "no message"
            return null;
        }
    }

    // Deserializes a payload of 'length' bytes located at 'offset' in 'buffer'.
    // Returns null when the formatter cannot produce a MessageBase; copy and
    // decryption errors propagate to the caller.
    public static MessageBase Deserialize(byte[] buffer, int offset, int length, bool mustDecrypt)
    {
        // Copy the payload out of the input buffer so the caller can reuse
        // that buffer while the message is being decoded
        byte[] data = new byte[length];
        Buffer.BlockCopy(buffer, offset, data, 0, length);

        // Decrypt message
        if (mustDecrypt)
            data = RijndaelEncrptor.Instance.Decrypt(data);

        // Deserialize the binary data into a new object of type MessageBase
        using (MemoryStream stream = new MemoryStream(data))
        {
            using (DeflateStream ds = new DeflateStream(stream, CompressionMode.Decompress, false))
            {
                BinaryFormatter formatter = new BinaryFormatter();
                try
                {
                    return formatter.Deserialize(ds) as MessageBase;
                }
                catch
                {
                    return null;
                }
            }
        }
    }

    // Decodes every complete frame currently held in state.Buffer.
    //
    // 'bytesReceived' is the byte count of the read that just completed; the
    // buffer additionally holds state.readOffset bytes left over from the
    // previous read. Complete frames are yielded as messages (null for a
    // payload that failed to deserialize). A trailing incomplete frame is
    // moved to the start of the buffer - which is grown when the whole frame
    // would not fit - and state.readOffset is set to the number of bytes
    // kept, i.e. the position where the next socket read must store its data.
    //
    // Throws InvalidDataException when a header announces an impossible
    // length. The method is an iterator: nothing happens until the result is
    // enumerated. 'client' is not used by this implementation.
    public static IEnumerable<MessageBase> Receive(Socket client, int bytesReceived, StateObject state)
    {
        const int headerSize = 8;

        // Total buffered count is the bytes received this read
        // plus any unprocessed bytes from the last receive
        int bufferLen = bytesReceived + state.readOffset;

        // Reset the next read offset in the case
        // this receive lands on a message boundary
        state.readOffset = 0;

        // Current read position in the buffer
        int readOffset = 0;

        while (readOffset < bufferLen)
        {
            int remaining = bufferLen - readOffset;

            // Bytes needed before this frame can be processed; until the
            // header is complete only the header itself is known to be needed
            int required = headerSize;

            if (remaining >= headerSize)
            {
                // Get the message size
                int length = BitConverter.ToInt32(state.Buffer, readOffset);
                if (length < 0 || length > int.MaxValue - headerSize)
                    throw new InvalidDataException("Invalid message length in frame header: " + length);

                required = headerSize + length;

                if (remaining >= required)
                {
                    bool mustDecrypt = BitConverter.ToBoolean(state.Buffer, readOffset + 4);
                    yield return MessageBase.Deserialize(state.Buffer, readOffset + headerSize, length, mustDecrypt);

                    // Advance past the header and the message body
                    readOffset += required;
                    continue;
                }
            }

            // Incomplete frame: move the unprocessed bytes to the start of
            // the buffer, switching to a larger buffer first if the complete
            // frame would not fit. BlockCopy handles the overlapping case.
            byte[] source = state.Buffer;
            if (required > source.Length)
                state.BufferSize = required;

            Buffer.BlockCopy(source, readOffset, state.Buffer, 0, remaining);

            // This is the offset where the next socket read will start
            state.readOffset = remaining;
            break;
        }
    }
}

上面的代码应该可以帮助你.

The above code should get you going.