マルチスレッド
はじめに
Quantumのタスクシステムは、データパラレリズム(同じタイプのデータの大きなバッチを処理する)とタスクパラレリズム(全く異なるデータで動作する2つの異なるロジック)の両方を実現します:
例:
- データパラレリズム: 一つのフィルタから複数のエンティティに対して同じロジックを並列に操作する1つのタスク
- タスクパラレリズム: それぞれ異なる全く別のエンティティ群に対してロジックを実行する2つのタスク
Quantum Taskシステムは、安全性よりもパフォーマンスを優先しています。その結果、デベロッパーは、並行で実行できるタスクはどれか、安全に触れることのできるデータのピースはどれか、追加のインターロック操作が必要なものはどれかということについて、意識することが求められています。 タスクシステムは、物理学やナビゲーションなどのコアシステムで使用されているほか、一部では自社のゲームシステムで使用しています。
スレッドの安全性
タスクシステムが保証するのは依存性グラフ実行におけるスレッドの安全性のみです。 直接的または間接的にタスクBがタスクAに依存する場合、Bが同時接続的にAへ実行を行うことはありません。タスク自身のロジックが競合状態にならないようにするため、タスクシステム自体から追加の方法がとられることもありません。 したがって、シミュレーションのスレッドセーフは、依存関係グラフの慎重な構築と、他のInterlocked/CAS操作(特にデータ並列化を行う場合)の使用によって確保する必要があります。
デフォルトでこれ以上、semaphoresなどのセキュリティメカニズムを採用していないことの理由は、パフォーマンスゲインを帳消しにしてしまうパフォーマンスペナルティです。
スレッドパフォーマンス
タスク間では、Quantumは「スピンロック」のカスタムフレーバーを使用して、非常に効率的に切り替えます。
ただし、Quantumでスレッドをスリープ状態にしたりウェイクアップさせるには、UnityのJobタスクサブシステムのすべて ENTRY ポイントに接続します。これはAndroidプラットフォームではUnityのJonについて未解決の問題があるため重要となっています。特に、Unityのジョブのウェイクアップまたはスリープの保留はUnity側自体で難しいものになることがあります。Androidでマルチスレッディングを回避すべき状態になるということです。
このような流動的な状況を考慮すると、ターゲットとなるデバイスやプラットフォームでパフォーマンス結果をテストすることを強くお勧めします。
決定性
異なるスレッドで同時にタスクを実行する場合、考慮しなければならないことが2つあります:
- タスクがスレッドセーフであること
- スレッドがロードを実行する順序が、タスクの実行の結果決定性に影響を与えないこと
スレッド1がエンティティAでロジックを実行し、スレッド2がエンティティBでロジックを実行している場合、これら2つのスレッドは、タスクを効果的に実行する順序にかかわらず本機とその他のすべてのクライアントのマシン機上のゲームステートに同様の決定静的変更を生み出すものでなければなりません。 実行の順序にシミュレーションの決定性的性質を残さす飛鳥がある場合、この順序はタスクの依存性グラフから強制されることになります。
タスク
タスクとは、あるデータに対して何らかのロジックを実行するコードの断片のことです。例えば、通常の SystemMainThread
システムは、メインスレッドによってのみ実行される Update()
メソッド(ロジックの一部)を持つ単一のタスクに変換されます。一方、SystemMainThreadFilter
は、ロジックが実行されるデータも定義します:フィルタに一致するコンポーネントを持つすべてのエンティティです。
タスクシステムを使用すると、以下の並列化を実現できます。
- データ: 巣複数のスレッドで実行される同一のタスク。各スレッドがさんざ真名データのサブセットでタスクを実行します。
- ロジック: 様々なスレッドで同時接続的に実行できる複数のタスク。
例えば、スレッドが3つある場合もあります。メインである0、コンポーネントバッファ内のエンティティの異なるサブセットでタスクAを実行する1と2、そして他のコンポーネントバッファでもう一つのスレッド3がタスクBを実行するケースです(ロジックの並列化)。
タイプ
タスクには4通りのタイプがあります。以下を参照してください。
- メインスレッドタスク:単一のスレッドによってのみ実行可能で、それはメインシミュレーションスレッドでなければならない(これは通常の
SystemMainThread
によっても使用されるタスクである) - シングルトンタスク:メインスレッドであるか否かを問わず、単一のスレッドによってのみ実行されることができる
- スレッドタスク:複数のスレッドで実行することができる
- 配列タスク: スケジュール時間内の既知のサイズのデータバッファで複数のスレッドが実行可能。配列はタスクシステムでスライスされチャンクはスレッドが消費。チャンク数を管理するための任意のパラメータがあり、デフォルトでは32に設定されている
Quantumマルチスレッディングのスケジュールはwork-stealingです。work-stealingは以下の2段階で発生します。
- On the Task-Graph レベル: スレッドがタスクを「盗もう」として、実行します。例えば、シングルトンタスクはリクエスト(盗むこと)ができる1つ目のスレッドが実行します(タスク依存性が満たされていることを想定)。
- On the Task-Chunk レベル: チャンクのバッファをスライスする配列タスク。各チャンクはスレッドによって盗まれる可能性があります。スレッドがチャンクで実行を完了すると、すべてのチャンクを実行するまで次に実行可能なチャンクを「盗み」ます。この場合、タスクシステムが2つのスレッドが同一のチャンクを獲得してしまうことが内容に取り持ちます。
システム
システムはタスクをスケジューリングすることができます。よりシンプルな SystemMainThread
では、このスケジューリングロジックは公開されず、 Update()
タスクが裏で自動的にスケジューリングされます。システムは SystemSetup
で定義されたのと同じ順序でタスクをスケジュールするよう呼び出され、スケジュールしたタスクは Task dependency graph と呼ぶ非周期グラフを形成する必要があります。
フレームシミュレーションは、メインスレッドといくつかのワーカースレッドが、このタスクグラフを見て、タスクが実行可能かどうかを確認し、条件が満たされれば、タスクを実行することで成り立っています。タスクは、その依存関係が満たされていて(依存するタスクがすべて完了している)、そのスレッドでの実行が許可されていれば実行できます(メインスレッドまたは単一スレッドでしか実行できないタスクもあります)。。
タイプ
SystemBase
システムに加えて、データ並列化を提供するいくつかの組み込みマルチスレッドシステムがある。これらは Quantum.Tasks
名前空間で見つけることができます。これらのシステムのいくつかは、メインスレッドの対応するものと類似しています。
SystemThreadedComponent<T>
(スレッドタスク): スレッドは、スケジュール時点では知られていないサイズのコンポーネントバッファを繰り返し処理することになります。各スレッドは、バッファの最後に到達するまで、与えられた(設定可能な)サイズのスライスを取得します。スライスを繰り返し処理する間、スレッドはFrameThreadSafe
と EntityRef とT*
を渡して、継承システムで実装可能なUpdate()
メソッドを呼び出します。SystemThreadedFilter<T>
(スレッド化されたタスク): (1)と似ていますが、1つのコンポーネントバッファでイテレートする代わりに、フィルター構造体のT
で定義されたフィルタリングされたコンポーネントセットでイテレートを行います。SystemMainThreadFilter<T>
と類似しています。SystemArrayComponent<T>
(配列タスク): (1)と似ていますが、それぞれ固定されたサイズのNスライスでコンポーネントバッファをスライスするのではなく、固定のスライス数でバッファさスライスされ、スライスのサイズはバッファのサイズに応じて異なり、スケジュール時には決定しています。SystemArrayFilter<T>
(配列タスク): (3)と似ていますが、スレッドはフィルター構造体T
で定義される、フィルタリングされたコンポーネントセットでイテレートを行います。
FrameThreadSafe
Tasks APIによって強制されるこれらのシステムはすべて、FrameThreadSafe
(FTS) を受け取ります。FTS は事実上 Frame
のラッパーで、スレッドセーフな Frame API のサブセットを公開し、タスクを実行しているスレッドに関する情報を提供します。
例えば、あるFTSはコンポーネントへのGetPointer<T>
の使用を許可しますが、Add<T>
を使用してエンティティへ新しいコンポーネントを追加することは許可しません。後者は複数のスレッドで同時接続的に安全に実行できる操作ではないからです。
しかし、FTSはそれ自体、複数のスレッドが同じコンポーネント上でGetPointer<T>
を呼び出し、同じフィールドを変更し、レースコンディションに陥ることを許可しています。依存関係グラフによってどのタスクの同時実行が許可され、実行中にどのデータから読み、どのデータに書き込むかを念頭に置き、他のスレッドが同時に変更できるデータには決して触れないようにするか、インターロック/CAS操作を導入してスレッドセーフを確保することが肝要です。
物理
物理エンジンは複数のスレッドから成されるクエリに対応していて、通常のFrameと同様にFrameThreadSafe
から行うことができます。
すべての物理入力が物理関連のコンポーネントへのポインタを有しています。
-Collider
コンポーネントまたはBody
コンポーネントへの修正で、物理ソルバーが行った仮定に影響がでる(最悪の場合は無効になる)可能性があります。
Transform
の位置データと回転データはパフォーマンス上の理由からキャッシュされ、位置のオフセットや複合シェイプに対応させるようにします。このデータは、キャッシュされた後には影響を受けません。ただし、その間に変更してしまうと、決定性に問題が生じ、競合状態となる可能性があります。
スニペット
以下のスニペットは、SystemBase
を継承したシステムにおいて、異なるタスクの種類とそのスケジュール方法を示しています。Schedule
では、前のシステムの最後のタスクの TaskHandle
にアクセスできるようになります。これにより、システムのタスクのサブグラフを構築し、その最後のタスクハンドルを返すことができるようになり、以下のシステムで使用されるようになります。
C#
using Quantum.Task;
namespace Quantum
{
public unsafe class TaskSystem : SystemBase
{
TaskDelegateHandle MainThreadHandle;
TaskDelegateHandle SingletonHandleA;
TaskDelegateHandle SingletonHandleB;
TaskDelegateHandle ArrayHandle;
TaskDelegateHandle ThreadedHandle;
public override void OnInit(Frame f)
{
// hooking method delegates to Quantum task handles (description will show up in the profilers)
f.Context.TaskContext.RegisterDelegate(DummySampleMethod, "Main Thread Task", ref MainThreadHandle);
f.Context.TaskContext.RegisterDelegate(DummySampleMethod, "Singleton Task A", ref SingletonHandleA);
f.Context.TaskContext.RegisterDelegate(DummySampleMethod, "Singleton Task B", ref SingletonHandleB);
f.Context.TaskContext.RegisterDelegate(DummySampleMethod, "Array Task", ref ArrayHandle);
f.Context.TaskContext.RegisterDelegate(DummySampleMethod, "Threaded Task", ref ThreadedHandle);
}
// Quantum's tasks are laid out in an Aciclic Graph, with dependencies set at will
// Systems inserted into the default SystemSetup will be laid out as a SEQUENCE (a system always depends fully on the previous one to be finished)
// this example schedules a graph of tasks for this system
// IMPORTANT: do NOT perform actual work in the schedule method... Any FRAME operations must happen in the actual task methods
protected override TaskHandle Schedule(Frame f, TaskHandle taskHandle)
{
// this registers a task to run on main thread mandatorily (single call per update).
var firstTask = f.Context.TaskContext.AddMainThreadTask(MainThreadHandle, null, taskHandle);
// this one will be registered to run AFTER the one above (but it's also single call to it). Notice it depends on FIRST.
var secondTaskA = f.Context.TaskContext.AddSingletonTask(SingletonHandleA, null, firstTask);
// this one is registered to run IN PARALLEL to the one above. Notice it only depends on FIRST
var secondTaskB = f.Context.TaskContext.AddSingletonTask(SingletonHandleB, null, firstTask);
// now this one is an array task, it considers you know IN ADVANCE the SIZE of the task data chunck.
// it splits into smaller chuncks to avoid overhead
// notice it depends on both second tasks (A and B), so it only starts after those two are done
var thirdTask = f.Context.TaskContext.AddArrayTask(ArrayHandle, null, 100);
thirdTask.AddDependency(secondTaskA);
thirdTask.AddDependency(secondTaskB);
// now a general threaded task (dynamic data size), for which you need to use Interlocked/CAS operations to "consume" data and update the counters safely
var fourthTask = f.Context.TaskContext.AddThreadedTask(ThreadedHandle, null, thirdTask);
// we return the LAST task here, so next SYSTEM scheduler DEPENDS on it...
return fourthTask;
}
public void DummySampleMethod(FrameThreadSafe frame, int start, int count, void* userData)
{
// start/count are only meaningful in an array task
// userData is optional (mostly for our internal use of tasks - as in systems you must as much as possible keep data in the frame)
}
}
}
ユーティリティ構造体
Quantum.Tasks
名前空間を通して、スレッドタスクの調整を助ける2種類のユーティリティ構造体を利用できます:
TaskSlice
AtomicInt
C#
// In a .qtn file
struct MyStructFoo {
FPVector3 Content;
}
singleton component MyStructFooData {
array<MyStructFoo>[128] MyStructFooArray;
Int32 UsedCount;
}
C#
// The System
using Quantum.Task;
namespace Quantum
{
public unsafe class MultithreadedSystemSample : SystemBase {
private TaskDelegateHandle _arrayTaskDelegateHandle;
private TaskDelegateHandle _threadedTaskDelegateHandle;
public override void OnInit(Frame f) {
f.Context.TaskContext.RegisterDelegate(ArrayTaskMethod, "Array Task", ref _arrayTaskDelegateHandle);
f.Context.TaskContext.RegisterDelegate(ThreadedTaskMethod, "Threaded Task", ref _threadedTaskDelegateHandle);
}
protected override TaskHandle Schedule(Frame f, TaskHandle taskHandle) {
// the size of an array task must be known in Schedule-time
var arrayCount = f.Unsafe.GetPointerSingleton<MyStructFooData>()->UsedCount;
var arrayTaskHandle = f.Context.TaskContext.AddArrayTask(_arrayTaskDelegateHandle, taskArg: null, taskSize: arrayCount, dependancy: taskHandle);
// get indexer to be used for slicing the data in the threaded task
// optional: instead of temp-allocating, the indexer could be persisted in a partial declaration of Frame (use AllocUser and FreeUser)
var threadedTaskIndexer = f.Context.TempAllocateAndClear(sizeof(AtomicInt));
var threadedTaskHandle = f.Context.TaskContext.AddThreadedTask(_threadedTaskDelegateHandle, taskArg: threadedTaskIndexer, dependancy: arrayTaskHandle);
// we return the LAST task here, so next SYSTEM scheduler DEPENDS on it...
return threadedTaskHandle;
}
public void ArrayTaskMethod(FrameThreadSafe frame, int start, int count, void* taskArg) {
var arrayOfFoo = frame.GetPointerSingleton<MyStructFooData>()->MyStructFooArray;
// start/count are only meaningful in an array task
for (int i = start; i < start + count; i++) {
var foo = arrayOfFoo.GetPointer(i);
// do work on foo ...
}
}
public void ThreadedTaskMethod(FrameThreadSafe frame, int start, int count, void* userData) {
// the task indexer was passed in as task argument
var taskIndexer = (AtomicInt*)userData;
// get array and its count
var myStructFooData = frame.GetPointerSingleton<MyStructFooData>();
var arrayOfFoo = myStructFooData->MyStructFooArray;
var arrayCount = myStructFooData->UsedCount;
// start/count are meaningless for a threaded task.
// as the size the data was NOT known in schedule-time, it must be sliced now
var slices = stackalloc TaskSlice[frame.Context.TaskContext.SlicePerThreadMaxCount];
var slicer = frame.Context.TaskContext.SlicePerThread(taskIndexer, arrayCount, slices);
while (slicer.Next(out var slice)) {
for (var i = slice.StartInclusive; i < slice.EndExclusive; ++i) {
var foo = arrayOfFoo.GetPointer(i);
// do work on foo ...
}
}
}
}
}
スケジューリングガイドライン
システムのスケジュールを設定するには、2つの重要なガイドラインに従わなければなりません:
- システムの最初のタスクは、前のタスクの最後のタスク(Scheduleコールで受け取った識別されたTask Handle)に依存する必要があります。
- システムの最初のタスクは、次のシステムがそれに依存できるように、その最後のタスクのタスクハンドルを返す必要があります。
*要約すると、タスクだけが互いに並行して実行でき、常に1つのシステムだけが実行されている必要があります。
スケジューリングの制約
マルチスレッドシステムの最後の数個のタスクは、すべて同じタスクハンドルに依存している場合があります。こうなると競合状態となり、まだ実行中のタスクがあるにもかかわらず、システムがすべての実行を完了したと勘違いしてしまうことになります。このような場合、次のシステムの実行が不自然に開始され、非決定的な操作や結果になる可能性があります。
このリスクを回避するために、「バリアタスク」を追加することが推奨されます。バリアタスクは、前述のレースコンディションに陥る可能性のあるすべてのタスクに依存するシングルトンタンクです。この依存関係によって、バリアタスクが前回のタスクがすべて完了された後の実行になることを確実にし、システムがバリアを最後のTask Handleとして返せるようにします。
C#
protected override TaskHandle Schedule(Frame f, TaskHandle taskHandle) {
var threadedTaskA = f.Context.TaskContext.AddThreadedTask(threadedTaskHandleA, null, taskHandle);
var threadedTaskB = f.Context.TaskContext.AddThreadedTask(threadedTaskHandleB, null, taskHandle);
// the barrier task does nothing but ensuring that all other tasks of the system are done
// before the next system's task(s) start executing
var barrier = f.Context.TaskContext.AddSingletonTask(barrierHandle, null);
barrier.AddDependency(threadedTaskA);
barrier.AddDependency(threadedTaskB);
return barrier;
}
プラットフォームごとのワーカースレッドの数を変更
異なるプラットフォームで、様々な数のワーカースレッドを使用することが可能です(または全く使用しないことも可能です)。
主な目的は、シミュレーションで使用するタスクランナーのIDeterministicPlatformTaskRunner.Scheduleメソッドに渡されるデリゲートの数を制限することです。
Unityでは、QuantumTaskRunnerJobs
によってデフォルトのIDeterministicPlatformTaskRunner
が実装されています。これはUnity Jobsを使用して、シミュレーションタスクグラフを実行するワーカースレッドを、work-stealingのかたちで常にグラフを実行するメインのシミュレーションスレッドと並行して起動させます。
Quantumシミュレーションでは常にIDeterministicPlatformTaskRunner.Schedule
を、SimulationConfig
で定義されているスレッドの数から、1引いた数で呼び出します。つまり、メインのシミュレーションスレッドに加えて、リクエストされた余分なスレッドの数です。QuantumTaskRunnerJobs
ランナーのカスタムバージョンを実装することで、この呼び出しが遮断されほかの数のワーカースレッドがスケジュールされるようになります。
以下のスニペットでは、本件を行うための方法を1つ表示しています。アプリケーションのメインのシーンにCustomQuantumTaskRunnerJobs
を追加することができ、これは自分自身をDontDestroyOnLoad
とマークします。QuantumTaskRunnerJobs.GetInstance
へのすべての呼び出しが、確実にこのバージョンを使用するようにします。
スニペット
備考: Schedule呼び出しが転送されなかったとしても、シミュレーションは、メインのスレッドを使用することでタスクグラフの実行を行うことができます。
C#
public class CustomQuantumTaskRunnerJobs : QuantumTaskRunnerJobs {
private void Awake() {
DontDestroyOnLoad(this);
}
private const uint PLATFORM_MAX_WORKER_THREAD_COUNT =
#if !UNITY_EDITOR && UNITY_IOS
// forces a single-threaded simulation on iOS
// (remember, the main thread will always execute the task graph, this is the number of EXTRA threads)
0
#else
3
#endif
;
private readonly Action[] _resizedDelegates = new Action[PLATFORM_MAX_WORKER_THREAD_COUNT];
public override void Schedule(Action[] delegates) {
Assert.Check(_resizedDelegates.Length == PLATFORM_MAX_WORKER_THREAD_COUNT, _resizedDelegates.Length, PLATFORM_MAX_WORKER_THREAD_COUNT);
if (PLATFORM_MAX_WORKER_THREAD_COUNT == 0) {
return;
}
if (delegates.Length > PLATFORM_MAX_WORKER_THREAD_COUNT) {
Array.Copy(delegates, 0, _resizedDelegates, 0, PLATFORM_MAX_WORKER_THREAD_COUNT);
base.Schedule(_resizedDelegates);
} else {
base.Schedule(delegates);
}
}
}
Back to top