SaaSマルチテナントDB設計の極意:パーティショニング戦略と実装
この記事の要点
- SaaSにおけるマルチテナントDB設計の主要な3つのアプローチ(共有DB、スキーマ分離、DB分離)の比較
- Discriminator Column(テナントIDカラム)方式の実装における注意点と自動化手法
- PostgreSQLのRow Level Security (RLS) を活用したセキュアなデータ分離の仕組み
- Python(SQLAlchemy)およびTypeScript(Prisma)による具体的なミドルウェア実装例
- 大規模データにおけるパフォーマンスとコストのバランスを取るためのハイブリッド戦略
はじめに
SaaSサービスを開発していると、必ずといっていいほど直面する壁が「マルチテナント性」の実装です。初期の数百ユーザーまでは単一のデータベースで全く問題ありませんが、契約企業が増え、データ量がテラバイト級に達し、各テナントごとのSLA(サービス品質保証)が厳しくなってくると、設計を見直す必要に迫られます。
テナントごとのデータをどう切り離し、どうパーティショニングするかは、単なるDBAの作業ではなく、製品の信頼性と拡張性を左右する重要なアーキテクチャ判断です。
本記事では、SaaSにおけるデータベースパーティショニング戦略のアーキテクチャ設計に加え、実プロジェクトですぐに組み込める具体的な実装例を合わせて解説します。
マルチテナントDBパーティショニングとは?
マルチテナントDBパーティショニングとは、単一のデータベースインフラ上で、複数のテナント(顧客企業)のデータを論理的あるいは物理的に分離するための設計手法です。
従来のシングルテナントシステムでは、1顧客につき1インスタンスを立てれば済みましたが、SaaSではコスト効率のために数千〜数万のテナントを共有リソースで扱います。ここで問題になるのが「データ分離(Isolation)」と「リソース競合(Noisy Neighbor問題)」です。パーティショニング戦略は、この二つのトレードオフをどうバランスさせるかを決定するものです。
主要なパーティショニング戦略の比較
設計を進める前に、まずは代表的な3つのアプローチを整理しておきましょう。それぞれに一長一短があり、採用するSaaSの特性(B2BかB2Cか、データの機密性レベル、テナントの規模感)によって最適解が異なります。
1. 共有データベース・共有スキーマ(Discriminator Column)
最もシンプルな手法です。全テーブルに tenant_id カラムを追加し、アプリケーション層でWHERE句を明示的に指定します。
- メリット: リソース効率が最高、DB運用が容易、新規テナントの追加コストがほぼゼロ。
- デメリット: インデックスサイズの肥大化、バックアップ/リストアがテナント単位で行えない、セキュリティリスク(WHERE句漏れ)。
2. 共有データベース・分離スキーマ(Schema Separation)
PostgreSQLなどが持つスキーマ機能を利用し、テナントごとに同名のテーブルを異なるスキーマ配下に配置します。
- メリット: テナントごとの論理的分離、インデックスの分離によるパフォーマンス向上、スキーマ単位のダンプが可能。
- デメリット: スキーマ切り替えの実装複雑度、接続プールの管理が少し複雑になる。
3. データベース分離(Database per Tenant)
テナントごとに完全に別のデータベース(あるいはインスタンス)を割り当てます。
- メリット: 完全な物理分離、最強のセキュリティ、スケーリングの柔軟性(ホットテナントのみSSD化など)。
- デメリット: 接続数の爆発、運用コストが非常に高い、クロステナント集計が困難。
| アプローチ | メリット | デメリット | 適したケース |
|---|---|---|---|
| Discriminator Column | 実装コスト低、運用容易、リソース効率極大 | データ量増時のパフォーマンス劣化、セキュリティリスク | スタートアップ、中小規模SaaS、B2C |
| Schema Separation | バランスの取れた分離とパフォーマンス、メンテナンス性 | アプリ側での動的切替ロジックが必要 | 中〜大規模B2B SaaS、機密性要件中程度 |
| Database per Tenant | 最強の分離、個別最適化が可能 | 接続数管理・運用コストが高い、集計が困難 | エンタープライズ向け、金融・医療系、超巨大テナント |
技術解説:内部動作と仕組み
ここからは、各戦略の内部動作について、特に「どのようにクエリがルーティングされるか」という点に焦点を当てて解説します。
アプリケーション層でのコンテキスト管理
どの戦略を採用するにせよ、共通して必要になるのが「リクエストコンテキストからのテナント識別」です。HTTPリクエストヘッダー(例: X-Tenant-ID)やサブドメイン(例: tenant.example.com)からIDを抽出し、DBセッションに紐付けるプロセスが必要です。
この処理をミドルウェアで実装し、開発者が意識しなくても自動的にクエリにフィルタがかかるようにするのがベストプラクティスです。これを怠ると、開発者がうっかり WHERE tenant_id = ? を書き忘れ、別のテナントのデータを漏洩させる重大な事故につながります。
PostgreSQLのRow Level Security (RLS)
Discriminator Column方式を採用する場合、私はPostgreSQLのRow Level Security (RLS)機能を強く推奨します。これはデータベース側で強制的にフィルタをかける機能です。
アプリケーションコードでバグがあったとしても、DB側で tenant_id = current_setting('app.current_tenant') を強制することで、物理的なデータ漏洩を防ぐ最後の砦になります。
動的スキーマルーティング
Schema Separation方式では、SQL実行前に SET search_path TO tenant_xxx を発行する必要があります。ORM(Object-Relational Mapping)のイベントリスナーやセッションスコープのイベントフックを利用して、コネクション取得時に動的にパスを書き換える仕組みを実装します。
実装例
それでは、具体的なコードを見ていきましょう。ここでは、PythonのFastAPI + SQLAlchemy、およびTypeScriptのPrismaを用いた実装を提示します。
実装例 1: Python (SQLAlchemy) - Context ManagerとTenant Scoping
この例では、Discriminator Column方式において、開発者が明示的にフィルタを書かなくてもよいように、グローバルなスコープを設定する仕組みを実装します。エラーハンドリングとロギングを含めた実用的なコードです。
import logging
from contextvars import ContextVar
from typing import Optional, Generator
from sqlalchemy import event, Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker, Session, with_loader_criteria
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.sql import expression
# ロギングの設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# コンテキスト変数を使用して非同期環境でも安全にテナントIDを管理
TENANT_CONTEXT: ContextVar[Optional[str]] = ContextVar('tenant_id', default=None)
Base = declarative_base()
class TenantMixin:
"""全てのテナント共有モデルに継承させるMixin"""
tenant_id = Column(String, index=True, nullable=False)
class User(Base, TenantMixin):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
email = Column(String)
# データベース接続設定
engine = create_engine('postgresql://user:pass@localhost/db')
SessionLocal = sessionmaker(bind=engine)
@event.listens_for(Session, 'do_orm_execute')
def add_tenant_filter_to_queries(execute_state):
"""
SQLAlchemy 1.4/2.0+ の機能(do_orm_execute)を用いて
全クエリに対して自動的にtenant_idフィルタを追加するイベントリスナー。
"""
# 現在のコンテキストから対象のテナントIDを取得
current_tenant = TENANT_CONTEXT.get()
if not current_tenant:
logger.warning("Tenant context is not set. Assuming global/admin context.")
return
# is_select判定などで必要なクエリのみフックを適用できる
if execute_state.is_select or execute_state.is_update or execute_state.is_delete:
execute_state.statement = execute_state.statement.options(
with_loader_criteria(
TenantMixin,
lambda cls: cls.tenant_id == current_tenant,
include_aliases=True
)
)
def set_tenant_context(tenant_id: str):
"""リクエストの処理を開始する際にテナントIDをコンテキストに設定"""
logger.info(f"Setting tenant context: {tenant_id}")
TENANT_CONTEXT.set(tenant_id)
def get_db_session() -> Generator[Session, None, None]:
"""
依存性注入用のDBセッションジェネレータ。
テナントコンテキストの検証とエラーハンドリングを行う。
"""
session = SessionLocal()
try:
tenant_id = TENANT_CONTEXT.get()
if not tenant_id:
raise ValueError("Tenant ID must be set in context before accessing DB")
# セッションにテナント情報を紐付け(必要に応じてカスタム属性として設定)
session.info['tenant_id'] = tenant_id
yield session
session.commit()
except SQLAlchemyError as e:
session.rollback()
logger.error(f"Database error occurred: {e}")
raise
except Exception as e:
session.rollback()
logger.error(f"Unexpected error: {e}")
raise
finally:
session.close()
# 実行例(FastAPIのDependsで利用することを想定)
if __name__ == "__main__":
# テスト用のコンテキスト設定
set_tenant_context("tenant_123")
try:
# このセッション内で発行されるクエリは自動的に tenant_id='tenant_123' でフィルタされる
# (実際の適用にはQueryクラスの拡張が必要だが、概念はこの通り)
db_gen = get_db_session()
session = next(db_gen)
# 手動でクエリを投げる場合も、バリデーションロジックが働く
users = session.query(User).all()
logger.info(f"Found {len(users)} users for tenant {TENANT_CONTEXT.get()}")
except ValueError as ve:
logger.error(f"Validation Error: {ve}")
実装例 2: Python (SQLAlchemy) - 動的スキーマ切り替え
次に、Schema Separation方式の実装です。接続ごとに search_path を動的に変更し、テナントごとのスキーマにルーティングします。
import logging
from typing import Optional
from sqlalchemy import create_engine, event, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
logger = logging.getLogger(__name__)
Base = declarative_base()
class Product(Base):
__tablename__ = 'products'
__table_args__ = {'schema': 'public'} # デフォルトスキーマ(実際には動的に変更)
id = Column(Integer, primary_key=True)
name = Column(String)
# 接続プールの設定
engine = create_engine('postgresql://user:pass@localhost/db')
SessionLocal = sessionmaker(bind=engine)
@event.listens_for(engine, "connect")
def receive_connect(dbapi_connection, connection_record):
"""
DB接続が確立された直後に呼び出されるイベント。
ここではまだテナントが確定していないため、デフォルトの設定を行う。
"""
cursor = dbapi_connection.cursor()
# デフォルトではpublicスキーマを使用
cursor.execute("SET search_path TO public")
cursor.close()
logger.debug("Default search_path set to public")
def get_tenant_session(tenant_id: str) -> Session:
"""
指定されたテナントIDのスキーマに切り替えたセッションを返す。
"""
if not tenant_id or not tenant_id.replace('_', '').isalnum():
raise ValueError(f"Invalid tenant_id format: {tenant_id}")
session = SessionLocal()
try:
# 接続ごとにsearch_pathを変更する
# 注意: 接続プールを使い回している場合、他のテナントのセッションが使い回されるリスクがあるため、
# 接続チェックアウト時のイベント処理や、プールの設定(pool_pre_ping=Trueなど)が重要
conn = session.connection()
cursor = conn.connection.cursor()
# テナントIDと同名のスキーマに切り替え(存在しない場合はエラーになるので事前作成が必要)
new_schema = f"tenant_{tenant_id}"
cursor.execute(f"SET search_path TO {new_schema}")
cursor.close()
logger.info(f"Switched search_path to {new_schema} for session {id(session)}")
return session
except Exception as e:
session.close()
logger.error(f"Failed to switch schema for tenant {tenant_id}: {e}")
raise
# マイグレーション用ヘルパー(新しいテナント作成時)
def create_tenant_schema(tenant_id: str):
"""
新しいテナント用のスキーマとテーブル構造を作成する関数。
"""
engine = create_engine('postgresql://user:pass@localhost/db')
schema_name = f"tenant_{tenant_id}"
try:
with engine.connect() as conn:
# スキーマ作成
conn.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")
conn.commit()
# スキーマ内にテーブルを作成
# 本来はAlembicなどのマイグレーションツールを使って `schema` 引数を指定して実行する
Base.metadata.create_all(engine, schema=schema_name)
logger.info(f"Successfully created schema and tables for {schema_name}")
except Exception as e:
logger.error(f"Failed to create schema for {tenant_id}: {e}")
raise
if __name__ == "__main__":
# テナントの作成
create_tenant_schema("acme_corp")
# テナント固有のセッションを取得
session = get_tenant_session("acme_corp")
# acme_corpスキーマのproductsテーブルにデータを挿入
new_product = Product(name="Gadget X")
session.add(new_product)
session.commit()
logger.info(f"Product created with ID: {new_product.id}")
session.close()
実装例 3: TypeScript (Prisma) - Middlewareによる自動フィルタリング
最後に、TypeScriptとPrismaを使用した、Discriminator Column方式の自動化実装です。Prisma Middlewareを利用して、全てのクエリfind/findFirst/findManyに対して自動的に where 句を注入します。
import { PrismaClient } from '@prisma/client';
import { AsyncLocalStorage } from 'node:async_hooks';
// テナントコンテキストを非同期処理内で安全に引き継ぐためのストレージ
const tenantStorage = new AsyncLocalStorage<string>();
/**
* テナントコンテキストが適用されたPrisma Clientを生成する拡張
*/
const prisma = new PrismaClient().$extends({
query: {
// 全てのモデルに対するデフォルトのクエリ操作をフックする
$allModels: {
async $allOperations({ model, operation, args, query }) {
// 現在の非同期コンテキストからテナントIDを取得
const tenantId = tenantStorage.getStore();
if (!tenantId) {
console.warn(`[Prisma] No tenant context found for ${model}.${operation}. Proceeding without filter.`);
return query(args);
}
// tenantId フィールドを持つと想定される操作にのみフィルタを適用
// ※ 実際の運用ではモデルごとのメタデータや型情報に基づいてより厳格に制御します
const requireFilterOps = ['findUnique', 'findFirst', 'findMany', 'update', 'updateMany', 'delete', 'deleteMany'];
if (requireFilterOps.includes(operation)) {
// findUnique は where に一意なキー以外を入れると型エラーになる場合があるため、
// 構成によっては findFirst にフォールバックするなどの工夫が必要です。
// ここでは簡素化のため、強制的に where へ tenantId を注入しています。
const where = (args as any).where || {};
(args as any).where = {
...where,
tenantId,
};
console.log(`[Prisma] Applied tenant filter (${tenantId}) to ${model}.${operation}`);
}
return query(args);
}
}
}
});
/**
* リクエスト処理全体をテナントスコープでラップするミドルウェア的な関数
*/
export async function runWithTenantContext<T>(
tenantId: string,
callback: () => Promise<T>
): Promise<T> {
if (!tenantId.match(/^[a-zA-Z0-9_-]+$/)) {
throw new Error(`Invalid tenant ID format: ${tenantId}`);
}
// callback内の全ての非同期処理で tenantId が参照可能になる
return tenantStorage.run(tenantId, callback);
}
/**
* ビジネスロジックの例
*/
export async function getUserPosts(userId: number) {
// 開発者は tenantId を意識する必要が一切ない
// AsyncLocalStorageから自動的にテナント識別子が抽出され、クエリに挿入される
const posts = await prisma.post.findMany({
where: {
authorId: userId
}
});
return posts;
}
// エラーハンドリングと実行の例
export async function safeOperation() {
try {
// リクエストごとにこのスコープを確立する(例: Expressのミドルウェア層)
await runWithTenantContext('tenant_abc', async () => {
// 内部の関数呼び出しはすべて tenant_abc の文脈で実行される
const posts = await getUserPosts(123);
console.log(`Found ${posts.length} posts for workspace`);
});
} catch (error) {
console.error('Database operation failed:', error);
} finally {
await prisma.$disconnect();
}
}
ビジネスユースケース
ユースケース:急成長するHRテックSaaSでの移行事例
あるスタートアップ企業が提供するHR管理SaaS「PeopleFlow」を想定します。
初期段階(創業期)は、Discriminator Column方式を採用していました。コストを抑え、素早く機能をリリースするための正しい判断でした。しかし、契約企業数が500社を超え、大手製造業(従業員数5万名)が加入した状況で問題が顕在化しました。
課題:
- 大手テナントの給与計算処理(月次バッチ)が実行されると、インデックス競合により他の小規模テナントの画面表示が遅延した。
- 大手テナントから「データのエクスポートと復元を自社で行いたい(監査証跡のため)」という要望が出たが、共有DB方式ではテナント単位のダンプが困難であった。
解決策: 私たちが提案したのは、ハイブリッド戦略です。
- 従業員数100名以下のテナント:従来通り共有DB(Discriminator Column)。
- 従業員数100名以上のテナント:専用スキーマ(Schema Separation)へ移行。
移行プロセスでは、先ほど紹介したPythonの動的スキーマ切り替えコードを用いて、アプリケーション側の修正を最小限に抑えました。ルーティング層でテナントサイズを判定し、接続先(スキーマ)を動的に振り分けることで、開発者はテナントの規模を意識せずに同じコードベースを維持できました。
この結果、大規模テナントのバッチ処理による他テナントへの影響を排除し、大手テナントからの要望にも応えることができたのです。
図解:テナント解決からDBアクセスまでのフロー
テナントIDの特定から、適切なデータベースパーティションへクエリをルーティングするまでの流れを図解します。
sequenceDiagram
participant Client as クライアント
participant API as API Gateway
participant Middleware as Middleware
participant DB as Database Engine
Client->>API: HTTP Request (Header: X-Tenant-ID)
API->>Middleware: リクエスト処理開始
Note over Middleware: テナントIDの検証と<br/>コンテキスト設定
Middleware->>Middleware: setTenantContext("tenant_A")
alt テナントが小規模の場合
Middleware->>DB: SET search_path TO shared_schema
else テナントが大規模の場合
Middleware->>DB: SET search_path TO tenant_A_schema
end
Middleware->>DB: SELECT * FROM users WHERE ...
Note over DB: Row Level Security または<br/>Schema Isolation により<br/>自動フィルタリング
DB-->>Middleware: Query Result
Middleware-->>Client: JSON Response
よくある質問
質問1: マルチテナントDBにおけるバックアップとリストアはどう管理すべきですか?
共有データベース方式の場合、テナント単位のバックアップが困難です。そのため、論理ダンプ(pg_dump)でテーブルごとにダンプし、sed などで tenant_id によるフィルタリングを行うスクリプトを作成するか、アプリケーション層でエクスポート機能を実装する必要があります。一方、Schema SeparationやDatabase per Tenant方式であれば、pg_dump -n schema_name のようにテナント単位で簡単にバックアップ取得が可能です。運用負荷を考えると、中規模以上のSaaSではSchema Separation以上の分離が推奨されます。
質問2: 「Noisy Neighbor問題(特定のテナントによるリソース食い」)に対する技術的な対策はありますか?
PostgreSQLを使用している場合、リソース消費の大きいクエリに対してタイムアウトを設定する(statement_timeout)のが第一歩です。さらに高度な対策としては、リソースクエリ管理機能を活用し、テナントグループごとにCPU使用率やI/O上限を設定します。Database per Tenant方式であれば、特定のテナント用のDBインスタンスをハードウェアスペックの高いものに変更する(スケールアップ)といった対策も可能です。共有環境でも、接続プール(PgBouncerなど)を適切に設定し、特定のテナントが接続を独占しないよう制御することが重要です。
質問3: クロステナント分析(全テナントのデータを集計する)を行いたいのですが、どの設計が有利ですか?
Discriminator Column方式が最も有利です。全データが単一のテーブルに入っているため、GROUP BY tenant_id のような集計クエリをそのまま実行できます。Schema SeparationやDatabase per Tenant方式では、各テナントのデータを結合するために、データウェアハウス(DWH)へデータを連携(ETL)してから分析するのが一般的です。運用コストと分析要件のバランスを考えると、トランザクションDBは分離し、分析用DBへ複製するアーキテクチャが、大規模SaaSではデファクトスタンダードになりつつあります。
おわりに
SaaSにおけるデータベースパーティショニングは、正解が一つではない典型的なトレードオフの問題です。初期段階ではDiscriminator Columnで始めるのが賢明ですが、サービスの成長に合わせてSchema Separationへの移行を視野に入れておく必要があります。
私たちShineosが提供するソリューションは、こうしたアーキテクチャの進化に対応できる柔軟なデータアクセスレイヤーを提供しています。もし、自社SaaSのデータベース設計やスケーリングにお悩みでしたら、ぜひ一度ご相談ください。貴社のビジネス成長を支える最適なインフラ設計をご提案します。
参考リンク
[1] Multi-tenancy - Prisma Documentation [2] Row Security Policies - PostgreSQL Documentation [3] SaaS Multi-Tenancy - Microsoft Azure Architecture