イベント駆動アーキテクチャ#

概要#

イベント駆動アーキテクチャ(Event-Driven Architecture, EDA)は、イベントの生成、検出、消費、反応に基づいてシステムを設計するアーキテクチャパターン。

コンポーネント間の疎結合を実現し、スケーラビリティと柔軟性を高める。

イベントとは#

イベント(Event): システム内で発生した状態変化や重要な出来事を表すメッセージ。

# イベントの例
order_created_event = {
    "event_type": "OrderCreated",
    "event_id": "evt_12345",
    "timestamp": "2024-01-01T10:00:00Z",
    "data": {
        "order_id": "ord_67890",
        "customer_id": "cust_111",
        "total": 10000,
        "items": [...]  
    }
}

主要コンポーネント#

1. イベントプロデューサー(Event Producer)#

イベントを生成し、発行するコンポーネント。

from kafka import KafkaProducer
import json

class OrderService:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers='kafka:9092',
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
    
    def create_order(self, order_data):
        # 注文を作成
        order = self._save_order(order_data)
        
        # イベントを発行
        event = {
            "event_type": "OrderCreated",
            "order_id": order.id,
            "customer_id": order.customer_id,
            "total": order.total
        }
        self.producer.send('order-events', event)
        
        return order

2. イベントブローカー(Event Broker)#

イベントの配信を管理するミドルウェア。

  • Apache Kafka

  • RabbitMQ

  • AWS EventBridge

  • Google Cloud Pub/Sub

3. イベントコンシューマー(Event Consumer)#

イベントを受信し、処理するコンポーネント。

from kafka import KafkaConsumer
import json

class InventoryService:
    def __init__(self):
        self.consumer = KafkaConsumer(
            'order-events',
            bootstrap_servers='kafka:9092',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
    
    def start(self):
        for message in self.consumer:
            event = message.value
            
            if event['event_type'] == 'OrderCreated':
                self._reserve_inventory(event['order_id'])

class EmailService:
    def __init__(self):
        self.consumer = KafkaConsumer(
            'order-events',
            bootstrap_servers='kafka:9092',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
    
    def start(self):
        for message in self.consumer:
            event = message.value
            
            if event['event_type'] == 'OrderCreated':
                self._send_confirmation_email(event['customer_id'])

パターン#

1. Pub/Sub(Publish/Subscribe)パターン#

プロデューサーはイベントをトピックに発行し、複数のコンシューマーが購読する。

         ┌──────────────┐
         │  Publisher   │
         └──────┬───────┘
                │
                ↓
         ┌──────────────┐
         │    Topic     │
         └──────┬───────┘
                │
      ┌─────────┼─────────┐
      ↓         ↓         ↓
  ┌────────┐ ┌────────┐ ┌────────┐
  │ Sub A  │ │ Sub B  │ │ Sub C  │
  └────────┘ └────────┘ └────────┘

2. イベントソーシング(Event Sourcing)#

状態の変更をイベントの連続として保存する。

class BankAccount:
    def __init__(self, account_id):
        self.account_id = account_id
        self.balance = 0
        self.events = []
    
    def deposit(self, amount):
        event = {
            "type": "MoneyDeposited",
            "amount": amount,
            "timestamp": datetime.now()
        }
        self._apply_event(event)
        self.events.append(event)
    
    def withdraw(self, amount):
        event = {
            "type": "MoneyWithdrawn",
            "amount": amount,
            "timestamp": datetime.now()
        }
        self._apply_event(event)
        self.events.append(event)
    
    def _apply_event(self, event):
        if event["type"] == "MoneyDeposited":
            self.balance += event["amount"]
        elif event["type"] == "MoneyWithdrawn":
            self.balance -= event["amount"]
    
    def rebuild_from_events(self, events):
        for event in events:
            self._apply_event(event)

3. CQRS(Command Query Responsibility Segregation)#

コマンド(書き込み)とクエリ(読み込み)のモデルを分離。

詳細は別ページ参照。

実装例#

Eコマースシステムの例#

# イベント定義
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Event:
    event_id: str
    event_type: str
    timestamp: datetime
    data: dict

# プロデューサー
class OrderService:
    def __init__(self, event_bus):
        self.event_bus = event_bus
    
    def create_order(self, customer_id, items):
        # ビジネスロジック
        order = Order(customer_id=customer_id, items=items)
        order.save()
        
        # イベント発行
        event = Event(
            event_id=generate_id(),
            event_type="OrderCreated",
            timestamp=datetime.now(),
            data={
                "order_id": order.id,
                "customer_id": customer_id,
                "items": items,
                "total": order.total
            }
        )
        self.event_bus.publish("orders", event)
        
        return order

# コンシューマー1: 在庫管理
class InventoryEventHandler:
    def handle(self, event):
        if event.event_type == "OrderCreated":
            items = event.data["items"]
            self._reserve_inventory(items)
        elif event.event_type == "OrderCancelled":
            items = event.data["items"]
            self._release_inventory(items)

# コンシューマー2: 通知
class NotificationEventHandler:
    def handle(self, event):
        if event.event_type == "OrderCreated":
            customer_id = event.data["customer_id"]
            self._send_email(customer_id, "Order Confirmation")
        elif event.event_type == "OrderShipped":
            customer_id = event.data["customer_id"]
            self._send_email(customer_id, "Order Shipped")

# コンシューマー3: 分析
class AnalyticsEventHandler:
    def handle(self, event):
        # すべてのイベントを分析DBに保存
        self._save_to_analytics_db(event)

メリット#

  1. 疎結合: プロデューサーとコンシューマーが互いを知らない

  2. スケーラビリティ: コンシューマーを独立してスケール可能

  3. 柔軟性: 新しいコンシューマーを追加しやすい

  4. 非同期処理: リアルタイム処理と遅延処理を混在可能

  5. 監査とデバッグ: イベントログが監査証跡になる

  6. イベントリプレイ: 過去のイベントを再処理できる

デメリット・課題#

  1. 複雑性の増加

    • デバッグが困難

    • システム全体の動作を追跡しにくい

  2. 結果整合性

    • 即座にデータが一貫しない

    • 最終的な整合性のみ保証

  3. イベントの順序性

    • イベントの処理順序が保証されない場合がある

    • パーティショニングが必要

  4. 重複処理

    • 同じイベントが複数回処理される可能性

    • べき等性の確保が必要

  5. イベントスキーマの管理

    • イベント構造の変更が困難

    • バージョニングが必要

ベストプラクティス#

1. イベント設計#

# 良い例: 十分な情報を含む
{
    "event_id": "evt_123",
    "event_type": "OrderCreated",
    "version": "1.0",
    "timestamp": "2024-01-01T10:00:00Z",
    "data": {
        "order_id": "ord_456",
        "customer_id": "cust_789",
        "total": 10000,
        "items": [...]
    },
    "metadata": {
        "correlation_id": "corr_abc",
        "causation_id": "cause_xyz"
    }
}

# 悪い例: 情報が不足
{
    "order_id": "ord_456"
}

2. べき等性の確保#

class EmailService:
    def __init__(self):
        self.processed_events = set()
    
    def handle_order_created(self, event):
        # イベントIDで重複チェック
        if event.event_id in self.processed_events:
            return  # 既に処理済み
        
        # 処理実行
        self._send_email(event.data["customer_id"])
        
        # 処理済みとしてマーク
        self.processed_events.add(event.event_id)

3. エラーハンドリング#

def handle_event(event):
    try:
        process_event(event)
    except Exception as e:
        # デッドレターキューに送信
        dead_letter_queue.send(event)
        logger.error(f"Failed to process event: {event.event_id}", exc_info=e)

適用場面#

イベント駆動アーキテクチャが適している場合:

  • リアルタイム処理が必要なシステム

  • 複数のサブシステムが連携するシステム

  • 非同期処理が多いシステム

  • スケーラビリティが重要なシステム

  • 監査ログが必要なシステム

  • マイクロサービスアーキテクチャ