using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;

namespace ET
{
    /// <summary>
    /// Bookkeeping for one in-flight RPC: the request that was sent and the task
    /// that is completed when the matching response (or a failure) is delivered.
    /// </summary>
    public readonly struct RpcInfo
    {
        public readonly IRequest Request;
        public readonly ETTask<IResponse> Tcs;

        public RpcInfo(IRequest request)
        {
            this.Request = request;
            // NOTE(review): the meaning of the `true` argument is defined by ETTask.Create,
            // which is not visible in this file.
            this.Tcs = ETTask<IResponse>.Create(true);
        }
    }

    /// <summary>
    /// Lifecycle handlers and RPC/send helpers for <see cref="Session"/>.
    /// </summary>
    [EntitySystemOf(typeof(Session))]
    [FriendOf(typeof(Session))]
    public static partial class SessionSystem
    {
        /// <summary>
        /// Binds the session to its service, stamps both activity timestamps with the
        /// current client time, and starts with an empty pending-RPC table.
        /// </summary>
        [EntitySystem]
        private static void Awake(this Session self, AService aService)
        {
            self.AService = aService;
            long timeNow = self.Fiber().TimeInfo.ClientNow();
            self.LastRecvTime = timeNow;
            self.LastSendTime = timeNow;

            self.requestCallbacks.Clear();

            Log.Info($"session create: zone: {self.Zone()} id: {self.Id} {timeNow} ");
        }

        /// <summary>
        /// Removes the session from its service and fails every RPC that is still
        /// waiting for a response with an <c>RpcException</c> carrying the session error.
        /// </summary>
        [EntitySystem]
        private static void Destroy(this Session self)
        {
            self.AService.Remove(self.Id, self.Error);

            // Iterate over a snapshot rather than the live dictionary, so completing a
            // task never happens while the dictionary itself is being enumerated.
            foreach (RpcInfo responseCallback in self.requestCallbacks.Values.ToArray())
            {
                responseCallback.Tcs.SetException(new RpcException(self.Error, $"session dispose: {self.Id} {self.RemoteAddress}"));
            }

            Log.Info($"session dispose: {self.RemoteAddress} id: {self.Id} ErrorCode: {self.Error}, please see ErrorCode.cs! {self.Fiber().TimeInfo.ClientNow()}");

            self.requestCallbacks.Clear();
        }

        /// <summary>
        /// Completes the pending call whose id matches <c>response.RpcId</c>.
        /// </summary>
        /// <remarks>
        /// A response with no pending entry is ignored; the timeout and cancel paths of
        /// <c>Call</c> remove the entry themselves, so a late response ends up here.
        /// </remarks>
        public static void OnResponse(this Session self, IResponse response)
        {
            if (!self.requestCallbacks.Remove(response.RpcId, out var action))
            {
                return;
            }

            if (ErrorCore.IsRpcNeedThrowException(response.Error))
            {
                action.Tcs.SetException(new Exception($"Rpc error, request: {action.Request} response: {response}"));
                return;
            }

            action.Tcs.SetResult(response);
        }

        /// <summary>
        /// Sends <paramref name="request"/> and waits for its response, with optional cancellation.
        /// </summary>
        /// <param name="self">Session the request is sent on.</param>
        /// <param name="request">Request to send; its <c>RpcId</c> is overwritten with a fresh id.</param>
        /// <param name="cancellationToken">
        /// Optional token (may be null). When it runs the registered cancel action while the
        /// call is still pending, the call completes with a newly created response of the
        /// request's response type whose <c>Error</c> is <c>ErrorCore.ERR_Cancel</c>.
        /// </param>
        /// <returns>The response delivered through <see cref="OnResponse"/>, or the cancel response.</returns>
        /// <exception cref="Exception">
        /// The pending task was failed: by <see cref="OnResponse"/> for an error response, or by
        /// session destruction (an <c>RpcException</c>).
        /// </exception>
        public static async ETTask<IResponse> Call(this Session self, IRequest request, ETCancellationToken cancellationToken)
        {
            int rpcId = ++Session.RpcId;
            RpcInfo rpcInfo = new RpcInfo(request);

            self.requestCallbacks[rpcId] = rpcInfo;
            request.RpcId = rpcId;

            self.Send(request);

            void CancelAction()
            {
                // Single lookup-and-remove; nothing to do when the call already completed.
                if (!self.requestCallbacks.Remove(rpcId, out RpcInfo action))
                {
                    return;
                }

                Type responseType = OpcodeType.Instance.GetResponseType(action.Request.GetType());
                IResponse response = (IResponse) Activator.CreateInstance(responseType);
                response.Error = ErrorCore.ERR_Cancel;
                action.Tcs.SetResult(response);
            }

            IResponse ret;
            try
            {
                cancellationToken?.Add(CancelAction);
                ret = await rpcInfo.Tcs;
            }
            finally
            {
                // Always unregister so the token does not keep a reference to this call.
                cancellationToken?.Remove(CancelAction);
            }

            return ret;
        }

        /// <summary>
        /// Sends <paramref name="request"/> and waits for its response, with an optional timeout.
        /// </summary>
        /// <param name="self">Session the request is sent on.</param>
        /// <param name="request">Request to send; its <c>RpcId</c> is overwritten with a fresh id.</param>
        /// <param name="time">
        /// Timeout passed to <c>TimerComponent.WaitAsync</c>; values &lt;= 0 disable the timeout.
        /// NOTE(review): presumably milliseconds - confirm against TimerComponent.
        /// </param>
        /// <returns>The response delivered through <see cref="OnResponse"/>.</returns>
        /// <exception cref="Exception">
        /// The call timed out, an error response was received, or the session was destroyed
        /// (an <c>RpcException</c>).
        /// </exception>
        public static async ETTask<IResponse> Call(this Session self, IRequest request, int time = 0)
        {
            int rpcId = ++Session.RpcId;
            RpcInfo rpcInfo = new(request);

            self.requestCallbacks[rpcId] = rpcInfo;
            request.RpcId = rpcId;

            self.Send(request);

            if (time > 0)
            {
                async ETTask Timeout()
                {
                    await self.Fiber().TimerComponent.WaitAsync(time);

                    // Entry already gone means the call finished (or was failed) before the timer fired.
                    if (!self.requestCallbacks.Remove(rpcId, out RpcInfo action))
                    {
                        return;
                    }

                    action.Tcs.SetException(new Exception($"session call timeout: {request} {time}"));
                }

                Timeout().Coroutine();
            }

            return await rpcInfo.Tcs;
        }

        /// <summary>
        /// Sends a message using the default <c>ActorId</c>.
        /// </summary>
        public static void Send(this Session self, IMessage message)
        {
            self.Send(default, message);
        }

        /// <summary>
        /// Updates <c>LastSendTime</c>, logs the message at debug level and hands it to the service.
        /// </summary>
        public static void Send(this Session self, ActorId actorId, IMessage message)
        {
            self.LastSendTime = self.Fiber().TimeInfo.ClientNow();
            Log.Debug(message.ToString());

            // NOTE(review): `as` yields null when message is not a MessageObject, so the
            // service would receive null - confirm every IMessage here is a MessageObject.
            self.AService.Send(self.Id, actorId, message as MessageObject);
        }
    }

    /// <summary>
    /// A network session: the owning service, activity timestamps, error code, remote
    /// address, and the table of RPCs awaiting a response.
    /// </summary>
    [ChildOf]
    public sealed class Session: Entity, IAwake<AService>, IDestroy
    {
        public AService AService { get; set; }

        // Static: the id counter is shared by all Session instances.
        public static int RpcId { get; set; }

        // Pending calls keyed by rpc id; entries are removed on response, timeout, cancel or destroy.
        public readonly Dictionary<int, RpcInfo> requestCallbacks = new();

        public long LastRecvTime { get; set; }

        public long LastSendTime { get; set; }

        // Error code reported to the service and to pending callers when the session is destroyed.
        public int Error { get; set; }

        public IPEndPoint RemoteAddress { get; set; }
    }
}