# -*- coding: utf-8 -*-
import numpy as np
import os
import re
import random
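# A multinomial naive Bayes spam classifier. Judging by the data path at the
# bottom of the script, this appears to follow the email ham/spam example from
# chapter 4 of "Machine Learning in Action"; ham is labeled 1, spam -1.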
class NaiveBayes(object):
    def __init__(self):
        self._train_x = []        # training data x (word-count vectors)
        self._train_y = []        # training data y (labels)
        self._test_x = []         # test data x
        self._test_y = []         # test data y
        self._all_words = None    # vocabulary: every distinct word seen
        self._all_words_num = 0   # vocabulary size
    def split_to_word(self, text):  # split a text into lowercase words
        # use \W+ here: \W* also matches the empty string, and on Python 3.7+
        # that splits the text into single characters
        words = re.split(r'\W+', text)
        # keep only words longer than 2 characters that start with a letter
        return [word.lower() for word in words if len(word) > 2 and re.match(r'[a-zA-Z]', word)]
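    # A quick illustration (not in the original script):
    #   split_to_word("Hello, World! Hi") -> ['hello', 'world']
    # "Hi" is dropped by the len(word) > 2 filter.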
    def words_to_vector(self, words):  # convert a word list into a word-count vector
        vector = [0] * self._all_words_num
        for word in words:
            if word in self._all_words:
                # note: list.index is a linear scan, fine for small vocabularies
                vector[self._all_words.index(word)] += 1
        return vector
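    # Example (illustrative values, not from the data set): with vocabulary
    # ['free', 'money', 'hello'], the word list ['free', 'money', 'free']
    # becomes the count vector [2, 1, 0].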
    def load_data(self, positive_dir, negative_dir):
        train_files = os.listdir(positive_dir)
        input_x = []
        input_y = []
        all_input_x = []
        for i in train_files:
            with open('{}/{}'.format(positive_dir, i)) as f:
                text = f.read()
            words = self.split_to_word(text)
            input_x.append(words)
            all_input_x.extend(words)
            input_y.append(1)
        train_files = os.listdir(negative_dir)
        for i in train_files:
            with open('{}/{}'.format(negative_dir, i)) as f:
                text = f.read()
            words = self.split_to_word(text)
            input_x.append(words)
            all_input_x.extend(words)
            input_y.append(-1)
        self._all_words = list(set(all_input_x))    # every distinct word in the data
        self._all_words_num = len(self._all_words)  # vocabulary size
total = len(input_y)
test_x = []
test_y = []
        for i in range(10):  # hold out 10 random documents as the test set
            index = random.randint(0, total - 1)
            test_x.append(input_x[index])
            test_y.append(input_y[index])
            del input_x[index]
            del input_y[index]
            total -= 1
self._train_x = []
self._train_y = input_y
train_num = len(input_y)
print('train data num', train_num)
        for i in range(train_num):  # convert training word lists into count vectors
vector = self.words_to_vector(input_x[i])
self._train_x.append(vector)
self._test_x = []
self._test_y = test_y
test_num = len(test_y)
print('test data num', test_num)
        for i in range(test_num):  # convert test word lists into count vectors
vector = self.words_to_vector(test_x[i])
self._test_x.append(vector)
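    # train() fits a multinomial naive Bayes model with Laplace (add-one)
    # smoothing. For each class c and word w it estimates
    #     P(w|c) = (count(w, c) + 1) / (total_words_in_c + |V|)
    # where |V| is the vocabulary size, and stores log P(w|c) so that the
    # per-word probabilities can later be summed instead of multiplied.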
    def train(self):
        train_data_num = len(self._train_y)
        p_positive = np.ones(self._all_words_num)   # Laplace smoothing: start every word count at lambda = 1
        p_negative = np.ones(self._all_words_num)   # Laplace smoothing: lambda = 1
        positive_words_total = self._all_words_num  # the denominators grow by the vocabulary size accordingly
        negative_words_total = self._all_words_num  # the original book uses 0 here, which appears to be a mistake
        total_positive = 0
        for i in range(train_data_num):
            if self._train_y[i] == 1:
                p_positive += self._train_x[i]
                positive_words_total += sum(self._train_x[i])
                total_positive += 1
            else:
                p_negative += self._train_x[i]
                negative_words_total += sum(self._train_x[i])
        p_positive = np.log(p_positive / positive_words_total)  # per-word log conditional probabilities
        p_negative = np.log(p_negative / negative_words_total)
        positive_class = total_positive / float(train_data_num)  # class prior P(y = 1)
        print('train positive percent', positive_class)
        return p_positive, p_negative, positive_class
    def classify(self, p_positive, p_negative, positive_class, vector):
        # compute the log posterior score of each class:
        # log P(c) + sum over words of count(w) * log P(w|c)
        positive = np.sum(p_positive * vector) + np.log(positive_class)
        negative = np.sum(p_negative * vector) + np.log(1 - positive_class)
        print(positive, negative)
        if positive > negative:
            return 1
        else:
            return -1
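    # The comparison happens entirely in log space: multiplying hundreds of
    # small word probabilities would underflow to 0.0 in floating point, while
    # their logs sum safely, and log is monotonic so the argmax is unchanged.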
def test_data(self):
p_positive,p_negative,positive_class = self.train()
total_test = len(self._test_y)
error_num = 0
for i in range(total_test):
vector = self._test_x[i]
predict = self.classify(p_positive, p_negative, positive_class, vector)
if predict != self._test_y[i]:
error_num += 1
        print('predict error num', error_num, 'of', total_test)
if __name__ == '__main__':
    bayes = NaiveBayes()
    positive = "/path_to/input/4.NaiveBayes/email/ham/"
    negative = "/path_to/input/4.NaiveBayes/email/spam/"
    bayes.load_data(positive, negative)
    bayes.test_data()
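# A minimal sketch of classifying a single message once the model is trained,
# using only the methods defined above (the sample text is made up):
#
#   p_pos, p_neg, prior = bayes.train()
#   words = bayes.split_to_word("free credit report, click here now")
#   vector = bayes.words_to_vector(words)
#   label = bayes.classify(p_pos, p_neg, prior, vector)  # 1 = ham, -1 = spam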