The key to implementing a Gaussian HMM (GHMM) is implementing the two steps of the EM algorithm. The difference between a Gaussian HMM and an ordinary HMM is that its observations are continuous variables, assumed to follow a Gaussian distribution, whereas in the original HMM the observations are discrete.
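The only change relative to the discrete case is the emission term: a lookup in an emission matrix is replaced by a Gaussian density evaluated at the observed value. A minimal sketch of this difference (all parameters here are toy values for illustration, not from the code below):

```python
import numpy as np
from scipy.stats import norm

# Discrete HMM: emissions are a lookup table B[state, symbol] (toy values)
B = np.array([[0.9, 0.1],
              [0.2, 0.8]])
obs_symbol = 1
discrete_emission = B[:, obs_symbol]  # p(x_t = symbol | z_t = k) for each state k

# Gaussian HMM: each state k emits from its own N(mu_k, sigma_k^2) (toy values)
mu = np.array([0.0, 3.0])
sigma = np.array([1.0, 0.5])
obs_value = 2.7
gaussian_emission = norm.pdf(obs_value, loc=mu, scale=sigma)  # density per state
```

In both cases the result is a length-K vector of per-state emission likelihoods, which is exactly what the forward-backward recursions consume.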
In the E-step, the observations and the current parameter estimates are held fixed, and we compute the posterior distribution of the latent states given the observed sequence under those parameters.
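In formulas (standard forward-backward notation; $\alpha$ and $\beta$ are produced by the forward and backward passes implemented below):

```latex
\begin{aligned}
\gamma(z_t) &= p(z_t \mid X, \theta^{\text{old}})
  \propto \alpha_t(z_t)\,\beta_t(z_t) \\
\xi(z_{t-1}, z_t) &= p(z_{t-1}, z_t \mid X, \theta^{\text{old}})
  \propto \alpha_{t-1}(z_{t-1})\, A_{z_{t-1} z_t}\,
    \mathcal{N}\!\left(x_t \mid \mu_{z_t}, \sigma_{z_t}^2\right) \beta_t(z_t)
\end{aligned}
```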
The E-step code is as follows:
"""E-step"""
def e_step(x_list, pi, A, phi):
""" E-step: Compute posterior distribution of the latent variables,
p(Z|X, theta_old). Specifically, we compute
1) gamma(z_n): Marginal posterior distribution, and
2) xi(z_n-1, z_n): Joint posterior distribution of two successive
latent states
Args:
x_list (List[np.ndarray]): List of sequences of observed measurements
pi (np.ndarray): Current estimated Initial state distribution (K,)
A (np.ndarray): Current estimated Transition matrix (K, K)
phi (Dict[np.ndarray]): Current estimated gaussian parameters
Returns:
gamma_list (List[np.ndarray]), xi_list (List[np.ndarray])
"""
n_states = pi.shape[0]
gamma_list = [np.zeros([len(x), n_states]) for x in x_list]
xi_list = [np.zeros([len(x)-1, n_states, n_states]) for x in x_list]
""" YOUR CODE HERE
Use the forward-backward procedure on each input sequence to populate
"gamma_list" and "xi_list" with the correct values.
Be sure to use the scaling factor for numerical stability.
"""
from scipy.stats import norm as nm
for oo in range(len(x_list)):
T=len(x_list[oo])
#create alpha, beta
alphas=alpha(x_list[oo],n_states,pi,A,phi)
betas=beta(x_list[oo],n_states,pi,A,phi)
#compute gamma
for k in range(n_states):
gamma_list[oo][:,k]=np.multiply(alphas[:,k],betas[:,k])/np.sum(np.multiply(alphas,betas),axis=1)
#compute xi
#for k in range(n_states)
for t in range(T-1):
for j in range(n_states):
for k in range(n_states):
xi_list[oo][t,j,k]=alphas[t,j]*A[j,k]*betas[t+1,k]*nm.pdf(x_list[oo][t+1],loc=phi["mu"][k],scale=phi["sigma"][k])
xi_list[oo][t,:,:]/=np.sum(xi_list[oo][t,:,:])
return gamma_list, xi_list
This uses the forward-backward algorithm, whose essence is to compute the E-step quantities efficiently by recursion rather than by enumerating all possible state sequences.
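The recursions the two passes implement are the standard ones (each row is additionally rescaled to sum to one for numerical stability, which cancels out once γ and ξ are normalized):

```latex
\begin{aligned}
\alpha_1(j) &= \pi_j\, \mathcal{N}\!\left(x_1 \mid \mu_j, \sigma_j^2\right), &
\alpha_t(j) &= \mathcal{N}\!\left(x_t \mid \mu_j, \sigma_j^2\right) \sum_k \alpha_{t-1}(k)\, A_{kj} \\
\beta_T(j) &= 1, &
\beta_t(j) &= \sum_k A_{jk}\, \mathcal{N}\!\left(x_{t+1} \mid \mu_k, \sigma_k^2\right) \beta_{t+1}(k)
\end{aligned}
```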
The forward pass is implemented as follows:
#forward compute alpha
def alpha(x_list, n_states, pi, A, phi):
    from scipy.stats import norm as nm
    T = len(x_list)
    alpha = np.zeros((T, n_states))
    # Initialization: alpha_1(k) = pi_k * p(x_1 | z_1 = k)
    for k in range(n_states):
        alpha[0, k] = pi[k] * nm.pdf(x_list[0], loc=phi["mu"][k], scale=phi["sigma"][k])
    alpha[0, :] = alpha[0, :] / np.sum(alpha[0, :])
    # Recursion: alpha_t(j) = p(x_t | z_t = j) * sum_k alpha_{t-1}(k) * A[k, j]
    for t in range(1, T):
        for j in range(n_states):
            for k in range(n_states):
                alpha[t, j] += alpha[t - 1, k] * A[k, j] * nm.pdf(x_list[t],
                                                                  loc=phi["mu"][j],
                                                                  scale=phi["sigma"][j])
        # Scale each row to sum to one for numerical stability
        alpha[t, :] = alpha[t, :] / np.sum(alpha[t, :])
    return alpha
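One side effect of rescaling every row is that the forward pass no longer yields the sequence likelihood directly; accumulating the per-step normalizers recovers it, which is useful for monitoring EM convergence. A sketch of this variant (the helper `alpha_with_loglik` is illustrative, not part of the original code):

```python
import numpy as np
from scipy.stats import norm

def alpha_with_loglik(x, pi, A, mu, sigma):
    """Scaled forward pass that also accumulates log p(x_1..x_T)
    from the per-step normalizers c_t: log p(X) = sum_t log c_t."""
    T, K = len(x), len(pi)
    alpha = np.zeros((T, K))
    alpha[0] = pi * norm.pdf(x[0], loc=mu, scale=sigma)
    c = alpha[0].sum()
    alpha[0] /= c
    loglik = np.log(c)
    for t in range(1, T):
        # Vectorized recursion: (alpha_{t-1} @ A) * emission densities at t
        alpha[t] = (alpha[t - 1] @ A) * norm.pdf(x[t], loc=mu, scale=sigma)
        c = alpha[t].sum()
        alpha[t] /= c
        loglik += np.log(c)
    return alpha, loglik

# Single-state sanity check: the log-likelihood must equal the
# sum of per-observation Gaussian log-densities
x = np.array([0.0, 1.0, -0.5])
a, ll = alpha_with_loglik(x, np.array([1.0]), np.array([[1.0]]),
                          np.array([0.0]), np.array([1.0]))
```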
The backward pass is implemented as follows:
#backward compute beta
def beta(x_list, n_states, pi, A, phi):
    from scipy.stats import norm as nm
    T = len(x_list)
    beta = np.zeros((T, n_states))
    # Initialization: beta_T(k) = 1
    beta[T - 1, :] = 1
    # Recursion: beta_t(j) = sum_k A[j, k] * p(x_{t+1} | z_{t+1} = k) * beta_{t+1}(k)
    for t in reversed(range(T - 1)):
        for j in range(n_states):
            for k in range(n_states):
                beta[t, j] += A[j, k] * beta[t + 1, k] * nm.pdf(x_list[t + 1],
                                                               loc=phi["mu"][k],
                                                               scale=phi["sigma"][k])
        # Scale each row to sum to one for numerical stability
        beta[t, :] = beta[t, :] / np.sum(beta[t, :])
    return beta
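For a short sequence the posteriors can be checked against brute-force enumeration of all state paths, which is exactly the computation the recursion avoids. A self-contained sanity check (compact vectorized reimplementation of the same scaled forward-backward; names and toy parameters are illustrative):

```python
import itertools
import numpy as np
from scipy.stats import norm

def forward_backward_gamma(x, pi, A, mu, sigma):
    """Scaled forward-backward pass; returns gamma of shape (T, K)."""
    T, K = len(x), len(pi)
    B = norm.pdf(x[:, None], loc=mu, scale=sigma)  # (T, K) emission densities
    alpha = np.zeros((T, K))
    alpha[0] = pi * B[0]
    alpha[0] /= alpha[0].sum()
    for t in range(1, T):
        alpha[t] = (alpha[t - 1] @ A) * B[t]
        alpha[t] /= alpha[t].sum()
    beta = np.ones((T, K))
    for t in range(T - 2, -1, -1):
        beta[t] = A @ (B[t + 1] * beta[t + 1])
        beta[t] /= beta[t].sum()
    gamma = alpha * beta
    return gamma / gamma.sum(axis=1, keepdims=True)

def brute_force_gamma(x, pi, A, mu, sigma):
    """Enumerate all K^T state paths; feasible only for tiny T and K."""
    T, K = len(x), len(pi)
    gamma = np.zeros((T, K))
    total = 0.0
    for path in itertools.product(range(K), repeat=T):
        p = pi[path[0]] * norm.pdf(x[0], mu[path[0]], sigma[path[0]])
        for t in range(1, T):
            p *= A[path[t - 1], path[t]] * norm.pdf(x[t], mu[path[t]], sigma[path[t]])
        total += p
        for t in range(T):
            gamma[t, path[t]] += p
    return gamma / total  # normalize by p(X)

# Toy 2-state model
x = np.array([0.1, 1.2, -0.3, 0.8])
pi = np.array([0.6, 0.4])
A = np.array([[0.7, 0.3], [0.4, 0.6]])
mu = np.array([0.0, 1.0])
sigma = np.array([1.0, 0.5])
g_fb = forward_backward_gamma(x, pi, A, mu, sigma)
g_bf = brute_force_gamma(x, pi, A, mu, sigma)
```

The two results agree to machine precision, confirming that the per-step scaling cancels out in the normalized posteriors.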
With gamma and xi, the two key intermediate quantities, in hand, the E-step computation is complete.
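The section above covers only the E-step; for context, the M-step then re-estimates the parameters from gamma and xi using the standard closed-form updates. A minimal single-sequence sketch (the helper `m_step` and the toy inputs are illustrative, not from the original code):

```python
import numpy as np

def m_step(x, gamma, xi):
    """Closed-form parameter updates for a single observed sequence x,
    given gamma (T, K) and xi (T-1, K, K) from the E-step."""
    # Initial distribution: posterior over the first state
    pi = gamma[0] / gamma[0].sum()
    # Transition matrix: expected transition counts, normalized per row
    A = xi.sum(axis=0)
    A /= A.sum(axis=1, keepdims=True)
    # Gaussian parameters: gamma-weighted mean and std per state
    Nk = gamma.sum(axis=0)
    mu = (gamma * x[:, None]).sum(axis=0) / Nk
    sigma = np.sqrt((gamma * (x[:, None] - mu) ** 2).sum(axis=0) / Nk)
    return pi, A, mu, sigma

# Toy posteriors for a 3-step, 2-state sequence (made-up but consistent:
# each gamma row and each xi time-slice sums to one)
x = np.array([0.0, 1.0, 2.0])
gamma = np.array([[0.8, 0.2], [0.5, 0.5], [0.2, 0.8]])
xi = np.array([[[0.4, 0.4], [0.1, 0.1]],
               [[0.3, 0.2], [0.2, 0.3]]])
pi_new, A_new, mu_new, sigma_new = m_step(x, gamma, xi)
```

Alternating e_step and m_step until the log-likelihood stops improving is the full Baum-Welch (EM) loop.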