CQRS (Command Query Responsibility Segregation)#

概要#

CQRS(Command Query Responsibility Segregation、コマンドクエリ責務分離)は、データの読み取り(Query)と書き込み(Command)のモデルを分離するアーキテクチャパターン。

Bertrand MeyerのCommand Query Separation(CQS)原則を拡張したもの。

CQS原則#

CQS原則

すべてのメソッドは以下のいずれかであるべき:

  • Command(コマンド): 状態を変更するが、値を返さない

  • Query(クエリ): 値を返すが、状態を変更しない

# CQS原則の例
class BankAccount:
    def __init__(self):
        self.balance = 0
    
    # Command: 状態を変更
    def deposit(self, amount):
        self.balance += amount
        # 値を返さない
    
    # Query: 値を返す
    def get_balance(self):
        return self.balance
        # 状態を変更しない
# ❌️悪い例:コマンド(変更・書き込み)とクエリ(取得・読み取り)の混在
def withdraw(self, amount):
    self.balance -= amount # 状態変更
    return self.balance    # 値の返却

CQRSの基本構造#

         ┌──────────────────┐
         │   Application    │
         └─────┬──────┬─────┘
               │      │
        Command│      │Query
               │      │
       ┌───────▼──┐  ┌▼──────────┐
       │ Write    │  │ Read      │
       │ Model    │  │ Model     │
       └───────┬──┘  └──┬────────┘
               │        │
       ┌───────▼──┐  ┌─▼─────────┐
       │ Write DB │  │ Read DB   │
       └──────────┘  └───────────┘

実装例#

シンプルなCQRS#

# ===== Commands =====

from dataclasses import dataclass

@dataclass
class CreateOrderCommand:
    customer_id: str
    items: list

@dataclass
class CancelOrderCommand:
    order_id: str

# コマンドハンドラー
class OrderCommandHandler:
    def __init__(self, write_repository):
        self.repository = write_repository
    
    def handle_create_order(self, command: CreateOrderCommand):
        # ビジネスロジック
        order = Order(
            customer_id=command.customer_id,
            items=command.items
        )
        
        # 書き込みモデルに保存
        self.repository.save(order)
        
        # イベント発行(オプション)
        publish_event("OrderCreated", order)
    
    def handle_cancel_order(self, command: CancelOrderCommand):
        order = self.repository.find_by_id(command.order_id)
        order.cancel()
        self.repository.save(order)
        publish_event("OrderCancelled", order)

# ===== Queries =====

@dataclass
class GetOrderQuery:
    order_id: str

@dataclass
class GetCustomerOrdersQuery:
    customer_id: str

# クエリハンドラー
class OrderQueryHandler:
    def __init__(self, read_repository):
        self.repository = read_repository
    
    def handle_get_order(self, query: GetOrderQuery):
        # 読み取りモデルから取得
        return self.repository.find_by_id(query.order_id)
    
    def handle_get_customer_orders(self, query: GetCustomerOrdersQuery):
        # 読み取り専用の最適化されたクエリ
        return self.repository.find_by_customer(query.customer_id)

CQRSの段階#

レベル1: モデルの分離のみ#

同じデータベースを使用するが、読み取りと書き込みのモデルを分離。

# 書き込みモデル
class Order:
    def __init__(self, customer_id, items):
        self.customer_id = customer_id
        self.items = items
        self.total = sum(item.price for item in items)
    
    def add_item(self, item):
        self.items.append(item)
        self.total += item.price

# 読み取りモデル(DTO)
class OrderSummary:
    def __init__(self, order_id, customer_name, total, status):
        self.order_id = order_id
        self.customer_name = customer_name
        self.total = total
        self.status = status

レベル2: データベースの分離#

読み取りと書き込みで異なるデータベースを使用。

# 書き込み側: PostgreSQL(正規化されたスキーマ)
class WriteOrderRepository:
    def save(self, order):
        # トランザクション処理
        postgres_db.execute("""
            INSERT INTO orders (id, customer_id, total)
            VALUES (?, ?, ?)
        """, order.id, order.customer_id, order.total)

# 読み取り側: MongoDB(非正規化されたスキーマ)
class ReadOrderRepository:
    def find_by_id(self, order_id):
        # 非正規化されたデータを高速に取得
        return mongo_db.orders.find_one({
            "order_id": order_id
        })

レベル3: イベントソーシングとの組み合わせ#

書き込み側でイベントソーシングを使用。

# 書き込み側: イベントストア
class EventSourcedOrder:
    def __init__(self, order_id):
        self.order_id = order_id
        self.events = []
    
    def create(self, customer_id, items):
        event = OrderCreatedEvent(self.order_id, customer_id, items)
        self.apply(event)
        self.events.append(event)
    
    def apply(self, event):
        if isinstance(event, OrderCreatedEvent):
            self.customer_id = event.customer_id
            self.items = event.items

# 読み取り側: プロジェクション
class OrderProjection:
    def handle_order_created(self, event):
        # イベントから読み取りモデルを構築
        read_db.insert({
            "order_id": event.order_id,
            "customer_id": event.customer_id,
            "items": event.items,
            "total": sum(item.price for item in event.items)
        })

完全な実装例#

# ===== Domain Events =====

from dataclasses import dataclass
from datetime import datetime

@dataclass
class OrderCreatedEvent:
    order_id: str
    customer_id: str
    items: list
    total: float
    timestamp: datetime

# ===== Write Side =====

class OrderAggregate:
    def __init__(self, order_id):
        self.order_id = order_id
        self.customer_id = None
        self.items = []
        self.total = 0
        self.uncommitted_events = []
    
    def create_order(self, customer_id, items):
        # ビジネスルール検証
        if not items:
            raise ValueError("Order must have items")
        
        total = sum(item["price"] for item in items)
        
        # イベント生成
        event = OrderCreatedEvent(
            order_id=self.order_id,
            customer_id=customer_id,
            items=items,
            total=total,
            timestamp=datetime.now()
        )
        
        self._apply_event(event)
        self.uncommitted_events.append(event)
    
    def _apply_event(self, event):
        if isinstance(event, OrderCreatedEvent):
            self.customer_id = event.customer_id
            self.items = event.items
            self.total = event.total

class OrderWriteRepository:
    def __init__(self, event_store, event_bus):
        self.event_store = event_store
        self.event_bus = event_bus
    
    def save(self, aggregate):
        # イベントストアに保存
        for event in aggregate.uncommitted_events:
            self.event_store.append(aggregate.order_id, event)
            # イベントバスに発行
            self.event_bus.publish(event)
        
        aggregate.uncommitted_events.clear()

# ===== Read Side =====

class OrderProjectionBuilder:
    def __init__(self, read_db):
        self.read_db = read_db
    
    def handle(self, event):
        if isinstance(event, OrderCreatedEvent):
            # 非正規化されたビューを構築
            self.read_db.insert({
                "order_id": event.order_id,
                "customer_id": event.customer_id,
                "customer_name": get_customer_name(event.customer_id),
                "items_count": len(event.items),
                "total": event.total,
                "created_at": event.timestamp
            })

class OrderReadRepository:
    def __init__(self, read_db):
        self.read_db = read_db
    
    def find_by_id(self, order_id):
        return self.read_db.query(
            "SELECT * FROM order_view WHERE order_id = ?",
            order_id
        )
    
    def find_by_customer(self, customer_id):
        return self.read_db.query(
            "SELECT * FROM order_view WHERE customer_id = ?",
            customer_id
        )

# ===== Application Service =====

class OrderService:
    def __init__(self, write_repo, read_repo):
        self.write_repo = write_repo
        self.read_repo = read_repo
    
    # Command
    def create_order(self, command: CreateOrderCommand):
        aggregate = OrderAggregate(generate_id())
        aggregate.create_order(command.customer_id, command.items)
        self.write_repo.save(aggregate)
    
    # Query
    def get_order(self, query: GetOrderQuery):
        return self.read_repo.find_by_id(query.order_id)

メリット#

  1. パフォーマンス最適化

    • 読み取りと書き込みを独立してスケール

    • 読み取り用の最適化されたスキーマを使用

  2. 複雑性の管理

    • 読み取りと書き込みのロジックを分離

    • それぞれ独立して進化可能

  3. 柔軟性

    • 複数の読み取りモデルを持てる(用途別に最適化)

    • 技術スタックを分けられる

  4. セキュリティ

    • 読み取りと書き込みの権限を分離しやすい

デメリット・課題#

  1. 複雑性の増加

    • 実装とメンテナンスのコストが高い

    • 小規模プロジェクトには過剰

  2. 結果整合性

    • 読み取りモデルが即座に更新されない

    • ユーザーが古いデータを見る可能性

  3. データの同期

    • 読み取りモデルの更新が失敗する可能性

    • 同期メカニズムが必要

  4. 学習コスト

    • 開発者の理解が必要

    • チーム全体での採用が難しい

いつCQRSを使うべきか#

CQRSが適している場合#

  • 読み取りと書き込みの負荷が大きく異なる

  • 複雑なクエリが多い

  • 複数の読み取りモデルが必要

  • イベントソーシングを使用している

  • 高いスケーラビリティが必要

CQRSが不要な場合#

  • シンプルなCRUDアプリケーション

  • 読み取りと書き込みの要件が似ている

  • チームが小さく経験が少ない

  • 即座のデータ一貫性が必須

CQRSとイベントソーシングの関係#

CQRSとイベントソーシングはしばしば一緒に使われるが、独立したパターン:

  • CQRS単独: 読み取りと書き込みを分離するだけ

  • イベントソーシング単独: イベントでデータを保存するだけ

  • CQRS + イベントソーシング: 書き込み側でイベントソーシング、読み取り側でプロジェクション

組み合わせることで、さらに強力なシステムを構築できる。