Database thread pool use (#14498)

Co-authored-by: metalgearsloth <comedian_vs_clown@hotmail.com>
This commit is contained in:
Pieter-Jan Briers
2023-05-02 02:36:39 +02:00
committed by GitHub
parent de9326da64
commit 1c4a2594ce
6 changed files with 161 additions and 87 deletions

View File

@@ -25,6 +25,8 @@ namespace Content.Server.Database
{
void Init();
void Shutdown();
#region Preferences
Task<PlayerPreferences> InitPrefsAsync(NetUserId userId, ICharacterProfile defaultProfile);
Task SaveSelectedCharacterIndexAsync(NetUserId userId, int index);
@@ -260,6 +262,10 @@ namespace Content.Server.Database
private LoggingProvider _msLogProvider = default!;
private ILoggerFactory _msLoggerFactory = default!;
private bool _synchronous;
// When running in integration tests, we'll use a single in-memory SQLite database connection.
// This is that connection, close it when we shut down.
private SqliteConnection? _sqliteInMemoryConnection;
public void Init()
{
@@ -269,12 +275,14 @@ namespace Content.Server.Database
builder.AddProvider(_msLogProvider);
});
_synchronous = _cfg.GetCVar(CCVars.DatabaseSynchronous);
var engine = _cfg.GetCVar(CCVars.DatabaseEngine).ToLower();
switch (engine)
{
case "sqlite":
var sqliteOptions = CreateSqliteOptions();
_db = new ServerDbSqlite(sqliteOptions);
SetupSqlite(out var contextFunc, out var inMemory);
_db = new ServerDbSqlite(contextFunc, inMemory);
break;
case "postgres":
var pgOptions = CreatePostgresOptions();
@@ -285,58 +293,63 @@ namespace Content.Server.Database
}
}
public void Shutdown()
{
_sqliteInMemoryConnection?.Dispose();
}
public Task<PlayerPreferences> InitPrefsAsync(NetUserId userId, ICharacterProfile defaultProfile)
{
DbWriteOpsMetric.Inc();
return _db.InitPrefsAsync(userId, defaultProfile);
return RunDbCommand(() => _db.InitPrefsAsync(userId, defaultProfile));
}
public Task SaveSelectedCharacterIndexAsync(NetUserId userId, int index)
{
DbWriteOpsMetric.Inc();
return _db.SaveSelectedCharacterIndexAsync(userId, index);
return RunDbCommand(() => _db.SaveSelectedCharacterIndexAsync(userId, index));
}
public Task SaveCharacterSlotAsync(NetUserId userId, ICharacterProfile? profile, int slot)
{
DbWriteOpsMetric.Inc();
return _db.SaveCharacterSlotAsync(userId, profile, slot);
return RunDbCommand(() => _db.SaveCharacterSlotAsync(userId, profile, slot));
}
public Task DeleteSlotAndSetSelectedIndex(NetUserId userId, int deleteSlot, int newSlot)
{
DbWriteOpsMetric.Inc();
return _db.DeleteSlotAndSetSelectedIndex(userId, deleteSlot, newSlot);
return RunDbCommand(() => _db.DeleteSlotAndSetSelectedIndex(userId, deleteSlot, newSlot));
}
public Task SaveAdminOOCColorAsync(NetUserId userId, Color color)
{
DbWriteOpsMetric.Inc();
return _db.SaveAdminOOCColorAsync(userId, color);
return RunDbCommand(() => _db.SaveAdminOOCColorAsync(userId, color));
}
public Task<PlayerPreferences?> GetPlayerPreferencesAsync(NetUserId userId)
{
DbReadOpsMetric.Inc();
return _db.GetPlayerPreferencesAsync(userId);
return RunDbCommand(() => _db.GetPlayerPreferencesAsync(userId));
}
public Task AssignUserIdAsync(string name, NetUserId userId)
{
DbWriteOpsMetric.Inc();
return _db.AssignUserIdAsync(name, userId);
return RunDbCommand(() => _db.AssignUserIdAsync(name, userId));
}
public Task<NetUserId?> GetAssignedUserIdAsync(string name)
{
DbReadOpsMetric.Inc();
return _db.GetAssignedUserIdAsync(name);
return RunDbCommand(() => _db.GetAssignedUserIdAsync(name));
}
public Task<ServerBanDef?> GetServerBanAsync(int id)
{
DbReadOpsMetric.Inc();
return _db.GetServerBanAsync(id);
return RunDbCommand(() => _db.GetServerBanAsync(id));
}
public Task<ServerBanDef?> GetServerBanAsync(
@@ -345,7 +358,7 @@ namespace Content.Server.Database
ImmutableArray<byte>? hwId)
{
DbReadOpsMetric.Inc();
return _db.GetServerBanAsync(address, userId, hwId);
return RunDbCommand(() => _db.GetServerBanAsync(address, userId, hwId));
}
public Task<List<ServerBanDef>> GetServerBansAsync(
@@ -355,19 +368,19 @@ namespace Content.Server.Database
bool includeUnbanned=true)
{
DbReadOpsMetric.Inc();
return _db.GetServerBansAsync(address, userId, hwId, includeUnbanned);
return RunDbCommand(() => _db.GetServerBansAsync(address, userId, hwId, includeUnbanned));
}
public Task AddServerBanAsync(ServerBanDef serverBan)
{
DbWriteOpsMetric.Inc();
return _db.AddServerBanAsync(serverBan);
return RunDbCommand(() => _db.AddServerBanAsync(serverBan));
}
public Task AddServerUnbanAsync(ServerUnbanDef serverUnban)
{
DbWriteOpsMetric.Inc();
return _db.AddServerUnbanAsync(serverUnban);
return RunDbCommand(() => _db.AddServerUnbanAsync(serverUnban));
}
public Task UpdateBanExemption(NetUserId userId, ServerBanExemptFlags flags)
@@ -386,7 +399,7 @@ namespace Content.Server.Database
public Task<ServerRoleBanDef?> GetServerRoleBanAsync(int id)
{
DbReadOpsMetric.Inc();
return _db.GetServerRoleBanAsync(id);
return RunDbCommand(() => _db.GetServerRoleBanAsync(id));
}
public Task<List<ServerRoleBanDef>> GetServerRoleBansAsync(
@@ -396,19 +409,19 @@ namespace Content.Server.Database
bool includeUnbanned = true)
{
DbReadOpsMetric.Inc();
return _db.GetServerRoleBansAsync(address, userId, hwId, includeUnbanned);
return RunDbCommand(() => _db.GetServerRoleBansAsync(address, userId, hwId, includeUnbanned));
}
public Task AddServerRoleBanAsync(ServerRoleBanDef serverRoleBan)
{
DbWriteOpsMetric.Inc();
return _db.AddServerRoleBanAsync(serverRoleBan);
return RunDbCommand(() => _db.AddServerRoleBanAsync(serverRoleBan));
}
public Task AddServerRoleUnbanAsync(ServerRoleUnbanDef serverRoleUnban)
{
DbWriteOpsMetric.Inc();
return _db.AddServerRoleUnbanAsync(serverRoleUnban);
return RunDbCommand(() => _db.AddServerRoleUnbanAsync(serverRoleUnban));
}
#endregion
@@ -417,13 +430,13 @@ namespace Content.Server.Database
public Task<List<PlayTime>> GetPlayTimes(Guid player)
{
DbReadOpsMetric.Inc();
return _db.GetPlayTimes(player);
return RunDbCommand(() => _db.GetPlayTimes(player));
}
public Task UpdatePlayTimes(IReadOnlyCollection<PlayTimeUpdate> updates)
{
DbWriteOpsMetric.Inc();
return _db.UpdatePlayTimes(updates);
return RunDbCommand(() => _db.UpdatePlayTimes(updates));
}
#endregion
@@ -435,19 +448,19 @@ namespace Content.Server.Database
ImmutableArray<byte> hwId)
{
DbWriteOpsMetric.Inc();
return _db.UpdatePlayerRecord(userId, userName, address, hwId);
return RunDbCommand(() => _db.UpdatePlayerRecord(userId, userName, address, hwId));
}
public Task<PlayerRecord?> GetPlayerRecordByUserName(string userName, CancellationToken cancel = default)
{
DbReadOpsMetric.Inc();
return _db.GetPlayerRecordByUserName(userName, cancel);
return RunDbCommand(() => _db.GetPlayerRecordByUserName(userName, cancel));
}
public Task<PlayerRecord?> GetPlayerRecordByUserId(NetUserId userId, CancellationToken cancel = default)
{
DbReadOpsMetric.Inc();
return _db.GetPlayerRecordByUserId(userId, cancel);
return RunDbCommand(() => _db.GetPlayerRecordByUserId(userId, cancel));
}
public Task<int> AddConnectionLogAsync(
@@ -458,91 +471,91 @@ namespace Content.Server.Database
ConnectionDenyReason? denied)
{
DbWriteOpsMetric.Inc();
return _db.AddConnectionLogAsync(userId, userName, address, hwId, denied);
return RunDbCommand(() => _db.AddConnectionLogAsync(userId, userName, address, hwId, denied));
}
public Task AddServerBanHitsAsync(int connection, IEnumerable<ServerBanDef> bans)
{
DbWriteOpsMetric.Inc();
return _db.AddServerBanHitsAsync(connection, bans);
return RunDbCommand(() => _db.AddServerBanHitsAsync(connection, bans));
}
public Task<Admin?> GetAdminDataForAsync(NetUserId userId, CancellationToken cancel = default)
{
DbReadOpsMetric.Inc();
return _db.GetAdminDataForAsync(userId, cancel);
return RunDbCommand(() => _db.GetAdminDataForAsync(userId, cancel));
}
public Task<AdminRank?> GetAdminRankAsync(int id, CancellationToken cancel = default)
{
DbReadOpsMetric.Inc();
return _db.GetAdminRankDataForAsync(id, cancel);
return RunDbCommand(() => _db.GetAdminRankDataForAsync(id, cancel));
}
public Task<((Admin, string? lastUserName)[] admins, AdminRank[])> GetAllAdminAndRanksAsync(
CancellationToken cancel = default)
{
DbReadOpsMetric.Inc();
return _db.GetAllAdminAndRanksAsync(cancel);
return RunDbCommand(() => _db.GetAllAdminAndRanksAsync(cancel));
}
public Task RemoveAdminAsync(NetUserId userId, CancellationToken cancel = default)
{
DbWriteOpsMetric.Inc();
return _db.RemoveAdminAsync(userId, cancel);
return RunDbCommand(() => _db.RemoveAdminAsync(userId, cancel));
}
public Task AddAdminAsync(Admin admin, CancellationToken cancel = default)
{
DbWriteOpsMetric.Inc();
return _db.AddAdminAsync(admin, cancel);
return RunDbCommand(() => _db.AddAdminAsync(admin, cancel));
}
public Task UpdateAdminAsync(Admin admin, CancellationToken cancel = default)
{
DbWriteOpsMetric.Inc();
return _db.UpdateAdminAsync(admin, cancel);
return RunDbCommand(() => _db.UpdateAdminAsync(admin, cancel));
}
public Task RemoveAdminRankAsync(int rankId, CancellationToken cancel = default)
{
DbWriteOpsMetric.Inc();
return _db.RemoveAdminRankAsync(rankId, cancel);
return RunDbCommand(() => _db.RemoveAdminRankAsync(rankId, cancel));
}
public Task AddAdminRankAsync(AdminRank rank, CancellationToken cancel = default)
{
DbWriteOpsMetric.Inc();
return _db.AddAdminRankAsync(rank, cancel);
return RunDbCommand(() => _db.AddAdminRankAsync(rank, cancel));
}
public Task<int> AddNewRound(Server server, params Guid[] playerIds)
{
DbWriteOpsMetric.Inc();
return _db.AddNewRound(server, playerIds);
return RunDbCommand(() => _db.AddNewRound(server, playerIds));
}
public Task<Round> GetRound(int id)
{
DbReadOpsMetric.Inc();
return _db.GetRound(id);
return RunDbCommand(() => _db.GetRound(id));
}
public Task AddRoundPlayers(int id, params Guid[] playerIds)
{
DbWriteOpsMetric.Inc();
return _db.AddRoundPlayers(id, playerIds);
return RunDbCommand(() => _db.AddRoundPlayers(id, playerIds));
}
public Task UpdateAdminRankAsync(AdminRank rank, CancellationToken cancel = default)
{
DbWriteOpsMetric.Inc();
return _db.UpdateAdminRankAsync(rank, cancel);
return RunDbCommand(() => _db.UpdateAdminRankAsync(rank, cancel));
}
public async Task<Server> AddOrGetServer(string serverName)
{
var (server, existed) = await _db.AddOrGetServer(serverName);
var (server, existed) = await RunDbCommand(() => _db.AddOrGetServer(serverName));
if (existed)
DbReadOpsMetric.Inc();
else
@@ -554,7 +567,7 @@ namespace Content.Server.Database
public Task AddAdminLogs(List<QueuedLog> logs)
{
DbWriteOpsMetric.Inc();
return _db.AddAdminLogs(logs);
return RunDbCommand(() => _db.AddAdminLogs(logs));
}
public IAsyncEnumerable<string> GetAdminLogMessages(LogFilter? filter = null)
@@ -578,43 +591,43 @@ namespace Content.Server.Database
public Task<bool> GetWhitelistStatusAsync(NetUserId player)
{
DbReadOpsMetric.Inc();
return _db.GetWhitelistStatusAsync(player);
return RunDbCommand(() => _db.GetWhitelistStatusAsync(player));
}
public Task AddToWhitelistAsync(NetUserId player)
{
DbWriteOpsMetric.Inc();
return _db.AddToWhitelistAsync(player);
return RunDbCommand(() => _db.AddToWhitelistAsync(player));
}
public Task RemoveFromWhitelistAsync(NetUserId player)
{
DbWriteOpsMetric.Inc();
return _db.RemoveFromWhitelistAsync(player);
return RunDbCommand(() => _db.RemoveFromWhitelistAsync(player));
}
public Task AddUploadedResourceLogAsync(NetUserId user, DateTime date, string path, byte[] data)
{
DbWriteOpsMetric.Inc();
return _db.AddUploadedResourceLogAsync(user, date, path, data);
return RunDbCommand(() => _db.AddUploadedResourceLogAsync(user, date, path, data));
}
public Task PurgeUploadedResourceLogAsync(int days)
{
DbWriteOpsMetric.Inc();
return _db.PurgeUploadedResourceLogAsync(days);
return RunDbCommand(() => _db.PurgeUploadedResourceLogAsync(days));
}
public Task<DateTime?> GetLastReadRules(NetUserId player)
{
DbReadOpsMetric.Inc();
return _db.GetLastReadRules(player);
return RunDbCommand(() => _db.GetLastReadRules(player));
}
public Task SetLastReadRules(NetUserId player, DateTime time)
{
DbWriteOpsMetric.Inc();
return _db.SetLastReadRules(player, time);
return RunDbCommand(() => _db.SetLastReadRules(player, time));
}
public Task<int> AddAdminNote(int? roundId, Guid player, string message, Guid createdBy, DateTime createdAt)
@@ -631,31 +644,55 @@ namespace Content.Server.Database
LastEditedAt = createdAt
};
return _db.AddAdminNote(note);
return RunDbCommand(() => _db.AddAdminNote(note));
}
public Task<AdminNote?> GetAdminNote(int id)
{
DbReadOpsMetric.Inc();
return _db.GetAdminNote(id);
return RunDbCommand(() => _db.GetAdminNote(id));
}
public Task<List<AdminNote>> GetAdminNotes(Guid player)
{
DbReadOpsMetric.Inc();
return _db.GetAdminNotes(player);
return RunDbCommand(() => _db.GetAdminNotes(player));
}
public Task DeleteAdminNote(int id, Guid deletedBy, DateTime deletedAt)
{
DbWriteOpsMetric.Inc();
return _db.DeleteAdminNote(id, deletedBy, deletedAt);
return RunDbCommand(() => _db.DeleteAdminNote(id, deletedBy, deletedAt));
}
public Task EditAdminNote(int id, string message, Guid editedBy, DateTime editedAt)
{
DbWriteOpsMetric.Inc();
return _db.EditAdminNote(id, message, editedBy, editedAt);
return RunDbCommand(() => _db.EditAdminNote(id, message, editedBy, editedAt));
}
// Wrapper functions to run DB commands from the thread pool.
// This will avoid SynchronizationContext capturing and avoid running CPU work on the main thread.
// For SQLite, this will also enable read parallelization (within limits).
//
// If we're configured to be synchronous (for integration tests) we shouldn't thread pool it,
// as that would make things very random and undeterministic.
// That only works on SQLite though, since SQLite is internally synchronous anyways.
private Task<T> RunDbCommand<T>(Func<Task<T>> command)
{
if (_synchronous)
return command();
return Task.Run(command);
}
private Task RunDbCommand(Func<Task> command)
{
if (_synchronous)
return command();
return Task.Run(command);
}
private DbContextOptions<PostgresServerDbContext> CreatePostgresOptions()
@@ -683,35 +720,42 @@ namespace Content.Server.Database
return builder.Options;
}
private DbContextOptions<SqliteServerDbContext> CreateSqliteOptions()
private void SetupSqlite(out Func<DbContextOptions<SqliteServerDbContext>> contextFunc, out bool inMemory)
{
var builder = new DbContextOptionsBuilder<SqliteServerDbContext>();
var configPreferencesDbPath = _cfg.GetCVar(CCVars.DatabaseSqliteDbPath);
var inMemory = _res.UserData.RootDir == null;
#if USE_SYSTEM_SQLITE
SQLitePCL.raw.SetProvider(new SQLitePCL.SQLite3Provider_sqlite3());
#endif
SqliteConnection connection;
// Can't re-use the SqliteConnection across multiple threads, so we have to make it every time.
Func<SqliteConnection> getConnection;
var configPreferencesDbPath = _cfg.GetCVar(CCVars.DatabaseSqliteDbPath);
inMemory = _res.UserData.RootDir == null;
if (!inMemory)
{
var finalPreferencesDbPath = Path.Combine(_res.UserData.RootDir!, configPreferencesDbPath);
Logger.DebugS("db.manager", $"Using SQLite DB \"{finalPreferencesDbPath}\"");
connection = new SqliteConnection($"Data Source={finalPreferencesDbPath}");
getConnection = () => new SqliteConnection($"Data Source={finalPreferencesDbPath}");
}
else
{
Logger.DebugS("db.manager", $"Using in-memory SQLite DB");
connection = new SqliteConnection("Data Source=:memory:");
Logger.DebugS("db.manager", "Using in-memory SQLite DB");
_sqliteInMemoryConnection = new SqliteConnection("Data Source=:memory:");
// When using an in-memory DB we have to open it manually
// so EFCore doesn't open, close and wipe it.
connection.Open();
// so EFCore doesn't open, close and wipe it every operation.
_sqliteInMemoryConnection.Open();
getConnection = () => _sqliteInMemoryConnection;
}
builder.UseSqlite(connection);
SetupLogging(builder);
return builder.Options;
contextFunc = () =>
{
var builder = new DbContextOptionsBuilder<SqliteServerDbContext>();
builder.UseSqlite(getConnection());
SetupLogging(builder);
return builder.Options;
};
}
private void SetupLogging(DbContextOptionsBuilder builder)