イベント駆動アーキテクチャ#
概要#
イベント駆動アーキテクチャ(Event-Driven Architecture, EDA)は、イベントの生成、検出、消費、反応に基づいてシステムを設計するアーキテクチャパターン。
コンポーネント間の疎結合を実現し、スケーラビリティと柔軟性を高める。
イベントとは#
イベント(Event): システム内で発生した状態変化や重要な出来事を表すメッセージ。
# イベントの例
order_created_event = {
"event_type": "OrderCreated",
"event_id": "evt_12345",
"timestamp": "2024-01-01T10:00:00Z",
"data": {
"order_id": "ord_67890",
"customer_id": "cust_111",
"total": 10000,
"items": [...]
}
}
主要コンポーネント#
1. イベントプロデューサー(Event Producer)#
イベントを生成し、発行するコンポーネント。
from kafka import KafkaProducer
import json
class OrderService:
def __init__(self):
self.producer = KafkaProducer(
bootstrap_servers='kafka:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def create_order(self, order_data):
# 注文を作成
order = self._save_order(order_data)
# イベントを発行
event = {
"event_type": "OrderCreated",
"order_id": order.id,
"customer_id": order.customer_id,
"total": order.total
}
self.producer.send('order-events', event)
return order
2. イベントブローカー(Event Broker)#
イベントの配信を管理するミドルウェア。
Apache Kafka
RabbitMQ
AWS EventBridge
Google Cloud Pub/Sub
3. イベントコンシューマー(Event Consumer)#
イベントを受信し、処理するコンポーネント。
from kafka import KafkaConsumer
import json
class InventoryService:
def __init__(self):
self.consumer = KafkaConsumer(
'order-events',
bootstrap_servers='kafka:9092',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
def start(self):
for message in self.consumer:
event = message.value
if event['event_type'] == 'OrderCreated':
self._reserve_inventory(event['order_id'])
class EmailService:
def __init__(self):
self.consumer = KafkaConsumer(
'order-events',
bootstrap_servers='kafka:9092',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
def start(self):
for message in self.consumer:
event = message.value
if event['event_type'] == 'OrderCreated':
self._send_confirmation_email(event['customer_id'])
パターン#
1. Pub/Sub(Publish/Subscribe)パターン#
プロデューサーはイベントをトピックに発行し、複数のコンシューマーが購読する。
┌──────────────┐
│ Publisher │
└──────┬───────┘
│
↓
┌──────────────┐
│ Topic │
└──────┬───────┘
│
┌─────────┼─────────┐
↓ ↓ ↓
┌────────┐ ┌────────┐ ┌────────┐
│ Sub A │ │ Sub B │ │ Sub C │
└────────┘ └────────┘ └────────┘
2. イベントソーシング(Event Sourcing)#
状態の変更をイベントの連続として保存する。
class BankAccount:
def __init__(self, account_id):
self.account_id = account_id
self.balance = 0
self.events = []
def deposit(self, amount):
event = {
"type": "MoneyDeposited",
"amount": amount,
"timestamp": datetime.now()
}
self._apply_event(event)
self.events.append(event)
def withdraw(self, amount):
event = {
"type": "MoneyWithdrawn",
"amount": amount,
"timestamp": datetime.now()
}
self._apply_event(event)
self.events.append(event)
def _apply_event(self, event):
if event["type"] == "MoneyDeposited":
self.balance += event["amount"]
elif event["type"] == "MoneyWithdrawn":
self.balance -= event["amount"]
def rebuild_from_events(self, events):
for event in events:
self._apply_event(event)
3. CQRS(Command Query Responsibility Segregation)#
コマンド(書き込み)とクエリ(読み込み)のモデルを分離。
詳細は別ページ参照。
実装例#
Eコマースシステムの例#
# イベント定義
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Event:
event_id: str
event_type: str
timestamp: datetime
data: dict
# プロデューサー
class OrderService:
def __init__(self, event_bus):
self.event_bus = event_bus
def create_order(self, customer_id, items):
# ビジネスロジック
order = Order(customer_id=customer_id, items=items)
order.save()
# イベント発行
event = Event(
event_id=generate_id(),
event_type="OrderCreated",
timestamp=datetime.now(),
data={
"order_id": order.id,
"customer_id": customer_id,
"items": items,
"total": order.total
}
)
self.event_bus.publish("orders", event)
return order
# コンシューマー1: 在庫管理
class InventoryEventHandler:
def handle(self, event):
if event.event_type == "OrderCreated":
items = event.data["items"]
self._reserve_inventory(items)
elif event.event_type == "OrderCancelled":
items = event.data["items"]
self._release_inventory(items)
# コンシューマー2: 通知
class NotificationEventHandler:
def handle(self, event):
if event.event_type == "OrderCreated":
customer_id = event.data["customer_id"]
self._send_email(customer_id, "Order Confirmation")
elif event.event_type == "OrderShipped":
customer_id = event.data["customer_id"]
self._send_email(customer_id, "Order Shipped")
# コンシューマー3: 分析
class AnalyticsEventHandler:
def handle(self, event):
# すべてのイベントを分析DBに保存
self._save_to_analytics_db(event)
メリット#
疎結合: プロデューサーとコンシューマーが互いを知らない
スケーラビリティ: コンシューマーを独立してスケール可能
柔軟性: 新しいコンシューマーを追加しやすい
非同期処理: リアルタイム処理と遅延処理を混在可能
監査とデバッグ: イベントログが監査証跡になる
イベントリプレイ: 過去のイベントを再処理できる
デメリット・課題#
複雑性の増加
デバッグが困難
システム全体の動作を追跡しにくい
結果整合性
即座にデータが一貫しない
最終的な整合性のみ保証
イベントの順序性
イベントの処理順序が保証されない場合がある
パーティショニングが必要
重複処理
同じイベントが複数回処理される可能性
べき等性の確保が必要
イベントスキーマの管理
イベント構造の変更が困難
バージョニングが必要
ベストプラクティス#
1. イベント設計#
# 良い例: 十分な情報を含む
{
"event_id": "evt_123",
"event_type": "OrderCreated",
"version": "1.0",
"timestamp": "2024-01-01T10:00:00Z",
"data": {
"order_id": "ord_456",
"customer_id": "cust_789",
"total": 10000,
"items": [...]
},
"metadata": {
"correlation_id": "corr_abc",
"causation_id": "cause_xyz"
}
}
# 悪い例: 情報が不足
{
"order_id": "ord_456"
}
2. べき等性の確保#
class EmailService:
def __init__(self):
self.processed_events = set()
def handle_order_created(self, event):
# イベントIDで重複チェック
if event.event_id in self.processed_events:
return # 既に処理済み
# 処理実行
self._send_email(event.data["customer_id"])
# 処理済みとしてマーク
self.processed_events.add(event.event_id)
3. エラーハンドリング#
def handle_event(event):
try:
process_event(event)
except Exception as e:
# デッドレターキューに送信
dead_letter_queue.send(event)
logger.error(f"Failed to process event: {event.event_id}", exc_info=e)
適用場面#
イベント駆動アーキテクチャが適している場合:
リアルタイム処理が必要なシステム
複数のサブシステムが連携するシステム
非同期処理が多いシステム
スケーラビリティが重要なシステム
監査ログが必要なシステム
マイクロサービスアーキテクチャ