This article surveys deep learning models for text classification, along with tricks for tuning them.
FastText is a handy tool released by Facebook that provides two functions: text classification and word-vector training.
FastText's classification scheme is simple: convert the input tokens to word vectors, average them, and pass the result through a linear classifier to get the class. The word vectors can be pretrained, or randomly initialized and trained along with the classification task.
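To make that forward pass concrete, here is a minimal PyTorch sketch of a fastText-style classifier (illustrative only: real fastText also hashes word n-grams into buckets, and the class and parameter names here are my own):

import torch
import torch.nn as nn

class FastTextLike(nn.Module):
    """Averaged word embeddings followed by a linear classifier."""
    def __init__(self, vocab_size, embed_dim, num_classes):
        super().__init__()
        # EmbeddingBag with mode='mean' looks up and averages embeddings in one step
        self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, mode='mean')
        self.fc = nn.Linear(embed_dim, num_classes)

    def forward(self, token_ids, offsets):
        # token_ids: 1-D tensor of all token ids concatenated across the batch;
        # offsets: start index of each text within token_ids
        return self.fc(self.embedding(token_ids, offsets))

# Example: two texts, [3, 7] and [1, 4, 2], over a vocab of 10 and 3 classes
model = FastTextLike(10, 8, 3)
logits = model(torch.tensor([3, 7, 1, 4, 2]), torch.tensor([0, 2]))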
FastText is still used by plenty of people today, mainly because it is extremely fast to train, simple to use, and a surprisingly strong baseline.
For details, see https://cloud.tencent.com/developer/article/1835273
A partial code example:
import logging
import fasttext

basedir = '/Data/'
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

# Train (modern fasttext API; older releases exposed this as fasttext.supervised
# with label_prefix/word_ngrams keyword names)
classifier = fasttext.train_supervised(
    input=basedir + "news.data.seg.train",
    label="__label__",
    wordNgrams=3,
    bucket=2000000)
classifier.save_model(basedir + "news.dat.seg.model")

# Evaluate and print the F1-score; test() returns (sample count, precision, recall)
n, precision, recall = classifier.test(basedir + "news.data.seg.test")
print(precision * recall * 2 / (recall + precision))

# Read the validation set; predict() expects strings without newline characters
validate_texts = []
with open(basedir + 'news.data.seg.validate', 'r', encoding='utf-8') as infile:
    for line in infile:
        validate_texts.append(line.strip())

# Predict: returns (labels, probabilities) for a list of texts
labels, probs = classifier.predict(validate_texts)
The model structure is shown in the figure. Convolutions on images are two-dimensional; TextCNN instead uses a "one-dimensional convolution" of shape filter_size * embedding_dim, where one kernel dimension equals the embedding size, so a filter of width filter_size extracts n-gram information. For a single sample, the overall forward pass is: embed the tokens, apply parallel 1-D convolutions with several filter widths, max-pool each feature map over time, concatenate the pooled features, and classify with a fully connected softmax layer.
In practice, many parts of TextCNN can be tuned, such as the filter widths, the number of feature maps, and whether the embeddings are fine-tuned (see this paper [1]).
TextCNN is a strong baseline well suited to short and medium-length text, but less suitable for long documents: kernel widths are usually kept small, so long-distance features cannot be captured, and max-pooling also discards some useful features. Thinking about it further, TextCNN is essentially the same as a traditional n-gram bag-of-words model; much of its effectiveness comes from the introduction of word embeddings [3], which solve the sparsity problem of bag-of-words.
Reference code:
import numpy as np
from gensim.models import Word2Vec

w2v_model = Word2Vec.load('sentiment_analysis/w2v_model.pkl')

# Words missing from the pretrained vectors are left as zero vectors
embedding_matrix = np.zeros((len(vocab) + 1, 300))
for word, i in vocab.items():
    try:
        embedding_matrix[i] = w2v_model.wv[word]
    except KeyError:
        continue
# Build the TextCNN model
import tensorflow.keras as keras
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Embedding, Conv1D, MaxPooling1D, concatenate, Flatten, Dropout, Dense
from sklearn import metrics

def TextCNN_model_2(x_train_padded_seqs, y_train, x_test_padded_seqs, y_test, embedding_matrix):
    # Architecture: embedding -> 3 parallel conv+pool branches -> concat -> dropout -> dense
    main_input = Input(shape=(50,), dtype='int32')  # sequences of token ids, so integer dtype
    # Embedding layer initialized with the pretrained vectors and frozen
    embedder = Embedding(len(vocab) + 1, 300, input_length=50,
                         weights=[embedding_matrix], trainable=False)
    # To train the embeddings from scratch instead, drop the weights argument:
    # embedder = Embedding(len(vocab) + 1, 300, input_length=50)
    embed = embedder(main_input)
    # Filter widths 3, 4 and 5; padding='same' keeps the sequence length at 50,
    # so pooling over the full length acts as max-over-time pooling
    cnn1 = Conv1D(256, 3, padding='same', strides=1, activation='relu')(embed)
    cnn1 = MaxPooling1D(pool_size=50)(cnn1)
    cnn2 = Conv1D(256, 4, padding='same', strides=1, activation='relu')(embed)
    cnn2 = MaxPooling1D(pool_size=50)(cnn2)
    cnn3 = Conv1D(256, 5, padding='same', strides=1, activation='relu')(embed)
    cnn3 = MaxPooling1D(pool_size=50)(cnn3)
    # Concatenate the outputs of the three branches
    cnn = concatenate([cnn1, cnn2, cnn3], axis=-1)
    flat = Flatten()(cnn)
    drop = Dropout(0.2)(flat)
    main_output = Dense(3, activation='softmax')(drop)
    model = Model(inputs=main_input, outputs=main_output)
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    one_hot_labels = keras.utils.to_categorical(y_train, num_classes=3)  # one-hot encode the labels
    model.fit(x_train_padded_seqs, one_hot_labels, batch_size=800, epochs=20)
    result = model.predict(x_test_padded_seqs)   # per-class probabilities for each sample
    result_labels = np.argmax(result, axis=1)    # most probable class id
    print('Accuracy:', metrics.accuracy_score(y_test, result_labels))
    print('Weighted F1-score:', metrics.f1_score(y_test, result_labels, average='weighted'))
A typical CNN stacks convolutional layers plus pooling layers. TextRCNN swaps the convolutional layer for a bidirectional RNN, so the result is: bidirectional RNN + pooling layer.
The model's forward pass: for each token, concatenate its left-context RNN state, its own embedding, and its right-context RNN state; apply a tanh projection; then max-pool over time to get the text representation.
The "convolutional" part here amounts to the max-pooling step. Adding the RNN yields a 1-2 point improvement over a plain CNN.
Concrete implementation:
from tensorflow.keras import Input, Model
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Embedding, Dense, SimpleRNN, Lambda, Concatenate, Conv1D, GlobalMaxPooling1D
class RCNN(object):
    def __init__(self, maxlen, max_features, embedding_dims,
                 class_num=5,
                 last_activation='softmax'):
        self.maxlen = maxlen
        self.max_features = max_features
        self.embedding_dims = embedding_dims
        self.class_num = class_num
        self.last_activation = last_activation

    def get_model(self):
        # Three inputs: the sequence itself plus left- and right-shifted copies
        input_current = Input((self.maxlen,))
        input_left = Input((self.maxlen,))
        input_right = Input((self.maxlen,))
        embedder = Embedding(self.max_features, self.embedding_dims, input_length=self.maxlen)
        embedding_current = embedder(input_current)
        embedding_left = embedder(input_left)
        embedding_right = embedder(input_right)
        # Left context: forward RNN; right context: backward RNN, re-reversed to align positions
        x_left = SimpleRNN(128, return_sequences=True)(embedding_left)
        x_right = SimpleRNN(128, return_sequences=True, go_backwards=True)(embedding_right)
        x_right = Lambda(lambda x: K.reverse(x, axes=1))(x_right)
        # [left context; word embedding; right context] at every position
        x = Concatenate(axis=2)([x_left, embedding_current, x_right])
        # The 1x1 convolution plays the role of the tanh projection in the RCNN paper
        x = Conv1D(64, kernel_size=1, activation='tanh')(x)
        # Max-pooling over time gives the fixed-size text representation
        x = GlobalMaxPooling1D()(x)
        output = Dense(self.class_num, activation=self.last_activation)(x)
        model = Model(inputs=[input_current, input_left, input_right], outputs=output)
        return model
# Network configuration
# (os, random, numpy, train_test_split, sequence padding, to_categorical and
# EarlyStopping are needed below; build_vocab, read_category, read_vocab,
# read_files, encode_sentences and encode_cate are helpers from the original repo)
import os
import random
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
max_features = 40001
maxlen = 400
batch_size = 32
embedding_dims = 50
epochs = 10
print('Preprocessing and loading data...')
# Rebuild the vocabulary if it does not exist yet
if not os.path.exists(vocab_file):
    build_vocab(data_dir, vocab_file, vocab_size)
# Mappings between words/categories and their ids
categories, cat_to_id = read_category()
words, word_to_id = read_vocab(vocab_file)
# Load the full dataset
x, y = read_files(data_dir)
data = list(zip(x, y))
del x, y
# Shuffle
random.shuffle(data)
# Split into training and test sets
train_data, test_data = train_test_split(data)
# Encode texts as word ids and labels as category ids
x_train = encode_sentences([content[0] for content in train_data], word_to_id)
y_train = to_categorical(encode_cate([content[1] for content in train_data], cat_to_id))
x_test = encode_sentences([content[0] for content in test_data], word_to_id)
y_test = to_categorical(encode_cate([content[1] for content in test_data], cat_to_id))
print('Padding sequences to a samples * timesteps matrix')
x_train = sequence.pad_sequences(x_train, maxlen=maxlen)
x_test = sequence.pad_sequences(x_test, maxlen=maxlen)
print('x_train shape:', x_train.shape)
print('x_test shape:', x_test.shape)
print('Preparing model inputs...')
x_train_current = x_train
# Left input: sequence shifted right by one position (first token duplicated)
x_train_left = np.hstack([np.expand_dims(x_train[:, 0], axis=1), x_train[:, 0:-1]])
# Right input: sequence shifted left by one position (last token duplicated)
x_train_right = np.hstack([x_train[:, 1:], np.expand_dims(x_train[:, -1], axis=1)])
x_test_current = x_test
x_test_left = np.hstack([np.expand_dims(x_test[:, 0], axis=1), x_test[:, 0:-1]])
x_test_right = np.hstack([x_test[:, 1:], np.expand_dims(x_test[:, -1], axis=1)])
print('x_train_current shape:', x_train_current.shape)
print('x_train_left shape:', x_train_left.shape)
print('x_train_right shape:', x_train_right.shape)
print('x_test_current shape:', x_test_current.shape)
print('x_test_left shape:', x_test_left.shape)
print('x_test_right shape:', x_test_right.shape)
print('Building model...')
model = RCNN(maxlen, max_features, embedding_dims).get_model()
model.compile('adam', 'categorical_crossentropy', metrics=['accuracy'])
print('Train...')
early_stopping = EarlyStopping(monitor='val_accuracy', patience=2, mode='max')
history = model.fit([x_train_current, x_train_left, x_train_right], y_train,
                    batch_size=batch_size,
                    epochs=epochs,
                    callbacks=[early_stopping],
                    validation_data=([x_test_current, x_test_left, x_test_right], y_test))
print('Test...')
result = model.predict([x_test_current, x_test_left, x_test_right])
The methods introduced so far suggest a natural framework for text classification: encode the tokens in context, then pool them into a sentence representation for classification.
For the final pooling step, max-pooling usually performs better: text classification is often about the topic, one or two key words in a sentence are enough to decide it, and most of the rest is noise that carries no signal for the label.
For finer-grained analysis, however, max-pooling may discard useful features; in that case attention can be used to fuse the token states into the sentence representation:
When computing the attention scores, a transformation is first applied to the encoder states $h_t$:

$$u_t = \tanh(W h_t + b), \qquad \alpha_t = \frac{\exp(u_t^\top w)}{\sum_s \exp(u_s^\top w)}, \qquad r = \sum_t \alpha_t h_t$$

where $w$ is a context vector, randomly initialized and updated during training. The resulting sentence representation $r$ is then fed to the classifier.
The same attention trick can also replace pooling after a CNN encoder; in experiments, adding attention improves results by about 2 points. For tasks like sentiment analysis, where the label is determined by the sentence as a whole, an RNN encoder is the first choice.
For a concrete implementation, see https://blog.csdn.net/dendi_hust/article/details/94435919
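Below is a minimal sketch of this attention pooling as a custom Keras layer (the layer and weight names are my own; it assumes encoder outputs of shape (batch, timesteps, dim)):

import tensorflow as tf
from tensorflow.keras import layers

class AttentionPooling(layers.Layer):
    """u_t = tanh(W h_t + b); alpha_t = softmax(u_t . w); r = sum_t alpha_t h_t"""
    def build(self, input_shape):
        dim = int(input_shape[-1])
        self.W = self.add_weight(name='W', shape=(dim, dim), initializer='glorot_uniform')
        self.b = self.add_weight(name='b', shape=(dim,), initializer='zeros')
        self.w = self.add_weight(name='context_vector', shape=(dim,), initializer='glorot_uniform')
        super().build(input_shape)

    def call(self, h):
        # h: (batch, timesteps, dim), e.g. the outputs of a BiGRU with return_sequences=True
        u = tf.tanh(tf.tensordot(h, self.W, axes=1) + self.b)   # (batch, T, dim)
        scores = tf.tensordot(u, self.w, axes=1)                # (batch, T)
        alpha = tf.nn.softmax(scores, axis=1)                   # attention weights
        return tf.reduce_sum(h * tf.expand_dims(alpha, -1), axis=1)  # (batch, dim)

In the RCNN code above, for instance, `AttentionPooling()(x)` could be dropped in where `GlobalMaxPooling1D()(x)` is used.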
Everything above is sentence-level classification. Such models can also be applied to long documents or whole passages, but both speed and accuracy drop, so researchers proposed a hierarchical attention framework, Hierarchical Attention: first encode each sentence with BiGRU+Attention to get sentence vectors, then run BiGRU+Attention over the sentence vectors to obtain a document-level representation for classification.
https://github.com/richliao/textClassifier
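A minimal Keras sketch of the hierarchy follows, with illustrative sizes (16 sentences of 32 words, a 40k vocabulary, 10 classes) and a functional variant of the attention pooling above; this is my own sketch, not the repo's code:

import tensorflow as tf
from tensorflow.keras import Input, Model, layers

MAX_SENTS, MAX_WORDS, VOCAB, EMB = 16, 32, 40000, 128  # hypothetical sizes

def attention_pool(h, dim):
    # HAN-style attention over the time axis: (batch, T, dim) -> (batch, dim)
    u = layers.Dense(dim, activation='tanh')(h)
    scores = layers.Dense(1)(u)                      # (batch, T, 1)
    alpha = layers.Softmax(axis=1)(scores)           # attention weights
    return layers.Lambda(lambda t: tf.reduce_sum(t[0] * t[1], axis=1))([h, alpha])

# Word level: encode each sentence into a vector
word_input = Input((MAX_WORDS,))
w = layers.Embedding(VOCAB, EMB)(word_input)
w = layers.Bidirectional(layers.GRU(64, return_sequences=True))(w)
sent_encoder = Model(word_input, attention_pool(w, 128))

# Sentence level: encode the sequence of sentence vectors into a document vector
doc_input = Input((MAX_SENTS, MAX_WORDS))
s = layers.TimeDistributed(sent_encoder)(doc_input)
s = layers.Bidirectional(layers.GRU(64, return_sequences=True))(s)
doc_vec = attention_pool(s, 128)
output = layers.Dense(10, activation='softmax')(doc_vec)
han = Model(doc_input, output)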
Classification with BERT and its derivative models works roughly as shown in the figure below.
Code and walkthrough:
import os
import sys
import pickle
import pandas as pd
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import torch
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, TensorDataset
from torch.nn import CrossEntropyLoss,BCEWithLogitsLoss
from tqdm import tqdm_notebook, trange
from pytorch_pretrained_bert import BertTokenizer, BertModel, BertForMaskedLM, BertForSequenceClassification
from pytorch_pretrained_bert.optimization import BertAdam, WarmupLinearSchedule
from sklearn.metrics import precision_recall_curve,classification_report
import matplotlib.pyplot as plt
import torch.nn as nn
from sklearn.model_selection import train_test_split
%matplotlib inline
Initial parameters:
SPLIT_RATIO = 0.9  # train/validation split ratio
#MAX_SEQ_LEN = 50
BATCH_SIZE = 192
SEED = 0
EPOCHS = 10
The text classification data used here is a subset of THUCNews, which can be downloaded at: https://pan.baidu.com/s/1hugrfRu (password: qfud)
train = pd.read_table('./data/cnews/cnews.train.txt', encoding='utf-8', names=['label', 'text'])
train = train[['text', 'label']]
Encode the labels.
le = LabelEncoder()
le.fit(train.label.tolist())
train['label_id'] = le.transform(train.label.tolist())
Save the label mapping table.
labeldata = train.groupby(['label', 'label_id']).count().reset_index()
num_labels = labeldata.shape[0]
labeldata.to_excel('./data/train_labels.xlsx', index=None)
Split the training data into training and validation sets.
train_data = train[['text', 'label_id']]
train, valid = train_test_split(train_data, train_size=SPLIT_RATIO, random_state=SEED)
train_labels = train.groupby(['label_id']).count().reset_index()
valid_labels = valid.groupby(['label_id']).count().reset_index()
Tokenizer: search for chinese_wwm_ext_pytorch and download it.
# Tokenizer
bert_tokenizer = BertTokenizer.from_pretrained('./data/chinese_wwm_ext_pytorch/', do_lower_case=False)
# Instantiate the preprocessing class; DataPrecessForSingleSentence is defined in the
# original post and converts raw texts into token ids, attention masks and segment ids
processor = DataPrecessForSingleSentence(bert_tokenizer=bert_tokenizer)
Build the training set inputs.
# Build the training set inputs
seqs, seq_masks, seq_segments, labels = processor.get_input(dataset=train)
# Convert to torch tensors
t_seqs = torch.tensor(seqs, dtype=torch.long)
t_seq_masks = torch.tensor(seq_masks, dtype=torch.long)
t_seq_segments = torch.tensor(seq_segments, dtype=torch.long)
t_labels = torch.tensor(labels, dtype=torch.long)
train_data = TensorDataset(t_seqs, t_seq_masks, t_seq_segments, t_labels)
train_sampler = RandomSampler(train_data)
train_dataloder = DataLoader(dataset=train_data, sampler=train_sampler, batch_size=BATCH_SIZE)
Build the validation set inputs.
# Build the validation set inputs
seqs, seq_masks, seq_segments, labels = processor.get_input(dataset=valid)
# Convert to torch tensors
t_seqs = torch.tensor(seqs, dtype=torch.long)
t_seq_masks = torch.tensor(seq_masks, dtype=torch.long)
t_seq_segments = torch.tensor(seq_segments, dtype=torch.long)
t_labels = torch.tensor(labels, dtype=torch.long)
valid_data = TensorDataset(t_seqs, t_seq_masks, t_seq_segments, t_labels)
valid_sampler = RandomSampler(valid_data)
valid_dataloder = DataLoader(dataset=valid_data, sampler=valid_sampler, batch_size=BATCH_SIZE)
Load the pretrained BERT model.
# Load the pretrained BERT model
model = BertForSequenceClassification.from_pretrained('./data/chinese_wwm_ext_pytorch/', num_labels=num_labels)
Choose whether to run the model on CPU or GPU; use a GPU if you can. My own machine's GPU has only 2 GB of memory, not enough for this model, so CPU is used here.
device = torch.device('cpu')
# device = torch.device('cuda')  # GPU version
model = model.to(device)
Set the model parameters to optimize (no weight decay on biases and LayerNorm weights).
# Parameters to optimize
param_optimizer = list(model.named_parameters())
no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
optimizer_grouped_parameters = [
    {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
    {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
]
steps = len(train_dataloder) * EPOCHS
optimizer = BertAdam(optimizer_grouped_parameters, lr=2e-05, warmup=0.1, t_total=steps)
loss_function = LabelSmoothing(num_labels, 0.1)
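LabelSmoothing is not defined in the snippet above. A minimal sketch consistent with how it is called here (log-probabilities plus integer class labels; my own illustrative implementation, not necessarily the author's):

import torch
import torch.nn as nn

class LabelSmoothing(nn.Module):
    """Label-smoothed loss: KL divergence between log-probs and a smoothed target."""
    def __init__(self, num_classes, smoothing=0.1):
        super().__init__()
        self.criterion = nn.KLDivLoss(reduction='batchmean')
        self.num_classes = num_classes
        self.confidence = 1.0 - smoothing
        self.smoothing = smoothing

    def forward(self, log_probs, target):
        # Smoothed target: `confidence` on the true class, the rest spread uniformly
        true_dist = torch.full_like(log_probs, self.smoothing / (self.num_classes - 1))
        true_dist.scatter_(1, target.unsqueeze(1), self.confidence)
        return self.criterion(log_probs, true_dist)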
Track the losses.
# Loss tracking
train_losses = []
valid_losses = []
avg_train_losses = []
avg_valid_losses = []
patience = 20
early_stopping = EarlyStopping(patience=patience, verbose=True)
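EarlyStopping is also external; it follows the interface of the widely used pytorchtools implementation (early-stopping-pytorch on GitHub). A minimal sketch with the same interface, assuming a `checkpoint.pt` save path:

import numpy as np
import torch

class EarlyStopping:
    """Stop training when validation loss stops improving; checkpoint the best model."""
    def __init__(self, patience=7, verbose=False, path='checkpoint.pt'):
        self.patience = patience
        self.verbose = verbose
        self.path = path
        self.counter = 0
        self.best_loss = np.inf
        self.early_stop = False

    def __call__(self, val_loss, model):
        if val_loss < self.best_loss:
            self.best_loss = val_loss
            self.counter = 0
            torch.save(model.state_dict(), self.path)  # keep the best weights
            if self.verbose:
                print(f'Validation loss improved to {val_loss:.6f}; model saved.')
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True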
Train the model; a held-out validation set plus early stopping guard against overfitting.
for i in trange(EPOCHS, desc='Epoch'):
    model.train()  # training phase
    for step, batch_data in enumerate(tqdm_notebook(train_dataloder, desc='Iteration')):
        batch_data = tuple(t.to(device) for t in batch_data)
        batch_seqs, batch_seq_masks, batch_seq_segments, batch_labels = batch_data
        logits = model(batch_seqs, batch_seq_masks, batch_seq_segments, labels=None)
        logits = torch.nn.functional.log_softmax(logits, dim=1)
        # A plain CrossEntropyLoss() could be used instead (feed it the raw
        # logits, without the log_softmax above)
        loss = loss_function(logits, batch_labels)
        loss.backward()
        train_losses.append(loss.item())
        print("\r%f" % loss.item(), end='')
        optimizer.step()
        optimizer.zero_grad()
    model.eval()  # validation phase
    for step, batch_data in enumerate(tqdm_notebook(valid_dataloder, desc='Iteration')):
        with torch.no_grad():
            batch_data = tuple(t.to(device) for t in batch_data)
            batch_seqs, batch_seq_masks, batch_seq_segments, batch_labels = batch_data
            logits = model(batch_seqs, batch_seq_masks, batch_seq_segments, labels=None)
            logits = torch.nn.functional.log_softmax(logits, dim=1)
            loss = loss_function(logits, batch_labels)
            valid_losses.append(loss.item())
    # Per-epoch average losses
    train_loss = np.average(train_losses)
    valid_loss = np.average(valid_losses)
    avg_train_losses.append(train_loss)
    avg_valid_losses.append(valid_loss)
    print("train_loss:%f, valid_loss:%f" % (train_loss, valid_loss))
    # Reset the per-step losses for the next epoch
    train_losses = []
    valid_losses = []
    early_stopping(valid_loss, model)
    if early_stopping.early_stop:
        print("Early Stopping")
        break
After training, you can plot the loss curves (I had trained a small run on CPU but did not save it, so there is no screenshot here; feel free to try it yourself).
%matplotlib inline
fig = plt.figure(figsize=(8,6))
plt.plot(range(1, len(avg_train_losses)+1), avg_train_losses, label='Training Loss')
plt.plot(range(1, len(avg_valid_losses)+1), avg_valid_losses, label='Validation Loss')
# Find the position of the lowest validation loss
minposs = avg_valid_losses.index(min(avg_valid_losses)) + 1
plt.axvline(minposs, linestyle='--', color='r', label='Early Stopping Checkpoint')
plt.xlabel('epochs')
plt.ylabel('loss')
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()
fig.savefig('loss_plot.png', bbox_inches='tight')
Save the trained model.
torch.save(model, "job_fine_tuned_bert.bin")
Convert the test set to tensors and run prediction.
test_data = pd.read_csv("./data/test.csv", encoding='utf-8')
le = LabelEncoder()
le.fit(test_data.label.tolist())
# Encode labels as ids (keeping the same column names as the training pipeline)
test_data['label_id'] = le.transform(test_data.label.tolist())
labels_data = test_data.groupby(['label', 'label_id']).count().reset_index()
labels_map = labels_data[['label', 'label_id']]
test_data = test_data[['text', 'label_id']]
# Convert to tensors
test_seqs, test_seq_masks, test_seq_segments, test_labels = processor.get_input(dataset=test_data)
test_seqs = torch.tensor(test_seqs, dtype=torch.long)
test_seq_masks = torch.tensor(test_seq_masks, dtype=torch.long)
test_seq_segments = torch.tensor(test_seq_segments, dtype=torch.long)
test_labels = torch.tensor(test_labels, dtype=torch.long)
test_dataset = TensorDataset(test_seqs, test_seq_masks, test_seq_segments, test_labels)
test_dataloder = DataLoader(dataset=test_dataset, batch_size=192)
# Containers for predicted and true labels
true_labels = []
pred_labels = []
model.eval()
# Predict
with torch.no_grad():
    for batch_data in tqdm_notebook(test_dataloder, desc='TEST'):
        batch_data = tuple(t.to(device) for t in batch_data)
        batch_seqs, batch_seq_masks, batch_seq_segments, batch_labels = batch_data
        logits = model(batch_seqs, batch_seq_masks, batch_seq_segments, labels=None)
        logits = logits.softmax(dim=1).argmax(dim=1)
        pred_labels.append(logits.detach().cpu().numpy())
        true_labels.append(batch_labels.detach().cpu().numpy())
# Per-class precision and recall
result = classification_report(np.concatenate(true_labels), np.concatenate(pred_labels))
print(result)
Ideas worth trying when optimizing BERT-based classification:
If the task is simple (e.g. news classification), plain FastText can already achieve good results.
With BERT, the simplest treatment for long text is brute-force truncation, e.g. keeping only the head + tail of the document, or the head plus a few words picked by TF-IDF; alternatively, predict on every sentence and aggregate the results (a small truncation sketch follows below).
Or try models designed for longer inputs, such as XLNet, Reformer, or Longformer.
If it is an offline task and time permits, it is still advisable to process the full text; trust the model's encoding ability.
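A minimal sketch of the head + tail truncation heuristic (the helper name and the 128/384 split are illustrative, not from the original post):

def head_tail_truncate(tokens, max_len=512, head_len=128):
    """Keep the first head_len tokens and fill the remainder from the end of the document."""
    if len(tokens) <= max_len:
        return tokens
    return tokens[:head_len] + tokens[-(max_len - head_len):]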
Since switching to BERT, I have rarely been troubled by imbalanced or scarce data; just train a first version without overthinking it.
If you only have a few hundred samples, you can first recast classification as a matching problem, use that idea to label some extra high-confidence data, or turn to self-supervised and semi-supervised methods.
In real applications, robustness matters a lot, otherwise bad cases get awkward: why was the sentence classified correctly before, yet adding a single character breaks it?
Simple brute-force data augmentation can be applied directly: adding stop words or punctuation, deleting words, synonym replacement, and so on (a small sketch follows below); if performance drops, clean the augmented training data.
Higher-level techniques such as adversarial training or contrastive learning can also help, typically adding around 1 point, though they do not necessarily prevent the awkward cases above.
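These augmentations are easy to sketch (the word lists and probabilities here are illustrative):

import random

STOPWORDS = ['的', '了', '呢']             # illustrative stop-word list
SYNONYMS = {'喜欢': '喜爱', '购买': '买'}   # illustrative synonym table

def augment(tokens):
    out = []
    for tok in tokens:
        r = random.random()
        if r < 0.1:
            continue                                             # random word deletion
        out.append(SYNONYMS.get(tok, tok) if r < 0.2 else tok)   # synonym replacement
        if r > 0.9:
            out.append(random.choice(STOPWORDS))                 # stop-word insertion
    return out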