extract BackgroundJobRunner class

This commit is contained in:
in0finite 2022-02-15 22:26:00 +01:00
parent e66ad5065a
commit d8e6191c45
7 changed files with 288 additions and 256 deletions

View file

@ -182,7 +182,7 @@ namespace SanAndreasUnity.Editor
yield break;
}
LoadingThread.Singleton.EnsureBackgroundThreadStarted();
LoadingThread.Singleton.BackgroundJobRunner.EnsureBackgroundThreadStarted();
if (!Loader.HasLoaded)
{
@ -401,7 +401,7 @@ namespace SanAndreasUnity.Editor
for (int i = 0; i < numIterations; i++)
{
long initialNumPendingJobs = LoadingThread.Singleton.GetNumPendingJobs();
long initialNumPendingJobs = LoadingThread.Singleton.BackgroundJobRunner.GetNumPendingJobs();
long numPendingJobs = initialNumPendingJobs;
do
@ -419,7 +419,7 @@ namespace SanAndreasUnity.Editor
System.Threading.Thread.Sleep(5); // don't interact with background thread too often, and also reduce CPU usage
numPendingJobs = LoadingThread.Singleton.GetNumPendingJobs();
numPendingJobs = LoadingThread.Singleton.BackgroundJobRunner.GetNumPendingJobs();
initialNumPendingJobs = Math.Max(initialNumPendingJobs, numPendingJobs);
} while (numPendingJobs > 0);

View file

@ -153,7 +153,7 @@ namespace SanAndreasUnity.Importing.Archive
// [MethodImpl(MethodImplOptions.Synchronized)]
public static void ReadFileAsync(string name, float loadPriority, System.Action<Stream> onFinish)
{
Behaviours.LoadingThread.RegisterJob (new Behaviours.LoadingThread.Job<Stream> () {
Behaviours.LoadingThread.RegisterJob (new BackgroundJobRunner.Job<Stream> () {
priority = loadPriority,
action = () => ReadFile( name ),
callbackFinish = (stream) => { onFinish(stream); },

View file

@ -599,7 +599,7 @@ namespace SanAndreasUnity.Importing.Conversion
GeometryParts loadedGeoms = null;
LoadingThread.RegisterJob (new LoadingThread.Job<Clump> () {
LoadingThread.RegisterJob (new BackgroundJobRunner.Job<Clump> () {
priority = loadPriority,
action = () => {
// read archive file in background thread

View file

@ -282,7 +282,7 @@ namespace SanAndreasUnity.Importing.Conversion
TextureDictionary loadedTxd = null;
bool bDontLoad = DontLoadTextures;
Behaviours.LoadingThread.RegisterJob (new Behaviours.LoadingThread.Job<RenderWareStream.TextureDictionary> () {
Behaviours.LoadingThread.RegisterJob (new Utilities.BackgroundJobRunner.Job<RenderWareStream.TextureDictionary> () {
priority = loadPriority,
action = () => {
return bDontLoad ? null : ArchiveManager.ReadFile<RenderWareStream.TextureDictionary>(name + ".txd");

View file

@ -0,0 +1,258 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using UnityEngine;
using Debug = UnityEngine.Debug;
namespace SanAndreasUnity.Utilities
{
public class BackgroundJobRunner
{
// TODO: maybe convert to class, because it takes 36 bytes - it's too much for red-black tree operations
public struct Job<T>
{
public System.Func<T> action;
public System.Action<T> callbackSuccess;
public System.Action<System.Exception> callbackError;
public System.Action<T> callbackFinish;
public float priority;
internal object result;
internal System.Exception exception;
internal long id;
}
public class JobComparer : IComparer<Job<object>>
{
public int Compare(Job<object> a, Job<object> b)
{
if (a.id == b.id)
return 0;
if (a.priority != b.priority)
return a.priority <= b.priority ? -1 : 1;
// priorities are the same
// the advantage has the job which was created earlier
return a.id <= b.id ? -1 : 1;
}
}
private class ThreadParameters
{
public readonly BlockingCollection<Job<object>> jobs =
new BlockingCollection<Job<object>>(new System.Collections.Concurrent.ConcurrentQueue<Job<object>>());
public readonly Utilities.ConcurrentQueue<Job<object>> processedJobs = new Utilities.ConcurrentQueue<Job<object>>();
private bool _shouldThreadExit = false;
private readonly object _shouldThreadExitLockObject = new object();
public bool ShouldThreadExit()
{
lock (_shouldThreadExitLockObject)
return _shouldThreadExit;
}
public void TellThreadToExit()
{
lock (_shouldThreadExitLockObject)
_shouldThreadExit = true;
}
}
private Thread _thread;
private readonly ThreadParameters _threadParameters = new ThreadParameters();
private readonly Queue<Job<object>> _processedJobsBuffer = new Queue<Job<object>>(256);
private long _lastJobId = 0;
private readonly object _lastJobIdLockObject = new object();
private long _lastProcessedJobId = 0;
private readonly Stopwatch _stopwatch = new Stopwatch();
void StartThread()
{
if (_thread != null)
return;
_thread = new Thread(ThreadFunction);
_thread.Start(_threadParameters);
}
public void ShutDown()
{
if (_thread != null)
{
var sw = System.Diagnostics.Stopwatch.StartNew();
// _thread.Interrupt ();
_threadParameters.TellThreadToExit();
if (_thread.Join(7000))
Debug.LogFormat("Stopped loading thread in {0} ms", sw.Elapsed.TotalMilliseconds);
else
Debug.LogError("Failed to stop loading thread");
}
}
public void RegisterJob<T>(Job<T> job)
{
// note: this function can be called from any thread
if (null == job.action)
throw new ArgumentException("Job must have an action");
if (0f == job.priority)
throw new ArgumentException("You forgot to assign job priority");
job.exception = null;
var j = new Job<object>()
{
priority = job.priority,
action = () => job.action(),
callbackError = job.callbackError,
};
if (job.callbackSuccess != null)
j.callbackSuccess = (arg) => job.callbackSuccess((T)arg);
if (job.callbackFinish != null)
j.callbackFinish = (arg) => job.callbackFinish((T)arg);
lock (_lastJobIdLockObject)
// make sure that changing id and adding new job is atomic operation, otherwise
// multiple threads accessing this part of code can cause the jobs to be inserted out of order
{
j.id = ++_lastJobId;
_threadParameters.jobs.Add(j);
}
}
public void UpdateJobs(ushort maxTimeToUpdateMs)
{
ThreadHelper.ThrowIfNotOnMainThread();
this.UpdateJobsInternal(maxTimeToUpdateMs);
}
void UpdateJobsInternal(ushort maxTimeToUpdateMs)
{
// get all processed jobs
_stopwatch.Restart();
Job<object> job;
while (true)
{
if (maxTimeToUpdateMs != 0 && _stopwatch.ElapsedMilliseconds >= maxTimeToUpdateMs)
break;
if (_processedJobsBuffer.Count > 0)
job = _processedJobsBuffer.Dequeue();
else
{
int numCopied = _threadParameters.processedJobs.DequeueToQueue(_processedJobsBuffer, 256);
if (numCopied == 0)
break;
job = _processedJobsBuffer.Dequeue();
}
if (job.exception != null)
{
// error happened
if (job.callbackError != null)
Utilities.F.RunExceptionSafe(() => job.callbackError(job.exception));
Debug.LogException(job.exception);
}
else
{
// success
if (job.callbackSuccess != null)
Utilities.F.RunExceptionSafe(() => job.callbackSuccess(job.result));
}
// invoke finish callback
if (job.callbackFinish != null)
F.RunExceptionSafe(() => job.callbackFinish(job.result));
_lastProcessedJobId = job.id;
}
}
public long GetNumJobsPendingApproximately()
{
ThreadHelper.ThrowIfNotOnMainThread();
// this is not done in a critical section: calling Count on 2 multithreaded collections
return (long)_threadParameters.jobs.Count + (long)_threadParameters.processedJobs.Count + (long)_processedJobsBuffer.Count;
}
public long GetNumPendingJobs()
{
// this will not work if collections used are not FIFO collections (eg. other than queues)
// - this will be the case if job priority is used
ThreadHelper.ThrowIfNotOnMainThread();
lock (_lastJobIdLockObject)
{
if (_lastProcessedJobId > _lastJobId)
throw new Exception($"Last processed job id ({_lastProcessedJobId}) is higher than last registered job id ({_lastJobId}). This should not happen.");
return _lastJobId - _lastProcessedJobId;
}
}
public bool IsBackgroundThreadRunning()
{
ThreadHelper.ThrowIfNotOnMainThread();
if (null == _thread)
return false;
if (_thread.ThreadState != System.Threading.ThreadState.Running)
return false;
return true;
}
public void EnsureBackgroundThreadStarted()
{
ThreadHelper.ThrowIfNotOnMainThread();
this.StartThread();
}
static void ThreadFunction(object objectParameter)
{
ThreadParameters threadParameters = (ThreadParameters)objectParameter;
while (!threadParameters.ShouldThreadExit())
{
Job<object> job;
if (!threadParameters.jobs.TryTake(out job, 200))
continue;
try
{
job.result = job.action();
}
catch (System.Exception ex)
{
job.exception = ex;
}
threadParameters.processedJobs.Enqueue(job);
}
}
}
}

View file

@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: fdd88c441d4ecba469ce3b23cab23fa3
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View file

@ -1,79 +1,11 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using UnityEngine;
using SanAndreasUnity.Utilities;
using System.Threading;
using System.Collections.Concurrent;
using Debug = UnityEngine.Debug;
using SanAndreasUnity.Utilities;
namespace SanAndreasUnity.Behaviours
{
public class LoadingThread : StartupSingleton<LoadingThread> {
// TODO: maybe convert to class, because it takes 36 bytes - it's too much for red-black tree operations
public struct Job<T>
{
public System.Func<T> action ;
public System.Action<T> callbackSuccess ;
public System.Action<System.Exception> callbackError ;
public System.Action<T> callbackFinish;
public float priority;
internal object result ;
internal System.Exception exception ;
internal long id;
}
public class JobComparer : IComparer<Job<object>>
{
public int Compare(Job<object> a, Job<object> b)
{
if (a.id == b.id)
return 0;
if (a.priority != b.priority)
return a.priority <= b.priority ? -1 : 1;
// priorities are the same
// the advantage has the job which was created earlier
return a.id <= b.id ? -1 : 1;
}
}
private class ThreadParameters
{
public readonly BlockingCollection<Job<object>> jobs =
new BlockingCollection<Job<object>> (new System.Collections.Concurrent.ConcurrentQueue<Job<object>>());
public readonly Utilities.ConcurrentQueue<Job<object>> processedJobs = new Utilities.ConcurrentQueue<Job<object>>();
private bool _shouldThreadExit = false;
private readonly object _shouldThreadExitLockObject = new object();
public bool ShouldThreadExit()
{
lock (_shouldThreadExitLockObject)
return _shouldThreadExit;
}
public void TellThreadToExit()
{
lock (_shouldThreadExitLockObject)
_shouldThreadExit = true;
}
}
private Thread _thread;
private readonly ThreadParameters _threadParameters = new ThreadParameters();
private readonly Queue<Job<object>> _processedJobsBuffer = new Queue<Job<object>>(256);
private long _lastJobId = 0;
private readonly object _lastJobIdLockObject = new object();
private long _lastProcessedJobId = 0;
private readonly Stopwatch _stopwatch = new Stopwatch();
public class LoadingThread : StartupSingleton<LoadingThread>
{
public BackgroundJobRunner BackgroundJobRunner { get; } = new BackgroundJobRunner();
public ushort maxTimePerFrameMs = 0;
@ -85,204 +17,35 @@ namespace SanAndreasUnity.Behaviours
if (null == Singleton)
return;
Singleton.StartThread();
Singleton.BackgroundJobRunner.EnsureBackgroundThreadStarted();
}
#endif
protected override void OnSingletonStart()
{
this.StartThread();
this.BackgroundJobRunner.EnsureBackgroundThreadStarted();
}
protected override void OnSingletonDisable()
{
if (_thread != null)
{
var sw = System.Diagnostics.Stopwatch.StartNew ();
// _thread.Interrupt ();
_threadParameters.TellThreadToExit ();
if (_thread.Join (7000))
Debug.LogFormat ("Stopped loading thread in {0} ms", sw.Elapsed.TotalMilliseconds);
else
Debug.LogError ("Failed to stop loading thread");
}
}
void StartThread()
{
if (_thread != null)
return;
_thread = new Thread(ThreadFunction);
_thread.Start(_threadParameters);
this.BackgroundJobRunner.ShutDown();
}
void Update()
{
this.UpdateJobsInternal();
this.UpdateJobs();
}
public void UpdateJobs()
{
ThreadHelper.ThrowIfNotOnMainThread();
this.UpdateJobsInternal();
}
void UpdateJobsInternal () {
// get all processed jobs
_stopwatch.Restart();
Job<object> job;
while (true)
{
if (this.maxTimePerFrameMs != 0 && _stopwatch.ElapsedMilliseconds >= this.maxTimePerFrameMs)
break;
if (_processedJobsBuffer.Count > 0)
job = _processedJobsBuffer.Dequeue();
else
{
int numCopied = _threadParameters.processedJobs.DequeueToQueue(_processedJobsBuffer, 256);
if (numCopied == 0)
break;
job = _processedJobsBuffer.Dequeue();
}
if (job.exception != null)
{
// error happened
if (job.callbackError != null)
Utilities.F.RunExceptionSafe( () => job.callbackError (job.exception) );
Debug.LogException (job.exception);
}
else
{
// success
if (job.callbackSuccess != null)
Utilities.F.RunExceptionSafe( () => job.callbackSuccess (job.result) );
}
// invoke finish callback
if (job.callbackFinish != null)
F.RunExceptionSafe (() => job.callbackFinish (job.result));
_lastProcessedJobId = job.id;
}
{
this.BackgroundJobRunner.UpdateJobs(this.maxTimePerFrameMs);
}
public static void RegisterJob<T>(Job<T> job)
public static void RegisterJob<T>(BackgroundJobRunner.Job<T> job)
{
ThreadHelper.ThrowIfNotOnMainThread(); // obtaining Singleton should only happen on main thread
Singleton.RegisterJobOnInstance(job);
}
public void RegisterJobOnInstance<T> (Job<T> job)
{
// note: this function can be called from any thread
if (null == job.action)
throw new ArgumentException("Job must have an action");
if (0f == job.priority)
throw new ArgumentException("You forgot to assign job priority");
job.exception = null;
var j = new Job<object> () {
priority = job.priority,
action = () => job.action(),
callbackError = job.callbackError,
};
if(job.callbackSuccess != null)
j.callbackSuccess = (arg) => job.callbackSuccess( (T) arg );
if(job.callbackFinish != null)
j.callbackFinish = (arg) => job.callbackFinish( (T) arg );
lock (_lastJobIdLockObject)
// make sure that changing id and adding new job is atomic operation, otherwise
// multiple threads accessing this part of code can cause the jobs to be inserted out of order
{
j.id = ++_lastJobId;
_threadParameters.jobs.Add(j);
}
}
public long GetNumJobsPendingApproximately()
{
ThreadHelper.ThrowIfNotOnMainThread();
// this is not done in a critical section: calling Count on 2 multithreaded collections
return (long)_threadParameters.jobs.Count + (long)_threadParameters.processedJobs.Count + (long)_processedJobsBuffer.Count;
}
public long GetNumPendingJobs()
{
// this will not work if collections used are not FIFO collections (eg. other than queues)
// - this will be the case if job priority is used
ThreadHelper.ThrowIfNotOnMainThread();
lock (_lastJobIdLockObject)
{
if (_lastProcessedJobId > _lastJobId)
throw new Exception($"Last processed job id ({_lastProcessedJobId}) is higher than last registered job id ({_lastJobId}). This should not happen.");
return _lastJobId - _lastProcessedJobId;
}
}
public bool IsBackgroundThreadRunning()
{
ThreadHelper.ThrowIfNotOnMainThread();
if (null == _thread)
return false;
if (_thread.ThreadState != System.Threading.ThreadState.Running)
return false;
return true;
}
public void EnsureBackgroundThreadStarted()
{
ThreadHelper.ThrowIfNotOnMainThread();
this.StartThread();
}
static void ThreadFunction (object objectParameter)
{
ThreadParameters threadParameters = (ThreadParameters) objectParameter;
while (!threadParameters.ShouldThreadExit())
{
Job<object> job;
if (!threadParameters.jobs.TryTake (out job, 200))
continue;
try
{
job.result = job.action();
}
catch(System.Exception ex)
{
job.exception = ex;
}
threadParameters.processedJobs.Enqueue(job);
}
Singleton.BackgroundJobRunner.RegisterJob(job);
}
}