记录一下网上看到的机器学习算法实现
# %%
import numpy as np
import matplotlib.pyplot as plt
"""
seed( ) 用于指定随机数生成时所用算法开始的整数值,如果使用相同的seed( )值,则每次生成的随即数都相同,
如果不设置这个值,则系统根据时间来自己选择这个值,此时每次生成的随机数因时间差异而不同
"""
np.random.seed(42)
N = 100 # 维度0, x有几行
# rand函数根据给定维度生成[0,1)之间的数据,包含0,不包含1
# 参数就是生成的维度(N,1) -> N*1
# 返回值为指定维度的array
x = np.random.rand(N, 1) * 5 # 随机x
y = 9.81 * x
# randn函数返回一个或一组样本,具有标准正态分布。
# 参数就是生成的维度(N,1) -> N*1
noise = 2 * np.random.randn(N, 1)
y_obs = y + noise # 作为随机的y真实值
# %%
plt.scatter(x, y_obs, label="Observations")
plt.plot(x, y, c='r', label="True function")
plt.legend()
plt.show()
# %% 定义辅助函数
# helper functions
# w*x (y=w^t*x+b)
def f(w):
return w * x
# 损失函数, 将误差平方后求合 / N -> 方差
def loss_function(e):
L = np.sum(np.square(e)) / N
return L
# 计算梯度(导数): &L/&w = 1/N * ∑(2(y-wx)(-x)) = 1/N * ∑(2ξx)
def dL_dw(e, w):
# mean:求平均
return -2 * np.mean(e * x)
# w 会更新 -> w_new = w_current - γ * &L/&w ,其中γ是学习率(0~1)
# the actual gradient descent
def gradient_descent(iter=100, gamma=0.1):
"""
梯度下降
:param iter: 迭代轮数
:param gamma: 学习率
:return: w的历史值,loss的历史值
"""
# get starting conditions
w = 10 * np.random.random() # 随机初始化权重参数
# 保存参数w的所有历史值
params = []
# 生成 iter行1列的 0填充的 数组
loss = np.zeros((iter, 1)) # 记录损失值
for i in range(iter):
params.append(w)
# 计算误差
e = y_obs - f(w) # (y_obs真实值-f(w)预测值)
loss[i] = loss_function(e)
# update parameters 更新权重参数
w_news = w - gamma * dL_dw(e, w) # 更新w
w = w_news
return params, loss
# %% 开始计算
params, loss = gradient_descent()
iter = 100
gamma = 0.1
# 随机初始化权重参数
w = 10 * np.random.randn()
params = [] # w
loss = np.zeros((iter, 1)) # loss(e)
for i in range(iter):
params.append(w)
e = y_obs - f(w) # 求误差 (y_true-y_pred)
loss[i] = loss_function(e)
w_news = w - gamma * dL_dw(e, w)
w = w_news
print(dL_dw(e, w)) # -1.0766942892814769e-14
plt.plot(loss)
plt.show() # 损失值的图像一直在下降
params = np.array(params)
plt.plot(params)
plt.title('Gradient Descent')
plt.xlabel('w')
plt.show() # 权重的变化
# 查看最后一个w,可以看到非常接近9.8
print(params[-1]) # 9.757142247178272
import matplotlib.pyplot as plt
import numpy as np
# 设置随机数种子
np.random.seed(42)
# %matplotlib inline
# 集群数
K = 3
# 两个维度
D = 2
# 点的总数
N = 1000
# 不同集群的点数
Ns = [300, 400, 300]
# 3行2列 的随机数组(服从正态分布)
# 相当于生成3个二维坐标点 -> [[x1,y1],[x2,y2],[x3,y3]]
means = 5 * np.random.randn(K, D)
print(means)
x = []
# x: ['a','b','c'], y: [1, 2, 3] -> zip(x,y) -> ('a',1), ('b', 2), ('c',3)
# zip(Ns, means) -> [(300,[x1,y1]),(400,[x2,y2]),(300,[x3,y3])]
for n, m in zip(Ns, means):
print(n, m)
# 生成 n行2列的 随机数组 -> 相当于n个二维坐标点 -> m是means里的二维坐标(集群中心点)
x.append(np.random.randn(n, D) + m)
# 此时x的shape(形状)
# [(300, 2), (400, 2), (300, 2)]
print([x_.shape for x_ in x])
# zip -> [((300, 2),[x1,y1]),((400, 2),[x2,y2]),((300, 2),[x3,y3])]
for x_, m in zip(x, means):
print(x_.shape, m.shape)
# scatter(x,y) -> 绘制散点图
# x: n行2列 -> x_[:,0]: 所有行的第一列(xi), x[:,1]: 所有行的第二列(yi)
plt.scatter(x_[:, 0], x_[:, 1])
# 绘制集群中心点
# plt.plot里的这个'kx' -> 颜色字符k: 表示黑色,标记字符x: 表示图案为x标记
plt.plot(m[0], m[1], 'kx')
plt.title('True Clusters')
plt.show()
# 从x取出所有数据
# np.vstack: 按垂直方向(行顺序)堆叠数组构成一个新的数组,堆叠的数组需要具有相同的维度
# np.hstack: 按水平方向(列顺序)堆叠数组构成一个新的数组,堆叠的数组需要具有相同的维度
data = np.vstack(x)
print(data.shape)
plt.scatter(data[:, 0], data[:, 1])
plt.show()
# 计算欧式距离 √((x1-x2)^2 + (y1-y2)^2)
def dist(x1, x2, axis=None):
"""
Calculate euclidean distance
:param x1: 点1 (x1,y1)
:param x2: 点2 (x2,y2)
:param axis:
:return:
"""
# np.square(x1 - x2) -> (x1-x2)^2 平方
# np.sqrt 开根号
return np.sqrt(np.sum(np.square(x1 - x2), axis))
# 计算平均值和任意给定点的距离
def distance_matrix(x, m):
"""
Calculates the distance from each element x to each element in m.
:param x: data points
:param m: possible means
:return: distance matrix
"""
# 初始化(用0填充)距离矩阵(len(x)行len(m)列)
d = np.zeros((len(x), len(m)))
for i in range(len(x)):
for j in range(len(m)):
# 传入x的第i行所有列 x[i] -> (xi,yi)
# 传入m的第j行所有列 m[j] -> (xm,ym)
d[i, j] = dist(x[i, :], m[j, :])
return d
# 测试
# Test out the distance matrix algorithm
x_test = np.array([[0, 1], [1, 0], [0, 0], [1, 1]])
print(distance_matrix(x_test, x_test))
# 使用广播替代循环(python的循环性能低)
# broadcasting
# 计算欧式距离 √((x1-x2)^2 + (y1-y2)^2)
def dist(x1, x2, axis=-1):
"""
Calculate euclidean distance
:param x1: 点1 (x1,y1)
:param x2: 点2 (x2,y2)
:param axis:
:return:
"""
return np.sqrt(np.sum(np.square(x1 - x2), axis))
def distance_matrix_broadcasting(x, m):
# np.sum(axis=0) -> 行(维度1)求和 -> [[x1,y1],[x2,y2]] -> [[x1+x2],[y1+y2]]
# np.sum(axis=1) -> 列(维度2)求和 -> [[x1,y1],[x2,y2]] -> [[x1+y1],[x2+y2]]
# x所有点与m_相减([xx-mx,xy-my])并求平方,然后列相加((xx-mx)^2+(xy-my)^2,然后开根号得距离
d = [dist(x, m_, axis=1) for m_ in m]
# print(d)
# 按行堆叠(axis=1)
d = np.stack(d, axis=1)
return d
# 测试
# Test out the distance matrix algorithm
x_test = np.array([[0, 1], [1, 0], [0, 0], [1, 1]])
print(distance_matrix_broadcasting(x_test, x_test))
# 计算k-means
# 初始化数据
# 集群数
k = 3
#
iters = 10
print(data.shape)
print(data)
# (1000, 2)
# 3个集群中心点,(3行2列)
means = np.random.randn(k, data.shape[1])
for i in range(iters):
# 求data和中心点的距离 -> shape: (1000, 3) -> 这1000个点分别到3个中心点的距离
d = distance_matrix_broadcasting(data, means)
# print(d[:10])
# print(d.shape) # (1000,3)
# np.argmin: 找到最小的(值的下标)然后映射到指定轴
'''
array([[0, 4, 2],
[3, 1, 5]]) # 3*2
>>> np.argmin(a, axis=0)
映射到行x
找出每列最小元素的下标
array([0 1 0]) # (3,)
>>> np.argmin(a, axis=1)
映射到列y
找出每行最小元素的下标
array([0 1]) #(2,)
'''
cluster = d.argmin(axis=-1) # 找到平均距离最小值然后映射到列
# true
# print(cluster == np.argmin(d, axis=1))
# print(cluster[:10]) # [一堆0、1、2]
# print(cluster.shape) # (1000,)
for j in range(k):
# print(j) # 0,1,2
# 筛选出和第j个集群中心点最近的下标
idx = cluster == j # numpy布尔索引
# print(idx)
# print(data[idx])
# 绘制集群中心点
plt.plot(means[j, 0], means[j, 1], 'rx')
# data[idx] -> 找出所有cluster==j(与集群中心点j最近)的元素,组成新数组
plt.scatter(data[idx, 0], data[idx, 1])
# calculate new mean
# print('means\n', means[j, :])
# print('new:\n', data[idx].mean(axis=0))
# 根据当前平均距离最小的点确定新点
# 将距离集群中心点j最近的所有点的坐标求平均,得出新点的坐标
means[j, :] = data[idx].mean(axis=0)
# print(data[idx])
# print(means[j, :])
plt.show()
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
plt.switch_backend('TkAgg')
# %matplotlib inline
# 导入数据集
iris = pd.read_csv('iris.txt', header=None)
print('iris:\n', iris[:3])
# print(len(iris.columns))
# iris = iris.iloc[:, :len(iris.columns) - 1]
# print('iris:\n', iris[:3])
# 计算距离
def distEclud(arrA, arrB):
# d = a-b
d = arrA - arrB
# sum(d^2)
dist = np.sum(np.power(d, 2), axis=1)
return dist
# 自动生成随机质心
def randCent(dataSet, k):
n = dataSet.shape[1]
# 取每一列的最大最小值
data_max = dataSet.iloc[:,:n-1].max()
data_min = dataSet.iloc[:,:n-1].min()
# 均匀分布抽样
# numpy.random.uniform(low,high,size)
# 从一个均匀分布[low,high)中随机采样,注意定义域是左闭右开
# size: 输出样本数目,为int或元组(tuple)类型,
# 例如,size=(m,n,k), 则输出 m * n * k 个样本,缺省时输出1个值。
data_cent = np.random.uniform(data_min, data_max, (k, n-1))
return data_cent # 质心
# 随机生成3个质心
iris_cent = randCent(iris, 3)
def kMeans(dataSet, k, distMeas=distEclud, createCent=randCent): # iris为150*5
m, n = dataSet.shape # m是行数(数据量),n是列数iris为150*5
# 下面生成的centroids,即第一个容器,后面用来存储最新更新的质心
centroids = createCent(dataSet, k) # centroids为3*4,用三个长度为4的一维数组记载3个质心
# 第一次centroids是随机生成的
# 这段生成的result_set,即第二个容器
# result_set结构: [数据集, 该行到最近质心的距离, 本次迭代中最近质心编号,上次迭代中最近质心编号]
clusterAssment = np.zeros((m, 3)) # clusterAssment为150*3的数组
clusterAssment[:, 0] = np.inf # np.inf为无穷大
clusterAssment[:, 1: 3] = -1 # 此时clusterAssment为150*3
result_set = pd.concat([dataSet, pd.DataFrame(clusterAssment)],
axis=1, ignore_index=True) # result_set为150*8
clusterChanged = True
while clusterChanged:
clusterChanged = False
for i in range(m): # 遍历result_set中每一行,一共m行
# 小心,下面的n为5,而resulit_set的列数已经变成8
dist = distMeas(dataSet.iloc[i, :n-1].values , centroids) # 第i行与三个质心的距离,dist为3*1
result_set.iloc[i, n] = dist.min() # result_set[i,n]记录该行与3个质心的最小距离
result_set.iloc[i, n + 1] = np.where(dist == dist.min())[0] # result_set[i,n]记录最近质心的索引
clusterChanged = not (result_set.iloc[:, -1] == result_set.iloc[:, -2]).all()
# 只要result_set最后两列不完全相等,意味着本次for循环结束时,m行所有的新质心与上次while循环留下的不完全一样
# 后果:clusterChanged为True,while继续循环
# clusterChanged为True,则需要运行下面的if语句代码块,重置第一个容器centroids和第二个容器result_set
if clusterChanged:
cent_df = result_set.groupby(n + 1).mean() # 按照列索引为n+1(质心索引)(第6列)进行分组求均值
# 即:按照最新的簇分类,计算最新3个质心的位置
centroids = cent_df.iloc[:, :n - 1].values # 重置centroids,用最新质心位置,替换上次的。3*4
result_set.iloc[:, -1] = result_set.iloc[:, -2] # result_set最后一列,本次的簇分类编码,替换掉上次的
return centroids, result_set
iris_cent, iris_result = kMeans(iris, 3)
print(iris_result.head())
# 全部行都能输出
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
# 解决坐标轴刻度负号乱码
plt.rcParams['axes.unicode_minus'] = False
# 解决中文乱码问题
plt.rcParams['font.sans-serif'] = ['Simhei']
plt.style.use('ggplot')
# plt.figure(figsize=(2,3),dpi=720)
rowdata = {'颜色深度': [14.13, 13.2, 13.16, 14.27, 13.24, 12.07, 12.43, 11.79, 12.37, 12.04],
'酒精浓度': [5.64, 4.28, 5.68, 4.80, 4.22, 2.76, 3.94, 3.1, 2.12, 2.6],
'品种': [0, 0, 0, 0, 0, 1, 1, 1, 1, 1]}
# 0 代表 “黑皮诺”,1 代表 “赤霞珠”
wine_data = pd.DataFrame(rowdata)
print(wine_data)
X = np.array(wine_data.iloc[:, 0:2]) # 我们把特征(酒的属性)放在X
y = np.array(wine_data.iloc[:, -1]) # 把标签(酒的类别)放在Y
# 探索数据,假如我们给出新数据[12.03,4.1] ,你能猜出这杯红酒是什么类别么?
new_data = np.array([12.03, 4.1])
plt.scatter(X[y == 1, 0], X[y == 1, 1], color='red', label='赤霞珠') # 画出标签y为1的、关于“赤霞珠”的散点
plt.scatter(X[y == 0, 0], X[y == 0, 1], color='purple', label='黑皮诺') # 画出标签y为0的、关于“黑皮诺”的散点
plt.scatter(new_data[0], new_data[1], color='yellow') # 新数据点
print(new_data)
plt.xlabel('酒精浓度')
plt.ylabel('颜色深度')
plt.legend(loc='lower right')
plt.savefig('葡萄酒样本.png')
plt.show()
from math import sqrt
distance = [sqrt(np.sum((x - new_data) ** 2)) for x in X]
print(distance)
# 从近到远排序后的原数据点
sort_dist = np.argsort(distance)
print('sort_dist:\n', sort_dist)
k = 3
# 找出离新数据最近的3个点
topK = [y[i] for i in sort_dist[:k]]
print(topK) # [1,1,0]
# Pandas Series 类似表格中的一个列(column),类似于一维数组,可以保存任何数据类型
# 找出其中数量最多的值
print(pd.Series(topK).value_counts().index[0]) # 1
def KNN(new_data, dataSet, k):
'''
函数功能:KNN分类器
参数说明:
new_data: 需要预测分类的数据集
dataSet: 已知分类标签的数据集
k: k-近邻算法参数,选择距离最小的k个点
return:
result: 分类结果
'''
from math import sqrt
from collections import Counter
import numpy as np
import pandas as pd
result = []
distance = [sqrt(np.sum((x - new_data) ** 2)) for x in
np.array(dataSet.iloc[:, 0:len(dataSet.columns) - 2])]
sort_dist = np.argsort(distance)
topK = [dataSet.iloc[:, -1][i] for i in sort_dist[:k]]
result.append(pd.Series(topK).value_counts().index[0])
return result
# 测试函数的运行结果
new_data = np.array([12.03, 4.1])
k = 3
print(KNN(new_data, wine_data, k)) # [1]
import numpy as np
import pandas as pd
def shannon_entropy():
"""
香农熵
:return:
"""
p = 1 / 2
return -((p * np.log2(p)) + (p * np.log2(p)))
def calEnt(dataSet):
"""
计算数据集的香农熵
:param dataSet:
:return:
"""
n = dataSet.shape[0] # 数据集总行数
# [[不是, 3], [是, 2]]
iset = dataSet.iloc[:, -1].value_counts() # 标签的所有类别
p = iset / n # 每一类标签所占比
ent = (-p * np.log2(p)).sum() # 计算信息熵
return ent
row_data = {'是否陪伴': [0, 0, 0, 1, 1],
'是否玩游戏': [1, 1, 0, 1, 1],
'渣男': ['是', '是', '不是', '不是', '不是']}
dataSet = pd.DataFrame(row_data)
# 计算全体数据的信息熵——根据标签列去进行计算
# 0.9709505944546686
print(calEnt(dataSet))
# 选择最优的列进行切分
def bestSplit(dataSet):
baseEnt = calEnt(dataSet) # 计算原始熵
bestGain = 0 # 初始化信息增益
axis = -1 # 初始化最佳切分列,标签列
for i in range(dataSet.shape[1] - 1): # 对特征的每一列进行循环
levels = dataSet.iloc[:, i].value_counts().index # 提取出当前列的所有取值
ents = 0 # 初始化子节点的信息熵
for j in levels: # 对当前列的每一个取值进行循环
childSet = dataSet[dataSet.iloc[:, i] == j] # 某一个子节点的dataframe
ent = calEnt(childSet) # 计算某一个子节点的信息熵
ents += (childSet.shape[0] / dataSet.shape[0]) * ent # 计算当前列的信息熵
print('第{}列的信息熵为{}'.format(i, ents))
infoGain = baseEnt - ents # 计算当前列的信息增益
print('第{}列的信息增益为{}\n'.format(i, infoGain))
if (infoGain > bestGain):
bestGain = infoGain # 选择最大信息增益
axis = i # 最大信息增益所在列的索引
print("第{}列为最优切分列".format(axis))
return axis
print(bestSplit(dataSet)) # 0
def mySplit(dataSet, axis, value):
"""
函数功能:按照给定的列划分数据集
参数说明:
dataSet:原始数据集
axis:指定的列索引
value:指定的属性值
return:redataSet:按照指定列索引和属性值切分后的数据集
"""
col = dataSet.columns[axis]
redataSet = dataSet.loc[dataSet[col] == value, :].drop(col, axis=1)
return redataSet
# 验证函数:以axis=0,value=1为例
print(mySplit(dataSet, 0, 1))
import pandas as pd
from shannon import bestSplit, mySplit
row_data = {'是否陪伴': [0, 0, 0, 1, 1],
'是否玩游戏': [1, 1, 0, 1, 1],
'渣男': ['是', '是', '不是', '不是', '不是']}
dataSet = pd.DataFrame(row_data)
def createTree(dataSet):
"""
函数功能:基于最大信息增益切分数据集,递归构建决策树
参数说明:
dataSet:原始数据集(最有一列是标签)
return:myTree:字典形式的树
"""
featlist = list(dataSet.columns) # 提取出数据集所有的列
# print(featlist) # ['是否陪伴', '是否玩游戏', '渣男']
# 最后一列 -> [所有出现的取值:该取值的数量]
classlist = dataSet.iloc[:, -1].value_counts() # 获取最后一列类标签
# 每个分支下的所有实例都具有相同的分类 || 程序遍历完所有划分数据集的属性
# 判断最多标签数目是否等于数据集行数,或者数据集是否只有一列
if classlist[0] == dataSet.shape[0] or dataSet.shape[1] == 1:
return classlist.index[0] # 如果是,返回类标签
axis = bestSplit(dataSet) # 确定出当前最佳切分列的索引
bestfeat = featlist[axis] # 获取该索引对应的特征
myTree = {bestfeat: {}} # 采用字典嵌套的方式存储树信息
del featlist[axis] # 删除当前特征
valuelist = set(dataSet.iloc[:, axis]) # 提取最佳切分列所有属性值
for value in valuelist: # 对每一个属性值递归建树
# mySplit([[1,1],[2,1],[0,0]],1,1) -> [1,2]
# mySplit(数据集,第i列,value) 提取dataset中,第i列中,和value相等的值,组成新dataset
myTree[bestfeat][value] = createTree(mySplit(dataSet, axis, value))
return myTree
#查看运行结果
myTree = createTree(dataSet)
# {'是否陪伴': {0: {'是否玩游戏': {0: '不是', 1: '是'}}, 1: '不是'}}
print(myTree)
# 多元线性回归
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
a = np.array([[1, 2], [3, 4]])
m = np.mat(a) # NumPy中创建矩阵需要使?mat函数,该函数需要输??维数组
print('matrix:\n', m)
# 矩阵转置
print('m.T:\n', m.T)
# 矩阵乘法
print('m * m:\n', m * m)
# 计算矩阵行列式
print('np.linalg.det(m):\n', np.linalg.det(m))
# 求逆矩阵
print('m.I:\n', m.I)
# 线性回归函数
def standRegres(dataSet):
xMat = np.mat(dataSet.iloc[:, :-1].values) # 提取特征
yMat = np.mat(dataSet.iloc[:, -1].values).T # 提取标签
xTx = xMat.T * xMat
if np.linalg.det(xTx) == 0:
print('This matrix is singular,cannot do inverse') # 行列式为0,则该矩阵为奇异矩阵,无法求解逆矩阵
return
ws = xTx.I * (xMat.T * yMat)
return ws
rng = np.random.RandomState(1) # 设置随机种子
x = 5 * rng.rand(100) # 100个[0,5)的随机数
y = 2 * x - 5 + rng.randn(100) # 真实规律的标签取值
X = pd.DataFrame(x)
Y = pd.DataFrame(y)
ex = pd.DataFrame(np.ones([100, 1])) # 添加一列权威1的列,表示截距
data = pd.concat([ex, X, Y], axis=1)
ws = standRegres(data)
print('ws', ws)
yhat = data.iloc[:, :-1].values * ws # 预测标签值
plt.plot(data.iloc[:, 1], data.iloc[:, 2], 'o') # 原始数据点
plt.plot(data.iloc[:, 1], yhat) # 拟合直线
y = data.iloc[:, -1].values
yhat = yhat.flatten()
SSE = np.power(yhat - y, 2).sum()
print('SSE:\n', SSE)
def sseCal(dataSet, regres):
n = dataSet.shape[0]
y = dataSet.iloc[:, -1].values
ws = regres(dataSet)
yhat = dataSet.iloc[:, :-1].values * ws
yhat = yhat.flatten()
SSE = np.power(yhat - y, 2).sum()
return SSE
SSE = sseCal(data, standRegres)
print('SSE:\n', SSE)
sse = sseCal(data, standRegres)
y = data.iloc[:, -1].values
sst = np.power(y - y.mean(), 2).sum()
print(1 - sse / sst)
def rSquare(dataSet, regres):
sse = sseCal(dataSet, regres)
y = dataSet.iloc[:, -1].values
sst = np.power(y - y.mean(), 2).sum()
return 1 - sse / sst
res = rSquare(data, standRegres)
print('rSquare:\n', res)