Node.jsはシングルスレッドで動作するため、CPUを大量に消費する処理がイベントループをブロックし、アプリケーション全体のレスポンスが低下することがあります。Worker Threadsを使用することで、重い計算処理を別スレッドにオフロードし、メインスレッドの応答性を維持できます。

本記事では、node:worker_threadsモジュールの基本から、スレッド間通信、共有メモリを使った高速なデータ交換まで、実践的なコード例を通じて解説します。

実行環境

項目 バージョン
Node.js 20.x LTS以上
npm 10.x以上
OS Windows/macOS/Linux

前提条件

  • JavaScriptの基礎知識(関数、オブジェクト、Promise/async-await)
  • Node.jsの基本API理解
  • イベントループの基本的な仕組みの理解

Worker Threadsとは

Worker Threadsは、Node.js v10.5.0で実験的に導入され、v12以降で安定版となった機能です。JavaScriptコードを独立したスレッドで並列実行できます。

child_process/clusterとの違い

機能 Worker Threads child_process cluster
実行単位 スレッド プロセス プロセス
メモリ共有 可能(SharedArrayBuffer) 不可 不可
起動コスト
通信方式 postMessage + 共有メモリ IPC(シリアライズ) IPC(シリアライズ)
主な用途 CPUバウンドな処理 外部コマンド実行 HTTPサーバーのスケール

使用すべきケース

flowchart TD
    A[処理の種類を判断] --> B{I/O処理?}
    B -->|Yes| C[非同期APIを使用]
    B -->|No| D{CPU集約的?}
    D -->|Yes| E{Node.jsコード?}
    D -->|No| C
    E -->|Yes| F[Worker Threads]
    E -->|No| G[child_process]

Worker Threadsが適しているケース:

  • 画像・動画のエンコード/デコード
  • 暗号化・ハッシュ計算
  • データの圧縮・解凍
  • 大規模なJSONパース
  • 数値計算・シミュレーション

I/O処理(ファイル読み書き、ネットワーク通信、データベースアクセス)には、Node.js組み込みの非同期APIを使用する方が効率的です。

worker_threadsモジュールの基本

主要なエクスポート

1
2
3
4
5
6
7
8
9
import {
  Worker,           // Workerスレッドを生成するクラス
  isMainThread,     // メインスレッドかどうかの判定
  parentPort,       // 親スレッドとの通信ポート
  workerData,       // 親から渡された初期データ
  threadId,         // スレッドの一意なID
  MessageChannel,   // カスタム通信チャネル
  MessagePort,      // 通信ポート
} from 'node:worker_threads';

最小構成のWorkerスレッド

メインスレッドとWorkerスレッドを同一ファイルに記述する基本パターンです。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';

if (isMainThread) {
  // メインスレッドの処理
  const worker = new Worker(fileURLToPath(import.meta.url), {
    workerData: { message: 'Hello from main thread' }
  });

  worker.on('message', (result) => {
    console.log('Workerからの応答:', result);
  });

  worker.on('error', (error) => {
    console.error('Workerエラー:', error);
  });

  worker.on('exit', (code) => {
    console.log(`Worker終了 (コード: ${code})`);
  });
} else {
  // Workerスレッドの処理
  console.log('受信データ:', workerData.message);
  parentPort.postMessage('Hello from worker thread');
}

別ファイルでWorkerを定義する

実際のプロジェクトでは、Workerスレッドのコードを別ファイルに分離するのが一般的です。

main.js:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import { Worker } from 'node:worker_threads';
import { join, dirname } from 'node:path';
import { fileURLToPath } from 'node:url';

const __dirname = dirname(fileURLToPath(import.meta.url));

function runWorker(data) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(join(__dirname, 'worker.js'), {
      workerData: data
    });

    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0) {
        reject(new Error(`Worker stopped with exit code ${code}`));
      }
    });
  });
}

// 使用例
const result = await runWorker({ numbers: [1, 2, 3, 4, 5] });
console.log('計算結果:', result);

worker.js:

1
2
3
4
5
6
7
8
import { parentPort, workerData } from 'node:worker_threads';

// 受信したデータを処理
const { numbers } = workerData;
const sum = numbers.reduce((acc, num) => acc + num, 0);

// 結果を親スレッドに送信
parentPort.postMessage({ sum });

メッセージ通信の実装

postMessageとon(‘message’)の基本

スレッド間通信はpostMessage()でメッセージを送信し、on('message')で受信します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import { Worker, isMainThread, parentPort } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';

if (isMainThread) {
  const worker = new Worker(fileURLToPath(import.meta.url));

  // Workerにメッセージを送信
  worker.postMessage({ type: 'calculate', value: 42 });

  // Workerからのメッセージを受信
  worker.on('message', (msg) => {
    console.log('結果:', msg);
    worker.terminate();
  });
} else {
  // 親スレッドからのメッセージを受信
  parentPort.on('message', (msg) => {
    if (msg.type === 'calculate') {
      const result = msg.value * 2;
      // 結果を親スレッドに送信
      parentPort.postMessage({ type: 'result', value: result });
    }
  });
}

構造化クローンアルゴリズム

postMessage()で送信されるデータは「構造化クローンアルゴリズム」によってコピーされます。以下のデータ型がサポートされています。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// サポートされるデータ型
const supportedTypes = {
  primitives: [42, 'string', true, null, undefined],
  objects: { nested: { deeply: true } },
  arrays: [1, 2, 3],
  dates: new Date(),
  regexps: /pattern/gi,
  maps: new Map([['key', 'value']]),
  sets: new Set([1, 2, 3]),
  arrayBuffers: new ArrayBuffer(8),
  typedArrays: new Uint8Array([1, 2, 3]),
  // 循環参照も可能
};

// サポートされないデータ型
const unsupportedTypes = {
  functions: () => {},        // 関数は送信不可
  symbols: Symbol('test'),    // Symbolは送信不可
  domNodes: null,             // DOMノードは送信不可
  weakMaps: new WeakMap(),    // WeakMap/WeakSetは送信不可
};

Transferable Objects(転送可能オブジェクト)

大きなArrayBufferをコピーせずに所有権を移転することで、パフォーマンスを向上できます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import { Worker, isMainThread, parentPort } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';

if (isMainThread) {
  const worker = new Worker(fileURLToPath(import.meta.url));

  // 1MBのArrayBufferを作成
  const buffer = new ArrayBuffer(1024 * 1024);
  const view = new Uint8Array(buffer);
  view[0] = 42;

  console.log('送信前のバッファサイズ:', buffer.byteLength); // 1048576

  // transferListを使用して所有権を移転
  worker.postMessage({ buffer }, [buffer]);

  console.log('送信後のバッファサイズ:', buffer.byteLength); // 0(切り離された)

  worker.on('message', (msg) => {
    console.log('処理完了:', msg);
    worker.terminate();
  });
} else {
  parentPort.on('message', (msg) => {
    const view = new Uint8Array(msg.buffer);
    console.log('受信データ:', view[0]); // 42
    parentPort.postMessage('done');
  });
}

MessageChannelによるカスタム通信

複雑な通信パターンが必要な場合、MessageChannelを使用して専用の通信チャネルを作成できます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import { Worker, isMainThread, parentPort, MessageChannel } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';

if (isMainThread) {
  const worker = new Worker(fileURLToPath(import.meta.url));

  // カスタムチャネルを作成
  const channel = new MessageChannel();

  // port1はメインスレッドで使用
  channel.port1.on('message', (msg) => {
    console.log('カスタムチャネル経由:', msg);
  });

  // port2をWorkerに送信
  worker.postMessage({ port: channel.port2 }, [channel.port2]);

  // 通常のメッセージ
  worker.on('message', (msg) => {
    console.log('通常チャネル:', msg);
  });

  setTimeout(() => {
    channel.port1.close();
    worker.terminate();
  }, 1000);
} else {
  parentPort.on('message', (msg) => {
    if (msg.port) {
      // カスタムチャネルを通じて通信
      msg.port.postMessage('Message via custom channel');
    }
  });

  // 通常チャネルでも通信
  parentPort.postMessage('Message via default channel');
}

SharedArrayBufferによる共有メモリ

共有メモリの基本

SharedArrayBufferを使用すると、スレッド間でメモリを共有できます。データのコピーが不要なため、大量のデータを扱う場合に高速です。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';

if (isMainThread) {
  // 共有メモリを作成
  const sharedBuffer = new SharedArrayBuffer(4); // 4バイト
  const sharedArray = new Int32Array(sharedBuffer);
  sharedArray[0] = 0;

  const worker = new Worker(fileURLToPath(import.meta.url), {
    workerData: { sharedBuffer }
  });

  // 定期的に共有メモリの値を確認
  const interval = setInterval(() => {
    console.log('メインスレッドから見た値:', sharedArray[0]);
    if (sharedArray[0] >= 5) {
      clearInterval(interval);
      worker.terminate();
    }
  }, 100);
} else {
  const { sharedBuffer } = workerData;
  const sharedArray = new Int32Array(sharedBuffer);

  // 共有メモリの値を更新
  for (let i = 0; i < 5; i++) {
    sharedArray[0]++;
    console.log('Workerで更新:', sharedArray[0]);
    // 処理の間隔を空ける
    const start = Date.now();
    while (Date.now() - start < 50) {}
  }
}

Atomicsによる同期処理

複数スレッドが同時に共有メモリにアクセスすると、データ競合が発生する可能性があります。Atomicsオブジェクトを使用して安全に操作できます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';

if (isMainThread) {
  const sharedBuffer = new SharedArrayBuffer(4);
  const sharedArray = new Int32Array(sharedBuffer);
  sharedArray[0] = 0;

  // 複数のWorkerを起動
  const workers = [];
  const numWorkers = 4;

  for (let i = 0; i < numWorkers; i++) {
    workers.push(new Worker(fileURLToPath(import.meta.url), {
      workerData: { sharedBuffer, workerId: i }
    }));
  }

  // 全Workerの終了を待機
  let completed = 0;
  workers.forEach(worker => {
    worker.on('exit', () => {
      completed++;
      if (completed === numWorkers) {
        console.log('最終結果:', sharedArray[0]);
        // 期待値: 4 * 1000 = 4000
      }
    });
  });
} else {
  const { sharedBuffer, workerId } = workerData;
  const sharedArray = new Int32Array(sharedBuffer);

  // Atomics.addでアトミックに加算
  for (let i = 0; i < 1000; i++) {
    Atomics.add(sharedArray, 0, 1);
  }

  console.log(`Worker ${workerId} 完了`);
}

Atomicsの主要なメソッド

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import { SharedArrayBuffer } from 'node:worker_threads';

const sharedBuffer = new SharedArrayBuffer(16);
const int32 = new Int32Array(sharedBuffer);

// アトミックな読み書き
Atomics.store(int32, 0, 100);      // 値を設定
const value = Atomics.load(int32, 0); // 値を取得

// アトミックな算術演算
Atomics.add(int32, 0, 10);         // 加算して旧値を返す
Atomics.sub(int32, 0, 5);          // 減算して旧値を返す

// アトミックなビット演算
Atomics.and(int32, 0, 0xFF);       // AND演算
Atomics.or(int32, 0, 0x100);       // OR演算
Atomics.xor(int32, 0, 0xFF);       // XOR演算

// 比較と交換
Atomics.compareExchange(int32, 0, 100, 200); // 100なら200に置換

// 交換
Atomics.exchange(int32, 0, 300);   // 値を設定し旧値を返す

// 待機と通知(スレッド同期)
// Atomics.wait(int32, 0, expectedValue, timeout);
// Atomics.notify(int32, 0, count);

wait/notifyによるスレッド同期

Atomics.wait()Atomics.notify()を使用して、スレッド間で同期を取ることができます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';

if (isMainThread) {
  const sharedBuffer = new SharedArrayBuffer(4);
  const sharedArray = new Int32Array(sharedBuffer);
  sharedArray[0] = 0; // 0: 待機中, 1: 開始

  const worker = new Worker(fileURLToPath(import.meta.url), {
    workerData: { sharedBuffer }
  });

  // 2秒後にWorkerに開始を通知
  setTimeout(() => {
    console.log('メイン: Workerに開始を通知');
    Atomics.store(sharedArray, 0, 1);
    Atomics.notify(sharedArray, 0, 1);
  }, 2000);

  worker.on('message', (msg) => {
    console.log('Worker完了:', msg);
    worker.terminate();
  });
} else {
  const { sharedBuffer } = workerData;
  const sharedArray = new Int32Array(sharedBuffer);

  console.log('Worker: 開始を待機中...');

  // 値が0の間待機(最大10秒)
  const result = Atomics.wait(sharedArray, 0, 0, 10000);
  console.log('Worker: 待機終了、結果:', result);

  if (result === 'ok') {
    parentPort.postMessage('処理完了');
  }
}

CPUバウンドな処理のオフロード

素数計算の例

CPUを大量に消費する計算をWorkerにオフロードする実践的な例です。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';

// 素数判定関数
function isPrime(n) {
  if (n < 2) return false;
  if (n === 2) return true;
  if (n % 2 === 0) return false;
  for (let i = 3; i <= Math.sqrt(n); i += 2) {
    if (n % i === 0) return false;
  }
  return true;
}

// 指定範囲の素数を数える
function countPrimesInRange(start, end) {
  let count = 0;
  for (let i = start; i <= end; i++) {
    if (isPrime(i)) count++;
  }
  return count;
}

if (isMainThread) {
  const max = 10_000_000;
  const numWorkers = 4;
  const rangeSize = Math.ceil(max / numWorkers);

  console.log(`${max}までの素数を${numWorkers}スレッドで計算`);
  console.time('並列処理');

  const workers = [];
  let totalCount = 0;
  let completed = 0;

  for (let i = 0; i < numWorkers; i++) {
    const start = i * rangeSize + 1;
    const end = Math.min((i + 1) * rangeSize, max);

    const worker = new Worker(fileURLToPath(import.meta.url), {
      workerData: { start, end }
    });

    worker.on('message', (count) => {
      totalCount += count;
      completed++;

      if (completed === numWorkers) {
        console.timeEnd('並列処理');
        console.log(`素数の個数: ${totalCount}`);
        workers.forEach(w => w.terminate());
      }
    });

    workers.push(worker);
  }

  // 比較用: シングルスレッド
  console.time('シングルスレッド');
  const singleResult = countPrimesInRange(1, max);
  console.timeEnd('シングルスレッド');
  console.log(`素数の個数(シングル): ${singleResult}`);
} else {
  const { start, end } = workerData;
  const count = countPrimesInRange(start, end);
  parentPort.postMessage(count);
}

Workerプールの実装

Workerの作成にはオーバーヘッドがあるため、繰り返し使用する場合はプールを実装します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import { Worker } from 'node:worker_threads';
import { join, dirname } from 'node:path';
import { fileURLToPath } from 'node:url';
import { EventEmitter } from 'node:events';

const __dirname = dirname(fileURLToPath(import.meta.url));

class WorkerPool extends EventEmitter {
  constructor(workerPath, numWorkers) {
    super();
    this.workerPath = workerPath;
    this.numWorkers = numWorkers;
    this.workers = [];
    this.freeWorkers = [];
    this.taskQueue = [];

    for (let i = 0; i < numWorkers; i++) {
      this.addNewWorker();
    }
  }

  addNewWorker() {
    const worker = new Worker(this.workerPath);

    worker.on('message', (result) => {
      // タスク完了時のコールバックを実行
      worker.currentResolve(result);
      worker.currentResolve = null;
      worker.currentReject = null;

      // キューに待機中のタスクがあれば実行
      if (this.taskQueue.length > 0) {
        const { task, resolve, reject } = this.taskQueue.shift();
        this.runTask(worker, task, resolve, reject);
      } else {
        this.freeWorkers.push(worker);
      }
    });

    worker.on('error', (error) => {
      if (worker.currentReject) {
        worker.currentReject(error);
      }
      // エラーが発生したWorkerを置き換え
      this.workers = this.workers.filter(w => w !== worker);
      this.addNewWorker();
    });

    this.workers.push(worker);
    this.freeWorkers.push(worker);
  }

  runTask(worker, task, resolve, reject) {
    worker.currentResolve = resolve;
    worker.currentReject = reject;
    worker.postMessage(task);
  }

  exec(task) {
    return new Promise((resolve, reject) => {
      if (this.freeWorkers.length > 0) {
        const worker = this.freeWorkers.pop();
        this.runTask(worker, task, resolve, reject);
      } else {
        // 空きWorkerがなければキューに追加
        this.taskQueue.push({ task, resolve, reject });
      }
    });
  }

  async close() {
    for (const worker of this.workers) {
      await worker.terminate();
    }
  }
}

// 使用例
const pool = new WorkerPool(join(__dirname, 'pool-worker.js'), 4);

async function main() {
  const tasks = Array.from({ length: 20 }, (_, i) => ({ value: i }));

  const results = await Promise.all(
    tasks.map(task => pool.exec(task))
  );

  console.log('結果:', results);
  await pool.close();
}

main();

pool-worker.js:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import { parentPort } from 'node:worker_threads';

parentPort.on('message', (task) => {
  // 重い計算をシミュレート
  const result = task.value * 2;

  // 処理時間をシミュレート
  const start = Date.now();
  while (Date.now() - start < 100) {}

  parentPort.postMessage(result);
});

実践的なユースケース

画像処理の並列化

複数の画像を同時に処理する例です。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
import { cpus } from 'node:os';

// 画像処理をシミュレート(実際にはsharp等のライブラリを使用)
function processImage(imageData) {
  // グレースケール変換をシミュレート
  const result = new Uint8Array(imageData.length);
  for (let i = 0; i < imageData.length; i += 4) {
    const gray = Math.round(
      imageData[i] * 0.299 +
      imageData[i + 1] * 0.587 +
      imageData[i + 2] * 0.114
    );
    result[i] = gray;
    result[i + 1] = gray;
    result[i + 2] = gray;
    result[i + 3] = imageData[i + 3];
  }
  return result;
}

if (isMainThread) {
  // サンプル画像データを生成(実際にはファイルから読み込み)
  const images = Array.from({ length: 8 }, (_, i) => {
    const data = new Uint8Array(1024 * 1024 * 4); // 1MP画像を想定
    data.fill(i * 30);
    return { id: i, data };
  });

  const numWorkers = cpus().length;
  console.log(`${numWorkers}スレッドで${images.length}枚の画像を処理`);
  console.time('画像処理');

  let completed = 0;
  const results = [];

  // 画像をWorkerに分配
  for (let i = 0; i < Math.min(numWorkers, images.length); i++) {
    const image = images.shift();
    if (image) {
      const worker = new Worker(fileURLToPath(import.meta.url), {
        workerData: { id: image.id, data: image.data }
      });

      worker.on('message', (result) => {
        results.push(result);
        completed++;

        // 次の画像があれば処理
        const nextImage = images.shift();
        if (nextImage) {
          worker.postMessage({ id: nextImage.id, data: nextImage.data });
        } else {
          worker.terminate();
        }

        if (completed === 8) {
          console.timeEnd('画像処理');
          console.log('処理完了:', results.length, '枚');
        }
      });
    }
  }
} else {
  // 初期データを処理
  const result = processImage(workerData.data);
  parentPort.postMessage({ id: workerData.id, size: result.length });

  // 追加のタスクを受け付け
  parentPort.on('message', (msg) => {
    const result = processImage(msg.data);
    parentPort.postMessage({ id: msg.id, size: result.length });
  });
}

JSON解析の高速化

大きなJSONファイルの解析を並列化する例です。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';

if (isMainThread) {
  // 大きなJSONデータをシミュレート
  const largeData = Array.from({ length: 100000 }, (_, i) => ({
    id: i,
    name: `Item ${i}`,
    value: Math.random() * 1000,
    tags: ['tag1', 'tag2', 'tag3']
  }));

  const jsonString = JSON.stringify(largeData);
  console.log(`JSONサイズ: ${(jsonString.length / 1024 / 1024).toFixed(2)} MB`);

  // シングルスレッドでの解析
  console.time('シングルスレッド解析');
  const singleResult = JSON.parse(jsonString);
  console.timeEnd('シングルスレッド解析');

  // Workerでの解析
  console.time('Worker解析');
  const worker = new Worker(fileURLToPath(import.meta.url), {
    workerData: jsonString
  });

  worker.on('message', (result) => {
    console.timeEnd('Worker解析');
    console.log('解析結果:', result.length, '件');
    worker.terminate();
  });
} else {
  // Workerスレッドで解析
  const parsed = JSON.parse(workerData);

  // 追加の処理(フィルタリングなど)
  const filtered = parsed.filter(item => item.value > 500);

  parentPort.postMessage(filtered);
}

エラーハンドリングとデバッグ

エラーイベントの処理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import { Worker, isMainThread, parentPort } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';

if (isMainThread) {
  const worker = new Worker(fileURLToPath(import.meta.url));

  worker.on('error', (error) => {
    console.error('Workerでエラーが発生:', error.message);
    console.error('スタックトレース:', error.stack);
  });

  worker.on('exit', (code) => {
    if (code !== 0) {
      console.error(`Workerが異常終了 (コード: ${code})`);
    }
  });

  worker.on('messageerror', (error) => {
    console.error('メッセージのデシリアライズに失敗:', error);
  });
} else {
  // 意図的にエラーを発生させる
  setTimeout(() => {
    throw new Error('Worker内でエラーが発生しました');
  }, 100);
}

resourceLimitsによるリソース制限

Workerが使用するリソースを制限できます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import { Worker } from 'node:worker_threads';

const worker = new Worker('./heavy-task.js', {
  resourceLimits: {
    maxOldGenerationSizeMb: 128,  // ヒープの最大サイズ
    maxYoungGenerationSizeMb: 32, // 新世代ヒープの最大サイズ
    codeRangeSizeMb: 16,          // コード領域の最大サイズ
    stackSizeMb: 4                // スタックサイズ
  }
});

worker.on('error', (error) => {
  if (error.message.includes('out of memory')) {
    console.error('Workerがメモリ制限を超過しました');
  }
});

デバッグのヒント

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import { Worker, isMainThread, parentPort, threadId } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';

if (isMainThread) {
  const worker = new Worker(fileURLToPath(import.meta.url), {
    name: 'CalculationWorker'  // デバッグ用の名前
  });

  console.log(`メインスレッド ID: ${threadId}`);

  worker.on('online', () => {
    console.log('Workerがオンラインになりました');
  });

  worker.on('message', console.log);

  // 5秒後に強制終了
  setTimeout(async () => {
    const exitCode = await worker.terminate();
    console.log(`Worker終了: ${exitCode}`);
  }, 5000);
} else {
  console.log(`Workerスレッド ID: ${threadId}`);

  // 定期的に状態を報告
  setInterval(() => {
    const memoryUsage = process.memoryUsage();
    parentPort.postMessage({
      threadId,
      heapUsed: `${(memoryUsage.heapUsed / 1024 / 1024).toFixed(2)} MB`
    });
  }, 1000);
}

ベストプラクティス

Workerの適切な使用判断

flowchart TD
    A[処理の実行] --> B{処理時間 > 10ms?}
    B -->|No| C[メインスレッドで実行]
    B -->|Yes| D{I/Oバウンド?}
    D -->|Yes| E[非同期APIを使用]
    D -->|No| F{繰り返し実行?}
    F -->|Yes| G[Workerプールを使用]
    F -->|No| H[単発Workerを使用]

パフォーマンス最適化のポイント

  1. Workerの再利用: 起動コストを避けるためプールを使用
  2. 転送可能オブジェクト: 大きなArrayBufferは転送リストを使用
  3. 共有メモリ: 頻繁にデータを交換する場合はSharedArrayBufferを使用
  4. 適切なWorker数: CPUコア数を目安に調整
1
2
3
4
5
import { cpus } from 'node:os';

// CPUコア数に基づいてWorker数を決定
const numCPUs = cpus().length;
const numWorkers = Math.max(1, numCPUs - 1); // 1コアはメインスレッド用に残す

メモリリークの防止

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import { Worker } from 'node:worker_threads';

class ManagedWorker {
  constructor(workerPath) {
    this.worker = new Worker(workerPath);
    this.isTerminated = false;
  }

  async execute(data) {
    if (this.isTerminated) {
      throw new Error('Worker is already terminated');
    }

    return new Promise((resolve, reject) => {
      const timeout = setTimeout(() => {
        reject(new Error('Worker timeout'));
        this.terminate();
      }, 30000);

      this.worker.once('message', (result) => {
        clearTimeout(timeout);
        resolve(result);
      });

      this.worker.once('error', (error) => {
        clearTimeout(timeout);
        reject(error);
      });

      this.worker.postMessage(data);
    });
  }

  async terminate() {
    if (!this.isTerminated) {
      this.isTerminated = true;
      await this.worker.terminate();
    }
  }
}

まとめ

Worker Threadsを使用することで、Node.jsアプリケーションでCPU集約的な処理を効率的に並列実行できます。

本記事で学んだ内容:

  • worker_threadsモジュールの基本: Worker、isMainThread、parentPort、workerDataの使い方
  • メッセージ通信: postMessage/onによるスレッド間通信と転送可能オブジェクト
  • 共有メモリ: SharedArrayBufferとAtomicsによる高速なデータ共有
  • 実践パターン: Workerプール、画像処理、JSON解析の並列化
  • ベストプラクティス: エラーハンドリング、リソース制限、パフォーマンス最適化

Worker Threadsは強力な機能ですが、すべての処理に適しているわけではありません。I/Oバウンドな処理には従来の非同期APIを使用し、CPUバウンドな処理にのみWorker Threadsを適用することで、アプリケーション全体のパフォーマンスを最大化できます。

参考リンク