
TextCNN implemented in PyTorch (using Dataset and DataLoader)

2020/11/24 9:48:30


This post mainly demonstrates how to use PyTorch's Dataset and DataLoader classes.


(1) Data processing: building batches and vectorizing the vocabulary
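
The loader below assumes data.txt contains one sample per line: the raw text, a tab, then an integer class label (inferred from the sentence.split("\t") calls; the two lines below are made-up illustrations). Note that the tokenizer splits text into individual characters, which suits Chinese input:

今天天气很好	0
这部电影太无聊了	1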

import torch
import numpy as np
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader

tokenizer = lambda x: [y for y in x]  # character-level tokenizer
UNK, PAD = '<UNK>', '<PAD>'  # unknown and padding tokens


def build_vocab(path, max_size, freq_min):
    vocab_dic = {}
    with open(path, "r", encoding="utf-8") as f:
        for sentences in tqdm(f):
            sentence = sentences.strip()
            if not sentence:
                continue
            content = sentence.split("\t")[0]
            for word in tokenizer(content):
                vocab_dic[word] = vocab_dic.get(word, 0) + 1
        vocab_list = sorted(
            [item for item in vocab_dic.items() if item[1] >= freq_min],
            key=lambda x: x[1], reverse=True)[:max_size]
        vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)}
        vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1})
    return vocab_dic


def load_data(path, padding_size=32):
    contents = []
    vocab = build_vocab(path, max_size=10000, freq_min=1)
    with open(path, "r", encoding="utf-8") as f:
        for sentences in tqdm(f):
            sentence = sentences.strip()
            if not sentence:
                continue
            text, label = sentence.split("\t")
            word_line = []
            token = tokenizer(text)
            seq_len = len(token)
            if padding_size:
                if len(token) < padding_size:
                    # pad short texts; seq_len keeps the true length
                    token.extend([PAD] * (padding_size - len(token)))
                else:
                    # truncate long texts and cap seq_len at padding_size
                    token = token[:padding_size]
                    seq_len = len(token)

            # word2id
            for word in token:
                word_line.append(vocab.get(word, vocab.get(UNK)))
            contents.append((word_line, int(label), seq_len))
    return contents


class MyDataset(Dataset):
    def __init__(self, path):
        all_text = []
        all_label = []
        all_len = []
        train_data = load_data(path)
        for data in train_data:
            all_text.append(data[0])
            all_label.append(data[1])
            all_len.append(data[2])
        self.text = all_text
        self.label = all_label
        self.length = all_len

    def __getitem__(self, index):
        return self.text[index], self.label[index], self.length[index]

    def __len__(self):
        return len(self.text)


dataset = MyDataset('data.txt')


def collate(data):
    text, label, length = list(zip(*data))
    new_text = torch.LongTensor(text)
    new_label = torch.LongTensor(label)
    new_length = torch.LongTensor(length)
    return (new_text, new_length), new_label


my_train_loader = DataLoader(dataset=dataset, batch_size=128, shuffle=True, num_workers=0, collate_fn=collate)


# for i, batch in enumerate(my_train_loader, 0):
#     print(i, batch)


# Convert the vocabulary words into pretrained embedding vectors
def transfer_embed(path, embedding_path, embed_path, embed_dim=300):
    """Build an embedding matrix for the vocabulary from a pretrained
    word-vector file (embed_path) and save it as .npz (embedding_path)."""
    vocab = build_vocab(path, max_size=10000, freq_min=1)
    vocab_len = len(vocab)
    # words without a pretrained vector keep a random initialization
    embedding = np.random.rand(vocab_len, embed_dim)
    with open(embed_path, 'r', encoding='utf-8') as f:
        for i, lines in enumerate(f):
            if i == 0:  # skip the word2vec-style header line
                continue
            line = lines.strip().split(" ")
            if line[0] in vocab:
                idx = vocab[line[0]]
                embed = [float(x) for x in line[1:embed_dim + 1]]
                embedding[idx] = np.asarray(embed, dtype="float32")
    np.savez_compressed(embedding_path, embedding=embedding)


transfer_embed("data.txt", embedding_path="embed.npz", embed_path="sogou.txt", embed_dim=300)

(2) Building the TextCNN model

import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F


class Config(object):
    """
    Configuration parameters
    """

    def __init__(self):
        self.embed_path = "embed.npz"  # embedding matrix produced by transfer_embed in part (1)
        self.label_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        self.train_path = 'data.txt'
        self.embedding_pretrained = torch.Tensor(np.load(self.embed_path)["embedding"].astype("float32"))
        self.device = torch.device('cuda' if torch.cuda.is_available() else "cpu")

        self.dropout = 0.5
        self.vocab_size = 10000
        self.num_classes = len(self.label_list)
        self.num_epochs = 20
        self.batch_size = 128
        self.learning_rate = 1e-3
        self.embed = 300  # embedding dimension
        self.filter_size = (3, 4, 5)  # convolution kernel heights
        self.num_filter = 128  # feature maps per kernel size



class TextCNN(nn.Module):
    def __init__(self, config):
        super(TextCNN, self).__init__()
        if config.embedding_pretrained is not None:
            self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)
        else:
            self.embedding = nn.Embedding(config.vocab_size, config.embed)
        self.convs = nn.ModuleList([nn.Conv2d(1, config.num_filter, (k, config.embed)) for k in config.filter_size])
        self.dropout = nn.Dropout(config.dropout)
        self.fc = nn.Linear(config.num_filter * len(config.filter_size), config.num_classes)

    def conv_and_pool(self, x, conv):
        # (batch, 1, seq_len, embed) -> conv -> (batch, num_filter, seq_len - k + 1, 1) -> squeeze
        x = F.relu(conv(x)).squeeze(3)
        # global max pooling over the remaining time dimension -> (batch, num_filter)
        x = F.max_pool1d(x, x.size(2)).squeeze(2)
        return x

    def forward(self, x):
        # x is the (text, length) tuple produced by collate; only the token ids are used
        out = self.embedding(x[0])  # (batch, seq_len, embed)
        out = out.unsqueeze(1)  # add a channel dimension: (batch, 1, seq_len, embed)
        out = torch.cat([self.conv_and_pool(out, conv) for conv in self.convs], 1)
        out = self.dropout(out)
        out = self.fc(out)
        return out
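
A quick shape check of the model (a sketch only: it assumes embed.npz from part (1) already exists, since Config loads it, and uses a random dummy batch in place of real data):

config = Config()
model = TextCNN(config)
n_vocab = config.embedding_pretrained.shape[0]  # actual number of rows in the embedding matrix
dummy_text = torch.randint(0, n_vocab, (4, 32))  # 4 samples, 32 token ids each
dummy_len = torch.full((4,), 32, dtype=torch.long)
logits = model((dummy_text, dummy_len))
print(logits.shape)  # torch.Size([4, 10])  batch x num_classes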


(3) Training and evaluation

import torch
import numpy as np
import torch.nn as nn
import torch.optim as opt
from sklearn import metrics
import torch.nn.functional as F


def evaluate(config, model, data_iter, test=False):
    model.eval()
    loss_total = 0
    predict_all = np.array([], dtype=int)
    label_all = np.array([], dtype=int)
    with torch.no_grad():
        for text, label in data_iter:
            outputs = model(text)
            loss = F.cross_entropy(outputs, label)
            loss_total += loss.item()
            label = label.data.cpu().numpy()
            predict = torch.max(outputs.data, 1)[1].cpu().numpy()
            label_all = np.append(label_all, label)
            predict_all = np.append(predict_all, predict)
    acc = metrics.accuracy_score(label_all, predict_all)
    if test:
        # target_names must be display strings, so convert the integer labels
        report = metrics.classification_report(
            label_all, predict_all,
            target_names=[str(x) for x in config.label_list], digits=4)
        confusion = metrics.confusion_matrix(label_all, predict_all)
        return acc, loss_total / len(data_iter), report, confusion
    return acc, loss_total / len(data_iter)


def train(config, model, train_iter):
    model.train()
    optimizer = opt.Adam(model.parameters(), lr=config.learning_rate)
    total_batch = 0  # number of batches processed so far
    dev_best_loss = float('inf')
    last_improve = 0  # batch count at the last dev-loss improvement
    flag = False  # whether training has gone a long time without improvement
    for epoch in range(config.num_epochs):
        print('Epoch [{}/{}]'.format(epoch + 1, config.num_epochs))
        # scheduler.step()  # learning-rate decay
        for i, (trains, labels) in enumerate(train_iter):
            outputs = model(trains)
            model.zero_grad()
            loss = F.cross_entropy(outputs, labels)
            loss.backward()
            optimizer.step()
            if total_batch % 100 == 0:
                # report training-set (and, if enabled, dev-set) metrics every 100 batches
                true = labels.data.cpu()
                predic = torch.max(outputs.data, 1)[1].cpu()
                train_acc = metrics.accuracy_score(true, predic)
                # dev_acc, dev_loss = evaluate(config, model, dev_iter)
                # if dev_loss < dev_best_loss:
                #     dev_best_loss = dev_loss
                #     torch.save(model.state_dict(), config.save_path)
                #     improve = '*'
                #     last_improve = total_batch
                # else:
                #     improve = ''
                msg = 'Iter: {0:>6},  Train Loss: {1:>5.2},  Train Acc: {2:>6.2%}'
                print(msg.format(total_batch, loss.item(), train_acc))
            total_batch += 1


if __name__ == "__main__":
    from train_model import Config, TextCNN
    from re_struct_dataloader import my_train_loader

    config = Config()
    model = TextCNN(config)
    train_iter = my_train_loader
    train(config, model, train_iter)

This post is mainly an experiment with PyTorch's built-in Dataset and DataLoader classes; corrections are welcome.

