はじめに#
Webアプリケーションやバックエンドサービスを開発する際、データベースへの接続は避けて通れない基本的な作業です。しかし、単に接続できるだけでは不十分です。本番環境で安定稼働させるためには、コネクションプーリングによる効率的な接続管理、SQLインジェクション対策によるセキュリティ確保、プリペアドステートメントによるパフォーマンス最適化が必要になります。
本記事では、Node.js、Python、Goの3つの主要言語でPostgreSQLに接続する方法を実践的に解説します。各言語のデファクトスタンダードとなっているドライバーを使用し、実際のプロジェクトで使えるコード例を提供します。
この記事を読むことで、以下のことができるようになります。
- 各言語(Node.js/Python/Go)でPostgreSQLに安全に接続できる
- コネクションプーリングを実装してリソースを効率的に管理できる
- プリペアドステートメントでSQLインジェクションを防止できる
- PgBouncerを導入してアプリケーションをスケールできる
- ORMと低レベルドライバーを適切に使い分けられる
前提条件#
- PostgreSQL 14以降がインストールされていること
- 各言語の開発環境がセットアップ済みであること
- Node.js 18以降
- Python 3.10以降
- Go 1.21以降
- 基本的なSQL文(SELECT/INSERT/UPDATE/DELETE)を理解していること
PostgreSQL接続の基本概念#
接続文字列(Connection String)#
PostgreSQLへの接続には、接続文字列(DSN: Data Source Name)を使用します。これはデータベースの場所と認証情報を含む文字列です。
1
|
postgresql://username:password@hostname:port/database?parameter=value
|
| 要素 |
説明 |
例 |
| username |
データベースユーザー名 |
postgres |
| password |
パスワード |
mysecretpassword |
| hostname |
サーバーのホスト名/IP |
localhost |
| port |
ポート番号 |
5432(デフォルト) |
| database |
データベース名 |
myapp_db |
| parameter |
追加パラメータ |
sslmode=require |
接続の流れ#
アプリケーションからPostgreSQLに接続する際の基本的な流れを理解しておきましょう。
sequenceDiagram
participant App as アプリケーション
participant Driver as DBドライバー
participant PG as PostgreSQL
App->>Driver: 接続リクエスト
Driver->>PG: TCP接続確立
PG-->>Driver: 認証チャレンジ
Driver->>PG: 認証情報送信
PG-->>Driver: 認証成功
Driver-->>App: コネクションオブジェクト返却
App->>Driver: クエリ実行
Driver->>PG: SQLクエリ送信
PG-->>Driver: 結果セット返却
Driver-->>App: 結果オブジェクト返却
App->>Driver: 接続クローズ
Driver->>PG: 切断Node.jsでPostgreSQLに接続する#
node-postgres(pg)の概要#
Node.jsでPostgreSQLに接続する際の標準的なライブラリがnode-postgres(npm package名: pg)です。Pure JavaScriptで実装されており、Promise/async-awaitをサポートしています。
主な特徴は以下の通りです。
- コールバック、Promise、async/awaitすべてをサポート
- コネクションプーリング機能を内蔵
- プリペアドステートメント対応
- カーソルによるストリーミング結果取得
- SSL/TLS接続サポート
インストールと基本接続#
まずパッケージをインストールします。
TypeScriptを使用する場合は型定義もインストールします。
1
|
npm install pg @types/pg
|
基本的な接続コードは以下の通りです。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import { Client } from 'pg';
async function main() {
const client = new Client({
host: 'localhost',
port: 5432,
database: 'myapp_db',
user: 'postgres',
password: 'mysecretpassword',
});
try {
await client.connect();
const result = await client.query('SELECT NOW() as current_time');
console.log('現在時刻:', result.rows[0].current_time);
} catch (error) {
console.error('データベースエラー:', error);
} finally {
await client.end();
}
}
main();
|
環境変数を使用した接続も可能です。pgライブラリは以下の環境変数を自動的に読み取ります。
| 環境変数 |
説明 |
| PGHOST |
ホスト名 |
| PGPORT |
ポート番号 |
| PGDATABASE |
データベース名 |
| PGUSER |
ユーザー名 |
| PGPASSWORD |
パスワード |
1
2
3
|
// 環境変数が設定されていれば引数なしで接続可能
const client = new Client();
await client.connect();
|
コネクションプーリングの実装#
本番環境では、リクエストごとに新しい接続を作成するのは非効率です。Poolクラスを使用してコネクションプーリングを実装します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
import { Pool, PoolClient } from 'pg';
// プールの作成(アプリケーション起動時に一度だけ)
const pool = new Pool({
host: process.env.PGHOST || 'localhost',
port: parseInt(process.env.PGPORT || '5432'),
database: process.env.PGDATABASE || 'myapp_db',
user: process.env.PGUSER || 'postgres',
password: process.env.PGPASSWORD,
// プール設定
max: 20, // 最大接続数
min: 5, // 最小接続数
idleTimeoutMillis: 30000, // アイドル接続のタイムアウト
connectionTimeoutMillis: 5000, // 接続タイムアウト
});
// エラーハンドリング
pool.on('error', (err: Error) => {
console.error('予期しないプールエラー:', err);
});
// クエリ実行(シンプルな方法)
async function getUserById(userId: number) {
const result = await pool.query(
'SELECT * FROM users WHERE id = $1',
[userId]
);
return result.rows[0];
}
// トランザクションを使用する場合
async function transferMoney(fromId: number, toId: number, amount: number) {
const client: PoolClient = await pool.connect();
try {
await client.query('BEGIN');
await client.query(
'UPDATE accounts SET balance = balance - $1 WHERE user_id = $2',
[amount, fromId]
);
await client.query(
'UPDATE accounts SET balance = balance + $1 WHERE user_id = $2',
[amount, toId]
);
await client.query('COMMIT');
console.log('送金完了');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release(); // 重要: 必ずプールに返却
}
}
// アプリケーション終了時
async function shutdown() {
await pool.end();
}
|
プリペアドステートメントとパラメータ化クエリ#
SQLインジェクションを防ぐため、必ずパラメータ化クエリを使用します。
1
2
3
4
5
6
7
8
|
// 危険な例(絶対に使用しないこと)
const unsafeQuery = `SELECT * FROM users WHERE name = '${userInput}'`;
// 安全な例(パラメータ化クエリ)
const result = await pool.query(
'SELECT * FROM users WHERE name = $1 AND status = $2',
[userName, 'active']
);
|
名前付きプリペアドステートメントを使用すると、同じクエリを繰り返し実行する際のパフォーマンスが向上します。
1
2
3
4
5
6
7
8
|
// 名前付きプリペアドステートメント
const queryConfig = {
name: 'get-user-by-email',
text: 'SELECT * FROM users WHERE email = $1',
values: ['user@example.com'],
};
const result = await pool.query(queryConfig);
|
PythonでPostgreSQLに接続する#
psycopg3の概要#
PythonでPostgreSQLに接続する際のデファクトスタンダードがpsycopg(サイコピージー)です。2021年にリリースされたpsycopg3(パッケージ名: psycopg)は、以下の特徴を持つ最新バージョンです。
- 完全な非同期(async/await)サポート
- 型ヒント(typing)対応
- サーバーサイドパラメータバインディング
- コネクションプール機能(
psycopg_pool)
- バイナリプロトコル対応による高速化
インストールと基本接続#
psycopg3をインストールします。
1
|
pip install psycopg[binary]
|
コネクションプールを使用する場合は追加でインストールします。
1
|
pip install psycopg_pool
|
基本的な接続コードは以下の通りです。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import psycopg
def main():
# 接続文字列を使用
conn_string = "postgresql://postgres:mysecretpassword@localhost:5432/myapp_db"
# コンテキストマネージャーで自動クローズ
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
cur.execute("SELECT NOW() as current_time")
row = cur.fetchone()
print(f"現在時刻: {row[0]}")
if __name__ == "__main__":
main()
|
キーワード引数を使用した接続も可能です。
1
2
3
4
5
6
7
8
9
10
11
|
import psycopg
with psycopg.connect(
host="localhost",
port=5432,
dbname="myapp_db",
user="postgres",
password="mysecretpassword",
) as conn:
# データベース操作
pass
|
コネクションプーリングの実装#
本番環境ではpsycopg_poolを使用してコネクションプーリングを実装します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
from psycopg_pool import ConnectionPool
import psycopg
from contextlib import contextmanager
from typing import Generator
# プールの作成(アプリケーション起動時)
pool = ConnectionPool(
conninfo="postgresql://postgres:mysecretpassword@localhost:5432/myapp_db",
min_size=5, # 最小接続数
max_size=20, # 最大接続数
max_idle=300, # アイドル接続の最大保持時間(秒)
max_lifetime=3600, # 接続の最大生存時間(秒)
)
def get_user_by_id(user_id: int) -> dict | None:
"""ユーザーをIDで取得"""
with pool.connection() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT id, name, email FROM users WHERE id = %s",
(user_id,)
)
row = cur.fetchone()
if row:
return {"id": row[0], "name": row[1], "email": row[2]}
return None
def transfer_money(from_id: int, to_id: int, amount: float) -> None:
"""送金処理(トランザクション使用)"""
with pool.connection() as conn:
with conn.transaction():
with conn.cursor() as cur:
cur.execute(
"UPDATE accounts SET balance = balance - %s WHERE user_id = %s",
(amount, from_id)
)
cur.execute(
"UPDATE accounts SET balance = balance + %s WHERE user_id = %s",
(amount, to_id)
)
# トランザクションブロックを抜けると自動コミット
# 例外発生時は自動ロールバック
# アプリケーション終了時
def shutdown():
pool.close()
|
非同期接続(asyncio対応)#
FastAPIなどの非同期フレームワークと組み合わせる場合は、非同期版を使用します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import asyncio
from psycopg_pool import AsyncConnectionPool
import psycopg
async def main():
async with AsyncConnectionPool(
conninfo="postgresql://postgres:mysecretpassword@localhost:5432/myapp_db",
min_size=5,
max_size=20,
) as pool:
async with pool.connection() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT * FROM users WHERE status = %s", ("active",))
rows = await cur.fetchall()
for row in rows:
print(row)
if __name__ == "__main__":
asyncio.run(main())
|
パラメータ化クエリとSQLインジェクション対策#
psycopg3は%s形式のプレースホルダーを使用します。
1
2
3
4
5
6
7
8
9
|
# 危険な例(絶対に使用しないこと)
unsafe_query = f"SELECT * FROM users WHERE name = '{user_input}'"
# 安全な例(パラメータ化クエリ)
with conn.cursor() as cur:
cur.execute(
"SELECT * FROM users WHERE name = %s AND status = %s",
(user_name, "active")
)
|
動的にテーブル名やカラム名を指定する場合は、sqlモジュールを使用します。
1
2
3
4
5
6
7
8
9
10
11
12
|
from psycopg import sql
table_name = "users"
column_name = "email"
query = sql.SQL("SELECT {column} FROM {table} WHERE id = %s").format(
column=sql.Identifier(column_name),
table=sql.Identifier(table_name),
)
with conn.cursor() as cur:
cur.execute(query, (user_id,))
|
GoでPostgreSQLに接続する#
pgxの概要#
GoでPostgreSQLに接続する際の最も人気のあるライブラリがpgxです。標準のdatabase/sqlインターフェースにも対応していますが、pgx独自のインターフェースを使用することでPostgreSQLの機能を最大限に活用できます。
主な特徴は以下の通りです。
- 約70種類のPostgreSQLデータ型をサポート
- 自動プリペアドステートメントキャッシング
- バッチクエリ対応
- COPY プロトコルサポート
- コネクションプール(pgxpool)
- LISTEN/NOTIFYサポート
インストールと基本接続#
pgxをインストールします。
1
|
go get github.com/jackc/pgx/v5
|
基本的な接続コードは以下の通りです。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/jackc/pgx/v5"
)
func main() {
ctx := context.Background()
// 接続文字列
connString := "postgresql://postgres:mysecretpassword@localhost:5432/myapp_db"
conn, err := pgx.Connect(ctx, connString)
if err != nil {
fmt.Fprintf(os.Stderr, "接続エラー: %v\n", err)
os.Exit(1)
}
defer conn.Close(ctx)
// クエリ実行
var currentTime time.Time
err = conn.QueryRow(ctx, "SELECT NOW()").Scan(¤tTime)
if err != nil {
fmt.Fprintf(os.Stderr, "クエリエラー: %v\n", err)
os.Exit(1)
}
fmt.Println("現在時刻:", currentTime)
}
|
コネクションプーリングの実装#
本番環境ではpgxpoolパッケージを使用してコネクションプーリングを実装します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
var pool *pgxpool.Pool
func initDB() error {
ctx := context.Background()
config, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
if err != nil {
return fmt.Errorf("設定パースエラー: %w", err)
}
// プール設定
config.MaxConns = 20 // 最大接続数
config.MinConns = 5 // 最小接続数
config.MaxConnLifetime = time.Hour // 接続の最大生存時間
config.MaxConnIdleTime = 30 * time.Minute // アイドル接続のタイムアウト
config.HealthCheckPeriod = time.Minute // ヘルスチェック間隔
pool, err = pgxpool.NewWithConfig(ctx, config)
if err != nil {
return fmt.Errorf("プール作成エラー: %w", err)
}
return nil
}
type User struct {
ID int
Name string
Email string
}
func getUserByID(ctx context.Context, userID int) (*User, error) {
var user User
err := pool.QueryRow(ctx,
"SELECT id, name, email FROM users WHERE id = $1",
userID,
).Scan(&user.ID, &user.Name, &user.Email)
if err != nil {
return nil, err
}
return &user, nil
}
func transferMoney(ctx context.Context, fromID, toID int, amount float64) error {
tx, err := pool.Begin(ctx)
if err != nil {
return err
}
// トランザクション終了時に自動ロールバック(コミットされていない場合)
defer tx.Rollback(ctx)
_, err = tx.Exec(ctx,
"UPDATE accounts SET balance = balance - $1 WHERE user_id = $2",
amount, fromID,
)
if err != nil {
return err
}
_, err = tx.Exec(ctx,
"UPDATE accounts SET balance = balance + $1 WHERE user_id = $2",
amount, toID,
)
if err != nil {
return err
}
return tx.Commit(ctx)
}
func main() {
if err := initDB(); err != nil {
fmt.Fprintf(os.Stderr, "DB初期化エラー: %v\n", err)
os.Exit(1)
}
defer pool.Close()
ctx := context.Background()
user, err := getUserByID(ctx, 1)
if err != nil {
fmt.Fprintf(os.Stderr, "ユーザー取得エラー: %v\n", err)
return
}
fmt.Printf("ユーザー: %+v\n", user)
}
|
複数行の取得とスキャン#
pgxではRowsを使用して複数行を処理します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
func getActiveUsers(ctx context.Context) ([]User, error) {
rows, err := pool.Query(ctx,
"SELECT id, name, email FROM users WHERE status = $1",
"active",
)
if err != nil {
return nil, err
}
defer rows.Close()
var users []User
for rows.Next() {
var user User
if err := rows.Scan(&user.ID, &user.Name, &user.Email); err != nil {
return nil, err
}
users = append(users, user)
}
if err := rows.Err(); err != nil {
return nil, err
}
return users, nil
}
|
pgx.CollectRowsを使用するとより簡潔に記述できます。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import "github.com/jackc/pgx/v5"
func getActiveUsers(ctx context.Context) ([]User, error) {
rows, err := pool.Query(ctx,
"SELECT id, name, email FROM users WHERE status = $1",
"active",
)
if err != nil {
return nil, err
}
return pgx.CollectRows(rows, pgx.RowToStructByName[User])
}
|
PgBouncerによるコネクションプーリング#
PgBouncerとは#
PgBouncerは、PostgreSQL向けの軽量なコネクションプーラーです。アプリケーションとPostgreSQLの間に配置することで、データベース接続を効率的に管理します。
flowchart LR
subgraph Applications
A1[App Instance 1]
A2[App Instance 2]
A3[App Instance 3]
end
subgraph PgBouncer
PB[PgBouncer<br/>Connection Pool]
end
subgraph PostgreSQL
PG[(PostgreSQL<br/>Database)]
end
A1 --> PB
A2 --> PB
A3 --> PB
PB --> PGなぜPgBouncerが必要なのか#
アプリケーション内蔵のコネクションプールでは不十分な場合があります。
| 課題 |
説明 |
| マイクロサービス環境 |
複数のサービスがそれぞれプールを持つと接続数が爆発的に増加 |
| サーバーレス環境 |
Lambda等では各インスタンスがプールを維持できない |
| 接続コスト |
PostgreSQLの接続はメモリを大量に消費(約10MB/接続) |
| スケーリング |
アプリインスタンスを増やすとDB接続数も比例して増加 |
プールモードの選択#
PgBouncerには3つのプールモードがあります。用途に応じて適切なモードを選択します。
| モード |
説明 |
用途 |
| session |
クライアント切断まで同一接続を維持 |
セッション変数使用時 |
| transaction |
トランザクション終了で接続を返却 |
一般的なWebアプリ(推奨) |
| statement |
各ステートメント終了で接続を返却 |
シンプルなクエリのみ |
基本設定#
PgBouncerの設定ファイル(pgbouncer.ini)の基本例です。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
[databases]
myapp_db = host=localhost port=5432 dbname=myapp_db
[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432
auth_type = scram-sha-256
auth_file = /etc/pgbouncer/userlist.txt
; プールモード(transaction推奨)
pool_mode = transaction
; 接続設定
max_client_conn = 1000
default_pool_size = 20
min_pool_size = 5
reserve_pool_size = 5
; タイムアウト設定
server_idle_timeout = 600
server_lifetime = 3600
|
認証ファイル(userlist.txt)の形式は以下の通りです。
1
2
|
"postgres" "SCRAM-SHA-256$4096:salt$storedkey:serverkey"
"myapp_user" "SCRAM-SHA-256$4096:salt$storedkey:serverkey"
|
アプリケーションからの接続#
PgBouncerを使用する場合、アプリケーションからはPgBouncerのポート(6432)に接続します。
1
2
3
4
5
6
7
8
|
// Node.js
const pool = new Pool({
host: 'localhost',
port: 6432, // PgBouncerのポート
database: 'myapp_db',
user: 'myapp_user',
password: 'password',
});
|
1
2
3
4
|
# Python
pool = ConnectionPool(
conninfo="postgresql://myapp_user:password@localhost:6432/myapp_db"
)
|
1
2
3
|
// Go
connString := "postgresql://myapp_user:password@localhost:6432/myapp_db"
pool, err := pgxpool.New(ctx, connString)
|
トランザクションモード使用時の注意点#
トランザクションモードでは、以下の機能が制限されます。
| 機能 |
状態 |
対策 |
| PREPARE/DEALLOCATE |
制限あり |
PgBouncer 1.21+でmax_prepared_statements設定で対応 |
| SET文 |
トランザクション内のみ有効 |
SET LOCALを使用 |
| LISTEN/NOTIFY |
使用不可 |
sessionモードを使用 |
| セッション変数 |
保持されない |
トランザクション内で完結させる |
SQLインジェクション対策#
SQLインジェクションとは#
SQLインジェクションは、ユーザー入力を適切にエスケープせずにSQL文に組み込むことで発生する脆弱性です。攻撃者は悪意のあるSQL文を注入し、データベースを不正に操作できます。
1
2
3
4
5
6
|
-- 脆弱なクエリ(ユーザー入力をそのまま埋め込み)
SELECT * FROM users WHERE name = 'ユーザー入力'
-- 攻撃例: ユーザー入力が「' OR '1'='1」の場合
SELECT * FROM users WHERE name = '' OR '1'='1'
-- 結果: 全ユーザーのデータが漏洩
|
防止策のベストプラクティス#
以下の対策を必ず実施してください。
| 対策 |
重要度 |
説明 |
| パラメータ化クエリ |
必須 |
プレースホルダーを使用してパラメータをバインド |
| 入力バリデーション |
必須 |
期待される形式・長さ・文字種を検証 |
| 最小権限の原則 |
推奨 |
アプリ用DBユーザーの権限を最小限に |
| ORMの使用 |
推奨 |
ORMが自動的にエスケープ処理を実施 |
各言語での安全なクエリ実装#
全言語で共通して、ユーザー入力を直接SQL文に埋め込むことは禁止です。
1
2
3
4
5
|
// Node.js - 安全な実装
const result = await pool.query(
'SELECT * FROM products WHERE category = $1 AND price < $2',
[category, maxPrice]
);
|
1
2
3
4
5
|
# Python - 安全な実装
cur.execute(
"SELECT * FROM products WHERE category = %s AND price < %s",
(category, max_price)
)
|
1
2
3
4
5
|
// Go - 安全な実装
rows, err := pool.Query(ctx,
"SELECT * FROM products WHERE category = $1 AND price < $2",
category, maxPrice,
)
|
動的クエリの安全な構築#
WHERE句の条件を動的に組み立てる必要がある場合は、各言語のクエリビルダー機能を使用します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// Node.js - 条件付きクエリの構築
function buildSearchQuery(filters: { category?: string; minPrice?: number; maxPrice?: number }) {
const conditions: string[] = [];
const values: (string | number)[] = [];
let paramIndex = 1;
if (filters.category) {
conditions.push(`category = $${paramIndex++}`);
values.push(filters.category);
}
if (filters.minPrice !== undefined) {
conditions.push(`price >= $${paramIndex++}`);
values.push(filters.minPrice);
}
if (filters.maxPrice !== undefined) {
conditions.push(`price <= $${paramIndex++}`);
values.push(filters.maxPrice);
}
const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
return {
text: `SELECT * FROM products ${whereClause}`,
values,
};
}
|
プリペアドステートメントの活用#
プリペアドステートメントとは#
プリペアドステートメントは、SQL文を事前にパース・コンパイルしておき、実行時にはパラメータだけを渡す仕組みです。
flowchart LR
subgraph 通常のクエリ
Q1[SQL文送信] --> P1[パース] --> PL1[プラン作成] --> E1[実行]
Q2[SQL文送信] --> P2[パース] --> PL2[プラン作成] --> E2[実行]
end
subgraph プリペアドステートメント
PS[PREPARE] --> PP[パース] --> PPL[プラン作成]
EX1[EXECUTE + パラメータ] --> EE1[実行]
EX2[EXECUTE + パラメータ] --> EE2[実行]
endメリット#
| メリット |
説明 |
| パフォーマンス向上 |
パース・プラン作成を1回のみ実行 |
| セキュリティ |
パラメータは値として扱われるためSQLインジェクション防止 |
| ネットワーク効率 |
2回目以降はパラメータのみ送信 |
各言語での実装#
各ドライバーはプリペアドステートメントを自動的に管理するため、特別な実装は不要です。
1
2
3
4
5
6
7
8
9
|
// Node.js - 名前付きプリペアドステートメント
// 同じnameを持つクエリは自動的にキャッシュされる
async function getUsersByStatus(status: string) {
return pool.query({
name: 'get-users-by-status',
text: 'SELECT * FROM users WHERE status = $1',
values: [status],
});
}
|
1
2
3
4
5
|
// Go (pgx) - 自動プリペアドステートメント
// pgxはデフォルトでプリペアドステートメントを自動キャッシュ
// config.DefaultQueryExecMode で制御可能
config, _ := pgxpool.ParseConfig(connString)
config.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeCacheStatement
|
ORMとの連携#
ORMを使用するメリットとデメリット#
ORMは生産性を向上させますが、トレードオフも存在します。
| 観点 |
メリット |
デメリット |
| 開発効率 |
SQLを書かずにデータ操作可能 |
学習コストがかかる |
| 保守性 |
型安全、リファクタリングしやすい |
生成SQLの把握が難しい |
| 性能 |
基本的なCRUDは十分 |
複雑なクエリでは非効率になることも |
| 移植性 |
DB間の移植が容易 |
DB固有機能が使いにくい |
各言語の主要ORM#
| 言語 |
ORM |
特徴 |
| Node.js |
Prisma |
型安全、マイグレーション機能、直感的なAPI |
| Node.js |
TypeORM |
デコレータベース、Active Record/Data Mapper両対応 |
| Python |
SQLAlchemy |
業界標準、柔軟性が高い |
| Python |
Django ORM |
Django内蔵、管理画面連携 |
| Go |
GORM |
最も人気、機能豊富 |
| Go |
sqlc |
SQLからGoコード生成、型安全 |
ORMと低レベルドライバーの使い分け#
適材適所で使い分けることが重要です。
flowchart TD
Start[クエリの種類] --> Simple{シンプルなCRUD?}
Simple -->|Yes| ORM[ORMを使用]
Simple -->|No| Complex{複雑な集計・分析?}
Complex -->|Yes| Raw[生SQLを使用]
Complex -->|No| Performance{高いパフォーマンス要求?}
Performance -->|Yes| Raw
Performance -->|No| ORM実践的なプロジェクト構成例#
Node.js(Express + pg)#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
// src/db/pool.ts
import { Pool } from 'pg';
export const pool = new Pool({
connectionString: process.env.DATABASE_URL,
max: 20,
});
// src/repositories/userRepository.ts
import { pool } from '../db/pool';
export interface User {
id: number;
name: string;
email: string;
}
export async function findUserById(id: number): Promise<User | null> {
const result = await pool.query<User>(
'SELECT id, name, email FROM users WHERE id = $1',
[id]
);
return result.rows[0] || null;
}
export async function createUser(name: string, email: string): Promise<User> {
const result = await pool.query<User>(
'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email',
[name, email]
);
return result.rows[0];
}
// src/app.ts
import express from 'express';
import { findUserById, createUser } from './repositories/userRepository';
const app = express();
app.use(express.json());
app.get('/users/:id', async (req, res) => {
try {
const user = await findUserById(parseInt(req.params.id));
if (!user) {
return res.status(404).json({ error: 'User not found' });
}
res.json(user);
} catch (error) {
res.status(500).json({ error: 'Internal server error' });
}
});
app.listen(3000);
|
Python(FastAPI + psycopg3)#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
# src/db/pool.py
from psycopg_pool import AsyncConnectionPool
import os
pool: AsyncConnectionPool | None = None
async def init_pool():
global pool
pool = AsyncConnectionPool(
conninfo=os.environ["DATABASE_URL"],
min_size=5,
max_size=20,
)
async def close_pool():
if pool:
await pool.close()
# src/repositories/user_repository.py
from dataclasses import dataclass
from db.pool import pool
@dataclass
class User:
id: int
name: str
email: str
async def find_user_by_id(user_id: int) -> User | None:
async with pool.connection() as conn:
async with conn.cursor() as cur:
await cur.execute(
"SELECT id, name, email FROM users WHERE id = %s",
(user_id,)
)
row = await cur.fetchone()
if row:
return User(id=row[0], name=row[1], email=row[2])
return None
async def create_user(name: str, email: str) -> User:
async with pool.connection() as conn:
async with conn.cursor() as cur:
await cur.execute(
"INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id, name, email",
(name, email)
)
row = await cur.fetchone()
await conn.commit()
return User(id=row[0], name=row[1], email=row[2])
# src/main.py
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from db.pool import init_pool, close_pool
from repositories.user_repository import find_user_by_id
@asynccontextmanager
async def lifespan(app: FastAPI):
await init_pool()
yield
await close_pool()
app = FastAPI(lifespan=lifespan)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await find_user_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
|
まとめ#
本記事では、Node.js、Python、GoからPostgreSQLに接続する方法を解説しました。各言語のデファクトスタンダードなドライバーを使用することで、安全かつ効率的なデータベース接続が実現できます。
重要なポイントを振り返ります。
| 項目 |
ポイント |
| ドライバー選択 |
Node.js: pg、Python: psycopg3、Go: pgx |
| コネクションプーリング |
各ドライバーの内蔵プール機能を活用 |
| PgBouncer |
マイクロサービス・サーバーレス環境では必須 |
| SQLインジェクション対策 |
パラメータ化クエリを必ず使用 |
| プリペアドステートメント |
ドライバーが自動管理するため意識不要 |
| ORMとの使い分け |
CRUD操作はORM、複雑なクエリは生SQL |
これらの知識を活用することで、本番環境で安定稼働するアプリケーションを構築できます。次のステップとして、実際にサンプルコードを動かしながら、各ドライバーの使い方に慣れていくことをお勧めします。
参考リンク#