はじめに

Webアプリケーションやバックエンドサービスを開発する際、データベースへの接続は避けて通れない基本的な作業です。しかし、単に接続できるだけでは不十分です。本番環境で安定稼働させるためには、コネクションプーリングによる効率的な接続管理、SQLインジェクション対策によるセキュリティ確保、プリペアドステートメントによるパフォーマンス最適化が必要になります。

本記事では、Node.js、Python、Goの3つの主要言語でPostgreSQLに接続する方法を実践的に解説します。各言語のデファクトスタンダードとなっているドライバーを使用し、実際のプロジェクトで使えるコード例を提供します。

この記事を読むことで、以下のことができるようになります。

  • 各言語(Node.js/Python/Go)でPostgreSQLに安全に接続できる
  • コネクションプーリングを実装してリソースを効率的に管理できる
  • プリペアドステートメントでSQLインジェクションを防止できる
  • PgBouncerを導入してアプリケーションをスケールできる
  • ORMと低レベルドライバーを適切に使い分けられる

前提条件

  • PostgreSQL 14以降がインストールされていること
  • 各言語の開発環境がセットアップ済みであること
    • Node.js 18以降
    • Python 3.10以降
    • Go 1.21以降
  • 基本的なSQL文(SELECT/INSERT/UPDATE/DELETE)を理解していること

PostgreSQL接続の基本概念

接続文字列(Connection String)

PostgreSQLへの接続には、接続文字列(DSN: Data Source Name)を使用します。これはデータベースの場所と認証情報を含む文字列です。

1
postgresql://username:password@hostname:port/database?parameter=value
要素 説明
username データベースユーザー名 postgres
password パスワード mysecretpassword
hostname サーバーのホスト名/IP localhost
port ポート番号 5432(デフォルト)
database データベース名 myapp_db
parameter 追加パラメータ sslmode=require

接続の流れ

アプリケーションからPostgreSQLに接続する際の基本的な流れを理解しておきましょう。

sequenceDiagram
    participant App as アプリケーション
    participant Driver as DBドライバー
    participant PG as PostgreSQL
    
    App->>Driver: 接続リクエスト
    Driver->>PG: TCP接続確立
    PG-->>Driver: 認証チャレンジ
    Driver->>PG: 認証情報送信
    PG-->>Driver: 認証成功
    Driver-->>App: コネクションオブジェクト返却
    
    App->>Driver: クエリ実行
    Driver->>PG: SQLクエリ送信
    PG-->>Driver: 結果セット返却
    Driver-->>App: 結果オブジェクト返却
    
    App->>Driver: 接続クローズ
    Driver->>PG: 切断

Node.jsでPostgreSQLに接続する

node-postgres(pg)の概要

Node.jsでPostgreSQLに接続する際の標準的なライブラリがnode-postgres(npm package名: pg)です。Pure JavaScriptで実装されており、Promise/async-awaitをサポートしています。

主な特徴は以下の通りです。

  • コールバック、Promise、async/awaitすべてをサポート
  • コネクションプーリング機能を内蔵
  • プリペアドステートメント対応
  • カーソルによるストリーミング結果取得
  • SSL/TLS接続サポート

インストールと基本接続

まずパッケージをインストールします。

1
npm install pg

TypeScriptを使用する場合は型定義もインストールします。

1
npm install pg @types/pg

基本的な接続コードは以下の通りです。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import { Client } from 'pg';

async function main() {
  const client = new Client({
    host: 'localhost',
    port: 5432,
    database: 'myapp_db',
    user: 'postgres',
    password: 'mysecretpassword',
  });

  try {
    await client.connect();
    
    const result = await client.query('SELECT NOW() as current_time');
    console.log('現在時刻:', result.rows[0].current_time);
    
  } catch (error) {
    console.error('データベースエラー:', error);
  } finally {
    await client.end();
  }
}

main();

環境変数を使用した接続も可能です。pgライブラリは以下の環境変数を自動的に読み取ります。

環境変数 説明
PGHOST ホスト名
PGPORT ポート番号
PGDATABASE データベース名
PGUSER ユーザー名
PGPASSWORD パスワード
1
2
3
// 環境変数が設定されていれば引数なしで接続可能
const client = new Client();
await client.connect();

コネクションプーリングの実装

本番環境では、リクエストごとに新しい接続を作成するのは非効率です。Poolクラスを使用してコネクションプーリングを実装します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import { Pool, PoolClient } from 'pg';

// プールの作成(アプリケーション起動時に一度だけ)
const pool = new Pool({
  host: process.env.PGHOST || 'localhost',
  port: parseInt(process.env.PGPORT || '5432'),
  database: process.env.PGDATABASE || 'myapp_db',
  user: process.env.PGUSER || 'postgres',
  password: process.env.PGPASSWORD,
  // プール設定
  max: 20,                // 最大接続数
  min: 5,                 // 最小接続数
  idleTimeoutMillis: 30000,    // アイドル接続のタイムアウト
  connectionTimeoutMillis: 5000, // 接続タイムアウト
});

// エラーハンドリング
pool.on('error', (err: Error) => {
  console.error('予期しないプールエラー:', err);
});

// クエリ実行(シンプルな方法)
async function getUserById(userId: number) {
  const result = await pool.query(
    'SELECT * FROM users WHERE id = $1',
    [userId]
  );
  return result.rows[0];
}

// トランザクションを使用する場合
async function transferMoney(fromId: number, toId: number, amount: number) {
  const client: PoolClient = await pool.connect();
  
  try {
    await client.query('BEGIN');
    
    await client.query(
      'UPDATE accounts SET balance = balance - $1 WHERE user_id = $2',
      [amount, fromId]
    );
    
    await client.query(
      'UPDATE accounts SET balance = balance + $1 WHERE user_id = $2',
      [amount, toId]
    );
    
    await client.query('COMMIT');
    console.log('送金完了');
    
  } catch (error) {
    await client.query('ROLLBACK');
    throw error;
  } finally {
    client.release(); // 重要: 必ずプールに返却
  }
}

// アプリケーション終了時
async function shutdown() {
  await pool.end();
}

プリペアドステートメントとパラメータ化クエリ

SQLインジェクションを防ぐため、必ずパラメータ化クエリを使用します。

1
2
3
4
5
6
7
8
// 危険な例(絶対に使用しないこと)
const unsafeQuery = `SELECT * FROM users WHERE name = '${userInput}'`;

// 安全な例(パラメータ化クエリ)
const result = await pool.query(
  'SELECT * FROM users WHERE name = $1 AND status = $2',
  [userName, 'active']
);

名前付きプリペアドステートメントを使用すると、同じクエリを繰り返し実行する際のパフォーマンスが向上します。

1
2
3
4
5
6
7
8
// 名前付きプリペアドステートメント
const queryConfig = {
  name: 'get-user-by-email',
  text: 'SELECT * FROM users WHERE email = $1',
  values: ['user@example.com'],
};

const result = await pool.query(queryConfig);

PythonでPostgreSQLに接続する

psycopg3の概要

PythonでPostgreSQLに接続する際のデファクトスタンダードがpsycopg(サイコピージー)です。2021年にリリースされたpsycopg3(パッケージ名: psycopg)は、以下の特徴を持つ最新バージョンです。

  • 完全な非同期(async/await)サポート
  • 型ヒント(typing)対応
  • サーバーサイドパラメータバインディング
  • コネクションプール機能(psycopg_pool
  • バイナリプロトコル対応による高速化

インストールと基本接続

psycopg3をインストールします。

1
pip install psycopg[binary]

コネクションプールを使用する場合は追加でインストールします。

1
pip install psycopg_pool

基本的な接続コードは以下の通りです。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import psycopg

def main():
    # 接続文字列を使用
    conn_string = "postgresql://postgres:mysecretpassword@localhost:5432/myapp_db"
    
    # コンテキストマネージャーで自動クローズ
    with psycopg.connect(conn_string) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT NOW() as current_time")
            row = cur.fetchone()
            print(f"現在時刻: {row[0]}")

if __name__ == "__main__":
    main()

キーワード引数を使用した接続も可能です。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import psycopg

with psycopg.connect(
    host="localhost",
    port=5432,
    dbname="myapp_db",
    user="postgres",
    password="mysecretpassword",
) as conn:
    # データベース操作
    pass

コネクションプーリングの実装

本番環境ではpsycopg_poolを使用してコネクションプーリングを実装します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from psycopg_pool import ConnectionPool
import psycopg
from contextlib import contextmanager
from typing import Generator

# プールの作成(アプリケーション起動時)
pool = ConnectionPool(
    conninfo="postgresql://postgres:mysecretpassword@localhost:5432/myapp_db",
    min_size=5,          # 最小接続数
    max_size=20,         # 最大接続数
    max_idle=300,        # アイドル接続の最大保持時間(秒)
    max_lifetime=3600,   # 接続の最大生存時間(秒)
)

def get_user_by_id(user_id: int) -> dict | None:
    """ユーザーをIDで取得"""
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, name, email FROM users WHERE id = %s",
                (user_id,)
            )
            row = cur.fetchone()
            if row:
                return {"id": row[0], "name": row[1], "email": row[2]}
            return None

def transfer_money(from_id: int, to_id: int, amount: float) -> None:
    """送金処理(トランザクション使用)"""
    with pool.connection() as conn:
        with conn.transaction():
            with conn.cursor() as cur:
                cur.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE user_id = %s",
                    (amount, from_id)
                )
                cur.execute(
                    "UPDATE accounts SET balance = balance + %s WHERE user_id = %s",
                    (amount, to_id)
                )
        # トランザクションブロックを抜けると自動コミット
        # 例外発生時は自動ロールバック

# アプリケーション終了時
def shutdown():
    pool.close()

非同期接続(asyncio対応)

FastAPIなどの非同期フレームワークと組み合わせる場合は、非同期版を使用します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import asyncio
from psycopg_pool import AsyncConnectionPool
import psycopg

async def main():
    async with AsyncConnectionPool(
        conninfo="postgresql://postgres:mysecretpassword@localhost:5432/myapp_db",
        min_size=5,
        max_size=20,
    ) as pool:
        async with pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT * FROM users WHERE status = %s", ("active",))
                rows = await cur.fetchall()
                for row in rows:
                    print(row)

if __name__ == "__main__":
    asyncio.run(main())

パラメータ化クエリとSQLインジェクション対策

psycopg3は%s形式のプレースホルダーを使用します。

1
2
3
4
5
6
7
8
9
# 危険な例(絶対に使用しないこと)
unsafe_query = f"SELECT * FROM users WHERE name = '{user_input}'"

# 安全な例(パラメータ化クエリ)
with conn.cursor() as cur:
    cur.execute(
        "SELECT * FROM users WHERE name = %s AND status = %s",
        (user_name, "active")
    )

動的にテーブル名やカラム名を指定する場合は、sqlモジュールを使用します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from psycopg import sql

table_name = "users"
column_name = "email"

query = sql.SQL("SELECT {column} FROM {table} WHERE id = %s").format(
    column=sql.Identifier(column_name),
    table=sql.Identifier(table_name),
)

with conn.cursor() as cur:
    cur.execute(query, (user_id,))

GoでPostgreSQLに接続する

pgxの概要

GoでPostgreSQLに接続する際の最も人気のあるライブラリがpgxです。標準のdatabase/sqlインターフェースにも対応していますが、pgx独自のインターフェースを使用することでPostgreSQLの機能を最大限に活用できます。

主な特徴は以下の通りです。

  • 約70種類のPostgreSQLデータ型をサポート
  • 自動プリペアドステートメントキャッシング
  • バッチクエリ対応
  • COPY プロトコルサポート
  • コネクションプール(pgxpool)
  • LISTEN/NOTIFYサポート

インストールと基本接続

pgxをインストールします。

1
go get github.com/jackc/pgx/v5

基本的な接続コードは以下の通りです。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	
	// 接続文字列
	connString := "postgresql://postgres:mysecretpassword@localhost:5432/myapp_db"
	
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		fmt.Fprintf(os.Stderr, "接続エラー: %v\n", err)
		os.Exit(1)
	}
	defer conn.Close(ctx)
	
	// クエリ実行
	var currentTime time.Time
	err = conn.QueryRow(ctx, "SELECT NOW()").Scan(&currentTime)
	if err != nil {
		fmt.Fprintf(os.Stderr, "クエリエラー: %v\n", err)
		os.Exit(1)
	}
	
	fmt.Println("現在時刻:", currentTime)
}

コネクションプーリングの実装

本番環境ではpgxpoolパッケージを使用してコネクションプーリングを実装します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

var pool *pgxpool.Pool

func initDB() error {
	ctx := context.Background()
	
	config, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
	if err != nil {
		return fmt.Errorf("設定パースエラー: %w", err)
	}
	
	// プール設定
	config.MaxConns = 20                          // 最大接続数
	config.MinConns = 5                           // 最小接続数
	config.MaxConnLifetime = time.Hour            // 接続の最大生存時間
	config.MaxConnIdleTime = 30 * time.Minute     // アイドル接続のタイムアウト
	config.HealthCheckPeriod = time.Minute        // ヘルスチェック間隔
	
	pool, err = pgxpool.NewWithConfig(ctx, config)
	if err != nil {
		return fmt.Errorf("プール作成エラー: %w", err)
	}
	
	return nil
}

type User struct {
	ID    int
	Name  string
	Email string
}

func getUserByID(ctx context.Context, userID int) (*User, error) {
	var user User
	err := pool.QueryRow(ctx,
		"SELECT id, name, email FROM users WHERE id = $1",
		userID,
	).Scan(&user.ID, &user.Name, &user.Email)
	
	if err != nil {
		return nil, err
	}
	return &user, nil
}

func transferMoney(ctx context.Context, fromID, toID int, amount float64) error {
	tx, err := pool.Begin(ctx)
	if err != nil {
		return err
	}
	// トランザクション終了時に自動ロールバック(コミットされていない場合)
	defer tx.Rollback(ctx)
	
	_, err = tx.Exec(ctx,
		"UPDATE accounts SET balance = balance - $1 WHERE user_id = $2",
		amount, fromID,
	)
	if err != nil {
		return err
	}
	
	_, err = tx.Exec(ctx,
		"UPDATE accounts SET balance = balance + $1 WHERE user_id = $2",
		amount, toID,
	)
	if err != nil {
		return err
	}
	
	return tx.Commit(ctx)
}

func main() {
	if err := initDB(); err != nil {
		fmt.Fprintf(os.Stderr, "DB初期化エラー: %v\n", err)
		os.Exit(1)
	}
	defer pool.Close()
	
	ctx := context.Background()
	user, err := getUserByID(ctx, 1)
	if err != nil {
		fmt.Fprintf(os.Stderr, "ユーザー取得エラー: %v\n", err)
		return
	}
	fmt.Printf("ユーザー: %+v\n", user)
}

複数行の取得とスキャン

pgxではRowsを使用して複数行を処理します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func getActiveUsers(ctx context.Context) ([]User, error) {
	rows, err := pool.Query(ctx,
		"SELECT id, name, email FROM users WHERE status = $1",
		"active",
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	
	var users []User
	for rows.Next() {
		var user User
		if err := rows.Scan(&user.ID, &user.Name, &user.Email); err != nil {
			return nil, err
		}
		users = append(users, user)
	}
	
	if err := rows.Err(); err != nil {
		return nil, err
	}
	
	return users, nil
}

pgx.CollectRowsを使用するとより簡潔に記述できます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import "github.com/jackc/pgx/v5"

func getActiveUsers(ctx context.Context) ([]User, error) {
	rows, err := pool.Query(ctx,
		"SELECT id, name, email FROM users WHERE status = $1",
		"active",
	)
	if err != nil {
		return nil, err
	}
	
	return pgx.CollectRows(rows, pgx.RowToStructByName[User])
}

PgBouncerによるコネクションプーリング

PgBouncerとは

PgBouncerは、PostgreSQL向けの軽量なコネクションプーラーです。アプリケーションとPostgreSQLの間に配置することで、データベース接続を効率的に管理します。

flowchart LR
    subgraph Applications
        A1[App Instance 1]
        A2[App Instance 2]
        A3[App Instance 3]
    end
    
    subgraph PgBouncer
        PB[PgBouncer<br/>Connection Pool]
    end
    
    subgraph PostgreSQL
        PG[(PostgreSQL<br/>Database)]
    end
    
    A1 --> PB
    A2 --> PB
    A3 --> PB
    PB --> PG

なぜPgBouncerが必要なのか

アプリケーション内蔵のコネクションプールでは不十分な場合があります。

課題 説明
マイクロサービス環境 複数のサービスがそれぞれプールを持つと接続数が爆発的に増加
サーバーレス環境 Lambda等では各インスタンスがプールを維持できない
接続コスト PostgreSQLの接続はメモリを大量に消費(約10MB/接続)
スケーリング アプリインスタンスを増やすとDB接続数も比例して増加

プールモードの選択

PgBouncerには3つのプールモードがあります。用途に応じて適切なモードを選択します。

モード 説明 用途
session クライアント切断まで同一接続を維持 セッション変数使用時
transaction トランザクション終了で接続を返却 一般的なWebアプリ(推奨)
statement 各ステートメント終了で接続を返却 シンプルなクエリのみ

基本設定

PgBouncerの設定ファイル(pgbouncer.ini)の基本例です。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
[databases]
myapp_db = host=localhost port=5432 dbname=myapp_db

[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432
auth_type = scram-sha-256
auth_file = /etc/pgbouncer/userlist.txt

; プールモード(transaction推奨)
pool_mode = transaction

; 接続設定
max_client_conn = 1000
default_pool_size = 20
min_pool_size = 5
reserve_pool_size = 5

; タイムアウト設定
server_idle_timeout = 600
server_lifetime = 3600

認証ファイル(userlist.txt)の形式は以下の通りです。

1
2
"postgres" "SCRAM-SHA-256$4096:salt$storedkey:serverkey"
"myapp_user" "SCRAM-SHA-256$4096:salt$storedkey:serverkey"

アプリケーションからの接続

PgBouncerを使用する場合、アプリケーションからはPgBouncerのポート(6432)に接続します。

1
2
3
4
5
6
7
8
// Node.js
const pool = new Pool({
  host: 'localhost',
  port: 6432,  // PgBouncerのポート
  database: 'myapp_db',
  user: 'myapp_user',
  password: 'password',
});
1
2
3
4
# Python
pool = ConnectionPool(
    conninfo="postgresql://myapp_user:password@localhost:6432/myapp_db"
)
1
2
3
// Go
connString := "postgresql://myapp_user:password@localhost:6432/myapp_db"
pool, err := pgxpool.New(ctx, connString)

トランザクションモード使用時の注意点

トランザクションモードでは、以下の機能が制限されます。

機能 状態 対策
PREPARE/DEALLOCATE 制限あり PgBouncer 1.21+でmax_prepared_statements設定で対応
SET文 トランザクション内のみ有効 SET LOCALを使用
LISTEN/NOTIFY 使用不可 sessionモードを使用
セッション変数 保持されない トランザクション内で完結させる

SQLインジェクション対策

SQLインジェクションとは

SQLインジェクションは、ユーザー入力を適切にエスケープせずにSQL文に組み込むことで発生する脆弱性です。攻撃者は悪意のあるSQL文を注入し、データベースを不正に操作できます。

1
2
3
4
5
6
-- 脆弱なクエリ(ユーザー入力をそのまま埋め込み)
SELECT * FROM users WHERE name = 'ユーザー入力'

-- 攻撃例: ユーザー入力が「' OR '1'='1」の場合
SELECT * FROM users WHERE name = '' OR '1'='1'
-- 結果: 全ユーザーのデータが漏洩

防止策のベストプラクティス

以下の対策を必ず実施してください。

対策 重要度 説明
パラメータ化クエリ 必須 プレースホルダーを使用してパラメータをバインド
入力バリデーション 必須 期待される形式・長さ・文字種を検証
最小権限の原則 推奨 アプリ用DBユーザーの権限を最小限に
ORMの使用 推奨 ORMが自動的にエスケープ処理を実施

各言語での安全なクエリ実装

全言語で共通して、ユーザー入力を直接SQL文に埋め込むことは禁止です。

1
2
3
4
5
// Node.js - 安全な実装
const result = await pool.query(
  'SELECT * FROM products WHERE category = $1 AND price < $2',
  [category, maxPrice]
);
1
2
3
4
5
# Python - 安全な実装
cur.execute(
    "SELECT * FROM products WHERE category = %s AND price < %s",
    (category, max_price)
)
1
2
3
4
5
// Go - 安全な実装
rows, err := pool.Query(ctx,
    "SELECT * FROM products WHERE category = $1 AND price < $2",
    category, maxPrice,
)

動的クエリの安全な構築

WHERE句の条件を動的に組み立てる必要がある場合は、各言語のクエリビルダー機能を使用します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Node.js - 条件付きクエリの構築
function buildSearchQuery(filters: { category?: string; minPrice?: number; maxPrice?: number }) {
  const conditions: string[] = [];
  const values: (string | number)[] = [];
  let paramIndex = 1;

  if (filters.category) {
    conditions.push(`category = $${paramIndex++}`);
    values.push(filters.category);
  }
  if (filters.minPrice !== undefined) {
    conditions.push(`price >= $${paramIndex++}`);
    values.push(filters.minPrice);
  }
  if (filters.maxPrice !== undefined) {
    conditions.push(`price <= $${paramIndex++}`);
    values.push(filters.maxPrice);
  }

  const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
  return {
    text: `SELECT * FROM products ${whereClause}`,
    values,
  };
}

プリペアドステートメントの活用

プリペアドステートメントとは

プリペアドステートメントは、SQL文を事前にパース・コンパイルしておき、実行時にはパラメータだけを渡す仕組みです。

flowchart LR
    subgraph 通常のクエリ
        Q1[SQL文送信] --> P1[パース] --> PL1[プラン作成] --> E1[実行]
        Q2[SQL文送信] --> P2[パース] --> PL2[プラン作成] --> E2[実行]
    end
    
    subgraph プリペアドステートメント
        PS[PREPARE] --> PP[パース] --> PPL[プラン作成]
        EX1[EXECUTE + パラメータ] --> EE1[実行]
        EX2[EXECUTE + パラメータ] --> EE2[実行]
    end

メリット

メリット 説明
パフォーマンス向上 パース・プラン作成を1回のみ実行
セキュリティ パラメータは値として扱われるためSQLインジェクション防止
ネットワーク効率 2回目以降はパラメータのみ送信

各言語での実装

各ドライバーはプリペアドステートメントを自動的に管理するため、特別な実装は不要です。

1
2
3
4
5
6
7
8
9
// Node.js - 名前付きプリペアドステートメント
// 同じnameを持つクエリは自動的にキャッシュされる
async function getUsersByStatus(status: string) {
  return pool.query({
    name: 'get-users-by-status',
    text: 'SELECT * FROM users WHERE status = $1',
    values: [status],
  });
}
1
2
3
4
5
// Go (pgx) - 自動プリペアドステートメント
// pgxはデフォルトでプリペアドステートメントを自動キャッシュ
// config.DefaultQueryExecMode で制御可能
config, _ := pgxpool.ParseConfig(connString)
config.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeCacheStatement

ORMとの連携

ORMを使用するメリットとデメリット

ORMは生産性を向上させますが、トレードオフも存在します。

観点 メリット デメリット
開発効率 SQLを書かずにデータ操作可能 学習コストがかかる
保守性 型安全、リファクタリングしやすい 生成SQLの把握が難しい
性能 基本的なCRUDは十分 複雑なクエリでは非効率になることも
移植性 DB間の移植が容易 DB固有機能が使いにくい

各言語の主要ORM

言語 ORM 特徴
Node.js Prisma 型安全、マイグレーション機能、直感的なAPI
Node.js TypeORM デコレータベース、Active Record/Data Mapper両対応
Python SQLAlchemy 業界標準、柔軟性が高い
Python Django ORM Django内蔵、管理画面連携
Go GORM 最も人気、機能豊富
Go sqlc SQLからGoコード生成、型安全

ORMと低レベルドライバーの使い分け

適材適所で使い分けることが重要です。

flowchart TD
    Start[クエリの種類] --> Simple{シンプルなCRUD?}
    Simple -->|Yes| ORM[ORMを使用]
    Simple -->|No| Complex{複雑な集計・分析?}
    Complex -->|Yes| Raw[生SQLを使用]
    Complex -->|No| Performance{高いパフォーマンス要求?}
    Performance -->|Yes| Raw
    Performance -->|No| ORM

実践的なプロジェクト構成例

Node.js(Express + pg)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// src/db/pool.ts
import { Pool } from 'pg';

export const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
  max: 20,
});

// src/repositories/userRepository.ts
import { pool } from '../db/pool';

export interface User {
  id: number;
  name: string;
  email: string;
}

export async function findUserById(id: number): Promise<User | null> {
  const result = await pool.query<User>(
    'SELECT id, name, email FROM users WHERE id = $1',
    [id]
  );
  return result.rows[0] || null;
}

export async function createUser(name: string, email: string): Promise<User> {
  const result = await pool.query<User>(
    'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email',
    [name, email]
  );
  return result.rows[0];
}

// src/app.ts
import express from 'express';
import { findUserById, createUser } from './repositories/userRepository';

const app = express();
app.use(express.json());

app.get('/users/:id', async (req, res) => {
  try {
    const user = await findUserById(parseInt(req.params.id));
    if (!user) {
      return res.status(404).json({ error: 'User not found' });
    }
    res.json(user);
  } catch (error) {
    res.status(500).json({ error: 'Internal server error' });
  }
});

app.listen(3000);

Python(FastAPI + psycopg3)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# src/db/pool.py
from psycopg_pool import AsyncConnectionPool
import os

pool: AsyncConnectionPool | None = None

async def init_pool():
    global pool
    pool = AsyncConnectionPool(
        conninfo=os.environ["DATABASE_URL"],
        min_size=5,
        max_size=20,
    )

async def close_pool():
    if pool:
        await pool.close()

# src/repositories/user_repository.py
from dataclasses import dataclass
from db.pool import pool

@dataclass
class User:
    id: int
    name: str
    email: str

async def find_user_by_id(user_id: int) -> User | None:
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                "SELECT id, name, email FROM users WHERE id = %s",
                (user_id,)
            )
            row = await cur.fetchone()
            if row:
                return User(id=row[0], name=row[1], email=row[2])
            return None

async def create_user(name: str, email: str) -> User:
    async with pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id, name, email",
                (name, email)
            )
            row = await cur.fetchone()
            await conn.commit()
            return User(id=row[0], name=row[1], email=row[2])

# src/main.py
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from db.pool import init_pool, close_pool
from repositories.user_repository import find_user_by_id

@asynccontextmanager
async def lifespan(app: FastAPI):
    await init_pool()
    yield
    await close_pool()

app = FastAPI(lifespan=lifespan)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    user = await find_user_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

まとめ

本記事では、Node.js、Python、GoからPostgreSQLに接続する方法を解説しました。各言語のデファクトスタンダードなドライバーを使用することで、安全かつ効率的なデータベース接続が実現できます。

重要なポイントを振り返ります。

項目 ポイント
ドライバー選択 Node.js: pg、Python: psycopg3、Go: pgx
コネクションプーリング 各ドライバーの内蔵プール機能を活用
PgBouncer マイクロサービス・サーバーレス環境では必須
SQLインジェクション対策 パラメータ化クエリを必ず使用
プリペアドステートメント ドライバーが自動管理するため意識不要
ORMとの使い分け CRUD操作はORM、複雑なクエリは生SQL

これらの知識を活用することで、本番環境で安定稼働するアプリケーションを構築できます。次のステップとして、実際にサンプルコードを動かしながら、各ドライバーの使い方に慣れていくことをお勧めします。

参考リンク