Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

杨皓予的第八周作业 #915

Merged
merged 2 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions 杨皓予/week8/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
改表示形文本匹配代码,使用三元组损失函数训练
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
21 changes: 21 additions & 0 deletions 杨皓予/week8/sentence_match_as_sentence_encoder/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-

"""
配置参数信息
"""

# Hyper-parameters and file paths for the triplet-loss sentence encoder.
Config = {
    "model_path": "model_output",
    "schema_path": "../data/schema.json",
    "train_data_path": "../data/train.json",
    "valid_data_path": "../data/valid.json",
    "vocab_path": "../chars.txt",
    "max_length": 20,               # pad/truncate every sentence to this many tokens
    "hidden_size": 128,
    "epoch": 10,
    "batch_size": 32,
    "epoch_data_size": 200,         # number of samples drawn per training epoch
    "positive_sample_rate": 0.5,    # ratio of positive samples
    "optimizer": "adam",
    "learning_rate": 1e-3,
}
78 changes: 78 additions & 0 deletions 杨皓予/week8/sentence_match_as_sentence_encoder/evaluate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
import torch
from loader import load_data


"""
模型效果测试
"""

class Evaluator:
    """Measures model accuracy by nearest-neighbor matching of validation
    questions against vectors built from the training-set knowledge base."""

    def __init__(self, config, model, logger):
        self.config = config
        self.model = model
        self.logger = logger
        self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
        # The training set doubles as the knowledge base for matching, so it is
        # loaded a second time here. Passing the already-loaded set in would be
        # cleaner, but this keeps the main training loop untouched.
        self.train_data = load_data(config["train_data_path"], config)
        self.stats_dict = {"correct": 0, "wrong": 0}  # running evaluation tallies

    def knwb_to_vector(self):
        """Encode every knowledge-base question into a normalized vector.

        Model weights change every epoch, so the vectors must be rebuilt
        before each evaluation round.
        """
        self.question_index_to_standard_question_index = {}
        self.question_ids = []
        for standard_index, encoded_questions in self.train_data.dataset.knwb.items():
            for encoded in encoded_questions:
                # Map matrix-row position -> standard-question id so a matched
                # row can later be checked against the gold label.
                self.question_index_to_standard_question_index[len(self.question_ids)] = standard_index
                self.question_ids.append(encoded)
        with torch.no_grad():
            question_matrix = torch.stack(self.question_ids, dim=0)
            if torch.cuda.is_available():
                question_matrix = question_matrix.cuda()
            self.knwb_vectors = self.model(question_matrix)
            # L2-normalize (v / |v|) so dot products equal cosine similarity.
            self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
        return

    def eval(self, epoch):
        """Run one full evaluation pass over the validation set."""
        self.logger.info("开始测试第%d轮模型效果:" % epoch)
        self.stats_dict = {"correct": 0, "wrong": 0}  # reset the previous round's tallies
        self.model.eval()
        self.knwb_to_vector()
        for index, batch_data in enumerate(self.valid_data):
            if torch.cuda.is_available():
                batch_data = [d.cuda() for d in batch_data]
            # Adjust this unpacking if the inputs/outputs ever change shape.
            input_id, labels = batch_data
            with torch.no_grad():
                # Labels are withheld; predict with the model's current weights.
                test_question_vectors = self.model(input_id)
            self.write_stats(test_question_vectors, labels)
        self.show_stats()
        return

    def write_stats(self, test_question_vectors, labels):
        """Match each predicted vector to the knowledge base and tally hits."""
        assert len(labels) == len(test_question_vectors)
        for question_vector, label in zip(test_question_vectors, labels):
            # One matrix product computes similarity against every KB question:
            # question_vector is [vec_size], knwb_vectors is [n, vec_size].
            similarity = torch.mm(question_vector.unsqueeze(0), self.knwb_vectors.T)
            best_row = int(torch.argmax(similarity.squeeze()))
            # Translate the matched row back to its standard-question id.
            predicted = self.question_index_to_standard_question_index[best_row]
            if int(predicted) == int(label):
                self.stats_dict["correct"] += 1
            else:
                self.stats_dict["wrong"] += 1
        return

    def show_stats(self):
        """Log the totals and accuracy accumulated by write_stats."""
        correct = self.stats_dict["correct"]
        wrong = self.stats_dict["wrong"]
        self.logger.info("预测集合条目总量:%d" % (correct + wrong))
        self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))
        self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))
        self.logger.info("--------------------")
        return


159 changes: 159 additions & 0 deletions 杨皓予/week8/sentence_match_as_sentence_encoder/loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# -*- coding: utf-8 -*-

import json
import re
import os
import torch
import random
import jieba
import numpy as np
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
"""
数据加载
"""


class DataGenerator:
    """Dataset for sentence matching with triplet sampling.

    Training files (one JSON object per line) are grouped into a knowledge
    base ``self.knwb``: standard-question index -> list of encoded questions.
    Test files (one JSON list per line) become (input_id, label) pairs in
    ``self.data``. Training samples are drawn on the fly as
    (anchor, positive, negative) triplets.
    """

    def __init__(self, data_path, config):
        self.config = config
        self.path = data_path
        self.vocab = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.vocab)
        self.schema = load_schema(config["schema_path"])
        # Training samples are drawn randomly, so an explicit per-epoch sample
        # count is required (otherwise sampling could continue forever).
        self.train_data_size = config["epoch_data_size"]
        self.data_type = None  # "train" or "test", detected while loading
        self.load()

    def load(self):
        """Read ``self.path`` and populate either the knowledge base (train)
        or the (input, label) pair list (test), depending on line format."""
        self.data = []
        self.knwb = defaultdict(list)
        with open(self.path, encoding="utf8") as f:
            for line in f:
                line = json.loads(line)
                if isinstance(line, dict):
                    # Training line: {"questions": [...], "target": label}
                    self.data_type = "train"
                    questions = line["questions"]
                    label = line["target"]
                    for question in questions:
                        input_id = self.encode_sentence(question)
                        input_id = torch.LongTensor(input_id)
                        self.knwb[self.schema[label]].append(input_id)
                else:
                    # Test line: [question, label]
                    self.data_type = "test"
                    assert isinstance(line, list)
                    question, label = line
                    input_id = self.encode_sentence(question)
                    input_id = torch.LongTensor(input_id)
                    label_index = torch.LongTensor([self.schema[label]])
                    self.data.append([input_id, label_index])
        return

    def encode_sentence(self, text):
        """Convert a sentence to a fixed-length list of vocab ids.

        Uses jieba word segmentation when the vocab is a word table
        (vocab_path == "words.txt"), otherwise per-character lookup.
        Unknown tokens map to "[UNK]".
        """
        input_id = []
        if self.config["vocab_path"] == "words.txt":
            for word in jieba.cut(text):
                input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
        else:
            for char in text:
                input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
        input_id = self.padding(input_id)
        return input_id

    def padding(self, input_id):
        """Truncate to max_length and right-pad with 0 (the padding id) so a
        batch can be stacked into one tensor."""
        input_id = input_id[:self.config["max_length"]]
        input_id += [0] * (self.config["max_length"] - len(input_id))
        return input_id

    def __len__(self):
        # Training length is the configured sample budget, not the corpus size,
        # because samples are generated randomly.
        if self.data_type == "train":
            return self.config["epoch_data_size"]
        else:
            assert self.data_type == "test", self.data_type
            return len(self.data)

    def __getitem__(self, index):
        if self.data_type == "train":
            return self.random_train_sample()  # index is ignored; sample randomly
        else:
            return self.data[index]

    def random_train_sample(self):
        """Draw one triplet [anchor, positive, negative].

        Anchor and positive come from the same randomly chosen standard
        question; the negative comes from a different one. ``random.sample``
        already guarantees the two standard questions differ, so the only
        retry condition is the anchor class having fewer than two questions.
        A loop replaces the previous unbounded recursion, which could hit the
        recursion limit when small classes are drawn repeatedly.
        """
        standard_question_index = list(self.knwb.keys())
        while True:
            p, d = random.sample(standard_question_index, 2)
            if len(self.knwb[p]) < 2:
                continue  # anchor class too small to yield two distinct questions
            s1, s2 = random.sample(self.knwb[p], 2)
            s3 = random.choice(self.knwb[d])
            return [s1, s2, s3]


# #随机正样本
# if random.random() <= self.config["positive_sample_rate"]:
# p = random.choice(standard_question_index)
# #如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
# if len(self.knwb[p]) < 2:
# return self.random_train_sample()
# else:
# s1, s2 = random.sample(self.knwb[p], 2)
# return [s1, s2, torch.LongTensor([1])]
# #随机负样本
# else:
# p, n = random.sample(standard_question_index, 2)
# s1 = random.choice(self.knwb[p])
# s2 = random.choice(self.knwb[n])
# return [s1, s2, torch.LongTensor([-1])]



#加载字表或词表
def load_vocab(vocab_path):
    """Read a vocab file (one token per line) into a token -> id dict.

    Ids start at 1 because 0 is reserved for the padding position.
    """
    with open(vocab_path, encoding="utf8") as f:
        return {line.strip(): idx for idx, line in enumerate(f, start=1)}

#加载schema
def load_schema(schema_path):
    """Parse the schema JSON file (standard question -> label index)."""
    with open(schema_path, encoding="utf8") as f:
        return json.load(f)

#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
    """Build a DataGenerator for *data_path* and wrap it in a torch DataLoader."""
    generator = DataGenerator(data_path, config)
    return DataLoader(generator, batch_size=config["batch_size"], shuffle=shuffle)



if __name__ == "__main__":
    # Ad-hoc smoke test: exercises DataGenerator and load_data directly against
    # local data files. NOTE(review): the paths are hard-coded to the author's
    # Windows machine, so this only runs there.
    from config import Config

    dc = DataGenerator(r"D:\资料\week8 文本匹配问题\week8 文本匹配问题\data\valid.json", Config)
    print(dc[0])
    print(dc[1])
    print(len(dc))
    # presumably 464 valid samples / batch_size 32 — printed for manual comparison
    print(464/32)
    dg = load_data(r"D:\资料\week8 文本匹配问题\week8 文本匹配问题\data\valid.json", Config, shuffle=False)
    for i in dg:
        print(i)
    da = load_data(r"D:\资料\week8 文本匹配问题\week8 文本匹配问题\data\train.json", Config, shuffle=False)
    # presumably epoch_data_size 200 / batch_size 32 — compare with len(da) below
    print(200/32)
    print(len(da))
    # for i in range(len(dg)):
    #     print(dg[i])
    #
    # dg = DataGenerator(r"D:\资料\week8 文本匹配问题\week8 文本匹配问题\data\valid.json", Config)
    # print(len(dg))
    # for i in range(len(dg)):
    #     print(dg[i])
62 changes: 62 additions & 0 deletions 杨皓予/week8/sentence_match_as_sentence_encoder/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-

import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import SiameseNetwork, choose_optimizer
from evaluate import Evaluator
from loader import load_data

logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

"""
模型训练主程序
"""

def main(config):
    """Train the siamese sentence encoder and evaluate after every epoch.

    Creates the model output directory, builds the model/optimizer/evaluator
    from *config*, then runs the triplet-loss training loop.
    """
    # Ensure the model output directory exists.
    if not os.path.isdir(config["model_path"]):
        os.mkdir(config["model_path"])
    # Training data, model, optimizer, and evaluation helper.
    train_data = load_data(config["train_data_path"], config)
    model = SiameseNetwork(config)
    cuda_flag = torch.cuda.is_available()
    if cuda_flag:
        logger.info("gpu可以使用,迁移模型至gpu")
        model = model.cuda()
    optimizer = choose_optimizer(config, model)
    evaluator = Evaluator(config, model, logger)
    # Epochs are 1-based for logging and checkpoint names.
    for epoch in range(1, config["epoch"] + 1):
        model.train()
        logger.info("epoch %d begin" % epoch)
        epoch_losses = []
        for index, batch_data in enumerate(train_data):
            optimizer.zero_grad()
            if cuda_flag:
                batch_data = [d.cuda() for d in batch_data]
            # Adjust this unpacking if the model's inputs/outputs change.
            input_ids1, input_ids2, input_ids3 = batch_data
            loss = model(input_ids1, input_ids2, input_ids3)
            epoch_losses.append(loss.item())
            # if index % int(len(train_data) / 2) == 0:
            #     logger.info("batch loss %f" % loss)
            loss.backward()
            optimizer.step()
        logger.info("epoch average loss: %f" % np.mean(epoch_losses))
        evaluator.eval(epoch)
        model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
        # torch.save(model.state_dict(), model_path)
    return

if __name__ == "__main__":
    # Entry point: train with the default configuration from config.py.
    main(Config)
Loading