From 1649007859c3f154d7769c13f81121eb40656f6e Mon Sep 17 00:00:00 2001
From: Uyghur
Date: Mon, 14 Jun 2021 16:55:40 +0900
Subject: [PATCH] Delete UFormerCTC1N.py

---
 UFormerCTC1N.py | 610 ------------------------------------------------
 1 file changed, 610 deletions(-)
 delete mode 100644 UFormerCTC1N.py

diff --git a/UFormerCTC1N.py b/UFormerCTC1N.py
deleted file mode 100644
index b124127..0000000
--- a/UFormerCTC1N.py
+++ /dev/null
@@ -1,610 +0,0 @@
-import torch
-import torch.nn as nn
-import torch.nn.functional as F
-import torch.nn.init as I
-
-import numpy as np
-import math
-
-from BaseModel import BaseModel
-from data import melfuture
-from uyghur import uyghur_latin
-
-class UFormerCTC1N(BaseModel):
-    def __init__(self, num_features_input, load_best=False):
-        super(UFormerCTC1N, self).__init__('UFormerCTC1N')
-        num_layers = 1 #'Number of layers'
-        num_heads = 8 #'Number of heads'
-        dim_model = 768 #'Model dimension'
-        dim_key = 96 #'Key dimension'
-        dim_value = 96 #'Value dimension'
-        dim_inner = 1024 #'Inner dimension'
-        dim_emb = 768 #'Embedding dimension'
-        src_max_len = 2500 #'Source max length'
-        tgt_max_len = 1000 #'Target max length'
-        dropout = 0.1
-        emb_trg_sharing = False
-        self.flayer = UDS2W2L8(num_features_input)
-        self.encoder = Encoder(num_layers, num_heads=num_heads, dim_model=dim_model, dim_key=dim_key, dim_value=dim_value, dim_inner=dim_inner, src_max_length=src_max_len, dropout=dropout)
-        self.decoder = Decoder(num_layers=num_layers, num_heads=num_heads, dim_emb=dim_emb, dim_model=dim_model, dim_inner=dim_inner, dim_key=dim_key, dim_value=dim_value, trg_max_length=tgt_max_len, dropout=dropout, emb_trg_sharing=emb_trg_sharing)
-        for p in self.parameters():
-            if p.dim() > 1:
-                nn.init.xavier_uniform_(p)
-
-        self.ctcOut = None
-        self.ctcLen = None
-
-        self.checkpoint = "results/" + self.ModelName
-        self._load()
-
-        print(" Model Name:", self.ModelName)
-        print(f'The model has {self.parameters_count(self):,} trainable parameters')
-        print(f' Future has {self.parameters_count(self.flayer):,} trainable parameters')
-        print(f' Encoder has {self.parameters_count(self.encoder):,} trainable parameters')
-        print(f' Decoder has {self.parameters_count(self.decoder):,} trainable parameters')
-
-
-    def forward(self, padded_input, input_lengths, padded_target):
-        padded_input,self.ctcOut, self.ctcLen = self.flayer(padded_input,input_lengths)
-        #input must be #B x T x F format
-        encoder_padded_outputs, _ = self.encoder(padded_input, self.ctcLen) # BxTxH or #B x T x F
-        seq_in_pad, gold = self.preprocess(padded_target)
-        pred = self.decoder(seq_in_pad, encoder_padded_outputs, self.ctcLen)
-        return pred, gold
-
-    def greedydecode(self, pred, len=0):
-        _, pred = torch.topk(pred, 1, dim=2)
-        preds = pred.squeeze(2)
-        strs_pred = [uyghur_latin.decode(pred_id) for pred_id in preds]
-        return strs_pred
-
-    def predict(self,wavfile, device):
-        self.eval()
-        spec = melfuture(wavfile).unsqueeze(0).to(device)
-        spec_len = torch.tensor([spec.shape[2]], dtype=torch.int)
-        padded_input,self.ctcOut, self.ctcLen = self.flayer(spec,spec_len)
-        encoder_padded_outputs, _ = self.encoder(padded_input, self.ctcLen) # BxTxH or #B x T x F
-        strs_hyps = self.decoder.greedy_search(encoder_padded_outputs)
-        return strs_hyps
-
-
-class ResB(nn.Module):
-    def __init__(self, in_channel, out_channel, kernel, d = 0.4, stride = 1):
-        super().__init__()
-
-        self.isRes = (in_channel == out_channel and stride == 1)
-        pad = (kernel-1)//2
-        self.conv = nn.Sequential(
-            nn.Conv1d(in_channel, out_channel, kernel_size = kernel, stride = stride , padding=pad, bias=False),
-            nn.BatchNorm1d(out_channel),
-            nn.ReLU(),
-        )
-
-        self.bn = nn.BatchNorm1d(out_channel)
-        self.actfn = nn.ReLU()
-        self.drop = nn.Dropout(d)
-
-    def forward(self, x):
-        out = self.conv(x)
-        if self.isRes:
-            out = self.bn(out+x)
-            out = self.actfn(out)
-
-        out = self.drop(out)
-        return out
-
-
-class UDS2W2L8(nn.Module):
-    def __init__(self, num_features_input):
-        super(UDS2W2L8, self).__init__()
-        self.conv = nn.Sequential(
-            nn.Conv2d(1, 32, kernel_size=(41, 11), stride=(2, 2), padding=(20, 5), bias=False),
-            nn.BatchNorm2d(32),
-            nn.Hardtanh(0, 20, inplace=True),
-            nn.Conv2d(32, 32, kernel_size=(21, 11), stride=(2, 1), padding=(10, 5),bias=False),
-            nn.BatchNorm2d(32),
-            nn.Hardtanh(0, 20, inplace=True),
-        )
-        self.lstm1 = nn.GRU(1024, 256, num_layers=1 , batch_first=True, bidirectional=True)
-        self.cnn1 = nn.Sequential(
-            ResB(256, 256, 11, 0.2,2),
-            ResB(256, 256, 11, 0.2),
-            ResB(256, 256, 11, 0.2),
-            ResB(256, 256, 11, 0.2),
-            ResB(256, 256, 11, 0.2)
-        )
-        self.lstm2 = nn.GRU(256, 384, num_layers=1 , batch_first=True, bidirectional=True)
-        self.cnn2 = nn.Sequential(
-            ResB(384,384,13,0.2),
-            ResB(384,384,13,0.2),
-            ResB(384,384,13,0.2),
-
-            ResB(384,512,17,0.2),
-            ResB(512,512,17,0.3),
-            ResB(512,512,17,0.3),
-            ResB(512,768, 1,0.3),
-            ResB(768,768, 1,0.0),
-        )
-        self.outlayer = nn.Conv1d(768, uyghur_latin.vocab_size, 1, 1)
-        self.softMax = nn.LogSoftmax(dim=1)
-
-    def forward(self, x, lengths):
-        out_lens = lengths//2
-
-        x.unsqueeze_(1)
-        out = self.conv(x)
-
-        b, c, h, w = out.size()
-        out = out.view(b, c*h, w).contiguous() #.permute(0,2,1)
-
-        out = out.permute(0,2,1)
-        out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
-        out, _ = self.lstm1(out)
-        out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
-
-        out = (out[:, :, :self.lstm1.hidden_size] + out[:, :, self.lstm1.hidden_size:]).contiguous()
-        out = self.cnn1(out.permute(0,2,1))
-
-        out_lens = out_lens//2
-        out = out.permute(0,2,1)
-        out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
-        out,_ = self.lstm2(out)
-        out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
-
-        out = (out[:, :, :self.lstm2.hidden_size] + out[:, :, self.lstm2.hidden_size:]).contiguous()
-        out = self.cnn2(out.permute(0,2,1))
-        outctc = self.softMax(self.outlayer(out))
-        return out.contiguous().permute(0,2,1), outctc, out_lens
-
-    def load(self):
-        pack = torch.load('results/UDS2W2L8_last.pth', map_location='cpu')
-        sdict = pack['st_dict']
-        news_dict = self.state_dict()
-        filtered_dict = {k: v for k, v in sdict.items() if k in news_dict and v.size() == news_dict[k].size()}
-        news_dict.update(filtered_dict)
-        self.load_state_dict(news_dict)
-
-
-class Encoder(nn.Module):
-    """
-    Encoder Transformer class
-    """
-
-    def __init__(self, num_layers, num_heads, dim_model, dim_key, dim_value, dim_inner, dropout=0.1, src_max_length=2500):
-        super(Encoder, self).__init__()
-
-        self.num_layers = num_layers
-        self.num_heads = num_heads
-
-        self.dim_model = dim_model
-        self.dim_key = dim_key
-        self.dim_value = dim_value
-        self.dim_inner = dim_inner
-
-        self.src_max_length = src_max_length
-
-        self.dropout = nn.Dropout(dropout)
-        self.dropout_rate = dropout
-
-        self.positional_encoding = PositionalEncoding(dim_model, src_max_length)
-
-        self.layers = nn.ModuleList([
-            EncoderLayer(num_heads, dim_model, dim_inner, dim_key, dim_value, dropout=dropout) for _ in range(num_layers)
-        ])
-
-    def forward(self, padded_input, input_lengths):
-        """
-        args:
-            padded_input: B x T x D
-            input_lengths: B
-        return:
-            output: B x T x H
-        """
-        encoder_self_attn_list = []
-
-        # Prepare masks
-        non_pad_mask = get_non_pad_mask(padded_input, input_lengths=input_lengths) # B x T x D
-        seq_len = padded_input.size(1)
-        self_attn_mask = get_attn_pad_mask(padded_input, input_lengths, seq_len) # B x T x T
-        pos = self.positional_encoding(padded_input)
-        encoder_output = padded_input + pos
-
-        for layer in self.layers:
-            encoder_output, self_attn = layer(encoder_output, non_pad_mask=non_pad_mask, self_attn_mask=self_attn_mask)
-            encoder_self_attn_list += [self_attn]
-
-        return encoder_output, encoder_self_attn_list
-
-
-class EncoderLayer(nn.Module):
-    """
-    Encoder Layer Transformer class
-    """
-
-    def __init__(self, num_heads, dim_model, dim_inner, dim_key, dim_value, dropout=0.1):
-        super(EncoderLayer, self).__init__()
-        self.self_attn = MultiHeadAttention(num_heads, dim_model, dim_key, dim_value, dropout=dropout)
-        self.pos_ffn = PositionwiseFeedForwardWithConv(dim_model, dim_inner, dropout=dropout)
-
-    def forward(self, enc_input, non_pad_mask=None, self_attn_mask=None):
-        enc_output, self_attn = self.self_attn(enc_input, enc_input, enc_input, mask=self_attn_mask)
-        enc_output *= non_pad_mask
-
-        enc_output = self.pos_ffn(enc_output)
-        enc_output *= non_pad_mask
-
-        return enc_output, self_attn
-
-
-class Decoder(nn.Module):
-    """
-    Decoder Layer Transformer class
-    """
-
-    def __init__(self, num_layers, num_heads, dim_emb, dim_model, dim_inner, dim_key, dim_value, dropout=0.1, trg_max_length=1000, emb_trg_sharing=False):
-        super(Decoder, self).__init__()
-        self.num_trg_vocab = uyghur_latin.vocab_size
-        self.num_layers = num_layers
-        self.num_heads = num_heads
-
-        self.dim_emb = dim_emb
-        self.dim_model = dim_model
-        self.dim_inner = dim_inner
-        self.dim_key = dim_key
-        self.dim_value = dim_value
-
-        self.dropout_rate = dropout
-        self.emb_trg_sharing = emb_trg_sharing
-
-        self.trg_max_length = trg_max_length
-
-        self.trg_embedding = nn.Embedding(self.num_trg_vocab, dim_emb, padding_idx=uyghur_latin.pad_idx)
-        self.positional_encoding = PositionalEncoding(dim_model, trg_max_length)
-        self.dropout = nn.Dropout(dropout)
-
-        self.layers = nn.ModuleList([
-            DecoderLayer(dim_model, dim_inner, num_heads,dim_key, dim_value, dropout=dropout)
-            for _ in range(num_layers)
-        ])
-
-        self.output_linear = nn.Linear(dim_model, self.num_trg_vocab, bias=False)
-        nn.init.xavier_normal_(self.output_linear.weight)
-
-        if emb_trg_sharing:
-            self.output_linear.weight = self.trg_embedding.weight
-            self.x_logit_scale = (dim_model ** -0.5)
-        else:
-            self.x_logit_scale = 1.0
-
-    def forward(self, seq_in_pad, encoder_padded_outputs, encoder_input_lengths):
-        """
-        args:
-            padded_input: B x T
-            encoder_padded_outputs: B x T x H
-            encoder_input_lengths: B
-        returns:
-            pred: B x T x vocab
-            gold: B x T
-        """
-        decoder_self_attn_list, decoder_encoder_attn_list = [], []
-
-        # Prepare masks
-        non_pad_mask = get_non_pad_mask(seq_in_pad, pad_idx=uyghur_latin.pad_idx)
-        self_attn_mask_subseq = get_subsequent_mask(seq_in_pad)
-        self_attn_mask_keypad = get_attn_key_pad_mask(seq_k=seq_in_pad, seq_q=seq_in_pad, pad_idx=uyghur_latin.pad_idx)
-        self_attn_mask = (self_attn_mask_keypad + self_attn_mask_subseq).gt(0)
-
-        output_length = seq_in_pad.size(1)
-        dec_enc_attn_mask = get_attn_pad_mask(encoder_padded_outputs, encoder_input_lengths, output_length)
-
-        decoder_output = self.dropout(self.trg_embedding(seq_in_pad) * self.x_logit_scale + self.positional_encoding(seq_in_pad))
-
-        for layer in self.layers:
-            decoder_output, decoder_self_attn, decoder_enc_attn = layer(decoder_output, encoder_padded_outputs, non_pad_mask=non_pad_mask, self_attn_mask=self_attn_mask, dec_enc_attn_mask=dec_enc_attn_mask)
-
-            decoder_self_attn_list += [decoder_self_attn]
-            decoder_encoder_attn_list += [decoder_enc_attn]
-
-        seq_logit = self.output_linear(decoder_output)
-
-        return seq_logit
-
-    def greedy_search(self, encoder_padded_outputs):
-        """
-        Greedy search, decode 1-best utterance
-        args:
-            encoder_padded_outputs: B x T x H
-        output:
-            batch_ids_nbest_hyps: list of nbest in ids (size B)
-            batch_strs_nbest_hyps: list of nbest in strings (size B)
-        """
-        with torch.no_grad():
-            device = encoder_padded_outputs.device
-            max_seq_len = self.trg_max_length
-
-            #ys = torch.ones(encoder_padded_outputs.size(0),1).fill_(uyghur_latin.sos_idx).long().to(device) # batch_size x 1
-            max_seq_len = min(max_seq_len, encoder_padded_outputs.size(1))
-            inps=[uyghur_latin.sos_idx]
-            result = []
-            for t in range(max_seq_len):
-                ys = torch.LongTensor(inps).unsqueeze(0).to(device)
-                non_pad_mask = torch.ones_like(ys).float().unsqueeze(-1) # batch_size x t x 1
-                self_attn_mask = get_subsequent_mask(ys).gt(0) # batch_size x t x t
-
-                decoder_output = self.dropout(self.trg_embedding(ys) * self.x_logit_scale + self.positional_encoding(ys))
-
-                for layer in self.layers:
-                    decoder_output, _, _ = layer(
-                        decoder_output, encoder_padded_outputs,
-                        non_pad_mask=non_pad_mask,
-                        self_attn_mask=self_attn_mask,
-                        dec_enc_attn_mask=None
-                    )
-
-                prob = self.output_linear(decoder_output) # batch_size x t x label_size
-                _, next_word = torch.max(prob[:, -1], dim=1)
-                next_word = next_word.item()
-                result.append(next_word)
-                if next_word == uyghur_latin.eos_idx:
-                    break
-
-                inps.append(next_word)
-
-        sent = uyghur_latin.decode(result)
-        return sent
-
-class DecoderLayer(nn.Module):
-    """
-    Decoder Transformer class
-    """
-
-    def __init__(self, dim_model, dim_inner, num_heads, dim_key, dim_value, dropout=0.1):
-        super(DecoderLayer, self).__init__()
-        self.self_attn = MultiHeadAttention(
-            num_heads, dim_model, dim_key, dim_value, dropout=dropout)
-        self.encoder_attn = MultiHeadAttention(
-            num_heads, dim_model, dim_key, dim_value, dropout=dropout)
-        self.pos_ffn = PositionwiseFeedForwardWithConv(
-            dim_model, dim_inner, dropout=dropout)
-
-    def forward(self, decoder_input, encoder_output, non_pad_mask=None, self_attn_mask=None, dec_enc_attn_mask=None):
-        decoder_output, decoder_self_attn = self.self_attn(decoder_input, decoder_input, decoder_input, mask=self_attn_mask)
-        decoder_output *= non_pad_mask
-
-        decoder_output, decoder_encoder_attn = self.encoder_attn(decoder_output, encoder_output, encoder_output, mask=dec_enc_attn_mask)
-        decoder_output *= non_pad_mask
-
-        decoder_output = self.pos_ffn(decoder_output)
-        decoder_output *= non_pad_mask
-
-        return decoder_output, decoder_self_attn, decoder_encoder_attn
-
-
-"""
-Transformer common layers
-"""
-
-def get_non_pad_mask(padded_input, input_lengths=None, pad_idx=None):
-    """
-    padding position is set to 0, either use input_lengths or pad_idx
-    """
-    assert input_lengths is not None or pad_idx is not None
-    if input_lengths is not None:
-        # padded_input: N x T x ..
-        N = padded_input.size(0)
-        non_pad_mask = padded_input.new_ones(padded_input.size()[:-1]) # B x T
-        for i in range(N):
-            non_pad_mask[i, input_lengths[i]:] = 0
-    if pad_idx is not None:
-        # padded_input: N x T
-        assert padded_input.dim() == 2
-        non_pad_mask = padded_input.ne(pad_idx).float()
-    # unsqueeze(-1) for broadcast
-    return non_pad_mask.unsqueeze(-1)
-
-def get_attn_key_pad_mask(seq_k, seq_q, pad_idx):
-    """
-    For masking out the padding part of key sequence.
-    """
-    # Expand to fit the shape of key query attention matrix.
-    len_q = seq_q.size(1)
-    padding_mask = seq_k.eq(pad_idx)
-    padding_mask = padding_mask.unsqueeze(1).expand(-1, len_q, -1).byte() # B x T_Q x T_K
-
-    return padding_mask
-
-def get_attn_pad_mask(padded_input, input_lengths, expand_length):
-    """mask position is set to 1"""
-    # N x Ti x 1
-    non_pad_mask = get_non_pad_mask(padded_input, input_lengths=input_lengths)
-    # N x Ti, lt(1) like not operation
-    pad_mask = non_pad_mask.squeeze(-1).lt(1)
-    attn_mask = pad_mask.unsqueeze(1).expand(-1, expand_length, -1)
-    return attn_mask
-
-def get_subsequent_mask(seq):
-    ''' For masking out the subsequent info. '''
-
-    sz_b, len_s = seq.size()
-    subsequent_mask = torch.triu(
-        torch.ones((len_s, len_s), device=seq.device, dtype=torch.uint8), diagonal=1)
-    subsequent_mask = subsequent_mask.unsqueeze(0).expand(sz_b, -1, -1) # b x ls x ls
-
-    return subsequent_mask
-
-class PositionalEncoding(nn.Module):
-    """
-    Positional Encoding class
-    """
-    def __init__(self, dim_model, max_length=2000):
-        super(PositionalEncoding, self).__init__()
-
-        pe = torch.zeros(max_length, dim_model, requires_grad=False)
-        position = torch.arange(0, max_length).unsqueeze(1).float()
-        exp_term = torch.exp(torch.arange(0, dim_model, 2).float() * -(math.log(10000.0) / dim_model))
-        pe[:, 0::2] = torch.sin(position * exp_term) # take the odd (jump by 2)
-        pe[:, 1::2] = torch.cos(position * exp_term) # take the even (jump by 2)
-        pe = pe.unsqueeze(0)
-        self.register_buffer('pe', pe)
-
-    def forward(self, input):
-        """
-        args:
-            input: B x T x D
-        output:
-            tensor: B x T
-        """
-        return self.pe[:, :input.size(1)]
-
-
-
-class PositionwiseFeedForward(nn.Module):
-    """
-    Position-wise Feedforward Layer class
-    FFN(x) = max(0, xW1 + b1) W2+ b2
-    """
-    def __init__(self, dim_model, dim_ff, dropout=0.1):
-        super(PositionwiseFeedForward, self).__init__()
-        self.linear_1 = nn.Linear(dim_model, dim_ff)
-        self.linear_2 = nn.Linear(dim_ff, dim_model)
-        self.dropout = nn.Dropout(dropout)
-        self.layer_norm = nn.LayerNorm(dim_model)
-
-    def forward(self, x):
-        """
-        args:
-            x: tensor
-        output:
-            y: tensor
-        """
-        residual = x
-        output = self.dropout(self.linear_2(F.relu(self.linear_1(x))))
-        output = self.layer_norm(output + residual)
-        return output
-
-class PositionwiseFeedForwardWithConv(nn.Module):
-    """
-    Position-wise Feedforward Layer Implementation with Convolution class
-    """
-    def __init__(self, dim_model, dim_hidden, dropout=0.1):
-        super(PositionwiseFeedForwardWithConv, self).__init__()
-        self.conv_1 = nn.Conv1d(dim_model, dim_hidden, 1)
-        self.conv_2 = nn.Conv1d(dim_hidden, dim_model, 1)
-        self.dropout = nn.Dropout(dropout)
-        self.layer_norm = nn.LayerNorm(dim_model)
-
-    def forward(self, x):
-        residual = x
-        output = x.transpose(1, 2)
-        output = self.conv_2(F.relu(self.conv_1(output)))
-        output = output.transpose(1, 2)
-        output = self.dropout(output)
-        output = self.layer_norm(output + residual)
-        return output
-
-class MultiHeadAttention(nn.Module):
-    def __init__(self, num_heads, dim_model, dim_key, dim_value, dropout=0.1):
-        super(MultiHeadAttention, self).__init__()
-
-        self.num_heads = num_heads
-
-        self.dim_model = dim_model
-        self.dim_key = dim_key
-        self.dim_value = dim_value
-
-        self.query_linear = nn.Linear(dim_model, num_heads * dim_key)
-        self.key_linear = nn.Linear(dim_model, num_heads * dim_key)
-        self.value_linear = nn.Linear(dim_model, num_heads * dim_value)
-
-        nn.init.normal_(self.query_linear.weight, mean=0, std=np.sqrt(2.0 / (self.dim_model + self.dim_key)))
-        nn.init.normal_(self.key_linear.weight, mean=0, std=np.sqrt(2.0 / (self.dim_model + self.dim_key)))
-        nn.init.normal_(self.value_linear.weight, mean=0, std=np.sqrt(2.0 / (self.dim_model + self.dim_value)))
-
-        self.attention = ScaledDotProductAttention(temperature=np.power(dim_key, 0.5), attn_dropout=dropout)
-        self.layer_norm = nn.LayerNorm(dim_model)
-
-        self.output_linear = nn.Linear(num_heads * dim_value, dim_model)
-        nn.init.xavier_normal_(self.output_linear.weight)
-
-        self.dropout = nn.Dropout(dropout)
-
-    def forward(self, query, key, value, mask=None):
-        """
-        query: B x T_Q x H, key: B x T_K x H, value: B x T_V x H
-        mask: B x T x T (attention mask)
-        """
-        batch_size, len_query, _ = query.size()
-        batch_size, len_key, _ = key.size()
-        batch_size, len_value, _ = value.size()
-
-        residual = query
-
-        query = self.query_linear(query).view(batch_size, len_query, self.num_heads, self.dim_key) # B x T_Q x num_heads x H_K
-        key = self.key_linear(key).view(batch_size, len_key, self.num_heads, self.dim_key) # B x T_K x num_heads x H_K
-        value = self.value_linear(value).view(batch_size, len_value, self.num_heads, self.dim_value) # B x T_V x num_heads x H_V
-
-        query = query.permute(2, 0, 1, 3).contiguous().view(-1, len_query, self.dim_key) # (num_heads * B) x T_Q x H_K
-        key = key.permute(2, 0, 1, 3).contiguous().view(-1, len_key, self.dim_key) # (num_heads * B) x T_K x H_K
-        value = value.permute(2, 0, 1, 3).contiguous().view(-1, len_value, self.dim_value) # (num_heads * B) x T_V x H_V
-
-        if mask is not None:
-            mask = mask.repeat(self.num_heads, 1, 1) # (B * num_head) x T x T
-
-        output, attn = self.attention(query, key, value, mask=mask)
-
-        output = output.view(self.num_heads, batch_size, len_query, self.dim_value) # num_heads x B x T_Q x H_V
-        output = output.permute(1, 2, 0, 3).contiguous().view(batch_size, len_query, -1) # B x T_Q x (num_heads * H_V)
-
-        output = self.dropout(self.output_linear(output)) # B x T_Q x H_O
-        output = self.layer_norm(output + residual)
-
-        return output, attn
-
-class ScaledDotProductAttention(nn.Module):
-    ''' Scaled Dot-Product Attention '''
-
-    def __init__(self, temperature, attn_dropout=0.1):
-        super().__init__()
-        self.temperature = temperature
-        self.dropout = nn.Dropout(attn_dropout)
-        self.softmax = nn.Softmax(dim=2)
-
-    def forward(self, q, k, v, mask=None):
-        """
-
-        """
-        attn = torch.bmm(q, k.transpose(1, 2))
-        attn = attn / self.temperature
-
-        if mask is not None:
-            attn = attn.masked_fill(mask, -np.inf)
-
-        attn = self.softmax(attn)
-        attn = self.dropout(attn)
-        output = torch.bmm(attn, v)
-
-        return output, attn
-
-if __name__ == "__main__":
-    from data import melfuture, featurelen, uyghur_latin, SpeechDataset, _collate_fn
-    device = 'cuda'
-    model = UFormerCTC1N(featurelen,uyghur_latin)
-    model.to(device)
-    model.save(0)
-
-    txt = model.predict("test3.wav", device)
-    print(txt)
-
-    txt = model.predict("test4.wav", device)
-    print(txt)
-
-    train_dataset = SpeechDataset('uyghur_thuyg20_train_small.csv', augumentation=False)
-    bbb = []
-    bbb.append(train_dataset[0])
-    bbb.append(train_dataset[3])
-    bbb.append(train_dataset[4])
-    inps, targs, in_lens,_,_ = _collate_fn(bbb)
-    model.train()
-    outs, trg = model(inps.to(device),in_lens, targs.to(device))
-    print(outs.size())
-    print(trg.size())