高斯混合模型是一种流行的无监督学习算法。GMM方法类似于K-Means聚类算法,但是由于其复杂性,它更健壮,因此更有用。
K-means聚类使用欧式距离函数来发现数据中的聚类。只要数据相对于质心呈圆形分布,此方法就可以很好地工作。但是,如果数据是非线性的呢?或者数据具有非零的协方差呢?如果聚类具有不同的均值和协方差怎么办?
这就要用到高斯混合模型了!
GMM假设生成数据的是一种混合的高斯分布。与将数据点硬分配到聚类的K-means方法(假设围绕质心的数据呈圆形分布)相比,它使用了将数据点软分配到聚类的方法(即概率性,因此更好)。
简而言之,GMM效果更好,因为:(A)通过使用软分配捕获属于不同聚类的数据点的不确定性,(B)对圆形聚类没有偏见。即使是非线性数据分布,它也能很好地工作。
GMM
GMM的目标函数是最大化数据X、p(X)或对数似然值L的似然值(因为对数是单调递增函数)。通过假设混合了K个高斯来生成数据,我们可以将p(X)写为边缘概率,对所有数据点的K个聚类求和。
利用上面对数函数的求和,我们不能得到解析解。看起来很讨厌,但这个问题有一个很好的解决方案:Expectation-Maximization(EM)算法。
数学
EM算法是一种迭代算法,用于在无法直接找到参数的情况下寻找模型的最大似然估计(MLE)。它包括两个步骤:期望步骤和最大化步骤。
1.期望步骤:计算成员值r_ic。这是数据点x_i属于聚类c的概率。
2. 最大化步骤:计算一个新参数mc,该参数确定属于不同聚类的点的分数。 通过计算每个聚类c的MLE来更新参数μ,π,Σ。
重复EM步骤,直到对数似然值L收敛。
Python编码
让我们从头开始用python编写GMM的基本实现。
生成一维数据。
x = np.linspace(-5, 5, 20)
x1 = x*np.random.rand(20)
x2 = x*np.random.rand(20) + 10
x3 = x*np.random.rand(20) - 10
xt = np.hstack((x1,x2,x3))
初始化GMM的参数:μ,π,Σ。
max_iterations = 10
pi = np.array([1/3, 1/3, 1/3])
mu = np.array([5,6,-3])
var = np.array([1,3,9])
r = np.zeros((len(xt), 3))
运行EM算法的第一次迭代
import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import norm
gauss1 = norm(loc=mu[0], scale=var[0])
gauss2 = norm(loc=mu[1], scale=var[1])
gauss3 = norm(loc=mu[2], scale=var[2])
# E-Step
for c,g,p in zip(range(3), [gauss1, gauss2, gauss3], pi):
r[:,c] = p*g.pdf(xt[:])
for i in range(len(r)):
r[i,:] /= np.sum(r[i,:])
fig = plt.figure(figsize=(10,10))
ax0 = fig.add_subplot(111)
for i in range(len(r)):
ax0.scatter(xt[i],0,c=r[i,:],s=100)
for g,c in zip([gauss1.pdf(np.linspace(-15,15)),gauss2.pdf(np.linspace(-15,15)),gauss3.pdf(np.linspace(-15,15))],['r','g','b']):
ax0.plot(np.linspace(-15,15),g,c=c,zorder=0)
ax0.set_xlabel('X-axis')
ax0.set_ylabel('Gaussian pdf value')
ax0.legend(['Gaussian 1', 'Gaussian 2', 'Gaussian 3'])
plt.show()
# M-Step
mc = np.sum(r, axis=0)
pi = mc/len(xt)
mu = np.sum(r*np.vstack((xt, xt, xt)).T, axis=0)/mc
var = []
for c in range(len(pi)):
var.append(np.sum(np.dot(r[:,c]*(xt[i] - mu[c]).T, r[:,c]*(xt[i] - mu[c])))/mc[c])
将此代码放在for循环中,并将其放在类对象中。
class GMM1D:
"""Apply GMM to 1D Data"""
def __init__(self, X, max_iterations):
"""Initialize data and max_iterations"""
self.X = X
self.max_iterations = max_iterations
def run(self):
"""Initialize parameters mu, var, pi"""
self.pi = np.array([1/3, 1/3, 1/3])
self.mu = np.array([5,8,1])
self.var = np.array([5,3,1])
r = np.zeros((len(self.X), 3))
for itr in range(self.max_iterations):
gauss1 = norm(loc=self.mu[0], scale=self.var[0])
gauss2 = norm(loc=self.mu[1], scale=self.var[1])
gauss3 = norm(loc=self.mu[2], scale=self.var[2])
# E-Step
for c,g,p in zip(range(3), [gauss1, gauss2, gauss3], self.pi):
r[:,c] = p*g.pdf(xt[:])
for i in range(len(r)):
r[i,:] /= np.sum(r[i,:])
fig = plt.figure(figsize=(10,10))
ax0 = fig.add_subplot(111)
for i in range(len(r)):
ax0.scatter(xt[i],0,c=r[i,:],s=100)
for g,c in zip([gauss1.pdf(np.linspace(-15,15)),gauss2.pdf(np.linspace(-15,15)),gauss3.pdf(np.linspace(-15,15))],['r','g','b']):
ax0.plot(np.linspace(-15,15),g,c=c,zorder=0)
plt.show()
# M-Step
mc = np.sum(r, axis=0)
self.pi = mc/len(self.X)
self.mu = np.sum(r*np.vstack((self.X, self.X, self.X)).T, axis=0)/mc
self.var = []
for c in range(len(self.pi)):
self.var.append(np.sum(np.dot(r[:,c]*(self.X[i] - self.mu[c]).T, r[:,c]*(self.X[i] - self.mu[c])))/mc[c])
gmm = GMM1D(xt, 10)
gmm.run()
我们已经建立并运行了一个一维数据模型。同样的原理也适用于更高维度(≥2D)。唯一的区别是我们将使用多元高斯分布。让我们为2D模型编写Python代码。
让我们生成一些数据并编写我们的模型
from sklearn.datasets.samples_generator import make_blobs
from scipy.stats import multivariate_normal
X,Y = make_blobs(cluster_std=1.5,random_state=20,n_samples=500,centers=3)
X = np.dot(X, np.random.RandomState(0).randn(2,2))
plt.figure(figsize=(8,8))
plt.scatter(X[:, 0], X[:, 1])
plt.show()
class GMM2D:
"""Apply GMM to 2D data"""
def __init__(self, num_clusters, max_iterations):
"""Initialize num_clusters(K) and max_iterations for the model"""
self.num_clusters = num_clusters
self.max_iterations = max_iterations
def run(self, X):
"""Initialize parameters and run E and M step storing log-likelihood value after every iteration"""
self.pi = np.ones(self.num_clusters)/self.num_clusters
self.mu = np.random.randint(min(X[:, 0]), max(X[:, 0]), size=(self.num_clusters, len(X[0])))
self.cov = np.zeros((self.num_clusters, len(X[0]), len(X[0])))
for n in range(len(self.cov)):
np.fill_diagonal(self.cov[n], 5)
# reg_cov is used for numerical stability i.e. to check singularity issues in covariance matrix
self.reg_cov = 1e-6*np.identity(len(X[0]))
x,y = np.meshgrid(np.sort(X[:,0]), np.sort(X[:,1]))
self.XY = np.array([x.flatten(), y.flatten()]).T
# Plot the data and the initial model
fig0 = plt.figure(figsize=(10,10))
ax0 = fig0.add_subplot(111)
ax0.scatter(X[:, 0], X[:, 1])
ax0.set_title("Initial State")
for m, c in zip(self.mu, self.cov):
c += self.reg_cov
multi_normal = multivariate_normal(mean=m, cov=c)
ax0.contour(np.sort(X[:, 0]), np.sort(X[:, 1]), multi_normal.pdf(self.XY).reshape(len(X), len(X)), colors = 'black', alpha = 0.3)
ax0.scatter(m[0], m[1], c='grey', zorder=10, s=100)
fig0.savefig('GMM2D Initial State.png')
plt.show()
self.log_likelihoods = []
for iters in range(self.max_iterations):
# E-Step
self.ric = np.zeros((len(X), len(self.mu)))
for pic, muc, covc, r in zip(self.pi, self.mu, self.cov, range(len(self.ric[0]))):
covc += self.reg_cov
mn = multivariate_normal(mean=muc, cov=covc)
self.ric[:, r] = pic*mn.pdf(X)
for r in range(len(self.ric)):
self.ric[r, :] = self.ric[r, :] / np.sum(self.ric[r, :])
# M-step
self.mc = np.sum(self.ric, axis=0)
self.pi = self.mc/np.sum(self.mc)
self.mu = np.dot(self.ric.T, X) / self.mc.reshape(self.num_clusters,1)
self.cov = []
for r in range(len(self.pi)):
covc = 1/self.mc[r] * (np.dot( (self.ric[:, r].reshape(len(X), 1)*(X-self.mu[r]) ).T, X - self.mu[r]) + self.reg_cov)
self.cov.append(covc)
self.cov = np.asarray(self.cov)
self.log_likelihoods.append(np.log(np.sum([self.pi[r]*multivariate_normal(self.mu[r], self.cov[r] + self.reg_cov).pdf(X) for r in range(len(self.pi))])))
fig1 = plt.figure(figsize=(10,10))
ax1 = fig1.add_subplot(111)
ax1.scatter(X[:, 0], X[:, 1])
ax1.set_title("Iteration " + str(iters))
for m, c in zip(self.mu, self.cov):
c += self.reg_cov
multi_normal = multivariate_normal(mean=m, cov=c)
ax1.contour(np.sort(X[:, 0]), np.sort(X[:, 1]), multi_normal.pdf(self.XY).reshape(len(X), len(X)), colors = 'black', alpha = 0.3)
ax1.scatter(m[0], m[1], c='grey', zorder=10, s=100)
fig1.savefig("GMM2D Iter " + str(iters) + ".png")
plt.show()
fig2 = plt.figure(figsize=(10,10))
ax2 = fig2.add_subplot(111)
ax2.plot(range(0, iters+1, 1), self.log_likelihoods)
ax2.set_title('Log Likelihood Values')
fig2.savefig('GMM2D Log Likelihood.png')
plt.show()
def predict(self, Y):
"""Predicting cluster for new samples in array Y"""
predictions = []
for pic, m, c in zip(self.pi, self.mu, self.cov):
prob = pic*multivariate_normal(mean=m, cov=c).pdf(Y)
predictions.append([prob])
predictions = np.asarray(predictions).reshape(len(Y), self.num_clusters)
predictions = np.argmax(predictions, axis=1)
fig2 = plt.figure(figsize=(10,10))
ax2 = fig2.add_subplot(111)
ax2.scatter(X[:, 0], X[:, 1], c='c')
ax2.scatter(Y[:, 0], Y[:, 1], marker='*', c='k', s=150, label = 'New Data')
ax2.set_title("Predictions on New Data")
colors = ['r', 'b', 'g']
for m, c, col, i in zip(self.mu, self.cov, colors, range(len(colors))):
# c += reg_cov
multi_normal = multivariate_normal(mean=m, cov=c)
ax2.contour(np.sort(X[:, 0]), np.sort(X[:, 1]), multi_normal.pdf(self.XY).reshape(len(X), len(X)), colors = 'black', alpha = 0.3)
ax2.scatter(m[0], m[1], marker='o', c=col, zorder=10, s=150, label = 'Centroid ' + str(i+1))
for i in range(len(Y)):
ax2.scatter(Y[i, 0], Y[i, 1], marker='*', c=colors[predictions[i]], s=150)
ax2.set_xlabel('X-axis')
ax2.set_ylabel('Y-axis')
ax2.legend()
fig2.savefig('GMM2D Predictions.png')
plt.show()
return predictions
让我们对此模型进行一些预测
y = np.random.randint(-10, 20, size=(12, 2))
gmm2d = GMM2D(num_clusters=3, max_iterations=10)
gmm2d.run(X)
gmm2d.predict(y)
如果使用sklearn,可以在几行代码中完成相同的任务。
from sklearn.mixture import GaussianMixture
X,Y = make_blobs(cluster_std=1.5,random_state=20,n_samples=500,centers=3)
X = np.dot(X, np.random.RandomState(0).randn(2,2))
GMM = GaussianMixture(n_components=3)
GMM.fit(X)
Y = np.random.randint(-10, 20, size=(1, 2))
print(GMM.means_, GMM.predict_proba(Y))
"""Out:
[[19.88168663 17.47097164]
[-12.83538784 4.89646199]
[11.09673732 18.67548935]]
[[1.91500946e-17 9.30483496e-01 6.95165038e-02]]"""
GMM将样本分类为第二类。
结论
实现高斯混合模型并不难。一旦你清楚了数学,它将为模型找到最大似然估计(无论是一维数据还是高维数据)。该方法具有较强的鲁棒性,在执行聚类任务时非常有用。现在您已经熟悉了GMM的python实现,可以使用数据集执行一些很酷的操作。