Graded response model#
The graded response model (GRM; Samejima, 1969) handles polytomous responses on an ordered scale. It represents a polytomous response by combining several binary IRT models.
For respondent \(i\)'s response \(y_{ij} = k \ (k=1,2,\dots,K)\) to item \(j\), consider the probability of choosing category \(k\) or higher. This is still a binary event ("below \(k\)" vs. "\(k\) or above"), so it can be expressed with a binary model such as the 2PL, for example:
\[
P(y_{ij} \geq k)
= \frac{1}{1+ \exp \big(-a_j ( \theta_i - b_{jk}) \big)}
\]
Note that the difficulty parameter now has one value per category \(k\) of item \(j\), which is why it is written \(b_{jk}\).
Combining these boundary models, the probability of choosing exactly the \(k\)-th category is
\[
P(y_{ij} = k) = P(y_{ij} \geq k) - P(y_{ij} \geq k + 1)
\]
where the boundary categories are handled by defining \(P(y_{ij} \geq 1) = 1\) and \(P(y_{ij} \geq K + 1) = 0\). Equivalently, \(b_{j1} = -\infty\) (a boundary crossed with probability 1 naturally has infinitely low difficulty) and \(b_{j,K+1} = +\infty\).
Graded response model
\[
P(y_{ij} = k) = \frac{1}{1+ \exp \big(-a_j (\theta_i - b_{jk}) \big)} - \frac{1}{1+ \exp \big(-a_j (\theta_i - b_{j,k+1}) \big)}
\]
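As a quick numerical check of the telescoping formula, the sketch below evaluates the category probabilities of a single hypothetical item (the values of `a_j`, `b_j`, and `theta` are made up for illustration):

```python
import numpy as np

# Hypothetical item: a_j = 1.2 and ordered difficulties for K = 4 categories
a_j = 1.2
b_j = np.array([-1.0, 0.0, 1.5])  # b_{j2}, b_{j3}, b_{j4}
theta = 0.5                        # ability of one respondent

# Boundary probabilities P(y >= k) for k = 2..K
p_ge = 1 / (1 + np.exp(-a_j * (theta - b_j)))
# Prepend P(y >= 1) = 1 and append P(y >= K+1) = 0
p_ge = np.concatenate([[1.0], p_ge, [0.0]])

# Telescope: P(y = k) = P(y >= k) - P(y >= k+1)
p_cat = p_ge[:-1] - p_ge[1:]
print(p_cat, p_cat.sum())  # the categories' probabilities sum to 1
```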
Implementation example#
```python
# Generate synthetic data
import numpy as np
import pandas as pd

def simulate_grm(N=1000, J=10, K=4, seed=42):
    rng = np.random.default_rng(seed)
    theta = rng.normal(0, 1, size=N)      # abilities
    a = rng.lognormal(0, 0.3, size=J)     # discriminations (positive)
    b = np.sort(                          # ordered difficulties per item
        rng.normal(0, 1, size=(J, K - 1)),
        axis=1,
    )
    U = np.zeros((N, J), dtype=int)
    for j in range(J):
        for i in range(N):
            # Boundary probabilities P(y >= k) for categories coded 0..K-1
            eta = a[j] * (theta[i] - b[j])
            P_ge = 1 / (1 + np.exp(-eta))
            # Telescope into category probabilities P(y = k)
            P = np.empty(K)
            P[0] = 1 - P_ge[0]
            for k in range(1, K - 1):
                P[k] = P_ge[k - 1] - P_ge[k]
            P[K - 1] = P_ge[K - 2]
            U[i, j] = rng.choice(K, p=P)
    return U, theta, a, b

num_users = 1000
num_items = 20
U, true_theta, true_a, true_b = simulate_grm(N=num_users, J=num_items)
df = pd.DataFrame(
    U,
    index=[f"user_{i+1}" for i in range(num_users)],
    columns=[f"question_{j+1}" for j in range(num_items)],
)
df.head()
```
| | question_1 | question_2 | question_3 | question_4 | question_5 | question_6 | question_7 | question_8 | question_9 | question_10 | question_11 | question_12 | question_13 | question_14 | question_15 | question_16 | question_17 | question_18 | question_19 | question_20 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| user_1 | 0 | 3 | 3 | 0 | 3 | 3 | 3 | 3 | 3 | 1 | 3 | 1 | 3 | 0 | 2 | 1 | 3 | 0 | 0 | 1 |
| user_2 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 3 | 0 | 1 | 0 | 3 | 0 | 0 | 0 | 0 | 1 | 0 |
| user_3 | 3 | 0 | 3 | 3 | 3 | 0 | 3 | 0 | 3 | 3 | 2 | 1 | 3 | 0 | 3 | 3 | 0 | 1 | 1 | 0 |
| user_4 | 0 | 1 | 1 | 1 | 0 | 3 | 1 | 1 | 3 | 3 | 3 | 1 | 3 | 3 | 2 | 3 | 3 | 3 | 3 | 1 |
| user_5 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 2 | 0 | 1 | 0 | 0 | 0 | 0 | 2 | 0 | 2 | 0 | 0 |
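Because the generator also returns the true parameters, a quick sanity check is possible. The sketch below (reusing `U`, `true_theta`, `true_a`, and `true_b` from the cell above) compares empirical and model-implied boundary probabilities for the first item:

```python
# Empirical vs. model-implied P(y >= k) for item 0 (categories coded 0..3)
j = 0
for k in range(1, 4):
    empirical = (U[:, j] >= k).mean()
    implied = (1 / (1 + np.exp(-true_a[j] * (true_theta - true_b[j, k - 1])))).mean()
    print(f"P(y >= {k}): empirical={empirical:.3f}, model-implied={implied:.3f}")
```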
```python
# Reshape to long format
user_categories = df.index
item_categories = df.columns
df_long = pd.melt(
    df.reset_index(),
    id_vars="index",
    var_name="item",
    value_name="response",
).rename(columns={"index": "user"})
df_long["user"] = pd.Categorical(df_long["user"], categories=user_categories, ordered=True)
df_long["item"] = pd.Categorical(df_long["item"], categories=item_categories, ordered=True)
df_long.head()
```
| | user | item | response |
|---|---|---|---|
| 0 | user_1 | question_1 | 0 |
| 1 | user_2 | question_1 | 0 |
| 2 | user_3 | question_1 | 3 |
| 3 | user_4 | question_1 | 0 |
| 4 | user_5 | question_1 | 0 |
```python
import numpy as np
import pymc as pm

# Extract integer codes and category labels
user_idx = df_long["user"].cat.codes.to_numpy()
users = df_long["user"].cat.categories.to_numpy()
item_idx = df_long["item"].cat.codes.to_numpy()
items = df_long["item"].cat.categories.to_numpy()
responses = df_long["response"].to_numpy().astype("int64")  # 0..K-1

# Number of categories and thresholds
K = int(responses.max() + 1)
n_thresholds = K - 1

coords = {
    "user": users,
    "item": items,
    "threshold": np.arange(n_thresholds),
    "obs_id": np.arange(len(df_long)),
}

with pm.Model(coords=coords) as model:
    user_idx_ = pm.Data("user_idx", user_idx, dims="obs_id")
    item_idx_ = pm.Data("item_idx", item_idx, dims="obs_id")
    responses_ = pm.Data("responses", responses, dims="obs_id")

    theta = pm.Normal("theta", 0.0, 1.0, dims="user")
    a = pm.LogNormal("a", 0.0, 0.5, dims="item")

    # --- GRM thresholds: (K-1) per item ---
    # The ordered transform enforces monotonic increase along the last axis
    b = pm.Normal(
        "b",
        mu=np.linspace(-1, 1, n_thresholds),  # rough prior location
        sigma=2.0,
        dims=("item", "threshold"),
        transform=pm.distributions.transforms.ordered,
    )

    # Linear predictor
    eta = a[item_idx_] * theta[user_idx_]

    # GRM cutpoints: a_j * b_{jk}
    # OrderedLogistic models P(Y > k) = sigmoid(eta - cutpoints_k), so to match
    # Samejima's parameterization P(Y >= k) = sigmoid(a * (theta - b_k)) we need
    # cutpoints = a * b (since a > 0, the ordering is preserved)
    cutpoints = a[:, None] * b

    # Likelihood: look up each observation's item-specific cutpoints
    pm.OrderedLogistic(
        "obs",
        eta=eta,
        cutpoints=cutpoints[item_idx_],
        observed=responses_,
        dims="obs_id",
    )
```
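Before sampling, it can help to verify that the log-density of every variable is finite at the model's initial point; `model.point_logps()` in recent PyMC versions evaluates exactly that:

```python
# Check the initial point: non-finite values here would explain failed chains
print(model.point_logps())
```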
Estimation#
```python
%%time
with model:
    idata = pm.sample(random_seed=0, draws=1000)
```
```
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (2 chains in 2 jobs)
NUTS: [theta, a, b]
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File <timed exec>:2

File ~/work/notes/notes/.venv/lib/python3.10/site-packages/pymc/sampling/mcmc.py:957, in sample(draws, tune, chains, cores, random_seed, progressbar, progressbar_theme, step, var_names, nuts_sampler, initvals, init, jitter_max_retries, n_init, trace, discard_tuned_samples, compute_convergence_checks, keep_warning_stat, return_inferencedata, idata_kwargs, nuts_sampler_kwargs, callback, mp_ctx, blas_cores, model, compile_kwargs, **kwargs)
    953 t_sampling = time.time() - t_start
    955 # Packaging, validating and returning the result was extracted
    956 # into a function to make it easier to test and refactor.
--> 957 return _sample_return(
    958     run=run,
    959     traces=trace if isinstance(trace, ZarrTrace) else traces,
    960     tune=tune,
    961     t_sampling=t_sampling,
    962     discard_tuned_samples=discard_tuned_samples,
    963     compute_convergence_checks=compute_convergence_checks,
    964     return_inferencedata=return_inferencedata,
    965     keep_warning_stat=keep_warning_stat,
    966     idata_kwargs=idata_kwargs or {},
    967     model=model,
    968 )

File ~/work/notes/notes/.venv/lib/python3.10/site-packages/pymc/sampling/mcmc.py:1042, in _sample_return(run, traces, tune, t_sampling, discard_tuned_samples, compute_convergence_checks, return_inferencedata, keep_warning_stat, idata_kwargs, model)
   1040 # Pick and slice chains to keep the maximum number of samples
   1041 if discard_tuned_samples:
-> 1042     traces, length = _choose_chains(traces, tune)
   1043 else:
   1044     traces, length = _choose_chains(traces, 0)

File ~/work/notes/notes/.venv/lib/python3.10/site-packages/pymc/backends/base.py:624, in _choose_chains(traces, tune)
    622 lengths = [max(0, len(trace) - tune) for trace in traces]
    623 if not sum(lengths):
--> 624     raise ValueError("Not enough samples to build a trace.")
    626 idxs = np.argsort(lengths)
    627 l_sort = np.array(lengths)[idxs]

ValueError: Not enough samples to build a trace.
```
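The traceback only shows that no post-tuning draws survived (`_choose_chains` found every chain empty), so the underlying cause is not visible here. One workaround worth trying, offered as an assumption rather than a verified fix, is to supply explicit ordered initial values for `b` and run the chains sequentially so that any worker-level error is raised directly:

```python
# Hypothetical retry (untested here): explicit ordered initial values for the
# thresholds, and cores=1 so an error inside a chain surfaces instead of
# silently producing an empty trace
init_b = np.tile(np.linspace(-1.0, 1.0, n_thresholds), (len(items), 1))
with model:
    idata = pm.sample(
        draws=1000,
        initvals={"b": init_b},
        cores=1,
        random_seed=0,
    )
```

The EAP summaries below assume `idata` came from a run in which sampling completed.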
EAP estimates#
```python
post_mean = idata.posterior.mean(dim=["chain", "draw"])

# EAP estimates of the item parameters
params_EAP = pd.DataFrame({
    "item": coords["item"],
    "a": post_mean["a"],
})
params_EAP.head()
```
| | item | a |
|---|---|---|
| 0 | question_1 | 0.943863 |
| 1 | question_2 | 0.747024 |
| 2 | question_3 | 0.900281 |
| 3 | question_4 | 1.136561 |
| 4 | question_5 | 0.886757 |
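Since the data were simulated, the EAP estimates can be compared against the generating values; a minimal sketch, assuming `true_a` from the simulation cell is still in scope:

```python
# Compare estimated and true discrimination parameters
check = params_EAP.assign(true_a=true_a)
print(check.head())
print("correlation:", np.corrcoef(check["a"], check["true_a"])[0, 1])
```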
References#
Samejima, F. (1969). Estimation of latent ability using a response pattern of graded scores. Psychometrika Monograph Supplement, No. 17.
Lipovetsky, S. (2021). Handbook of Item Response Theory, Volume 1, Models. Technometrics, 63(3), 428-431.