更新时间:2022-02-21 22:15:20
为什么不使用异步套接字?这是一些代码:
Why not use asynchronous sockets? Here is some code:
// Server socket
class ControllerSocket : Socket, IDisposable
{
// MessageQueue queues messages to be processed by the controller
public Queue<MessageBase> messageQueue = null;
// This is a list of all attached clients
public List<Socket> clients = new List<Socket>();
#region Events
// Event definitions handled in the controller
public delegate void SocketConnectedHandler(Socket socket);
public event SocketConnectedHandler SocketConnected;
public delegate void DataRecievedHandler(Socket socket, int bytesRead);
public event DataRecievedHandler DataRecieved;
public delegate void DataSentHandler(Socket socket, int length);
public event DataSentHandler DataSent;
public delegate void SocketDisconnectedHandler();
public event SocketDisconnectedHandler SocketDisconnected;
#endregion
#region Constructor
public ControllerSocket(int port)
: base(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
{
// Create the message queue
this.messageQueue = new Queue<MessageBase>();
// Acquire the host address and port, then bind the server socket
IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
IPAddress ipAddress = ipHostInfo.AddressList[0];
IPEndPoint localEndPoint = new IPEndPoint(ipAddress, port);
this.Bind(localEndPoint);
}
#endregion
// Starts accepting client connections
public void StartListening()
{
this.Listen(100);
this.BeginAccept(AcceptCallback, this);
}
// Connects to a client
private void AcceptCallback(IAsyncResult ar)
{
Socket listener = (Socket)ar.AsyncState;
Socket socket = listener.EndAccept(ar);
try
{
// Add the connected client to the list
clients.Add(socket);
// Notify any event handlers
if (SocketConnected != null)
SocketConnected(socket);
// Create an initial state object to hold buffer and socket details
StateObject state = new StateObject();
state.workSocket = socket;
state.BufferSize = 8192;
// Begin asynchronous read
socket.BeginReceive(state.Buffer, 0, state.BufferSize, 0,
ReadCallback, state);
}
catch (SocketException)
{
HandleClientDisconnect(socket);
}
finally
{
// Listen for more client connections
StartListening();
}
}
// Read data from the client
private void ReadCallback(IAsyncResult ar)
{
StateObject state = (StateObject)ar.AsyncState;
Socket socket = state.workSocket;
try
{
if (socket.Connected)
{
// Read the socket
int bytesRead = socket.EndReceive(ar);
// Deserialize objects
foreach (MessageBase msg in MessageBase.Receive(socket, bytesRead, state))
{
// Add objects to the message queue
lock (this.messageQueue)
messageQueue.Enqueue(msg);
}
// Notify any event handlers
if (DataRecieved != null)
DataRecieved(socket, bytesRead);
// Asynchronously read more client data
socket.BeginReceive(state.Buffer, state.readOffset, state.BufferSize - state.readOffset, 0,
ReadCallback, state);
}
else
{
HandleClientDisconnect(socket);
}
}
catch (SocketException)
{
HandleClientDisconnect(socket);
}
}
// Send data to a specific client
public void Send(Socket socket, MessageBase msg)
{
try
{
// Serialize the message
byte[] bytes = msg.Serialize();
if (socket.Connected)
{
// Send the message
socket.BeginSend(bytes, 0, bytes.Length, 0, SendCallback, socket);
}
else
{
HandleClientDisconnect(socket);
}
}
catch (SocketException)
{
HandleClientDisconnect(socket);
}
}
// Broadcast data to all clients
public void Broadcast(MessageBase msg)
{
try
{
// Serialize the message
byte[] bytes = msg.Serialize();
// Process all clients
foreach (Socket client in clients)
{
try
{
// Send the message
if (client.Connected)
{
client.BeginSend(bytes, 0, bytes.Length, 0, SendCallback, client);
}
else
{
HandleClientDisconnect(client);
}
}
catch (SocketException)
{
HandleClientDisconnect(client);
}
}
}
catch (Exception e)
{
// Serialization error
Console.WriteLine(e.ToString());
}
}
// Data sent to a client socket
private void SendCallback(IAsyncResult ar)
{
try
{
Socket socket = (Socket)ar.AsyncState;
if (socket.Connected)
{
// Complete sending the data
int bytesSent = socket.EndSend(ar);
// Notify any attached handlers
if (DataSent != null)
DataSent(socket, bytesSent);
}
else
{
HandleClientDisconnect(socket);
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
private void HandleClientDisconnect(Socket client)
{
// Client socket may have shutdown unexpectedly
if (client.Connected)
client.Shutdown(SocketShutdown.Both);
// Close the socket and remove it from the list
client.Close();
clients.Remove(client);
// Notify any event handlers
if (SocketDisconnected != null)
SocketDisconnected();
}
// Close all client connections
public void Dispose()
{
foreach (Socket client in clients)
{
if (client.Connected)
client.Shutdown(SocketShutdown.Receive);
client.Close();
}
}
}
// Client socket
class ReceiverSocket : Socket
{
// MessageQueue queues messages to be processed by the controller
public Queue<MessageBase> messageQueue = null;
#region Events
// Event definitions handled in the controller
public delegate void SocketConnectedHandler(Socket socket);
public event SocketConnectedHandler SocketConnected;
public delegate void DataRecievedHandler(Socket socket, MessageBase msg);
public event DataRecievedHandler DataRecieved;
public delegate void DataSentHandler(Socket socket, int length);
public event DataSentHandler DataSent;
public delegate void SocketDisconnectedHandler();
public event SocketDisconnectedHandler SocketDisconnected;
private IPEndPoint remoteEP = null;
#endregion
#region Constructor
public ReceiverSocket(int port)
: base(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
{
// Create the message queue
messageQueue = new Queue<MessageBase>();
// Acquire the host address and port
IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
IPAddress ipAddress = ipHostInfo.AddressList[0];
remoteEP = new IPEndPoint(ipAddress, port);
}
#endregion
// Connect to the server
public void Connect()
{
this.BeginConnect(remoteEP, ConnectCallback, this);
}
// Server connected
private void ConnectCallback(IAsyncResult ar)
{
// Console.WriteLine("Connect Callback");
try
{
Socket client = (Socket)ar.AsyncState;
if (client.Connected)
{
client.EndConnect(ar);
// Console.WriteLine("Connect Callback - Connected");
// Create an initial state object to hold buffer and socket details
StateObject state = new StateObject();
state.workSocket = client;
state.BufferSize = 8192;
// Notify any event handlers
if (SocketConnected != null)
SocketConnected(client);
// Begin asynchronous read
client.BeginReceive(state.Buffer, state.readOffset, state.BufferSize - state.readOffset, 0,
ReceiveCallback, state);
}
else
{
// Console.WriteLine("Connect Callback - Reconnect");
Thread.Sleep(5000);
Connect();
}
}
catch (Exception ex)
{
// Attempt server reconnect
Reconnect();
}
}
// Read data from the server
private void ReceiveCallback(IAsyncResult ar)
{
try
{
StateObject state = (StateObject)ar.AsyncState;
Socket client = state.workSocket;
// Read the socket
if (client.Connected)
{
int bytesRead = client.EndReceive(ar);
// Deserialize objects
foreach (MessageBase msg in MessageBase.Receive(client, bytesRead, state))
{
// Add objects to the message queue
lock (this.messageQueue)
this.messageQueue.Enqueue(msg);
}
// Notify any event handlers
if (DataRecieved != null)
DataRecieved(client, null);
// Asynchronously read more server data
client.BeginReceive(state.Buffer, state.readOffset, state.BufferSize - state.readOffset, 0,
ReceiveCallback, state);
}
else
{
Reconnect();
}
}
catch (SocketException)
{
// Attempt server reconnect
Reconnect();
}
}
public void Send(MessageBase msg)
{
try
{
// Serialize the message
byte[] bytes = msg.Serialize();
if (this.Connected)
{
// Send the message
this.BeginSend(bytes, 0, bytes.Length, 0, SendCallback, this);
}
else
{
Reconnect();
}
}
catch (SocketException sox)
{
// Attempt server reconnect
Reconnect();
}
catch (Exception ex)
{
int i = 0;
}
}
private void SendCallback(IAsyncResult ar)
{
try
{
Socket client = (Socket)ar.AsyncState;
if (client.Connected)
{
// Complete sending the data
int bytesSent = client.EndSend(ar);
// Notify any event handlers
if (DataSent != null)
DataSent(client, bytesSent);
}
else
{
Reconnect();
}
}
catch (SocketException)
{
Reconnect();
}
}
// Attempt to reconnect to the server
private void Reconnect()
{
try
{
// Disconnect the original socket
if (this.Connected)
this.Disconnect(true);
this.Close();
// Notify any event handlers
if (SocketDisconnected != null)
SocketDisconnected();
}
catch (Exception e)
{
// Console.WriteLine(e.ToString());
}
}
}
// Encapsulates information about the socket and data buffer
public class StateObject
{
public Socket workSocket = null;
public int readOffset = 0;
public StringBuilder sb = new StringBuilder();
private int bufferSize = 0;
public int BufferSize
{
set
{
this.bufferSize = value;
buffer = new byte[this.bufferSize];
}
get { return this.bufferSize; }
}
private byte[] buffer = null;
public byte[] Buffer
{
get { return this.buffer; }
}
}
您需要做的就是插入您自己的消息.
All you need to do is plug in your own messages.
请记住,套接字流可能(并且大多数情况下)包含部分消息.因此,***将消息的长度作为消息的第一个字节发送.您还必须通过在读取之间组装部分消息来相应地管理读取缓冲区.查看以下消息基类.
Keep in mind that the socket stream may (and most of the time do) contain partial messages. Therefore it is good practice to send the length of the message as the first bytes of a message. You also have to manage the read buffer accordingly by assembling partial messages between reads. Check out the following message base class.
public partial class MessageBase
{
// Virtual Execute method following the Command pattern
public virtual string Execute(Socket socket) { return string.Empty; }
protected virtual bool MustEncrypt
{
get { return false; }
}
// Binary serialization
public byte[] Serialize()
{
using (MemoryStream stream = new MemoryStream())
{
using (DeflateStream ds = new DeflateStream(stream, CompressionMode.Compress, true))
{
BinaryFormatter formatter = new BinaryFormatter();
formatter.Serialize(ds, this);
ds.Flush();
}
byte[] bytes = stream.GetBuffer();
byte[] bytes2 = new byte[stream.Length];
Buffer.BlockCopy(bytes, 0, bytes2, 0, (int)stream.Length);
// Array.Copy(bytes, bytes2, stream.Length);
if (this.MustEncrypt)
bytes2 = RijndaelEncrptor.Instance.Encrypt(bytes2);
// Create a buffer large enough to hold the encrypted message and size bytes
byte[] data = new byte[8 + bytes2.Length];
// Add the message size
BitConverter.GetBytes(bytes2.Length).CopyTo(data, 0);
BitConverter.GetBytes(this.MustEncrypt).CopyTo(data, 4);
// Add the message data
bytes2.CopyTo(data, 8);
return data;
}
}
static public MessageBase Deserialize(byte[] buffer)
{
int length = BitConverter.ToInt32(buffer, 0);
bool mustDecrypt = BitConverter.ToBoolean(buffer, 4);
MessageBase b = null;
try
{
b = MessageBase.Deserialize(buffer, 8, length, mustDecrypt);
}
catch { }
return b;
}
static public MessageBase Deserialize(byte[] buffer, int offset, int length, bool mustDecrypt)
{
// Create a buffer and initialize it with data from
// the input buffer offset by the specified offset amount
// and length determined by the specified length
byte[] data = new byte[length];
Buffer.BlockCopy(buffer, offset, data, 0, length);
// Array.Copy(buffer, offset, data, 0, length);
// Decrypt message
if (mustDecrypt)
data = RijndaelEncrptor.Instance.Decrypt(data);
// Deserialize the binary data into a new object of type MessageBase
using (MemoryStream stream = new MemoryStream(data))
{
using (DeflateStream ds = new DeflateStream(stream, CompressionMode.Decompress, false))
{
BinaryFormatter formatter = new BinaryFormatter();
try
{
return formatter.Deserialize(ds) as MessageBase;
}
catch
{
return null;
}
}
}
}
static public IEnumerable<MessageBase> Receive(Socket client, int bytesReceived, StateObject state)
{
// Total buffered count is the bytes received this read
// plus any unprocessed bytes from the last receive
int bufferLen = bytesReceived + state.readOffset;
// Reset the next read offset in the case
// this recieve lands on a message boundary
state.readOffset = 0;
// Make sure there are bytes to read
if (bufferLen >= 0)
{
// Initialize the current read position
int readOffset = 0;
// Process the receive buffer
while (readOffset < bufferLen)
{
// Get the message size
int length = BitConverter.ToInt32(state.Buffer, readOffset);
bool mustDecrypt = BitConverter.ToBoolean(state.Buffer, readOffset + 4);
// Increment the current read position by the length header
readOffset += 8;
// Change the buffer size if necessary
if (length + readOffset > state.Buffer.Length)
{
byte[] oldBuffer = new byte[state.BufferSize];
Buffer.BlockCopy(state.Buffer, 0, oldBuffer, 0, state.BufferSize);
// Array.Copy(state.Buffer, oldBuffer, state.BufferSize);
state.BufferSize = length + readOffset;
Buffer.BlockCopy(oldBuffer, 0, state.Buffer, 0, oldBuffer.Length);
// Array.Copy(oldBuffer, state.Buffer, oldBuffer.Length);
}
// Ensure there are enough bytes to process the message
if (readOffset + length <= bufferLen)
yield return MessageBase.Deserialize(state.Buffer, readOffset, length, mustDecrypt);
else
{
// Add back the message length
readOffset -= 8;
// Reorder the receive buffer so unprocessed
// bytes are moved to the start of the array
Buffer.BlockCopy(state.Buffer, 0, state.Buffer, 0, bufferLen - readOffset);
// Array.Copy(state.Buffer, state.Buffer, bufferLen - readOffset);
// Set the receive position to the current read position
// This is the offset where the next socket read will start
state.readOffset = bufferLen - readOffset;
break;
}
// Update the read position by the message length
readOffset += length;
}
}
}
}
上面的代码应该可以帮助你.
The above code should get you going.