Node.jsでファイルを扱う際、fs.readFilefs.writeFileは手軽ですが、大容量ファイルではメモリを圧迫します。1GBのファイルを読み込めば、1GB以上のメモリが必要になるためです。

この問題を解決するのがStreamです。Streamはデータを小さなチャンク(塊)に分割し、逐次処理することでメモリ使用量を一定に保ちます。本記事では、Node.jsのStream APIを使用した大容量ファイルの効率的な処理方法を、実践的なコード例とともに解説します。

実行環境

項目 バージョン
Node.js 20.x LTS以上
npm 10.x以上
OS Windows/macOS/Linux

前提条件

  • JavaScriptの基礎知識(関数、オブジェクト、async/await)
  • Node.jsの基本操作(スクリプト実行、npm利用)経験
  • fsモジュールの基本的な理解

Streamとは何か

Streamは、データを連続的な流れとして扱うための抽象化です。ファイル全体を一度にメモリに読み込むのではなく、データを小さなチャンクに分割して順次処理します。

従来のファイル読み込みとStreamの比較

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import { readFile } from 'node:fs/promises';
import { createReadStream } from 'node:fs';

// 従来の方法: ファイル全体をメモリに読み込む
// 1GBのファイル → 1GB以上のメモリを消費
async function readEntireFile(filePath) {
  const content = await readFile(filePath);
  console.log(`メモリ使用量: ${content.length} bytes`);
  return content;
}

// Stream: チャンク単位で処理
// 1GBのファイル → 64KB程度のメモリで処理可能
function readWithStream(filePath) {
  const stream = createReadStream(filePath);
  
  stream.on('data', (chunk) => {
    // デフォルトで64KBずつ処理
    console.log(`チャンク受信: ${chunk.length} bytes`);
  });
  
  stream.on('end', () => {
    console.log('ファイル読み込み完了');
  });
}

Streamを使うべきケース

ケース 推奨手法 理由
設定ファイル(数KB) readFile オーバーヘッドが少ない
ログファイル(数百MB〜数GB) Stream メモリ効率が良い
リアルタイムデータ処理 Stream 逐次処理が必要
ファイル変換(圧縮、暗号化) Stream 中間データを保持しない
HTTP レスポンスのストリーミング Stream 応答時間の短縮

4つのStreamタイプを理解する

Node.jsには4つの基本的なStreamタイプがあります。それぞれの特徴と用途を理解することが、効率的なストリーム処理の第一歩です。

Streamタイプの比較表

タイプ 役割 代表例
Readable データの読み取り元 fs.createReadStreamhttp.IncomingMessage
Writable データの書き込み先 fs.createWriteStreamhttp.ServerResponse
Duplex 読み書き両方(独立) net.Socket、TCP接続
Transform データ変換(入力→加工→出力) zlib.createGzip、暗号化ストリーム

Streamタイプの関係図

flowchart LR
    subgraph Readable
        R1[fs.createReadStream]
        R2[http.IncomingMessage]
        R3[process.stdin]
    end
    
    subgraph Writable
        W1[fs.createWriteStream]
        W2[http.ServerResponse]
        W3[process.stdout]
    end
    
    subgraph Transform
        T1[zlib.createGzip]
        T2[crypto.createCipher]
    end
    
    subgraph Duplex
        D1[net.Socket]
        D2[TCP Connection]
    end
    
    R1 --> |pipe| T1
    T1 --> |pipe| W1
    R2 --> |pipe| T2
    T2 --> |pipe| W2
    D1 <--> |双方向| D2

Readable Stream

データを読み取るためのストリームです。ファイル、HTTPリクエスト、標準入力などがこれに該当します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import { createReadStream } from 'node:fs';

const readable = createReadStream('./large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB(デフォルト)
});

// dataイベント: チャンクを受け取るたびに発火
readable.on('data', (chunk) => {
  console.log(`受信: ${chunk.length} 文字`);
});

// endイベント: すべてのデータを読み終えたら発火
readable.on('end', () => {
  console.log('読み込み完了');
});

// errorイベント: エラー発生時
readable.on('error', (err) => {
  console.error('エラー:', err.message);
});

Writable Stream

データを書き込むためのストリームです。ファイル、HTTPレスポンス、標準出力などがこれに該当します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { createWriteStream } from 'node:fs';

const writable = createWriteStream('./output.txt', {
  encoding: 'utf8',
  highWaterMark: 16 * 1024 // 16KB(デフォルト)
});

// データを書き込む
writable.write('Hello, ');
writable.write('Stream!\n');

// ストリームを終了
writable.end('これが最後のデータです。');

// finishイベント: すべての書き込みが完了したら発火
writable.on('finish', () => {
  console.log('書き込み完了');
});

// errorイベント: エラー発生時
writable.on('error', (err) => {
  console.error('書き込みエラー:', err.message);
});

Transform Stream

データを変換しながら通過させるストリームです。入力を受け取り、加工して出力します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import { Transform } from 'node:stream';

// 大文字に変換するTransformストリーム
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    // chunkを大文字に変換してpush
    const upperChunk = chunk.toString().toUpperCase();
    callback(null, upperChunk);
  }
});

// 使用例
process.stdin
  .pipe(upperCaseTransform)
  .pipe(process.stdout);

Duplex Stream

読み取りと書き込みの両方が可能なストリームです。ただし、Transformとは異なり、読み取りと書き込みは独立しています。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import { Duplex } from 'node:stream';

const duplexStream = new Duplex({
  read(size) {
    // 読み取りロジック
    this.push('データを生成');
    this.push(null); // 読み取り終了
  },
  write(chunk, encoding, callback) {
    // 書き込みロジック
    console.log('受信:', chunk.toString());
    callback();
  }
});

createReadStream/createWriteStreamによるファイル処理

大容量ファイルを効率的に処理するための主要なAPIを詳しく見ていきましょう。

createReadStreamの基本オプション

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import { createReadStream } from 'node:fs';

const readStream = createReadStream('./large-file.txt', {
  // 文字エンコーディング(省略時はBuffer)
  encoding: 'utf8',
  
  // 一度に読み込むバイト数(デフォルト: 64KB)
  highWaterMark: 64 * 1024,
  
  // 読み込み開始位置(バイト)
  start: 0,
  
  // 読み込み終了位置(バイト)
  end: 1024,
  
  // ファイルを自動的に閉じるか
  autoClose: true,
  
  // closeイベントを発火するか
  emitClose: true
});

createWriteStreamの基本オプション

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import { createWriteStream } from 'node:fs';

const writeStream = createWriteStream('./output.txt', {
  // 文字エンコーディング
  encoding: 'utf8',
  
  // バッファサイズ(デフォルト: 16KB)
  highWaterMark: 16 * 1024,
  
  // 書き込みモード
  // 'w': 新規作成/上書き(デフォルト)
  // 'a': 追記
  // 'wx': 新規作成のみ(ファイル存在時はエラー)
  flags: 'w',
  
  // ファイルパーミッション(デフォルト: 0o666)
  mode: 0o666,
  
  // ファイルを自動的に閉じるか
  autoClose: true
});

実践例: 大容量ログファイルの行数カウント

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

async function countLines(filePath) {
  const fileStream = createReadStream(filePath);
  
  const rl = createInterface({
    input: fileStream,
    crlfDelay: Infinity // Windows/Unix両対応
  });
  
  let lineCount = 0;
  
  for await (const line of rl) {
    lineCount++;
    
    // 進捗表示(100万行ごと)
    if (lineCount % 1000000 === 0) {
      console.log(`処理中: ${lineCount.toLocaleString()} 行`);
    }
  }
  
  return lineCount;
}

// 使用例
const filePath = './access.log';
const count = await countLines(filePath);
console.log(`総行数: ${count.toLocaleString()} 行`);

実践例: ファイルのコピー

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

async function copyFile(source, destination) {
  const startTime = Date.now();
  let bytesProcessed = 0;
  
  const readStream = createReadStream(source);
  const writeStream = createWriteStream(destination);
  
  // 進捗監視
  readStream.on('data', (chunk) => {
    bytesProcessed += chunk.length;
  });
  
  try {
    await pipeline(readStream, writeStream);
    
    const duration = (Date.now() - startTime) / 1000;
    const sizeMB = bytesProcessed / (1024 * 1024);
    
    console.log(`コピー完了: ${sizeMB.toFixed(2)} MB in ${duration.toFixed(2)}s`);
    console.log(`速度: ${(sizeMB / duration).toFixed(2)} MB/s`);
  } catch (err) {
    console.error('コピーエラー:', err.message);
    throw err;
  }
}

// 使用例
await copyFile('./large-file.zip', './backup/large-file.zip');

pipeによるストリーム連結

pipeメソッドは、ReadableストリームをWritableストリームに接続し、データの流れを自動的に管理します。

基本的なpipeの使い方

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import { createReadStream, createWriteStream } from 'node:fs';

const readable = createReadStream('./input.txt');
const writable = createWriteStream('./output.txt');

// readableからwritableへデータを流す
readable.pipe(writable);

// pipeはwritableを返すため、チェーンが可能
// readable.pipe(transform).pipe(writable);

pipeチェーンによるファイル圧縮

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

function compressFile(inputPath, outputPath) {
  return new Promise((resolve, reject) => {
    const readable = createReadStream(inputPath);
    const gzip = createGzip();
    const writable = createWriteStream(outputPath);
    
    readable
      .pipe(gzip)
      .pipe(writable)
      .on('finish', () => {
        console.log('圧縮完了:', outputPath);
        resolve();
      })
      .on('error', reject);
    
    // エラーハンドリング(各ストリームで必要)
    readable.on('error', reject);
    gzip.on('error', reject);
  });
}

// 使用例
await compressFile('./large-file.txt', './large-file.txt.gz');

pipeの問題点とpipelineの推奨

pipeには重要な問題があります。エラーが発生した場合、ストリームが自動的に閉じられず、メモリリークの原因になります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { createGzip } from 'node:zlib';

// 推奨: pipelineを使用
async function compressFileWithPipeline(inputPath, outputPath) {
  try {
    await pipeline(
      createReadStream(inputPath),
      createGzip(),
      createWriteStream(outputPath)
    );
    console.log('圧縮完了');
  } catch (err) {
    // pipelineはエラー時に自動的にすべてのストリームを破棄
    console.error('圧縮エラー:', err.message);
    throw err;
  }
}

pipeとpipelineの比較

特徴 pipe pipeline
エラー時のストリーム破棄 手動で必要 自動
Promise対応 なし stream/promisesで対応
エラー伝播 手動でリスナー設定 自動
メモリリーク対策 手動で必要 自動

backpressureの概念と対処法

backpressure(背圧)は、ストリーム処理における重要な概念です。データの生産速度が消費速度を上回ると、メモリが圧迫されます。

backpressureが発生する状況

flowchart LR
    subgraph 問題
        R[Readable<br/>高速読み取り] -->|データ洪水| B[Buffer<br/>膨張]
        B -->|遅い書き込み| W[Writable<br/>低速]
    end
    
    subgraph 解決策
        R2[Readable] -->|pause| B2[Buffer<br/>制御]
        B2 -->|drain| W2[Writable]
        W2 -->|resume| R2
    end

write()の戻り値を確認する

write()メソッドは、内部バッファがhighWaterMarkを超えるとfalseを返します。この場合、drainイベントを待つ必要があります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import { createWriteStream } from 'node:fs';

function writeWithBackpressure(filePath, data) {
  return new Promise((resolve, reject) => {
    const writable = createWriteStream(filePath);
    let i = 0;
    
    function write() {
      let ok = true;
      
      while (i < data.length && ok) {
        // write()がfalseを返したらバッファがいっぱい
        ok = writable.write(data[i]);
        i++;
      }
      
      if (i < data.length) {
        // バッファが空くまで待機
        writable.once('drain', write);
      } else {
        // すべて書き込み完了
        writable.end();
      }
    }
    
    writable.on('finish', resolve);
    writable.on('error', reject);
    
    write();
  });
}

// 使用例: 100万行のデータを書き込む
const lines = Array.from({ length: 1000000 }, (_, i) => `Line ${i + 1}\n`);
await writeWithBackpressure('./output.txt', lines);

Promiseベースのbackpressure対応

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { createWriteStream } from 'node:fs';
import { once } from 'node:events';

async function writeLines(filePath, lines) {
  const writable = createWriteStream(filePath);
  
  for (const line of lines) {
    // write()がfalseを返したらdrainを待つ
    if (!writable.write(line)) {
      await once(writable, 'drain');
    }
  }
  
  // 書き込み終了
  writable.end();
  await once(writable, 'finish');
}

// 使用例
const data = Array.from({ length: 1000000 }, (_, i) => `Data ${i}\n`);
await writeLines('./large-output.txt', data);
console.log('書き込み完了');

highWaterMarkの調整

highWaterMarkはバッファサイズを制御します。適切な値はユースケースによって異なります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import { createReadStream, createWriteStream } from 'node:fs';

// 大容量ファイルの高速コピー: 大きなバッファ
const fastReadStream = createReadStream('./large-file.bin', {
  highWaterMark: 1024 * 1024 // 1MB
});

// メモリ制約のある環境: 小さなバッファ
const memoryEfficientStream = createReadStream('./large-file.bin', {
  highWaterMark: 16 * 1024 // 16KB
});

// リアルタイム処理: 小さなバッファで低遅延
const realTimeStream = createReadStream('./data-stream.txt', {
  highWaterMark: 256 // 256 bytes
});

カスタムTransformストリームの実装

実践的なTransformストリームの実装例を見ていきましょう。

CSVをJSONに変換するTransformストリーム

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import { Transform } from 'node:stream';
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

class CsvToJsonTransform extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
    this.headers = null;
    this.buffer = '';
  }
  
  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    
    // 最後の不完全な行はバッファに残す
    this.buffer = lines.pop() || '';
    
    for (const line of lines) {
      if (!line.trim()) continue;
      
      const values = this.parseCsvLine(line);
      
      if (!this.headers) {
        // 最初の行はヘッダー
        this.headers = values;
      } else {
        // データ行をJSONオブジェクトに変換
        const obj = {};
        this.headers.forEach((header, i) => {
          obj[header] = values[i];
        });
        this.push(JSON.stringify(obj) + '\n');
      }
    }
    
    callback();
  }
  
  _flush(callback) {
    // 残りのバッファを処理
    if (this.buffer.trim() && this.headers) {
      const values = this.parseCsvLine(this.buffer);
      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header] = values[i];
      });
      this.push(JSON.stringify(obj) + '\n');
    }
    callback();
  }
  
  parseCsvLine(line) {
    // シンプルなCSVパース(カンマ区切り)
    return line.split(',').map(v => v.trim());
  }
}

// 使用例
async function convertCsvToJson(inputPath, outputPath) {
  await pipeline(
    createReadStream(inputPath),
    new CsvToJsonTransform(),
    createWriteStream(outputPath)
  );
  console.log('変換完了:', outputPath);
}

await convertCsvToJson('./data.csv', './data.jsonl');

行フィルタリングTransformストリーム

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import { Transform } from 'node:stream';
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

class LineFilterTransform extends Transform {
  constructor(filterFn, options = {}) {
    super(options);
    this.filterFn = filterFn;
    this.buffer = '';
    this.lineNumber = 0;
  }
  
  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    
    // 最後の不完全な行はバッファに残す
    this.buffer = lines.pop() || '';
    
    for (const line of lines) {
      this.lineNumber++;
      
      if (this.filterFn(line, this.lineNumber)) {
        this.push(line + '\n');
      }
    }
    
    callback();
  }
  
  _flush(callback) {
    if (this.buffer) {
      this.lineNumber++;
      if (this.filterFn(this.buffer, this.lineNumber)) {
        this.push(this.buffer + '\n');
      }
    }
    callback();
  }
}

// 使用例: ERRORを含む行のみ抽出
async function extractErrorLines(logPath, outputPath) {
  const errorFilter = (line) => line.includes('ERROR');
  
  await pipeline(
    createReadStream(logPath),
    new LineFilterTransform(errorFilter),
    createWriteStream(outputPath)
  );
  
  console.log('エラーログ抽出完了');
}

await extractErrorLines('./application.log', './errors.log');

async iteratorを使ったモダンなストリーム処理

Node.jsのReadableストリームはfor await...ofで反復処理できます。

基本的な使い方

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import { createReadStream } from 'node:fs';

async function processFileWithAsyncIterator(filePath) {
  const stream = createReadStream(filePath, { encoding: 'utf8' });
  
  let totalBytes = 0;
  
  for await (const chunk of stream) {
    totalBytes += chunk.length;
    // チャンクごとの処理
    console.log(`処理中: ${totalBytes} bytes`);
  }
  
  console.log(`完了: 合計 ${totalBytes} bytes`);
}

Readable.fromでイテラブルからストリームを作成

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import { Readable } from 'node:stream';
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

// ジェネレータからストリームを作成
async function* generateData() {
  for (let i = 0; i < 1000000; i++) {
    yield `Line ${i}: ${Date.now()}\n`;
    
    // 1000行ごとに少し待機(模擬的なデータ生成)
    if (i % 1000 === 0) {
      await new Promise(resolve => setImmediate(resolve));
    }
  }
}

async function writeGeneratedData(outputPath) {
  const readable = Readable.from(generateData());
  const writable = createWriteStream(outputPath);
  
  await pipeline(readable, writable);
  console.log('データ生成・書き込み完了');
}

await writeGeneratedData('./generated-data.txt');

ストリームのメソッドチェーン

Node.js 17以降では、Readableストリームにmapfilterreduceなどのメソッドが追加されています。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import { Readable } from 'node:stream';

const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// 偶数のみをフィルタリングして2倍にする
const result = await numbers
  .filter((n) => n % 2 === 0)
  .map((n) => n * 2)
  .toArray();

console.log(result); // [4, 8, 12, 16, 20]

// reduceで合計を計算
const sum = await Readable.from([1, 2, 3, 4, 5])
  .reduce((acc, n) => acc + n, 0);

console.log(sum); // 15

実践例: 大容量CSVファイルの集計処理

実際のユースケースとして、数GB規模のCSVファイルをストリーム処理で集計する例を示します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

async function aggregateSalesData(csvPath) {
  const fileStream = createReadStream(csvPath);
  const rl = createInterface({
    input: fileStream,
    crlfDelay: Infinity
  });
  
  const stats = {
    totalSales: 0,
    totalOrders: 0,
    salesByCategory: {},
    salesByMonth: {}
  };
  
  let isHeader = true;
  let headers = [];
  let processedLines = 0;
  
  console.log('集計処理を開始...');
  
  for await (const line of rl) {
    if (isHeader) {
      headers = line.split(',');
      isHeader = false;
      continue;
    }
    
    const values = line.split(',');
    const row = {};
    headers.forEach((header, i) => {
      row[header.trim()] = values[i]?.trim();
    });
    
    // 集計処理
    const amount = parseFloat(row.amount) || 0;
    const category = row.category || 'Unknown';
    const date = row.date || '';
    const month = date.substring(0, 7); // YYYY-MM
    
    stats.totalSales += amount;
    stats.totalOrders++;
    
    // カテゴリ別集計
    stats.salesByCategory[category] = 
      (stats.salesByCategory[category] || 0) + amount;
    
    // 月別集計
    if (month) {
      stats.salesByMonth[month] = 
        (stats.salesByMonth[month] || 0) + amount;
    }
    
    processedLines++;
    
    // 進捗表示(10万行ごと)
    if (processedLines % 100000 === 0) {
      console.log(`処理中: ${processedLines.toLocaleString()} 行`);
    }
  }
  
  console.log('集計完了');
  console.log(`総注文数: ${stats.totalOrders.toLocaleString()}`);
  console.log(`総売上: ¥${stats.totalSales.toLocaleString()}`);
  
  return stats;
}

// 使用例
const result = await aggregateSalesData('./sales-data.csv');
console.log('カテゴリ別売上:', result.salesByCategory);
console.log('月別売上:', result.salesByMonth);

よくあるエラーと対処法

EMFILE: too many open files

同時に開くファイルが多すぎる場合に発生します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

// 問題: 同時に大量のファイルを開く
async function badExample(files) {
  // すべてのファイルを同時に処理しようとする
  await Promise.all(files.map(async (file) => {
    await pipeline(
      createReadStream(file.input),
      createWriteStream(file.output)
    );
  }));
}

// 解決策: 並列数を制限する
async function processFilesWithLimit(files, concurrency = 10) {
  const chunks = [];
  
  for (let i = 0; i < files.length; i += concurrency) {
    chunks.push(files.slice(i, i + concurrency));
  }
  
  for (const chunk of chunks) {
    await Promise.all(chunk.map(async (file) => {
      await pipeline(
        createReadStream(file.input),
        createWriteStream(file.output)
      );
    }));
  }
}

ストリームが途中で止まる

dataイベントを登録せずにストリームを作成すると、データが流れません。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import { createReadStream } from 'node:fs';

// 問題: dataイベントがないと流れない
const stream = createReadStream('./file.txt');
// データが流れない...

// 解決策1: dataイベントを登録
stream.on('data', (chunk) => {
  console.log(chunk);
});

// 解決策2: pipeを使用
stream.pipe(process.stdout);

// 解決策3: resume()を呼び出す
stream.resume();
stream.on('end', () => console.log('完了'));

メモリ使用量が増え続ける

backpressureを無視してデータを書き込み続けると、メモリが膨張します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import { createWriteStream } from 'node:fs';
import { once } from 'node:events';

// 問題: backpressureを無視
function badWrite(filePath, data) {
  const writable = createWriteStream(filePath);
  for (const item of data) {
    writable.write(item); // 戻り値を無視
  }
  writable.end();
}

// 解決策: drainを待つ
async function goodWrite(filePath, data) {
  const writable = createWriteStream(filePath);
  
  for (const item of data) {
    const canContinue = writable.write(item);
    if (!canContinue) {
      await once(writable, 'drain');
    }
  }
  
  writable.end();
  await once(writable, 'finish');
}

まとめ

本記事では、Node.jsのStreamを使用した大容量ファイル処理について解説しました。

学んだこと

  1. Streamの基本概念: データを小さなチャンクに分割して逐次処理することで、メモリ効率を大幅に改善できる
  2. 4つのStreamタイプ: Readable、Writable、Transform、Duplexそれぞれの役割と用途
  3. createReadStream/createWriteStream: 大容量ファイルの効率的な読み書き方法
  4. pipeとpipeline: ストリームの連結とエラーハンドリング
  5. backpressure: データフローの制御とメモリ管理

ベストプラクティス

  • 大容量ファイル(数十MB以上)にはStreamを使用する
  • pipeよりもpipelineを使用してエラーハンドリングを簡潔にする
  • write()の戻り値を確認し、drainイベントを適切に待つ
  • highWaterMarkをユースケースに応じて調整する
  • for await...ofを使用してモダンな非同期処理を行う

Streamを使いこなすことで、メモリ効率の高い、スケーラブルなNode.jsアプリケーションを構築できます。

参考リンク