CQRS (Command Query Responsibility Segregation)#
概要#
CQRS(Command Query Responsibility Segregation、コマンドクエリ責務分離)は、データの読み取り(Query)と書き込み(Command)のモデルを分離するアーキテクチャパターン。
Bertrand MeyerのCommand Query Separation(CQS)原則を拡張したもの。
CQS原則#
CQS原則
すべてのメソッドは以下のいずれかであるべき:
Command(コマンド): 状態を変更するが、値を返さない
Query(クエリ): 値を返すが、状態を変更しない
# CQS原則の例
class BankAccount:
def __init__(self):
self.balance = 0
# Command: 状態を変更
def deposit(self, amount):
self.balance += amount
# 値を返さない
# Query: 値を返す
def get_balance(self):
return self.balance
# 状態を変更しない
# ❌️悪い例:コマンド(変更・書き込み)とクエリ(取得・読み取り)の混在
def withdraw(self, amount):
self.balance -= amount # 状態変更
return self.balance # 値の返却
CQRSの基本構造#
┌──────────────────┐
│ Application │
└─────┬──────┬─────┘
│ │
Command│ │Query
│ │
┌───────▼──┐ ┌▼──────────┐
│ Write │ │ Read │
│ Model │ │ Model │
└───────┬──┘ └──┬────────┘
│ │
┌───────▼──┐ ┌─▼─────────┐
│ Write DB │ │ Read DB │
└──────────┘ └───────────┘
実装例#
シンプルなCQRS#
# ===== Commands =====
from dataclasses import dataclass
@dataclass
class CreateOrderCommand:
customer_id: str
items: list
@dataclass
class CancelOrderCommand:
order_id: str
# コマンドハンドラー
class OrderCommandHandler:
def __init__(self, write_repository):
self.repository = write_repository
def handle_create_order(self, command: CreateOrderCommand):
# ビジネスロジック
order = Order(
customer_id=command.customer_id,
items=command.items
)
# 書き込みモデルに保存
self.repository.save(order)
# イベント発行(オプション)
publish_event("OrderCreated", order)
def handle_cancel_order(self, command: CancelOrderCommand):
order = self.repository.find_by_id(command.order_id)
order.cancel()
self.repository.save(order)
publish_event("OrderCancelled", order)
# ===== Queries =====
@dataclass
class GetOrderQuery:
order_id: str
@dataclass
class GetCustomerOrdersQuery:
customer_id: str
# クエリハンドラー
class OrderQueryHandler:
def __init__(self, read_repository):
self.repository = read_repository
def handle_get_order(self, query: GetOrderQuery):
# 読み取りモデルから取得
return self.repository.find_by_id(query.order_id)
def handle_get_customer_orders(self, query: GetCustomerOrdersQuery):
# 読み取り専用の最適化されたクエリ
return self.repository.find_by_customer(query.customer_id)
CQRSの段階#
レベル1: モデルの分離のみ#
同じデータベースを使用するが、読み取りと書き込みのモデルを分離。
# 書き込みモデル
class Order:
def __init__(self, customer_id, items):
self.customer_id = customer_id
self.items = items
self.total = sum(item.price for item in items)
def add_item(self, item):
self.items.append(item)
self.total += item.price
# 読み取りモデル(DTO)
class OrderSummary:
def __init__(self, order_id, customer_name, total, status):
self.order_id = order_id
self.customer_name = customer_name
self.total = total
self.status = status
レベル2: データベースの分離#
読み取りと書き込みで異なるデータベースを使用。
# 書き込み側: PostgreSQL(正規化されたスキーマ)
class WriteOrderRepository:
def save(self, order):
# トランザクション処理
postgres_db.execute("""
INSERT INTO orders (id, customer_id, total)
VALUES (?, ?, ?)
""", order.id, order.customer_id, order.total)
# 読み取り側: MongoDB(非正規化されたスキーマ)
class ReadOrderRepository:
def find_by_id(self, order_id):
# 非正規化されたデータを高速に取得
return mongo_db.orders.find_one({
"order_id": order_id
})
レベル3: イベントソーシングとの組み合わせ#
書き込み側でイベントソーシングを使用。
# 書き込み側: イベントストア
class EventSourcedOrder:
def __init__(self, order_id):
self.order_id = order_id
self.events = []
def create(self, customer_id, items):
event = OrderCreatedEvent(self.order_id, customer_id, items)
self.apply(event)
self.events.append(event)
def apply(self, event):
if isinstance(event, OrderCreatedEvent):
self.customer_id = event.customer_id
self.items = event.items
# 読み取り側: プロジェクション
class OrderProjection:
def handle_order_created(self, event):
# イベントから読み取りモデルを構築
read_db.insert({
"order_id": event.order_id,
"customer_id": event.customer_id,
"items": event.items,
"total": sum(item.price for item in event.items)
})
完全な実装例#
# ===== Domain Events =====
from dataclasses import dataclass
from datetime import datetime
@dataclass
class OrderCreatedEvent:
order_id: str
customer_id: str
items: list
total: float
timestamp: datetime
# ===== Write Side =====
class OrderAggregate:
def __init__(self, order_id):
self.order_id = order_id
self.customer_id = None
self.items = []
self.total = 0
self.uncommitted_events = []
def create_order(self, customer_id, items):
# ビジネスルール検証
if not items:
raise ValueError("Order must have items")
total = sum(item["price"] for item in items)
# イベント生成
event = OrderCreatedEvent(
order_id=self.order_id,
customer_id=customer_id,
items=items,
total=total,
timestamp=datetime.now()
)
self._apply_event(event)
self.uncommitted_events.append(event)
def _apply_event(self, event):
if isinstance(event, OrderCreatedEvent):
self.customer_id = event.customer_id
self.items = event.items
self.total = event.total
class OrderWriteRepository:
def __init__(self, event_store, event_bus):
self.event_store = event_store
self.event_bus = event_bus
def save(self, aggregate):
# イベントストアに保存
for event in aggregate.uncommitted_events:
self.event_store.append(aggregate.order_id, event)
# イベントバスに発行
self.event_bus.publish(event)
aggregate.uncommitted_events.clear()
# ===== Read Side =====
class OrderProjectionBuilder:
def __init__(self, read_db):
self.read_db = read_db
def handle(self, event):
if isinstance(event, OrderCreatedEvent):
# 非正規化されたビューを構築
self.read_db.insert({
"order_id": event.order_id,
"customer_id": event.customer_id,
"customer_name": get_customer_name(event.customer_id),
"items_count": len(event.items),
"total": event.total,
"created_at": event.timestamp
})
class OrderReadRepository:
def __init__(self, read_db):
self.read_db = read_db
def find_by_id(self, order_id):
return self.read_db.query(
"SELECT * FROM order_view WHERE order_id = ?",
order_id
)
def find_by_customer(self, customer_id):
return self.read_db.query(
"SELECT * FROM order_view WHERE customer_id = ?",
customer_id
)
# ===== Application Service =====
class OrderService:
def __init__(self, write_repo, read_repo):
self.write_repo = write_repo
self.read_repo = read_repo
# Command
def create_order(self, command: CreateOrderCommand):
aggregate = OrderAggregate(generate_id())
aggregate.create_order(command.customer_id, command.items)
self.write_repo.save(aggregate)
# Query
def get_order(self, query: GetOrderQuery):
return self.read_repo.find_by_id(query.order_id)
メリット#
パフォーマンス最適化
読み取りと書き込みを独立してスケール
読み取り用の最適化されたスキーマを使用
複雑性の管理
読み取りと書き込みのロジックを分離
それぞれ独立して進化可能
柔軟性
複数の読み取りモデルを持てる(用途別に最適化)
技術スタックを分けられる
セキュリティ
読み取りと書き込みの権限を分離しやすい
デメリット・課題#
複雑性の増加
実装とメンテナンスのコストが高い
小規模プロジェクトには過剰
結果整合性
読み取りモデルが即座に更新されない
ユーザーが古いデータを見る可能性
データの同期
読み取りモデルの更新が失敗する可能性
同期メカニズムが必要
学習コスト
開発者の理解が必要
チーム全体での採用が難しい
いつCQRSを使うべきか#
CQRSが適している場合#
読み取りと書き込みの負荷が大きく異なる
複雑なクエリが多い
複数の読み取りモデルが必要
イベントソーシングを使用している
高いスケーラビリティが必要
CQRSが不要な場合#
シンプルなCRUDアプリケーション
読み取りと書き込みの要件が似ている
チームが小さく経験が少ない
即座のデータ一貫性が必須
CQRSとイベントソーシングの関係#
CQRSとイベントソーシングはしばしば一緒に使われるが、独立したパターン:
CQRS単独: 読み取りと書き込みを分離するだけ
イベントソーシング単独: イベントでデータを保存するだけ
CQRS + イベントソーシング: 書き込み側でイベントソーシング、読み取り側でプロジェクション
組み合わせることで、さらに強力なシステムを構築できる。