完善网络框架,增加服务器断开连接回调

This commit is contained in:
ALEXTANG
2023-07-18 17:36:48 +08:00
parent a473971cfb
commit 526baf45fa
39 changed files with 559 additions and 308 deletions

View File

@@ -4,17 +4,24 @@ using System.Net;
namespace TEngine.Core.Network
{
public abstract class AClientNetwork : ANetwork
public abstract class AClientNetwork: ANetwork
{
public uint ChannelId { get; protected set; }
public abstract event Action OnDispose;
public abstract event Action OnConnectFail;
public abstract event Action OnConnectComplete;
public abstract event Action OnConnectDisconnect;
public abstract event Action<uint> OnChangeChannelId;
public abstract event Action<APackInfo> OnReceiveMemoryStream;
protected AClientNetwork(Scene scene, NetworkType networkType, NetworkProtocolType networkProtocolType, NetworkTarget networkTarget) : base(scene, networkType, networkProtocolType, networkTarget) { }
protected AClientNetwork(Scene scene, NetworkType networkType, NetworkProtocolType networkProtocolType, NetworkTarget networkTarget): base(
scene, networkType, networkProtocolType, networkTarget)
{
}
public abstract uint Connect(IPEndPoint remoteEndPoint, Action onConnectComplete, Action onConnectFail, Action onConnectDisconnect,
int connectTimeout = 5000);
public abstract uint Connect(IPEndPoint remoteEndPoint, Action onConnectComplete, Action onConnectFail, int connectTimeout = 5000);
public override void Dispose()
{
ChannelId = 0;

View File

@@ -1,6 +1,7 @@
using System;
using System.IO;
using TEngine.Core;
// ReSharper disable ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
#pragma warning disable CS8625
#pragma warning disable CS8618
@@ -26,7 +27,6 @@ namespace TEngine.Core.Network
public ANetworkMessageScheduler NetworkMessageScheduler { get; protected set; }
protected readonly Func<uint, long, long, MemoryStream, object, MemoryStream> Pack;
private readonly LastMessageInfo _lastMessageInfo = new LastMessageInfo();
protected ANetwork(Scene scene, NetworkType networkType, NetworkProtocolType networkProtocolType, NetworkTarget networkTarget)
{
@@ -69,22 +69,9 @@ namespace TEngine.Core.Network
return null;
}
#endif
if (memoryStream != null)
{
return InnerPacketParser.Pack(rpcId, routeId, memoryStream);
}
// 只针对服务器做缓存消息优化(例如群发消息等)、避免多次序列化
if (ReferenceEquals(_lastMessageInfo.Message, message))
{
_lastMessageInfo.MemoryStream.Seek(0, SeekOrigin.Begin);
return _lastMessageInfo.MemoryStream;
}
memoryStream = InnerPacketParser.Pack(rpcId, routeId, message);
_lastMessageInfo.MemoryStream = memoryStream;
_lastMessageInfo.Message = message;
return memoryStream;
return memoryStream == null
? InnerPacketParser.Pack(rpcId, routeId, message)
: InnerPacketParser.Pack(rpcId, routeId, memoryStream);
}
#endif
private MemoryStream OuterPack(uint rpcId, long routeTypeOpCode, long routeId, MemoryStream memoryStream, object message)
@@ -96,26 +83,9 @@ namespace TEngine.Core.Network
return null;
}
#endif
if (memoryStream != null)
{
return OuterPacketParser.Pack(rpcId, routeTypeOpCode, memoryStream);
}
// 只针对服务器做缓存消息优化(例如群发消息等)、避免多次序列化
// 客户端没有群发消息的功能、一般客户端都是自己缓存消息、如果这里做了缓存反而不好了
#if TENGINE_NET
if (ReferenceEquals(_lastMessageInfo.Message, message))
{
_lastMessageInfo.MemoryStream.Seek(0, SeekOrigin.Begin);
return _lastMessageInfo.MemoryStream;
}
#endif
memoryStream = OuterPacketParser.Pack(rpcId, routeTypeOpCode, message);
#if TENGINE_NET
_lastMessageInfo.MemoryStream = memoryStream;
_lastMessageInfo.Message = message;
#endif
return memoryStream;
return memoryStream == null
? OuterPacketParser.Pack(rpcId, routeTypeOpCode, message)
: OuterPacketParser.Pack(rpcId, routeTypeOpCode, memoryStream);
}
public abstract void Send(uint channelId, uint rpcId, long routeTypeOpCode, long routeId, object message);
@@ -132,8 +102,6 @@ namespace TEngine.Core.Network
NetworkType = NetworkType.None;
NetworkTarget = NetworkTarget.None;
NetworkProtocolType = NetworkProtocolType.None;
_lastMessageInfo.Dispose();
}
}
}

View File

@@ -1,19 +0,0 @@
using System;
using System.IO;
#pragma warning disable CS8625
#pragma warning disable CS8618
namespace TEngine.Core.Network
{
public class LastMessageInfo : IDisposable
{
public object Message;
public MemoryStream MemoryStream;
public void Dispose()
{
Message = null;
MemoryStream = null;
}
}
}

View File

@@ -1,11 +0,0 @@
fileFormatVersion: 2
guid: d9135b10424d287419b2b9b46caa803d
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -84,7 +84,7 @@ namespace TEngine
clientNetworkComponent.Connect(ipEndPoint,null, () =>
{
Log.Error($"Unable to connect to the target server sourceServerId:{Id} targetServerId:{targetServerId}");
});
},null);
_sessions.Add(targetServerId, new ConnectInfo(clientNetworkComponent.Session, clientNetworkComponent));
return clientNetworkComponent.Session;
}

View File

@@ -9,6 +9,7 @@ namespace TEngine.Core.Network
{
private AClientNetwork Network { get; set; }
public Session Session { get; private set; }
private Action _onConnectDisconnect;
public void Initialize(NetworkProtocolType networkProtocolType, NetworkTarget networkTarget)
{
@@ -31,14 +32,15 @@ namespace TEngine.Core.Network
}
}
public void Connect(IPEndPoint remoteEndPoint, Action onConnectComplete, Action onConnectFail, int connectTimeout = 5000)
public void Connect(IPEndPoint remoteEndPoint, Action onConnectComplete, Action onConnectFail,Action onConnectDisconnect, int connectTimeout = 5000)
{
if (Network == null || Network.IsDisposed)
{
throw new NotSupportedException("Network is null or isDisposed");
}
Network.Connect(remoteEndPoint, onConnectComplete, onConnectFail, connectTimeout);
_onConnectDisconnect = onConnectDisconnect;
Network.Connect(remoteEndPoint, onConnectComplete, onConnectFail, onConnectDisconnect, connectTimeout);
Session = Session.Create(Network);
}
@@ -51,6 +53,7 @@ namespace TEngine.Core.Network
}
Session = null;
_onConnectDisconnect?.Invoke();
base.Dispose();
}
}

View File

@@ -19,7 +19,7 @@ namespace TEngine.Core.Network
private static readonly Dictionary<long, Session> Sessions = new ();
public readonly Dictionary<long, FTask<IResponse>> RequestCallback = new();
public static void Create(ANetworkMessageScheduler networkMessageScheduler, ANetworkChannel channel)
public static void Create(ANetworkMessageScheduler networkMessageScheduler, ANetworkChannel channel, NetworkTarget networkTarget)
{
#if TENGINE_DEVELOP
if (ThreadSynchronizationContext.Main.ThreadId != Thread.CurrentThread.ManagedThreadId)
@@ -33,6 +33,14 @@ namespace TEngine.Core.Network
session.NetworkMessageScheduler = networkMessageScheduler;
channel.OnDispose += session.Dispose;
channel.OnReceiveMemoryStream += session.OnReceive;
#if TENGINE_NET
if (networkTarget == NetworkTarget.Outer)
{
var interval = Define.SessionIdleCheckerInterval;
var timeOut = Define.SessionIdleCheckerTimeout;
session.AddComponent<SessionIdleCheckerComponent>().Start(interval, timeOut);
}
#endif
Sessions.Add(session.Id, session);
}

View File

@@ -37,6 +37,7 @@ namespace TEngine.Core.Network
switch (packInfo.ProtocolCode)
{
case Opcode.PingResponse:
case >= Opcode.OuterRouteMessage:
{
await Handler(session, messageType, packInfo);

View File

@@ -37,7 +37,7 @@ namespace TEngine
[ProtoIgnore] public PingResponse ResponseType { get; set; }
[ProtoMember(90)] public long RpcId { get; set; }
}
[ProtoContract]
public class PingResponse : AProto, IResponse
{
public uint OpCode()

View File

@@ -17,6 +17,7 @@ namespace TEngine.Core.Network
{
throw new NotSupportedException($"Received unsupported message protocolCode:{packInfo.ProtocolCode} messageType:{messageType}");
}
case Opcode.PingResponse:
case > Opcode.OuterRouteResponse:
{
// 这个一般是客户端Session.Call发送时使用的、目前这个逻辑只有Unity客户端时使用

View File

@@ -19,10 +19,11 @@ namespace TEngine.Core.Network
#region 线
private bool _isInit;
private Action _onConnectFail;
private Action _onConnectComplete;
private long _connectTimeoutId;
public override event Action OnDispose;
public override event Action OnConnectFail;
public override event Action OnConnectComplete;
public override event Action OnConnectDisconnect;
public override event Action<uint> OnChangeChannelId;
public override event Action<APackInfo> OnReceiveMemoryStream;
@@ -50,6 +51,11 @@ namespace TEngine.Core.Network
if (_socket.Connected)
{
if (OnConnectDisconnect != null)
{
ThreadSynchronizationContext.Main.Post(OnConnectDisconnect);
}
_socket.Disconnect(false);
_socket.Close();
}
@@ -58,7 +64,6 @@ namespace TEngine.Core.Network
_updateMinTime = 0;
_sendAction = null;
_onConnectFail = null;
_rawSendBuffer = null;
_rawReceiveBuffer = null;
@@ -89,7 +94,7 @@ namespace TEngine.Core.Network
});
}
public override uint Connect(IPEndPoint remoteEndPoint, Action onConnectComplete, Action onConnectFail, int connectTimeout = 5000)
public override uint Connect(IPEndPoint remoteEndPoint, Action onConnectComplete, Action onConnectFail, Action onConnectDisconnect, int connectTimeout = 5000)
{
if (_isInit)
{
@@ -97,8 +102,9 @@ namespace TEngine.Core.Network
}
_isInit = true;
_onConnectFail = onConnectFail;
_onConnectComplete = onConnectComplete;
OnConnectFail = onConnectFail;
OnConnectComplete = onConnectComplete;
OnConnectDisconnect = onConnectDisconnect;
ChannelId = CreateChannelId();
_kcpSettings = KCPSettings.Create(NetworkTarget);
_maxSndWnd = _kcpSettings.MaxSendWindowSize;
@@ -119,11 +125,11 @@ namespace TEngine.Core.Network
_connectTimeoutId = TimerScheduler.Instance.Core.OnceTimer(connectTimeout, () =>
{
if (_onConnectFail == null)
if (OnConnectFail == null)
{
return;
}
_onConnectFail();
OnConnectFail();
Dispose();
});
@@ -247,7 +253,7 @@ namespace TEngine.Core.Network
ThreadSynchronizationContext.Main.Post(() =>
{
OnChangeChannelId(ChannelId);
_onConnectComplete?.Invoke();
OnConnectComplete?.Invoke();
});
// 到这里正确创建上连接了、可以正常发送消息了
break;

View File

@@ -170,7 +170,6 @@ namespace TEngine.Core.Network
try
{
var receiveLength = _socket.ReceiveFrom(_rawReceiveBuffer, ref _clientEndPoint);
// Log.Debug($"_socket.ReceiveFrom receiveLength:{receiveLength}");
if (receiveLength < 1)
{
continue;
@@ -223,7 +222,6 @@ namespace TEngine.Core.Network
}
case KcpHeader.ConfirmConnection:
{
// Log.Debug("KcpHeader.ConfirmConnection");
if (receiveLength != 5)
{
break;

View File

@@ -97,7 +97,7 @@ namespace TEngine.Core.Network
return;
}
Session.Create(networkMessageScheduler, this);
Session.Create(networkMessageScheduler, this, networkTarget);
});
}

View File

@@ -18,10 +18,11 @@ namespace TEngine.Core.Network
#region 线
private bool _isInit;
private Action _onConnectFail;
private Action _onConnectComplete;
private long _connectTimeoutId;
public override event Action OnDispose;
public override event Action OnConnectFail;
public override event Action OnConnectComplete;
public override event Action OnConnectDisconnect;
public override event Action<uint> OnChangeChannelId = channelId => { };
public override event Action<APackInfo> OnReceiveMemoryStream;
@@ -30,7 +31,7 @@ namespace TEngine.Core.Network
NetworkThread.Instance.AddNetwork(this);
}
public override uint Connect(IPEndPoint remoteEndPoint, Action onConnectComplete, Action onConnectFail, int connectTimeout = 5000)
public override uint Connect(IPEndPoint remoteEndPoint, Action onConnectComplete, Action onConnectFail,Action onConnectDisconnect,int connectTimeout = 5000)
{
if (_isInit)
{
@@ -38,8 +39,8 @@ namespace TEngine.Core.Network
}
_isInit = true;
_onConnectFail = onConnectFail;
_onConnectComplete = onConnectComplete;
OnConnectFail = onConnectFail;
OnConnectComplete = onConnectComplete;
ChannelId = 0xC0000000 | (uint) new Random().Next();
_sendAction = (rpcId, routeTypeOpCode, routeId, memoryStream, message) =>
@@ -66,7 +67,7 @@ namespace TEngine.Core.Network
_connectTimeoutId = TimerScheduler.Instance.Core.OnceTimer(connectTimeout, () =>
{
_onConnectFail?.Invoke();
OnConnectFail?.Invoke();
Dispose();
});
@@ -87,7 +88,7 @@ namespace TEngine.Core.Network
return;
}
OnConnectComplete(outArgs);
OnNetworkConnectComplete(outArgs);
});
return ChannelId;
@@ -106,6 +107,10 @@ namespace TEngine.Core.Network
{
if (_socket.Connected)
{
if (OnConnectDisconnect != null)
{
ThreadSynchronizationContext.Main.Post(OnConnectDisconnect);
}
_socket.Disconnect(true);
_socket.Close();
}
@@ -145,7 +150,7 @@ namespace TEngine.Core.Network
private readonly SocketAsyncEventArgs _innArgs = new SocketAsyncEventArgs();
private Queue<MessageCacheInfo> _messageCache = new Queue<MessageCacheInfo>();
private void OnConnectComplete(SocketAsyncEventArgs asyncEventArgs)
private void OnNetworkConnectComplete(SocketAsyncEventArgs asyncEventArgs)
{
#if TENGINE_DEVELOP
if (NetworkThread.Instance.ManagedThreadId != Thread.CurrentThread.ManagedThreadId)
@@ -163,9 +168,9 @@ namespace TEngine.Core.Network
{
Log.Error($"Unable to connect to the target server asyncEventArgs:{asyncEventArgs.SocketError}");
if (_onConnectFail != null)
if (OnConnectFail != null)
{
ThreadSynchronizationContext.Main.Post(_onConnectFail);
ThreadSynchronizationContext.Main.Post(OnConnectFail);
}
Dispose();
@@ -199,9 +204,9 @@ namespace TEngine.Core.Network
_messageCache.Clear();
_messageCache = null;
if (_onConnectComplete != null)
if (OnConnectComplete != null)
{
ThreadSynchronizationContext.Main.Post(_onConnectComplete);
ThreadSynchronizationContext.Main.Post(OnConnectComplete);
}
}
@@ -509,7 +514,7 @@ namespace TEngine.Core.Network
{
case SocketAsyncOperation.Connect:
{
NetworkThread.Instance.SynchronizationContext.Post(() => OnConnectComplete(asyncEventArgs));
NetworkThread.Instance.SynchronizationContext.Post(() => OnNetworkConnectComplete(asyncEventArgs));
break;
}
case SocketAsyncOperation.Receive:

View File

@@ -173,7 +173,7 @@ namespace TEngine.Core.Network
return;
}
Session.Create(NetworkMessageScheduler, channel);
Session.Create(NetworkMessageScheduler, channel, this.NetworkTarget);
});
_connectionChannel.Add(channelId, channel);

View File

@@ -141,6 +141,10 @@ namespace TEngine.Core.Network
}
#endif
packInfo = OuterPackInfo.Create();
if (packInfo == null)
{
return null;
}
packInfo.ProtocolCode = BitConverter.ToUInt32(_messageHead, Packet.PacketLength);
packInfo.RpcId = BitConverter.ToUInt32(_messageHead, Packet.OuterPacketRpcIdLocation);
packInfo.RouteTypeCode = BitConverter.ToUInt16(_messageHead, Packet.OuterPacketRouteTypeOpCodeLocation);