1.替换了KCP为C#版本 2.KCP增加了Memery<T>支持。 3.框架增加了对Memery<T>的支持,服务器网络消息采用Memery<T>,性能得大幅度的提升 4.优化了消息调度执行逻辑

1.替换了KCP为C#版本
2.KCP增加了Memery<T>支持。
3.框架增加了对Memery<T>的支持,服务器网络消息采用Memery<T>,性能得大幅度的提升
4.优化了消息调度执行逻辑
This commit is contained in:
ALEXTANG
2023-07-23 00:25:47 +08:00
parent c96d20a89a
commit 35d2012546
67 changed files with 2447 additions and 1025 deletions

View File

@@ -96,6 +96,38 @@ namespace TEngine.DataStructure
}
}
public void Read(Memory<byte> memory, int count)
{
if (count > Length)
{
throw new Exception($"bufferList length < count, {Length} {count}");
}
var copyCount = 0;
while (copyCount < count)
{
var n = count - copyCount;
var asMemory = First.AsMemory();
if (ChunkSize - FirstIndex > n)
{
var slice = asMemory.Slice(FirstIndex, n);
slice.CopyTo(memory.Slice(copyCount, n));
FirstIndex += n;
copyCount += n;
}
else
{
var length = ChunkSize - FirstIndex;
var slice = asMemory.Slice(FirstIndex, length);
slice.CopyTo(memory.Slice(copyCount, length));
copyCount += ChunkSize - FirstIndex;
FirstIndex = 0;
RemoveFirst();
}
}
}
public override int Read(byte[] buffer, int offset, int count)
{
if (buffer.Length < offset + count)
@@ -124,7 +156,6 @@ namespace TEngine.DataStructure
}
Array.Copy(First, FirstIndex, buffer, copyCount + offset, ChunkSize - FirstIndex);
copyCount += ChunkSize - FirstIndex;
FirstIndex = 0;

View File

@@ -33,7 +33,9 @@ namespace TEngine.Core
{
var configFile = GetConfigPath(dataConfig);
var bytes = File.ReadAllBytes(configFile);
var data = (AProto) ProtoBufHelper.FromBytes(typeof(T), bytes, 0, bytes.Length);
// var data = (AProto) ProtoBufHelper.FromBytes(typeof(T), bytes, 0, bytes.Length);
// var data = ProtoBufHelper.FromBytes<T>(bytes, 0, bytes.Length);
var data = (T)ProtoBufHelper.FromBytes(typeof(T), bytes, 0, bytes.Length);
data.AfterDeserialization();
ConfigDic[dataConfig] = data;
return (T)data;
@@ -65,7 +67,7 @@ namespace TEngine.Core
{
var configFile = GetConfigPath(type.Name);
var bytes = File.ReadAllBytes(configFile);
var data = (AProto) ProtoBufHelper.FromBytes(type, bytes, 0, bytes.Length);
var data = (AProto)ProtoBufHelper.FromBytes(type, bytes, 0, bytes.Length);
data.AfterDeserialization();
ConfigDic[dataConfig] = data;
return data;

View File

@@ -95,11 +95,54 @@ public sealed class MongoHelper : Singleton<MongoHelper>
{
return BsonSerializer.Deserialize(str, type);
}
public object Deserialize(Span<byte> span, Type type)
{
using var stream = MemoryStreamHelper.GetRecyclableMemoryStream();
stream.Write(span);
stream.Seek(0, SeekOrigin.Begin);
return BsonSerializer.Deserialize(stream, type);
}
public object Deserialize(Memory<byte> memory, Type type)
{
using var stream = MemoryStreamHelper.GetRecyclableMemoryStream();
stream.Write(memory.Span);
stream.Seek(0, SeekOrigin.Begin);
return BsonSerializer.Deserialize(stream, type);
}
public object Deserialize<T>(Span<byte> span)
{
using var stream = MemoryStreamHelper.GetRecyclableMemoryStream();
stream.Write(span);
stream.Seek(0, SeekOrigin.Begin);
return BsonSerializer.Deserialize<T>(stream);
}
public object Deserialize<T>(Memory<byte> memory)
{
using var stream = MemoryStreamHelper.GetRecyclableMemoryStream();
stream.Seek(0, SeekOrigin.Begin);
return BsonSerializer.Deserialize<T>(stream);
}
public T Deserialize<T>(byte[] bytes)
{
return BsonSerializer.Deserialize<T>(bytes);
}
public void SerializeTo<T>(T t, Memory<byte> memory)
{
using var memoryStream = new MemoryStream();
using (var writer = new BsonBinaryWriter(memoryStream, BsonBinaryWriterSettings.Defaults))
{
BsonSerializer.Serialize(writer, typeof(T), t);
}
memoryStream.GetBuffer().AsMemory().CopyTo(memory);
}
public object Deserialize(byte[] bytes, Type type)
{

View File

@@ -1,38 +1,89 @@
using System;
using System.Buffers;
using System.IO;
using System.Reflection;
using ProtoBuf;
#pragma warning disable CS8604
using ProtoBuf.Meta;
namespace TEngine.Core
{
public static class ProtoBufHelper
{
public static object FromSpan(Type type, Span<byte> span)
{
#if TENGINE_UNITY
using var recyclableMemoryStream = MemoryStreamHelper.GetRecyclableMemoryStream();
recyclableMemoryStream.Write(span);
recyclableMemoryStream.Seek(0, SeekOrigin.Begin);
return Serializer.Deserialize(type, recyclableMemoryStream);
#else
return RuntimeTypeModel.Default.Deserialize(type, span);
#endif
}
public static object FromMemory(Type type, Memory<byte> memory)
{
#if TENGINE_UNITY
using var recyclableMemoryStream = MemoryStreamHelper.GetRecyclableMemoryStream();
recyclableMemoryStream.Write(memory.Span);
recyclableMemoryStream.Seek(0, SeekOrigin.Begin);
return Serializer.Deserialize(type, recyclableMemoryStream);
#else
return RuntimeTypeModel.Default.Deserialize(type, memory);
#endif
}
public static object FromBytes(Type type, byte[] bytes, int index, int count)
{
using var stream = new MemoryStream(bytes, index, count);
#if TENGINE_UNITY
using var stream = MemoryStreamHelper.GetRecyclableMemoryStream();
stream.Write(bytes, index, count);
stream.Seek(0, SeekOrigin.Begin);
return Serializer.Deserialize(type, stream);
#else
var memory = new Memory<byte>(bytes, index, count);
return RuntimeTypeModel.Default.Deserialize(type, memory);
#endif
}
public static T FromBytes<T>(byte[] bytes)
{
using var stream = new MemoryStream(bytes, 0, bytes.Length);
#if TENGINE_UNITY
using var stream = MemoryStreamHelper.GetRecyclableMemoryStream();
stream.Write(bytes, 0, bytes.Length);
stream.Seek(0, SeekOrigin.Begin);
return Serializer.Deserialize<T>(stream);
// return FromBytes<T>(bytes, 0, bytes.Length);
#else
return Serializer.Deserialize<T>(new Span<byte>(bytes));
#endif
}
public static T FromBytes<T>(byte[] bytes, int index, int count)
{
using var stream = new MemoryStream(bytes, index, count);
#if TENGINE_UNITY
using var stream = MemoryStreamHelper.GetRecyclableMemoryStream();
stream.Write(bytes, 0, bytes.Length);
stream.Seek(0, SeekOrigin.Begin);
return Serializer.Deserialize<T>(stream);
#else
return Serializer.Deserialize<T>(new Span<byte>(bytes, index, count));
#endif
}
public static byte[] ToBytes(object message)
{
using var stream = new MemoryStream();
using var stream = MemoryStreamHelper.GetRecyclableMemoryStream();
Serializer.Serialize(stream, message);
return stream.ToArray();
}
public static void ToMemory(object message, Memory<byte> memory)
{
using var stream = MemoryStreamHelper.GetRecyclableMemoryStream();
Serializer.Serialize(stream, message);
stream.GetBuffer().AsMemory().CopyTo(memory);
}
public static void ToStream(object message, MemoryStream stream)
{
Serializer.Serialize(stream, message);
@@ -50,8 +101,8 @@ namespace TEngine.Core
public static T Clone<T>(T t)
{
var bytes = ToBytes(t);
using var stream = new MemoryStream(bytes, 0, bytes.Length);
using var stream = MemoryStreamHelper.GetRecyclableMemoryStream();
Serializer.Serialize(stream, t);
return Serializer.Deserialize<T>(stream);
}
}

View File

@@ -1,6 +1,7 @@
using System;
using System.IO;
using TEngine.Core;
using System.Buffers;
// ReSharper disable ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
#pragma warning disable CS8625
#pragma warning disable CS8618

View File

@@ -1,5 +1,3 @@
using TEngine.Core;
#if TENGINE_NET
namespace TEngine.Core.Network;
@@ -11,14 +9,8 @@ public sealed class ServerInnerSession : Session
{
return;
}
// 序列化消息到流中
var memoryStream = MemoryStreamHelper.GetRecyclableMemoryStream();
InnerPacketParser.Serialize(message, memoryStream);
memoryStream.Seek(0, SeekOrigin.Begin);
// 分发消息
var packInfo = InnerPackInfo.Create(rpcId, routeId, ((IMessage)message).OpCode(), 0, memoryStream);
NetworkMessageScheduler.Scheduler(this, packInfo).Coroutine();
NetworkMessageScheduler.InnerScheduler(this, rpcId, routeId, ((IMessage)message).OpCode(), 0, message).Coroutine();
}
public override void Send(IRouteMessage routeMessage, uint rpcId = 0, long routeId = 0)
@@ -27,13 +19,8 @@ public sealed class ServerInnerSession : Session
{
return;
}
// 序列化消息到流中
var memoryStream = MemoryStreamHelper.GetRecyclableMemoryStream();
InnerPacketParser.Serialize(routeMessage, memoryStream);
memoryStream.Seek(0, SeekOrigin.Begin);
// 分发消息
var packInfo = InnerPackInfo.Create(rpcId, routeId, routeMessage.OpCode(), routeMessage.RouteTypeOpCode(), memoryStream);
NetworkMessageScheduler.Scheduler(this, packInfo).Coroutine();
NetworkMessageScheduler.InnerScheduler(this, rpcId, routeId, routeMessage.OpCode(), routeMessage.RouteTypeOpCode(), routeMessage).Coroutine();
}
public override void Send(MemoryStream memoryStream, uint rpcId = 0, long routeTypeOpCode = 0, long routeId = 0)

View File

@@ -1,18 +1,17 @@
using System;
using System.IO;
using TEngine.Core;
#pragma warning disable CS8600
namespace TEngine.Core.Network
{
public abstract class ANetworkMessageScheduler
{
protected bool DisposePackInfo;
private readonly PingResponse _pingResponse = new PingResponse();
public async FTask Scheduler(Session session, APackInfo packInfo)
{
Type messageType = null;
var packInfoMemoryStream = packInfo.MemoryStream;
DisposePackInfo = true;
try
{
@@ -56,17 +55,6 @@ namespace TEngine.Core.Network
// 服务器之间发送消息因为走的是MessageHelper、所以接收消息的回调也应该放到MessageHelper里处理
MessageHelper.ResponseHandler(packInfo.RpcId, aResponse);
#else
#if TENGINE_UNITY
if (MessageDispatcherSystem.Instance.MsgHandles.TryGetValue(packInfo.ProtocolCode,out var msgDelegates))
{
foreach (var msgDelegate in msgDelegates)
{
msgDelegate.Invoke(aResponse);
}
return;
}
#endif
// 这个一般是客户端Session.Call发送时使用的、目前这个逻辑只有Unity客户端时使用
if (!session.RequestCallback.TryGetValue(packInfo.RpcId, out var action))
@@ -84,20 +72,57 @@ namespace TEngine.Core.Network
}
catch (Exception e)
{
if (packInfoMemoryStream.CanRead)
{
// ReSharper disable once MethodHasAsyncOverload
packInfoMemoryStream.Dispose();
}
Log.Error($"NetworkMessageScheduler error messageProtocolCode:{packInfo.ProtocolCode} messageType:{messageType} SessionId {session.Id} IsDispose {session.IsDisposed} {e}");
}
finally
{
packInfo.Dispose();
if (DisposePackInfo)
{
packInfo.Dispose();
}
}
}
public async FTask InnerScheduler(Session session, uint rpcId, long routeId, uint protocolCode, long routeTypeCode, object message)
{
var messageType = message.GetType();
try
{
if (session.IsDisposed)
{
return;
}
switch (protocolCode)
{
case >= Opcode.OuterRouteMessage:
{
await InnerHandler(session, rpcId, routeId, protocolCode, routeTypeCode, messageType, message);
return;
}
case < Opcode.OuterResponse:
{
MessageDispatcherSystem.Instance.MessageHandler(session, messageType, message, rpcId, protocolCode);
return;
}
default:
{
#if TENGINE_NET
// 服务器之间发送消息因为走的是MessageHelper、所以接收消息的回调也应该放到MessageHelper里处理
MessageHelper.ResponseHandler(rpcId, (IResponse)message);
#endif
return;
}
}
}
catch (Exception e)
{
Log.Error($"NetworkMessageScheduler error messageProtocolCode:{protocolCode} messageType:{messageType} SessionId {session.Id} IsDispose {session.IsDisposed} {e}");
}
}
protected abstract FTask Handler(Session session, Type messageType, APackInfo packInfo);
protected abstract FTask InnerHandler(Session session, uint rpcId, long routeId, uint protocolCode, long routeTypeCode, Type messageType, object message);
}
}

View File

@@ -7,8 +7,6 @@ namespace TEngine.Core.Network
{
protected override async FTask Handler(Session session, Type messageType, APackInfo packInfo)
{
var packInfoMemoryStream = packInfo.MemoryStream;
try
{
switch (packInfo.ProtocolCode)
@@ -28,7 +26,7 @@ namespace TEngine.Core.Network
Log.Error($"not found rpc {packInfo.RpcId}, response message: {aResponse.GetType().Name}");
return;
}
session.RequestCallback.Remove(packInfo.RpcId);
action.SetResult(aResponse);
return;
@@ -43,19 +41,22 @@ namespace TEngine.Core.Network
}
catch (Exception e)
{
if (packInfoMemoryStream.CanRead)
{
// ReSharper disable once MethodHasAsyncOverload
packInfoMemoryStream.Dispose();
}
Log.Error(e);
return;
}
finally
{
packInfo.Dispose();
}
await FTask.CompletedTask;
throw new NotSupportedException($"Received unsupported message protocolCode:{packInfo.ProtocolCode} messageType:{messageType}");
}
protected override FTask InnerHandler(Session session, uint rpcId, long routeId, uint protocolCode, long routeTypeCode, Type messageType, object message)
{
throw new NotImplementedException();
}
}
#endif
#if TENGINE_NET
@@ -65,6 +66,11 @@ namespace TEngine.Core.Network
{
throw new NotSupportedException($"Received unsupported message protocolCode:{packInfo.ProtocolCode} messageType:{messageType}");
}
protected override FTask InnerHandler(Session session, uint rpcId, long routeId, uint protocolCode, long routeTypeCode, Type messageType, object message)
{
throw new NotSupportedException($"Received unsupported message protocolCode:{protocolCode} messageType:{messageType}");
}
}
#endif
}

View File

@@ -7,11 +7,10 @@ namespace TEngine.Core.Network
{
protected override async FTask Handler(Session session, Type messageType, APackInfo packInfo)
{
var disposeMemoryStream = true;
var packInfoMemoryStream = packInfo.MemoryStream;
try
{
DisposePackInfo = false;
switch (packInfo.ProtocolCode)
{
case >= Opcode.InnerBsonRouteResponse:
@@ -80,8 +79,7 @@ namespace TEngine.Core.Network
case Session gateSession:
{
// 这里如果是Session只可能是Gate的Session、如果是的话、肯定是转发Address消息
disposeMemoryStream = false;
gateSession.Send(packInfoMemoryStream, packInfo.RpcId);
gateSession.Send(packInfo.CreateMemoryStream(), packInfo.RpcId);
return;
}
default:
@@ -94,20 +92,108 @@ namespace TEngine.Core.Network
}
default:
{
throw new NotSupportedException($"Received unsupported message protocolCode:{packInfo.ProtocolCode} messageType:{messageType}");
throw new NotSupportedException(
$"Received unsupported message protocolCode:{packInfo.ProtocolCode} messageType:{messageType}");
}
}
}
catch (Exception e)
{
if (disposeMemoryStream && packInfoMemoryStream.CanRead)
{
// ReSharper disable once MethodHasAsyncOverload
packInfoMemoryStream.Dispose();
}
Log.Error($"InnerMessageSchedulerHandler error messageProtocolCode:{packInfo.ProtocolCode} messageType:{messageType} {e}");
}
finally
{
packInfo.Dispose();
}
}
protected override async FTask InnerHandler(Session session, uint rpcId, long routeId, uint protocolCode, long routeTypeCode, Type messageType, object message)
{
try
{
switch (protocolCode)
{
case >= Opcode.InnerBsonRouteResponse:
case >= Opcode.InnerRouteResponse:
{
MessageHelper.ResponseHandler(rpcId, (IRouteResponse)message);
return;
}
case >= Opcode.OuterRouteResponse:
{
// 如果Gate服务器、需要转发Addressable协议、所以这里有可能会接收到该类型协议
MessageHelper.ResponseHandler(rpcId, (IResponse)message);
return;
}
case > Opcode.InnerBsonRouteMessage:
{
var entity = Entity.GetEntity(routeId);
if (entity == null)
{
if (protocolCode > Opcode.InnerBsonRouteRequest)
{
MessageDispatcherSystem.Instance.FailResponse(session, (IRouteRequest)message, CoreErrorCode.ErrNotFoundRoute, rpcId);
}
return;
}
await MessageDispatcherSystem.Instance.RouteMessageHandler(session, messageType, entity, message, rpcId);
return;
}
case > Opcode.InnerRouteMessage:
{
var entity = Entity.GetEntity(routeId);
if (entity == null)
{
if (protocolCode > Opcode.InnerRouteRequest)
{
MessageDispatcherSystem.Instance.FailResponse(session, (IRouteRequest)message, CoreErrorCode.ErrNotFoundRoute, rpcId);
}
return;
}
await MessageDispatcherSystem.Instance.RouteMessageHandler(session, messageType, entity, message, rpcId);
return;
}
case > Opcode.OuterRouteMessage:
{
var entity = Entity.GetEntity(routeId);
switch (entity)
{
case null:
{
var response = MessageDispatcherSystem.Instance.CreateResponse((IRouteMessage)message, CoreErrorCode.ErrNotFoundRoute);
session.Send(response, rpcId, routeId);
return;
}
case Session gateSession:
{
// 这里如果是Session只可能是Gate的Session、如果是的话、肯定是转发Address消息
gateSession.Send(message, rpcId);
return;
}
default:
{
await MessageDispatcherSystem.Instance.RouteMessageHandler(session, messageType, entity, message, rpcId);
return;
}
}
}
default:
{
throw new NotSupportedException($"Received unsupported message protocolCode:{protocolCode} messageType:{messageType}");
}
}
}
catch (Exception e)
{
Log.Error($"InnerMessageSchedulerHandler error messageProtocolCode:{protocolCode} messageType:{messageType} {e}");
}
}
}
}

View File

@@ -11,6 +11,11 @@ namespace TEngine.Core.Network
{
throw new NotSupportedException($"Received unsupported message protocolCode:{packInfo.ProtocolCode} messageType:{messageType}");
}
protected override FTask InnerHandler(Session session, uint rpcId, long routeId, uint protocolCode, long routeTypeCode, Type messageType, object message)
{
throw new NotImplementedException();
}
}
#endif
#if TENGINE_NET
@@ -23,10 +28,10 @@ namespace TEngine.Core.Network
throw new NotSupportedException($"Received unsupported message protocolCode:{packInfo.ProtocolCode} messageType:{messageType}");
}
var packInfoMemoryStream = packInfo.MemoryStream;
try
{
DisposePackInfo = false;
switch (packInfo.RouteTypeCode)
{
case CoreRouteType.Route:
@@ -49,10 +54,8 @@ namespace TEngine.Core.Network
case > Opcode.OuterRouteRequest:
{
var runtimeId = session.RuntimeId;
var response = await addressableRouteComponent.Call(packInfo.RouteTypeCode, messageType, packInfoMemoryStream);
var response = await addressableRouteComponent.Call(packInfo.RouteTypeCode, messageType, packInfo.CreateMemoryStream());
// session可能已经断开了所以这里需要判断
if (session.RuntimeId == runtimeId)
{
session.Send(response, packInfo.RpcId);
@@ -62,7 +65,7 @@ namespace TEngine.Core.Network
}
case > Opcode.OuterRouteMessage:
{
addressableRouteComponent.Send(packInfo.RouteTypeCode, messageType, packInfoMemoryStream);
addressableRouteComponent.Send(packInfo.RouteTypeCode, messageType, packInfo.CreateMemoryStream());
return;
}
}
@@ -90,7 +93,7 @@ namespace TEngine.Core.Network
case > Opcode.OuterRouteRequest:
{
var runtimeId = session.RuntimeId;
var response = await MessageHelper.CallInnerRoute(session.Scene, routeId, packInfo.RouteTypeCode, messageType, packInfoMemoryStream);
var response = await MessageHelper.CallInnerRoute(session.Scene, routeId, packInfo.RouteTypeCode, messageType, packInfo.CreateMemoryStream());
// session可能已经断开了所以这里需要判断
if (session.RuntimeId == runtimeId)
{
@@ -101,7 +104,7 @@ namespace TEngine.Core.Network
}
case > Opcode.OuterRouteMessage:
{
MessageHelper.SendInnerRoute(session.Scene, routeId, packInfo.RouteTypeCode, packInfoMemoryStream);
MessageHelper.SendInnerRoute(session.Scene, routeId, packInfo.RouteTypeCode, packInfo.CreateMemoryStream());
return;
}
}
@@ -112,18 +115,21 @@ namespace TEngine.Core.Network
}
catch (Exception e)
{
if (packInfoMemoryStream.CanRead)
{
// ReSharper disable once MethodHasAsyncOverload
packInfoMemoryStream.Dispose();
}
Log.Error(e);
return;
}
finally
{
packInfo.Dispose();
}
throw new NotSupportedException($"Received unsupported message protocolCode:{packInfo.ProtocolCode} messageType:{messageType}");
}
protected override FTask InnerHandler(Session session, uint rpcId, long routeId, uint protocolCode, long routeTypeCode, Type messageType, object message)
{
throw new NotSupportedException($"OuterMessageScheduler NotSupported InnerHandler");
}
}
#endif
}

View File

@@ -1,218 +0,0 @@
using System;
using System.Runtime.InteropServices;
namespace TEngine.Core.Network
{
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate int KcpOutput(IntPtr buf, int len, IntPtr kcp, IntPtr user);
public static class KCP
{
#if UNITY_IPHONE && !UNITY_EDITOR
const string KcpDll = "__Internal";
#else
const string KcpDll = "kcp";
#endif
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern uint ikcp_check(IntPtr kcp, uint current);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern IntPtr ikcp_create(uint conv, IntPtr user);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern void ikcp_flush(IntPtr kcp);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern uint ikcp_getconv(IntPtr ptr);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern int ikcp_input(IntPtr kcp, byte[] data, int offset, int size);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern int ikcp_nodelay(IntPtr kcp, int nodelay, int interval, int resend, int nc);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern int ikcp_peeksize(IntPtr kcp);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern int ikcp_recv(IntPtr kcp, byte[] buffer, int len);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern void ikcp_release(IntPtr kcp);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern int ikcp_send(IntPtr kcp, byte[] buffer, int len);
[DllImport(KcpDll, CallingConvention=CallingConvention.Cdecl)]
private static extern void ikcp_setminrto(IntPtr ptr, int minrto);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern int ikcp_setmtu(IntPtr kcp, int mtu);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern void ikcp_setoutput(IntPtr kcp, KcpOutput output);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern void ikcp_update(IntPtr kcp, uint current);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern int ikcp_waitsnd(IntPtr kcp);
[DllImport(KcpDll, CallingConvention = CallingConvention.Cdecl)]
private static extern int ikcp_wndsize(IntPtr kcp, int sndwnd, int rcvwnd);
public static uint KcpCheck(IntPtr kcp, uint current)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
return ikcp_check(kcp, current);
}
public static IntPtr KcpCreate(uint conv, IntPtr user)
{
return ikcp_create(conv, user);
}
public static void KcpFlush(IntPtr kcp)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
ikcp_flush(kcp);
}
public static uint KcpGetconv(IntPtr ptr)
{
if (ptr == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
return ikcp_getconv(ptr);
}
public static int KcpInput(IntPtr kcp, byte[] data, int offset, int size)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
return ikcp_input(kcp, data, offset, size);
}
public static int KcpNodelay(IntPtr kcp, int nodelay, int interval, int resend, int nc)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
return ikcp_nodelay(kcp, nodelay, interval, resend, nc);
}
public static int KcpPeeksize(IntPtr kcp)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
return ikcp_peeksize(kcp);
}
public static int KcpRecv(IntPtr kcp, byte[] buffer, int len)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
return ikcp_recv(kcp, buffer, len);
}
public static void KcpRelease(IntPtr kcp)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
ikcp_release(kcp);
}
public static int KcpSend(IntPtr kcp, byte[] buffer, int len)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
return ikcp_send(kcp, buffer, len);
}
public static void KcpSetminrto(IntPtr kcp, int minrto)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
ikcp_setminrto(kcp, minrto);
}
public static int KcpSetmtu(IntPtr kcp, int mtu)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
return ikcp_setmtu(kcp, mtu);
}
public static void KcpSetoutput(IntPtr kcp, KcpOutput output)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
ikcp_setoutput(kcp, output);
}
public static void KcpUpdate(IntPtr kcp, uint current)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
ikcp_update(kcp, current);
}
public static int KcpWaitsnd(IntPtr kcp)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
return ikcp_waitsnd(kcp);
}
public static int KcpWndsize(IntPtr kcp, int sndwnd, int rcvwnd)
{
if (kcp == IntPtr.Zero)
{
throw new Exception($"kcp error, kcp point is zero");
}
return ikcp_wndsize(kcp, sndwnd, rcvwnd);
}
}
}

View File

@@ -4,9 +4,9 @@ namespace TEngine.Core.Network
{
public class KCPSettings
{
public int Mtu { get; private set; }
public int SendWindowSize { get; private set; }
public int ReceiveWindowSize { get; private set; }
public uint Mtu { get; private set; }
public uint SendWindowSize { get; private set; }
public uint ReceiveWindowSize { get; private set; }
public int MaxSendWindowSize { get; private set; }
public static KCPSettings Create(NetworkTarget networkTarget)

View File

@@ -1,14 +0,0 @@
namespace TEngine
{
public static class KcpProtocalType
{
public const byte SYN = 1;
public const byte ACK = 2;
public const byte FIN = 3;
public const byte MSG = 4;
public const byte RouterReconnectSYN = 5;
public const byte RouterReconnectACK = 6;
public const byte RouterSYN = 7;
public const byte RouterACK = 8;
}
}

View File

@@ -1,3 +0,0 @@
fileFormatVersion: 2
guid: 9bd29f721d3e458d88d43160fd7cca55
timeCreated: 1689230822

View File

@@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: b6517b73296f4564a8411993d86fcafb
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,24 @@
MIT License
Copyright (c) 2016 limpo1989
Copyright (c) 2020 Paul Pacheco
Copyright (c) 2020 Lymdun
Copyright (c) 2020 vis2k
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 60e959aa2b0fc774fb97f3a52197e41b
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,24 @@
MIT License
Copyright (c) 2016 limpo1989
Copyright (c) 2020 Paul Pacheco
Copyright (c) 2020 Lymdun
Copyright (c) 2020 vis2k
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: e29e0db9c79ad6348aa2997ac1326446
TextScriptImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,32 @@
# kcp2k
C# KCP based on the original C [kcp](https://github.com/skywind3000/kcp).
Works with **netcore** and **Unity**.
Developed for [Mirror Networking](https://github.com/MirrorNetworking/Mirror).
# Features
* Kcp.cs based on kcp.c v1.7, line-by-line translation to C#
* Heavy test coverage
* Fixed [WND_RCV bug](https://github.com/skywind3000/kcp/pull/291) from original kcp
* Optional high level C# code for client/server connection handling
* Optional high level Unreliable channel added
Pull requests for bug fixes & tests welcome.
# Unity
kcp2k works perfectly with Unity, see the Mirror repository's KcpTransport.
# Allocations
The client is allocation free.
The server's SendTo/ReceiveFrom still allocate.
Previously, [where-allocation](https://github.com/vis2k/where-allocation) for a 25x reduction in server allocations. However:
- It only worked with Unity's old Mono version.
- It didn't work in Unity's IL2CPP builds, which are still faster than Mono + NonAlloc
- It didn't work in regular C# projects.
- Overall, the extra complexity is not worth it. Use IL2CPP instead.
- Microsoft is considering to [remove the allocation](https://github.com/dotnet/runtime/issues/30797#issuecomment-1308599410).
# Remarks
- **Congestion Control** should be left disabled. It seems to be broken in KCP.

View File

@@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 2e2e6b3195633174f85b149e87c9312b
TextScriptImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,234 @@
V1.36 [2023-06-08]
- fix: #49 KcpPeer.RawInput message size check now considers cookie as well
- kcp.cs cleanups
V1.35 [2023-04-05]
- fix: KcpClients now need to validate with a secure cookie in order to protect against
UDP spoofing. fixes:
https://github.com/MirrorNetworking/Mirror/issues/3286
[disclosed by IncludeSec]
- KcpClient/Server: change callbacks to protected so inheriting classes can use them too
- KcpClient/Server: change config visibility to protected
V1.34 [2023-03-15]
- Send/SendTo/Receive/ReceiveFrom NonBlocking extensions.
to encapsulate WouldBlock allocations, exceptions, etc.
allows for reuse when overwriting KcpServer/Client (i.e. for relays).
V1.33 [2023-03-14]
- perf: KcpServer/Client RawReceive now call socket.Poll to avoid non-blocking
socket's allocating a new SocketException in case they WouldBlock.
fixes https://github.com/MirrorNetworking/Mirror/issues/3413
- perf: KcpServer/Client RawSend now call socket.Poll to avoid non-blocking
socket's allocating a new SocketException in case they WouldBlock.
fixes https://github.com/MirrorNetworking/Mirror/issues/3413
V1.32 [2023-03-12]
- fix: KcpPeer RawInput now doesn't disconnect in case of random internet noise
V1.31 [2023-03-05]
- KcpClient: Tick/Incoming/Outgoing can now be overwritten (virtual)
- breaking: KcpClient now takes KcpConfig in constructor instead of in Connect.
cleaner, and prepares for KcpConfig.MTU setting.
- KcpConfig now includes MTU; KcpPeer now works with KcpConfig's MTU, KcpServer/Client
buffers are now created with config's MTU.
V1.30 [2023-02-20]
- fix: set send/recv buffer sizes directly instead of iterating to find the limit.
fixes: https://github.com/MirrorNetworking/Mirror/issues/3390
- fix: server & client sockets are now always non-blocking to ensure main thread never
blocks on socket.recv/send. Send() now also handles WouldBlock.
- fix: socket.Receive/From directly with non-blocking sockets and handle WouldBlock,
instead of socket.Poll. faster, more obvious, and fixes Poll() looping forever while
socket is in error state. fixes: https://github.com/MirrorNetworking/Mirror/issues/2733
V1.29 [2023-01-28]
- fix: KcpServer.CreateServerSocket now handles NotSupportedException when setting DualMode
https://github.com/MirrorNetworking/Mirror/issues/3358
V1.28 [2023-01-28]
- fix: KcpClient.Connect now resolves hostname before creating peer
https://github.com/MirrorNetworking/Mirror/issues/3361
V1.27 [2023-01-08]
- KcpClient.Connect: invoke own events directly instead of going through peer,
which calls our own events anyway
- fix: KcpPeer/Client/Server callbacks are readonly and assigned in constructor
to ensure they are safe to use at all times.
fixes https://github.com/MirrorNetworking/Mirror/issues/3337
V1.26 [2022-12-22]
- KcpPeer.RawInput: fix compile error in old Unity Mono versions
- fix: KcpServer sets up a new connection's OnError immediately.
fixes KcpPeer throwing NullReferenceException when attempting to call OnError
after authentication errors.
- improved log messages
V1.25 [2022-12-14]
- breaking: removed where-allocation. use IL2CPP on servers instead.
- breaking: KcpConfig to simplify configuration
- high level cleanups
V1.24 [2022-12-14]
- KcpClient: fixed NullReferenceException when connection without a server.
added test coverage to ensure this never happens again.
V1.23 [2022-12-07]
- KcpClient: rawReceiveBuffer exposed
- fix: KcpServer RawSend uses connection.remoteEndPoint instead of the helper
'newClientEP'. fixes clients receiving the wrong messages meant for others.
https://github.com/MirrorNetworking/Mirror/issues/3296
V1.22 [2022-11-30]
- high level refactor, part two.
V1.21 [2022-11-24]
- high level refactor, part one.
- KcpPeer instead of KcpConnection, KcpClientConnection, KcpServerConnection
- RawSend/Receive can now easily be overwritten in KcpClient/Server.
for non-alloc, relays, etc.
V1.20 [2022-11-22]
- perf: KcpClient receive allocation was removed entirely.
reduces Mirror benchmark client sided allocations from 4.9 KB / 1.7 KB (non-alloc) to 0B.
- fix: KcpConnection.Disconnect does not check socket.Connected anymore.
UDP sockets don't have a connection.
fixes Disconnects not being sent to clients in netcore.
- KcpConnection.SendReliable: added OnError instead of logs
V1.19 [2022-05-12]
- feature: OnError ErrorCodes
V1.18 [2022-05-08]
- feature: OnError to allow higher level to show popups etc.
- feature: KcpServer.GetClientAddress is now GetClientEndPoint in order to
expose more details
- ResolveHostname: include exception in log for easier debugging
- fix: KcpClientConnection.RawReceive now logs the SocketException even if
it was expected. makes debugging easier.
- fix: KcpServer.TickIncoming now logs the SocketException even if it was
expected. makes debugging easier.
- fix: KcpClientConnection.RawReceive now calls Disconnect() if the other end
has closed the connection. better than just remaining in a state with unusable
sockets.
V1.17 [2022-01-09]
- perf: server/client MaximizeSendReceiveBuffersToOSLimit option to set send/recv
buffer sizes to OS limit. avoids drops due to small buffers under heavy load.
V1.16 [2022-01-06]
- fix: SendUnreliable respects ArraySegment.Offset
- fix: potential bug with negative length (see PR #2)
- breaking: removed pause handling because it's not necessary for Mirror anymore
V1.15 [2021-12-11]
- feature: feature: MaxRetransmits aka dead_link now configurable
- dead_link disconnect message improved to show exact retransmit count
V1.14 [2021-11-30]
- fix: Send() now throws an exception for messages which require > 255 fragments
- fix: ReliableMaxMessageSize is now limited to messages which require <= 255 fragments
V1.13 [2021-11-28]
- fix: perf: uncork max message size from 144 KB to as much as we want based on
receive window size.
fixes https://github.com/vis2k/kcp2k/issues/22
fixes https://github.com/skywind3000/kcp/pull/291
- feature: OnData now includes channel it was received on
V1.12 [2021-07-16]
- Tests: don't depend on Unity anymore
- fix: #26 - Kcp now catches exception if host couldn't be resolved, and calls
OnDisconnected to let the user now.
- fix: KcpServer.DualMode is now configurable in the constructor instead of
using #if UNITY_SWITCH. makes it run on all other non dual mode platforms too.
- fix: where-allocation made optional via virtuals and inheriting
KcpServer/Client/Connection NonAlloc classes. fixes a bug where some platforms
might not support where-allocation.
V1.11 rollback [2021-06-01]
- perf: Segment MemoryStream initial capacity set to MTU to avoid early runtime
resizing/allocations
V1.10 [2021-05-28]
- feature: configurable Timeout
- allocations explained with comments (C# ReceiveFrom / IPEndPoint.GetHashCode)
- fix: #17 KcpConnection.ReceiveNextReliable now assigns message default so it
works in .net too
- fix: Segment pool is not static anymore. Each kcp instance now has it's own
Pool<Segment>. fixes #18 concurrency issues
V1.9 [2021-03-02]
- Tick() split into TickIncoming()/TickOutgoing() to use in Mirror's new update
functions. allows to minimize latency.
=> original Tick() is still supported for convenience. simply processes both!
V1.8 [2021-02-14]
- fix: Unity IPv6 errors on Nintendo Switch
- fix: KcpConnection now disconnects if data message was received without content.
previously it would call OnData with an empty ArraySegment, causing all kinds of
weird behaviour in Mirror/DOTSNET. Added tests too.
- fix: KcpConnection.SendData: don't allow sending empty messages anymore. disconnect
and log a warning to make it completely obvious.
V1.7 [2021-01-13]
- fix: unreliable messages reset timeout now too
- perf: KcpConnection OnCheckEnabled callback changed to a simple 'paused' boolean.
This is faster than invoking a Func<bool> every time and allows us to fix #8 more
easily later by calling .Pause/.Unpause from OnEnable/OnDisable in MirrorTransport.
- fix #8: Unpause now resets timeout to fix a bug where Mirror would pause kcp,
change the scene which took >10s, then unpause and kcp would detect the lack of
any messages for >10s as timeout. Added test to make sure it never happens again.
- MirrorTransport: statistics logging for headless servers
- Mirror Transport: Send/Receive window size increased once more from 2048 to 4096.
V1.6 [2021-01-10]
- Unreliable channel added!
- perf: KcpHeader byte added to every kcp message to indicate
Handshake/Data/Ping/Disconnect instead of scanning each message for Hello/Byte/Ping
content via SegmentEquals. It's a lot cleaner, should be faster and should avoid
edge cases where a message content would equal Hello/Ping/Bye sequence accidentally.
- Kcp.Input: offset moved to parameters for cases where it's needed
- Kcp.SetMtu from original Kcp.c
V1.5 [2021-01-07]
- KcpConnection.MaxSend/ReceiveRate calculation based on the article
- MirrorTransport: large send/recv window size defaults to avoid high latencies caused
by packets not being processed fast enough
- MirrorTransport: show MaxSend/ReceiveRate in debug gui
- MirrorTransport: don't Log.Info to console in headless mode if debug log is disabled
V1.4 [2020-11-27]
- fix: OnCheckEnabled added. KcpConnection message processing while loop can now
be interrupted immediately. fixes Mirror Transport scene changes which need to stop
processing any messages immediately after a scene message)
- perf: Mirror KcpTransport: FastResend enabled by default. turbo mode according to:
https://github.com/skywind3000/kcp/blob/master/README.en.md#protocol-configuration
- perf: Mirror KcpTransport: CongestionControl disabled by default (turbo mode)
V1.3 [2020-11-17]
- Log.Info/Warning/Error so logging doesn't depend on UnityEngine anymore
- fix: Server.Tick catches SocketException which happens if Android client is killed
- MirrorTransport: debugLog option added that can be checked in Unity Inspector
- Utils.Clamp so Kcp.cs doesn't depend on UnityEngine
- Utils.SegmentsEqual: use Linq SequenceEqual so it doesn't depend on UnityEngine
=> kcp2k can now be used in any C# project even without Unity
V1.2 [2020-11-10]
- more tests added
- fix: raw receive buffers are now all of MTU size
- fix: raw receive detects error where buffer was too small for msgLength and
result in excess data being dropped silently
- KcpConnection.MaxMessageSize added for use in high level
- KcpConnection.MaxMessageSize increased from 1200 bytes to to maximum allowed
message size of 145KB for kcp (based on mtu, overhead, wnd_rcv)
V1.1 [2020-10-30]
- high level cleanup, fixes, improvements
V1.0 [2020-10-22]
- Kcp.cs now mirrors original Kcp.c behaviour
(this fixes dozens of bugs)
V0.1
- initial kcp-csharp based version

View File

@@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 0d1fd9853ebb3d047acd92fae8441350
TextScriptImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: d84ce75a45b625749bb2ee11dadf14e9
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,8 @@
namespace kcp2k
{
internal struct AckItem
{
internal uint serialNumber;
internal uint timestamp;
}
}

View File

@@ -1,5 +1,5 @@
fileFormatVersion: 2
guid: eb202d4e9e36ea846861fa8378dd1c23
guid: cbc45820dcb825444a1e4b3a1bcba94a
MonoImporter:
externalObjects: {}
serializedVersion: 2

View File

@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("kcp2k.Tests")]

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 1731c38e547afe045bd600f9082a0f33
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: c43eaf92abd8f764bb6f94c9b5a701c4
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,46 @@
// Pool to avoid allocations (from libuv2k & Mirror)
using System;
using System.Collections.Generic;
namespace kcp2k
{
public class Pool<T>
{
// Mirror is single threaded, no need for concurrent collections
readonly Stack<T> objects = new Stack<T>();
// some types might need additional parameters in their constructor, so
// we use a Func<T> generator
readonly Func<T> objectGenerator;
// some types might need additional cleanup for returned objects
readonly Action<T> objectResetter;
public Pool(Func<T> objectGenerator, Action<T> objectResetter, int initialCapacity)
{
this.objectGenerator = objectGenerator;
this.objectResetter = objectResetter;
// allocate an initial pool so we have fewer (if any)
// allocations in the first few frames (or seconds).
for (int i = 0; i < initialCapacity; ++i)
objects.Push(objectGenerator());
}
// take an element from the pool, or create a new one if empty
public T Take() => objects.Count > 0 ? objects.Pop() : objectGenerator();
// return an element to the pool
public void Return(T item)
{
objectResetter(item);
objects.Push(item);
}
// clear the pool
public void Clear() => objects.Clear();
// count to see how many objects are in the pool. useful for tests.
public int Count => objects.Count;
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: d78a539c8e8e73a46bd6ffd83d5e9d8c
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,78 @@
using System.IO;
namespace kcp2k
{
// KCP Segment Definition
internal class Segment
{
internal uint conv; // conversation
internal uint cmd; // command, e.g. Kcp.CMD_ACK etc.
// fragment (sent as 1 byte).
// 0 if unfragmented, otherwise fragment numbers in reverse: N,..,32,1,0
// this way the first received segment tells us how many fragments there are.
internal uint frg;
internal uint wnd; // window size that the receive can currently receive
internal uint ts; // timestamp
internal uint sn; // sequence number
internal uint una;
internal uint resendts; // resend timestamp
internal int rto;
internal uint fastack;
internal uint xmit; // retransmit count
// we need an auto scaling byte[] with a WriteBytes function.
// MemoryStream does that perfectly, no need to reinvent the wheel.
// note: no need to pool it, because Segment is already pooled.
// -> default MTU as initial capacity to avoid most runtime resizing/allocations
//
// .data is only used for Encode(), which always fits it into a buffer.
// the buffer is always Kcp.buffer. Kcp ctor creates the buffer of size:
// (mtu + OVERHEAD) * 3 bytes.
// in other words, Encode only ever writes up to the above amount of bytes.
internal MemoryStream data = new MemoryStream(Kcp.MTU_DEF);
// ikcp_encode_seg
// encode a segment into buffer.
// buffer is always Kcp.buffer. Kcp ctor creates the buffer of size:
// (mtu + OVERHEAD) * 3 bytes.
// in other words, Encode only ever writes up to the above amount of bytes.
internal int Encode(byte[] ptr, int offset)
{
int previousPosition = offset;
offset += Utils.Encode32U(ptr, offset, conv);
offset += Utils.Encode8u(ptr, offset, (byte)cmd);
// IMPORTANT kcp encodes 'frg' as 1 byte.
// so we can only support up to 255 fragments.
// (which limits max message size to around 288 KB)
offset += Utils.Encode8u(ptr, offset, (byte)frg);
offset += Utils.Encode16U(ptr, offset, (ushort)wnd);
offset += Utils.Encode32U(ptr, offset, ts);
offset += Utils.Encode32U(ptr, offset, sn);
offset += Utils.Encode32U(ptr, offset, una);
offset += Utils.Encode32U(ptr, offset, (uint)data.Position);
int written = offset - previousPosition;
return written;
}
// reset to return a fresh segment to the pool
internal void Reset()
{
conv = 0;
cmd = 0;
frg = 0;
wnd = 0;
ts = 0;
sn = 0;
una = 0;
rto = 0;
xmit = 0;
resendts = 0;
fastack = 0;
// keep buffer for next pool usage, but reset length (= bytes written)
data.SetLength(0);
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 8749c094620ccde478c15165010ffbf1
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,76 @@
using System.Runtime.CompilerServices;
namespace kcp2k
{
public static partial class Utils
{
// Clamp so we don't have to depend on UnityEngine
public static int Clamp(int value, int min, int max)
{
if (value < min) return min;
if (value > max) return max;
return value;
}
// encode 8 bits unsigned int
public static int Encode8u(byte[] p, int offset, byte value)
{
p[0 + offset] = value;
return 1;
}
// decode 8 bits unsigned int
public static int Decode8u(byte[] p, int offset, out byte value)
{
value = p[0 + offset];
return 1;
}
// encode 16 bits unsigned int (lsb)
public static int Encode16U(byte[] p, int offset, ushort value)
{
p[0 + offset] = (byte)(value >> 0);
p[1 + offset] = (byte)(value >> 8);
return 2;
}
// decode 16 bits unsigned int (lsb)
public static int Decode16U(byte[] p, int offset, out ushort value)
{
ushort result = 0;
result |= p[0 + offset];
result |= (ushort)(p[1 + offset] << 8);
value = result;
return 2;
}
// encode 32 bits unsigned int (lsb)
public static int Encode32U(byte[] p, int offset, uint value)
{
p[0 + offset] = (byte)(value >> 0);
p[1 + offset] = (byte)(value >> 8);
p[2 + offset] = (byte)(value >> 16);
p[3 + offset] = (byte)(value >> 24);
return 4;
}
// decode 32 bits unsigned int (lsb)
public static int Decode32U(byte[] p, int offset, out uint value)
{
uint result = 0;
result |= p[0 + offset];
result |= (uint)(p[1 + offset] << 8);
result |= (uint)(p[2 + offset] << 16);
result |= (uint)(p[3 + offset] << 24);
value = result;
return 4;
}
// timediff was a macro in original Kcp. let's inline it if possible.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int TimeDiff(uint later, uint earlier)
{
return (int)(later - earlier);
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 927985771e12e4c45b199b20c8477369
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -1,16 +1,15 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading;
using TEngine.Core;
using kcp2k;
// ReSharper disable ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
// ReSharper disable PossibleNullReferenceException
// ReSharper disable InconsistentNaming
#pragma warning disable CS8602
#pragma warning disable CS8625
#pragma warning disable CS8618
namespace TEngine.Core.Network
{
@@ -55,21 +54,22 @@ namespace TEngine.Core.Network
{
ThreadSynchronizationContext.Main.Post(OnConnectDisconnect);
}
_socket.Disconnect(false);
_socket.Close();
}
_kcp = null;
_maxSndWnd = 0;
_updateMinTime = 0;
_memoryPool.Dispose();
_memoryPool = null;
_sendAction = null;
_rawSendBuffer = null;
_rawReceiveBuffer = null;
_packetParser?.Dispose();
_receiveMemoryStream?.Dispose();
ClearConnectTimeout(ref _connectTimeoutId);
if (_messageCache != null)
@@ -77,13 +77,6 @@ namespace TEngine.Core.Network
_messageCache.Clear();
_messageCache = null;
}
if (_kcpIntPtr != IntPtr.Zero)
{
KCP.KcpRelease(_kcpIntPtr);
ConnectionPtrChannel.Remove(_kcpIntPtr);
_kcpIntPtr = IntPtr.Zero;
}
#if NETDEBUG
Log.Debug($"KCPClientNetwork ConnectionPtrChannel:{ConnectionPtrChannel.Count}");
#endif
@@ -110,6 +103,7 @@ namespace TEngine.Core.Network
_maxSndWnd = _kcpSettings.MaxSendWindowSize;
_messageCache = new Queue<MessageCacheInfo>();
_rawReceiveBuffer = new byte[_kcpSettings.Mtu + 5];
_memoryPool = MemoryPool<byte>.Shared;
_sendAction = (rpcId, routeTypeOpCode, routeId, memoryStream, message) =>
{
@@ -151,7 +145,7 @@ namespace TEngine.Core.Network
private Socket _socket;
private int _maxSndWnd;
private IntPtr _kcpIntPtr;
private Kcp _kcp;
private bool _isDisconnect;
private long _updateMinTime;
private byte[] _rawSendBuffer;
@@ -159,15 +153,12 @@ namespace TEngine.Core.Network
private byte[] _rawReceiveBuffer;
private KCPSettings _kcpSettings;
private APacketParser _packetParser;
private MemoryStream _receiveMemoryStream;
private MemoryPool<byte> _memoryPool;
private Queue<MessageCacheInfo> _messageCache;
private Action<uint, long, long, MemoryStream, object> _sendAction;
private readonly Queue<uint> _updateTimeOutTime = new Queue<uint>();
private EndPoint _clientEndPoint = new IPEndPoint(IPAddress.Any, 0);
private readonly SortedDictionary<uint, Action> _updateTimer = new SortedDictionary<uint, Action>();
private static readonly Dictionary<IntPtr, KCPClientNetwork> ConnectionPtrChannel = new Dictionary<IntPtr, KCPClientNetwork>();
private readonly SortedSet<uint> _updateTimer = new SortedSet<uint>();
private uint TimeNow => (uint) (TimeHelper.Now - _startTime);
private void Receive()
@@ -213,14 +204,11 @@ namespace TEngine.Core.Network
SendHeader(KcpHeader.ConfirmConnection);
ClearConnectTimeout(ref _connectTimeoutId);
// 创建KCP和相关的初始化
_kcpIntPtr = KCP.KcpCreate(channelId, new IntPtr(channelId));
KCP.KcpNodelay(_kcpIntPtr, 1, 5, 2, 1);
KCP.KcpWndsize(_kcpIntPtr, _kcpSettings.SendWindowSize, _kcpSettings.ReceiveWindowSize);
KCP.KcpSetmtu(_kcpIntPtr, _kcpSettings.Mtu);
KCP.KcpSetminrto(_kcpIntPtr, 30);
KCP.KcpSetoutput(_kcpIntPtr, KcpOutput);
_kcp = new Kcp(channelId, Output);
_kcp.SetNoDelay(1, 5, 2, true);
_kcp.SetWindowSize(_kcpSettings.SendWindowSize, _kcpSettings.ReceiveWindowSize);
_kcp.SetMtu(_kcpSettings.Mtu);
_rawSendBuffer = new byte[ushort.MaxValue];
_receiveMemoryStream = MemoryStreamHelper.GetRecyclableMemoryStream();
_packetParser = APacketParser.CreatePacketParser(NetworkTarget);
// 把缓存的消息全部发送给服务器
@@ -248,7 +236,6 @@ namespace TEngine.Core.Network
_messageCache.Clear();
_messageCache = null;
ConnectionPtrChannel.Add(_kcpIntPtr, this);
// 调用ChannelId改变事件、就算没有改变也要发下、接收事件的地方会判定下
ThreadSynchronizationContext.Main.Post(() =>
{
@@ -272,8 +259,8 @@ namespace TEngine.Core.Network
{
break;
}
KCP.KcpInput(_kcpIntPtr, _rawReceiveBuffer, 5, messageLength);
_kcp.Input(_rawReceiveBuffer, 5, messageLength);
AddToUpdate(0);
KcpReceive();
break;
@@ -284,7 +271,7 @@ namespace TEngine.Core.Network
{
break;
}
_isDisconnect = true;
Dispose();
break;
@@ -310,8 +297,8 @@ namespace TEngine.Core.Network
}
#endif
// 检查等待发送的消息如果超出两倍窗口大小KCP作者给的建议是要断开连接
var waitSendSize = KCP.KcpWaitsnd(_kcpIntPtr);
var waitSendSize = _kcp.WaitSnd;
if (waitSendSize > _maxSndWnd)
{
@@ -319,15 +306,9 @@ namespace TEngine.Core.Network
Dispose();
return;
}
// 发送消息
KCP.KcpSend(_kcpIntPtr, memoryStream.GetBuffer(), (int) memoryStream.Length);
// 因为memoryStream对象池出来的、所以需要手动回收下
_kcp.Send(memoryStream.GetBuffer(), 0, (int)memoryStream.Length);
memoryStream.Dispose();
AddToUpdate(0);
}
@@ -365,7 +346,7 @@ namespace TEngine.Core.Network
_sendAction(rpcId, routeTypeOpCode, entityId, null, message);
}
private void Output(IntPtr bytes, int count)
private void Output(byte[] bytes, int count)
{
#if TENGINE_DEVELOP
if (NetworkThread.Instance.ManagedThreadId != Thread.CurrentThread.ManagedThreadId)
@@ -374,7 +355,7 @@ namespace TEngine.Core.Network
return;
}
#endif
if (IsDisposed || _kcpIntPtr == IntPtr.Zero)
if (IsDisposed)
{
return;
}
@@ -388,7 +369,7 @@ namespace TEngine.Core.Network
_rawSendBuffer.WriteTo(0, (byte) KcpHeader.ReceiveData);
_rawSendBuffer.WriteTo(1, ChannelId);
Marshal.Copy(bytes, _rawSendBuffer, 5, count);
Buffer.BlockCopy(bytes, 0, _rawSendBuffer, 5, count);
_socket.Send(_rawSendBuffer, 0, count + 5, SocketFlags.None);
}
catch (Exception e)
@@ -406,7 +387,7 @@ namespace TEngine.Core.Network
return;
}
#endif
if (IsDisposed || _kcpIntPtr == IntPtr.Zero)
if (IsDisposed)
{
return;
}
@@ -416,8 +397,8 @@ namespace TEngine.Core.Network
try
{
// 获得一个完整消息的长度
var peekSize = KCP.KcpPeeksize(_kcpIntPtr);
var peekSize = _kcp.PeekSize();
// 如果没有接收的消息那就跳出当前循环。
@@ -433,9 +414,8 @@ namespace TEngine.Core.Network
throw new Exception("SocketError.NetworkReset");
}
_receiveMemoryStream.SetLength(peekSize);
_receiveMemoryStream.Seek(0, SeekOrigin.Begin);
var receiveCount = KCP.KcpRecv(_kcpIntPtr, _receiveMemoryStream.GetBuffer(), peekSize);
var receiveMemoryOwner = _memoryPool.Rent(Packet.OuterPacketMaxLength);
var receiveCount = _kcp.Receive(receiveMemoryOwner.Memory, peekSize);
// 如果接收的长度跟peekSize不一样不需要处理因为消息肯定有问题的(虽然不可能出现)。
@@ -445,13 +425,10 @@ namespace TEngine.Core.Network
break;
}
var packInfo = _packetParser.UnPack(_receiveMemoryStream);
if (packInfo == null)
if (!_packetParser.UnPack(receiveMemoryOwner, out var packInfo))
{
break;
}
ThreadSynchronizationContext.Main.Post(() =>
{
@@ -483,15 +460,13 @@ namespace TEngine.Core.Network
foreach (var timeId in _updateTimer)
{
var key = timeId.Key;
if (key > nowTime)
if (timeId > nowTime)
{
_updateMinTime = key;
_updateMinTime = timeId;
break;
}
_updateTimeOutTime.Enqueue(key);
_updateTimeOutTime.Enqueue(timeId);
}
while (_updateTimeOutTime.TryDequeue(out var time))
@@ -521,7 +496,7 @@ namespace TEngine.Core.Network
_updateMinTime = tillTime;
}
_updateTimer[tillTime] = KcpUpdate;
_updateTimer.Add(tillTime);
}
private void KcpUpdate()
@@ -537,17 +512,14 @@ namespace TEngine.Core.Network
try
{
KCP.KcpUpdate(_kcpIntPtr, nowTime);
_kcp.Update(nowTime);
}
catch (Exception e)
{
Log.Error(e);
}
if (_kcpIntPtr != IntPtr.Zero)
{
AddToUpdate(KCP.KcpCheck(_kcpIntPtr, nowTime));
}
AddToUpdate(_kcp.Check(nowTime));
}
public override void RemoveChannel(uint channelId)
@@ -590,38 +562,6 @@ namespace TEngine.Core.Network
TimerScheduler.Instance.Core.RemoveByRef(ref connectTimeoutId);
}
#if ENABLE_IL2CPP
[AOT.MonoPInvokeCallback(typeof(KcpOutput))]
#endif
private static int KcpOutput(IntPtr bytes, int count, IntPtr kcp, IntPtr user)
{
#if TENGINE_DEVELOP
if (NetworkThread.Instance.ManagedThreadId != Thread.CurrentThread.ManagedThreadId)
{
Log.Error("not in NetworkThread!");
return 0;
}
#endif
try
{
if (kcp == IntPtr.Zero || !ConnectionPtrChannel.TryGetValue(kcp, out var channel))
{
return 0;
}
if (!channel.IsDisposed)
{
channel.Output(bytes, count);
}
}
catch (Exception e)
{
Log.Error(e);
}
return count;
}
#endregion
}

View File

@@ -5,11 +5,10 @@ using System.Linq;
using System.Net;
using System.Net.Sockets;
using TEngine.DataStructure;
using TEngine.Core;
using kcp2k;
// ReSharper disable InconsistentNaming
#pragma warning disable CS8601
#pragma warning disable CS8625
#pragma warning disable CS8618
// ReSharper disable ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
namespace TEngine.Core.Network
{
@@ -102,7 +101,6 @@ namespace TEngine.Core.Network
private readonly SortedOneToManyList<uint, uint> _pendingConnectionTimer = new SortedOneToManyList<uint, uint>();
private readonly Dictionary<uint, KCPServerNetworkChannel> _pendingConnection = new Dictionary<uint, KCPServerNetworkChannel>();
private readonly Dictionary<uint, KCPServerNetworkChannel> _connectionChannel = new Dictionary<uint, KCPServerNetworkChannel>();
public static readonly Dictionary<IntPtr, KCPServerNetworkChannel> ConnectionPtrChannel = new Dictionary<IntPtr, KCPServerNetworkChannel>();
private KCPSettings KcpSettings { get; set; }
private uint TimeNow => (uint) (TimeHelper.Now - _startTime);
@@ -170,6 +168,7 @@ namespace TEngine.Core.Network
try
{
var receiveLength = _socket.ReceiveFrom(_rawReceiveBuffer, ref _clientEndPoint);
if (receiveLength < 1)
{
continue;
@@ -182,7 +181,6 @@ namespace TEngine.Core.Network
{
case KcpHeader.RequestConnection:
{
// Log.Debug("KcpHeader.RequestConnection");
if (receiveLength != 5)
{
break;
@@ -232,17 +230,12 @@ namespace TEngine.Core.Network
break;
}
var kcpIntPtr = KCP.KcpCreate(channelId, new IntPtr(channelId));
KCP.KcpNodelay(kcpIntPtr, 1, 5, 2, 1);
KCP.KcpWndsize(kcpIntPtr, KcpSettings.SendWindowSize, KcpSettings.ReceiveWindowSize);
KCP.KcpSetmtu(kcpIntPtr, KcpSettings.Mtu);
KCP.KcpSetminrto(kcpIntPtr, 30);
KCP.KcpSetoutput(kcpIntPtr, KcpOutput);
var kcp = new Kcp(channelId, channel.Output);
kcp.SetNoDelay(1, 5, 2, true);
kcp.SetWindowSize(KcpSettings.SendWindowSize, KcpSettings.ReceiveWindowSize);
kcp.SetMtu(KcpSettings.Mtu);
_connectionChannel.Add(channel.Id, channel);
ConnectionPtrChannel.Add(kcpIntPtr, channel);
channel.Connect(kcpIntPtr, AddToUpdate, KcpSettings.MaxSendWindowSize, NetworkTarget, NetworkMessageScheduler);
channel.Connect(kcp, AddToUpdate, KcpSettings.MaxSendWindowSize, NetworkTarget, NetworkMessageScheduler);
break;
}
case KcpHeader.ReceiveData:
@@ -260,7 +253,7 @@ namespace TEngine.Core.Network
break;
}
KCP.KcpInput(channel.KcpIntPtr, _rawReceiveBuffer, 5, messageLength);
channel.Kcp.Input(_rawReceiveBuffer, 5, messageLength);
AddToUpdate(0, channel.Id);
channel.Receive();
break;
@@ -279,6 +272,7 @@ namespace TEngine.Core.Network
}
}
}
private bool RemovePendingConnection(uint channelId, EndPoint remoteEndPoint, out KCPServerNetworkChannel channel)
{
@@ -391,18 +385,19 @@ namespace TEngine.Core.Network
continue;
}
var channelKcp = channel.Kcp;
try
{
KCP.KcpUpdate(channel.KcpIntPtr, nowTime);
channelKcp.Update(nowTime);
}
catch (Exception e)
{
Log.Error(e);
}
if (channel.KcpIntPtr != IntPtr.Zero)
if (channelKcp != null)
{
AddToUpdate(KCP.KcpCheck(channel.KcpIntPtr, nowTime), channelId);
AddToUpdate(channelKcp.Check(nowTime), channelId);
}
}
@@ -461,39 +456,6 @@ namespace TEngine.Core.Network
_updateTimer.Add(tillTime, channelId);
}
#if ENABLE_IL2CPP
[AOT.MonoPInvokeCallback(typeof(KcpOutput))]
#endif
private static int KcpOutput(IntPtr bytes, int count, IntPtr kcp, IntPtr user)
{
#if TENGINE_DEVELOP
if (NetworkThread.Instance.ManagedThreadId != Thread.CurrentThread.ManagedThreadId)
{
Log.Error("not in NetworkThread!");
return 0;
}
#endif
try
{
if (kcp == IntPtr.Zero || !ConnectionPtrChannel.TryGetValue(kcp, out var channel))
{
return 0;
}
if (!channel.IsDisposed)
{
channel.Output(bytes, count);
}
}
catch (Exception e)
{
Log.Error(e);
}
return count;
}
#endregion
}

View File

@@ -1,12 +1,14 @@
using System;
using System.Buffers;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using TEngine.Core;
using TEngine.DataStructure;
using kcp2k;
// ReSharper disable ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
// ReSharper disable InconsistentNaming
#pragma warning disable CS8625
#pragma warning disable CS8618
namespace TEngine.Core.Network
{
@@ -19,8 +21,8 @@ namespace TEngine.Core.Network
public readonly uint CreateTime;
private readonly Socket _socket;
private Action<uint, uint> _addToUpdate;
private MemoryStream _receiveMemoryStream;
public IntPtr KcpIntPtr { get; private set; }
private MemoryPool<byte> _memoryPool;
public Kcp Kcp { get; private set; }
public override event Action OnDispose;
public override event Action<APackInfo> OnReceiveMemoryStream;
@@ -37,6 +39,7 @@ namespace TEngine.Core.Network
_socket = socket;
CreateTime = createTime;
RemoteEndPoint = remoteEndPoint;
_memoryPool = MemoryPool<byte>.Shared;
}
public override void Dispose()
@@ -53,28 +56,23 @@ namespace TEngine.Core.Network
return;
}
Kcp = null;
var buff = new byte[5];
buff.WriteTo(0, (byte) KcpHeader.Disconnect);
buff.WriteTo(1, Id);
_socket.SendTo(buff, 5, SocketFlags.None, RemoteEndPoint);
if (KcpIntPtr != IntPtr.Zero)
{
KCPServerNetwork.ConnectionPtrChannel.Remove(KcpIntPtr);
KCP.KcpRelease(KcpIntPtr);
KcpIntPtr = IntPtr.Zero;
}
#if NETDEBUG
Log.Debug($"KCPServerNetworkChannel ConnectionPtrChannel:{KCPServerNetwork.ConnectionPtrChannel.Count}");
#endif
_maxSndWnd = 0;
_addToUpdate = null;
_receiveMemoryStream?.Dispose();
_memoryPool.Dispose();
_memoryPool = null;
ThreadSynchronizationContext.Main.Post(OnDispose);
base.Dispose();
}
public void Connect(IntPtr kcpIntPtr, Action<uint, uint> addToUpdate, int maxSndWnd, NetworkTarget networkTarget, ANetworkMessageScheduler networkMessageScheduler)
public void Connect(Kcp kcp, Action<uint, uint> addToUpdate, int maxSndWnd, NetworkTarget networkTarget, ANetworkMessageScheduler networkMessageScheduler)
{
#if TENGINE_DEVELOP
if (NetworkThread.Instance.ManagedThreadId != Thread.CurrentThread.ManagedThreadId)
@@ -83,11 +81,10 @@ namespace TEngine.Core.Network
return;
}
#endif
KcpIntPtr = kcpIntPtr;
Kcp = kcp;
_maxSndWnd = maxSndWnd;
_addToUpdate = addToUpdate;
_rawSendBuffer = new byte[ushort.MaxValue];
_receiveMemoryStream = MemoryStreamHelper.GetRecyclableMemoryStream();
PacketParser = APacketParser.CreatePacketParser(networkTarget);
ThreadSynchronizationContext.Main.Post(() =>
@@ -110,14 +107,14 @@ namespace TEngine.Core.Network
return;
}
#endif
if (IsDisposed || KcpIntPtr == IntPtr.Zero)
if (IsDisposed)
{
return;
}
// 检查等待发送的消息如果超出两倍窗口大小KCP作者给的建议是要断开连接
var waitSendSize = KCP.KcpWaitsnd(KcpIntPtr);
var waitSendSize = Kcp.WaitSnd;
if (waitSendSize > _maxSndWnd)
{
@@ -125,15 +122,10 @@ namespace TEngine.Core.Network
Dispose();
return;
}
// 发送消息
KCP.KcpSend(KcpIntPtr, memoryStream.GetBuffer(), (int) memoryStream.Length);
Kcp.Send(memoryStream.GetBuffer(), 0, (int)memoryStream.Length);
// 因为memoryStream对象池出来的、所以需要手动回收下
memoryStream.Dispose();
_addToUpdate(0, Id);
}
@@ -146,7 +138,7 @@ namespace TEngine.Core.Network
return;
}
#endif
if (IsDisposed || KcpIntPtr == IntPtr.Zero)
if (IsDisposed)
{
return;
}
@@ -155,32 +147,25 @@ namespace TEngine.Core.Network
{
try
{
if (KcpIntPtr == IntPtr.Zero)
{
return;
}
// 获得一个完整消息的长度
var peekSize = KCP.KcpPeeksize(KcpIntPtr);
var peekSize = Kcp.PeekSize();
// 如果没有接收的消息那就跳出当前循环。
if (peekSize < 0)
{
return;
}
// 如果为0表示当前消息发生错误。提示下、一般情况不会发生的
if (peekSize == 0)
{
throw new Exception("SocketError.NetworkReset");
}
_receiveMemoryStream.SetLength(peekSize);
_receiveMemoryStream.Seek(0, SeekOrigin.Begin);
var receiveCount = KCP.KcpRecv(KcpIntPtr, _receiveMemoryStream.GetBuffer(), peekSize);
var receiveMemoryOwner = _memoryPool.Rent(Packet.OuterPacketMaxLength);
var receiveCount = Kcp.Receive(receiveMemoryOwner.Memory, peekSize);
// 如果接收的长度跟peekSize不一样不需要处理因为消息肯定有问题的(虽然不可能出现)。
@@ -190,9 +175,7 @@ namespace TEngine.Core.Network
break;
}
var packInfo = PacketParser.UnPack(_receiveMemoryStream);
if (packInfo == null)
if (!PacketParser.UnPack(receiveMemoryOwner,out var packInfo))
{
break;
}
@@ -215,7 +198,7 @@ namespace TEngine.Core.Network
}
}
public void Output(IntPtr bytes, int count)
public void Output(byte[] bytes, int count)
{
#if TENGINE_DEVELOP
if (NetworkThread.Instance.ManagedThreadId != Thread.CurrentThread.ManagedThreadId)
@@ -224,7 +207,7 @@ namespace TEngine.Core.Network
return;
}
#endif
if (IsDisposed || KcpIntPtr == IntPtr.Zero)
if (IsDisposed)
{
return;
}
@@ -238,7 +221,7 @@ namespace TEngine.Core.Network
_rawSendBuffer.WriteTo(0, (byte) KcpHeader.ReceiveData);
_rawSendBuffer.WriteTo(1, Id);
Marshal.Copy(bytes, _rawSendBuffer, 5, count);
Buffer.BlockCopy(bytes, 0, _rawSendBuffer, 5, count);
_socket.SendTo(_rawSendBuffer, 0, count + 5, SocketFlags.None, RemoteEndPoint);
}
catch (Exception e)

View File

@@ -1,37 +1,25 @@
#if TENGINE_NET
using System.Buffers;
using TEngine.DataStructure;
using TEngine.Core;
#pragma warning disable CS8600
#pragma warning disable CS8625
#pragma warning disable CS8603
// ReSharper disable ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
namespace TEngine.Core.Network;
public sealed class InnerPackInfo : APackInfo
{
public static InnerPackInfo Create()
public static InnerPackInfo Create(IMemoryOwner<byte> memoryOwner)
{
return Pool<InnerPackInfo>.Rent();
}
public static InnerPackInfo Create(uint rpcId, long routeId, uint protocolCode)
{
var innerPackInfo = Pool<InnerPackInfo>.Rent();
innerPackInfo.RpcId = rpcId;
innerPackInfo.RouteId = routeId;
innerPackInfo.ProtocolCode = protocolCode;
var innerPackInfo = Rent<InnerPackInfo>();
innerPackInfo.MemoryOwner = memoryOwner;
return innerPackInfo;
}
public static InnerPackInfo Create(uint rpcId, long routeId, uint protocolCode, long routeTypeCode, MemoryStream memoryStream)
public override MemoryStream CreateMemoryStream()
{
var innerPackInfo = Pool<InnerPackInfo>.Rent();
innerPackInfo.RpcId = rpcId;
innerPackInfo.RouteId = routeId;
innerPackInfo.ProtocolCode = protocolCode;
innerPackInfo.RouteTypeCode = routeTypeCode;
innerPackInfo.MemoryStream = memoryStream;
return innerPackInfo;
var recyclableMemoryStream = MemoryStreamHelper.GetRecyclableMemoryStream();
recyclableMemoryStream.Write(MemoryOwner.Memory.Span.Slice(0, Packet.InnerPacketHeadLength + MessagePacketLength));
recyclableMemoryStream.Seek(0, SeekOrigin.Begin);
return recyclableMemoryStream;
}
public override void Dispose()
@@ -42,38 +30,36 @@ public sealed class InnerPackInfo : APackInfo
public override object Deserialize(Type messageType)
{
using (MemoryStream)
var memoryOwnerMemory = MemoryOwner.Memory;
memoryOwnerMemory = memoryOwnerMemory.Slice(Packet.InnerPacketHeadLength, MessagePacketLength);
switch (ProtocolCode)
{
MemoryStream.Seek(Packet.InnerPacketHeadLength, SeekOrigin.Begin);
switch (ProtocolCode)
case >= Opcode.InnerBsonRouteResponse:
{
case >= Opcode.InnerBsonRouteResponse:
{
return MongoHelper.Instance.DeserializeFrom(messageType, MemoryStream);
}
case >= Opcode.InnerRouteResponse:
{
return ProtoBufHelper.FromStream(messageType, MemoryStream);
}
case >= Opcode.OuterRouteResponse:
{
return ProtoBufHelper.FromStream(messageType, MemoryStream);
}
case >= Opcode.InnerBsonRouteMessage:
{
return MongoHelper.Instance.DeserializeFrom(messageType, MemoryStream);
}
case >= Opcode.InnerRouteMessage:
case >= Opcode.OuterRouteMessage:
{
return ProtoBufHelper.FromStream(messageType, MemoryStream);
}
default:
{
Log.Error($"protocolCode:{ProtocolCode} Does not support processing protocol");
return null;
}
return MongoHelper.Instance.Deserialize(memoryOwnerMemory, messageType);
}
case >= Opcode.InnerRouteResponse:
{
return ProtoBufHelper.FromMemory(messageType, memoryOwnerMemory);
}
case >= Opcode.OuterRouteResponse:
{
return ProtoBufHelper.FromMemory(messageType, memoryOwnerMemory);
}
case >= Opcode.InnerBsonRouteMessage:
{
return MongoHelper.Instance.Deserialize(memoryOwnerMemory, messageType);
}
case >= Opcode.InnerRouteMessage:
case >= Opcode.OuterRouteMessage:
{
return ProtoBufHelper.FromMemory(messageType, memoryOwnerMemory);
}
default:
{
Log.Error($"protocolCode:{ProtocolCode} Does not support processing protocol");
return null;
}
}
}
@@ -88,10 +74,15 @@ public sealed class InnerPacketParser : APacketParser
private bool _isUnPackHead = true;
private readonly byte[] _messageHead = new byte[Packet.InnerPacketHeadLength];
public InnerPacketParser()
{
MemoryPool = MemoryPool<byte>.Shared;
}
public override bool UnPack(CircularBuffer buffer, out APackInfo packInfo)
{
packInfo = null;
while (!IsDisposed)
{
if (_isUnPackHead)
@@ -100,48 +91,40 @@ public sealed class InnerPacketParser : APacketParser
{
return false;
}
_ = buffer.Read(_messageHead, 0, Packet.InnerPacketHeadLength);
_messagePacketLength = BitConverter.ToInt32(_messageHead, 0);
if (_messagePacketLength > Packet.PacketBodyMaxLength)
{
throw new ScanException($"The received information exceeds the maximum limit = {_messagePacketLength}");
}
_protocolCode = BitConverter.ToUInt32(_messageHead, Packet.PacketLength);
_rpcId = BitConverter.ToUInt32(_messageHead, Packet.OuterPacketRpcIdLocation);
_rpcId = BitConverter.ToUInt32(_messageHead, Packet.InnerPacketRpcIdLocation);
_routeId = BitConverter.ToInt64(_messageHead, Packet.InnerPacketRouteRouteIdLocation);
_isUnPackHead = false;
}
try
{
if (buffer.Length < _messagePacketLength)
{
return false;
}
_isUnPackHead = true;
packInfo = InnerPackInfo.Create(_rpcId, _routeId, _protocolCode);
if (_messagePacketLength > 0)
{
return true;
}
var memoryStream = MemoryStreamHelper.GetRecyclableMemoryStream();
// 创建消息包
var memoryOwner = MemoryPool.Rent(Packet.InnerPacketMaxLength);
packInfo = InnerPackInfo.Create(memoryOwner);
packInfo.RpcId = _rpcId;
packInfo.RouteId = _routeId;
packInfo.ProtocolCode = _protocolCode;
packInfo.MessagePacketLength = _messagePacketLength;
// 写入消息体的信息到内存中
memoryStream.Seek(Packet.InnerPacketHeadLength, SeekOrigin.Begin);
buffer.Read(memoryStream, _messagePacketLength);
buffer.Read(memoryOwner.Memory.Slice(Packet.InnerPacketHeadLength), _messagePacketLength);
// 写入消息头的信息到内存中
memoryStream.Seek(0, SeekOrigin.Begin);
memoryStream.Write(BitConverter.GetBytes(_messagePacketLength));
memoryStream.Write(BitConverter.GetBytes(packInfo.ProtocolCode));
memoryStream.Write(BitConverter.GetBytes(packInfo.RpcId));
memoryStream.Write(BitConverter.GetBytes(packInfo.RouteId));
memoryStream.Seek(0, SeekOrigin.Begin);
packInfo.MemoryStream = memoryStream;
_messageHead.AsMemory().CopyTo( memoryOwner.Memory.Slice(0, Packet.InnerPacketHeadLength));
return true;
}
catch (Exception e)
@@ -151,69 +134,50 @@ public sealed class InnerPacketParser : APacketParser
return false;
}
}
return false;
}
public override APackInfo UnPack(MemoryStream memoryStream)
public override bool UnPack(IMemoryOwner<byte> memoryOwner, out APackInfo packInfo)
{
InnerPackInfo packInfo = null;
packInfo = null;
try
{
if (memoryStream == null || memoryStream.Length < Packet.InnerPacketHeadLength)
{
return null;
}
_ = memoryStream.Read(_messageHead, 0, Packet.InnerPacketHeadLength);
_messagePacketLength = BitConverter.ToInt32(_messageHead, 0);
var memorySpan = memoryOwner.Memory.Span;
if (memorySpan.Length < Packet.InnerPacketHeadLength)
{
return false;
}
_messagePacketLength = BitConverter.ToInt32(memorySpan);
if (_messagePacketLength > Packet.PacketBodyMaxLength)
{
throw new ScanException($"The received information exceeds the maximum limit = {_messagePacketLength}");
}
packInfo = InnerPackInfo.Create();
packInfo.ProtocolCode = BitConverter.ToUInt32(_messageHead, Packet.PacketLength);
packInfo.RpcId = BitConverter.ToUInt32(_messageHead, Packet.OuterPacketRpcIdLocation);
packInfo.RouteId = BitConverter.ToInt64(_messageHead, Packet.InnerPacketRouteRouteIdLocation);
if (memoryStream.Length < _messagePacketLength)
{
return null;
}
if (_messagePacketLength <= 0)
{
return packInfo;
}
var outMemoryStream = MemoryStreamHelper.GetRecyclableMemoryStream();
memoryStream.WriteTo(outMemoryStream);
outMemoryStream.Seek(0, SeekOrigin.Begin);
packInfo.MemoryStream = outMemoryStream;
return packInfo;
packInfo = InnerPackInfo.Create(memoryOwner);
packInfo.MessagePacketLength = _messagePacketLength;
packInfo.ProtocolCode = BitConverter.ToUInt32(memorySpan[Packet.PacketLength..]);
packInfo.RpcId = BitConverter.ToUInt32(memorySpan[Packet.OuterPacketRpcIdLocation..]);
packInfo.RouteId = BitConverter.ToInt64(memorySpan[Packet.InnerPacketRouteRouteIdLocation..]);
if (memorySpan.Length < _messagePacketLength)
{
return false;
}
return _messagePacketLength >= 0;
}
catch (Exception e)
{
packInfo?.Dispose();
Log.Error(e);
return null;
Console.WriteLine(e);
throw;
}
}
public static void Serialize(object message, MemoryStream stream)
{
if (message is IBsonMessage)
{
MongoHelper.Instance.SerializeTo(message, stream);
return;
}
ProtoBufHelper.ToStream(message, stream);
}
public static MemoryStream Pack(uint rpcId, long routeId, MemoryStream memoryStream)
{
memoryStream.Seek(Packet.InnerPacketRpcIdLocation, SeekOrigin.Begin);
@@ -233,7 +197,15 @@ public sealed class InnerPacketParser : APacketParser
if (message != null)
{
Serialize(message, memoryStream);
if (message is IBsonMessage)
{
MongoHelper.Instance.SerializeTo(message, memoryStream);
}
else
{
ProtoBufHelper.ToStream(message, memoryStream);
}
opCode = MessageDispatcherSystem.Instance.GetOpCode(message.GetType());
packetBodyCount = (int)(memoryStream.Position - Packet.InnerPacketHeadLength);
}
@@ -255,7 +227,6 @@ public sealed class InnerPacketParser : APacketParser
public override void Dispose()
{
_messagePacketLength = 0;
Array.Clear(_messageHead, 0, _messageHead.Length);
base.Dispose();
}
}

View File

@@ -1,7 +1,6 @@
using System;
using System.Buffers;
using System.IO;
#pragma warning disable CS8625
#pragma warning disable CS8618
namespace TEngine.Core.Network
{
@@ -11,17 +10,34 @@ namespace TEngine.Core.Network
public long RouteId;
public uint ProtocolCode;
public long RouteTypeCode;
public MemoryStream MemoryStream;
public int MessagePacketLength;
public IMemoryOwner<byte> MemoryOwner;
public bool IsDisposed;
public static T Rent<T>() where T : APackInfo
{
var aPackInfo = Pool<T>.Rent();
aPackInfo.IsDisposed = false;
return aPackInfo;
}
public abstract object Deserialize(Type messageType);
public abstract MemoryStream CreateMemoryStream();
public virtual void Dispose()
{
if (IsDisposed)
{
return;
}
RpcId = 0;
RouteId = 0;
ProtocolCode = 0;
RouteTypeCode = 0;
MemoryStream = null;
MessagePacketLength = 0;
MemoryOwner.Dispose();
MemoryOwner = null;
IsDisposed = true;
}
}
}

View File

@@ -1,12 +1,13 @@
using System;
using System.Buffers;
using System.IO;
using TEngine.DataStructure;
using TEngine.Core;
namespace TEngine.Core.Network
{
public abstract class APacketParser : IDisposable
{
protected MemoryPool<byte> MemoryPool;
protected bool IsDisposed { get; private set; }
public static APacketParser CreatePacketParser(NetworkTarget networkTarget)
@@ -17,8 +18,8 @@ namespace TEngine.Core.Network
{
#if TENGINE_NET
return new InnerPacketParser();
#else
return null;
#else
throw new NotSupportedException($"PacketParserHelper Create NotSupport {networkTarget}");
#endif
}
case NetworkTarget.Outer:
@@ -33,11 +34,11 @@ namespace TEngine.Core.Network
}
public abstract bool UnPack(CircularBuffer buffer, out APackInfo packInfo);
public abstract APackInfo UnPack(MemoryStream memoryStream);
public abstract bool UnPack(IMemoryOwner<byte> memoryOwner, out APackInfo packInfo);
public virtual void Dispose()
{
IsDisposed = true;
MemoryPool.Dispose();
}
}
}

View File

@@ -1,42 +1,39 @@
using System;
using System.Buffers;
using System.IO;
using TEngine.DataStructure;
using TEngine.Core;
#pragma warning disable CS8603
#pragma warning disable CS8600
#pragma warning disable CS8625
// ReSharper disable ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
namespace TEngine.Core.Network
{
public sealed class OuterPackInfo : APackInfo
{
public static OuterPackInfo Create()
public static OuterPackInfo Create(IMemoryOwner<byte> memoryOwner)
{
return Pool<OuterPackInfo>.Rent();
}
public static OuterPackInfo Create(uint rpcId, uint protocolCode, long routeTypeCode)
{
var outerPackInfo = Pool<OuterPackInfo>.Rent();
outerPackInfo.RpcId = rpcId;
outerPackInfo.ProtocolCode = protocolCode;
outerPackInfo.RouteTypeCode = routeTypeCode;
var outerPackInfo = Rent<OuterPackInfo>();;
outerPackInfo.MemoryOwner = memoryOwner;
return outerPackInfo;
}
public override MemoryStream CreateMemoryStream()
{
var recyclableMemoryStream = MemoryStreamHelper.GetRecyclableMemoryStream();
recyclableMemoryStream.Write(MemoryOwner.Memory.Span.Slice(0, Packet.InnerPacketHeadLength + MessagePacketLength));
recyclableMemoryStream.Seek(0, SeekOrigin.Begin);
return recyclableMemoryStream;
}
public override void Dispose()
{
base.Dispose();
Pool<OuterPackInfo>.Return(this);
}
public override object Deserialize(Type messageType)
{
using (MemoryStream)
{
MemoryStream.Seek(Packet.OuterPacketHeadLength, SeekOrigin.Begin);
return ProtoBufHelper.FromStream(messageType, MemoryStream);
}
var memoryOwnerMemory = MemoryOwner.Memory;
var memory = memoryOwnerMemory.Slice(Packet.OuterPacketHeadLength, MessagePacketLength);
return ProtoBufHelper.FromMemory(messageType, memory);
}
}
@@ -49,6 +46,11 @@ namespace TEngine.Core.Network
private bool _isUnPackHead = true;
private readonly byte[] _messageHead = new byte[Packet.OuterPacketHeadLength];
public OuterPacketParser()
{
MemoryPool = MemoryPool<byte>.Shared;
}
public override bool UnPack(CircularBuffer buffer, out APackInfo packInfo)
{
packInfo = null;
@@ -84,26 +86,18 @@ namespace TEngine.Core.Network
}
_isUnPackHead = true;
packInfo = OuterPackInfo.Create(_rpcId, _protocolCode, _routeTypeCode);
if (_messagePacketLength <= 0)
{
return true;
}
var memoryStream = MemoryStreamHelper.GetRecyclableMemoryStream();
// 创建消息包
var memoryOwner = MemoryPool.Rent(Packet.OuterPacketMaxLength);
packInfo = OuterPackInfo.Create(memoryOwner);
packInfo.RpcId = _rpcId;
packInfo.ProtocolCode = _protocolCode;
packInfo.RouteTypeCode = _routeTypeCode;
packInfo.MessagePacketLength = _messagePacketLength;
// 写入消息体的信息到内存中
memoryStream.Seek(Packet.OuterPacketHeadLength, SeekOrigin.Begin);
buffer.Read(memoryStream, _messagePacketLength);
buffer.Read(memoryOwner.Memory.Slice(Packet.OuterPacketHeadLength), _messagePacketLength);
// 写入消息头的信息到内存中
memoryStream.Seek(0, SeekOrigin.Begin);
memoryStream.Write(BitConverter.GetBytes(_messagePacketLength));
memoryStream.Write(BitConverter.GetBytes(_protocolCode));
memoryStream.Write(BitConverter.GetBytes(_rpcId));
memoryStream.Write(BitConverter.GetBytes(_routeTypeCode));
memoryStream.Seek(0, SeekOrigin.Begin);
packInfo.MemoryStream = memoryStream;
return true;
_messageHead.AsMemory().CopyTo(memoryOwner.Memory.Slice(0, Packet.OuterPacketHeadLength));
return _messagePacketLength > 0;
}
catch (Exception e)
{
@@ -116,60 +110,44 @@ namespace TEngine.Core.Network
return false;
}
public override APackInfo UnPack(MemoryStream memoryStream)
public override bool UnPack(IMemoryOwner<byte> memoryOwner, out APackInfo packInfo)
{
OuterPackInfo packInfo = null;
packInfo = null;
var memory = memoryOwner.Memory;
try
{
if (memoryStream == null)
if (memory.Length < Packet.OuterPacketHeadLength)
{
return null;
return false;
}
if (memoryStream.Length < Packet.OuterPacketHeadLength)
{
return null;
}
_ = memoryStream.Read(_messageHead, 0, Packet.OuterPacketHeadLength);
_messagePacketLength = BitConverter.ToInt32(_messageHead, 0);
var memorySpan = memory.Span;
_messagePacketLength = BitConverter.ToInt32(memorySpan);
#if TENGINE_NET
if (_messagePacketLength > Packet.PacketBodyMaxLength)
{
throw new ScanException($"The received information exceeds the maximum limit = {_messagePacketLength}");
}
#endif
packInfo = OuterPackInfo.Create();
if (packInfo == null)
{
return null;
}
packInfo.ProtocolCode = BitConverter.ToUInt32(_messageHead, Packet.PacketLength);
packInfo.RpcId = BitConverter.ToUInt32(_messageHead, Packet.OuterPacketRpcIdLocation);
packInfo.RouteTypeCode = BitConverter.ToUInt16(_messageHead, Packet.OuterPacketRouteTypeOpCodeLocation);
packInfo = OuterPackInfo.Create(memoryOwner);
packInfo.MessagePacketLength = _messagePacketLength;
packInfo.ProtocolCode = BitConverter.ToUInt32(memorySpan.Slice(Packet.PacketLength));
packInfo.RpcId = BitConverter.ToUInt32(memorySpan.Slice(Packet.OuterPacketRpcIdLocation));
packInfo.RouteTypeCode = BitConverter.ToUInt16(memorySpan.Slice(Packet.OuterPacketRouteTypeOpCodeLocation));
if (memoryStream.Length < _messagePacketLength)
if (memory.Length < _messagePacketLength)
{
return null;
return false;
}
if (_messagePacketLength <= 0)
{
return packInfo;
}
var outMemoryStream = MemoryStreamHelper.GetRecyclableMemoryStream();
memoryStream.WriteTo(outMemoryStream);
outMemoryStream.Seek(0, SeekOrigin.Begin);
packInfo.MemoryStream = outMemoryStream;
return packInfo;
return _messagePacketLength >= 0;
}
catch (Exception e)
{
packInfo?.Dispose();
Log.Error(e);
return null;
return false;
}
}

View File

@@ -45,7 +45,11 @@ namespace TEngine.Core.Network
/// <summary>
/// 外网消息总长度(消息体最大长度 + 外网消息头长度)
/// </summary>
public const int PacketMaxLength = OuterPacketHeadLength + PacketBodyMaxLength;
public const int OuterPacketMaxLength = OuterPacketHeadLength + PacketBodyMaxLength;
/// <summary>
/// 内网消息总长度(消息体最大长度 + 外网消息头长度)
/// </summary>
public const int InnerPacketMaxLength = InnerPacketHeadLength + PacketBodyMaxLength;
/// <summary>
/// 外网消息头长度(消息体长度在消息头占用的长度 + 协议编号在消息头占用的长度 + RPCId长度 + RouteTypeOpCode长度
/// </summary>