week8 #973

Merged: 2 commits, Aug 11, 2024
148 changes: 148 additions & 0 deletions 张志/hm_week8/hm_week8_loader.py
@@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-

import json
import re
import os
import torch
import random
import jieba
import numpy as np
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
"""
数据加载
"""


class DataGenerator:
def __init__(self, data_path, config):
self.config = config
self.path = data_path
self.vocab = load_vocab(config["vocab_path"])
self.config["vocab_size"] = len(self.vocab)
self.schema = load_schema(config["schema_path"])
        self.train_data_size = config["epoch_data_size"]  # sampling is random, so a fixed per-epoch sample count is needed; otherwise sampling could continue indefinitely
        self.data_type = None  # marks whether the loaded set is the training or test set: "train" or "test"
self.load()

def load(self):
self.data = []
self.knwb = defaultdict(list)
with open(self.path, encoding="utf8") as f:
for line in f:
line = json.loads(line)
                # load the training set
if isinstance(line, dict):
self.data_type = "train"
questions = line["questions"]
label = line["target"]
for question in questions:
input_id = self.encode_sentence(question)
input_id = torch.LongTensor(input_id)
self.knwb[self.schema[label]].append(input_id)
                # load the test set
else:
self.data_type = "test"
assert isinstance(line, list)
question, label = line
input_id = self.encode_sentence(question)
input_id = torch.LongTensor(input_id)
label_index = torch.LongTensor([self.schema[label]])
self.data.append([input_id, label_index])
return

def encode_sentence(self, text):
input_id = []
if self.config["vocab_path"] == "words.txt":
for word in jieba.cut(text):
input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
else:
for char in text:
input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
input_id = self.padding(input_id)
return input_id

    # pad or truncate the input sequence so it can be processed within one batch
def padding(self, input_id):
input_id = input_id[:self.config["max_length"]]
input_id += [0] * (self.config["max_length"] - len(input_id))
return input_id
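    # e.g. with max_length = 4:
    #   [5, 6, 7, 8, 9] -> [5, 6, 7, 8]   (truncated)
    #   [5, 6]          -> [5, 6, 0, 0]   (padded with 0, the padding index)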

def __len__(self):
if self.data_type == "train":
return self.config["epoch_data_size"]
else:
assert self.data_type == "test", self.data_type
return len(self.data)

def __getitem__(self, index):
if self.data_type == "train":
            return self.random_train_sample()  # randomly generate one training sample
else:
return self.data[index]

    # # generate a negative or positive sample with a certain probability
    # # negative sample: pick one question each from two different standard questions
    # # positive sample: pick two questions from the same standard question
    # def random_train_sample(self):
    #     standard_question_index = list(self.knwb.keys())
    #     # random positive sample
    #     if random.random() <= self.config["positive_sample_rate"]:
    #         p = random.choice(standard_question_index)
    #         # if the chosen standard question has fewer than two questions, a pair cannot be sampled, so draw again
    #         if len(self.knwb[p]) < 2:
    #             return self.random_train_sample()
    #         else:
    #             s1, s2 = random.sample(self.knwb[p], 2)
    #             return [s1, s2, torch.LongTensor([1])]
    #     # random negative sample
    #     else:
    #         p, n = random.sample(standard_question_index, 2)
    #         s1 = random.choice(self.knwb[p])
    #         s2 = random.choice(self.knwb[n])
    #         return [s1, s2, torch.LongTensor([-1])]


    def random_train_sample(self):
        standard_question_index = list(self.knwb.keys())
        # pick two intents first, then take two questions from the first intent and one from the second
        p, n = random.sample(standard_question_index, 2)
        # if an intent happens to contain only one question, both positives have to be the same sample;
        # this does not help training, since identical samples always have distance 0,
        # but with sufficient data this case is rare
        if len(self.knwb[p]) == 1:
            s1 = s2 = self.knwb[p][0]
        # this should be the common case
        else:
            s1, s2 = random.sample(self.knwb[p], 2)
        # draw one random negative sample
        s3 = random.choice(self.knwb[n])
        # the first two are similar and the third is not; no extra 0/1 label needs to be fed in,
        # which differs from the usual loss computation
        return [s1, s2, s3]
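    # Note: in training mode each __getitem__ call thus yields [anchor, positive, negative],
    # which the DataLoader's default collation stacks into three (batch_size, max_length) LongTensors.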


# load the character or word vocabulary
def load_vocab(vocab_path):
token_dict = {}
with open(vocab_path, encoding="utf8") as f:
for index, line in enumerate(f):
token = line.strip()
            token_dict[token] = index + 1  # 0 is reserved for the padding position, so indices start from 1
return token_dict

# load the schema
def load_schema(schema_path):
with open(schema_path, encoding="utf8") as f:
return json.loads(f.read())

# wrap the data with torch's built-in DataLoader class
def load_data(data_path, config, shuffle=True):
dg = DataGenerator(data_path, config)
dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
return dl



if __name__ == "__main__":
from config import Config
dg = DataGenerator("valid_tag_news.json", Config)
print(dg[1])
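For reference, a minimal sketch of consuming the triplet batches through load_data; it assumes the same Config and data file as the test block above, and that the file is in the training format (one JSON dict per line), with the loop body being illustrative only:

from config import Config
train_data = load_data("valid_tag_news.json", Config, shuffle=True)
for anchor, positive, negative in train_data:
    # each tensor: (batch_size, max_length); one training epoch yields epoch_data_size samples
    print(anchor.shape, positive.shape, negative.shape)
    break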
102 changes: 102 additions & 0 deletions 张志/hm_week8/hm_week8_model.py
@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-

import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
"""
建立网络模型结构
"""

class SentenceEncoder(nn.Module):
def __init__(self, config):
super(SentenceEncoder, self).__init__()
hidden_size = config["hidden_size"]
vocab_size = config["vocab_size"] + 1
max_length = config["max_length"]
self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
# self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True)
self.layer = nn.Linear(hidden_size, hidden_size)
self.dropout = nn.Dropout(0.5)

    # input is the character-encoded question
def forward(self, x):
x = self.embedding(x)
        # using an LSTM
        # x, _ = self.lstm(x)
        # using a linear layer
        x = self.layer(x)
        # max-pool over the sequence dimension; squeeze only the pooled dimension so a batch of size 1 keeps its batch axis
        x = nn.functional.max_pool1d(x.transpose(1, 2), x.shape[1]).squeeze(-1)
return x


class SiameseNetwork(nn.Module):
def __init__(self, config):
super(SiameseNetwork, self).__init__()
self.sentence_encoder = SentenceEncoder(config)
self.loss = nn.CosineEmbeddingLoss()

    # compute the cosine distance: 1 - cos(a, b)
    # cos = 1 means the two vectors are identical, so the distance is 0; cos = 0 means they are orthogonal, so the distance is 1
def cosine_distance(self, tensor1, tensor2):
tensor1 = torch.nn.functional.normalize(tensor1, dim=-1)
tensor2 = torch.nn.functional.normalize(tensor2, dim=-1)
        cosine = torch.sum(torch.mul(tensor1, tensor2), dim=-1)
return 1 - cosine
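    # e.g. for unit vectors a = [1, 0] and b = [0, 1]: cos = 0, so the distance is 1;
    # for a = b the cosine is 1 and the distance is 0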

def cosine_triplet_loss(self, a, p, n, margin=None):
ap = self.cosine_distance(a, p)
an = self.cosine_distance(a, n)
if margin is None:
diff = ap - an + 0.1
else:
diff = ap - an + margin.squeeze()
        return torch.mean(diff[diff.gt(0)])  # average only over triplets that violate the margin (diff greater than 0)
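    # numeric illustration with the default margin 0.1: if ap = 0.3 and an = 0.5,
    # then diff = 0.3 - 0.5 + 0.1 = -0.1 <= 0, so that triplet already satisfies the
    # margin and is excluded from the mean; note that if no triplet in the batch
    # violates the margin, the filtered tensor is empty and torch.mean returns nan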

    # # sentence: (batch_size, max_length)
    # def forward(self, sentence1, sentence2=None, target=None):
    #     # both sentences passed in together
    #     if sentence2 is not None:
    #         vector1 = self.sentence_encoder(sentence1)  # vec: (batch_size, hidden_size)
    #         vector2 = self.sentence_encoder(sentence2)
    #         # if a label is given, compute the loss
    #         if target is not None:
    #             return self.loss(vector1, vector2, target.squeeze())
    #         # if there is no label, compute the cosine distance
    #         else:
    #             return self.cosine_distance(vector1, vector2)
    #     # when a single sentence is passed in, assume the vectorization capability is being used
    #     else:
    #         return self.sentence_encoder(sentence1)

def forward(self, sentence1, sentence2=None, sentence3=None):
        # when all 3 sentences are passed in, compute the triplet loss
if sentence2 is not None and sentence3 is not None:
vector1 = self.sentence_encoder(sentence1)
vector2 = self.sentence_encoder(sentence2)
vector3 = self.sentence_encoder(sentence3)
return self.cosine_triplet_loss(vector1, vector2, vector3)
        # when a single sentence is passed in, assume the vectorization capability is being used
else:
return self.sentence_encoder(sentence1)

def choose_optimizer(config, model):
optimizer = config["optimizer"]
learning_rate = config["learning_rate"]
if optimizer == "adam":
return Adam(model.parameters(), lr=learning_rate)
elif optimizer == "sgd":
return SGD(model.parameters(), lr=learning_rate)


if __name__ == "__main__":
from config import Config
Config["vocab_size"] = 10
Config["max_length"] = 4
model = SiameseNetwork(Config)
s1 = torch.LongTensor([[1,2,3,0], [2,2,0,0]])
s2 = torch.LongTensor([[1,2,3,4], [3,2,3,4]])
    s3 = torch.LongTensor([[2,1,3,4], [4,2,3,0]])  # a third (negative) sentence: the triplet forward expects token ids, not a 0/1 label
    y = model(s1, s2, s3)
print(y)
# print(model.state_dict())
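To tie the two files together, a minimal single-step training sketch, as if run from the model file above; the Config key "train_data_path" is an assumption made here for illustration, and everything else uses the functions defined in this PR:

# minimal training-step sketch; "train_data_path" is an assumed Config key
from config import Config
from hm_week8_loader import load_data

train_data = load_data(Config["train_data_path"], Config, shuffle=True)  # also sets Config["vocab_size"]
model = SiameseNetwork(Config)
optimizer = choose_optimizer(Config, model)
for anchor, positive, negative in train_data:
    optimizer.zero_grad()
    loss = model(anchor, positive, negative)  # the triplet forward returns the loss directly
    loss.backward()
    optimizer.step()
    break  # one step is enough for illustration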