练习地址:https://www.kaggle.com/c/ds100fa19
相关博文:
[Kaggle] Spam/Ham Email Classification 垃圾邮件分类(spacy)
[Kaggle] Spam/Ham Email Classification 垃圾邮件分类(RNN/GRU/LSTM)
本文使用 Hugging Face 上的预训练模型,在其基础上用垃圾邮件数据集进行微调(fine-tune),并将测试集的预测结果提交到 Kaggle
本文代码参考了《自然语言处理动手学Bert文本分类》
from datetime import timedelta
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
# Read the competition CSV files and replace missing values with a single
# space so that the string concatenations below never hit NaN.
train = pd.read_csv("train.csv").fillna(" ")
test_csv = pd.read_csv("test.csv").fillna(" ")

# Merge the two text features (subject + body) into a single column.
train['all'] = train['subject'] + ' ' + train['email']

# Hold out 20% of the training data for validation, stratified on the label.
from sklearn.model_selection import StratifiedShuffleSplit
splt = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=1)
# n_splits=1, so the generator yields exactly one (train, valid) index pair.
train_idx, valid_idx = next(splt.split(train, train['spam']))
train_part = train.loc[train_idx]
valid_part = train.loc[valid_idx]

X_train, y_train = train_part['all'], train_part['spam']
X_valid, y_valid = valid_part['all'], valid_part['spam']

X_test = test_csv['subject'] + ' ' + test_csv['email']
# The test set has no labels; all-zero dummy labels (as a tensor) let it go
# through the same preprocessing code path as the labelled splits.
y_test = torch.LongTensor([0] * len(X_test))
如果模型下载很慢,也可以使用我上传到 CSDN 的模型文件(可免费下载)
请将以上模型文件放在同一个文件夹里,例如 ./bert_hugginggace/
需要提前安装依赖包:
pip install transformers
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Local folder holding the downloaded checkpoint files; the original note
# names the checkpoint as distilbert-base-uncased-finetuned-sst-2-english.
model_dir = "./bert_hugginggace"
tokenizer = AutoTokenizer.from_pretrained(model_dir)
pretrain_model = AutoModelForSequenceClassification.from_pretrained(model_dir)
一些使用的参数
# Special tokens used when building model inputs.
PAD = '[PAD]'
CLS = '[CLS]'

# Input / model dimensions.
max_seq_len = 128   # every sample is padded or truncated to this many tokens
bert_hidden = 768   # hidden size (not referenced by the visible code)
num_classes = 2     # ham / spam

# Optimisation settings.
learning_rate = 1e-5
decay = 0.01            # passed to the optimizer as weight_decay
num_epochs = 5
early_stop_time = 2000  # stop after this many batches without val-loss improvement
batch_size = 32

save_path = "./best_model.ckpt"  # checkpoint of the best model so far

# Prefer the GPU when one is available.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
def load_dataset(texts, labels):
    """Encode (text, label) pairs as fixed-length model inputs.

    Each text is tokenized with the module-level ``tokenizer``, prefixed with
    the ``CLS`` token, converted to ids and then padded with 0 or truncated
    to exactly ``max_seq_len`` entries.

    Args:
        texts: iterable of strings.
        labels: iterable of integer class indices, aligned with ``texts``.

    Returns:
        A list with one ``(token_ids, one_hot_label, seq_len, mask)`` tuple
        per sample, where ``seq_len`` is the number of real (non-padding)
        tokens kept and ``mask`` is 1 for real tokens and 0 for padding.
    """
    samples = []
    for text, label in zip(texts, labels):
        # e.g. ['[CLS]', 'subject', ':', 'cell', 'phones', 'coming', ...]
        pieces = [CLS] + tokenizer.tokenize(text)
        # e.g. [101, 3395, 1024, 3526, 11640, 2746, ...]
        ids = tokenizer.convert_tokens_to_ids(pieces)

        # Number of real tokens that survive truncation, and how much
        # padding is needed to reach the fixed length.
        real_len = min(len(pieces), max_seq_len)
        pad_len = max_seq_len - real_len
        attention = [1] * real_len + [0] * pad_len
        ids = ids[:max_seq_len] + [0] * pad_len

        # One-hot label, so binary cross-entropy can be applied later.
        one_hot = [0] * num_classes
        one_hot[label] = 1

        samples.append((ids, one_hot, real_len, attention))
    return samples
class datasetIter():
    """Re-usable mini-batch iterator over a list of encoded samples.

    Each sample is a ``(token_ids, label_vector, seq_len, mask)`` tuple.
    Iterating yields ``((x, seq_len, mask), y)`` tensor batches placed on
    ``device``. After the last batch the iterator resets itself, so the same
    object can be looped over again (e.g. once per epoch).

    Args:
        datasets: list of sample tuples.
        batch_size: number of samples per batch.
        device: torch device (or device string) the tensors are moved to.
    """

    def __init__(self, datasets, batch_size, device):
        self.datasets = datasets
        self.idx = 0  # index of the next batch to hand out
        self.device = device
        self.batch_size = batch_size
        self.batches = len(datasets) // batch_size  # number of full batches
        # True when a final, smaller batch of left-over samples exists.
        self.residues = len(datasets) % batch_size != 0

    def __next__(self):
        if self.residues and self.idx == self.batches:
            # Final partial batch holding the left-over samples.
            batch_data = self.datasets[self.idx * self.batch_size:len(self.datasets)]
        elif self.idx >= self.batches:
            # All batches consumed. ``>=`` (not ``>``) matters here: without
            # a partial batch, idx == batches already means "exhausted";
            # a strict ``>`` would emit one extra, empty batch first.
            self.idx = 0
            raise StopIteration
        else:
            start = self.idx * self.batch_size
            batch_data = self.datasets[start:start + self.batch_size]
        self.idx += 1
        return self._to_tensor(batch_data)

    def _to_tensor(self, datasets):
        """Stack a list of samples into ``((x, seq_len, mask), y)`` tensors."""
        x = torch.LongTensor([item[0] for item in datasets]).to(self.device)
        y = torch.FloatTensor([item[1] for item in datasets]).to(self.device)
        seq_len = torch.LongTensor([item[2] for item in datasets]).to(self.device)
        mask = torch.LongTensor([item[3] for item in datasets]).to(self.device)
        return (x, seq_len, mask), y

    def __iter__(self):
        return self

    def __len__(self):
        """Return the number of batches per pass, including a partial one."""
        if self.residues:
            return self.batches + 1
        return self.batches
def build_iter(datasets, batch_size, device):
    """Wrap ``datasets`` in a ``datasetIter`` producing batches on ``device``."""
    return datasetIter(datasets, batch_size, device)
class myModel(nn.Module):
    """Thin wrapper around the module-level pretrained classification model.

    All parameters of the wrapped model are left trainable, and the logits
    are passed through a sigmoid so they can be fed to binary cross-entropy.
    """

    def __init__(self):
        super().__init__()
        self.pretrain_model = pretrain_model  # pretrained model loaded above
        # Enable fine-tuning for every parameter.
        for weight in self.pretrain_model.parameters():
            weight.requires_grad = True

    def forward(self, x):
        """Return per-class scores in (0, 1).

        Args:
            x: indexable batch whose element 0 holds the token ids and
               element 2 the attention mask.
        """
        input_ids, attention = x[0], x[2]
        result = self.pretrain_model(input_ids, attention_mask=attention)
        # Squash logits into (0, 1) for the binary cross-entropy loss.
        return torch.sigmoid(result.logits)
import time
import torch.nn.functional as F
from sklearn import metrics
from transformers.optimization import AdamW
def get_time_dif(starttime):
    """Return the time elapsed since ``starttime`` as a ``timedelta``.

    ``starttime`` is a ``time.time()`` timestamp; the elapsed time is
    rounded to whole seconds.
    """
    elapsed_seconds = time.time() - starttime
    return timedelta(seconds=int(round(elapsed_seconds)))
def train(model, train_iter, dev_iter, test_iter):
    """Fine-tune ``model``, keep the best checkpoint, then predict the test set.

    Runs up to ``num_epochs`` passes over ``train_iter``. Every 100 batches
    the model is evaluated on ``dev_iter``; whenever the validation loss
    improves, the weights are saved to ``save_path``. Training stops early
    once more than ``early_stop_time`` batches pass without improvement.
    Finally ``test`` is called to write the predictions for ``test_iter``.

    Args:
        model: the network to train.
        train_iter: iterable of ``(X, y)`` training batches.
        dev_iter: iterable of validation batches, passed to ``evaluate``.
        test_iter: iterable of test batches, passed to ``test``.
    """
    started_at = time.time()
    model.train()
    optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=decay)

    step = 0                       # batches processed so far
    best_dev_loss = float("inf")
    last_improved_step = 0         # step at which the dev loss last improved
    stalled = False                # set when early stopping triggers

    for epoch in range(num_epochs):
        print("Epoch {}/{}".format(epoch+1, num_epochs))
        for X, y in train_iter:
            probs = model(X)  # batch_size * num_classes
            model.zero_grad()  # clear gradients left from the previous step
            loss = F.binary_cross_entropy(probs, y)
            loss.backward()
            optimizer.step()

            if step % 100 == 0:
                # Accuracy on the current training batch.
                _, gold = torch.max(y.data, 1)
                _, guess = torch.max(probs, 1)
                train_acc = metrics.accuracy_score(gold.cpu(), guess.cpu())

                # Validation metrics; keep the checkpoint with the lowest loss.
                dev_acc, dev_loss = evaluate(model, dev_iter)
                if dev_loss < best_dev_loss:
                    best_dev_loss = dev_loss
                    torch.save(model.state_dict(), save_path)
                    improve = '*'
                    last_improved_step = step
                else:
                    improve = ' '

                time_dif = get_time_dif(started_at)
                # Format spec: '>' right-aligns, the number is the width,
                # '.2' the precision.
                msg = 'Iter:{0:>6}, Train Loss:{1:>5.2}, Train Acc:{2:>6.2}, Val Loss:{3:>5.2}, val Acc :{4:>6.2%}, Time:{5} {6}'
                print(msg.format(step, loss.item(), train_acc, dev_loss, dev_acc, time_dif, improve))
                # evaluate() switched to eval mode; go back to training mode.
                model.train()

            step += 1
            # No improvement for a long time: treat as converged and stop.
            if step - last_improved_step > early_stop_time:
                print("no improve after {} times, stop!".format(early_stop_time))
                stalled = True
                break
        if stalled:
            break

    # Generate the predictions for the test set.
    test(model, test_iter)
def evaluate(model, dev_iter):
    """Compute accuracy and mean batch loss of ``model`` on ``dev_iter``.

    The model is put in eval mode and is NOT switched back; the caller is
    responsible for calling ``model.train()`` afterwards.

    Args:
        model: the network to evaluate.
        dev_iter: sized iterable of ``(X, y)`` batches with one-hot ``y``.

    Returns:
        ``(accuracy, mean_loss)`` where ``mean_loss`` is the summed batch
        loss divided by ``len(dev_iter)``.
    """
    model.eval()
    running_loss = 0
    predicted = np.array([], dtype=int)
    gold = np.array([], dtype=int)

    with torch.no_grad():  # no graph recording, no gradient updates
        for X, y in dev_iter:
            probs = model(X)
            running_loss += F.binary_cross_entropy(probs, y)
            # Class index = position of the largest entry in each row.
            _, batch_gold = torch.max(y.data, 1)
            _, batch_pred = torch.max(probs, 1)
            gold = np.append(gold, batch_gold.cpu())
            predicted = np.append(predicted, batch_pred.cpu().numpy())

    accuracy = metrics.accuracy_score(gold, predicted)
    mean_loss = running_loss / len(dev_iter)
    return accuracy, mean_loss
def test(model, test_iter):
    """Predict the test set with the best checkpoint and write the submission.

    Loads the weights stored at ``save_path`` into ``model``, predicts a
    class for every batch in ``test_iter`` and writes the ``id`` / ``Class``
    columns to ``submission_bert.csv``.

    Args:
        model: the network whose weights are replaced by the checkpoint.
        test_iter: iterable of ``(X, y)`` batches; ``y`` is ignored.
    """
    model.load_state_dict(torch.load(save_path))  # restore the best model
    model.eval()

    predictions = np.array([], dtype=int)
    with torch.no_grad():
        for X, _ in test_iter:
            probs = model(X)
            _, batch_pred = torch.max(probs, 1)
            predictions = np.append(predictions, batch_pred.cpu().numpy())

    # Write the submission file.
    submission = pd.DataFrame({'id': test_csv['id'], 'Class': predictions})
    submission.to_csv("submission_bert.csv", index=False)
# Fix the random seeds.
SEED = 520
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = True

# Encode the three splits.
train_data = load_dataset(X_train, y_train)
valid_data = load_dataset(X_valid, y_valid)
test_data = load_dataset(X_test, y_test)

# Batch iterators, created in train / valid / test order.
train_iter, valid_iter, test_iter = (
    build_iter(split, batch_size, device)
    for split in (train_data, valid_data, test_data)
)

# Model.
model = myModel().to(device)

# Train, evaluate and write the test predictions.
train(model, train_iter, valid_iter, test_iter)
Private Score:0.98714
Public Score:0.99000
没怎么调参,准确率接近99%,效果还是很不错的!
欢迎大家提出意见和指正!多谢!