diff --git a/Content.Server/CPUJob/JobQueues/IJob.cs b/Content.Server/CPUJob/JobQueues/IJob.cs deleted file mode 100644 index 283466f4ba..0000000000 --- a/Content.Server/CPUJob/JobQueues/IJob.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace Content.Server.CPUJob.JobQueues -{ - public interface IJob - { - JobStatus Status { get; } - void Run(); - } -} diff --git a/Content.Server/CPUJob/JobQueues/Job.cs b/Content.Server/CPUJob/JobQueues/Job.cs deleted file mode 100644 index a0f8082bec..0000000000 --- a/Content.Server/CPUJob/JobQueues/Job.cs +++ /dev/null @@ -1,230 +0,0 @@ -using System.Threading; -using System.Threading.Tasks; -using Robust.Shared.Timing; -using Robust.Shared.Utility; - -namespace Content.Server.CPUJob.JobQueues -{ - /// - /// CPU-intensive job that can be suspended and resumed on the main thread - /// - /// - /// Implementations should overload . - /// Inside , implementations should only await on , - /// , or . - /// - /// The type of result this job generates - public abstract class Job : IJob - { - public JobStatus Status { get; private set; } = JobStatus.Pending; - - /// - /// Represents the status of this job as a regular task. - /// - public Task AsTask { get; } - - public T? Result { get; private set; } - public Exception? Exception { get; private set; } - protected CancellationToken Cancellation { get; } - - public double DebugTime { get; private set; } - private readonly double _maxTime; - protected readonly IStopwatch StopWatch; - - // TCS for the Task property. - private readonly TaskCompletionSource _taskTcs; - - // TCS to call to resume the suspended job. - private TaskCompletionSource? _resume; - private Task? _workInProgress; - - protected Job(double maxTime, CancellationToken cancellation = default) - : this(maxTime, new Stopwatch(), cancellation) - { - } - - protected Job(double maxTime, IStopwatch stopwatch, CancellationToken cancellation = default) - { - _maxTime = maxTime; - StopWatch = stopwatch; - Cancellation = cancellation; - - _taskTcs = new TaskCompletionSource(); - AsTask = _taskTcs.Task; - } - - /// - /// Suspends the current task immediately, yielding to other running jobs. - /// - /// - /// This does not stop the job queue from un-suspending the current task immediately again, - /// if there is still time left over. - /// - protected Task SuspendNow() - { - DebugTools.AssertNull(_resume); - - _resume = new TaskCompletionSource(); - Status = JobStatus.Paused; - DebugTime += StopWatch.Elapsed.TotalSeconds; - return _resume.Task; - } - - protected ValueTask SuspendIfOutOfTime() - { - DebugTools.AssertNull(_resume); - - // ReSharper disable once CompareOfFloatsByEqualityOperator - if (StopWatch.Elapsed.TotalSeconds <= _maxTime || _maxTime == 0.0) - { - return new ValueTask(); - } - - return new ValueTask(SuspendNow()); - } - - /// - /// Wrapper to await on an external task. - /// - protected async Task WaitAsyncTask(Task task) - { - DebugTools.AssertNull(_resume); - - Status = JobStatus.Waiting; - DebugTime += StopWatch.Elapsed.TotalSeconds; - - var result = await task; - - // Immediately block on resume so that everything stays correct. - Status = JobStatus.Paused; - _resume = new TaskCompletionSource(); - - await _resume.Task; - - return result; - } - - /// - /// Wrapper to safely await on an external task. - /// - protected async Task WaitAsyncTask(Task task) - { - DebugTools.AssertNull(_resume); - - Status = JobStatus.Waiting; - DebugTime += StopWatch.Elapsed.TotalSeconds; - - await task; - - // Immediately block on resume so that everything stays correct. - _resume = new TaskCompletionSource(); - Status = JobStatus.Paused; - - await _resume.Task; - } - - public void Run() - { - StopWatch.Restart(); - _workInProgress ??= ProcessWrap(); - - if (Status == JobStatus.Finished) - { - return; - } - - DebugTools.Assert(_resume != null, - "Run() called without resume. Was this called while the job is in Waiting state?"); - var resume = _resume; - _resume = null; - - Status = JobStatus.Running; - - if (Cancellation.IsCancellationRequested) - { - resume?.TrySetCanceled(); - } - else - { - resume?.SetResult(null); - } - - if (Status != JobStatus.Finished && Status != JobStatus.Waiting) - { - DebugTools.Assert(_resume != null, - "Job suspended without _resume set. Did you await on an external task without using WaitAsyncTask?"); - } - } - - protected abstract Task Process(); - - private async Task ProcessWrap() - { - try - { - Cancellation.ThrowIfCancellationRequested(); - - // Making sure that the task starts inside the Running block, - // where the stopwatch is correctly set and such. - await SuspendNow(); - Result = await Process(); - - // TODO: not sure if it makes sense to connect Task directly up - // to the return value of this method/Process. - // Maybe? - _taskTcs.TrySetResult(Result); - } - catch (OperationCanceledException) - { - _taskTcs.TrySetCanceled(); - } - catch (Exception e) - { - // TODO: Should this be exposed differently? - // I feel that people might forget to check whether the job failed. - Logger.ErrorS("job", "Job failed on exception:\n{0}", e); - Exception = e; - _taskTcs.TrySetException(e); - } - finally - { - if (Status != JobStatus.Waiting) - { - // If we're blocked on waiting and the waiting task goes cancel/exception, - // this timing info would not be correct. - DebugTime += StopWatch.Elapsed.TotalSeconds; - } - Status = JobStatus.Finished; - } - } - } - - public enum JobStatus - { - /// - /// Job has been created and has not been ran yet. - /// - Pending, - - /// - /// Job is currently (yes, right now!) executing. - /// - Running, - - /// - /// Job is paused due to CPU limits. - /// - Paused, - - /// - /// Job is paused because of waiting on external task. - /// - Waiting, - - /// - /// Job is done. - /// - // TODO: Maybe have a different status code for cancelled/failed on exception? - Finished, - } -} diff --git a/Content.Server/CPUJob/JobQueues/Queues/JobQueue.cs b/Content.Server/CPUJob/JobQueues/Queues/JobQueue.cs deleted file mode 100644 index 31e22ec24c..0000000000 --- a/Content.Server/CPUJob/JobQueues/Queues/JobQueue.cs +++ /dev/null @@ -1,76 +0,0 @@ -using Robust.Shared.Timing; - -namespace Content.Server.CPUJob.JobQueues.Queues -{ - [Virtual] - public class JobQueue - { - private readonly IStopwatch _stopwatch; - - public JobQueue(double maxTime) : this(new Stopwatch()) - { - MaxTime = maxTime; - } - - public JobQueue() : this(new Stopwatch()) {} - - public JobQueue(IStopwatch stopwatch) - { - _stopwatch = stopwatch; - } - - /// - /// How long the job's allowed to run for before suspending - /// - public virtual double MaxTime { get; } = 0.002; - - private readonly Queue _pendingQueue = new(); - private readonly List _waitingJobs = new(); - - public void EnqueueJob(IJob job) - { - _pendingQueue.Enqueue(job); - } - - public void Process() - { - // Move all finished waiting jobs back into the regular queue. - foreach (var waitingJob in _waitingJobs) - { - if (waitingJob.Status != JobStatus.Waiting) - { - _pendingQueue.Enqueue(waitingJob); - } - } - - _waitingJobs.RemoveAll(p => p.Status != JobStatus.Waiting); - - // At one point I tried making the pathfinding queue multi-threaded but ehhh didn't go great - // Could probably try it again at some point - // it just seemed slow af but I was probably doing something dumb with semaphores - _stopwatch.Restart(); - - // Although the jobs can stop themselves we might be able to squeeze more of them in the allotted time - while (_stopwatch.Elapsed.TotalSeconds < MaxTime && _pendingQueue.TryDequeue(out var job)) - { - // Deque and re-enqueue these to cycle them through to avoid starvation if we've got a lot of jobs. - - job.Run(); - - switch (job.Status) - { - case JobStatus.Finished: - continue; - case JobStatus.Waiting: - // If this job goes into waiting we have to move it into a separate list. - // Otherwise we'd just be spinning like mad here for external IO or such. - _waitingJobs.Add(job); - break; - default: - _pendingQueue.Enqueue(job); - break; - } - } - } - } -} diff --git a/Content.Server/CPUJob/JobQueues/Queues/PathfindingJobQueue.cs b/Content.Server/CPUJob/JobQueues/Queues/PathfindingJobQueue.cs index 5dade1671c..a3c1084e40 100644 --- a/Content.Server/CPUJob/JobQueues/Queues/PathfindingJobQueue.cs +++ b/Content.Server/CPUJob/JobQueues/Queues/PathfindingJobQueue.cs @@ -1,4 +1,6 @@ -namespace Content.Server.CPUJob.JobQueues.Queues +using Robust.Shared.CPUJob.JobQueues.Queues; + +namespace Content.Server.CPUJob.JobQueues.Queues { public sealed class PathfindingJobQueue : JobQueue { diff --git a/Content.Server/NPC/HTN/HTNPlanJob.cs b/Content.Server/NPC/HTN/HTNPlanJob.cs index f5ba3e8fd9..b1a8064953 100644 --- a/Content.Server/NPC/HTN/HTNPlanJob.cs +++ b/Content.Server/NPC/HTN/HTNPlanJob.cs @@ -1,7 +1,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using Content.Server.CPUJob.JobQueues; +using Robust.Shared.CPUJob.JobQueues; using Content.Server.NPC.HTN.PrimitiveTasks; namespace Content.Server.NPC.HTN; diff --git a/Content.Server/NPC/HTN/HTNSystem.cs b/Content.Server/NPC/HTN/HTNSystem.cs index 9beabeef7a..3ef87a9e68 100644 --- a/Content.Server/NPC/HTN/HTNSystem.cs +++ b/Content.Server/NPC/HTN/HTNSystem.cs @@ -2,8 +2,8 @@ using System.Linq; using System.Text; using System.Threading; using Content.Server.Administration.Managers; -using Content.Server.CPUJob.JobQueues; -using Content.Server.CPUJob.JobQueues.Queues; +using Robust.Shared.CPUJob.JobQueues; +using Robust.Shared.CPUJob.JobQueues.Queues; using Content.Server.NPC.Components; using Content.Server.NPC.HTN.PrimitiveTasks; using Content.Server.NPC.Systems; diff --git a/Content.Server/Procedural/DungeonJob.cs b/Content.Server/Procedural/DungeonJob.cs index 29abd2dc12..7d61ad10d8 100644 --- a/Content.Server/Procedural/DungeonJob.cs +++ b/Content.Server/Procedural/DungeonJob.cs @@ -1,7 +1,7 @@ using System.Threading; using System.Threading.Tasks; using Content.Server.Construction; -using Content.Server.CPUJob.JobQueues; +using Robust.Shared.CPUJob.JobQueues; using Content.Server.Decals; using Content.Shared.Procedural; using Content.Shared.Procedural.DungeonGenerators; diff --git a/Content.Server/Procedural/DungeonSystem.cs b/Content.Server/Procedural/DungeonSystem.cs index 083c635b78..7dabc23943 100644 --- a/Content.Server/Procedural/DungeonSystem.cs +++ b/Content.Server/Procedural/DungeonSystem.cs @@ -1,7 +1,7 @@ using System.Threading; using System.Threading.Tasks; using Content.Server.Construction; -using Content.Server.CPUJob.JobQueues.Queues; +using Robust.Shared.CPUJob.JobQueues.Queues; using Content.Server.Decals; using Content.Server.GameTicking.Events; using Content.Shared.CCVar; diff --git a/Content.Server/Salvage/SalvageSystem.Expeditions.cs b/Content.Server/Salvage/SalvageSystem.Expeditions.cs index 046f099002..e5bbce38ea 100644 --- a/Content.Server/Salvage/SalvageSystem.Expeditions.cs +++ b/Content.Server/Salvage/SalvageSystem.Expeditions.cs @@ -1,7 +1,7 @@ using System.Linq; using System.Threading; -using Content.Server.CPUJob.JobQueues; -using Content.Server.CPUJob.JobQueues.Queues; +using Robust.Shared.CPUJob.JobQueues; +using Robust.Shared.CPUJob.JobQueues.Queues; using Content.Server.Salvage.Expeditions; using Content.Server.Salvage.Expeditions.Structure; using Content.Server.Station.Systems; diff --git a/Content.Server/Salvage/SpawnSalvageMissionJob.cs b/Content.Server/Salvage/SpawnSalvageMissionJob.cs index cbc4fb2b04..224d04a586 100644 --- a/Content.Server/Salvage/SpawnSalvageMissionJob.cs +++ b/Content.Server/Salvage/SpawnSalvageMissionJob.cs @@ -3,7 +3,7 @@ using System.Threading; using System.Threading.Tasks; using Content.Server.Atmos; using Content.Server.Atmos.Components; -using Content.Server.CPUJob.JobQueues; +using Robust.Shared.CPUJob.JobQueues; using Content.Server.Ghost.Roles.Components; using Content.Server.Parallax; using Content.Server.Procedural; diff --git a/Content.Tests/Server/Jobs/JobQueueTest.cs b/Content.Tests/Server/Jobs/JobQueueTest.cs index 5f6cb21d02..d81525767a 100644 --- a/Content.Tests/Server/Jobs/JobQueueTest.cs +++ b/Content.Tests/Server/Jobs/JobQueueTest.cs @@ -1,8 +1,8 @@ using System; using System.Threading; using System.Threading.Tasks; -using Content.Server.CPUJob.JobQueues; -using Content.Server.CPUJob.JobQueues.Queues; +using Robust.Shared.CPUJob.JobQueues; +using Robust.Shared.CPUJob.JobQueues.Queues; using NUnit.Framework; using Robust.Shared.Timing; using Robust.UnitTesting;