Node.jsでファイルを扱う際、fs.readFileやfs.writeFileは手軽ですが、大容量ファイルではメモリを圧迫します。1GBのファイルを読み込めば、1GB以上のメモリが必要になるためです。
この問題を解決するのがStreamです。Streamはデータを小さなチャンク(塊)に分割し、逐次処理することでメモリ使用量を一定に保ちます。本記事では、Node.jsのStream APIを使用した大容量ファイルの効率的な処理方法を、実践的なコード例とともに解説します。
実行環境#
| 項目 |
バージョン |
| Node.js |
20.x LTS以上 |
| npm |
10.x以上 |
| OS |
Windows/macOS/Linux |
前提条件#
- JavaScriptの基礎知識(関数、オブジェクト、async/await)
- Node.jsの基本操作(スクリプト実行、npm利用)経験
- fsモジュールの基本的な理解
Streamとは何か#
Streamは、データを連続的な流れとして扱うための抽象化です。ファイル全体を一度にメモリに読み込むのではなく、データを小さなチャンクに分割して順次処理します。
従来のファイル読み込みとStreamの比較#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import { readFile } from 'node:fs/promises';
import { createReadStream } from 'node:fs';
// 従来の方法: ファイル全体をメモリに読み込む
// 1GBのファイル → 1GB以上のメモリを消費
async function readEntireFile(filePath) {
const content = await readFile(filePath);
console.log(`メモリ使用量: ${content.length} bytes`);
return content;
}
// Stream: チャンク単位で処理
// 1GBのファイル → 64KB程度のメモリで処理可能
function readWithStream(filePath) {
const stream = createReadStream(filePath);
stream.on('data', (chunk) => {
// デフォルトで64KBずつ処理
console.log(`チャンク受信: ${chunk.length} bytes`);
});
stream.on('end', () => {
console.log('ファイル読み込み完了');
});
}
|
Streamを使うべきケース#
| ケース |
推奨手法 |
理由 |
| 設定ファイル(数KB) |
readFile |
オーバーヘッドが少ない |
| ログファイル(数百MB〜数GB) |
Stream |
メモリ効率が良い |
| リアルタイムデータ処理 |
Stream |
逐次処理が必要 |
| ファイル変換(圧縮、暗号化) |
Stream |
中間データを保持しない |
| HTTP レスポンスのストリーミング |
Stream |
応答時間の短縮 |
4つのStreamタイプを理解する#
Node.jsには4つの基本的なStreamタイプがあります。それぞれの特徴と用途を理解することが、効率的なストリーム処理の第一歩です。
Streamタイプの比較表#
| タイプ |
役割 |
代表例 |
| Readable |
データの読み取り元 |
fs.createReadStream、http.IncomingMessage |
| Writable |
データの書き込み先 |
fs.createWriteStream、http.ServerResponse |
| Duplex |
読み書き両方(独立) |
net.Socket、TCP接続 |
| Transform |
データ変換(入力→加工→出力) |
zlib.createGzip、暗号化ストリーム |
Streamタイプの関係図#
flowchart LR
subgraph Readable
R1[fs.createReadStream]
R2[http.IncomingMessage]
R3[process.stdin]
end
subgraph Writable
W1[fs.createWriteStream]
W2[http.ServerResponse]
W3[process.stdout]
end
subgraph Transform
T1[zlib.createGzip]
T2[crypto.createCipher]
end
subgraph Duplex
D1[net.Socket]
D2[TCP Connection]
end
R1 --> |pipe| T1
T1 --> |pipe| W1
R2 --> |pipe| T2
T2 --> |pipe| W2
D1 <--> |双方向| D2Readable Stream#
データを読み取るためのストリームです。ファイル、HTTPリクエスト、標準入力などがこれに該当します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import { createReadStream } from 'node:fs';
const readable = createReadStream('./large-file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB(デフォルト)
});
// dataイベント: チャンクを受け取るたびに発火
readable.on('data', (chunk) => {
console.log(`受信: ${chunk.length} 文字`);
});
// endイベント: すべてのデータを読み終えたら発火
readable.on('end', () => {
console.log('読み込み完了');
});
// errorイベント: エラー発生時
readable.on('error', (err) => {
console.error('エラー:', err.message);
});
|
Writable Stream#
データを書き込むためのストリームです。ファイル、HTTPレスポンス、標準出力などがこれに該当します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import { createWriteStream } from 'node:fs';
const writable = createWriteStream('./output.txt', {
encoding: 'utf8',
highWaterMark: 16 * 1024 // 16KB(デフォルト)
});
// データを書き込む
writable.write('Hello, ');
writable.write('Stream!\n');
// ストリームを終了
writable.end('これが最後のデータです。');
// finishイベント: すべての書き込みが完了したら発火
writable.on('finish', () => {
console.log('書き込み完了');
});
// errorイベント: エラー発生時
writable.on('error', (err) => {
console.error('書き込みエラー:', err.message);
});
|
データを変換しながら通過させるストリームです。入力を受け取り、加工して出力します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import { Transform } from 'node:stream';
// 大文字に変換するTransformストリーム
const upperCaseTransform = new Transform({
transform(chunk, encoding, callback) {
// chunkを大文字に変換してpush
const upperChunk = chunk.toString().toUpperCase();
callback(null, upperChunk);
}
});
// 使用例
process.stdin
.pipe(upperCaseTransform)
.pipe(process.stdout);
|
Duplex Stream#
読み取りと書き込みの両方が可能なストリームです。ただし、Transformとは異なり、読み取りと書き込みは独立しています。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import { Duplex } from 'node:stream';
const duplexStream = new Duplex({
read(size) {
// 読み取りロジック
this.push('データを生成');
this.push(null); // 読み取り終了
},
write(chunk, encoding, callback) {
// 書き込みロジック
console.log('受信:', chunk.toString());
callback();
}
});
|
createReadStream/createWriteStreamによるファイル処理#
大容量ファイルを効率的に処理するための主要なAPIを詳しく見ていきましょう。
createReadStreamの基本オプション#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import { createReadStream } from 'node:fs';
const readStream = createReadStream('./large-file.txt', {
// 文字エンコーディング(省略時はBuffer)
encoding: 'utf8',
// 一度に読み込むバイト数(デフォルト: 64KB)
highWaterMark: 64 * 1024,
// 読み込み開始位置(バイト)
start: 0,
// 読み込み終了位置(バイト)
end: 1024,
// ファイルを自動的に閉じるか
autoClose: true,
// closeイベントを発火するか
emitClose: true
});
|
createWriteStreamの基本オプション#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import { createWriteStream } from 'node:fs';
const writeStream = createWriteStream('./output.txt', {
// 文字エンコーディング
encoding: 'utf8',
// バッファサイズ(デフォルト: 16KB)
highWaterMark: 16 * 1024,
// 書き込みモード
// 'w': 新規作成/上書き(デフォルト)
// 'a': 追記
// 'wx': 新規作成のみ(ファイル存在時はエラー)
flags: 'w',
// ファイルパーミッション(デフォルト: 0o666)
mode: 0o666,
// ファイルを自動的に閉じるか
autoClose: true
});
|
実践例: 大容量ログファイルの行数カウント#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';
async function countLines(filePath) {
const fileStream = createReadStream(filePath);
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity // Windows/Unix両対応
});
let lineCount = 0;
for await (const line of rl) {
lineCount++;
// 進捗表示(100万行ごと)
if (lineCount % 1000000 === 0) {
console.log(`処理中: ${lineCount.toLocaleString()} 行`);
}
}
return lineCount;
}
// 使用例
const filePath = './access.log';
const count = await countLines(filePath);
console.log(`総行数: ${count.toLocaleString()} 行`);
|
実践例: ファイルのコピー#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
async function copyFile(source, destination) {
const startTime = Date.now();
let bytesProcessed = 0;
const readStream = createReadStream(source);
const writeStream = createWriteStream(destination);
// 進捗監視
readStream.on('data', (chunk) => {
bytesProcessed += chunk.length;
});
try {
await pipeline(readStream, writeStream);
const duration = (Date.now() - startTime) / 1000;
const sizeMB = bytesProcessed / (1024 * 1024);
console.log(`コピー完了: ${sizeMB.toFixed(2)} MB in ${duration.toFixed(2)}s`);
console.log(`速度: ${(sizeMB / duration).toFixed(2)} MB/s`);
} catch (err) {
console.error('コピーエラー:', err.message);
throw err;
}
}
// 使用例
await copyFile('./large-file.zip', './backup/large-file.zip');
|
pipeによるストリーム連結#
pipeメソッドは、ReadableストリームをWritableストリームに接続し、データの流れを自動的に管理します。
基本的なpipeの使い方#
1
2
3
4
5
6
7
8
9
10
|
import { createReadStream, createWriteStream } from 'node:fs';
const readable = createReadStream('./input.txt');
const writable = createWriteStream('./output.txt');
// readableからwritableへデータを流す
readable.pipe(writable);
// pipeはwritableを返すため、チェーンが可能
// readable.pipe(transform).pipe(writable);
|
pipeチェーンによるファイル圧縮#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
function compressFile(inputPath, outputPath) {
return new Promise((resolve, reject) => {
const readable = createReadStream(inputPath);
const gzip = createGzip();
const writable = createWriteStream(outputPath);
readable
.pipe(gzip)
.pipe(writable)
.on('finish', () => {
console.log('圧縮完了:', outputPath);
resolve();
})
.on('error', reject);
// エラーハンドリング(各ストリームで必要)
readable.on('error', reject);
gzip.on('error', reject);
});
}
// 使用例
await compressFile('./large-file.txt', './large-file.txt.gz');
|
pipeの問題点とpipelineの推奨#
pipeには重要な問題があります。エラーが発生した場合、ストリームが自動的に閉じられず、メモリリークの原因になります。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { createGzip } from 'node:zlib';
// 推奨: pipelineを使用
async function compressFileWithPipeline(inputPath, outputPath) {
try {
await pipeline(
createReadStream(inputPath),
createGzip(),
createWriteStream(outputPath)
);
console.log('圧縮完了');
} catch (err) {
// pipelineはエラー時に自動的にすべてのストリームを破棄
console.error('圧縮エラー:', err.message);
throw err;
}
}
|
pipeとpipelineの比較#
| 特徴 |
pipe |
pipeline |
| エラー時のストリーム破棄 |
手動で必要 |
自動 |
| Promise対応 |
なし |
stream/promisesで対応 |
| エラー伝播 |
手動でリスナー設定 |
自動 |
| メモリリーク対策 |
手動で必要 |
自動 |
backpressureの概念と対処法#
backpressure(背圧)は、ストリーム処理における重要な概念です。データの生産速度が消費速度を上回ると、メモリが圧迫されます。
backpressureが発生する状況#
flowchart LR
subgraph 問題
R[Readable<br/>高速読み取り] -->|データ洪水| B[Buffer<br/>膨張]
B -->|遅い書き込み| W[Writable<br/>低速]
end
subgraph 解決策
R2[Readable] -->|pause| B2[Buffer<br/>制御]
B2 -->|drain| W2[Writable]
W2 -->|resume| R2
endwrite()の戻り値を確認する#
write()メソッドは、内部バッファがhighWaterMarkを超えるとfalseを返します。この場合、drainイベントを待つ必要があります。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
import { createWriteStream } from 'node:fs';
function writeWithBackpressure(filePath, data) {
return new Promise((resolve, reject) => {
const writable = createWriteStream(filePath);
let i = 0;
function write() {
let ok = true;
while (i < data.length && ok) {
// write()がfalseを返したらバッファがいっぱい
ok = writable.write(data[i]);
i++;
}
if (i < data.length) {
// バッファが空くまで待機
writable.once('drain', write);
} else {
// すべて書き込み完了
writable.end();
}
}
writable.on('finish', resolve);
writable.on('error', reject);
write();
});
}
// 使用例: 100万行のデータを書き込む
const lines = Array.from({ length: 1000000 }, (_, i) => `Line ${i + 1}\n`);
await writeWithBackpressure('./output.txt', lines);
|
Promiseベースのbackpressure対応#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import { createWriteStream } from 'node:fs';
import { once } from 'node:events';
async function writeLines(filePath, lines) {
const writable = createWriteStream(filePath);
for (const line of lines) {
// write()がfalseを返したらdrainを待つ
if (!writable.write(line)) {
await once(writable, 'drain');
}
}
// 書き込み終了
writable.end();
await once(writable, 'finish');
}
// 使用例
const data = Array.from({ length: 1000000 }, (_, i) => `Data ${i}\n`);
await writeLines('./large-output.txt', data);
console.log('書き込み完了');
|
highWaterMarkの調整#
highWaterMarkはバッファサイズを制御します。適切な値はユースケースによって異なります。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import { createReadStream, createWriteStream } from 'node:fs';
// 大容量ファイルの高速コピー: 大きなバッファ
const fastReadStream = createReadStream('./large-file.bin', {
highWaterMark: 1024 * 1024 // 1MB
});
// メモリ制約のある環境: 小さなバッファ
const memoryEfficientStream = createReadStream('./large-file.bin', {
highWaterMark: 16 * 1024 // 16KB
});
// リアルタイム処理: 小さなバッファで低遅延
const realTimeStream = createReadStream('./data-stream.txt', {
highWaterMark: 256 // 256 bytes
});
|
実践的なTransformストリームの実装例を見ていきましょう。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
import { Transform } from 'node:stream';
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
class CsvToJsonTransform extends Transform {
constructor(options = {}) {
super({ ...options, objectMode: true });
this.headers = null;
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
// 最後の不完全な行はバッファに残す
this.buffer = lines.pop() || '';
for (const line of lines) {
if (!line.trim()) continue;
const values = this.parseCsvLine(line);
if (!this.headers) {
// 最初の行はヘッダー
this.headers = values;
} else {
// データ行をJSONオブジェクトに変換
const obj = {};
this.headers.forEach((header, i) => {
obj[header] = values[i];
});
this.push(JSON.stringify(obj) + '\n');
}
}
callback();
}
_flush(callback) {
// 残りのバッファを処理
if (this.buffer.trim() && this.headers) {
const values = this.parseCsvLine(this.buffer);
const obj = {};
this.headers.forEach((header, i) => {
obj[header] = values[i];
});
this.push(JSON.stringify(obj) + '\n');
}
callback();
}
parseCsvLine(line) {
// シンプルなCSVパース(カンマ区切り)
return line.split(',').map(v => v.trim());
}
}
// 使用例
async function convertCsvToJson(inputPath, outputPath) {
await pipeline(
createReadStream(inputPath),
new CsvToJsonTransform(),
createWriteStream(outputPath)
);
console.log('変換完了:', outputPath);
}
await convertCsvToJson('./data.csv', './data.jsonl');
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
import { Transform } from 'node:stream';
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
class LineFilterTransform extends Transform {
constructor(filterFn, options = {}) {
super(options);
this.filterFn = filterFn;
this.buffer = '';
this.lineNumber = 0;
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
// 最後の不完全な行はバッファに残す
this.buffer = lines.pop() || '';
for (const line of lines) {
this.lineNumber++;
if (this.filterFn(line, this.lineNumber)) {
this.push(line + '\n');
}
}
callback();
}
_flush(callback) {
if (this.buffer) {
this.lineNumber++;
if (this.filterFn(this.buffer, this.lineNumber)) {
this.push(this.buffer + '\n');
}
}
callback();
}
}
// 使用例: ERRORを含む行のみ抽出
async function extractErrorLines(logPath, outputPath) {
const errorFilter = (line) => line.includes('ERROR');
await pipeline(
createReadStream(logPath),
new LineFilterTransform(errorFilter),
createWriteStream(outputPath)
);
console.log('エラーログ抽出完了');
}
await extractErrorLines('./application.log', './errors.log');
|
async iteratorを使ったモダンなストリーム処理#
Node.jsのReadableストリームはfor await...ofで反復処理できます。
基本的な使い方#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import { createReadStream } from 'node:fs';
async function processFileWithAsyncIterator(filePath) {
const stream = createReadStream(filePath, { encoding: 'utf8' });
let totalBytes = 0;
for await (const chunk of stream) {
totalBytes += chunk.length;
// チャンクごとの処理
console.log(`処理中: ${totalBytes} bytes`);
}
console.log(`完了: 合計 ${totalBytes} bytes`);
}
|
Readable.fromでイテラブルからストリームを作成#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import { Readable } from 'node:stream';
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
// ジェネレータからストリームを作成
async function* generateData() {
for (let i = 0; i < 1000000; i++) {
yield `Line ${i}: ${Date.now()}\n`;
// 1000行ごとに少し待機(模擬的なデータ生成)
if (i % 1000 === 0) {
await new Promise(resolve => setImmediate(resolve));
}
}
}
async function writeGeneratedData(outputPath) {
const readable = Readable.from(generateData());
const writable = createWriteStream(outputPath);
await pipeline(readable, writable);
console.log('データ生成・書き込み完了');
}
await writeGeneratedData('./generated-data.txt');
|
ストリームのメソッドチェーン#
Node.js 17以降では、Readableストリームにmap、filter、reduceなどのメソッドが追加されています。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import { Readable } from 'node:stream';
const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
// 偶数のみをフィルタリングして2倍にする
const result = await numbers
.filter((n) => n % 2 === 0)
.map((n) => n * 2)
.toArray();
console.log(result); // [4, 8, 12, 16, 20]
// reduceで合計を計算
const sum = await Readable.from([1, 2, 3, 4, 5])
.reduce((acc, n) => acc + n, 0);
console.log(sum); // 15
|
実践例: 大容量CSVファイルの集計処理#
実際のユースケースとして、数GB規模のCSVファイルをストリーム処理で集計する例を示します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';
async function aggregateSalesData(csvPath) {
const fileStream = createReadStream(csvPath);
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity
});
const stats = {
totalSales: 0,
totalOrders: 0,
salesByCategory: {},
salesByMonth: {}
};
let isHeader = true;
let headers = [];
let processedLines = 0;
console.log('集計処理を開始...');
for await (const line of rl) {
if (isHeader) {
headers = line.split(',');
isHeader = false;
continue;
}
const values = line.split(',');
const row = {};
headers.forEach((header, i) => {
row[header.trim()] = values[i]?.trim();
});
// 集計処理
const amount = parseFloat(row.amount) || 0;
const category = row.category || 'Unknown';
const date = row.date || '';
const month = date.substring(0, 7); // YYYY-MM
stats.totalSales += amount;
stats.totalOrders++;
// カテゴリ別集計
stats.salesByCategory[category] =
(stats.salesByCategory[category] || 0) + amount;
// 月別集計
if (month) {
stats.salesByMonth[month] =
(stats.salesByMonth[month] || 0) + amount;
}
processedLines++;
// 進捗表示(10万行ごと)
if (processedLines % 100000 === 0) {
console.log(`処理中: ${processedLines.toLocaleString()} 行`);
}
}
console.log('集計完了');
console.log(`総注文数: ${stats.totalOrders.toLocaleString()}`);
console.log(`総売上: ¥${stats.totalSales.toLocaleString()}`);
return stats;
}
// 使用例
const result = await aggregateSalesData('./sales-data.csv');
console.log('カテゴリ別売上:', result.salesByCategory);
console.log('月別売上:', result.salesByMonth);
|
よくあるエラーと対処法#
EMFILE: too many open files#
同時に開くファイルが多すぎる場合に発生します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
// 問題: 同時に大量のファイルを開く
async function badExample(files) {
// すべてのファイルを同時に処理しようとする
await Promise.all(files.map(async (file) => {
await pipeline(
createReadStream(file.input),
createWriteStream(file.output)
);
}));
}
// 解決策: 並列数を制限する
async function processFilesWithLimit(files, concurrency = 10) {
const chunks = [];
for (let i = 0; i < files.length; i += concurrency) {
chunks.push(files.slice(i, i + concurrency));
}
for (const chunk of chunks) {
await Promise.all(chunk.map(async (file) => {
await pipeline(
createReadStream(file.input),
createWriteStream(file.output)
);
}));
}
}
|
ストリームが途中で止まる#
dataイベントを登録せずにストリームを作成すると、データが流れません。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import { createReadStream } from 'node:fs';
// 問題: dataイベントがないと流れない
const stream = createReadStream('./file.txt');
// データが流れない...
// 解決策1: dataイベントを登録
stream.on('data', (chunk) => {
console.log(chunk);
});
// 解決策2: pipeを使用
stream.pipe(process.stdout);
// 解決策3: resume()を呼び出す
stream.resume();
stream.on('end', () => console.log('完了'));
|
メモリ使用量が増え続ける#
backpressureを無視してデータを書き込み続けると、メモリが膨張します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import { createWriteStream } from 'node:fs';
import { once } from 'node:events';
// 問題: backpressureを無視
function badWrite(filePath, data) {
const writable = createWriteStream(filePath);
for (const item of data) {
writable.write(item); // 戻り値を無視
}
writable.end();
}
// 解決策: drainを待つ
async function goodWrite(filePath, data) {
const writable = createWriteStream(filePath);
for (const item of data) {
const canContinue = writable.write(item);
if (!canContinue) {
await once(writable, 'drain');
}
}
writable.end();
await once(writable, 'finish');
}
|
まとめ#
本記事では、Node.jsのStreamを使用した大容量ファイル処理について解説しました。
学んだこと#
- Streamの基本概念: データを小さなチャンクに分割して逐次処理することで、メモリ効率を大幅に改善できる
- 4つのStreamタイプ: Readable、Writable、Transform、Duplexそれぞれの役割と用途
- createReadStream/createWriteStream: 大容量ファイルの効率的な読み書き方法
- pipeとpipeline: ストリームの連結とエラーハンドリング
- backpressure: データフローの制御とメモリ管理
ベストプラクティス#
- 大容量ファイル(数十MB以上)にはStreamを使用する
pipeよりもpipelineを使用してエラーハンドリングを簡潔にする
write()の戻り値を確認し、drainイベントを適切に待つ
highWaterMarkをユースケースに応じて調整する
for await...ofを使用してモダンな非同期処理を行う
Streamを使いこなすことで、メモリ効率の高い、スケーラブルなNode.jsアプリケーションを構築できます。
参考リンク#