Move jobQueue to engine (#16443)
This commit is contained in:
@@ -1,8 +0,0 @@
|
|||||||
namespace Content.Server.CPUJob.JobQueues
|
|
||||||
{
|
|
||||||
public interface IJob
|
|
||||||
{
|
|
||||||
JobStatus Status { get; }
|
|
||||||
void Run();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,230 +0,0 @@
|
|||||||
using System.Threading;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Robust.Shared.Timing;
|
|
||||||
using Robust.Shared.Utility;
|
|
||||||
|
|
||||||
namespace Content.Server.CPUJob.JobQueues
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// CPU-intensive job that can be suspended and resumed on the main thread
|
|
||||||
/// </summary>
|
|
||||||
/// <remarks>
|
|
||||||
/// Implementations should overload <see cref="Process"/>.
|
|
||||||
/// Inside <see cref="Process"/>, implementations should only await on <see cref="SuspendNow"/>,
|
|
||||||
/// <see cref="SuspendIfOutOfTime"/>, or <see cref="WaitAsyncTask"/>.
|
|
||||||
/// </remarks>
|
|
||||||
/// <typeparam name="T">The type of result this job generates</typeparam>
|
|
||||||
public abstract class Job<T> : IJob
|
|
||||||
{
|
|
||||||
public JobStatus Status { get; private set; } = JobStatus.Pending;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Represents the status of this job as a regular task.
|
|
||||||
/// </summary>
|
|
||||||
public Task<T?> AsTask { get; }
|
|
||||||
|
|
||||||
public T? Result { get; private set; }
|
|
||||||
public Exception? Exception { get; private set; }
|
|
||||||
protected CancellationToken Cancellation { get; }
|
|
||||||
|
|
||||||
public double DebugTime { get; private set; }
|
|
||||||
private readonly double _maxTime;
|
|
||||||
protected readonly IStopwatch StopWatch;
|
|
||||||
|
|
||||||
// TCS for the Task property.
|
|
||||||
private readonly TaskCompletionSource<T?> _taskTcs;
|
|
||||||
|
|
||||||
// TCS to call to resume the suspended job.
|
|
||||||
private TaskCompletionSource<object?>? _resume;
|
|
||||||
private Task? _workInProgress;
|
|
||||||
|
|
||||||
protected Job(double maxTime, CancellationToken cancellation = default)
|
|
||||||
: this(maxTime, new Stopwatch(), cancellation)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Job(double maxTime, IStopwatch stopwatch, CancellationToken cancellation = default)
|
|
||||||
{
|
|
||||||
_maxTime = maxTime;
|
|
||||||
StopWatch = stopwatch;
|
|
||||||
Cancellation = cancellation;
|
|
||||||
|
|
||||||
_taskTcs = new TaskCompletionSource<T?>();
|
|
||||||
AsTask = _taskTcs.Task;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Suspends the current task immediately, yielding to other running jobs.
|
|
||||||
/// </summary>
|
|
||||||
/// <remarks>
|
|
||||||
/// This does not stop the job queue from un-suspending the current task immediately again,
|
|
||||||
/// if there is still time left over.
|
|
||||||
/// </remarks>
|
|
||||||
protected Task SuspendNow()
|
|
||||||
{
|
|
||||||
DebugTools.AssertNull(_resume);
|
|
||||||
|
|
||||||
_resume = new TaskCompletionSource<object?>();
|
|
||||||
Status = JobStatus.Paused;
|
|
||||||
DebugTime += StopWatch.Elapsed.TotalSeconds;
|
|
||||||
return _resume.Task;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected ValueTask SuspendIfOutOfTime()
|
|
||||||
{
|
|
||||||
DebugTools.AssertNull(_resume);
|
|
||||||
|
|
||||||
// ReSharper disable once CompareOfFloatsByEqualityOperator
|
|
||||||
if (StopWatch.Elapsed.TotalSeconds <= _maxTime || _maxTime == 0.0)
|
|
||||||
{
|
|
||||||
return new ValueTask();
|
|
||||||
}
|
|
||||||
|
|
||||||
return new ValueTask(SuspendNow());
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Wrapper to await on an external task.
|
|
||||||
/// </summary>
|
|
||||||
protected async Task<TTask> WaitAsyncTask<TTask>(Task<TTask> task)
|
|
||||||
{
|
|
||||||
DebugTools.AssertNull(_resume);
|
|
||||||
|
|
||||||
Status = JobStatus.Waiting;
|
|
||||||
DebugTime += StopWatch.Elapsed.TotalSeconds;
|
|
||||||
|
|
||||||
var result = await task;
|
|
||||||
|
|
||||||
// Immediately block on resume so that everything stays correct.
|
|
||||||
Status = JobStatus.Paused;
|
|
||||||
_resume = new TaskCompletionSource<object?>();
|
|
||||||
|
|
||||||
await _resume.Task;
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Wrapper to safely await on an external task.
|
|
||||||
/// </summary>
|
|
||||||
protected async Task WaitAsyncTask(Task task)
|
|
||||||
{
|
|
||||||
DebugTools.AssertNull(_resume);
|
|
||||||
|
|
||||||
Status = JobStatus.Waiting;
|
|
||||||
DebugTime += StopWatch.Elapsed.TotalSeconds;
|
|
||||||
|
|
||||||
await task;
|
|
||||||
|
|
||||||
// Immediately block on resume so that everything stays correct.
|
|
||||||
_resume = new TaskCompletionSource<object?>();
|
|
||||||
Status = JobStatus.Paused;
|
|
||||||
|
|
||||||
await _resume.Task;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Run()
|
|
||||||
{
|
|
||||||
StopWatch.Restart();
|
|
||||||
_workInProgress ??= ProcessWrap();
|
|
||||||
|
|
||||||
if (Status == JobStatus.Finished)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
DebugTools.Assert(_resume != null,
|
|
||||||
"Run() called without resume. Was this called while the job is in Waiting state?");
|
|
||||||
var resume = _resume;
|
|
||||||
_resume = null;
|
|
||||||
|
|
||||||
Status = JobStatus.Running;
|
|
||||||
|
|
||||||
if (Cancellation.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
resume?.TrySetCanceled();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
resume?.SetResult(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Status != JobStatus.Finished && Status != JobStatus.Waiting)
|
|
||||||
{
|
|
||||||
DebugTools.Assert(_resume != null,
|
|
||||||
"Job suspended without _resume set. Did you await on an external task without using WaitAsyncTask?");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract Task<T?> Process();
|
|
||||||
|
|
||||||
private async Task ProcessWrap()
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
Cancellation.ThrowIfCancellationRequested();
|
|
||||||
|
|
||||||
// Making sure that the task starts inside the Running block,
|
|
||||||
// where the stopwatch is correctly set and such.
|
|
||||||
await SuspendNow();
|
|
||||||
Result = await Process();
|
|
||||||
|
|
||||||
// TODO: not sure if it makes sense to connect Task directly up
|
|
||||||
// to the return value of this method/Process.
|
|
||||||
// Maybe?
|
|
||||||
_taskTcs.TrySetResult(Result);
|
|
||||||
}
|
|
||||||
catch (OperationCanceledException)
|
|
||||||
{
|
|
||||||
_taskTcs.TrySetCanceled();
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
|
||||||
{
|
|
||||||
// TODO: Should this be exposed differently?
|
|
||||||
// I feel that people might forget to check whether the job failed.
|
|
||||||
Logger.ErrorS("job", "Job failed on exception:\n{0}", e);
|
|
||||||
Exception = e;
|
|
||||||
_taskTcs.TrySetException(e);
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
if (Status != JobStatus.Waiting)
|
|
||||||
{
|
|
||||||
// If we're blocked on waiting and the waiting task goes cancel/exception,
|
|
||||||
// this timing info would not be correct.
|
|
||||||
DebugTime += StopWatch.Elapsed.TotalSeconds;
|
|
||||||
}
|
|
||||||
Status = JobStatus.Finished;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public enum JobStatus
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// Job has been created and has not been ran yet.
|
|
||||||
/// </summary>
|
|
||||||
Pending,
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Job is currently (yes, right now!) executing.
|
|
||||||
/// </summary>
|
|
||||||
Running,
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Job is paused due to CPU limits.
|
|
||||||
/// </summary>
|
|
||||||
Paused,
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Job is paused because of waiting on external task.
|
|
||||||
/// </summary>
|
|
||||||
Waiting,
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Job is done.
|
|
||||||
/// </summary>
|
|
||||||
// TODO: Maybe have a different status code for cancelled/failed on exception?
|
|
||||||
Finished,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,76 +0,0 @@
|
|||||||
using Robust.Shared.Timing;
|
|
||||||
|
|
||||||
namespace Content.Server.CPUJob.JobQueues.Queues
|
|
||||||
{
|
|
||||||
[Virtual]
|
|
||||||
public class JobQueue
|
|
||||||
{
|
|
||||||
private readonly IStopwatch _stopwatch;
|
|
||||||
|
|
||||||
public JobQueue(double maxTime) : this(new Stopwatch())
|
|
||||||
{
|
|
||||||
MaxTime = maxTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
public JobQueue() : this(new Stopwatch()) {}
|
|
||||||
|
|
||||||
public JobQueue(IStopwatch stopwatch)
|
|
||||||
{
|
|
||||||
_stopwatch = stopwatch;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// How long the job's allowed to run for before suspending
|
|
||||||
/// </summary>
|
|
||||||
public virtual double MaxTime { get; } = 0.002;
|
|
||||||
|
|
||||||
private readonly Queue<IJob> _pendingQueue = new();
|
|
||||||
private readonly List<IJob> _waitingJobs = new();
|
|
||||||
|
|
||||||
public void EnqueueJob(IJob job)
|
|
||||||
{
|
|
||||||
_pendingQueue.Enqueue(job);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Process()
|
|
||||||
{
|
|
||||||
// Move all finished waiting jobs back into the regular queue.
|
|
||||||
foreach (var waitingJob in _waitingJobs)
|
|
||||||
{
|
|
||||||
if (waitingJob.Status != JobStatus.Waiting)
|
|
||||||
{
|
|
||||||
_pendingQueue.Enqueue(waitingJob);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_waitingJobs.RemoveAll(p => p.Status != JobStatus.Waiting);
|
|
||||||
|
|
||||||
// At one point I tried making the pathfinding queue multi-threaded but ehhh didn't go great
|
|
||||||
// Could probably try it again at some point
|
|
||||||
// it just seemed slow af but I was probably doing something dumb with semaphores
|
|
||||||
_stopwatch.Restart();
|
|
||||||
|
|
||||||
// Although the jobs can stop themselves we might be able to squeeze more of them in the allotted time
|
|
||||||
while (_stopwatch.Elapsed.TotalSeconds < MaxTime && _pendingQueue.TryDequeue(out var job))
|
|
||||||
{
|
|
||||||
// Deque and re-enqueue these to cycle them through to avoid starvation if we've got a lot of jobs.
|
|
||||||
|
|
||||||
job.Run();
|
|
||||||
|
|
||||||
switch (job.Status)
|
|
||||||
{
|
|
||||||
case JobStatus.Finished:
|
|
||||||
continue;
|
|
||||||
case JobStatus.Waiting:
|
|
||||||
// If this job goes into waiting we have to move it into a separate list.
|
|
||||||
// Otherwise we'd just be spinning like mad here for external IO or such.
|
|
||||||
_waitingJobs.Add(job);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
_pendingQueue.Enqueue(job);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,4 +1,6 @@
|
|||||||
namespace Content.Server.CPUJob.JobQueues.Queues
|
using Robust.Shared.CPUJob.JobQueues.Queues;
|
||||||
|
|
||||||
|
namespace Content.Server.CPUJob.JobQueues.Queues
|
||||||
{
|
{
|
||||||
public sealed class PathfindingJobQueue : JobQueue
|
public sealed class PathfindingJobQueue : JobQueue
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Content.Server.CPUJob.JobQueues;
|
using Robust.Shared.CPUJob.JobQueues;
|
||||||
using Content.Server.NPC.HTN.PrimitiveTasks;
|
using Content.Server.NPC.HTN.PrimitiveTasks;
|
||||||
|
|
||||||
namespace Content.Server.NPC.HTN;
|
namespace Content.Server.NPC.HTN;
|
||||||
|
|||||||
@@ -2,8 +2,8 @@ using System.Linq;
|
|||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using Content.Server.Administration.Managers;
|
using Content.Server.Administration.Managers;
|
||||||
using Content.Server.CPUJob.JobQueues;
|
using Robust.Shared.CPUJob.JobQueues;
|
||||||
using Content.Server.CPUJob.JobQueues.Queues;
|
using Robust.Shared.CPUJob.JobQueues.Queues;
|
||||||
using Content.Server.NPC.Components;
|
using Content.Server.NPC.Components;
|
||||||
using Content.Server.NPC.HTN.PrimitiveTasks;
|
using Content.Server.NPC.HTN.PrimitiveTasks;
|
||||||
using Content.Server.NPC.Systems;
|
using Content.Server.NPC.Systems;
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Content.Server.Construction;
|
using Content.Server.Construction;
|
||||||
using Content.Server.CPUJob.JobQueues;
|
using Robust.Shared.CPUJob.JobQueues;
|
||||||
using Content.Server.Decals;
|
using Content.Server.Decals;
|
||||||
using Content.Shared.Procedural;
|
using Content.Shared.Procedural;
|
||||||
using Content.Shared.Procedural.DungeonGenerators;
|
using Content.Shared.Procedural.DungeonGenerators;
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Content.Server.Construction;
|
using Content.Server.Construction;
|
||||||
using Content.Server.CPUJob.JobQueues.Queues;
|
using Robust.Shared.CPUJob.JobQueues.Queues;
|
||||||
using Content.Server.Decals;
|
using Content.Server.Decals;
|
||||||
using Content.Server.GameTicking.Events;
|
using Content.Server.GameTicking.Events;
|
||||||
using Content.Shared.CCVar;
|
using Content.Shared.CCVar;
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using Content.Server.CPUJob.JobQueues;
|
using Robust.Shared.CPUJob.JobQueues;
|
||||||
using Content.Server.CPUJob.JobQueues.Queues;
|
using Robust.Shared.CPUJob.JobQueues.Queues;
|
||||||
using Content.Server.Salvage.Expeditions;
|
using Content.Server.Salvage.Expeditions;
|
||||||
using Content.Server.Salvage.Expeditions.Structure;
|
using Content.Server.Salvage.Expeditions.Structure;
|
||||||
using Content.Server.Station.Systems;
|
using Content.Server.Station.Systems;
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ using System.Threading;
|
|||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Content.Server.Atmos;
|
using Content.Server.Atmos;
|
||||||
using Content.Server.Atmos.Components;
|
using Content.Server.Atmos.Components;
|
||||||
using Content.Server.CPUJob.JobQueues;
|
using Robust.Shared.CPUJob.JobQueues;
|
||||||
using Content.Server.Ghost.Roles.Components;
|
using Content.Server.Ghost.Roles.Components;
|
||||||
using Content.Server.Parallax;
|
using Content.Server.Parallax;
|
||||||
using Content.Server.Procedural;
|
using Content.Server.Procedural;
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Content.Server.CPUJob.JobQueues;
|
using Robust.Shared.CPUJob.JobQueues;
|
||||||
using Content.Server.CPUJob.JobQueues.Queues;
|
using Robust.Shared.CPUJob.JobQueues.Queues;
|
||||||
using NUnit.Framework;
|
using NUnit.Framework;
|
||||||
using Robust.Shared.Timing;
|
using Robust.Shared.Timing;
|
||||||
using Robust.UnitTesting;
|
using Robust.UnitTesting;
|
||||||
|
|||||||
Reference in New Issue
Block a user