Update content to new ParallelManager (#21813)

This commit is contained in:
metalgearsloth
2023-11-29 11:00:12 +11:00
committed by GitHub
parent 03ed3ff37c
commit 7ada1c6674
9 changed files with 221 additions and 90 deletions

View File

@@ -124,7 +124,6 @@ public sealed class InstrumentSystem : SharedInstrumentSystem
instrument.SequenceDelay = 0;
instrument.SequenceStartTick = 0;
_midiManager.OcclusionCollisionMask = (int) CollisionGroup.Impassable;
instrument.Renderer = _midiManager.GetNewRenderer();
if (instrument.Renderer != null)

View File

@@ -36,6 +36,12 @@ namespace Content.Server.Atmos.EntitySystems
[Robust.Shared.IoC.Dependency] private readonly AtmosphereSystem _atmosphereSystem = default!;
[Robust.Shared.IoC.Dependency] private readonly ChunkingSystem _chunkingSys = default!;
/// <summary>
/// Per-tick cache of sessions.
/// </summary>
private readonly List<ICommonSession> _sessions = new();
private UpdatePlayerJob _updateJob;
private readonly Dictionary<ICommonSession, Dictionary<NetEntity, HashSet<Vector2i>>> _lastSentChunks = new();
// Oh look its more duplicated decal system code!
@@ -56,6 +62,19 @@ namespace Content.Server.Atmos.EntitySystems
public override void Initialize()
{
base.Initialize();
_updateJob = new UpdatePlayerJob()
{
EntManager = EntityManager,
System = this,
ChunkIndexPool = _chunkIndexPool,
Sessions = _sessions,
ChunkingSys = _chunkingSys,
MapManager = _mapManager,
ChunkViewerPool = _chunkViewerPool,
LastSentChunks = _lastSentChunks,
};
_playerManager.PlayerStatusChanged += OnPlayerStatusChanged;
_confMan.OnValueChanged(CCVars.NetGasOverlayTickRate, UpdateTickRate, true);
_confMan.OnValueChanged(CCVars.GasOverlayThresholds, UpdateThresholds, true);
@@ -69,7 +88,7 @@ namespace Content.Server.Atmos.EntitySystems
{
// This **shouldn't** be required, but just in case we ever get entity prototypes that have gas overlays, we
// need to ensure that we send an initial full state to players.
Dirty(component);
Dirty(uid, component);
}
public override void Shutdown()
@@ -287,87 +306,21 @@ namespace Content.Server.Atmos.EntitySystems
// Now we'll go through each player, then through each chunk in range of that player checking if the player is still in range
// If they are, check if they need the new data to send (i.e. if there's an overlay for the gas).
// Afterwards we reset all the chunk data for the next time we tick.
var players = _playerManager.Sessions.Where(x => x.Status == SessionStatus.InGame).ToArray();
var opts = new ParallelOptions { MaxDegreeOfParallelism = _parMan.ParallelProcessCount };
Parallel.ForEach(players, opts, p => UpdatePlayer(p, curTick));
}
_sessions.Clear();
private void UpdatePlayer(ICommonSession playerSession, GameTick curTick)
{
var chunksInRange = _chunkingSys.GetChunksForSession(playerSession, ChunkSize, _chunkIndexPool, _chunkViewerPool);
var previouslySent = _lastSentChunks[playerSession];
var ev = new GasOverlayUpdateEvent();
foreach (var (netGrid, oldIndices) in previouslySent)
foreach (var player in _playerManager.Sessions)
{
// Mark the whole grid as stale and flag for removal.
if (!chunksInRange.TryGetValue(netGrid, out var chunks))
{
previouslySent.Remove(netGrid);
// If grid was deleted then don't worry about sending it to the client.
if (!TryGetEntity(netGrid, out var gridId) || !_mapManager.IsGrid(gridId.Value))
ev.RemovedChunks[netGrid] = oldIndices;
else
{
oldIndices.Clear();
_chunkIndexPool.Return(oldIndices);
}
continue;
}
var old = _chunkIndexPool.Get();
DebugTools.Assert(old.Count == 0);
foreach (var chunk in oldIndices)
{
if (!chunks.Contains(chunk))
old.Add(chunk);
}
if (old.Count == 0)
_chunkIndexPool.Return(old);
else
ev.RemovedChunks.Add(netGrid, old);
}
foreach (var (netGrid, gridChunks) in chunksInRange)
{
// Not all grids have atmospheres.
if (!TryGetEntity(netGrid, out var grid) || !TryComp(grid, out GasTileOverlayComponent? overlay))
if (player.Status != SessionStatus.InGame)
continue;
List<GasOverlayChunk> dataToSend = new();
ev.UpdatedChunks[netGrid] = dataToSend;
previouslySent.TryGetValue(netGrid, out var previousChunks);
foreach (var index in gridChunks)
{
if (!overlay.Chunks.TryGetValue(index, out var value))
continue;
if (previousChunks != null &&
previousChunks.Contains(index) &&
value.LastUpdate != curTick)
{
continue;
}
dataToSend.Add(value);
}
previouslySent[netGrid] = gridChunks;
if (previousChunks != null)
{
previousChunks.Clear();
_chunkIndexPool.Return(previousChunks);
}
_sessions.Add(player);
}
if (ev.UpdatedChunks.Count != 0 || ev.RemovedChunks.Count != 0)
RaiseNetworkEvent(ev, playerSession.ConnectedClient);
if (_sessions.Count > 0)
{
_updateJob.CurrentTick = curTick;
_parMan.ProcessNow(_updateJob, _sessions.Count);
}
}
public void Reset(RoundRestartCleanupEvent ev)
@@ -383,5 +336,107 @@ namespace Content.Server.Atmos.EntitySystems
data.Clear();
}
}
#region Jobs
/// <summary>
/// Updates per player gas overlay data.
/// </summary>
private record struct UpdatePlayerJob : IParallelRobustJob
{
public int BatchSize => 2;
public IEntityManager EntManager;
public IMapManager MapManager;
public ChunkingSystem ChunkingSys;
public GasTileOverlaySystem System;
public ObjectPool<HashSet<Vector2i>> ChunkIndexPool;
public ObjectPool<Dictionary<NetEntity, HashSet<Vector2i>>> ChunkViewerPool;
public GameTick CurrentTick;
public Dictionary<ICommonSession, Dictionary<NetEntity, HashSet<Vector2i>>> LastSentChunks;
public List<ICommonSession> Sessions;
public void Execute(int index)
{
var playerSession = Sessions[index];
var chunksInRange = ChunkingSys.GetChunksForSession(playerSession, ChunkSize, ChunkIndexPool, ChunkViewerPool);
var previouslySent = LastSentChunks[playerSession];
var ev = new GasOverlayUpdateEvent();
foreach (var (netGrid, oldIndices) in previouslySent)
{
// Mark the whole grid as stale and flag for removal.
if (!chunksInRange.TryGetValue(netGrid, out var chunks))
{
previouslySent.Remove(netGrid);
// If grid was deleted then don't worry about sending it to the client.
if (!EntManager.TryGetEntity(netGrid, out var gridId) || !MapManager.IsGrid(gridId.Value))
ev.RemovedChunks[netGrid] = oldIndices;
else
{
oldIndices.Clear();
ChunkIndexPool.Return(oldIndices);
}
continue;
}
var old = ChunkIndexPool.Get();
DebugTools.Assert(old.Count == 0);
foreach (var chunk in oldIndices)
{
if (!chunks.Contains(chunk))
old.Add(chunk);
}
if (old.Count == 0)
ChunkIndexPool.Return(old);
else
ev.RemovedChunks.Add(netGrid, old);
}
foreach (var (netGrid, gridChunks) in chunksInRange)
{
// Not all grids have atmospheres.
if (!EntManager.TryGetEntity(netGrid, out var grid) || !EntManager.TryGetComponent(grid, out GasTileOverlayComponent? overlay))
continue;
List<GasOverlayChunk> dataToSend = new();
ev.UpdatedChunks[netGrid] = dataToSend;
previouslySent.TryGetValue(netGrid, out var previousChunks);
foreach (var gIndex in gridChunks)
{
if (!overlay.Chunks.TryGetValue(gIndex, out var value))
continue;
if (previousChunks != null &&
previousChunks.Contains(gIndex) &&
value.LastUpdate != CurrentTick)
{
continue;
}
dataToSend.Add(value);
}
previouslySent[netGrid] = gridChunks;
if (previousChunks != null)
{
previousChunks.Clear();
ChunkIndexPool.Return(previousChunks);
}
}
if (ev.UpdatedChunks.Count != 0 || ev.RemovedChunks.Count != 0)
System.RaiseNetworkEvent(ev, playerSession.Channel);
}
}
#endregion
}
}

View File

@@ -41,6 +41,9 @@ namespace Content.Server.Decals
private static readonly Vector2 _boundsMinExpansion = new(0.01f, 0.01f);
private static readonly Vector2 _boundsMaxExpansion = new(1.01f, 1.01f);
private UpdatePlayerJob _updateJob;
private List<ICommonSession> _sessions = new();
// If this ever gets parallelised then you'll want to increase the pooled count.
private ObjectPool<HashSet<Vector2i>> _chunkIndexPool =
new DefaultObjectPool<HashSet<Vector2i>>(
@@ -54,6 +57,12 @@ namespace Content.Server.Decals
{
base.Initialize();
_updateJob = new UpdatePlayerJob()
{
System = this,
Sessions = _sessions,
};
_playerManager.PlayerStatusChanged += OnPlayerStatusChanged;
SubscribeLocalEvent<TileChangedEvent>(OnTileChanged);
@@ -428,9 +437,18 @@ namespace Content.Server.Decals
if (PvsEnabled)
{
var players = _playerManager.Sessions.Where(x => x.Status == SessionStatus.InGame).ToArray();
var opts = new ParallelOptions { MaxDegreeOfParallelism = _parMan.ParallelProcessCount };
Parallel.ForEach(players, opts, UpdatePlayer);
_sessions.Clear();
foreach (var session in _playerManager.Sessions)
{
if (session.Status != SessionStatus.InGame)
continue;
_sessions.Add(session);
}
if (_sessions.Count > 0)
_parMan.ProcessNow(_updateJob, _sessions.Count);
}
_dirtyChunks.Clear();
@@ -564,5 +582,26 @@ namespace Content.Server.Decals
ReturnToPool(updatedChunks);
ReturnToPool(staleChunks);
}
#region Jobs
/// <summary>
/// Updates per-player data for decals.
/// </summary>
private record struct UpdatePlayerJob : IParallelRobustJob
{
public int BatchSize => 2;
public DecalSystem System;
public List<ICommonSession> Sessions;
public void Execute(int index)
{
System.UpdatePlayer(Sessions[index]);
}
}
#endregion
}
}

View File

@@ -259,7 +259,7 @@ namespace Content.Server.Power.EntitySystems
RaiseLocalEvent(new NetworkBatteryPreSync());
// Run power solver.
_solver.Tick(frameTime, _powerState, _parMan.ParallelProcessCount);
_solver.Tick(frameTime, _powerState, _parMan);
// Synchronize batteries, the other way around.
RaiseLocalEvent(new NetworkBatteryPostSync());

View File

@@ -1,14 +1,22 @@
using Pidgin;
using Robust.Shared.Utility;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Robust.Shared.Threading;
using static Content.Server.Power.Pow3r.PowerState;
namespace Content.Server.Power.Pow3r
{
public sealed class BatteryRampPegSolver : IPowerSolver
{
private UpdateNetworkJob _networkJob;
public BatteryRampPegSolver()
{
_networkJob = new()
{
Solver = this,
};
}
private sealed class HeightComparer : Comparer<Network>
{
public static HeightComparer Instance { get; } = new();
@@ -21,15 +29,16 @@ namespace Content.Server.Power.Pow3r
}
}
public void Tick(float frameTime, PowerState state, int parallel)
public void Tick(float frameTime, PowerState state, IParallelManager parallel)
{
ClearLoadsAndSupplies(state);
state.GroupedNets ??= GroupByNetworkDepth(state);
DebugTools.Assert(state.GroupedNets.Select(x => x.Count).Sum() == state.Networks.Count);
_networkJob.State = state;
_networkJob.FrameTime = frameTime;
// Each network height layer can be run in parallel without issues.
var opts = new ParallelOptions { MaxDegreeOfParallelism = parallel };
foreach (var group in state.GroupedNets)
{
// Note that many net-layers only have a handful of networks.
@@ -44,7 +53,8 @@ namespace Content.Server.Power.Pow3r
// TODO make GroupByNetworkDepth evaluate the TOTAL size of each layer (i.e. loads + chargers +
// suppliers + discharger) Then decide based on total layer size whether its worth parallelizing that
// layer?
Parallel.ForEach(group, opts, net => UpdateNetwork(net, state, frameTime));
_networkJob.Networks = group;
parallel.ProcessNow(_networkJob, group.Count);
}
ClearBatteries(state);
@@ -344,5 +354,24 @@ namespace Content.Server.Power.Pow3r
else
groupedNetworks[network.Height].Add(network);
}
#region Jobs
private record struct UpdateNetworkJob : IParallelRobustJob
{
public int BatchSize => 4;
public BatteryRampPegSolver Solver;
public PowerState State;
public float FrameTime;
public List<Network> Networks;
public void Execute(int index)
{
Solver.UpdateNetwork(Networks[index], State, FrameTime);
}
}
#endregion
}
}

View File

@@ -1,7 +1,9 @@
using Robust.Shared.Threading;
namespace Content.Server.Power.Pow3r
{
public interface IPowerSolver
{
void Tick(float frameTime, PowerState state, int parallel);
void Tick(float frameTime, PowerState state, IParallelManager parallel);
}
}

View File

@@ -1,8 +1,10 @@
using Robust.Shared.Threading;
namespace Content.Server.Power.Pow3r
{
public sealed class NoOpSolver : IPowerSolver
{
public void Tick(float frameTime, PowerState state, int parallel)
public void Tick(float frameTime, PowerState state, IParallelManager parallel)
{
// Literally nothing.
}

View File

@@ -15,6 +15,7 @@
<ItemGroup>
<ProjectReference Include="..\Content.Server\Content.Server.csproj" />
<ProjectReference Include="..\RobustToolbox\Robust.Shared.Maths\Robust.Shared.Maths.csproj" />
<ProjectReference Include="..\RobustToolbox\Robust.UnitTesting\Robust.UnitTesting.csproj" />
</ItemGroup>
<Import Project="..\RobustToolbox\MSBuild\Robust.Properties.targets" />

View File

@@ -1,6 +1,8 @@
using System.Collections.Generic;
using System.Diagnostics;
using Content.Server.Power.Pow3r;
using Robust.Shared.Threading;
using Robust.UnitTesting;
using static Content.Server.Power.Pow3r.PowerState;
@@ -32,6 +34,8 @@ namespace Pow3r
private readonly Queue<object> _remQueue = new();
private readonly Stopwatch _simStopwatch = new Stopwatch();
private IParallelManager _parallel = new TestingParallelManager();
private void Tick(float frameTime)
{
if (_paused)
@@ -45,7 +49,7 @@ namespace Pow3r
_simStopwatch.Restart();
_tickDataIdx = (_tickDataIdx + 1) % MaxTickData;
_solvers[_currentSolver].Tick(frameTime, _state, 1);
_solvers[_currentSolver].Tick(frameTime, _state, _parallel);
// Update tick history.
foreach (var load in _state.Loads.Values)