#if TENGINE_NET using System.Linq.Expressions; using TEngine.Core; using MongoDB.Bson; using MongoDB.Driver; namespace TEngine.Core.DataBase; public sealed class MongoDataBase : IDateBase { private string _dbName; private string _connectionString; private MongoClient _mongoClient; private IMongoDatabase _mongoDatabase; private readonly HashSet _collections = new HashSet(); private readonly CoroutineLockQueueType _mongoDataBaseLock = new CoroutineLockQueueType("MongoDataBaseLock"); public IDateBase Initialize(string connectionString, string dbName) { _dbName = dbName; _connectionString = connectionString; _mongoClient = new MongoClient(connectionString); _mongoDatabase = _mongoClient.GetDatabase(dbName); // 记录所有集合名 _collections.UnionWith(_mongoDatabase.ListCollectionNames().ToList()); return this; } #region Other public async FTask Sum(Expression> filter, Expression> sumExpression, string collection = null) where T : Entity { var member = (MemberExpression) ((UnaryExpression) sumExpression.Body).Operand; var projection = new BsonDocument("_id", "null").Add("Result", new BsonDocument("$sum", $"${member.Member.Name}")); var data = await GetCollection(collection).Aggregate().Match(filter).Group(projection) .FirstOrDefaultAsync(); return data == null ? 0 : Convert.ToInt64(data["Result"]); } #endregion #region GetCollection private IMongoCollection GetCollection(string collection = null) { return _mongoDatabase.GetCollection(collection ?? typeof(T).Name); } private IMongoCollection GetCollection(string name) { return _mongoDatabase.GetCollection(name); } #endregion #region Count public async FTask Count(string collection = null) where T : Entity { return await GetCollection(collection).CountDocumentsAsync(d => true); } public async FTask Count(Expression> filter, string collection = null) where T : Entity { return await GetCollection(collection).CountDocumentsAsync(filter); } #endregion #region Exist public async FTask Exist(string collection = null) where T : Entity { return await Count(collection) > 0; } public async FTask Exist(Expression> filter, string collection = null) where T : Entity { return await Count(filter, collection) > 0; } #endregion #region Query public async FTask Query(long id, string collection = null) where T : Entity { using (await _mongoDataBaseLock.Lock(id)) { var cursor = await GetCollection(collection).FindAsync(d => d.Id == id); var v = await cursor.FirstOrDefaultAsync(); return v; } } public async FTask<(int count, List dates)> QueryCountAndDatesByPage(Expression> filter, int pageIndex, int pageSize, string collection = null) where T : Entity { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { var count = await Count(filter); var dates = await QueryByPage(filter, pageIndex, pageSize, collection); return ((int)count, dates); } } public async FTask<(int count, List dates)> QueryCountAndDatesByPage(Expression> filter, int pageIndex, int pageSize, string[] cols, string collection = null) where T : Entity { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { var count = await Count(filter); var dates = await QueryByPage(filter, pageIndex, pageSize, cols, collection); return ((int) count, dates); } } public async FTask> QueryByPage(Expression> filter, int pageIndex, int pageSize, string collection = null) where T : Entity { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { return await GetCollection(collection).Find(filter).Skip((pageIndex - 1) * pageSize) .Limit(pageSize) .ToListAsync(); } } public async FTask> QueryByPage(Expression> filter, int pageIndex, int pageSize, string[] cols, string collection = null) where T : Entity { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { var projection = Builders.Projection.Include(""); foreach (var col in cols) { projection = projection.Include(col); } return await GetCollection(collection).Find(filter).Project(projection) .Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); } } public async FTask> QueryByPageOrderBy(Expression> filter, int pageIndex, int pageSize, Expression> orderByExpression, bool isAsc = true, string collection = null) where T : Entity { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { if (isAsc) { return await GetCollection(collection).Find(filter).SortBy(orderByExpression) .Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); } return await GetCollection(collection).Find(filter).SortByDescending(orderByExpression) .Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); } } public async FTask First(Expression> filter, string collection = null) where T : Entity { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { var cursor = await GetCollection(collection).FindAsync(filter); return await cursor.FirstOrDefaultAsync(); } } public async FTask First(string json, string[] cols, string collection = null) where T : Entity { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { var projection = Builders.Projection.Include(""); foreach (var col in cols) { projection = projection.Include(col); } var options = new FindOptions {Projection = projection}; FilterDefinition filterDefinition = new JsonFilterDefinition(json); var cursor = await GetCollection(collection).FindAsync(filterDefinition, options); return await cursor.FirstOrDefaultAsync(); } } public async FTask> QueryOrderBy(Expression> filter, Expression> orderByExpression, bool isAsc = true, string collection = null) where T : Entity { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { if (isAsc) { return await GetCollection(collection).Find(filter).SortBy(orderByExpression).ToListAsync(); } return await GetCollection(collection).Find(filter).SortByDescending(orderByExpression) .ToListAsync(); } } public async FTask> Query(Expression> filter, string collection = null) where T : Entity { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { var cursor = await GetCollection(collection).FindAsync(filter); var v = await cursor.ToListAsync(); return v; } } public async FTask Query(long id, List collectionNames, List result) { using (await _mongoDataBaseLock.Lock(id)) { if (collectionNames == null || collectionNames.Count == 0) { return; } foreach (var collectionName in collectionNames) { var cursor = await GetCollection(collectionName).FindAsync(d => d.Id == id); var e = await cursor.FirstOrDefaultAsync(); if (e == null) { continue; } result.Add(e); } } } public async FTask> QueryJson(string json, string collection = null) where T : Entity { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { FilterDefinition filterDefinition = new JsonFilterDefinition(json); var cursor = await GetCollection(collection).FindAsync(filterDefinition); var v = await cursor.ToListAsync(); return v; } } public async FTask> QueryJson(string json, string[] cols, string collection = null) where T : Entity { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { var projection = Builders.Projection.Include(""); foreach (var col in cols) { projection = projection.Include(col); } var options = new FindOptions {Projection = projection}; FilterDefinition filterDefinition = new JsonFilterDefinition(json); var cursor = await GetCollection(collection).FindAsync(filterDefinition, options); var v = await cursor.ToListAsync(); return v; } } public async FTask> QueryJson(long taskId, string json, string collection = null) where T : Entity { using (await _mongoDataBaseLock.Lock(taskId)) { FilterDefinition filterDefinition = new JsonFilterDefinition(json); var cursor = await GetCollection(collection).FindAsync(filterDefinition); var v = await cursor.ToListAsync(); return v; } } public async FTask> Query(Expression> filter, string[] cols, string collection = null) where T : class { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { var projection = Builders.Projection.Include(cols[0]); for (var i = 1; i < cols.Length; i++) { projection = projection.Include(cols[i]); } return await GetCollection(collection).Find(filter).Project(projection).ToListAsync(); } } #endregion #region Save public async FTask Save(object transactionSession, T entity, string collection = null) where T : Entity { if (entity == null) { Log.Error($"save entity is null: {typeof(T).Name}"); return; } var clone = MongoHelper.Instance.Clone(entity); using (await _mongoDataBaseLock.Lock(clone.Id)) { await GetCollection(collection ?? clone.GetType().Name).ReplaceOneAsync( (IClientSessionHandle) transactionSession, d => d.Id == clone.Id, clone, new ReplaceOptions {IsUpsert = true}); } } public async FTask Save(T entity, string collection = null) where T : Entity, new() { if (entity == null) { Log.Error($"save entity is null: {typeof(T).Name}"); return; } var clone = MongoHelper.Instance.Clone(entity); using (await _mongoDataBaseLock.Lock(clone.Id)) { await GetCollection(collection ?? clone.GetType().Name).ReplaceOneAsync(d => d.Id == clone.Id, clone, new ReplaceOptions {IsUpsert = true}); } } private async FTask SaveBase(T entity, string collection = null) where T : Entity { if (entity == null) { Log.Error($"save entity is null: {typeof(T).Name}"); return; } var clone = MongoHelper.Instance.Clone(entity); using (await _mongoDataBaseLock.Lock(clone.Id)) { await GetCollection(collection ?? clone.GetType().Name).ReplaceOneAsync(d => d.Id == clone.Id, clone, new ReplaceOptions {IsUpsert = true}); } } public async FTask Save(long id, List entities) { if (entities == null) { Log.Error("save entity is null"); return; } var clones = MongoHelper.Instance.Clone(entities); using (await _mongoDataBaseLock.Lock(id)) { foreach (Entity clone in clones) { try { await GetCollection(clone.GetType().Name).ReplaceOneAsync(d => d.Id == clone.Id, clone, new ReplaceOptions {IsUpsert = true}); } catch (Exception e) { Log.Error($"Save List Entity Error: {clone.GetType().Name} {clone}\n{e}"); } } } } #endregion #region Insert public FTask Insert(T entity, string collection = null) where T : Entity, new() { return Save(entity); } public async FTask InsertBatch(IEnumerable list, string collection = null) where T : Entity, new() { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { await GetCollection(collection ?? typeof(T).Name).InsertManyAsync(list); } } public async FTask InsertBatch(object transactionSession, IEnumerable list, string collection = null) where T : Entity, new() { using (await _mongoDataBaseLock.Lock(RandomHelper.RandInt64())) { await GetCollection(collection ?? typeof(T).Name).InsertManyAsync((IClientSessionHandle) transactionSession, list); } } #endregion #region Remove public async FTask Remove(object transactionSession, long id, string collection = null) where T : Entity, new() { using (await _mongoDataBaseLock.Lock(id)) { var result = await GetCollection(collection).DeleteOneAsync((IClientSessionHandle) transactionSession, d => d.Id == id); return result.DeletedCount; } } public async FTask Remove(long id, string collection = null) where T : Entity, new() { using (await _mongoDataBaseLock.Lock(id)) { var result = await GetCollection(collection).DeleteOneAsync(d => d.Id == id); return result.DeletedCount; } } public async FTask Remove(long id, object transactionSession, Expression> filter, string collection = null) where T : Entity, new() { using (await _mongoDataBaseLock.Lock(id)) { var result = await GetCollection(collection).DeleteManyAsync((IClientSessionHandle) transactionSession, filter); return result.DeletedCount; } } public async FTask Remove(long id, Expression> filter, string collection = null) where T : Entity, new() { using (await _mongoDataBaseLock.Lock(id)) { var result = await GetCollection(collection).DeleteManyAsync(filter); return result.DeletedCount; } } #endregion #region Index /// /// 创建数据库索引 /// /// /// /// /// /// 使用例子(可多个): /// 1 : Builders.IndexKeys.Ascending(d=>d.Id) /// 2 : Builders.IndexKeys.Descending(d=>d.Id).Ascending(d=>d.Name) /// 3 : Builders.IndexKeys.Descending(d=>d.Id),Builders.IndexKeys.Descending(d=>d.Name) /// public async FTask CreateIndex(string collection, params object[] keys) where T : Entity { if (keys == null || keys.Length <= 0) { return; } var indexModels = new List>(); foreach (object key in keys) { IndexKeysDefinition indexKeysDefinition = (IndexKeysDefinition) key; indexModels.Add(new CreateIndexModel(indexKeysDefinition)); } await GetCollection(collection).Indexes.CreateManyAsync(indexModels); } public async FTask CreateIndex(params object[] keys) where T : Entity { if (keys == null) { return; } List> indexModels = new List>(); foreach (object key in keys) { IndexKeysDefinition indexKeysDefinition = (IndexKeysDefinition) key; indexModels.Add(new CreateIndexModel(indexKeysDefinition)); } await GetCollection().Indexes.CreateManyAsync(indexModels); } #endregion #region CreateDB public async FTask CreateDB() where T : Entity { // 已经存在数据库表 string name = typeof(T).Name; if (_collections.Contains(name)) { return; } await _mongoDatabase.CreateCollectionAsync(name); _collections.Add(name); } public async FTask CreateDB(Type type) { string name = type.Name; if (_collections.Contains(name)) { return; } await _mongoDatabase.CreateCollectionAsync(name); _collections.Add(name); } #endregion } #endif