diff --git "a/\345\274\240\345\277\227/hm_week8/hm_week8_loader.py" "b/\345\274\240\345\277\227/hm_week8/hm_week8_loader.py" new file mode 100644 index 000000000..6afdc6685 --- /dev/null +++ "b/\345\274\240\345\277\227/hm_week8/hm_week8_loader.py" @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- + +import json +import re +import os +import torch +import random +import jieba +import numpy as np +from torch.utils.data import Dataset, DataLoader +from collections import defaultdict +""" +数据加载 +""" + + +class DataGenerator: + def __init__(self, data_path, config): + self.config = config + self.path = data_path + self.vocab = load_vocab(config["vocab_path"]) + self.config["vocab_size"] = len(self.vocab) + self.schema = load_schema(config["schema_path"]) + self.train_data_size = config["epoch_data_size"] #由于采取随机采样,所以需要设定一个采样数量,否则可以一直采 + self.data_type = None #用来标识加载的是训练集还是测试集 "train" or "test" + self.load() + + def load(self): + self.data = [] + self.knwb = defaultdict(list) + with open(self.path, encoding="utf8") as f: + for line in f: + line = json.loads(line) + #加载训练集 + if isinstance(line, dict): + self.data_type = "train" + questions = line["questions"] + label = line["target"] + for question in questions: + input_id = self.encode_sentence(question) + input_id = torch.LongTensor(input_id) + self.knwb[self.schema[label]].append(input_id) + #加载测试集 + else: + self.data_type = "test" + assert isinstance(line, list) + question, label = line + input_id = self.encode_sentence(question) + input_id = torch.LongTensor(input_id) + label_index = torch.LongTensor([self.schema[label]]) + self.data.append([input_id, label_index]) + return + + def encode_sentence(self, text): + input_id = [] + if self.config["vocab_path"] == "words.txt": + for word in jieba.cut(text): + input_id.append(self.vocab.get(word, self.vocab["[UNK]"])) + else: + for char in text: + input_id.append(self.vocab.get(char, self.vocab["[UNK]"])) + input_id = self.padding(input_id) + return input_id + + #补齐或截断输入的序列,使其可以在一个batch内运算 + def padding(self, input_id): + input_id = input_id[:self.config["max_length"]] + input_id += [0] * (self.config["max_length"] - len(input_id)) + return input_id + + def __len__(self): + if self.data_type == "train": + return self.config["epoch_data_size"] + else: + assert self.data_type == "test", self.data_type + return len(self.data) + + def __getitem__(self, index): + if self.data_type == "train": + return self.random_train_sample() #随机生成一个训练样本 + else: + return self.data[index] + + # #依照一定概率生成负样本或正样本 + # #负样本从随机两个不同的标准问题中各随机选取一个 + # #正样本从随机一个标准问题中随机选取两个 + # def random_train_sample(self): + # standard_question_index = list(self.knwb.keys()) + # #随机正样本 + # if random.random() <= self.config["positive_sample_rate"]: + # p = random.choice(standard_question_index) + # #如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次 + # if len(self.knwb[p]) < 2: + # return self.random_train_sample() + # else: + # s1, s2 = random.sample(self.knwb[p], 2) + # return [s1, s2, torch.LongTensor([1])] + # #随机负样本 + # else: + # p, n = random.sample(standard_question_index, 2) + # s1 = random.choice(self.knwb[p]) + # s2 = random.choice(self.knwb[n]) + # return [s1, s2, torch.LongTensor([-1])] + + + def random_train_sample(self): + standard_question_index = list(self.knwb.keys()) + # 先选定两个意图,之后从第一个意图中取2个问题,第二个意图中取一个问题 + p, n = random.sample(standard_question_index, 2) + # 如果某个意图下刚好只有一条问题,那只能两个正样本用一样的; + # 这种对训练没帮助,因为相同的样本距离肯定是0,但是数据充分的情况下这种情况很少 + if len(self.knwb[p]) == 1: + s1 = s2 = self.knwb[p][0] + #这应当是一般情况 + else: + s1, s2 = random.sample(self.knwb[p], 2) + 
+    # Generate a positive or negative pair with a fixed probability:
+    # negative pair: one question each from two different standard questions
+    # positive pair: two questions from the same standard question
+    # def random_train_sample(self):
+    #     standard_question_index = list(self.knwb.keys())
+    #     # positive pair
+    #     if random.random() <= self.config["positive_sample_rate"]:
+    #         p = random.choice(standard_question_index)
+    #         # fewer than two questions under this standard question: resample
+    #         if len(self.knwb[p]) < 2:
+    #             return self.random_train_sample()
+    #         else:
+    #             s1, s2 = random.sample(self.knwb[p], 2)
+    #             return [s1, s2, torch.LongTensor([1])]
+    #     # negative pair
+    #     else:
+    #         p, n = random.sample(standard_question_index, 2)
+    #         s1 = random.choice(self.knwb[p])
+    #         s2 = random.choice(self.knwb[n])
+    #         return [s1, s2, torch.LongTensor([-1])]
+
+    def random_train_sample(self):
+        standard_question_index = list(self.knwb.keys())
+        # pick two intents, then take two questions from the first and one from the second
+        p, n = random.sample(standard_question_index, 2)
+        # if an intent holds only one question, anchor and positive have to be the same sample;
+        # that pair contributes nothing (the distance between identical samples is always 0),
+        # but with enough data this case is rare
+        if len(self.knwb[p]) == 1:
+            s1 = s2 = self.knwb[p][0]
+        # the usual case
+        else:
+            s1, s2 = random.sample(self.knwb[p], 2)
+        # pick one negative sample at random
+        s3 = random.choice(self.knwb[n])
+        # the first two are similar, the third is not; unlike the usual loss setup,
+        # no extra 0/1 label needs to be passed in
+        return [s1, s2, s3]
+
+
+# load the character or word vocabulary
+def load_vocab(vocab_path):
+    token_dict = {}
+    with open(vocab_path, encoding="utf8") as f:
+        for index, line in enumerate(f):
+            token = line.strip()
+            token_dict[token] = index + 1  # 0 is reserved for padding, so indices start at 1
+    return token_dict
+
+# load the schema
+def load_schema(schema_path):
+    with open(schema_path, encoding="utf8") as f:
+        return json.loads(f.read())
+
+# wrap the data with torch's built-in DataLoader
+def load_data(data_path, config, shuffle=True):
+    dg = DataGenerator(data_path, config)
+    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
+    return dl
+
+
+if __name__ == "__main__":
+    from config import Config
+    dg = DataGenerator("valid_tag_news.json", Config)
+    print(dg[1])
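
As a standalone illustration of the sampling scheme above, here is a minimal runnable sketch with a hypothetical two-intent knowledge base (the contents of knwb are made up for the example):

import random
import torch
from collections import defaultdict

# toy knowledge base: intent index -> encoded questions
knwb = defaultdict(list)
knwb[0] = [torch.LongTensor([1, 2, 0]), torch.LongTensor([1, 3, 0])]
knwb[1] = [torch.LongTensor([7, 8, 9])]

# same scheme as random_train_sample: two questions from intent p, one from intent n
p, n = random.sample(list(knwb.keys()), 2)
if len(knwb[p]) == 1:
    s1 = s2 = knwb[p][0]          # degenerate case: anchor == positive
else:
    s1, s2 = random.sample(knwb[p], 2)
s3 = random.choice(knwb[n])       # negative from the other intent
print(p, n, s1, s2, s3)
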
diff --git "a/\345\274\240\345\277\227/hm_week8/hm_week8_model.py" "b/\345\274\240\345\277\227/hm_week8/hm_week8_model.py"
new file mode 100644
index 000000000..13cdd7eca
--- /dev/null
+++ "b/\345\274\240\345\277\227/hm_week8/hm_week8_model.py"
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+
+import torch
+import torch.nn as nn
+from torch.optim import Adam, SGD
+from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
+"""
+Network model structure
+"""
+
+class SentenceEncoder(nn.Module):
+    def __init__(self, config):
+        super(SentenceEncoder, self).__init__()
+        hidden_size = config["hidden_size"]
+        vocab_size = config["vocab_size"] + 1
+        max_length = config["max_length"]
+        self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
+        # self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True)
+        self.layer = nn.Linear(hidden_size, hidden_size)
+        self.dropout = nn.Dropout(0.5)
+
+    # input: character-encoded question, shape (batch_size, max_length)
+    def forward(self, x):
+        x = self.embedding(x)
+        # LSTM variant:
+        # x, _ = self.lstm(x)
+        # linear-layer variant:
+        x = self.layer(x)
+        # max-pool over the sequence dimension; squeeze(-1) rather than squeeze()
+        # so the batch dimension survives even when batch_size == 1
+        x = nn.functional.max_pool1d(x.transpose(1, 2), x.shape[1]).squeeze(-1)
+        return x
+
+
+class SiameseNetwork(nn.Module):
+    def __init__(self, config):
+        super(SiameseNetwork, self).__init__()
+        self.sentence_encoder = SentenceEncoder(config)
+        self.loss = nn.CosineEmbeddingLoss()
+
+    # cosine distance: 1 - cos(a, b)
+    # cos = 1: the vectors point the same way, distance 0; cos = 0: orthogonal, distance 1
+    def cosine_distance(self, tensor1, tensor2):
+        tensor1 = torch.nn.functional.normalize(tensor1, dim=-1)
+        tensor2 = torch.nn.functional.normalize(tensor2, dim=-1)
+        cosine = torch.sum(torch.mul(tensor1, tensor2), dim=-1)
+        return 1 - cosine
+
+    def cosine_triplet_loss(self, a, p, n, margin=None):
+        ap = self.cosine_distance(a, p)
+        an = self.cosine_distance(a, n)
+        if margin is None:
+            diff = ap - an + 0.1
+        else:
+            diff = ap - an + margin.squeeze()
+        # average only over triplets that violate the margin (diff greater than 0);
+        # guard against an empty selection, which would otherwise yield NaN
+        violated = diff[diff.gt(0)]
+        if violated.numel() == 0:
+            return diff.sum() * 0  # zero loss that keeps the graph connected
+        return torch.mean(violated)
+
+    # # sentence: (batch_size, max_length)
+    # def forward(self, sentence1, sentence2=None, target=None):
+    #     # two sentences passed in together
+    #     if sentence2 is not None:
+    #         vector1 = self.sentence_encoder(sentence1)  # vec: (batch_size, hidden_size)
+    #         vector2 = self.sentence_encoder(sentence2)
+    #         # with a label, compute the loss
+    #         if target is not None:
+    #             return self.loss(vector1, vector2, target.squeeze())
+    #         # without a label, return the cosine distance
+    #         else:
+    #             return self.cosine_distance(vector1, vector2)
+    #     # a single sentence means the caller wants the vectorization capability
+    #     else:
+    #         return self.sentence_encoder(sentence1)
+
+    def forward(self, sentence1, sentence2=None, sentence3=None):
+        # all three sentences passed in: compute the triplet loss
+        if sentence2 is not None and sentence3 is not None:
+            vector1 = self.sentence_encoder(sentence1)
+            vector2 = self.sentence_encoder(sentence2)
+            vector3 = self.sentence_encoder(sentence3)
+            return self.cosine_triplet_loss(vector1, vector2, vector3)
+        # a single sentence means the caller wants the vectorization capability
+        else:
+            return self.sentence_encoder(sentence1)
+
+def choose_optimizer(config, model):
+    optimizer = config["optimizer"]
+    learning_rate = config["learning_rate"]
+    if optimizer == "adam":
+        return Adam(model.parameters(), lr=learning_rate)
+    elif optimizer == "sgd":
+        return SGD(model.parameters(), lr=learning_rate)
+
+
+if __name__ == "__main__":
+    from config import Config
+    Config["vocab_size"] = 10
+    Config["max_length"] = 4
+    model = SiameseNetwork(Config)
+    s1 = torch.LongTensor([[1, 2, 3, 0], [2, 2, 0, 0]])
+    s2 = torch.LongTensor([[1, 2, 3, 4], [3, 2, 3, 4]])
+    # the old pair forward took a +/-1 label here; the triplet forward takes a third sentence
+    s3 = torch.LongTensor([[4, 3, 2, 1], [2, 1, 0, 0]])
+    y = model(s1, s2, s3)
+    print(y)
+    # print(model.state_dict())
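
A minimal end-to-end sketch of how the pieces above fit together, using the SiameseNetwork and choose_optimizer defined in this file (the config values here are hypothetical; the real project reads them from config.py):

import torch

# hypothetical config values, standing in for config.py
config = {"hidden_size": 16, "vocab_size": 10, "max_length": 4,
          "optimizer": "adam", "learning_rate": 1e-3}
model = SiameseNetwork(config)
optimizer = choose_optimizer(config, model)

s1 = torch.LongTensor([[1, 2, 3, 0], [2, 2, 0, 0]])  # anchors
s2 = torch.LongTensor([[1, 2, 3, 4], [3, 2, 3, 4]])  # positives
s3 = torch.LongTensor([[4, 3, 2, 1], [2, 1, 0, 0]])  # negatives

loss = model(s1, s2, s3)   # triplet branch of forward
loss.backward()
optimizer.step()

vectors = model(s1)        # single-sentence branch: just encode
print(loss.item(), vectors.shape)  # scalar loss, torch.Size([2, 16])
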