Introduction
Lately I've felt that my earlier posts explained things inefficiently, and perhaps not thoroughly enough either.
From now on I'll simply post the code and talk through the model.
If anything is unclear, you can look it up on Baidu or consult the references (《统计学习方法》 Statistical Learning Methods, Zhou Zhihua's 《机器学习》 Machine Learning, 《数据挖掘导论》 Introduction to Data Mining).
Main Content
This time we'll cover a very well-known model: the decision tree.
A decision tree is built by repeatedly picking the most informative feature from the dataset; after each pick, the samples are partitioned into groups by that feature's values and the feature itself is removed from the samples. This procedure recurses until a stopping condition is met.
Stopping conditions
- All samples in a group belong to the same class
- Only one feature remains, so the samples cannot be split further; return the most frequent class among them
Key details
- How to measure the purity of a dataset – Shannon entropy
- How to pick the feature that benefits classification the most – information gain
- Each feature must be removed from the samples once it has been used for a split
- The tree is built recursively (a small worked example of entropy and information gain follows this list)
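To make these two measures concrete: the Shannon entropy of a dataset D with class proportions p_k is H(D) = -Σ p_k · log2(p_k), and the information gain of a feature is H(D) minus the weighted average entropy of the subsets that feature splits D into. Below is a minimal sketch with made-up toy labels:

from math import log

def entropy(labels):
    # H = -sum(p * log2(p)) over the class proportions
    n = len(labels)
    counts = {}
    for c in labels:
        counts[c] = counts.get(c, 0) + 1
    return -sum((v / n) * log(v / n, 2) for v in counts.values())

labels = ['yes', 'yes', 'no', 'no', 'no']  # toy class labels, made up for illustration
print(entropy(labels))  # about 0.971 bits
# a split into ['yes', 'yes'] and ['no', 'no', 'no'] leaves both groups pure,
# so the weighted entropy after the split is 0 and the gain is about 0.971
print(entropy(labels) - (2/5 * entropy(['yes', 'yes']) + 3/5 * entropy(['no'] * 3)))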
Code
from math import log
from sklearn.datasets import load_iris
import numpy as np
from sklearn.model_selection import train_test_split
from plot_tree import create_plot_tree
import operator
from cross_validate import *
"""
决策树是根据每一次抽取信息增益最大的特征值而进行分类的
力求每一次分类后,两部分尽可能的各自为一个类别
"""
class Decision_Tree:
    def data_process(self, X):  # binarize each feature by its column mean
        n = len(X[0])
        self.mean = np.zeros(n)
        for i in range(n):
            self.mean[i] = np.mean(X[:, i])
            for j in range(len(X[:, i])):
                if X[j, i] > self.mean[i]:
                    X[j, i] = 1
                else:
                    X[j, i] = 0
        return X

    def data_process2(self, X):  # binarize a single sample using the stored training means
        for i in range(len(X)):
            if X[i] > self.mean[i]:
                X[i] = 1
            else:
                X[i] = 0
        return X

    def merge(self, X, y):  # append y as the last column of X
        return np.column_stack((X, y))
    def calc_shannon_ent(self, data_set):  # compute the Shannon entropy
        num_entries = len(data_set)
        label_count = {}
        for feat_vec in data_set:  # for every sample
            current_label = feat_vec[-1]  # get its class label
            label_count[current_label] = label_count.get(current_label, 0) + 1  # count it, starting from 0 if unseen
        shannon_ent = 0.0
        for key in label_count:  # entropy: -sum(p * log2(p))
            prob = float(label_count[key])/num_entries
            shannon_ent -= prob * log(prob, 2)
        return shannon_ent
    def split_data_set(self, data_set, axis, value):  # extract a feature and carve up the dataset
        ret_data_set = []
        for feat_vec in data_set:
            if feat_vec[axis] == value:  # if this sample's value for the feature matches
                reduced_feat_vec = list(feat_vec[:axis])  # copy the sample without the extracted feature
                reduced_feat_vec.extend(list(feat_vec[axis+1:]))
                ret_data_set.append(reduced_feat_vec)  # assemble the new dataset
        return ret_data_set
    def choose_best_feature_to_split(self, data_set):  # choose the best feature to split on
        num_features = len(data_set[0]) - 1  # number of features (the last column is the label)
        base_entropy = self.calc_shannon_ent(data_set)  # entropy before splitting
        best_info_gain, best_feature = 0.0, -1  # best gain and best feature so far
        for i in range(num_features):  # for every feature
            feat_list = [example[i] for example in data_set]  # all values of this feature
            unique_vals = set(feat_list)  # deduplicate
            new_entropy = 0.0
            for value in unique_vals:  # for each distinct value
                sub_data_set = self.split_data_set(data_set, i, value)  # tentatively split the dataset
                prob = len(sub_data_set)/float(len(data_set))
                new_entropy += prob * self.calc_shannon_ent(sub_data_set)  # weighted entropy after the split
            info_gain = base_entropy - new_entropy  # the information gain
            if info_gain > best_info_gain:  # keep the largest gain
                best_info_gain = info_gain
                best_feature = i
        return best_feature
    def major_cnt(self, class_list):  # majority-vote strategy
        class_count = {}
        for vote in class_list:
            class_count[vote] = class_count.get(vote, 0) + 1
        sorted_class_count = sorted(class_count.items(),
                                    key=operator.itemgetter(1), reverse=True)
        return int(sorted_class_count[0][0])  # return the most frequent class
    def create_tree(self, data_set, labels):  # build the decision tree
        class_list = [example[-1] for example in data_set]  # class labels of the current dataset
        if class_list.count(class_list[0]) == len(class_list):  # only one class left: return it
            return class_list[0]
        if len(data_set[0]) == 2:  # only one feature left: return the most frequent class
            return self.major_cnt(class_list)
        best_feat = self.choose_best_feature_to_split(data_set)  # index of the best feature
        best_feat_label = labels[best_feat]  # name of that feature
        my_tree = {best_feat_label: {}}  # the tree is a nested dict
        del(labels[best_feat])  # remove the extracted feature's name
        feat_value = [i[best_feat] for i in data_set]  # all values of the extracted feature
        unique_vals = set(feat_value)
        for value in unique_vals:  # recurse into the subset for each value
            sub_labels = labels[:]
            # each dict value is either a class label or a subtree (another dict)
            my_tree[best_feat_label][value] = self.create_tree(
                self.split_data_set(data_set, best_feat, value), sub_labels)
        return my_tree
    def classify(self, input_tree, feat_labels, test_vec):  # classify one test sample
        first_str = list(input_tree.keys())[0]  # feature name at the root of this subtree
        second_dict = input_tree[first_str]  # the branches underneath it
        feat_index = feat_labels.index(first_str)  # index of that feature in the sample
        for key in second_dict.keys():  # over all branch values
            if test_vec[feat_index] == key:  # follow the matching branch
                if type(second_dict[key]).__name__ == 'dict':  # still a subtree: recurse
                    class_label = self.classify(second_dict[key], feat_labels, test_vec)
                else:
                    class_label = second_dict[key]  # a leaf: take its class
        return class_label  # assumes the sample's value matched one of the branches
    def fit(self, X, y, label):  # entry point: train the model
        self.label = list(label)  # keep a copy of the feature names as an attribute
        X = self.data_process(X)  # preprocess the data
        merged_data = self.merge(X, y)
        self.tree = self.create_tree(merged_data, label)  # build the tree

    def predict(self, X):
        predict_list = []
        for i in X:  # classify every sample
            x = self.data_process2(i)
            class_name = self.classify(self.tree, self.label, x)
            predict_list.append(class_name)
        return predict_list
    def store_tree(self, filename):  # persist the tree
        import pickle
        with open(filename, 'wb') as fw:  # pickle requires binary mode
            pickle.dump(self.tree, fw)

    def grab_tree(self, filename):  # load a stored tree
        import pickle
        with open(filename, 'rb') as fr:
            return pickle.load(fr)

    def plant_tree(self):
        create_plot_tree(self.tree)
if __name__ == '__main__':
X, Y = load_iris().data, load_iris().target
label_name = load_iris().feature_names
train_x, test_x, train_y, test_y = train_test_split(X, Y, train_size=0.8)
tree_clf = Decision_Tree()
tree_clf.fit(train_x, train_y, label_name)
prediction = tree_clf.predict(test_x)
print(tree_clf.tree)
tree_clf.plant_tree()
print(accuracy(prediction, test_y))
This code is adapted from Machine Learning in Action (《机器学习实战》); that book is worth reading if anything here is unclear.
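The persistence helpers store_tree and grab_tree are defined but never exercised in the main block. A minimal usage sketch (the filename tree.pkl is just a placeholder chosen for illustration):

# save the trained tree to disk and load it back
tree_clf.store_tree('tree.pkl')            # 'tree.pkl' is an arbitrary example path
restored = tree_clf.grab_tree('tree.pkl')  # returns the same nested dict
assert restored == tree_clf.tree           # nested dicts compare by value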
Tree Visualization
import matplotlib.pyplot as plt
decision_node = dict(boxstyle='sawtooth', fc='0.8')  # style for internal (decision) nodes
leaf_node = dict(boxstyle='round4', fc='0.8')  # style for leaf nodes
arrow_args = dict(arrowstyle='<-')  # arrow style
def plot_node(node_txt, center_pt, parent_pt, node_type):  # draw one node with an arrow from its parent
    create_plot.ax1.annotate(node_txt, xy=parent_pt, xycoords='axes fraction',
                             xytext=center_pt, textcoords='axes fraction',
                             va='center', ha='center', bbox=node_type, arrowprops=arrow_args)

def create_plot():  # standalone demo of the two node styles
    fig = plt.figure(1, facecolor='white')
    fig.clf()
    create_plot.ax1 = plt.subplot(111, frameon=False)
    plot_node('a decision node', (0.5, 0.1), (0.1, 0.5), decision_node)
    plot_node('a leaf node', (0.8, 0.1), (0.3, 0.8), leaf_node)
    plt.show()
def get_num_leaf(my_tree):  # count the leaf nodes
    num_leafs = 0
    first_str = list(my_tree.keys())[0]  # the first key (list() is needed in Python 3)
    second_dict = my_tree[first_str]  # the nested dict underneath
    for key in second_dict.keys():
        if type(second_dict[key]).__name__ == 'dict':  # a dict is a subtree rather than a class label: recurse
            num_leafs += get_num_leaf(second_dict[key])
        else:
            num_leafs += 1
    return num_leafs
def get_tree_depth(my_tree):  # compute the depth of the tree
max_depth = 0
first_str = list(my_tree.keys())[0]
second_dict = my_tree[first_str]
for key in second_dict.keys():
if type(second_dict[key]).__name__ == 'dict':
this_depth = 1 + get_tree_depth(second_dict[key])
else:
this_depth = 1
if this_depth > max_depth:
max_depth = this_depth
return max_depth
def plot_mid_text(cntr_pt, parent_pt, txt_string):  # label the midpoint of the edge between parent and child
    x_mid = (parent_pt[0] - cntr_pt[0])/2.0 + cntr_pt[0]
    y_mid = (parent_pt[1] - cntr_pt[1])/2.0 + cntr_pt[1]
    create_plot.ax1.text(x_mid, y_mid, txt_string)
def plot_tree(my_tree, parent_pt, node_txt):
    num_leafs = get_num_leaf(my_tree)  # width of this subtree, in leaves
    first_str = list(my_tree.keys())[0]
    cntr_pt = (plot_tree.xOff + (1.0 + float(num_leafs))/2.0/plot_tree.total_w,
               plot_tree.yOff)  # center this subtree over its leaves
    plot_mid_text(cntr_pt, parent_pt, node_txt)
    plot_node(first_str, cntr_pt, parent_pt, decision_node)
    second_dict = my_tree[first_str]
    plot_tree.yOff = plot_tree.yOff - 1.0/plot_tree.total_d  # descend one level
    for key in second_dict.keys():
        if type(second_dict[key]).__name__ == 'dict':
            plot_tree(second_dict[key], cntr_pt, str(key))  # recurse into subtrees
        else:
            plot_tree.xOff = plot_tree.xOff + 1.0/plot_tree.total_w
            plot_node(second_dict[key], (plot_tree.xOff, plot_tree.yOff),
                      cntr_pt, leaf_node)
            plot_mid_text((plot_tree.xOff, plot_tree.yOff), cntr_pt, str(key))
    plot_tree.yOff = plot_tree.yOff + 1.0/plot_tree.total_d  # restore the level before returning to the parent
def create_plot_tree(in_tree):
    fig = plt.figure(1, facecolor='white', figsize=(8, 8))
    fig.clf()
    axprops = dict(xticks=[], yticks=[])
    create_plot.ax1 = plt.subplot(111, frameon=False, **axprops)
    plot_tree.total_w = float(get_num_leaf(in_tree))  # total leaf count sets the horizontal scale
    plot_tree.total_d = float(get_tree_depth(in_tree))  # tree depth sets the vertical scale
    plot_tree.xOff = -0.5/plot_tree.total_w  # x cursor for placing the next leaf
    plot_tree.yOff = 1.0  # start drawing from the top
    plot_tree(in_tree, (0.5, 1.0), '')
    plt.show()
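To try the plotting code on its own, you can feed create_plot_tree a small hand-built tree in the same nested-dict format that create_tree produces (the feature names and class labels below are made up for illustration):

if __name__ == '__main__':
    # a toy tree: outer keys are feature names, inner keys are feature values
    toy_tree = {'petal width': {0: 'setosa',
                                1: {'petal length': {0: 'versicolor',
                                                     1: 'virginica'}}}}
    create_plot_tree(toy_tree)  # 3 leaves, depth 2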