Node.jsはシングルスレッドで動作するため、CPUを大量に消費する処理がイベントループをブロックし、アプリケーション全体のレスポンスが低下することがあります。Worker Threadsを使用することで、重い計算処理を別スレッドにオフロードし、メインスレッドの応答性を維持できます。
本記事では、node:worker_threadsモジュールの基本から、スレッド間通信、共有メモリを使った高速なデータ交換まで、実践的なコード例を通じて解説します。
実行環境#
| 項目 |
バージョン |
| Node.js |
20.x LTS以上 |
| npm |
10.x以上 |
| OS |
Windows/macOS/Linux |
前提条件#
- JavaScriptの基礎知識(関数、オブジェクト、Promise/async-await)
- Node.jsの基本API理解
- イベントループの基本的な仕組みの理解
Worker Threadsとは#
Worker Threadsは、Node.js v10.5.0で実験的に導入され、v12以降で安定版となった機能です。JavaScriptコードを独立したスレッドで並列実行できます。
child_process/clusterとの違い#
| 機能 |
Worker Threads |
child_process |
cluster |
| 実行単位 |
スレッド |
プロセス |
プロセス |
| メモリ共有 |
可能(SharedArrayBuffer) |
不可 |
不可 |
| 起動コスト |
低 |
高 |
高 |
| 通信方式 |
postMessage + 共有メモリ |
IPC(シリアライズ) |
IPC(シリアライズ) |
| 主な用途 |
CPUバウンドな処理 |
外部コマンド実行 |
HTTPサーバーのスケール |
使用すべきケース#
flowchart TD
A[処理の種類を判断] --> B{I/O処理?}
B -->|Yes| C[非同期APIを使用]
B -->|No| D{CPU集約的?}
D -->|Yes| E{Node.jsコード?}
D -->|No| C
E -->|Yes| F[Worker Threads]
E -->|No| G[child_process]Worker Threadsが適しているケース:
- 画像・動画のエンコード/デコード
- 暗号化・ハッシュ計算
- データの圧縮・解凍
- 大規模なJSONパース
- 数値計算・シミュレーション
I/O処理(ファイル読み書き、ネットワーク通信、データベースアクセス)には、Node.js組み込みの非同期APIを使用する方が効率的です。
worker_threadsモジュールの基本#
主要なエクスポート#
1
2
3
4
5
6
7
8
9
|
import {
Worker, // Workerスレッドを生成するクラス
isMainThread, // メインスレッドかどうかの判定
parentPort, // 親スレッドとの通信ポート
workerData, // 親から渡された初期データ
threadId, // スレッドの一意なID
MessageChannel, // カスタム通信チャネル
MessagePort, // 通信ポート
} from 'node:worker_threads';
|
最小構成のWorkerスレッド#
メインスレッドとWorkerスレッドを同一ファイルに記述する基本パターンです。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
if (isMainThread) {
// メインスレッドの処理
const worker = new Worker(fileURLToPath(import.meta.url), {
workerData: { message: 'Hello from main thread' }
});
worker.on('message', (result) => {
console.log('Workerからの応答:', result);
});
worker.on('error', (error) => {
console.error('Workerエラー:', error);
});
worker.on('exit', (code) => {
console.log(`Worker終了 (コード: ${code})`);
});
} else {
// Workerスレッドの処理
console.log('受信データ:', workerData.message);
parentPort.postMessage('Hello from worker thread');
}
|
別ファイルでWorkerを定義する#
実際のプロジェクトでは、Workerスレッドのコードを別ファイルに分離するのが一般的です。
main.js:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import { Worker } from 'node:worker_threads';
import { join, dirname } from 'node:path';
import { fileURLToPath } from 'node:url';
const __dirname = dirname(fileURLToPath(import.meta.url));
function runWorker(data) {
return new Promise((resolve, reject) => {
const worker = new Worker(join(__dirname, 'worker.js'), {
workerData: data
});
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
}
// 使用例
const result = await runWorker({ numbers: [1, 2, 3, 4, 5] });
console.log('計算結果:', result);
|
worker.js:
1
2
3
4
5
6
7
8
|
import { parentPort, workerData } from 'node:worker_threads';
// 受信したデータを処理
const { numbers } = workerData;
const sum = numbers.reduce((acc, num) => acc + num, 0);
// 結果を親スレッドに送信
parentPort.postMessage({ sum });
|
メッセージ通信の実装#
postMessageとon(‘message’)の基本#
スレッド間通信はpostMessage()でメッセージを送信し、on('message')で受信します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import { Worker, isMainThread, parentPort } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
if (isMainThread) {
const worker = new Worker(fileURLToPath(import.meta.url));
// Workerにメッセージを送信
worker.postMessage({ type: 'calculate', value: 42 });
// Workerからのメッセージを受信
worker.on('message', (msg) => {
console.log('結果:', msg);
worker.terminate();
});
} else {
// 親スレッドからのメッセージを受信
parentPort.on('message', (msg) => {
if (msg.type === 'calculate') {
const result = msg.value * 2;
// 結果を親スレッドに送信
parentPort.postMessage({ type: 'result', value: result });
}
});
}
|
構造化クローンアルゴリズム#
postMessage()で送信されるデータは「構造化クローンアルゴリズム」によってコピーされます。以下のデータ型がサポートされています。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
// サポートされるデータ型
const supportedTypes = {
primitives: [42, 'string', true, null, undefined],
objects: { nested: { deeply: true } },
arrays: [1, 2, 3],
dates: new Date(),
regexps: /pattern/gi,
maps: new Map([['key', 'value']]),
sets: new Set([1, 2, 3]),
arrayBuffers: new ArrayBuffer(8),
typedArrays: new Uint8Array([1, 2, 3]),
// 循環参照も可能
};
// サポートされないデータ型
const unsupportedTypes = {
functions: () => {}, // 関数は送信不可
symbols: Symbol('test'), // Symbolは送信不可
domNodes: null, // DOMノードは送信不可
weakMaps: new WeakMap(), // WeakMap/WeakSetは送信不可
};
|
Transferable Objects(転送可能オブジェクト)#
大きなArrayBufferをコピーせずに所有権を移転することで、パフォーマンスを向上できます。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
import { Worker, isMainThread, parentPort } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
if (isMainThread) {
const worker = new Worker(fileURLToPath(import.meta.url));
// 1MBのArrayBufferを作成
const buffer = new ArrayBuffer(1024 * 1024);
const view = new Uint8Array(buffer);
view[0] = 42;
console.log('送信前のバッファサイズ:', buffer.byteLength); // 1048576
// transferListを使用して所有権を移転
worker.postMessage({ buffer }, [buffer]);
console.log('送信後のバッファサイズ:', buffer.byteLength); // 0(切り離された)
worker.on('message', (msg) => {
console.log('処理完了:', msg);
worker.terminate();
});
} else {
parentPort.on('message', (msg) => {
const view = new Uint8Array(msg.buffer);
console.log('受信データ:', view[0]); // 42
parentPort.postMessage('done');
});
}
|
MessageChannelによるカスタム通信#
複雑な通信パターンが必要な場合、MessageChannelを使用して専用の通信チャネルを作成できます。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
import { Worker, isMainThread, parentPort, MessageChannel } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
if (isMainThread) {
const worker = new Worker(fileURLToPath(import.meta.url));
// カスタムチャネルを作成
const channel = new MessageChannel();
// port1はメインスレッドで使用
channel.port1.on('message', (msg) => {
console.log('カスタムチャネル経由:', msg);
});
// port2をWorkerに送信
worker.postMessage({ port: channel.port2 }, [channel.port2]);
// 通常のメッセージ
worker.on('message', (msg) => {
console.log('通常チャネル:', msg);
});
setTimeout(() => {
channel.port1.close();
worker.terminate();
}, 1000);
} else {
parentPort.on('message', (msg) => {
if (msg.port) {
// カスタムチャネルを通じて通信
msg.port.postMessage('Message via custom channel');
}
});
// 通常チャネルでも通信
parentPort.postMessage('Message via default channel');
}
|
SharedArrayBufferによる共有メモリ#
共有メモリの基本#
SharedArrayBufferを使用すると、スレッド間でメモリを共有できます。データのコピーが不要なため、大量のデータを扱う場合に高速です。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
if (isMainThread) {
// 共有メモリを作成
const sharedBuffer = new SharedArrayBuffer(4); // 4バイト
const sharedArray = new Int32Array(sharedBuffer);
sharedArray[0] = 0;
const worker = new Worker(fileURLToPath(import.meta.url), {
workerData: { sharedBuffer }
});
// 定期的に共有メモリの値を確認
const interval = setInterval(() => {
console.log('メインスレッドから見た値:', sharedArray[0]);
if (sharedArray[0] >= 5) {
clearInterval(interval);
worker.terminate();
}
}, 100);
} else {
const { sharedBuffer } = workerData;
const sharedArray = new Int32Array(sharedBuffer);
// 共有メモリの値を更新
for (let i = 0; i < 5; i++) {
sharedArray[0]++;
console.log('Workerで更新:', sharedArray[0]);
// 処理の間隔を空ける
const start = Date.now();
while (Date.now() - start < 50) {}
}
}
|
Atomicsによる同期処理#
複数スレッドが同時に共有メモリにアクセスすると、データ競合が発生する可能性があります。Atomicsオブジェクトを使用して安全に操作できます。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
if (isMainThread) {
const sharedBuffer = new SharedArrayBuffer(4);
const sharedArray = new Int32Array(sharedBuffer);
sharedArray[0] = 0;
// 複数のWorkerを起動
const workers = [];
const numWorkers = 4;
for (let i = 0; i < numWorkers; i++) {
workers.push(new Worker(fileURLToPath(import.meta.url), {
workerData: { sharedBuffer, workerId: i }
}));
}
// 全Workerの終了を待機
let completed = 0;
workers.forEach(worker => {
worker.on('exit', () => {
completed++;
if (completed === numWorkers) {
console.log('最終結果:', sharedArray[0]);
// 期待値: 4 * 1000 = 4000
}
});
});
} else {
const { sharedBuffer, workerId } = workerData;
const sharedArray = new Int32Array(sharedBuffer);
// Atomics.addでアトミックに加算
for (let i = 0; i < 1000; i++) {
Atomics.add(sharedArray, 0, 1);
}
console.log(`Worker ${workerId} 完了`);
}
|
Atomicsの主要なメソッド#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import { SharedArrayBuffer } from 'node:worker_threads';
const sharedBuffer = new SharedArrayBuffer(16);
const int32 = new Int32Array(sharedBuffer);
// アトミックな読み書き
Atomics.store(int32, 0, 100); // 値を設定
const value = Atomics.load(int32, 0); // 値を取得
// アトミックな算術演算
Atomics.add(int32, 0, 10); // 加算して旧値を返す
Atomics.sub(int32, 0, 5); // 減算して旧値を返す
// アトミックなビット演算
Atomics.and(int32, 0, 0xFF); // AND演算
Atomics.or(int32, 0, 0x100); // OR演算
Atomics.xor(int32, 0, 0xFF); // XOR演算
// 比較と交換
Atomics.compareExchange(int32, 0, 100, 200); // 100なら200に置換
// 交換
Atomics.exchange(int32, 0, 300); // 値を設定し旧値を返す
// 待機と通知(スレッド同期)
// Atomics.wait(int32, 0, expectedValue, timeout);
// Atomics.notify(int32, 0, count);
|
wait/notifyによるスレッド同期#
Atomics.wait()とAtomics.notify()を使用して、スレッド間で同期を取ることができます。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
if (isMainThread) {
const sharedBuffer = new SharedArrayBuffer(4);
const sharedArray = new Int32Array(sharedBuffer);
sharedArray[0] = 0; // 0: 待機中, 1: 開始
const worker = new Worker(fileURLToPath(import.meta.url), {
workerData: { sharedBuffer }
});
// 2秒後にWorkerに開始を通知
setTimeout(() => {
console.log('メイン: Workerに開始を通知');
Atomics.store(sharedArray, 0, 1);
Atomics.notify(sharedArray, 0, 1);
}, 2000);
worker.on('message', (msg) => {
console.log('Worker完了:', msg);
worker.terminate();
});
} else {
const { sharedBuffer } = workerData;
const sharedArray = new Int32Array(sharedBuffer);
console.log('Worker: 開始を待機中...');
// 値が0の間待機(最大10秒)
const result = Atomics.wait(sharedArray, 0, 0, 10000);
console.log('Worker: 待機終了、結果:', result);
if (result === 'ok') {
parentPort.postMessage('処理完了');
}
}
|
CPUバウンドな処理のオフロード#
素数計算の例#
CPUを大量に消費する計算をWorkerにオフロードする実践的な例です。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
// 素数判定関数
function isPrime(n) {
if (n < 2) return false;
if (n === 2) return true;
if (n % 2 === 0) return false;
for (let i = 3; i <= Math.sqrt(n); i += 2) {
if (n % i === 0) return false;
}
return true;
}
// 指定範囲の素数を数える
function countPrimesInRange(start, end) {
let count = 0;
for (let i = start; i <= end; i++) {
if (isPrime(i)) count++;
}
return count;
}
if (isMainThread) {
const max = 10_000_000;
const numWorkers = 4;
const rangeSize = Math.ceil(max / numWorkers);
console.log(`${max}までの素数を${numWorkers}スレッドで計算`);
console.time('並列処理');
const workers = [];
let totalCount = 0;
let completed = 0;
for (let i = 0; i < numWorkers; i++) {
const start = i * rangeSize + 1;
const end = Math.min((i + 1) * rangeSize, max);
const worker = new Worker(fileURLToPath(import.meta.url), {
workerData: { start, end }
});
worker.on('message', (count) => {
totalCount += count;
completed++;
if (completed === numWorkers) {
console.timeEnd('並列処理');
console.log(`素数の個数: ${totalCount}`);
workers.forEach(w => w.terminate());
}
});
workers.push(worker);
}
// 比較用: シングルスレッド
console.time('シングルスレッド');
const singleResult = countPrimesInRange(1, max);
console.timeEnd('シングルスレッド');
console.log(`素数の個数(シングル): ${singleResult}`);
} else {
const { start, end } = workerData;
const count = countPrimesInRange(start, end);
parentPort.postMessage(count);
}
|
Workerプールの実装#
Workerの作成にはオーバーヘッドがあるため、繰り返し使用する場合はプールを実装します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
import { Worker } from 'node:worker_threads';
import { join, dirname } from 'node:path';
import { fileURLToPath } from 'node:url';
import { EventEmitter } from 'node:events';
const __dirname = dirname(fileURLToPath(import.meta.url));
class WorkerPool extends EventEmitter {
constructor(workerPath, numWorkers) {
super();
this.workerPath = workerPath;
this.numWorkers = numWorkers;
this.workers = [];
this.freeWorkers = [];
this.taskQueue = [];
for (let i = 0; i < numWorkers; i++) {
this.addNewWorker();
}
}
addNewWorker() {
const worker = new Worker(this.workerPath);
worker.on('message', (result) => {
// タスク完了時のコールバックを実行
worker.currentResolve(result);
worker.currentResolve = null;
worker.currentReject = null;
// キューに待機中のタスクがあれば実行
if (this.taskQueue.length > 0) {
const { task, resolve, reject } = this.taskQueue.shift();
this.runTask(worker, task, resolve, reject);
} else {
this.freeWorkers.push(worker);
}
});
worker.on('error', (error) => {
if (worker.currentReject) {
worker.currentReject(error);
}
// エラーが発生したWorkerを置き換え
this.workers = this.workers.filter(w => w !== worker);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
}
runTask(worker, task, resolve, reject) {
worker.currentResolve = resolve;
worker.currentReject = reject;
worker.postMessage(task);
}
exec(task) {
return new Promise((resolve, reject) => {
if (this.freeWorkers.length > 0) {
const worker = this.freeWorkers.pop();
this.runTask(worker, task, resolve, reject);
} else {
// 空きWorkerがなければキューに追加
this.taskQueue.push({ task, resolve, reject });
}
});
}
async close() {
for (const worker of this.workers) {
await worker.terminate();
}
}
}
// 使用例
const pool = new WorkerPool(join(__dirname, 'pool-worker.js'), 4);
async function main() {
const tasks = Array.from({ length: 20 }, (_, i) => ({ value: i }));
const results = await Promise.all(
tasks.map(task => pool.exec(task))
);
console.log('結果:', results);
await pool.close();
}
main();
|
pool-worker.js:
1
2
3
4
5
6
7
8
9
10
11
12
|
import { parentPort } from 'node:worker_threads';
parentPort.on('message', (task) => {
// 重い計算をシミュレート
const result = task.value * 2;
// 処理時間をシミュレート
const start = Date.now();
while (Date.now() - start < 100) {}
parentPort.postMessage(result);
});
|
実践的なユースケース#
画像処理の並列化#
複数の画像を同時に処理する例です。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
import { cpus } from 'node:os';
// 画像処理をシミュレート(実際にはsharp等のライブラリを使用)
function processImage(imageData) {
// グレースケール変換をシミュレート
const result = new Uint8Array(imageData.length);
for (let i = 0; i < imageData.length; i += 4) {
const gray = Math.round(
imageData[i] * 0.299 +
imageData[i + 1] * 0.587 +
imageData[i + 2] * 0.114
);
result[i] = gray;
result[i + 1] = gray;
result[i + 2] = gray;
result[i + 3] = imageData[i + 3];
}
return result;
}
if (isMainThread) {
// サンプル画像データを生成(実際にはファイルから読み込み)
const images = Array.from({ length: 8 }, (_, i) => {
const data = new Uint8Array(1024 * 1024 * 4); // 1MP画像を想定
data.fill(i * 30);
return { id: i, data };
});
const numWorkers = cpus().length;
console.log(`${numWorkers}スレッドで${images.length}枚の画像を処理`);
console.time('画像処理');
let completed = 0;
const results = [];
// 画像をWorkerに分配
for (let i = 0; i < Math.min(numWorkers, images.length); i++) {
const image = images.shift();
if (image) {
const worker = new Worker(fileURLToPath(import.meta.url), {
workerData: { id: image.id, data: image.data }
});
worker.on('message', (result) => {
results.push(result);
completed++;
// 次の画像があれば処理
const nextImage = images.shift();
if (nextImage) {
worker.postMessage({ id: nextImage.id, data: nextImage.data });
} else {
worker.terminate();
}
if (completed === 8) {
console.timeEnd('画像処理');
console.log('処理完了:', results.length, '枚');
}
});
}
}
} else {
// 初期データを処理
const result = processImage(workerData.data);
parentPort.postMessage({ id: workerData.id, size: result.length });
// 追加のタスクを受け付け
parentPort.on('message', (msg) => {
const result = processImage(msg.data);
parentPort.postMessage({ id: msg.id, size: result.length });
});
}
|
JSON解析の高速化#
大きなJSONファイルの解析を並列化する例です。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
if (isMainThread) {
// 大きなJSONデータをシミュレート
const largeData = Array.from({ length: 100000 }, (_, i) => ({
id: i,
name: `Item ${i}`,
value: Math.random() * 1000,
tags: ['tag1', 'tag2', 'tag3']
}));
const jsonString = JSON.stringify(largeData);
console.log(`JSONサイズ: ${(jsonString.length / 1024 / 1024).toFixed(2)} MB`);
// シングルスレッドでの解析
console.time('シングルスレッド解析');
const singleResult = JSON.parse(jsonString);
console.timeEnd('シングルスレッド解析');
// Workerでの解析
console.time('Worker解析');
const worker = new Worker(fileURLToPath(import.meta.url), {
workerData: jsonString
});
worker.on('message', (result) => {
console.timeEnd('Worker解析');
console.log('解析結果:', result.length, '件');
worker.terminate();
});
} else {
// Workerスレッドで解析
const parsed = JSON.parse(workerData);
// 追加の処理(フィルタリングなど)
const filtered = parsed.filter(item => item.value > 500);
parentPort.postMessage(filtered);
}
|
エラーハンドリングとデバッグ#
エラーイベントの処理#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import { Worker, isMainThread, parentPort } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
if (isMainThread) {
const worker = new Worker(fileURLToPath(import.meta.url));
worker.on('error', (error) => {
console.error('Workerでエラーが発生:', error.message);
console.error('スタックトレース:', error.stack);
});
worker.on('exit', (code) => {
if (code !== 0) {
console.error(`Workerが異常終了 (コード: ${code})`);
}
});
worker.on('messageerror', (error) => {
console.error('メッセージのデシリアライズに失敗:', error);
});
} else {
// 意図的にエラーを発生させる
setTimeout(() => {
throw new Error('Worker内でエラーが発生しました');
}, 100);
}
|
resourceLimitsによるリソース制限#
Workerが使用するリソースを制限できます。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import { Worker } from 'node:worker_threads';
const worker = new Worker('./heavy-task.js', {
resourceLimits: {
maxOldGenerationSizeMb: 128, // ヒープの最大サイズ
maxYoungGenerationSizeMb: 32, // 新世代ヒープの最大サイズ
codeRangeSizeMb: 16, // コード領域の最大サイズ
stackSizeMb: 4 // スタックサイズ
}
});
worker.on('error', (error) => {
if (error.message.includes('out of memory')) {
console.error('Workerがメモリ制限を超過しました');
}
});
|
デバッグのヒント#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import { Worker, isMainThread, parentPort, threadId } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
if (isMainThread) {
const worker = new Worker(fileURLToPath(import.meta.url), {
name: 'CalculationWorker' // デバッグ用の名前
});
console.log(`メインスレッド ID: ${threadId}`);
worker.on('online', () => {
console.log('Workerがオンラインになりました');
});
worker.on('message', console.log);
// 5秒後に強制終了
setTimeout(async () => {
const exitCode = await worker.terminate();
console.log(`Worker終了: ${exitCode}`);
}, 5000);
} else {
console.log(`Workerスレッド ID: ${threadId}`);
// 定期的に状態を報告
setInterval(() => {
const memoryUsage = process.memoryUsage();
parentPort.postMessage({
threadId,
heapUsed: `${(memoryUsage.heapUsed / 1024 / 1024).toFixed(2)} MB`
});
}, 1000);
}
|
ベストプラクティス#
Workerの適切な使用判断#
flowchart TD
A[処理の実行] --> B{処理時間 > 10ms?}
B -->|No| C[メインスレッドで実行]
B -->|Yes| D{I/Oバウンド?}
D -->|Yes| E[非同期APIを使用]
D -->|No| F{繰り返し実行?}
F -->|Yes| G[Workerプールを使用]
F -->|No| H[単発Workerを使用]パフォーマンス最適化のポイント#
- Workerの再利用: 起動コストを避けるためプールを使用
- 転送可能オブジェクト: 大きなArrayBufferは転送リストを使用
- 共有メモリ: 頻繁にデータを交換する場合はSharedArrayBufferを使用
- 適切なWorker数: CPUコア数を目安に調整
1
2
3
4
5
|
import { cpus } from 'node:os';
// CPUコア数に基づいてWorker数を決定
const numCPUs = cpus().length;
const numWorkers = Math.max(1, numCPUs - 1); // 1コアはメインスレッド用に残す
|
メモリリークの防止#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import { Worker } from 'node:worker_threads';
class ManagedWorker {
constructor(workerPath) {
this.worker = new Worker(workerPath);
this.isTerminated = false;
}
async execute(data) {
if (this.isTerminated) {
throw new Error('Worker is already terminated');
}
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Worker timeout'));
this.terminate();
}, 30000);
this.worker.once('message', (result) => {
clearTimeout(timeout);
resolve(result);
});
this.worker.once('error', (error) => {
clearTimeout(timeout);
reject(error);
});
this.worker.postMessage(data);
});
}
async terminate() {
if (!this.isTerminated) {
this.isTerminated = true;
await this.worker.terminate();
}
}
}
|
まとめ#
Worker Threadsを使用することで、Node.jsアプリケーションでCPU集約的な処理を効率的に並列実行できます。
本記事で学んだ内容:
- worker_threadsモジュールの基本: Worker、isMainThread、parentPort、workerDataの使い方
- メッセージ通信: postMessage/onによるスレッド間通信と転送可能オブジェクト
- 共有メモリ: SharedArrayBufferとAtomicsによる高速なデータ共有
- 実践パターン: Workerプール、画像処理、JSON解析の並列化
- ベストプラクティス: エラーハンドリング、リソース制限、パフォーマンス最適化
Worker Threadsは強力な機能ですが、すべての処理に適しているわけではありません。I/Oバウンドな処理には従来の非同期APIを使用し、CPUバウンドな処理にのみWorker Threadsを適用することで、アプリケーション全体のパフォーマンスを最大化できます。
参考リンク#