[+] 接入ET8服务端

[+] 接入ET8服务端
This commit is contained in:
ALEXTANG
2023-07-13 12:23:48 +08:00
parent e0be062006
commit 336d4b2eb9
1316 changed files with 130657 additions and 626 deletions

View File

@@ -0,0 +1,47 @@
using System;
using System.IO;
using System.Net;
namespace ET
{
public enum ChannelType
{
Connect,
Accept,
}
public struct Packet
{
public const int MinPacketSize = 2;
public const int OpcodeIndex = 16;
public const int KcpOpcodeIndex = 0;
public const int OpcodeLength = 2;
public const int ActorIdIndex = 0;
public const int ActorIdLength = 16;
public const int MessageIndex = 18;
public ushort Opcode;
public long ActorId;
public MemoryStream MemoryStream;
}
public abstract class AChannel: IDisposable
{
public long Id;
public ChannelType ChannelType { get; protected set; }
public int Error { get; set; }
public bool IsDisposed
{
get
{
return this.Id == 0;
}
}
public abstract void Dispose();
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 93ca66d00a5a67f42a4e7d53738c125b
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,89 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
namespace ET
{
public abstract class AService: IDisposable
{
public Action<long, IPEndPoint> AcceptCallback;
public Action<long, ActorId, object> ReadCallback;
public Action<long, int> ErrorCallback;
public long Id { get; set; }
public ServiceType ServiceType { get; protected set; }
public const int MaxCacheBufferSize = 1024;
private readonly Queue<MemoryBuffer> pool = new();
public MemoryBuffer Fetch(int size = 0)
{
if (size > MaxCacheBufferSize)
{
return new MemoryBuffer(size);
}
if (size < MaxCacheBufferSize)
{
size = MaxCacheBufferSize;
}
if (this.pool.Count == 0)
{
return new MemoryBuffer(size);
}
return pool.Dequeue();
}
public void Recycle(MemoryBuffer memoryBuffer)
{
if (memoryBuffer.Capacity > 1024)
{
return;
}
if (this.pool.Count > 10) // 这里不需要太大其实Kcp跟Tcp,这里1就足够了
{
return;
}
memoryBuffer.Seek(0, SeekOrigin.Begin);
memoryBuffer.SetLength(0);
this.pool.Enqueue(memoryBuffer);
}
public AService()
{
NetServices.Instance.Add(this);
}
public virtual void Dispose()
{
NetServices.Instance?.Remove(this.Id);
}
public abstract void Update();
public abstract void Remove(long id, int error = 0);
public abstract bool IsDisposed();
public abstract void Create(long id, IPEndPoint address);
public abstract void Send(long channelId, ActorId actorId, MessageObject message);
public virtual (uint, uint) GetChannelConn(long channelId)
{
throw new Exception($"default conn throw Exception! {channelId}");
}
public virtual void ChangeAddress(long channelId, IPEndPoint ipEndPoint)
{
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 868f9f49d4ae1e64b8eeb8a81d6fa6e1
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,16 @@
using MemoryPack;
namespace ET
{
[Message(ushort.MaxValue)]
[MemoryPackable]
public partial class ActorResponse: MessageObject, IActorResponse
{
[MemoryPackOrder(1)]
public int RpcId { get; set; }
[MemoryPackOrder(2)]
public int Error { get; set; }
[MemoryPackOrder(3)]
public string Message { get; set; }
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 82646c1ccd21692429cf45e8d8ba7cd9
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,306 @@
using System;
using System.Collections.Generic;
using System.IO;
namespace ET
{
public class CircularBuffer: Stream
{
public int ChunkSize = 8192;
private readonly Queue<byte[]> bufferQueue = new Queue<byte[]>();
private readonly Queue<byte[]> bufferCache = new Queue<byte[]>();
public int LastIndex { get; set; }
public int FirstIndex { get; set; }
private byte[] lastBuffer;
public CircularBuffer()
{
this.AddLast();
}
public override long Length
{
get
{
int c = 0;
if (this.bufferQueue.Count == 0)
{
c = 0;
}
else
{
c = (this.bufferQueue.Count - 1) * ChunkSize + this.LastIndex - this.FirstIndex;
}
if (c < 0)
{
Log.Error("CircularBuffer count < 0: {0}, {1}, {2}".Fmt(this.bufferQueue.Count, this.LastIndex, this.FirstIndex));
}
return c;
}
}
public void AddLast()
{
byte[] buffer;
if (this.bufferCache.Count > 0)
{
buffer = this.bufferCache.Dequeue();
}
else
{
buffer = new byte[ChunkSize];
}
this.bufferQueue.Enqueue(buffer);
this.lastBuffer = buffer;
}
public void RemoveFirst()
{
this.bufferCache.Enqueue(bufferQueue.Dequeue());
}
public byte[] First
{
get
{
if (this.bufferQueue.Count == 0)
{
this.AddLast();
}
return this.bufferQueue.Peek();
}
}
public byte[] Last
{
get
{
if (this.bufferQueue.Count == 0)
{
this.AddLast();
}
return this.lastBuffer;
}
}
/// <summary>
/// 从CircularBuffer读到stream中
/// </summary>
/// <param name="stream"></param>
/// <returns></returns>
//public async ETTask ReadAsync(Stream stream)
//{
// long buffLength = this.Length;
// int sendSize = this.ChunkSize - this.FirstIndex;
// if (sendSize > buffLength)
// {
// sendSize = (int)buffLength;
// }
//
// await stream.WriteAsync(this.First, this.FirstIndex, sendSize);
//
// this.FirstIndex += sendSize;
// if (this.FirstIndex == this.ChunkSize)
// {
// this.FirstIndex = 0;
// this.RemoveFirst();
// }
//}
// 从CircularBuffer读到stream
public void Read(Stream stream, int count)
{
if (count > this.Length)
{
throw new Exception($"bufferList length < count, {Length} {count}");
}
int alreadyCopyCount = 0;
while (alreadyCopyCount < count)
{
int n = count - alreadyCopyCount;
if (ChunkSize - this.FirstIndex > n)
{
stream.Write(this.First, this.FirstIndex, n);
this.FirstIndex += n;
alreadyCopyCount += n;
}
else
{
stream.Write(this.First, this.FirstIndex, ChunkSize - this.FirstIndex);
alreadyCopyCount += ChunkSize - this.FirstIndex;
this.FirstIndex = 0;
this.RemoveFirst();
}
}
}
// 从stream写入CircularBuffer
public void Write(Stream stream)
{
int count = (int)(stream.Length - stream.Position);
int alreadyCopyCount = 0;
while (alreadyCopyCount < count)
{
if (this.LastIndex == ChunkSize)
{
this.AddLast();
this.LastIndex = 0;
}
int n = count - alreadyCopyCount;
if (ChunkSize - this.LastIndex > n)
{
stream.Read(this.lastBuffer, this.LastIndex, n);
this.LastIndex += count - alreadyCopyCount;
alreadyCopyCount += n;
}
else
{
stream.Read(this.lastBuffer, this.LastIndex, ChunkSize - this.LastIndex);
alreadyCopyCount += ChunkSize - this.LastIndex;
this.LastIndex = ChunkSize;
}
}
}
/// <summary>
/// 从stream写入CircularBuffer
/// </summary>
/// <param name="stream"></param>
/// <returns></returns>
//public async ETTask<int> WriteAsync(Stream stream)
//{
// int size = this.ChunkSize - this.LastIndex;
//
// int n = await stream.ReadAsync(this.Last, this.LastIndex, size);
//
// if (n == 0)
// {
// return 0;
// }
//
// this.LastIndex += n;
//
// if (this.LastIndex == this.ChunkSize)
// {
// this.AddLast();
// this.LastIndex = 0;
// }
//
// return n;
//}
// 把CircularBuffer中数据写入buffer
public override int Read(byte[] buffer, int offset, int count)
{
if (buffer.Length < offset + count)
{
throw new Exception($"bufferList length < coutn, buffer length: {buffer.Length} {offset} {count}");
}
long length = this.Length;
if (length < count)
{
count = (int)length;
}
int alreadyCopyCount = 0;
while (alreadyCopyCount < count)
{
int n = count - alreadyCopyCount;
if (ChunkSize - this.FirstIndex > n)
{
Array.Copy(this.First, this.FirstIndex, buffer, alreadyCopyCount + offset, n);
this.FirstIndex += n;
alreadyCopyCount += n;
}
else
{
Array.Copy(this.First, this.FirstIndex, buffer, alreadyCopyCount + offset, ChunkSize - this.FirstIndex);
alreadyCopyCount += ChunkSize - this.FirstIndex;
this.FirstIndex = 0;
this.RemoveFirst();
}
}
return count;
}
// 把buffer写入CircularBuffer中
public override void Write(byte[] buffer, int offset, int count)
{
int alreadyCopyCount = 0;
while (alreadyCopyCount < count)
{
if (this.LastIndex == ChunkSize)
{
this.AddLast();
this.LastIndex = 0;
}
int n = count - alreadyCopyCount;
if (ChunkSize - this.LastIndex > n)
{
Array.Copy(buffer, alreadyCopyCount + offset, this.lastBuffer, this.LastIndex, n);
this.LastIndex += count - alreadyCopyCount;
alreadyCopyCount += n;
}
else
{
Array.Copy(buffer, alreadyCopyCount + offset, this.lastBuffer, this.LastIndex, ChunkSize - this.LastIndex);
alreadyCopyCount += ChunkSize - this.LastIndex;
this.LastIndex = ChunkSize;
}
}
}
public override void Flush()
{
throw new NotImplementedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override bool CanRead
{
get
{
return true;
}
}
public override bool CanSeek
{
get
{
return false;
}
}
public override bool CanWrite
{
get
{
return true;
}
}
public override long Position { get; set; }
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 94baaad2aba686c4aa2c1e8e3fec21bc
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,86 @@
namespace ET
{
[UniqueId(100000, 500000)]
public static class ErrorCore
{
public const int ERR_MyErrorCode = 110000;
public const int ERR_KcpConnectTimeout = 100205;
public const int ERR_KcpAcceptTimeout = 100206;
public const int ERR_PeerDisconnect = 100208;
public const int ERR_SocketCantSend = 100209;
public const int ERR_SocketError = 100210;
public const int ERR_KcpWaitSendSizeTooLarge = 100211;
public const int ERR_KcpCreateError = 100212;
public const int ERR_SendMessageNotFoundTChannel = 100213;
public const int ERR_TChannelRecvError = 100214;
public const int ERR_MessageSocketParserError = 100215;
public const int ERR_KcpNotFoundChannel = 100216;
public const int ERR_WebsocketSendError = 100217;
public const int ERR_WebsocketPeerReset = 100218;
public const int ERR_WebsocketMessageTooBig = 100219;
public const int ERR_WebsocketRecvError = 100220;
public const int ERR_KcpReadNotSame = 100230;
public const int ERR_KcpSplitError = 100231;
public const int ERR_KcpSplitCountError = 100232;
public const int ERR_ActorLocationSenderTimeout = 110004;
public const int ERR_PacketParserError = 110005;
public const int ERR_KcpChannelAcceptTimeout = 110206;
public const int ERR_KcpRemoteDisconnect = 110207;
public const int ERR_WebsocketError = 110303;
public const int ERR_WebsocketConnectError = 110304;
public const int ERR_RpcFail = 110307;
public const int ERR_ReloadFail = 110308;
public const int ERR_ConnectGateKeyError = 110309;
public const int ERR_SessionSendOrRecvTimeout = 110311;
public const int ERR_OuterSessionRecvInnerMessage = 110312;
public const int ERR_NotFoundActor = 110313;
public const int ERR_ActorTimeout = 110315;
public const int ERR_UnverifiedSessionSendMessage = 110316;
public const int ERR_ActorLocationSenderTimeout2 = 110317;
public const int ERR_ActorLocationSenderTimeout3 = 110318;
public const int ERR_ActorLocationSenderTimeout4 = 110319;
public const int ERR_ActorLocationSenderTimeout5 = 110320;
public const int ERR_KcpRouterTimeout = 110401;
public const int ERR_KcpRouterTooManyPackets = 110402;
public const int ERR_KcpRouterSame = 110403;
public const int ERR_KcpRouterConnectFail = 110404;
public const int ERR_KcpRouterRouterSyncCountTooMuchTimes = 110405;
public const int ERR_KcpRouterSyncCountTooMuchTimes = 110406;
// 110000 以上避免跟SocketError冲突
//-----------------------------------
// 小于这个Rpc会抛异常大于这个异常的error需要自己判断处理也就是说需要处理的错误应该要大于该值
public const int ERR_Exception = 200000;
public const int ERR_Cancel = 200001;
public const int ERR_Timeout = 200002;
public static bool IsRpcNeedThrowException(int error)
{
if (error == 0)
{
return false;
}
// ws平台返回错误专用的值
if (error == -1)
{
return false;
}
if (error > ERR_Exception)
{
return false;
}
return true;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 7222eec1269a5cb45bf59dbc34ea3fea
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,67 @@
using System.Net;
using System.Net.Sockets;
namespace ET
{
public static class Extensions
{
// always pass the same IPEndPointNonAlloc instead of allocating a new
// one each time.
//
// use IPEndPointNonAlloc.temp to get the latest SocketAdddress written
// by ReceiveFrom_Internal!
//
// IMPORTANT: .temp will be overwritten in next call!
// hash or manually copy it if you need to store it, e.g.
// when adding a new connection.
public static int ReceiveFrom_NonAlloc(
this Socket socket,
byte[] buffer,
int offset,
int size,
SocketFlags socketFlags,
EndPoint remoteEndPoint)
{
// call ReceiveFrom with IPEndPointNonAlloc.
// need to wrap this in ReceiveFrom_NonAlloc because it's not
// obvious that IPEndPointNonAlloc.Create does NOT create a new
// IPEndPoint. it saves the result in IPEndPointNonAlloc.temp!
#if UNITY
EndPoint casted = remoteEndPoint;
return socket.ReceiveFrom(buffer, offset, size, socketFlags, ref casted);
#else
return socket.ReceiveFrom(buffer, offset, size, socketFlags, ref remoteEndPoint);
#endif
}
// same as above, different parameters
public static int ReceiveFrom_NonAlloc(this Socket socket, byte[] buffer, ref EndPoint remoteEndPoint)
{
#if UNITY
EndPoint casted = remoteEndPoint;
return socket.ReceiveFrom(buffer, ref casted);
#else
return socket.ReceiveFrom(buffer, ref remoteEndPoint);
#endif
}
// SendTo allocates too:
// https://github.com/mono/mono/blob/f74eed4b09790a0929889ad7fc2cf96c9b6e3757/mcs/class/System/System.Net.Sockets/Socket.cs#L2240
// -> the allocation is in EndPoint.Serialize()
// NOTE: technically this function isn't necessary.
// could just pass IPEndPointNonAlloc.
// still good for strong typing.
//public static int SendTo_NonAlloc(
// this Socket socket,
// byte[] buffer,
// int offset,
// int size,
// SocketFlags socketFlags,
// IPEndPointNonAlloc remoteEndPoint)
//{
// EndPoint casted = remoteEndPoint;
// return socket.SendTo(buffer, offset, size, socketFlags, casted);
//}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: b60b6ed73c53bbd489c9a3f958c72722
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,14 @@
namespace ET
{
public interface IActorLocationMessage: IActorLocationRequest
{
}
public interface IActorLocationRequest: IActorRequest
{
}
public interface IActorLocationResponse: IActorResponse
{
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 572c6f5a92280bb4da6c210e2a1c4fef
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,15 @@
namespace ET
{
// 不需要返回消息
public interface IActorMessage: IMessage
{
}
public interface IActorRequest: IRequest, IActorMessage
{
}
public interface IActorResponse: IResponse, IActorMessage
{
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 76f16794a16d10c4e926f472af29d69d
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,251 @@
using System;
using System.Net;
using System.Net.Sockets;
namespace ET
{
public class IPEndPointNonAlloc : IPEndPoint
{
#if UNITY
// Two steps to remove allocations in ReceiveFrom_Internal:
//
// 1.) remoteEndPoint.Serialize():
// https://github.com/mono/mono/blob/f74eed4b09790a0929889ad7fc2cf96c9b6e3757/mcs/class/System/System.Net.Sockets/Socket.cs#L1733
// -> creates an EndPoint for ReceiveFrom_Internal to write into
// -> it's never read from:
// ReceiveFrom_Internal passes it to native:
// https://github.com/mono/mono/blob/f74eed4b09790a0929889ad7fc2cf96c9b6e3757/mcs/class/System/System.Net.Sockets/Socket.cs#L1885
// native recv populates 'sockaddr* from' with the remote address:
// https://docs.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-recvfrom
// -> can NOT be null. bricks both Unity and Unity Hub otherwise.
// -> it seems as if Serialize() is only called to avoid allocating
// a 'new SocketAddress' in ReceiveFrom. it's up to the EndPoint.
//
// 2.) EndPoint.Create(SocketAddress):
// https://github.com/mono/mono/blob/f74eed4b09790a0929889ad7fc2cf96c9b6e3757/mcs/class/System/System.Net.Sockets/Socket.cs#L1761
// -> SocketAddress is the remote's address that we want to return
// -> to avoid 'new EndPoint(SocketAddress), it seems up to the user
// to decide how to create a new EndPoint via .Create
// -> SocketAddress is the object that was returned by Serialize()
//
// in other words, all we need is an extra SocketAddress field that we
// can pass to ReceiveFrom_Internal to write the result into.
// => callers can then get the result from the extra field!
// => no allocations
//
// IMPORTANT: remember that IPEndPointNonAlloc is always the same object
// and never changes. only the helper field is changed.
public SocketAddress temp;
// constructors simply create the field once by calling the base method.
// (our overwritten method would create anything new)
public IPEndPointNonAlloc(long address, int port) : base(address, port)
{
temp = base.Serialize();
}
public IPEndPointNonAlloc(IPAddress address, int port) : base(address, port)
{
temp = base.Serialize();
}
// Serialize simply returns it
public override SocketAddress Serialize() => temp;
// Create doesn't need to create anything.
// SocketAddress object is already the one we returned in Serialize().
// ReceiveFrom_Internal simply wrote into it.
public override EndPoint Create(SocketAddress socketAddress)
{
// original IPEndPoint.Create validates:
if (socketAddress.Family != AddressFamily)
throw new ArgumentException($"Unsupported socketAddress.AddressFamily: {socketAddress.Family}. Expected: {AddressFamily}");
if (socketAddress.Size < 8)
throw new ArgumentException($"Unsupported socketAddress.Size: {socketAddress.Size}. Expected: <8");
// double check to guarantee that ReceiveFrom actually did write
// into our 'temp' field. just in case that's ever changed.
if (socketAddress != temp)
{
// well this is fun.
// in the latest mono from the above github links,
// the result of Serialize() is passed as 'ref' so ReceiveFrom
// does in fact write into it.
//
// in Unity 2019 LTS's mono version, it does create a new one
// each time. this is from ILSpy Receive_From:
//
// SocketPal.CheckDualModeReceiveSupport(this);
// ValidateBlockingMode();
// if (NetEventSource.IsEnabled)
// {
// NetEventSource.Info(this, $"SRC{LocalEndPoint} size:{size} remoteEP:{remoteEP}", "ReceiveFrom");
// }
// EndPoint remoteEP2 = remoteEP;
// System.Net.Internals.SocketAddress socketAddress = SnapshotAndSerialize(ref remoteEP2);
// System.Net.Internals.SocketAddress socketAddress2 = IPEndPointExtensions.Serialize(remoteEP2);
// int bytesTransferred;
// SocketError socketError = SocketPal.ReceiveFrom(_handle, buffer, offset, size, socketFlags, socketAddress.Buffer, ref socketAddress.InternalSize, out bytesTransferred);
// SocketException ex = null;
// if (socketError != 0)
// {
// ex = new SocketException((int)socketError);
// UpdateStatusAfterSocketError(ex);
// if (NetEventSource.IsEnabled)
// {
// NetEventSource.Error(this, ex, "ReceiveFrom");
// }
// if (ex.SocketErrorCode != SocketError.MessageSize)
// {
// throw ex;
// }
// }
// if (!socketAddress2.Equals(socketAddress))
// {
// try
// {
// remoteEP = remoteEP2.Create(socketAddress);
// }
// catch
// {
// }
// if (_rightEndPoint == null)
// {
// _rightEndPoint = remoteEP2;
// }
// }
// if (ex != null)
// {
// throw ex;
// }
// if (NetEventSource.IsEnabled)
// {
// NetEventSource.DumpBuffer(this, buffer, offset, size, "ReceiveFrom");
// NetEventSource.Exit(this, bytesTransferred, "ReceiveFrom");
// }
// return bytesTransferred;
//
// so until they upgrade their mono version, we are stuck with
// some allocations.
//
// for now, let's pass the newly created on to our temp so at
// least we reuse it next time.
temp = socketAddress;
// SocketAddress.GetHashCode() depends on SocketAddress.m_changed.
// ReceiveFrom only sets the buffer, it does not seem to set m_changed.
// we need to reset m_changed for two reasons:
// * if m_changed is false, GetHashCode() returns the cahced m_hash
// which is '0'. that would be a problem.
// https://github.com/mono/mono/blob/bdd772531d379b4e78593587d15113c37edd4a64/mcs/class/referencesource/System/net/System/Net/SocketAddress.cs#L262
// * if we have a cached m_hash, but ReceiveFrom modified the buffer
// then the GetHashCode() should change too. so we need to reset
// either way.
//
// the only way to do that is by _actually_ modifying the buffer:
// https://github.com/mono/mono/blob/bdd772531d379b4e78593587d15113c37edd4a64/mcs/class/referencesource/System/net/System/Net/SocketAddress.cs#L99
// so let's do that.
// -> unchecked in case it's byte.Max
unchecked
{
temp[0] += 1;
temp[0] -= 1;
}
// make sure this worked.
// at least throw an Exception to make it obvious if the trick does
// not work anymore, in case ReceiveFrom is ever changed.
if (temp.GetHashCode() == 0)
throw new Exception($"SocketAddress GetHashCode() is 0 after ReceiveFrom. Does the m_changed trick not work anymore?");
// in the future, enable this again:
//throw new Exception($"Socket.ReceiveFrom(): passed SocketAddress={socketAddress} but expected {temp}. This should never happen. Did ReceiveFrom() change?");
}
// ReceiveFrom sets seed_endpoint to the result of Create():
// https://github.com/mono/mono/blob/f74eed4b09790a0929889ad7fc2cf96c9b6e3757/mcs/class/System/System.Net.Sockets/Socket.cs#L1764
// so let's return ourselves at least.
// (seed_endpoint only seems to matter for BeginSend etc.)
return this;
}
// we need to overwrite GetHashCode() for two reasons.
// https://github.com/mono/mono/blob/bdd772531d379b4e78593587d15113c37edd4a64/mcs/class/referencesource/System/net/System/Net/IPEndPoint.cs#L160
// * it uses m_Address. but our true SocketAddress is in m_temp.
// m_Address might not be set at all.
// * m_Address.GetHashCode() allocates:
// https://github.com/mono/mono/blob/bdd772531d379b4e78593587d15113c37edd4a64/mcs/class/referencesource/System/net/System/Net/IPAddress.cs#L699
public override int GetHashCode() => temp.GetHashCode();
public int GetThisHashCode() => base.GetHashCode();
// helper function to create an ACTUAL new IPEndPoint from this.
// server needs it to store new connections as unique IPEndPoints.
public IPEndPoint DeepCopyIPEndPoint()
{
// we need to create a new IPEndPoint from 'temp' SocketAddress.
// there is no 'new IPEndPoint(SocketAddress) constructor.
// so we need to be a bit creative...
// allocate a placeholder IPAddress to copy
// our SocketAddress into.
// -> needs to be the same address family.
IPAddress ipAddress;
if (temp.Family == AddressFamily.InterNetworkV6)
ipAddress = IPAddress.IPv6Any;
else if (temp.Family == AddressFamily.InterNetwork)
ipAddress = IPAddress.Any;
else
throw new Exception($"Unexpected SocketAddress family: {temp.Family}");
// allocate a placeholder IPEndPoint
// with the needed size form IPAddress.
// (the real class. not NonAlloc)
IPEndPoint placeholder = new IPEndPoint(ipAddress, 0);
// the real IPEndPoint's .Create function can create a new IPEndPoint
// copy from a SocketAddress.
return (IPEndPoint)placeholder.Create(temp);
}
#else
public IPEndPointNonAlloc(long address, int port) : base(address, port)
{
}
public IPEndPointNonAlloc(IPAddress address, int port) : base(address, port)
{
}
#endif
public bool Equals(IPEndPoint ipEndPoint)
{
if (!object.Equals(ipEndPoint.Address, this.Address))
{
return false;
}
if (ipEndPoint.Port != this.Port)
{
return false;
}
return true;
}
}
public static class EndPointHelper
{
public static IPEndPoint Clone(this EndPoint endPoint)
{
#if UNITY
IPEndPoint ip = ((IPEndPointNonAlloc)endPoint).DeepCopyIPEndPoint();
#else
IPEndPoint ip = (IPEndPoint)endPoint;
ip = new IPEndPoint(ip.Address, ip.Port);
#endif
return ip;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 3ebd832de5e3f1041a938a17cb70c51a
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,523 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
namespace ET
{
public class KChannel : AChannel
{
private readonly KService Service;
private Kcp kcp { get; set; }
private readonly Queue<ActorMessageInfo> waitSendMessages = new Queue<ActorMessageInfo>();
public readonly uint CreateTime;
public uint LocalConn
{
get
{
return (uint)this.Id;
}
private set
{
this.Id = value;
}
}
public uint RemoteConn { get; set; }
private readonly byte[] sendCache = new byte[2 * 1024];
public bool IsConnected { get; set; }
public string RealAddress { get; set; }
private MemoryBuffer readMemory;
private int needReadSplitCount;
private IPEndPoint remoteAddress;
public IPEndPoint RemoteAddress
{
get
{
return this.remoteAddress;
}
set
{
this.remoteAddress = new IPEndPointNonAlloc(value.Address, value.Port);
}
}
private void InitKcp()
{
switch (this.Service.ServiceType)
{
case ServiceType.Inner:
this.kcp.SetNoDelay(1, 10, 2, true);
this.kcp.SetWindowSize(1024, 1024);
this.kcp.SetMtu(1400); // 默认1400
this.kcp.SetMinrto(30);
break;
case ServiceType.Outer:
this.kcp.SetNoDelay(1, 10, 2, true);
this.kcp.SetWindowSize(256, 256);
this.kcp.SetMtu(470);
this.kcp.SetMinrto(30);
break;
}
}
// connect
public KChannel(uint localConn, IPEndPoint remoteEndPoint, KService kService)
{
this.LocalConn = localConn;
this.ChannelType = ChannelType.Connect;
Log.Info($"channel create: {this.LocalConn} {remoteEndPoint} {this.ChannelType}");
this.Service = kService;
this.RemoteAddress = remoteEndPoint;
this.CreateTime = kService.TimeNow;
this.Connect(this.CreateTime);
}
// accept
public KChannel(uint localConn, uint remoteConn, IPEndPoint remoteEndPoint, KService kService)
{
this.ChannelType = ChannelType.Accept;
Log.Info($"channel create: {localConn} {remoteConn} {remoteEndPoint} {this.ChannelType}");
this.Service = kService;
this.LocalConn = localConn;
this.RemoteConn = remoteConn;
this.RemoteAddress = remoteEndPoint;
this.kcp = new Kcp(this.RemoteConn, this.Output);
this.InitKcp();
this.CreateTime = kService.TimeNow;
}
public override void Dispose()
{
if (this.IsDisposed)
{
return;
}
uint localConn = this.LocalConn;
uint remoteConn = this.RemoteConn;
Log.Info($"channel dispose: {localConn} {remoteConn} {this.Error}");
long id = this.Id;
this.Id = 0;
this.Service.Remove(id);
try
{
if (this.Error != ErrorCore.ERR_PeerDisconnect)
{
this.Service.Disconnect(localConn, remoteConn, this.Error, this.RemoteAddress, 3);
}
}
catch (Exception e)
{
Log.Error(e);
}
this.kcp = null;
}
public void HandleConnnect()
{
// 如果连接上了就不用处理了
if (this.IsConnected)
{
return;
}
this.kcp = new Kcp(this.RemoteConn, this.Output);
this.InitKcp();
Log.Info($"channel connected: {this.LocalConn} {this.RemoteConn} {this.RemoteAddress}");
this.IsConnected = true;
while (true)
{
if (this.waitSendMessages.Count <= 0)
{
break;
}
ActorMessageInfo buffer = this.waitSendMessages.Dequeue();
this.Send(buffer.ActorId, buffer.MessageObject);
}
}
private long lastConnectTime = long.MaxValue;
/// <summary>
/// 发送请求连接消息
/// </summary>
private void Connect(uint timeNow)
{
try
{
if (this.IsConnected)
{
return;
}
// 300毫秒后再次update发送connect请求
if (timeNow < this.lastConnectTime + 300)
{
this.Service.AddToUpdate(300, this.Id);
return;
}
// 10秒连接超时
if (timeNow > this.CreateTime + KService.ConnectTimeoutTime)
{
Log.Error($"kChannel connect timeout: {this.Id} {this.RemoteConn} {timeNow} {this.CreateTime} {this.ChannelType} {this.RemoteAddress}");
this.OnError(ErrorCore.ERR_KcpConnectTimeout);
return;
}
byte[] buffer = sendCache;
buffer.WriteTo(0, KcpProtocalType.SYN);
buffer.WriteTo(1, this.LocalConn);
buffer.WriteTo(5, this.RemoteConn);
this.Service.Socket.SendTo(buffer, 0, 9, SocketFlags.None, this.RemoteAddress);
// 这里很奇怪 调用socket.LocalEndPoint会动到this.RemoteAddressNonAlloc里面的temp这里就不仔细研究了
Log.Info($"kchannel connect {this.LocalConn} {this.RemoteConn} {this.RealAddress}");
this.lastConnectTime = timeNow;
this.Service.AddToUpdate(300, this.Id);
}
catch (Exception e)
{
Log.Error(e);
this.OnError(ErrorCore.ERR_SocketCantSend);
}
}
public void Update(uint timeNow)
{
if (this.IsDisposed)
{
return;
}
// 如果还没连接上,发送连接请求
if (!this.IsConnected && this.ChannelType == ChannelType.Connect)
{
this.Connect(timeNow);
return;
}
if (this.kcp == null)
{
return;
}
try
{
this.kcp.Update(timeNow);
}
catch (Exception e)
{
Log.Error(e);
this.OnError(ErrorCore.ERR_SocketError);
return;
}
uint nextUpdateTime = this.kcp.Check(timeNow);
this.Service.AddToUpdate(nextUpdateTime, this.Id);
}
public unsafe void HandleRecv(byte[] date, int offset, int length)
{
if (this.IsDisposed)
{
return;
}
this.kcp.Input(date.AsSpan(offset, length));
this.Service.AddToUpdate(0, this.Id);
while (true)
{
if (this.IsDisposed)
{
break;
}
int n = this.kcp.PeekSize();
if (n < 0)
{
break;
}
if (n == 0)
{
this.OnError((int)SocketError.NetworkReset);
return;
}
if (this.needReadSplitCount > 0) // 说明消息分片了
{
byte[] buffer = readMemory.GetBuffer();
int count = this.kcp.Receive(buffer.AsSpan((int)(this.readMemory.Length - this.needReadSplitCount), n));
this.needReadSplitCount -= count;
if (n != count)
{
Log.Error($"kchannel read error1: {this.LocalConn} {this.RemoteConn}");
this.OnError(ErrorCore.ERR_KcpReadNotSame);
return;
}
if (this.needReadSplitCount < 0)
{
Log.Error($"kchannel read error2: {this.LocalConn} {this.RemoteConn}");
this.OnError(ErrorCore.ERR_KcpSplitError);
return;
}
// 没有读完
if (this.needReadSplitCount != 0)
{
continue;
}
}
else
{
this.readMemory = this.Service.Fetch(n);
this.readMemory.SetLength(n);
this.readMemory.Seek(0, SeekOrigin.Begin);
byte[] buffer = readMemory.GetBuffer();
int count = this.kcp.Receive(buffer.AsSpan(0, n));
if (n != count)
{
break;
}
// 判断是不是分片
if (n == 8)
{
int headInt = BitConverter.ToInt32(this.readMemory.GetBuffer(), 0);
if (headInt == 0)
{
this.needReadSplitCount = BitConverter.ToInt32(readMemory.GetBuffer(), 4);
if (this.needReadSplitCount <= AService.MaxCacheBufferSize)
{
Log.Error($"kchannel read error3: {this.needReadSplitCount} {this.LocalConn} {this.RemoteConn}");
this.OnError(ErrorCore.ERR_KcpSplitCountError);
return;
}
this.readMemory.SetLength(this.needReadSplitCount);
this.readMemory.Seek(0, SeekOrigin.Begin);
continue;
}
}
}
switch (this.Service.ServiceType)
{
case ServiceType.Inner:
this.readMemory.Seek(Packet.ActorIdLength + Packet.OpcodeLength, SeekOrigin.Begin);
break;
case ServiceType.Outer:
this.readMemory.Seek(Packet.OpcodeLength, SeekOrigin.Begin);
break;
}
MemoryBuffer memoryBuffer = this.readMemory;
this.readMemory = null;
this.OnRead(memoryBuffer);
this.Service.Recycle(memoryBuffer);
}
}
public void Output(byte[] bytes, int count)
{
if (this.IsDisposed)
{
return;
}
try
{
// 没连接上 kcp不往外发消息, 其实本来没连接上不会调用update这里只是做一层保护
if (!this.IsConnected)
{
return;
}
if (count == 0)
{
Log.Error($"output 0");
return;
}
bytes.WriteTo(0, KcpProtocalType.MSG);
// 每个消息头部写下该channel的id;
bytes.WriteTo(1, this.LocalConn);
this.Service.Socket.SendTo(bytes, 0, count + 5, SocketFlags.None, this.RemoteAddress);
}
catch (Exception e)
{
Log.Error(e);
this.OnError(ErrorCore.ERR_SocketCantSend);
}
}
private void KcpSend(ActorId actorId, MemoryBuffer memoryStream)
{
if (this.IsDisposed)
{
return;
}
switch (this.Service.ServiceType)
{
case ServiceType.Inner:
{
memoryStream.GetBuffer().WriteTo(0, actorId);
break;
}
case ServiceType.Outer:
{
// 外网不需要发送actorId跳过
memoryStream.Seek(Packet.ActorIdLength, SeekOrigin.Begin);
break;
}
}
int count = (int) (memoryStream.Length - memoryStream.Position);
// 超出maxPacketSize需要分片
if (count <= AService.MaxCacheBufferSize)
{
this.kcp.Send(memoryStream.GetBuffer().AsSpan((int)memoryStream.Position, count));
}
else
{
// 先发分片信息
this.sendCache.WriteTo(0, 0);
this.sendCache.WriteTo(4, count);
this.kcp.Send(this.sendCache.AsSpan(0, 8));
// 分片发送
int alreadySendCount = 0;
while (alreadySendCount < count)
{
int leftCount = count - alreadySendCount;
int sendCount = leftCount < AService.MaxCacheBufferSize? leftCount: AService.MaxCacheBufferSize;
this.kcp.Send(memoryStream.GetBuffer().AsSpan((int)memoryStream.Position + alreadySendCount, sendCount));
alreadySendCount += sendCount;
}
}
this.Service.AddToUpdate(0, this.Id);
}
public void Send(ActorId actorId, MessageObject message)
{
if (!this.IsConnected)
{
ActorMessageInfo actorMessageInfo = new() { ActorId = actorId, MessageObject = message };
this.waitSendMessages.Enqueue(actorMessageInfo);
return;
}
MemoryBuffer stream = this.Service.Fetch();
MessageSerializeHelper.MessageToStream(stream, message);
message.Dispose();
if (this.kcp == null)
{
throw new Exception("kchannel connected but kcp is zero!");
}
// 检查等待发送的消息,如果超出最大等待大小,应该断开连接
int n = this.kcp.WaitSnd;
int maxWaitSize = 0;
switch (this.Service.ServiceType)
{
case ServiceType.Inner:
maxWaitSize = Kcp.InnerMaxWaitSize;
break;
case ServiceType.Outer:
maxWaitSize = Kcp.OuterMaxWaitSize;
break;
default:
throw new ArgumentOutOfRangeException();
}
if (n > maxWaitSize)
{
Log.Error($"kcp wait snd too large: {n}: {this.LocalConn} {this.RemoteConn}");
this.OnError(ErrorCore.ERR_KcpWaitSendSizeTooLarge);
return;
}
this.KcpSend(actorId, stream);
this.Service.Recycle(stream);
}
private void OnRead(MemoryBuffer memoryStream)
{
try
{
long channelId = this.Id;
object message = null;
ActorId actorId = default;
switch (this.Service.ServiceType)
{
case ServiceType.Outer:
{
ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.KcpOpcodeIndex);
Type type = OpcodeType.Instance.GetType(opcode);
message = MessageSerializeHelper.Deserialize(type, memoryStream);
break;
}
case ServiceType.Inner:
{
byte[] buffer = memoryStream.GetBuffer();
actorId.Process = BitConverter.ToInt32(buffer, Packet.ActorIdIndex);
actorId.Fiber = BitConverter.ToInt32(buffer, Packet.ActorIdIndex + 4);
actorId.InstanceId = BitConverter.ToInt64(buffer, Packet.ActorIdIndex + 8);
ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.OpcodeIndex);
Type type = OpcodeType.Instance.GetType(opcode);
message = MessageSerializeHelper.Deserialize(type, memoryStream);
break;
}
}
this.Service.ReadCallback(channelId, actorId, message);
}
catch (Exception e)
{
Log.Error(e);
this.OnError(ErrorCore.ERR_PacketParserError);
}
}
public void OnError(int error)
{
long channelId = this.Id;
this.Service.Remove(channelId, error);
this.Service.ErrorCallback(channelId, error);
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 251e6a92fa54e1a46a4d233e72c458ca
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,628 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
namespace ET
{
public static class KcpProtocalType
{
public const byte SYN = 1;
public const byte ACK = 2;
public const byte FIN = 3;
public const byte MSG = 4;
public const byte RouterReconnectSYN = 5;
public const byte RouterReconnectACK = 6;
public const byte RouterSYN = 7;
public const byte RouterACK = 8;
}
public enum ServiceType
{
Outer,
Inner,
}
public sealed class KService: AService
{
public const int ConnectTimeoutTime = 20 * 1000;
private DateTime dt1970;
// KService创建的时间
private readonly long startTime;
// 当前时间 - KService创建的时间, 线程安全
public uint TimeNow
{
get
{
return (uint)((DateTime.UtcNow.Ticks - this.startTime) / 10000);
}
}
public Socket Socket;
public KService(IPEndPoint ipEndPoint, ServiceType serviceType)
{
this.ServiceType = serviceType;
this.startTime = DateTime.UtcNow.Ticks;
this.Socket = new Socket(ipEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
if (!RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
this.Socket.SendBufferSize = Kcp.OneM * 64;
this.Socket.ReceiveBufferSize = Kcp.OneM * 64;
}
try
{
this.Socket.Bind(ipEndPoint);
}
catch (Exception e)
{
throw new Exception($"bind error: {ipEndPoint}", e);
}
NetworkHelper.SetSioUdpConnReset(this.Socket);
}
public KService(AddressFamily addressFamily, ServiceType serviceType)
{
this.ServiceType = serviceType;
this.startTime = DateTime.UtcNow.Ticks;
this.Socket = new Socket(addressFamily, SocketType.Dgram, ProtocolType.Udp);
NetworkHelper.SetSioUdpConnReset(this.Socket);
}
// 保存所有的channel
private readonly Dictionary<long, KChannel> localConnChannels = new Dictionary<long, KChannel>();
private readonly Dictionary<long, KChannel> waitAcceptChannels = new Dictionary<long, KChannel>();
private readonly byte[] cache = new byte[2048];
private EndPoint ipEndPoint = new IPEndPointNonAlloc(IPAddress.Any, 0);
private readonly List<long> cacheIds = new List<long>();
#if !UNITY
// 下帧要更新的channel
private readonly HashSet<long> updateIds = new HashSet<long>();
// 下次时间更新的channel
private readonly MultiMap<long, long> timeId = new();
private readonly List<long> timeOutTime = new List<long>();
// 记录最小时间不用每次都去MultiMap取第一个值
private long minTime;
#endif
public override bool IsDisposed()
{
return this.Socket == null;
}
public override void Dispose()
{
if (this.IsDisposed())
{
return;
}
base.Dispose();
foreach (long channelId in this.localConnChannels.Keys.ToArray())
{
this.Remove(channelId);
}
this.Socket.Close();
this.Socket = null;
}
public override (uint, uint) GetChannelConn(long channelId)
{
KChannel kChannel = this.Get(channelId);
if (kChannel == null)
{
throw new Exception($"GetChannelConn conn not found KChannel! {channelId}");
}
return (kChannel.LocalConn, kChannel.RemoteConn);
}
public override void ChangeAddress(long channelId, IPEndPoint newIPEndPoint)
{
KChannel kChannel = this.Get(channelId);
if (kChannel == null)
{
return;
}
kChannel.RemoteAddress = newIPEndPoint;
}
private void Recv()
{
if (this.Socket == null)
{
return;
}
while (this.Socket != null && this.Socket.Available > 0)
{
int messageLength = this.Socket.ReceiveFrom_NonAlloc(this.cache, ref this.ipEndPoint);
// 长度小于1不是正常的消息
if (messageLength < 1)
{
continue;
}
// accept
byte flag = this.cache[0];
// conn从100开始如果为123则是特殊包
uint remoteConn = 0;
uint localConn = 0;
try
{
KChannel kChannel = null;
switch (flag)
{
case KcpProtocalType.RouterReconnectSYN:
{
// 长度!=5不是RouterReconnectSYN消息
if (messageLength != 13)
{
break;
}
string realAddress = null;
remoteConn = BitConverter.ToUInt32(this.cache, 1);
localConn = BitConverter.ToUInt32(this.cache, 5);
uint connectId = BitConverter.ToUInt32(this.cache, 9);
this.localConnChannels.TryGetValue(localConn, out kChannel);
if (kChannel == null)
{
Log.Warning($"kchannel reconnect not found channel: {localConn} {remoteConn} {realAddress}");
break;
}
// 这里必须校验localConn客户端重连localConn一定是一样的
if (localConn != kChannel.LocalConn)
{
Log.Warning($"kchannel reconnect localconn error: {localConn} {remoteConn} {realAddress} {kChannel.LocalConn}");
break;
}
if (remoteConn != kChannel.RemoteConn)
{
Log.Warning($"kchannel reconnect remoteconn error: {localConn} {remoteConn} {realAddress} {kChannel.RemoteConn}");
break;
}
// 重连的时候router地址变化, 这个不能放到msg中必须经过严格的验证才能切换
if (!this.ipEndPoint.Equals(kChannel.RemoteAddress))
{
kChannel.RemoteAddress = this.ipEndPoint.Clone();
}
try
{
byte[] buffer = this.cache;
buffer.WriteTo(0, KcpProtocalType.RouterReconnectACK);
buffer.WriteTo(1, kChannel.LocalConn);
buffer.WriteTo(5, kChannel.RemoteConn);
buffer.WriteTo(9, connectId);
this.Socket.SendTo(buffer, 0, 13, SocketFlags.None, this.ipEndPoint);
}
catch (Exception e)
{
Log.Error(e);
kChannel.OnError(ErrorCore.ERR_SocketCantSend);
}
break;
}
case KcpProtocalType.SYN: // accept
{
// 长度!=5不是SYN消息
if (messageLength < 9)
{
break;
}
string realAddress = null;
if (messageLength > 9)
{
realAddress = this.cache.ToStr(9, messageLength - 9);
}
remoteConn = BitConverter.ToUInt32(this.cache, 1);
localConn = BitConverter.ToUInt32(this.cache, 5);
this.waitAcceptChannels.TryGetValue(remoteConn, out kChannel);
if (kChannel == null)
{
// accept的localConn不能与connect的localConn冲突所以设置为一个大的数
// localConn被人猜出来问题不大因为remoteConn是随机的,第三方并不知道
localConn = NetServices.Instance.CreateAcceptChannelId();
// 已存在同样的localConn则不处理等待下次sync
if (this.localConnChannels.ContainsKey(localConn))
{
break;
}
kChannel = new KChannel(localConn, remoteConn, this.ipEndPoint.Clone(), this);
this.waitAcceptChannels.Add(kChannel.RemoteConn, kChannel); // 连接上了或者超时后会删除
this.localConnChannels.Add(kChannel.LocalConn, kChannel);
kChannel.RealAddress = realAddress;
IPEndPoint realEndPoint = kChannel.RealAddress == null? kChannel.RemoteAddress : NetworkHelper.ToIPEndPoint(kChannel.RealAddress);
this.AcceptCallback(kChannel.Id, realEndPoint);
}
if (kChannel.RemoteConn != remoteConn)
{
break;
}
// 地址跟上次的不一致则跳过
if (kChannel.RealAddress != realAddress)
{
Log.Error($"kchannel syn address diff: {kChannel.Id} {kChannel.RealAddress} {realAddress}");
break;
}
try
{
byte[] buffer = this.cache;
buffer.WriteTo(0, KcpProtocalType.ACK);
buffer.WriteTo(1, kChannel.LocalConn);
buffer.WriteTo(5, kChannel.RemoteConn);
Log.Info($"kservice syn: {kChannel.Id} {remoteConn} {localConn}");
this.Socket.SendTo(buffer, 0, 9, SocketFlags.None, kChannel.RemoteAddress);
}
catch (Exception e)
{
Log.Error(e);
kChannel.OnError(ErrorCore.ERR_SocketCantSend);
}
break;
}
case KcpProtocalType.ACK: // connect返回
// 长度!=9不是connect消息
if (messageLength != 9)
{
break;
}
remoteConn = BitConverter.ToUInt32(this.cache, 1);
localConn = BitConverter.ToUInt32(this.cache, 5);
kChannel = this.Get(localConn);
if (kChannel != null)
{
Log.Info($"kservice ack: {localConn} {remoteConn}");
kChannel.RemoteConn = remoteConn;
kChannel.HandleConnnect();
}
break;
case KcpProtocalType.FIN: // 断开
// 长度!=13不是DisConnect消息
if (messageLength != 13)
{
break;
}
remoteConn = BitConverter.ToUInt32(this.cache, 1);
localConn = BitConverter.ToUInt32(this.cache, 5);
int error = BitConverter.ToInt32(this.cache, 9);
// 处理chanel
kChannel = this.Get(localConn);
if (kChannel == null)
{
break;
}
// 校验remoteConn防止第三方攻击
if (kChannel.RemoteConn != remoteConn)
{
break;
}
Log.Info($"kservice recv fin: {localConn} {remoteConn} {error}");
kChannel.OnError(ErrorCore.ERR_PeerDisconnect);
break;
case KcpProtocalType.MSG: // 断开
// 长度<9不是Msg消息
if (messageLength < 9)
{
break;
}
// 处理chanel
remoteConn = BitConverter.ToUInt32(this.cache, 1);
localConn = BitConverter.ToUInt32(this.cache, 5);
kChannel = this.Get(localConn);
if (kChannel == null)
{
// 通知对方断开
this.Disconnect(localConn, remoteConn, ErrorCore.ERR_KcpNotFoundChannel, this.ipEndPoint, 1);
break;
}
// 校验remoteConn防止第三方攻击
if (kChannel.RemoteConn != remoteConn)
{
break;
}
// 对方发来msg说明kchannel连接完成
if (!kChannel.IsConnected)
{
kChannel.IsConnected = true;
this.waitAcceptChannels.Remove(kChannel.RemoteConn);
}
kChannel.HandleRecv(this.cache, 5, messageLength - 5);
break;
}
}
catch (Exception e)
{
Log.Error($"kservice error: {flag} {remoteConn} {localConn}\n{e}");
}
}
}
public KChannel Get(long id)
{
KChannel channel;
this.localConnChannels.TryGetValue(id, out channel);
return channel;
}
public override void Create(long id, IPEndPoint address)
{
if (this.localConnChannels.TryGetValue(id, out KChannel kChannel))
{
return;
}
try
{
// 低32bit是localConn
uint localConn = (uint)id;
kChannel = new KChannel(localConn, address, this);
this.localConnChannels.Add(kChannel.LocalConn, kChannel);
}
catch (Exception e)
{
Log.Error($"kservice get error: {id}\n{e}");
}
}
public override void Remove(long id, int error = 0)
{
if (!this.localConnChannels.TryGetValue(id, out KChannel kChannel))
{
return;
}
kChannel.Error = error;
Log.Info($"kservice remove channel: {id} {kChannel.LocalConn} {kChannel.RemoteConn} {error}");
this.localConnChannels.Remove(kChannel.LocalConn);
if (this.waitAcceptChannels.TryGetValue(kChannel.RemoteConn, out KChannel waitChannel))
{
if (waitChannel.LocalConn == kChannel.LocalConn)
{
this.waitAcceptChannels.Remove(kChannel.RemoteConn);
}
}
kChannel.Dispose();
}
public void Disconnect(uint localConn, uint remoteConn, int error, EndPoint address, int times)
{
try
{
if (this.Socket == null)
{
return;
}
byte[] buffer = this.cache;
buffer.WriteTo(0, KcpProtocalType.FIN);
buffer.WriteTo(1, localConn);
buffer.WriteTo(5, remoteConn);
buffer.WriteTo(9, (uint) error);
for (int i = 0; i < times; ++i)
{
this.Socket.SendTo(buffer, 0, 13, SocketFlags.None, address);
}
}
catch (Exception e)
{
Log.Error($"Disconnect error {localConn} {remoteConn} {error} {address} {e}");
}
Log.Info($"channel send fin: {localConn} {remoteConn} {address} {error}");
}
public override void Send(long channelId, ActorId actorId, MessageObject message)
{
KChannel channel = this.Get(channelId);
if (channel == null)
{
return;
}
channel.Send(actorId, message);
}
public override void Update()
{
uint timeNow = this.TimeNow;
this.TimerOut(timeNow);
this.CheckWaitAcceptChannel(timeNow);
this.Recv();
this.UpdateChannel(timeNow);
}
private void CheckWaitAcceptChannel(uint timeNow)
{
cacheIds.Clear();
foreach (var kv in this.waitAcceptChannels)
{
KChannel kChannel = kv.Value;
if (kChannel.IsDisposed)
{
continue;
}
if (kChannel.IsConnected)
{
continue;
}
if (timeNow < kChannel.CreateTime + ConnectTimeoutTime)
{
continue;
}
cacheIds.Add(kChannel.Id);
}
foreach (long id in this.cacheIds)
{
if (!this.waitAcceptChannels.TryGetValue(id, out KChannel kChannel))
{
continue;
}
kChannel.OnError(ErrorCore.ERR_KcpAcceptTimeout);
}
}
private void UpdateChannel(uint timeNow)
{
#if UNITY
// Unity中每帧更新Channel
this.cacheIds.Clear();
foreach (var kv in this.waitAcceptChannels)
{
this.cacheIds.Add(kv.Key);
}
foreach (var kv in this.localConnChannels)
{
this.cacheIds.Add(kv.Key);
}
foreach (long id in this.cacheIds)
{
KChannel kChannel = this.Get(id);
if (kChannel == null)
{
continue;
}
if (kChannel.Id == 0)
{
continue;
}
kChannel.Update(timeNow);
}
#else
foreach (long id in this.updateIds)
{
KChannel kChannel = this.Get(id);
if (kChannel == null)
{
continue;
}
if (kChannel.Id == 0)
{
continue;
}
kChannel.Update(timeNow);
}
this.updateIds.Clear();
#endif
}
// 服务端需要看channel的update时间是否已到
public void AddToUpdate(long time, long id)
{
#if !UNITY
if (time == 0)
{
this.updateIds.Add(id);
return;
}
if (time < this.minTime)
{
this.minTime = time;
}
this.timeId.Add(time, id);
#endif
}
// 计算到期需要update的channel
private void TimerOut(uint timeNow)
{
#if !UNITY
if (this.timeId.Count == 0)
{
return;
}
if (timeNow < this.minTime)
{
return;
}
this.timeOutTime.Clear();
foreach (KeyValuePair<long, List<long>> kv in this.timeId)
{
long k = kv.Key;
if (k > timeNow)
{
minTime = k;
break;
}
this.timeOutTime.Add(k);
}
foreach (long k in this.timeOutTime)
{
foreach (long v in this.timeId[k])
{
this.updateIds.Add(v);
}
this.timeId.Remove(k);
}
#endif
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 40d9951107a787d4ebe60fd2a63461eb
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,15 @@
namespace ET
{
public class MessageAttribute: BaseAttribute
{
public ushort Opcode
{
get;
}
public MessageAttribute(ushort opcode = 0)
{
this.Opcode = opcode;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: c9e3a99c677b8744786b52472661a314
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,66 @@
using System;
using System.Collections.Generic;
namespace ET
{
public class MessagePool
{
private readonly Dictionary<Type, Queue<MessageObject>> pool = new();
public T Fetch<T>() where T: MessageObject
{
return this.Fetch(typeof (T)) as T;
}
// 只有客户端才用消息池,服务端不使用
public MessageObject Fetch(Type type)
{
lock (this.pool)
{
MessageObject messageObject;
Queue<MessageObject> queue = null;
if (!pool.TryGetValue(type, out queue))
{
messageObject = Activator.CreateInstance(type) as MessageObject;
}
else if (queue.Count == 0)
{
messageObject = Activator.CreateInstance(type) as MessageObject;
}
else
{
messageObject = queue.Dequeue();
}
messageObject.IsFromPool = true;
return messageObject;
}
}
public void Recycle(MessageObject obj)
{
if (!obj.IsFromPool)
{
return;
}
Type type = obj.GetType();
lock (this.pool)
{
Queue<MessageObject> queue = null;
if (!pool.TryGetValue(type, out queue))
{
queue = new Queue<MessageObject>();
pool.Add(type, queue);
}
// 一种对象最大为100个
if (queue.Count > 100)
{
return;
}
queue.Enqueue(obj);
}
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 21b73f9512ba740f78c657a1e87792db
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,49 @@
using System;
using System.IO;
namespace ET
{
public static class MessageSerializeHelper
{
public static byte[] Serialize(MessageObject message)
{
return MemoryPackHelper.Serialize(message);
}
public static void Serialize(MessageObject message, MemoryBuffer stream)
{
MemoryPackHelper.Serialize(message, stream);
}
public static MessageObject Deserialize(Type type, byte[] bytes, int index, int count)
{
object o = ObjectPool.Instance.Fetch(type);
MemoryPackHelper.Deserialize(type, bytes, index, count, ref o);
return o as MessageObject;
}
public static MessageObject Deserialize(Type type, MemoryBuffer stream)
{
object o = ObjectPool.Instance.Fetch(type);
MemoryPackHelper.Deserialize(type, stream, ref o);
return o as MessageObject;
}
public static ushort MessageToStream(MemoryBuffer stream, MessageObject message)
{
int headOffset = Packet.ActorIdLength;
ushort opcode = OpcodeType.Instance.GetOpcode(message.GetType());
stream.Seek(headOffset + Packet.OpcodeLength, SeekOrigin.Begin);
stream.SetLength(headOffset + Packet.OpcodeLength);
stream.GetBuffer().WriteTo(headOffset, opcode);
MessageSerializeHelper.Serialize(message, stream);
stream.Seek(0, SeekOrigin.Begin);
return opcode;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 20170a9502b055949aa3d9afeb5f4b77
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,65 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
namespace ET
{
public enum NetworkProtocol
{
TCP,
KCP,
Websocket,
}
public class NetServices: Singleton<NetServices>, ISingletonAwake
{
private readonly ConcurrentDictionary<long, AService> services = new();
private long idGenerator;
public void Awake()
{
}
protected override void Destroy()
{
foreach (var kv in this.services)
{
kv.Value.Dispose();
}
}
public void Add(AService aService)
{
aService.Id = Interlocked.Increment(ref this.idGenerator);
this.services[aService.Id] = aService;
}
public AService Get(long id)
{
AService aService;
this.services.TryGetValue(id, out aService);
return aService;
}
public void Remove(long id)
{
this.services.Remove(id, out AService _);
}
// 这个因为是NetClientComponent中使用不会与Accept冲突
public uint CreateConnectChannelId()
{
return RandomGenerator.RandUInt32();
}
// 防止与内网进程号的ChannelId冲突所以设置为一个大的随机数
private uint acceptIdGenerator = uint.MaxValue;
public uint CreateAcceptChannelId()
{
return --this.acceptIdGenerator;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 8d287718ca1c8b1449db4794f0a3524b
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,16 @@
namespace ET
{
public static class OpcodeHelper
{
public static bool IsOuterMessage(ushort opcode)
{
return opcode < OpcodeRangeDefine.OuterMaxOpcode;
}
public static bool IsInnerMessage(ushort opcode)
{
return opcode >= OpcodeRangeDefine.InnerMinOpcode;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 9f70c945e6c5ab84ab4a50cb0448927b
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,14 @@
namespace ET
{
public static class OpcodeRangeDefine
{
public const ushort OuterMinOpcode = 10001;
public const ushort OuterMaxOpcode = 20000;
// 20001-30000 内网pb
public const ushort InnerMinOpcode = 20001;
public const ushort InnerMaxOpcode = 40000;
public const ushort MaxOpcode = 60000;
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 2c103b75789be07478ca3d6280f28081
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,90 @@
using System;
using System.Collections.Generic;
namespace ET
{
public class OpcodeType: SingletonLock<OpcodeType>, ISingletonAwake
{
// 初始化后不变,所以主线程,网络线程都可以读
private readonly DoubleMap<Type, ushort> typeOpcode = new();
private readonly HashSet<ushort> outrActorMessage = new HashSet<ushort>();
private readonly Dictionary<Type, Type> requestResponse = new Dictionary<Type, Type>();
public void Awake()
{
HashSet<Type> types = EventSystem.Instance.GetTypes(typeof (MessageAttribute));
foreach (Type type in types)
{
object[] att = type.GetCustomAttributes(typeof (MessageAttribute), false);
if (att.Length == 0)
{
continue;
}
MessageAttribute messageAttribute = att[0] as MessageAttribute;
if (messageAttribute == null)
{
continue;
}
ushort opcode = messageAttribute.Opcode;
if (opcode != 0)
{
this.typeOpcode.Add(type, opcode);
}
// 检查request response
if (typeof (IRequest).IsAssignableFrom(type))
{
if (typeof (IActorLocationMessage).IsAssignableFrom(type))
{
this.requestResponse.Add(type, typeof (ActorResponse));
continue;
}
var attrs = type.GetCustomAttributes(typeof (ResponseTypeAttribute), false);
if (attrs.Length == 0)
{
Log.Error($"not found responseType: {type}");
continue;
}
ResponseTypeAttribute responseTypeAttribute = attrs[0] as ResponseTypeAttribute;
this.requestResponse.Add(type, EventSystem.Instance.GetType($"ET.{responseTypeAttribute.Type}"));
}
}
}
public override void Load()
{
World.Instance.AddSingleton<OpcodeType>(true);
}
public ushort GetOpcode(Type type)
{
return this.typeOpcode.GetValueByKey(type);
}
public Type GetType(ushort opcode)
{
Type type = this.typeOpcode.GetKeyByValue(opcode);
if (type == null)
{
throw new Exception($"OpcodeType not found type: {opcode}");
}
return type;
}
public Type GetResponseType(Type request)
{
if (!this.requestResponse.TryGetValue(request, out Type response))
{
throw new Exception($"not found response type, request type: {request.FullName}");
}
return response;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 6fadd49a69c766241ab3733bd675e308
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,103 @@
using System;
using System.IO;
namespace ET
{
public enum ParserState
{
PacketSize,
PacketBody
}
public class PacketParser
{
private readonly CircularBuffer buffer;
private int packetSize;
private ParserState state;
private readonly AService service;
private readonly byte[] cache = new byte[8];
public const int InnerPacketSizeLength = 4;
public const int OuterPacketSizeLength = 2;
public MemoryBuffer MemoryBuffer;
public PacketParser(CircularBuffer buffer, AService service)
{
this.buffer = buffer;
this.service = service;
}
public bool Parse(out MemoryBuffer memoryBuffer)
{
while (true)
{
switch (this.state)
{
case ParserState.PacketSize:
{
if (this.service.ServiceType == ServiceType.Inner)
{
if (this.buffer.Length < InnerPacketSizeLength)
{
memoryBuffer = null;
return false;
}
this.buffer.Read(this.cache, 0, InnerPacketSizeLength);
this.packetSize = BitConverter.ToInt32(this.cache, 0);
if (this.packetSize > ushort.MaxValue * 16 || this.packetSize < Packet.MinPacketSize)
{
throw new Exception($"recv packet size error, 可能是外网探测端口: {this.packetSize}");
}
}
else
{
if (this.buffer.Length < OuterPacketSizeLength)
{
memoryBuffer = null;
return false;
}
this.buffer.Read(this.cache, 0, OuterPacketSizeLength);
this.packetSize = BitConverter.ToUInt16(this.cache, 0);
if (this.packetSize < Packet.MinPacketSize)
{
throw new Exception($"recv packet size error, 可能是外网探测端口: {this.packetSize}");
}
}
this.state = ParserState.PacketBody;
break;
}
case ParserState.PacketBody:
{
if (this.buffer.Length < this.packetSize)
{
memoryBuffer = null;
return false;
}
memoryBuffer = this.service.Fetch(this.packetSize);
this.buffer.Read(memoryBuffer, this.packetSize);
//memoryStream.SetLength(this.packetSize - Packet.MessageIndex);
if (this.service.ServiceType == ServiceType.Inner)
{
memoryBuffer.Seek(Packet.MessageIndex, SeekOrigin.Begin);
}
else
{
memoryBuffer.Seek(Packet.OpcodeLength, SeekOrigin.Begin);
}
this.state = ParserState.PacketSize;
return true;
}
default:
throw new ArgumentOutOfRangeException();
}
}
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 0eca8ccff942df7438ebc49e74229f6b
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,14 @@
using System;
namespace ET
{
public class ResponseTypeAttribute: BaseAttribute
{
public string Type { get; }
public ResponseTypeAttribute(string type)
{
this.Type = type;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 47e49260dfd00574e9c98a40a30bdbe9
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,25 @@
using System;
namespace ET
{
/// <summary>
/// RPC异常,带ErrorCode
/// </summary>
public class RpcException: Exception
{
public int Error
{
get;
}
public RpcException(int error, string message): base($"Error: {error} Message: {message}")
{
this.Error = error;
}
public RpcException(int error, string message, Exception e): base($"Error: {error} Message: {message}", e)
{
this.Error = error;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 7c08a3a7ecb608845a83af0456e11dde
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,414 @@
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
namespace ET
{
/// <summary>
/// 封装Socket,将回调push到主线程处理
/// </summary>
public sealed class TChannel: AChannel
{
private readonly TService Service;
private Socket socket;
private SocketAsyncEventArgs innArgs = new();
private SocketAsyncEventArgs outArgs = new();
private readonly CircularBuffer recvBuffer = new();
private readonly CircularBuffer sendBuffer = new();
private bool isSending;
private bool isConnected;
private readonly PacketParser parser;
public IPEndPoint RemoteAddress { get; set; }
private readonly byte[] sendCache = new byte[Packet.OpcodeLength + Packet.ActorIdLength];
private void OnComplete(object sender, SocketAsyncEventArgs e)
{
this.Service.Queue.Enqueue(new TArgs() {ChannelId = this.Id, SocketAsyncEventArgs = e});
}
public TChannel(long id, IPEndPoint ipEndPoint, TService service)
{
this.ChannelType = ChannelType.Connect;
this.Id = id;
this.Service = service;
this.socket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
this.socket.NoDelay = true;
this.parser = new PacketParser(this.recvBuffer, this.Service);
this.innArgs.Completed += this.OnComplete;
this.outArgs.Completed += this.OnComplete;
this.RemoteAddress = ipEndPoint;
this.isConnected = false;
this.isSending = false;
this.Service.Queue.Enqueue(new TArgs(){Op = TcpOp.Connect,ChannelId = this.Id});
}
public TChannel(long id, Socket socket, TService service)
{
this.ChannelType = ChannelType.Accept;
this.Id = id;
this.Service = service;
this.socket = socket;
this.socket.NoDelay = true;
this.parser = new PacketParser(this.recvBuffer, this.Service);
this.innArgs.Completed += this.OnComplete;
this.outArgs.Completed += this.OnComplete;
this.RemoteAddress = (IPEndPoint)socket.RemoteEndPoint;
this.isConnected = true;
this.isSending = false;
this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartSend, ChannelId = this.Id});
this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartRecv, ChannelId = this.Id});
}
public override void Dispose()
{
if (this.IsDisposed)
{
return;
}
Log.Info($"channel dispose: {this.Id} {this.RemoteAddress} {this.Error}");
long id = this.Id;
this.Id = 0;
this.Service.Remove(id);
this.socket.Close();
this.innArgs.Dispose();
this.outArgs.Dispose();
this.innArgs = null;
this.outArgs = null;
this.socket = null;
}
public void Send(ActorId actorId, MessageObject message)
{
if (this.IsDisposed)
{
throw new Exception("TChannel已经被Dispose, 不能发送消息");
}
MemoryBuffer stream = this.Service.Fetch();
MessageSerializeHelper.MessageToStream(stream, message);
message.Dispose();
switch (this.Service.ServiceType)
{
case ServiceType.Inner:
{
int messageSize = (int) (stream.Length - stream.Position);
if (messageSize > ushort.MaxValue * 16)
{
throw new Exception($"send packet too large: {stream.Length} {stream.Position}");
}
this.sendCache.WriteTo(0, messageSize);
this.sendBuffer.Write(this.sendCache, 0, PacketParser.InnerPacketSizeLength);
// actorId
stream.GetBuffer().WriteTo(0, actorId);
this.sendBuffer.Write(stream.GetBuffer(), (int)stream.Position, (int)(stream.Length - stream.Position));
break;
}
case ServiceType.Outer:
{
stream.Seek(Packet.ActorIdLength, SeekOrigin.Begin); // 外网不需要actorId
ushort messageSize = (ushort) (stream.Length - stream.Position);
this.sendCache.WriteTo(0, messageSize);
this.sendBuffer.Write(this.sendCache, 0, PacketParser.OuterPacketSizeLength);
this.sendBuffer.Write(stream.GetBuffer(), (int)stream.Position, (int)(stream.Length - stream.Position));
break;
}
}
if (!this.isSending)
{
//this.StartSend();
this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartSend, ChannelId = this.Id});
}
this.Service.Recycle(stream);
}
public void ConnectAsync()
{
this.outArgs.RemoteEndPoint = this.RemoteAddress;
if (this.socket.ConnectAsync(this.outArgs))
{
return;
}
OnConnectComplete(this.outArgs);
}
public void OnConnectComplete(SocketAsyncEventArgs e)
{
if (this.socket == null)
{
return;
}
if (e.SocketError != SocketError.Success)
{
this.OnError((int)e.SocketError);
return;
}
e.RemoteEndPoint = null;
this.isConnected = true;
this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartSend, ChannelId = this.Id});
this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartRecv, ChannelId = this.Id});
}
public void OnDisconnectComplete(SocketAsyncEventArgs e)
{
this.OnError((int)e.SocketError);
}
public void StartRecv()
{
while (true)
{
try
{
if (this.socket == null)
{
return;
}
int size = this.recvBuffer.ChunkSize - this.recvBuffer.LastIndex;
this.innArgs.SetBuffer(this.recvBuffer.Last, this.recvBuffer.LastIndex, size);
}
catch (Exception e)
{
Log.Error($"tchannel error: {this.Id}\n{e}");
this.OnError(ErrorCore.ERR_TChannelRecvError);
return;
}
if (this.socket.ReceiveAsync(this.innArgs))
{
return;
}
this.HandleRecv(this.innArgs);
}
}
public void OnRecvComplete(SocketAsyncEventArgs o)
{
this.HandleRecv(o);
if (this.socket == null)
{
return;
}
this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartRecv, ChannelId = this.Id});
}
private void HandleRecv(SocketAsyncEventArgs e)
{
if (this.socket == null)
{
return;
}
if (e.SocketError != SocketError.Success)
{
this.OnError((int)e.SocketError);
return;
}
if (e.BytesTransferred == 0)
{
this.OnError(ErrorCore.ERR_PeerDisconnect);
return;
}
this.recvBuffer.LastIndex += e.BytesTransferred;
if (this.recvBuffer.LastIndex == this.recvBuffer.ChunkSize)
{
this.recvBuffer.AddLast();
this.recvBuffer.LastIndex = 0;
}
// 收到消息回调
while (true)
{
// 这里循环解析消息执行有可能执行消息的过程中断开了session
if (this.socket == null)
{
return;
}
try
{
bool ret = this.parser.Parse(out MemoryBuffer memoryBuffer);
if (!ret)
{
break;
}
this.OnRead(memoryBuffer);
this.Service.Recycle(memoryBuffer);
}
catch (Exception ee)
{
Log.Error($"ip: {this.RemoteAddress} {ee}");
this.OnError(ErrorCore.ERR_SocketError);
return;
}
}
}
public void StartSend()
{
if(!this.isConnected)
{
return;
}
if (this.isSending)
{
return;
}
while (true)
{
try
{
if (this.socket == null)
{
this.isSending = false;
return;
}
// 没有数据需要发送
if (this.sendBuffer.Length == 0)
{
this.isSending = false;
return;
}
this.isSending = true;
int sendSize = this.sendBuffer.ChunkSize - this.sendBuffer.FirstIndex;
if (sendSize > this.sendBuffer.Length)
{
sendSize = (int)this.sendBuffer.Length;
}
this.outArgs.SetBuffer(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
if (this.socket.SendAsync(this.outArgs))
{
return;
}
HandleSend(this.outArgs);
}
catch (Exception e)
{
throw new Exception($"socket set buffer error: {this.sendBuffer.First.Length}, {this.sendBuffer.FirstIndex}", e);
}
}
}
public void OnSendComplete(SocketAsyncEventArgs o)
{
HandleSend(o);
this.isSending = false;
this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartSend, ChannelId = this.Id});
}
private void HandleSend(SocketAsyncEventArgs e)
{
if (this.socket == null)
{
return;
}
if (e.SocketError != SocketError.Success)
{
this.OnError((int)e.SocketError);
return;
}
if (e.BytesTransferred == 0)
{
this.OnError(ErrorCore.ERR_PeerDisconnect);
return;
}
this.sendBuffer.FirstIndex += e.BytesTransferred;
if (this.sendBuffer.FirstIndex == this.sendBuffer.ChunkSize)
{
this.sendBuffer.FirstIndex = 0;
this.sendBuffer.RemoveFirst();
}
}
private void OnRead(MemoryBuffer memoryStream)
{
try
{
long channelId = this.Id;
object message = null;
ActorId actorId = default;
switch (this.Service.ServiceType)
{
case ServiceType.Outer:
{
ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.KcpOpcodeIndex);
Type type = OpcodeType.Instance.GetType(opcode);
message = MessageSerializeHelper.Deserialize(type, memoryStream);
break;
}
case ServiceType.Inner:
{
byte[] buffer = memoryStream.GetBuffer();
actorId.Process = BitConverter.ToInt32(buffer, Packet.ActorIdIndex);
actorId.Fiber = BitConverter.ToInt32(buffer, Packet.ActorIdIndex + 4);
actorId.InstanceId = BitConverter.ToInt64(buffer, Packet.ActorIdIndex + 8);
ushort opcode = BitConverter.ToUInt16(buffer, Packet.OpcodeIndex);
Type type = OpcodeType.Instance.GetType(opcode);
message = MessageSerializeHelper.Deserialize(type, memoryStream);
break;
}
}
this.Service.ReadCallback(channelId, actorId, message);
}
catch (Exception e)
{
Log.Error($"{this.RemoteAddress} {memoryStream.Length} {e}");
// 出现任何消息解析异常都要断开Session防止客户端伪造消息
this.OnError(ErrorCore.ERR_PacketParserError);
}
}
private void OnError(int error)
{
Log.Info($"TChannel OnError: {error} {this.RemoteAddress}");
long channelId = this.Id;
this.Service.Remove(channelId);
this.Service.ErrorCallback(channelId, error);
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: fb6fe58f92d2ac444981eb135511618d
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,285 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
namespace ET
{
public enum TcpOp
{
StartSend,
StartRecv,
Connect,
}
public struct TArgs
{
public TcpOp Op;
public long ChannelId;
public SocketAsyncEventArgs SocketAsyncEventArgs;
}
public sealed class TService : AService
{
private readonly Dictionary<long, TChannel> idChannels = new();
private readonly SocketAsyncEventArgs innArgs = new();
private Socket acceptor;
public ConcurrentQueue<TArgs> Queue = new();
public TService(AddressFamily addressFamily, ServiceType serviceType)
{
this.ServiceType = serviceType;
}
public TService(IPEndPoint ipEndPoint, ServiceType serviceType)
{
this.ServiceType = serviceType;
this.acceptor = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
// 容易出问题,先注释掉,按需开启
//this.acceptor.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
this.innArgs.Completed += this.OnComplete;
try
{
this.acceptor.Bind(ipEndPoint);
}
catch (Exception e)
{
throw new Exception($"bind error: {ipEndPoint}", e);
}
this.acceptor.Listen(1000);
this.AcceptAsync();
}
private void OnComplete(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Accept:
this.Queue.Enqueue(new TArgs() {SocketAsyncEventArgs = e});
break;
default:
throw new Exception($"socket error: {e.LastOperation}");
}
}
private void OnAcceptComplete(SocketError socketError, Socket acceptSocket)
{
if (this.acceptor == null)
{
return;
}
if (socketError != SocketError.Success)
{
Log.Error($"accept error {socketError}");
return;
}
try
{
long id = NetServices.Instance.CreateAcceptChannelId();
TChannel channel = new TChannel(id, acceptSocket, this);
this.idChannels.Add(channel.Id, channel);
long channelId = channel.Id;
this.AcceptCallback(channelId, channel.RemoteAddress);
}
catch (Exception exception)
{
Log.Error(exception);
}
// 开始新的accept
this.AcceptAsync();
}
private void AcceptAsync()
{
this.innArgs.AcceptSocket = null;
if (this.acceptor.AcceptAsync(this.innArgs))
{
return;
}
OnAcceptComplete(this.innArgs.SocketError, this.innArgs.AcceptSocket);
}
private TChannel Create(IPEndPoint ipEndPoint, long id)
{
TChannel channel = new TChannel(id, ipEndPoint, this);
this.idChannels.Add(channel.Id, channel);
return channel;
}
public override void Create(long id, IPEndPoint address)
{
if (this.idChannels.TryGetValue(id, out TChannel _))
{
return;
}
this.Create(address, id);
}
private TChannel Get(long id)
{
TChannel channel = null;
this.idChannels.TryGetValue(id, out channel);
return channel;
}
public override void Dispose()
{
base.Dispose();
this.acceptor?.Close();
this.acceptor = null;
this.innArgs.Dispose();
foreach (long id in this.idChannels.Keys.ToArray())
{
TChannel channel = this.idChannels[id];
channel.Dispose();
}
this.idChannels.Clear();
}
public override void Remove(long id, int error = 0)
{
if (this.idChannels.TryGetValue(id, out TChannel channel))
{
channel.Error = error;
channel.Dispose();
}
this.idChannels.Remove(id);
}
public override void Send(long channelId, ActorId actorId, MessageObject message)
{
try
{
TChannel aChannel = this.Get(channelId);
if (aChannel == null)
{
this.ErrorCallback(channelId, ErrorCore.ERR_SendMessageNotFoundTChannel);
return;
}
aChannel.Send(actorId, message);
}
catch (Exception e)
{
Log.Error(e);
}
}
public override void Update()
{
while (true)
{
if (!this.Queue.TryDequeue(out var result))
{
break;
}
SocketAsyncEventArgs e = result.SocketAsyncEventArgs;
if (e == null)
{
switch (result.Op)
{
case TcpOp.StartSend:
{
TChannel tChannel = this.Get(result.ChannelId);
if (tChannel != null)
{
tChannel.StartSend();
}
break;
}
case TcpOp.StartRecv:
{
TChannel tChannel = this.Get(result.ChannelId);
if (tChannel != null)
{
tChannel.StartRecv();
}
break;
}
case TcpOp.Connect:
{
TChannel tChannel = this.Get(result.ChannelId);
if (tChannel != null)
{
tChannel.ConnectAsync();
}
break;
}
}
continue;
}
switch (e.LastOperation)
{
case SocketAsyncOperation.Accept:
{
SocketError socketError = e.SocketError;
Socket acceptSocket = e.AcceptSocket;
this.OnAcceptComplete(socketError, acceptSocket);
break;
}
case SocketAsyncOperation.Connect:
{
TChannel tChannel = this.Get(result.ChannelId);
if (tChannel != null)
{
tChannel.OnConnectComplete(e);
}
break;
}
case SocketAsyncOperation.Disconnect:
{
TChannel tChannel = this.Get(result.ChannelId);
if (tChannel != null)
{
tChannel.OnDisconnectComplete(e);
}
break;
}
case SocketAsyncOperation.Receive:
{
TChannel tChannel = this.Get(result.ChannelId);
if (tChannel != null)
{
tChannel.OnRecvComplete(e);
}
break;
}
case SocketAsyncOperation.Send:
{
TChannel tChannel = this.Get(result.ChannelId);
if (tChannel != null)
{
tChannel.OnSendComplete(e);
}
break;
}
default:
throw new ArgumentOutOfRangeException($"{e.LastOperation}");
}
}
}
public override bool IsDisposed()
{
return this.acceptor == null;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: c86d0145bc0f3904795b74ea77da1c45
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,258 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.WebSockets;
using System.Threading;
namespace ET
{
public class WChannel: AChannel
{
public HttpListenerWebSocketContext WebSocketContext { get; }
private readonly WService Service;
private readonly WebSocket webSocket;
private readonly Queue<MessageObject> queue = new();
private bool isSending;
private bool isConnected;
public IPEndPoint RemoteAddress { get; set; }
private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
public WChannel(long id, HttpListenerWebSocketContext webSocketContext, WService service)
{
this.Id = id;
this.Service = service;
this.ChannelType = ChannelType.Accept;
this.WebSocketContext = webSocketContext;
this.webSocket = webSocketContext.WebSocket;
isConnected = true;
this.Service.ThreadSynchronizationContext.Post(()=>
{
this.StartRecv().Coroutine();
this.StartSend().Coroutine();
});
}
public WChannel(long id, WebSocket webSocket, string connectUrl, WService service)
{
this.Id = id;
this.Service = service;
this.ChannelType = ChannelType.Connect;
this.webSocket = webSocket;
isConnected = false;
this.Service.ThreadSynchronizationContext.Post(()=>this.ConnectAsync(connectUrl).Coroutine());
}
public override void Dispose()
{
if (this.IsDisposed)
{
return;
}
this.cancellationTokenSource.Cancel();
this.cancellationTokenSource.Dispose();
this.cancellationTokenSource = null;
this.webSocket.Dispose();
}
public async ETTask ConnectAsync(string url)
{
try
{
await ((ClientWebSocket) this.webSocket).ConnectAsync(new Uri(url), cancellationTokenSource.Token);
isConnected = true;
this.StartRecv().Coroutine();
this.StartSend().Coroutine();
}
catch (Exception e)
{
Log.Error(e);
this.OnError(ErrorCore.ERR_WebsocketConnectError);
}
}
public void Send(MessageObject message)
{
this.queue.Enqueue(message);
if (this.isConnected)
{
this.StartSend().Coroutine();
}
}
private async ETTask StartSend()
{
if (this.IsDisposed)
{
return;
}
try
{
if (this.isSending)
{
return;
}
this.isSending = true;
while (true)
{
if (this.queue.Count == 0)
{
this.isSending = false;
return;
}
MessageObject message = this.queue.Dequeue();
MemoryBuffer stream = this.Service.Fetch();
MessageSerializeHelper.MessageToStream(stream, message);
message.Dispose();
switch (this.Service.ServiceType)
{
case ServiceType.Inner:
break;
case ServiceType.Outer:
stream.Seek(Packet.ActorIdLength, SeekOrigin.Begin);
break;
}
try
{
await this.webSocket.SendAsync(stream.GetMemory(), WebSocketMessageType.Binary, true, cancellationTokenSource.Token);
this.Service.Recycle(stream);
if (this.IsDisposed)
{
return;
}
}
catch (Exception e)
{
Log.Error(e);
this.OnError(ErrorCore.ERR_WebsocketSendError);
return;
}
}
}
catch (Exception e)
{
Log.Error(e);
}
}
private readonly byte[] cache = new byte[ushort.MaxValue];
public async ETTask StartRecv()
{
if (this.IsDisposed)
{
return;
}
try
{
while (true)
{
ValueWebSocketReceiveResult receiveResult;
int receiveCount = 0;
do
{
receiveResult = await this.webSocket.ReceiveAsync(
new Memory<byte>(cache, receiveCount, this.cache.Length - receiveCount),
cancellationTokenSource.Token);
if (this.IsDisposed)
{
return;
}
receiveCount += receiveResult.Count;
}
while (!receiveResult.EndOfMessage);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
this.OnError(ErrorCore.ERR_WebsocketPeerReset);
return;
}
if (receiveResult.Count > ushort.MaxValue)
{
await this.webSocket.CloseAsync(WebSocketCloseStatus.MessageTooBig, $"message too big: {receiveCount}",
cancellationTokenSource.Token);
this.OnError(ErrorCore.ERR_WebsocketMessageTooBig);
return;
}
MemoryBuffer memoryBuffer = this.Service.Fetch(receiveCount);
memoryBuffer.SetLength(receiveCount);
memoryBuffer.Seek(2, SeekOrigin.Begin);
Array.Copy(this.cache, 0, memoryBuffer.GetBuffer(), 0, receiveCount);
this.OnRead(memoryBuffer);
this.Service.Recycle(memoryBuffer);
}
}
catch (Exception e)
{
Log.Error(e);
this.OnError(ErrorCore.ERR_WebsocketRecvError);
}
}
private void OnRead(MemoryBuffer memoryStream)
{
try
{
long channelId = this.Id;
object message = null;
switch (this.Service.ServiceType)
{
case ServiceType.Outer:
{
ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.KcpOpcodeIndex);
Type type = OpcodeType.Instance.GetType(opcode);
message = MessageSerializeHelper.Deserialize(type, memoryStream);
break;
}
}
this.Service.ReadCallback(channelId, new ActorId(), message);
}
catch (Exception e)
{
Log.Error($"{this.RemoteAddress} {memoryStream.Length} {e}");
// 出现任何消息解析异常都要断开Session防止客户端伪造消息
this.OnError(ErrorCore.ERR_PacketParserError);
}
}
private void OnError(int error)
{
Log.Info($"WChannel error: {error} {this.RemoteAddress}");
long channelId = this.Id;
this.Service.Remove(channelId);
this.Service.ErrorCallback(channelId, error);
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: b1521b5d15359624296c9e2f801bee89
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,151 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.WebSockets;
namespace ET
{
public class WService: AService
{
private long idGenerater = 200000000;
private HttpListener httpListener;
private readonly Dictionary<long, WChannel> channels = new Dictionary<long, WChannel>();
public ThreadSynchronizationContext ThreadSynchronizationContext;
public WService(IEnumerable<string> prefixs)
{
this.ThreadSynchronizationContext = new ThreadSynchronizationContext();
this.httpListener = new HttpListener();
StartAccept(prefixs).Coroutine();
}
public WService()
{
this.ThreadSynchronizationContext = new ThreadSynchronizationContext();
}
private long GetId
{
get
{
return ++this.idGenerater;
}
}
public WChannel Create(string address, long id)
{
ClientWebSocket webSocket = new ClientWebSocket();
WChannel channel = new WChannel(id, webSocket, address, this);
this.channels[channel.Id] = channel;
return channel;
}
public override void Remove(long id, int error = 0)
{
WChannel channel;
if (!this.channels.TryGetValue(id, out channel))
{
return;
}
channel.Error = error;
this.channels.Remove(id);
channel.Dispose();
}
public override bool IsDisposed()
{
return this.ThreadSynchronizationContext == null;
}
protected void Get(long id, string address)
{
if (!this.channels.TryGetValue(id, out _))
{
this.Create(address, id);
}
}
public override void Dispose()
{
base.Dispose();
this.ThreadSynchronizationContext = null;
this.httpListener?.Close();
this.httpListener = null;
}
private async ETTask StartAccept(IEnumerable<string> prefixs)
{
try
{
foreach (string prefix in prefixs)
{
this.httpListener.Prefixes.Add(prefix);
}
httpListener.Start();
while (true)
{
try
{
HttpListenerContext httpListenerContext = await this.httpListener.GetContextAsync();
HttpListenerWebSocketContext webSocketContext = await httpListenerContext.AcceptWebSocketAsync(null);
WChannel channel = new WChannel(this.GetId, webSocketContext, this);
this.channels[channel.Id] = channel;
this.AcceptCallback(channel.Id, channel.RemoteAddress);
}
catch (Exception e)
{
Log.Error(e);
}
}
}
catch (HttpListenerException e)
{
if (e.ErrorCode == 5)
{
throw new Exception($"CMD管理员中输入: netsh http add urlacl url=http://*:8080/ user=Everyone", e);
}
Log.Error(e);
}
catch (Exception e)
{
Log.Error(e);
}
}
public override void Create(long id, IPEndPoint address)
{
throw new NotImplementedException();
}
public override void Send(long channelId, ActorId actorId, MessageObject message)
{
this.channels.TryGetValue(channelId, out WChannel channel);
if (channel == null)
{
return;
}
channel.Send(message);
}
public override void Update()
{
this.ThreadSynchronizationContext.Update();
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: edded4e0a631e0347bb019ee35ad5975
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant: