week8 #973

Merged (2 commits, Aug 11, 2024)
Changes from 1 commit
Create hm_week8_loader.py
Showaker authored Aug 11, 2024
commit 97445e9d8bbf018219579ea761beabb3e4cd6825
148 changes: 148 additions & 0 deletions 张志/hm_week8/hm_week8_loader.py
@@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-

import json
import re
import os
import torch
import random
import jieba
import numpy as np
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
"""
数据加载
"""


class DataGenerator:
    def __init__(self, data_path, config):
        self.config = config
        self.path = data_path
        self.vocab = load_vocab(config["vocab_path"])
        self.config["vocab_size"] = len(self.vocab)
        self.schema = load_schema(config["schema_path"])
        self.train_data_size = config["epoch_data_size"]  # Random sampling is used, so a sample count must be set; otherwise sampling could continue indefinitely
        self.data_type = None  # Marks whether the loaded data is the training or test set: "train" or "test"
        self.load()

    def load(self):
        self.data = []
        self.knwb = defaultdict(list)
        with open(self.path, encoding="utf8") as f:
            for line in f:
                line = json.loads(line)
                # Load the training set
                if isinstance(line, dict):
                    self.data_type = "train"
                    questions = line["questions"]
                    label = line["target"]
                    for question in questions:
                        input_id = self.encode_sentence(question)
                        input_id = torch.LongTensor(input_id)
                        self.knwb[self.schema[label]].append(input_id)
                # Load the test set
                else:
                    self.data_type = "test"
                    assert isinstance(line, list)
                    question, label = line
                    input_id = self.encode_sentence(question)
                    input_id = torch.LongTensor(input_id)
                    label_index = torch.LongTensor([self.schema[label]])
                    self.data.append([input_id, label_index])
        return

    def encode_sentence(self, text):
        input_id = []
        if self.config["vocab_path"] == "words.txt":
            for word in jieba.cut(text):
                input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))
        else:
            for char in text:
                input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))
        input_id = self.padding(input_id)
        return input_id

    # Pad or truncate the input sequence so a whole batch can be computed together
    def padding(self, input_id):
        input_id = input_id[:self.config["max_length"]]
        input_id += [0] * (self.config["max_length"] - len(input_id))
        return input_id

    def __len__(self):
        if self.data_type == "train":
            return self.config["epoch_data_size"]
        else:
            assert self.data_type == "test", self.data_type
            return len(self.data)

    def __getitem__(self, index):
        if self.data_type == "train":
            return self.random_train_sample()  # Randomly generate one training sample
        else:
            return self.data[index]

    # # Generate a positive or negative sample with a given probability
    # # Negative sample: pick one question each from two different standard questions
    # # Positive sample: pick two questions from the same standard question
    # def random_train_sample(self):
    #     standard_question_index = list(self.knwb.keys())
    #     # Random positive sample
    #     if random.random() <= self.config["positive_sample_rate"]:
    #         p = random.choice(standard_question_index)
    #         # If the chosen standard question has fewer than two questions, a pair cannot be drawn, so sample again
    #         if len(self.knwb[p]) < 2:
    #             return self.random_train_sample()
    #         else:
    #             s1, s2 = random.sample(self.knwb[p], 2)
    #             return [s1, s2, torch.LongTensor([1])]
    #     # Random negative sample
    #     else:
    #         p, n = random.sample(standard_question_index, 2)
    #         s1 = random.choice(self.knwb[p])
    #         s2 = random.choice(self.knwb[n])
    #         return [s1, s2, torch.LongTensor([-1])]
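    # For illustration: the ±1 label produced by the commented-out pair sampler
    # above matches what torch.nn.CosineEmbeddingLoss expects. A hypothetical
    # `encoder` mapping padded input ids to sentence embeddings is assumed:
    #
    #   loss_fn = torch.nn.CosineEmbeddingLoss(margin=0.1)
    #   loss = loss_fn(encoder(s1), encoder(s2), label)  # label is 1 or -1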


    def random_train_sample(self):
        standard_question_index = list(self.knwb.keys())
        # First pick two intents; then take two questions from the first intent and one from the second
        p, n = random.sample(standard_question_index, 2)
        # If an intent happens to contain only one question, the two positive samples must be identical;
        # this contributes nothing to training (the distance between identical samples is always 0),
        # but with sufficient data this case is rare
        if len(self.knwb[p]) == 1:
            s1 = s2 = self.knwb[p][0]
        # This should be the usual case
        else:
            s1, s2 = random.sample(self.knwb[p], 2)
        # Pick a random negative sample
        s3 = random.choice(self.knwb[n])
        # The first two are similar, the third is not; no extra 0/1 label needs to be passed in,
        # which differs from the usual loss computation
        return [s1, s2, s3]
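    # For illustration: this [s1, s2, s3] triplet pairs naturally with
    # torch.nn.TripletMarginLoss, which takes (anchor, positive, negative)
    # embeddings directly, so no 0/1 label tensor is needed. A hypothetical
    # `encoder` mapping padded input ids to sentence embeddings is assumed:
    #
    #   loss_fn = torch.nn.TripletMarginLoss(margin=1.0)
    #   loss = loss_fn(encoder(s1), encoder(s2), encoder(s3))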


# Load the character or word vocabulary
def load_vocab(vocab_path):
    token_dict = {}
    with open(vocab_path, encoding="utf8") as f:
        for index, line in enumerate(f):
            token = line.strip()
            token_dict[token] = index + 1  # Index 0 is reserved for padding, so start from 1
    return token_dict

# Load the schema
def load_schema(schema_path):
    with open(schema_path, encoding="utf8") as f:
        return json.loads(f.read())

# Wrap the data with torch's built-in DataLoader class
def load_data(data_path, config, shuffle=True):
    dg = DataGenerator(data_path, config)
    dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
    return dl
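# For illustration: in training mode, each batch collated by the DataLoader is
# a list [s1, s2, s3] of three tensors, each of shape (batch_size, max_length),
# because __getitem__ returns a triplet ("train_data_path" is a hypothetical
# config key):
#
#   train_data = load_data(config["train_data_path"], config)
#   for s1, s2, s3 in train_data:
#       ...  # encode the triplet and compute the triplet loss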



if __name__ == "__main__":
    from config import Config
    dg = DataGenerator("valid_tag_news.json", Config)
    print(dg[1])