Add files via upload

Uyghur 2020-12-25 16:47:12 +09:00 committed by GitHub
parent a3fd9dff1c
commit 0873625c09
12 changed files with 3463 additions and 0 deletions

UDS2W2LDS.py (new file, 167 additions)

@@ -0,0 +1,167 @@
import math
import torch
import torch.nn as nn
from torch.nn.functional import log_softmax
import torch.nn.functional as F
from torch.nn.parameter import Parameter
from data import melfuture
from uyghur import uyghur_latin
from BaseModel import BaseModel
class UDS2W2LDS(BaseModel):
def __init__(self,num_features_input,load_best=False):
super(UDS2W2LDS, self).__init__('UDS2W2LDS')
dropout = 0.2
self.conv = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=(41, 11), stride=(2, 1), padding=(20, 5), bias=False),
nn.BatchNorm2d(32),
nn.ReLU(inplace=True),
nn.Dropout(dropout),
nn.Conv2d(32, 32, kernel_size=(21, 11), stride=(2, 2), padding=(10, 5), bias=False),
nn.BatchNorm2d(32),
nn.ReLU(inplace=True),
nn.Dropout(dropout),
)
self.lstm1 = nn.GRU(1024, 256, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn1 = nn.Sequential(
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2)
)
self.lstm2 = nn.GRU(256, 384, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn2 = nn.Sequential(
ResB(384,13,6,0.2),
ResB(384,13,6,0.2),
ResB(384,13,6,0.2),
nn.Conv1d(384, 512, 17, 1,8,),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.2),
ResB(512,17,8,0.3),
ResB(512,17,8,0.3),
nn.Conv1d(512, 768, 25, 1,12),
nn.BatchNorm1d(768),
nn.ReLU(),
nn.Dropout(0.3),
ResB(768,25,12,0.3),
nn.Conv1d(768, 1024, 1, 1),
nn.BatchNorm1d(1024),
nn.ReLU(),
nn.Dropout(0.3),
ResB(1024,1,0,0.0),
)
self.outlayer = nn.Conv1d(1024, uyghur_latin.vocab_size, 1, 1)
self.softMax = nn.LogSoftmax(dim=1)
self.checkpoint = 'results/' + self.ModelName
self._loadfrom()
print(f'The model has {self.parameters_count(self):,} trainable parameters')
def smooth_labels(self, x):
# note: relies on self.smoothing, which is never set in this class (the GLU variants set it); unused in forward
return (1.0 - self.smoothing) * x + self.smoothing / x.size(-1)
def forward(self, x, lengths):
x.unsqueeze_(1)
out = self.conv(x)
b, c, h, w = out.size()
out = out.view(b, c*h, w).contiguous() #.permute(0,2,1)
out = out.permute(0,2,1)
out_lens = lengths//2
out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out, _ = self.lstm1(out)
out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm1.hidden_size] + out[:, :, self.lstm1.hidden_size:]).contiguous() # sum forward and backward GRU directions: (B, T, 2H) -> (B, T, H)
out = self.cnn1(out.permute(0,2,1))
out = out.permute(0,2,1)
out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out,_ = self.lstm2(out)
out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm2.hidden_size] + out[:, :, self.lstm2.hidden_size:]).contiguous()
out = self.cnn2(out.permute(0,2,1))
out = self.outlayer(out)
out = self.softMax(out)
return out, out_lens
class ResB(nn.Module):
def __init__(self, num_filters, kernel, pad, d = 0.4):
super().__init__()
self.conv = nn.Sequential(
nn.Conv1d(num_filters, num_filters, kernel_size = kernel, stride = 1 , padding=pad),
nn.BatchNorm1d(num_filters)
)
self.relu = nn.ReLU()
self.bn = nn.BatchNorm1d(num_filters)
self.drop =nn.Dropout(d)
def forward(self, x):
identity = x
out = self.conv(x)
out += identity
out = self.bn(out)
out = self.relu(out)
out = self.drop(out)
return out
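# A minimal sketch, assuming a (batch, channels, time) input: ResB preserves
# the tensor shape, which is what lets the blocks chain inside nn.Sequential
# and makes the residual addition in forward() valid.
def _resb_shape_sketch():
    block = ResB(256, 11, 5, 0.2)
    x = torch.randn(4, 256, 100)  # B x C x T
    y = block(x)
    assert y.shape == x.shape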
if __name__ == "__main__":
from data import featurelen, melfuture
device ="cpu"
net = UDS2W2LDS(featurelen).to(device)
text = net.predict("test1.wav",device)
print(text)
text = net.predict("test2.wav",device)
print(text)
#net.best_cer = 1.0
#net.save(0)
melf = melfuture("test3.wav")
melf.unsqueeze_(0)
conv0 = nn.Conv1d(featurelen,256,11,2, 5, 1)
conv1 = nn.Conv1d(256,256,11,1, 5, 1)
conv3 = nn.Conv1d(256,256,11,1, 5*2, 2)
conv5 = nn.Conv1d(256,256,11,1, 5*3, 3)
out0 = conv0(melf)
out1 = conv1(out0)
out3 = conv3(out0)
out5 = conv5(out0)
print(out1.size())
print(out3.size())
print(out5.size())
out = out1 * out3 * out5
print(out.size())
#net = GCGCRes(featurelen).to(device)
#net.save(1)
#text = net.predict("test1.wav",device)
#print(text)
#text = net.predict("test2.wav",device)
#print(text)
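# A minimal forward-shape sketch on random input (assumes featurelen == 128 so
# the flattened conv output, 32 channels x 32 frequency bins, matches lstm1's
# 1024 input features, and lengths sorted descending as pack_padded_sequence
# requires by default):
feats = torch.randn(2, featurelen, 200)
lens = torch.tensor([200, 180])
probs, prob_lens = net(feats, lens)
print(probs.size(), prob_lens)  # B x vocab_size x T/2 log-probs for CTC, with halved lengths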

UDS2W2LDS00.py (new file, 160 additions)

@@ -0,0 +1,160 @@
import math
import torch
import torch.nn as nn
from torch.nn.functional import log_softmax
import torch.nn.functional as F
from torch.nn.parameter import Parameter
from data import melfuture
from uyghur import uyghur_latin
from BaseModel import BaseModel
class UDS2W2LDS00(BaseModel):
def __init__(self,num_features_input,load_best=False):
super(UDS2W2LDS00, self).__init__('UDS2W2LDS00')
dropout = 0.2
self.conv = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=(41, 11), stride=(2, 1), padding=(20, 5), bias=False),
nn.BatchNorm2d(32),
nn.ReLU(inplace=True),
nn.Dropout(dropout),
nn.Conv2d(32, 32, kernel_size=(21, 11), stride=(2, 2), padding=(10, 5), bias=False),
nn.BatchNorm2d(32),
nn.ReLU(inplace=True),
nn.Dropout(dropout),
)
#self.lstm1 = nn.GRU(1024, 256, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn1 = nn.Sequential(
nn.Conv1d(1024, 256, 11, 1,5),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Dropout(0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2)
)
#self.lstm2 = nn.GRU(256, 384, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn2 = nn.Sequential(
nn.Conv1d(256, 384, 13, 1,6),
nn.BatchNorm1d(384),
nn.ReLU(),
nn.Dropout(0.2),
ResB(384,13,6,0.2),
ResB(384,13,6,0.2),
ResB(384,13,6,0.2),
nn.Conv1d(384, 512, 17, 1,8),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.2),
ResB(512,17,8,0.3),
ResB(512,17,8,0.3),
nn.Conv1d(512, 768, 25, 1,12),
nn.BatchNorm1d(768),
nn.ReLU(),
nn.Dropout(0.3),
ResB(768,25,12,0.3),
nn.Conv1d(768, 1024, 1, 1),
nn.BatchNorm1d(1024),
nn.ReLU(),
nn.Dropout(0.3),
ResB(1024,1,0,0.0),
)
self.outlayer = nn.Conv1d(1024, uyghur_latin.vocab_size, 1, 1)
self.softMax = nn.LogSoftmax(dim=1)
self.checkpoint = 'results/' + self.ModelName
self._loadfrom()
print(f'The model has {self.parameters_count(self):,} trainable parameters')
def smooth_labels(self, x):
return (1.0 - self.smoothing) * x + self.smoothing / x.size(-1)
def forward(self, x, lengths):
x.unsqueeze_(1)
out = self.conv(x)
b, c, h, w = out.size()
out = out.view(b, c*h, w).contiguous()
out_lens = lengths//2
out = self.cnn1(out)
out = self.cnn2(out)
out = self.outlayer(out)
out = self.softMax(out)
return out, out_lens
class ResB(nn.Module):
def __init__(self, num_filters, kernel, pad, d = 0.4):
super().__init__()
self.conv = nn.Sequential(
nn.Conv1d(num_filters, num_filters, kernel_size = kernel, stride = 1 , padding=pad),
nn.BatchNorm1d(num_filters)
)
self.relu = nn.ReLU()
self.bn = nn.BatchNorm1d(num_filters)
self.drop =nn.Dropout(d)
def forward(self, x):
identity = x
out = self.conv(x)
out += identity
out = self.bn(out)
out = self.relu(out)
out = self.drop(out)
return out
if __name__ == "__main__":
from data import featurelen, melfuture
device ="cpu"
net = UDS2W2LDS00(featurelen).to(device)
text = net.predict("test1.wav",device)
print(text)
text = net.predict("test2.wav",device)
print(text)
#net.best_cer = 1.0
#net.save(0)
melf = melfuture("test3.wav")
melf.unsqueeze_(0)
conv0 = nn.Conv1d(featurelen,256,11,2, 5, 1)
conv1 = nn.Conv1d(256,256,11,1, 5, 1)
conv3 = nn.Conv1d(256,256,11,1, 5*2, 2)
conv5 = nn.Conv1d(256,256,11,1, 5*3, 3)
out0 = conv0(melf)
out1 = conv1(out0)
out3 = conv3(out0)
out5 = conv5(out0)
print(out1.size())
print(out3.size())
print(out5.size())
out = out1 * out3 * out5
print(out.size())
#net = GCGCRes(featurelen).to(device)
#net.save(1)
#text = net.predict("test1.wav",device)
#print(text)
#text = net.predict("test2.wav",device)
#print(text)

UDS2W2LG.py (new file, 155 additions)

@@ -0,0 +1,155 @@
import math
import torch
import torch.nn as nn
from torch.nn.functional import log_softmax
import torch.nn.functional as F
from torch.nn.parameter import Parameter
from data import melfuture
from uyghur import uyghur_latin
from BaseModel import BaseModel
class UDS2W2LG(BaseModel):
def __init__(self,num_features_input,load_best=False):
super(UDS2W2LG, self).__init__('UDS2W2LG')
dropout = 0.1
self.conv = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=(11, 11), stride=(2, 2), padding=(5, 5), bias=False),
nn.BatchNorm2d(32), nn.ReLU(inplace=True), nn.Dropout(dropout),
nn.Conv2d(32, 32, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2), bias=False),
nn.BatchNorm2d(32), nn.ReLU(inplace=True), nn.Dropout(dropout)
)
self.lstm1 = nn.GRU(1024, 256, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn1 = nn.Sequential(
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2)
)
self.lstm2 = nn.GRU(256, 384, num_layers=2 , batch_first=True, bidirectional=True)
self.cnn2 = nn.Sequential(
ResB(384,13,6,0.2),
ResB(384,13,6,0.2),
ResB(384,13,6,0.2),
nn.Conv1d(384, 512, 17, 1,8,bias=False),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.2),
ResB(512,17,8,0.3),
ResB(512,17,8,0.3),
nn.Conv1d(512, 1024, 1, 1,bias=False),
nn.BatchNorm1d(1024),
nn.ReLU(),
nn.Dropout(0.3),
ResB(1024,1,0,0.0),
)
self.outlayer = nn.Conv1d(1024, uyghur_latin.vocab_size, 1, 1)
self.softMax = nn.LogSoftmax(dim=1)
print(" Model Name:", self.ModelName)
self.checkpoint = 'results/' + self.ModelName
self._loadfrom()
print(f'The model has {self.parameters_count(self):,} trainable parameters')
def smooth_labels(self, x):
return (1.0 - self.smoothing) * x + self.smoothing / x.size(-1)
def forward(self, x, lengths):
out_lens = lengths//4 # two stride-(2,2) convs halve the time axis twice
x.unsqueeze_(1)
out = self.conv(x)
b, c, h, w = out.size()
out = out.view(b, c*h, w).contiguous() #.permute(0,2,1)
out = out.permute(0,2,1)
out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out, _ = self.lstm1(out)
out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm1.hidden_size] + out[:, :, self.lstm1.hidden_size:]).contiguous()
out = self.cnn1(out.permute(0,2,1))
out = out.permute(0,2,1)
out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out,_ = self.lstm2(out)
out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm2.hidden_size] + out[:, :, self.lstm2.hidden_size:]).contiguous()
out = self.cnn2(out.permute(0,2,1))
out = self.outlayer(out)
out = self.softMax(out)
return out, out_lens
class ResB(nn.Module):
def __init__(self, num_filters, kernel, pad, d = 0.4):
super().__init__()
self.conv = nn.Sequential(
nn.Conv1d(num_filters, num_filters, kernel_size = kernel, stride = 1 , padding=pad, bias=False),
nn.BatchNorm1d(num_filters)
)
self.relu = nn.ReLU()
self.bn = nn.BatchNorm1d(num_filters)
self.drop =nn.Dropout(d)
def forward(self, x):
identity = x
out = self.conv(x)
out += identity
out = self.bn(out)
out = self.relu(out)
out = self.drop(out)
return out
if __name__ == "__main__":
from data import featurelen, melfuture
device ="cpu"
net = UDS2W2LG(featurelen).to(device)
text = net.predict("test1.wav",device)
print(text)
text = net.predict("test2.wav",device)
print(text)
#net.best_cer = 1.0
#net.save(0)
melf = melfuture("test3.wav")
melf.unsqueeze_(0)
conv0 = nn.Conv1d(featurelen,256,11,2, 5, 1)
conv1 = nn.Conv1d(256,256,11,1, 5, 1)
conv3 = nn.Conv1d(256,256,11,1, 5*2, 2)
conv5 = nn.Conv1d(256,256,11,1, 5*3, 3)
out0 = conv0(melf)
out1 = conv1(out0)
out3 = conv3(out0)
out5 = conv5(out0)
print(out1.size())
print(out3.size())
print(out5.size())
out = out1 * out3 * out5
print(out.size())
#net = GCGCRes(featurelen).to(device)
#net.save(1)
#text = net.predict("test1.wav",device)
#print(text)
#text = net.predict("test2.wav",device)
#print(text)

UDS2W2LG1.py (new file, 157 additions)

@@ -0,0 +1,157 @@
import math
import torch
import torch.nn as nn
from torch.nn.functional import log_softmax
import torch.nn.functional as F
from torch.nn.parameter import Parameter
from data import melfuture
from uyghur import uyghur_latin
from BaseModel import BaseModel
class UDS2W2LG1(BaseModel):
def __init__(self,num_features_input,load_best=False):
super(UDS2W2LG1, self).__init__('UDS2W2LG1')
dropout = 0.1
#Only this part differs from UDS2W2LG
self.conv = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=(41, 11), stride=(2, 2), padding=(20, 5), bias=False),
nn.BatchNorm2d(32),
nn.Hardtanh(0, 20, inplace=True),
nn.Conv2d(32, 32, kernel_size=(21, 11), stride=(2, 2), padding=(10, 5),bias=False),
nn.BatchNorm2d(32),
nn.Hardtanh(0, 20, inplace=True),
)
self.lstm1 = nn.GRU(1024, 256, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn1 = nn.Sequential(
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2)
)
self.lstm2 = nn.GRU(256, 384, num_layers=2 , batch_first=True, bidirectional=True)
self.cnn2 = nn.Sequential(
ResB(384,13,6,0.2),
ResB(384,13,6,0.2),
ResB(384,13,6,0.2),
nn.Conv1d(384, 512, 17, 1,8,bias=False),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.2),
ResB(512,17,8,0.3),
ResB(512,17,8,0.3),
nn.Conv1d(512, 1024, 1, 1,bias=False),
nn.BatchNorm1d(1024),
nn.ReLU(),
nn.Dropout(0.3),
ResB(1024,1,0,0.0),
)
self.outlayer = nn.Conv1d(1024, uyghur_latin.vocab_size, 1, 1)
self.softMax = nn.LogSoftmax(dim=1)
self.checkpoint = 'results/' + self.ModelName
self._load()
print(f'The model has {self.parameters_count(self):,} trainable parameters')
def smooth_labels(self, x):
return (1.0 - self.smoothing) * x + self.smoothing / x.size(-1)
def forward(self, x, lengths):
out_lens = lengths//4
x.unsqueeze_(1)
out = self.conv(x)
b, c, h, w = out.size()
out = out.view(b, c*h, w).contiguous() #.permute(0,2,1)
out = out.permute(0,2,1)
out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out, _ = self.lstm1(out)
out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm1.hidden_size] + out[:, :, self.lstm1.hidden_size:]).contiguous()
out = self.cnn1(out.permute(0,2,1))
out = out.permute(0,2,1)
out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out,_ = self.lstm2(out)
out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm2.hidden_size] + out[:, :, self.lstm2.hidden_size:]).contiguous()
out = self.cnn2(out.permute(0,2,1))
out = self.outlayer(out)
out = self.softMax(out)
return out, out_lens
class ResB(nn.Module):
def __init__(self, num_filters, kernel, pad, d = 0.4):
super().__init__()
self.conv = nn.Sequential(
nn.Conv1d(num_filters, num_filters, kernel_size = kernel, stride = 1 , padding=pad, bias=False),
nn.BatchNorm1d(num_filters)
)
self.relu = nn.ReLU()
self.bn = nn.BatchNorm1d(num_filters)
self.drop =nn.Dropout(d)
def forward(self, x):
identity = x
out = self.conv(x)
out += identity
out = self.bn(out)
out = self.relu(out)
out = self.drop(out)
return out
if __name__ == "__main__":
from data import featurelen, melfuture
device ="cpu"
net = UDS2W2LG1(featurelen).to(device)
text = net.predict("test1.wav",device)
print(text)
text = net.predict("test2.wav",device)
print(text)
#net.best_cer = 1.0
#net.save(0)
melf = melfuture("test3.wav")
melf.unsqueeze_(0)
conv0 = nn.Conv1d(featurelen,256,11,2, 5, 1)
conv1 = nn.Conv1d(256,256,11,1, 5, 1)
conv3 = nn.Conv1d(256,256,11,1, 5*2, 2)
conv5 = nn.Conv1d(256,256,11,1, 5*3, 3)
out0 = conv0(melf)
out1 = conv1(out0)
out3 = conv3(out0)
out5 = conv5(out0)
print(out1.size())
print(out3.size())
print(out5.size())
out = out1 * out3 * out5
print(out.size())
#net = GCGCRes(featurelen).to(device)
#net.save(1)
#text = net.predict("test1.wav",device)
#print(text)
#text = net.predict("test2.wav",device)
#print(text)

UDS2W2LGLU.py (new file, 132 additions)

@@ -0,0 +1,132 @@
import math
import torch
import torch.nn as nn
from torch.nn.functional import log_softmax
import torch.nn.functional as F
from torch.nn.parameter import Parameter
from data import melfuture
from uyghur import uyghur_latin
from BaseModel import BaseModel
class UDS2W2LGLU(BaseModel):
def __init__(self,num_features_input,load_best=False):
super(UDS2W2LGLU, self).__init__('UDS2W2LGLU')
self.smoothing = 0.01
self.conv = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=(41, 11), stride=(2, 2), padding=(20, 5), bias=False),
nn.BatchNorm2d(32),
nn.Hardtanh(0, 20, inplace=True),
nn.Conv2d(32, 32, kernel_size=(21, 11), stride=(2, 1), padding=(10, 5),bias=False),
nn.BatchNorm2d(32),
nn.Hardtanh(0, 20, inplace=True),
)
self.lstm1 = nn.GRU(1024, 256, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn1 = nn.Sequential(
nn.Conv1d(256, 256*2, 11, 2, 5,bias=False),
nn.BatchNorm1d(256*2),
nn.GLU(dim=1),
nn.Dropout(0.2),
ResBGLU(256,11,5,0.2),
ResBGLU(256,11,5,0.2),
ResBGLU(256,11,5,0.2),
ResBGLU(256,11,5,0.2)
)
self.lstm2 = nn.GRU(256, 384, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn2 = nn.Sequential(
ResBGLU(384,13,6,0.2),
ResBGLU(384,13,6,0.2),
ResBGLU(384,13,6,0.2),
nn.Conv1d(384, 512*2, 17, 1,8,bias=False),
nn.BatchNorm1d(512*2),
nn.GLU(dim=1),
nn.Dropout(0.2),
ResBGLU(512,17,8,0.3),
ResBGLU(512,17,8,0.3),
nn.Conv1d(512, 1024*2, 1, 1,bias=False),
nn.BatchNorm1d(1024*2),
nn.GLU(dim=1),
nn.Dropout(0.3),
ResBGLU(1024,1,0,0.0),
)
self.outlayer = nn.Conv1d(1024, uyghur_latin.vocab_size, 1, 1)
self.softMax = nn.LogSoftmax(dim=1)
self.checkpoint = 'results/' + self.ModelName
self._load(load_best)
print(f'The model has {self.parameters_count(self):,} trainable parameters')
def smooth_labels(self, x):
sl = x.size(1)
return (1.0 - self.smoothing) * x + self.smoothing / sl
def forward(self, x, lengths):
out_lens = lengths//2
x.unsqueeze_(1)
out = self.conv(x)
b, c, h, w = out.size()
out = out.view(b, c*h, w).contiguous() #.permute(0,2,1)
out = out.permute(0,2,1)
out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out, _ = self.lstm1(out)
out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm1.hidden_size] + out[:, :, self.lstm1.hidden_size:]).contiguous()
out = self.cnn1(out.permute(0,2,1))
out_lens = out_lens//2
out = out.permute(0,2,1)
out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out,_ = self.lstm2(out)
out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm2.hidden_size] + out[:, :, self.lstm2.hidden_size:]).contiguous()
out = self.cnn2(out.permute(0,2,1))
out = self.outlayer(out)
#out = self.smooth_labels(out)
out = self.softMax(out)
return out, out_lens
class ResBGLU(nn.Module):
def __init__(self, num_filters, kernel, pad, d = 0.4):
super().__init__()
self.conv = nn.Sequential(
nn.Conv1d(num_filters, num_filters*2, kernel_size = kernel, stride = 1 , padding=pad, bias=False),
nn.BatchNorm1d(num_filters*2),
nn.GLU(dim=1)
)
self.fc = nn.Sequential(
nn.BatchNorm1d(num_filters),
nn.ReLU(),
nn.Dropout(d)
)
def forward(self, x):
identity = x
out = self.conv(x)
out += identity
out = self.fc(out)
return out
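# A minimal sketch, assuming a (batch, channels, time) input: the Conv1d
# doubles the channels to num_filters*2 and GLU(dim=1) multiplies one half by
# the sigmoid of the other, returning num_filters channels so the residual
# addition lines up.
def _resbglu_shape_sketch():
    block = ResBGLU(384, 13, 6, 0.2)
    x = torch.randn(2, 384, 50)
    assert block(x).shape == x.shape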
if __name__ == "__main__":
from data import featurelen, melfuture
device ="cpu"
net = UDS2W2LGLU(featurelen).to(device)
text = net.predict("test1.wav",device)
print(text)
text = net.predict("test2.wav",device)
print(text)
#net.best_cer = 1.0
#net.save(78)

UDS2W2LGLU8.py (new file, 135 additions)

@@ -0,0 +1,135 @@
import math
import torch
import torch.nn as nn
from torch.nn.functional import log_softmax
import torch.nn.functional as F
from torch.nn.parameter import Parameter
from data import melfuture
from uyghur import uyghur_latin
from BaseModel import BaseModel
class Swish(nn.Module):
def forward(self, x):
return x * x.sigmoid()
class Mish(nn.Module):
def forward(self, x):
#inlining this saves 1 second per epoch (V100 GPU) vs having a temp x and then returning x(!)
return x *( torch.tanh(F.softplus(x)))
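# A quick numeric check of the identity mish(x) = x * tanh(softplus(x))
# (assumes PyTorch >= 1.9, which ships F.mish):
def _mish_check():
    x = torch.randn(16)
    assert torch.allclose(Mish()(x), F.mish(x), atol=1e-6)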
class UDS2W2LGLU8(BaseModel):
def __init__(self,num_features_input,load_best=False):
super(UDS2W2LGLU8, self).__init__('UDS2W2LGLU8')
self.smoothing = 0.01
self.conv = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=(41, 11), stride=(2, 2), padding=(20, 5), bias=False),
nn.BatchNorm2d(32),
nn.Hardtanh(0, 20, inplace=True),
nn.Conv2d(32, 32, kernel_size=(21, 11), stride=(2, 1), padding=(10, 5),bias=False),
nn.BatchNorm2d(32),
nn.Hardtanh(0, 20, inplace=True),
)
self.lstm1 = nn.GRU(1024, 256, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn1 = nn.Sequential(
ResBGLU(256, 256, 11, 0.2, 2),
ResBGLU(256, 256, 11, 0.2),
ResBGLU(256, 256, 11, 0.2),
ResBGLU(256, 256, 11, 0.2),
ResBGLU(256, 256, 11, 0.2),
)
self.lstm2 = nn.GRU(256, 384, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn2 = nn.Sequential(
ResBGLU(384, 384, 13, 0.2),
ResBGLU(384, 384, 13, 0.2),
ResBGLU(384, 384, 13, 0.2),
ResBGLU(384, 512, 17, 0.2),
ResBGLU(512, 512, 17, 0.3),
ResBGLU(512, 512, 1, 0.3),
)
self.outlayer = nn.Conv1d(512, uyghur_latin.vocab_size, 1, 1)
self.softMax = nn.LogSoftmax(dim=1)
self.checkpoint = 'results/' + self.ModelName
self._load(load_best)
print(f'The model has {self.parameters_count(self):,} trainable parameters')
def smooth_labels(self, x):
sl = x.size(1)
return (1.0 - self.smoothing) * x + self.smoothing / sl
def forward(self, x, lengths):
out_lens = lengths//2
x.unsqueeze_(1)
out = self.conv(x)
b, c, h, w = out.size()
out = out.view(b, c*h, w).contiguous() #.permute(0,2,1)
out = out.permute(0,2,1)
out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out, _ = self.lstm1(out)
out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm1.hidden_size] + out[:, :, self.lstm1.hidden_size:]).contiguous()
out = self.cnn1(out.permute(0,2,1))
out_lens = out_lens//2
out = out.permute(0,2,1)
out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out,_ = self.lstm2(out)
out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm2.hidden_size] + out[:, :, self.lstm2.hidden_size:]).contiguous()
out = self.cnn2(out.permute(0,2,1))
out = self.outlayer(out)
#out = self.smooth_labels(out)
out = self.softMax(out)
return out, out_lens
class ResBGLU(nn.Module):
def __init__(self, in_channel, out_channel, kernel, d = 0.4, stride = 1):
super().__init__()
self.isRes = (in_channel == out_channel and stride == 1)
pad = (kernel-1)//2
self.conv = nn.Sequential(
nn.Conv1d(in_channel, out_channel*2, kernel_size = kernel, stride = stride , padding=pad, bias=False),
nn.BatchNorm1d(out_channel*2),
nn.GLU(dim=1)
)
self.fc = nn.Sequential(
nn.BatchNorm1d(out_channel),
Mish(),
)
self.drop = nn.Dropout(d)
def forward(self, x):
out = self.conv(x)
if self.isRes:
out = self.fc(out+x)
out = self.drop(out)
return out
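# A minimal sketch: with stride=2 the residual add is skipped (isRes is False)
# and the time axis halves; with stride=1 and equal channels the input is
# added back before the BatchNorm/Mish stage.
def _resbglu8_stride_sketch():
    x = torch.randn(2, 256, 100)
    assert ResBGLU(256, 256, 11, 0.2, 2)(x).shape == (2, 256, 50)
    assert ResBGLU(256, 256, 11, 0.2)(x).shape == x.shape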
if __name__ == "__main__":
from data import featurelen, melfuture
device ="cpu"
net = UDS2W2LGLU8(featurelen).to(device)
text = net.predict("test1.wav",device)
print(text)
text = net.predict("test2.wav",device)
print(text)
#net.best_cer = 1.0
#net.save(78)

UFormerCTC1N.py (new file, 610 additions)

@@ -0,0 +1,610 @@
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as I
import numpy as np
import math
from BaseModel import BaseModel
from data import melfuture
from uyghur import uyghur_latin
class UFormerCTC1N(BaseModel):
def __init__(self, num_features_input, load_best=False):
super(UFormerCTC1N, self).__init__('UFormerCTC1N')
num_layers = 1 #'Number of layers'
num_heads = 8 #'Number of heads'
dim_model = 768 #'Model dimension'
dim_key = 96 #'Key dimension'
dim_value = 96 #'Value dimension'
dim_inner = 1024 #'Inner dimension'
dim_emb = 768 #'Embedding dimension'
src_max_len = 2500 #'Source max length'
tgt_max_len = 1000 #'Target max length'
dropout = 0.1
emb_trg_sharing = False
self.flayer = UDS2W2L8(num_features_input)
self.encoder = Encoder(num_layers, num_heads=num_heads, dim_model=dim_model, dim_key=dim_key, dim_value=dim_value, dim_inner=dim_inner, src_max_length=src_max_len, dropout=dropout)
self.decoder = Decoder(num_layers=num_layers, num_heads=num_heads, dim_emb=dim_emb, dim_model=dim_model, dim_inner=dim_inner, dim_key=dim_key, dim_value=dim_value, trg_max_length=tgt_max_len, dropout=dropout, emb_trg_sharing=emb_trg_sharing)
for p in self.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
self.ctcOut = None
self.ctcLen = None
self.checkpoint = "results/" + self.ModelName
self._load()
print(" Model Name:", self.ModelName)
print(f'The model has {self.parameters_count(self):,} trainable parameters')
print(f' Feature layer has {self.parameters_count(self.flayer):,} trainable parameters')
print(f' Encoder has {self.parameters_count(self.encoder):,} trainable parameters')
print(f' Decoder has {self.parameters_count(self.decoder):,} trainable parameters')
def forward(self, padded_input, input_lengths, padded_target):
padded_input,self.ctcOut, self.ctcLen = self.flayer(padded_input,input_lengths)
#input must be #B x T x F format
encoder_padded_outputs, _ = self.encoder(padded_input, self.ctcLen) # BxTxH or #B x T x F
seq_in_pad, gold = self.preprocess(padded_target)
pred = self.decoder(seq_in_pad, encoder_padded_outputs, self.ctcLen)
return pred, gold
def greedydecode(self, pred, len=0):
_, pred = torch.topk(pred, 1, dim=2)
preds = pred.squeeze(2)
strs_pred = [uyghur_latin.decode(pred_id) for pred_id in preds]
return strs_pred
def predict(self,wavfile, device):
self.eval()
spec = melfuture(wavfile).unsqueeze(0).to(device)
spec_len = torch.tensor([spec.shape[2]], dtype=torch.int)
padded_input,self.ctcOut, self.ctcLen = self.flayer(spec,spec_len)
encoder_padded_outputs, _ = self.encoder(padded_input, self.ctcLen) # BxTxH or #B x T x F
strs_hyps = self.decoder.greedy_search(encoder_padded_outputs)
return strs_hyps
class ResB(nn.Module):
def __init__(self, in_channel, out_channel, kernel, d = 0.4, stride = 1):
super().__init__()
self.isRes = (in_channel == out_channel and stride == 1)
pad = (kernel-1)//2
self.conv = nn.Sequential(
nn.Conv1d(in_channel, out_channel, kernel_size = kernel, stride = stride , padding=pad, bias=False),
nn.BatchNorm1d(out_channel),
nn.ReLU(),
)
self.bn = nn.BatchNorm1d(out_channel)
self.actfn = nn.ReLU()
self.drop = nn.Dropout(d)
def forward(self, x):
out = self.conv(x)
if self.isRes:
out = self.bn(out+x)
out = self.actfn(out)
out = self.drop(out)
return out
class UDS2W2L8(nn.Module):
def __init__(self, num_features_input):
super(UDS2W2L8, self).__init__()
self.conv = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=(41, 11), stride=(2, 2), padding=(20, 5), bias=False),
nn.BatchNorm2d(32),
nn.Hardtanh(0, 20, inplace=True),
nn.Conv2d(32, 32, kernel_size=(21, 11), stride=(2, 1), padding=(10, 5),bias=False),
nn.BatchNorm2d(32),
nn.Hardtanh(0, 20, inplace=True),
)
self.lstm1 = nn.GRU(1024, 256, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn1 = nn.Sequential(
ResB(256, 256, 11, 0.2,2),
ResB(256, 256, 11, 0.2),
ResB(256, 256, 11, 0.2),
ResB(256, 256, 11, 0.2),
ResB(256, 256, 11, 0.2)
)
self.lstm2 = nn.GRU(256, 384, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn2 = nn.Sequential(
ResB(384,384,13,0.2),
ResB(384,384,13,0.2),
ResB(384,384,13,0.2),
ResB(384,512,17,0.2),
ResB(512,512,17,0.3),
ResB(512,512,17,0.3),
ResB(512,768, 1,0.3),
ResB(768,768, 1,0.0),
)
self.outlayer = nn.Conv1d(768, uyghur_latin.vocab_size, 1, 1)
self.softMax = nn.LogSoftmax(dim=1)
def forward(self, x, lengths):
out_lens = lengths//2
x.unsqueeze_(1)
out = self.conv(x)
b, c, h, w = out.size()
out = out.view(b, c*h, w).contiguous() #.permute(0,2,1)
out = out.permute(0,2,1)
out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out, _ = self.lstm1(out)
out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm1.hidden_size] + out[:, :, self.lstm1.hidden_size:]).contiguous()
out = self.cnn1(out.permute(0,2,1))
out_lens = out_lens//2
out = out.permute(0,2,1)
out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out,_ = self.lstm2(out)
out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm2.hidden_size] + out[:, :, self.lstm2.hidden_size:]).contiguous()
out = self.cnn2(out.permute(0,2,1))
outctc = self.softMax(self.outlayer(out))
return out.contiguous().permute(0,2,1), outctc, out_lens
def load(self):
pack = torch.load('results/UDS2W2L8_last.pth', map_location='cpu')
sdict = pack['st_dict']
news_dict = self.state_dict()
filtered_dict = {k: v for k, v in sdict.items() if k in news_dict and v.size() == news_dict[k].size()}
news_dict.update(filtered_dict)
self.load_state_dict(news_dict)
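# Usage note: this warm-starts the front-end from the stand-alone UDS2W2L8
# checkpoint, copying only tensors whose names and shapes still match and
# silently skipping the rest, so the Transformer encoder/decoder can then be
# trained from scratch on top of pretrained acoustic layers.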
class Encoder(nn.Module):
"""
Encoder Transformer class
"""
def __init__(self, num_layers, num_heads, dim_model, dim_key, dim_value, dim_inner, dropout=0.1, src_max_length=2500):
super(Encoder, self).__init__()
self.num_layers = num_layers
self.num_heads = num_heads
self.dim_model = dim_model
self.dim_key = dim_key
self.dim_value = dim_value
self.dim_inner = dim_inner
self.src_max_length = src_max_length
self.dropout = nn.Dropout(dropout)
self.dropout_rate = dropout
self.positional_encoding = PositionalEncoding(dim_model, src_max_length)
self.layers = nn.ModuleList([
EncoderLayer(num_heads, dim_model, dim_inner, dim_key, dim_value, dropout=dropout) for _ in range(num_layers)
])
def forward(self, padded_input, input_lengths):
"""
args:
padded_input: B x T x D
input_lengths: B
return:
output: B x T x H
"""
encoder_self_attn_list = []
# Prepare masks
non_pad_mask = get_non_pad_mask(padded_input, input_lengths=input_lengths) # B x T x D
seq_len = padded_input.size(1)
self_attn_mask = get_attn_pad_mask(padded_input, input_lengths, seq_len) # B x T x T
pos = self.positional_encoding(padded_input)
encoder_output = padded_input + pos
for layer in self.layers:
encoder_output, self_attn = layer(encoder_output, non_pad_mask=non_pad_mask, self_attn_mask=self_attn_mask)
encoder_self_attn_list += [self_attn]
return encoder_output, encoder_self_attn_list
class EncoderLayer(nn.Module):
"""
Encoder Layer Transformer class
"""
def __init__(self, num_heads, dim_model, dim_inner, dim_key, dim_value, dropout=0.1):
super(EncoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(num_heads, dim_model, dim_key, dim_value, dropout=dropout)
self.pos_ffn = PositionwiseFeedForwardWithConv(dim_model, dim_inner, dropout=dropout)
def forward(self, enc_input, non_pad_mask=None, self_attn_mask=None):
enc_output, self_attn = self.self_attn(enc_input, enc_input, enc_input, mask=self_attn_mask)
enc_output *= non_pad_mask
enc_output = self.pos_ffn(enc_output)
enc_output *= non_pad_mask
return enc_output, self_attn
class Decoder(nn.Module):
"""
Decoder Layer Transformer class
"""
def __init__(self, num_layers, num_heads, dim_emb, dim_model, dim_inner, dim_key, dim_value, dropout=0.1, trg_max_length=1000, emb_trg_sharing=False):
super(Decoder, self).__init__()
self.num_trg_vocab = uyghur_latin.vocab_size
self.num_layers = num_layers
self.num_heads = num_heads
self.dim_emb = dim_emb
self.dim_model = dim_model
self.dim_inner = dim_inner
self.dim_key = dim_key
self.dim_value = dim_value
self.dropout_rate = dropout
self.emb_trg_sharing = emb_trg_sharing
self.trg_max_length = trg_max_length
self.trg_embedding = nn.Embedding(self.num_trg_vocab, dim_emb, padding_idx=uyghur_latin.pad_idx)
self.positional_encoding = PositionalEncoding(dim_model, trg_max_length)
self.dropout = nn.Dropout(dropout)
self.layers = nn.ModuleList([
DecoderLayer(dim_model, dim_inner, num_heads,dim_key, dim_value, dropout=dropout)
for _ in range(num_layers)
])
self.output_linear = nn.Linear(dim_model, self.num_trg_vocab, bias=False)
nn.init.xavier_normal_(self.output_linear.weight)
if emb_trg_sharing:
self.output_linear.weight = self.trg_embedding.weight
self.x_logit_scale = (dim_model ** -0.5)
else:
self.x_logit_scale = 1.0
def forward(self, seq_in_pad, encoder_padded_outputs, encoder_input_lengths):
"""
args:
padded_input: B x T
encoder_padded_outputs: B x T x H
encoder_input_lengths: B
returns:
pred: B x T x vocab
gold: B x T
"""
decoder_self_attn_list, decoder_encoder_attn_list = [], []
# Prepare masks
non_pad_mask = get_non_pad_mask(seq_in_pad, pad_idx=uyghur_latin.pad_idx)
self_attn_mask_subseq = get_subsequent_mask(seq_in_pad)
self_attn_mask_keypad = get_attn_key_pad_mask(seq_k=seq_in_pad, seq_q=seq_in_pad, pad_idx=uyghur_latin.pad_idx)
self_attn_mask = (self_attn_mask_keypad + self_attn_mask_subseq).gt(0)
output_length = seq_in_pad.size(1)
dec_enc_attn_mask = get_attn_pad_mask(encoder_padded_outputs, encoder_input_lengths, output_length)
decoder_output = self.dropout(self.trg_embedding(seq_in_pad) * self.x_logit_scale + self.positional_encoding(seq_in_pad))
for layer in self.layers:
decoder_output, decoder_self_attn, decoder_enc_attn = layer(decoder_output, encoder_padded_outputs, non_pad_mask=non_pad_mask, self_attn_mask=self_attn_mask, dec_enc_attn_mask=dec_enc_attn_mask)
decoder_self_attn_list += [decoder_self_attn]
decoder_encoder_attn_list += [decoder_enc_attn]
seq_logit = self.output_linear(decoder_output)
return seq_logit
def greedy_search(self, encoder_padded_outputs):
"""
Greedy search, decode 1-best utterance
args:
encoder_padded_outputs: B x T x H
output:
batch_ids_nbest_hyps: list of nbest in ids (size B)
batch_strs_nbest_hyps: list of nbest in strings (size B)
"""
with torch.no_grad():
device = encoder_padded_outputs.device
max_seq_len = self.trg_max_length
#ys = torch.ones(encoder_padded_outputs.size(0),1).fill_(uyghur_latin.sos_idx).long().to(device) # batch_size x 1
max_seq_len = min(max_seq_len, encoder_padded_outputs.size(1))
inps=[uyghur_latin.sos_idx]
result = []
for t in range(max_seq_len):
ys = torch.LongTensor(inps).unsqueeze(0).to(device)
non_pad_mask = torch.ones_like(ys).float().unsqueeze(-1) # batch_size x t x 1
self_attn_mask = get_subsequent_mask(ys).gt(0) # batch_size x t x t
decoder_output = self.dropout(self.trg_embedding(ys) * self.x_logit_scale + self.positional_encoding(ys))
for layer in self.layers:
decoder_output, _, _ = layer(
decoder_output, encoder_padded_outputs,
non_pad_mask=non_pad_mask,
self_attn_mask=self_attn_mask,
dec_enc_attn_mask=None
)
prob = self.output_linear(decoder_output) # batch_size x t x label_size
_, next_word = torch.max(prob[:, -1], dim=1)
next_word = next_word.item()
result.append(next_word)
if next_word == uyghur_latin.eos_idx:
break
inps.append(next_word)
sent = uyghur_latin.decode(result)
return sent
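# Note: each step re-runs the decoder over the whole prefix (no incremental
# key/value cache), so decoding is quadratic in output length but simple;
# generation stops at eos_idx or after max_seq_len steps.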
class DecoderLayer(nn.Module):
"""
Decoder Transformer class
"""
def __init__(self, dim_model, dim_inner, num_heads, dim_key, dim_value, dropout=0.1):
super(DecoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(
num_heads, dim_model, dim_key, dim_value, dropout=dropout)
self.encoder_attn = MultiHeadAttention(
num_heads, dim_model, dim_key, dim_value, dropout=dropout)
self.pos_ffn = PositionwiseFeedForwardWithConv(
dim_model, dim_inner, dropout=dropout)
def forward(self, decoder_input, encoder_output, non_pad_mask=None, self_attn_mask=None, dec_enc_attn_mask=None):
decoder_output, decoder_self_attn = self.self_attn(decoder_input, decoder_input, decoder_input, mask=self_attn_mask)
decoder_output *= non_pad_mask
decoder_output, decoder_encoder_attn = self.encoder_attn(decoder_output, encoder_output, encoder_output, mask=dec_enc_attn_mask)
decoder_output *= non_pad_mask
decoder_output = self.pos_ffn(decoder_output)
decoder_output *= non_pad_mask
return decoder_output, decoder_self_attn, decoder_encoder_attn
"""
Transformer common layers
"""
def get_non_pad_mask(padded_input, input_lengths=None, pad_idx=None):
"""
padding position is set to 0, either use input_lengths or pad_idx
"""
assert input_lengths is not None or pad_idx is not None
if input_lengths is not None:
# padded_input: N x T x ..
N = padded_input.size(0)
non_pad_mask = padded_input.new_ones(padded_input.size()[:-1]) # B x T
for i in range(N):
non_pad_mask[i, input_lengths[i]:] = 0
if pad_idx is not None:
# padded_input: N x T
assert padded_input.dim() == 2
non_pad_mask = padded_input.ne(pad_idx).float()
# unsqueeze(-1) for broadcast
return non_pad_mask.unsqueeze(-1)
def get_attn_key_pad_mask(seq_k, seq_q, pad_idx):
"""
For masking out the padding part of key sequence.
"""
# Expand to fit the shape of key query attention matrix.
len_q = seq_q.size(1)
padding_mask = seq_k.eq(pad_idx)
padding_mask = padding_mask.unsqueeze(1).expand(-1, len_q, -1).byte() # B x T_Q x T_K
return padding_mask
def get_attn_pad_mask(padded_input, input_lengths, expand_length):
"""mask position is set to 1"""
# N x Ti x 1
non_pad_mask = get_non_pad_mask(padded_input, input_lengths=input_lengths)
# N x Ti, lt(1) like not operation
pad_mask = non_pad_mask.squeeze(-1).lt(1)
attn_mask = pad_mask.unsqueeze(1).expand(-1, expand_length, -1)
return attn_mask
def get_subsequent_mask(seq):
''' For masking out the subsequent info. '''
sz_b, len_s = seq.size()
subsequent_mask = torch.triu(
torch.ones((len_s, len_s), device=seq.device, dtype=torch.uint8), diagonal=1)
subsequent_mask = subsequent_mask.unsqueeze(0).expand(sz_b, -1, -1) # b x ls x ls
return subsequent_mask
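# Example: for length-3 sequences each batch row of the returned mask is
#   [[0, 1, 1],
#    [0, 0, 1],
#    [0, 0, 0]]
# i.e. position t may attend only to positions <= t once the mask is applied.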
class PositionalEncoding(nn.Module):
"""
Positional Encoding class
"""
def __init__(self, dim_model, max_length=2000):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_length, dim_model, requires_grad=False)
position = torch.arange(0, max_length).unsqueeze(1).float()
exp_term = torch.exp(torch.arange(0, dim_model, 2).float() * -(math.log(10000.0) / dim_model))
pe[:, 0::2] = torch.sin(position * exp_term) # even dimensions (0, 2, 4, ...)
pe[:, 1::2] = torch.cos(position * exp_term) # odd dimensions (1, 3, 5, ...)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, input):
"""
args:
input: B x T x D
output:
tensor: 1 x T x D (positional encodings, broadcast over the batch)
"""
return self.pe[:, :input.size(1)]
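# This is the standard sinusoidal encoding from "Attention Is All You Need":
#   PE(pos, 2i)   = sin(pos / 10000^(2i / dim_model))
#   PE(pos, 2i+1) = cos(pos / 10000^(2i / dim_model))
# precomputed once for max_length positions and sliced to the input length.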
class PositionwiseFeedForward(nn.Module):
"""
Position-wise Feedforward Layer class
FFN(x) = max(0, xW1 + b1) W2+ b2
"""
def __init__(self, dim_model, dim_ff, dropout=0.1):
super(PositionwiseFeedForward, self).__init__()
self.linear_1 = nn.Linear(dim_model, dim_ff)
self.linear_2 = nn.Linear(dim_ff, dim_model)
self.dropout = nn.Dropout(dropout)
self.layer_norm = nn.LayerNorm(dim_model)
def forward(self, x):
"""
args:
x: tensor
output:
y: tensor
"""
residual = x
output = self.dropout(self.linear_2(F.relu(self.linear_1(x))))
output = self.layer_norm(output + residual)
return output
class PositionwiseFeedForwardWithConv(nn.Module):
"""
Position-wise Feedforward Layer Implementation with Convolution class
"""
def __init__(self, dim_model, dim_hidden, dropout=0.1):
super(PositionwiseFeedForwardWithConv, self).__init__()
self.conv_1 = nn.Conv1d(dim_model, dim_hidden, 1)
self.conv_2 = nn.Conv1d(dim_hidden, dim_model, 1)
self.dropout = nn.Dropout(dropout)
self.layer_norm = nn.LayerNorm(dim_model)
def forward(self, x):
residual = x
output = x.transpose(1, 2)
output = self.conv_2(F.relu(self.conv_1(output)))
output = output.transpose(1, 2)
output = self.dropout(output)
output = self.layer_norm(output + residual)
return output
class MultiHeadAttention(nn.Module):
def __init__(self, num_heads, dim_model, dim_key, dim_value, dropout=0.1):
super(MultiHeadAttention, self).__init__()
self.num_heads = num_heads
self.dim_model = dim_model
self.dim_key = dim_key
self.dim_value = dim_value
self.query_linear = nn.Linear(dim_model, num_heads * dim_key)
self.key_linear = nn.Linear(dim_model, num_heads * dim_key)
self.value_linear = nn.Linear(dim_model, num_heads * dim_value)
nn.init.normal_(self.query_linear.weight, mean=0, std=np.sqrt(2.0 / (self.dim_model + self.dim_key)))
nn.init.normal_(self.key_linear.weight, mean=0, std=np.sqrt(2.0 / (self.dim_model + self.dim_key)))
nn.init.normal_(self.value_linear.weight, mean=0, std=np.sqrt(2.0 / (self.dim_model + self.dim_value)))
self.attention = ScaledDotProductAttention(temperature=np.power(dim_key, 0.5), attn_dropout=dropout)
self.layer_norm = nn.LayerNorm(dim_model)
self.output_linear = nn.Linear(num_heads * dim_value, dim_model)
nn.init.xavier_normal_(self.output_linear.weight)
self.dropout = nn.Dropout(dropout)
def forward(self, query, key, value, mask=None):
"""
query: B x T_Q x H, key: B x T_K x H, value: B x T_V x H
mask: B x T x T (attention mask)
"""
batch_size, len_query, _ = query.size()
batch_size, len_key, _ = key.size()
batch_size, len_value, _ = value.size()
residual = query
query = self.query_linear(query).view(batch_size, len_query, self.num_heads, self.dim_key) # B x T_Q x num_heads x H_K
key = self.key_linear(key).view(batch_size, len_key, self.num_heads, self.dim_key) # B x T_K x num_heads x H_K
value = self.value_linear(value).view(batch_size, len_value, self.num_heads, self.dim_value) # B x T_V x num_heads x H_V
query = query.permute(2, 0, 1, 3).contiguous().view(-1, len_query, self.dim_key) # (num_heads * B) x T_Q x H_K
key = key.permute(2, 0, 1, 3).contiguous().view(-1, len_key, self.dim_key) # (num_heads * B) x T_K x H_K
value = value.permute(2, 0, 1, 3).contiguous().view(-1, len_value, self.dim_value) # (num_heads * B) x T_V x H_V
if mask is not None:
mask = mask.repeat(self.num_heads, 1, 1) # (B * num_head) x T x T
output, attn = self.attention(query, key, value, mask=mask)
output = output.view(self.num_heads, batch_size, len_query, self.dim_value) # num_heads x B x T_Q x H_V
output = output.permute(1, 2, 0, 3).contiguous().view(batch_size, len_query, -1) # B x T_Q x (num_heads * H_V)
output = self.dropout(self.output_linear(output)) # B x T_Q x H_O
output = self.layer_norm(output + residual)
return output, attn
class ScaledDotProductAttention(nn.Module):
''' Scaled Dot-Product Attention '''
def __init__(self, temperature, attn_dropout=0.1):
super().__init__()
self.temperature = temperature
self.dropout = nn.Dropout(attn_dropout)
self.softmax = nn.Softmax(dim=2)
def forward(self, q, k, v, mask=None):
"""
"""
attn = torch.bmm(q, k.transpose(1, 2))
attn = attn / self.temperature
if mask is not None:
attn = attn.masked_fill(mask, -np.inf)
attn = self.softmax(attn)
attn = self.dropout(attn)
output = torch.bmm(attn, v)
return output, attn
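# A minimal shape sketch for the attention stack (sizes are illustrative):
def _attention_shape_sketch():
    mha = MultiHeadAttention(num_heads=8, dim_model=768, dim_key=96, dim_value=96)
    x = torch.randn(2, 10, 768)        # B x T x H
    out, attn = mha(x, x, x)           # self-attention, no mask
    assert out.shape == (2, 10, 768)
    assert attn.shape == (16, 10, 10)  # (num_heads * B) x T_Q x T_K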
if __name__ == "__main__":
from data import melfuture, featurelen, uyghur_latin, SpeechDataset, _collate_fn
device = 'cuda'
model = UFormerCTC1N(featurelen)
model.to(device)
model.save(0)
txt = model.predict("test3.wav", device)
print(txt)
txt = model.predict("test4.wav", device)
print(txt)
train_dataset = SpeechDataset('uyghur_thuyg20_train_small.csv', augumentation=False)
bbb = []
bbb.append(train_dataset[0])
bbb.append(train_dataset[3])
bbb.append(train_dataset[4])
inps, targs, in_lens,_,_ = _collate_fn(bbb)
model.train()
outs, trg = model(inps.to(device),in_lens, targs.to(device))
print(outs.size())
print(trg.size())
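# A hedged joint-loss sketch; the 0.7/0.3 weights, CTC blank index, and
# trg_lens are illustrative assumptions, not taken from this repo:
att_loss = F.cross_entropy(outs.reshape(-1, outs.size(-1)), trg.reshape(-1), ignore_index=uyghur_latin.pad_idx)
# ctc_loss = F.ctc_loss(model.ctcOut.permute(2, 0, 1), trg, model.ctcLen, trg_lens, blank=0)
# loss = 0.7 * att_loss + 0.3 * ctc_loss  # attention CE plus auxiliary CTC
print(att_loss.item())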

UFormerCTC3N.py (new file, 615 additions)

@@ -0,0 +1,615 @@
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as I
import numpy as np
import math
from BaseModel import BaseModel
from data import melfuture
from uyghur import uyghur_latin
class UFormerCTC3N(BaseModel):
def __init__(self, num_features_input, load_best=False):
super(UFormerCTC3N, self).__init__('UFormerCTC3N')
num_layers = 3 #'Number of layers'
num_heads = 8 #'Number of heads'
dim_model = 768 #'Model dimension'
dim_key = 96 #'Key dimension'
dim_value = 96 #'Value dimension'
dim_inner = 1024 #'Inner dimension'
dim_emb = 768 #'Embedding dimension'
src_max_len = 2500 #'Source max length'
tgt_max_len = 1000 #'Target max length'
dropout = 0.1
emb_trg_sharing = False
self.flayer = UDS2W2L8(num_features_input)
self.encoder = Encoder(num_layers, num_heads=num_heads, dim_model=dim_model, dim_key=dim_key, dim_value=dim_value, dim_inner=dim_inner, src_max_length=src_max_len, dropout=dropout)
self.decoder = Decoder(num_layers=num_layers, num_heads=num_heads, dim_emb=dim_emb, dim_model=dim_model, dim_inner=dim_inner, dim_key=dim_key, dim_value=dim_value, trg_max_length=tgt_max_len, dropout=dropout, emb_trg_sharing=emb_trg_sharing)
for p in self.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
self.ctcOut = None
self.ctcLen = None
self.checkpoint = "results/" + self.ModelName
self._load(load_best)
print(" Model Name:", self.ModelName)
print(f'The model has {self.parameters_count(self):,} trainable parameters')
print(f' Feature layer has {self.parameters_count(self.flayer):,} trainable parameters')
print(f' Encoder has {self.parameters_count(self.encoder):,} trainable parameters')
print(f' Decoder has {self.parameters_count(self.decoder):,} trainable parameters')
def forward(self, padded_input, input_lengths, padded_target):
padded_input,self.ctcOut, self.ctcLen = self.flayer(padded_input,input_lengths)
#input must be #B x T x F format
encoder_padded_outputs, _ = self.encoder(padded_input, self.ctcLen) # BxTxH or #B x T x F
seq_in_pad, gold = self.preprocess(padded_target)
pred = self.decoder(seq_in_pad, encoder_padded_outputs, self.ctcLen)
return pred, gold
def greedydecode(self, pred, len=0):
_, pred = torch.topk(pred, 1, dim=2)
preds = pred.squeeze(2)
strs_pred = [uyghur_latin.decode(pred_id) for pred_id in preds]
return strs_pred
def predict(self,wavfile, device):
self.eval()
spec = melfuture(wavfile).unsqueeze(0).to(device)
spec_len = torch.tensor([spec.shape[2]], dtype=torch.int)
padded_input,self.ctcOut, self.ctcLen = self.flayer(spec,spec_len)
encoder_padded_outputs, _ = self.encoder(padded_input, self.ctcLen) # BxTxH or #B x T x F
strs_hyps = self.decoder.greedy_search(encoder_padded_outputs)
return strs_hyps
class ResB(nn.Module):
def __init__(self, num_filters, kernel, pad, d = 0.4):
super().__init__()
self.conv = nn.Sequential(
nn.Conv1d(num_filters, num_filters, kernel_size = kernel, stride = 1 , padding=pad, bias=False),
nn.BatchNorm1d(num_filters)
)
self.relu = nn.ReLU()
self.bn = nn.BatchNorm1d(num_filters)
self.drop =nn.Dropout(d)
def forward(self, x):
identity = x
out = self.conv(x)
out += identity
out = self.bn(out)
out = self.relu(out)
out = self.drop(out)
return out
class UDS2W2L8(nn.Module):
def __init__(self, num_features_input):
super(UDS2W2L8, self).__init__()
self.conv = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=(41, 11), stride=(2, 2), padding=(20, 5), bias=False),
nn.BatchNorm2d(32),
nn.Hardtanh(0, 20, inplace=True),
nn.Conv2d(32, 32, kernel_size=(21, 11), stride=(2, 1), padding=(10, 5),bias=False),
nn.BatchNorm2d(32),
nn.Hardtanh(0, 20, inplace=True),
)
self.lstm1 = nn.GRU(1024, 256, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn1 = nn.Sequential(
nn.Conv1d(256, 256, 11, 2, 5,bias=False),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Dropout(0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2)
)
self.lstm2 = nn.GRU(256, 384, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn2 = nn.Sequential(
ResB(384,13,6,0.2),
ResB(384,13,6,0.2),
ResB(384,13,6,0.2),
nn.Conv1d(384, 512, 17, 1,8,bias=False),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.2),
ResB(512,17,8,0.3),
ResB(512,17,8,0.3),
nn.Conv1d(512, 768, 1, 1,bias=False),
nn.BatchNorm1d(768),
nn.ReLU(),
nn.Dropout(0.3),
ResB(768,1,0,0.0),
)
self.outlayer = nn.Conv1d(768, uyghur_latin.vocab_size, 1, 1)
self.softMax = nn.LogSoftmax(dim=1)
def forward(self, x, lengths):
out_lens = lengths//2
x.unsqueeze_(1)
out = self.conv(x)
b, c, h, w = out.size()
out = out.view(b, c*h, w).contiguous() #.permute(0,2,1)
out = out.permute(0,2,1)
#out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out, _ = self.lstm1(out)
#out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm1.hidden_size] + out[:, :, self.lstm1.hidden_size:]).contiguous()
out = self.cnn1(out.permute(0,2,1))
out_lens = out_lens//2
out = out.permute(0,2,1)
#out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out,_ = self.lstm2(out)
#out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm2.hidden_size] + out[:, :, self.lstm2.hidden_size:]).contiguous()
out = self.cnn2(out.permute(0,2,1))
outctc = self.softMax(self.outlayer(out))
return out.contiguous().permute(0,2,1), outctc, out_lens
def load(self):
pack = torch.load('results/UDS2W2L8_last.pth', map_location='cpu')
sdict = pack['st_dict']
news_dict = self.state_dict()
filtered_dict = {k: v for k, v in sdict.items() if k in news_dict and v.size() == news_dict[k].size()}
news_dict.update(filtered_dict)
self.load_state_dict(news_dict)
class Encoder(nn.Module):
"""
Encoder Transformer class
"""
def __init__(self, num_layers, num_heads, dim_model, dim_key, dim_value, dim_inner, dropout=0.1, src_max_length=2500):
super(Encoder, self).__init__()
self.num_layers = num_layers
self.num_heads = num_heads
self.dim_model = dim_model
self.dim_key = dim_key
self.dim_value = dim_value
self.dim_inner = dim_inner
self.src_max_length = src_max_length
self.dropout = nn.Dropout(dropout)
self.dropout_rate = dropout
self.positional_encoding = PositionalEncoding(dim_model, src_max_length)
self.layers = nn.ModuleList([
EncoderLayer(num_heads, dim_model, dim_inner, dim_key, dim_value, dropout=dropout) for _ in range(num_layers)
])
def forward(self, padded_input, input_lengths):
"""
args:
padded_input: B x T x D
input_lengths: B
return:
output: B x T x H
"""
encoder_self_attn_list = []
# Prepare masks
non_pad_mask = get_non_pad_mask(padded_input, input_lengths=input_lengths) # B x T x D
seq_len = padded_input.size(1)
self_attn_mask = get_attn_pad_mask(padded_input, input_lengths, seq_len) # B x T x T
pos = self.positional_encoding(padded_input)
encoder_output = padded_input + pos
for layer in self.layers:
encoder_output, self_attn = layer(encoder_output, non_pad_mask=non_pad_mask, self_attn_mask=self_attn_mask)
encoder_self_attn_list += [self_attn]
return encoder_output, encoder_self_attn_list
class EncoderLayer(nn.Module):
"""
Encoder Layer Transformer class
"""
def __init__(self, num_heads, dim_model, dim_inner, dim_key, dim_value, dropout=0.1):
super(EncoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(num_heads, dim_model, dim_key, dim_value, dropout=dropout)
self.pos_ffn = PositionwiseFeedForwardWithConv(dim_model, dim_inner, dropout=dropout)
def forward(self, enc_input, non_pad_mask=None, self_attn_mask=None):
enc_output, self_attn = self.self_attn(enc_input, enc_input, enc_input, mask=self_attn_mask)
enc_output *= non_pad_mask
enc_output = self.pos_ffn(enc_output)
enc_output *= non_pad_mask
return enc_output, self_attn
class Decoder(nn.Module):
"""
Decoder Layer Transformer class
"""
def __init__(self, num_layers, num_heads, dim_emb, dim_model, dim_inner, dim_key, dim_value, dropout=0.1, trg_max_length=1000, emb_trg_sharing=False):
super(Decoder, self).__init__()
self.num_trg_vocab = uyghur_latin.vocab_size
self.num_layers = num_layers
self.num_heads = num_heads
self.dim_emb = dim_emb
self.dim_model = dim_model
self.dim_inner = dim_inner
self.dim_key = dim_key
self.dim_value = dim_value
self.dropout_rate = dropout
self.emb_trg_sharing = emb_trg_sharing
self.trg_max_length = trg_max_length
self.trg_embedding = nn.Embedding(self.num_trg_vocab, dim_emb, padding_idx=uyghur_latin.pad_idx)
self.positional_encoding = PositionalEncoding(dim_model, trg_max_length)
self.dropout = nn.Dropout(dropout)
self.layers = nn.ModuleList([
DecoderLayer(dim_model, dim_inner, num_heads,dim_key, dim_value, dropout=dropout)
for _ in range(num_layers)
])
self.output_linear = nn.Linear(dim_model, self.num_trg_vocab, bias=False)
nn.init.xavier_normal_(self.output_linear.weight)
if emb_trg_sharing:
self.output_linear.weight = self.trg_embedding.weight
self.x_logit_scale = (dim_model ** -0.5)
else:
self.x_logit_scale = 1.0
def forward(self, seq_in_pad, encoder_padded_outputs, encoder_input_lengths):
"""
args:
padded_input: B x T
encoder_padded_outputs: B x T x H
encoder_input_lengths: B
returns:
pred: B x T x vocab
gold: B x T
"""
decoder_self_attn_list, decoder_encoder_attn_list = [], []
# Prepare masks
non_pad_mask = get_non_pad_mask(seq_in_pad, pad_idx=uyghur_latin.pad_idx)
self_attn_mask_subseq = get_subsequent_mask(seq_in_pad)
self_attn_mask_keypad = get_attn_key_pad_mask(seq_k=seq_in_pad, seq_q=seq_in_pad, pad_idx=uyghur_latin.pad_idx)
self_attn_mask = (self_attn_mask_keypad + self_attn_mask_subseq).gt(0)
output_length = seq_in_pad.size(1)
dec_enc_attn_mask = get_attn_pad_mask(encoder_padded_outputs, encoder_input_lengths, output_length)
decoder_output = self.dropout(self.trg_embedding(seq_in_pad) * self.x_logit_scale + self.positional_encoding(seq_in_pad))
for layer in self.layers:
decoder_output, decoder_self_attn, decoder_enc_attn = layer(decoder_output, encoder_padded_outputs, non_pad_mask=non_pad_mask, self_attn_mask=self_attn_mask, dec_enc_attn_mask=dec_enc_attn_mask)
decoder_self_attn_list += [decoder_self_attn]
decoder_encoder_attn_list += [decoder_enc_attn]
seq_logit = self.output_linear(decoder_output)
return seq_logit
def greedy_search(self, encoder_padded_outputs):
"""
Greedy search, decode 1-best utterance
args:
encoder_padded_outputs: B x T x H
output:
batch_ids_nbest_hyps: list of nbest in ids (size B)
batch_strs_nbest_hyps: list of nbest in strings (size B)
"""
with torch.no_grad():
device = encoder_padded_outputs.device
max_seq_len = self.trg_max_length
#ys = torch.ones(encoder_padded_outputs.size(0),1).fill_(uyghur_latin.sos_idx).long().to(device) # batch_size x 1
max_seq_len = min(max_seq_len, encoder_padded_outputs.size(1))
inps=[uyghur_latin.sos_idx]
result = []
for t in range(max_seq_len):
ys = torch.LongTensor(inps).unsqueeze(0).to(device)
non_pad_mask = torch.ones_like(ys).float().unsqueeze(-1) # batch_size x t x 1
self_attn_mask = get_subsequent_mask(ys).gt(0) # batch_size x t x t
decoder_output = self.dropout(self.trg_embedding(ys) * self.x_logit_scale + self.positional_encoding(ys))
for layer in self.layers:
decoder_output, _, _ = layer(
decoder_output, encoder_padded_outputs,
non_pad_mask=non_pad_mask,
self_attn_mask=self_attn_mask,
dec_enc_attn_mask=None
)
prob = self.output_linear(decoder_output) # batch_size x t x label_size
_, next_word = torch.max(prob[:, -1], dim=1)
next_word = next_word.item()
result.append(next_word)
if next_word == uyghur_latin.eos_idx:
break
inps.append(next_word)
sent = uyghur_latin.decode(result)
return sent
class DecoderLayer(nn.Module):
"""
Decoder Transformer class
"""
def __init__(self, dim_model, dim_inner, num_heads, dim_key, dim_value, dropout=0.1):
super(DecoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(
num_heads, dim_model, dim_key, dim_value, dropout=dropout)
self.encoder_attn = MultiHeadAttention(
num_heads, dim_model, dim_key, dim_value, dropout=dropout)
self.pos_ffn = PositionwiseFeedForwardWithConv(
dim_model, dim_inner, dropout=dropout)
def forward(self, decoder_input, encoder_output, non_pad_mask=None, self_attn_mask=None, dec_enc_attn_mask=None):
decoder_output, decoder_self_attn = self.self_attn(decoder_input, decoder_input, decoder_input, mask=self_attn_mask)
decoder_output *= non_pad_mask
decoder_output, decoder_encoder_attn = self.encoder_attn(decoder_output, encoder_output, encoder_output, mask=dec_enc_attn_mask)
decoder_output *= non_pad_mask
decoder_output = self.pos_ffn(decoder_output)
decoder_output *= non_pad_mask
return decoder_output, decoder_self_attn, decoder_encoder_attn
"""
Transformer common layers
"""
def get_non_pad_mask(padded_input, input_lengths=None, pad_idx=None):
"""
padding position is set to 0, either use input_lengths or pad_idx
"""
assert input_lengths is not None or pad_idx is not None
if input_lengths is not None:
# padded_input: N x T x ..
N = padded_input.size(0)
non_pad_mask = padded_input.new_ones(padded_input.size()[:-1]) # B x T
for i in range(N):
non_pad_mask[i, input_lengths[i]:] = 0
if pad_idx is not None:
# padded_input: N x T
assert padded_input.dim() == 2
non_pad_mask = padded_input.ne(pad_idx).float()
# unsqueeze(-1) for broadcast
return non_pad_mask.unsqueeze(-1)
def get_attn_key_pad_mask(seq_k, seq_q, pad_idx):
"""
For masking out the padding part of key sequence.
"""
# Expand to fit the shape of key query attention matrix.
len_q = seq_q.size(1)
padding_mask = seq_k.eq(pad_idx)
padding_mask = padding_mask.unsqueeze(1).expand(-1, len_q, -1).byte() # B x T_Q x T_K
return padding_mask
def get_attn_pad_mask(padded_input, input_lengths, expand_length):
"""mask position is set to 1"""
# N x Ti x 1
non_pad_mask = get_non_pad_mask(padded_input, input_lengths=input_lengths)
# N x Ti, lt(1) like not operation
pad_mask = non_pad_mask.squeeze(-1).lt(1)
attn_mask = pad_mask.unsqueeze(1).expand(-1, expand_length, -1)
return attn_mask
def get_subsequent_mask(seq):
''' For masking out the subsequent info. '''
sz_b, len_s = seq.size()
subsequent_mask = torch.triu(
torch.ones((len_s, len_s), device=seq.device, dtype=torch.uint8), diagonal=1)
subsequent_mask = subsequent_mask.unsqueeze(0).expand(sz_b, -1, -1) # b x ls x ls
return subsequent_mask
class PositionalEncoding(nn.Module):
"""
Positional Encoding class
"""
def __init__(self, dim_model, max_length=2000):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_length, dim_model, requires_grad=False)
position = torch.arange(0, max_length).unsqueeze(1).float()
exp_term = torch.exp(torch.arange(0, dim_model, 2).float() * -(math.log(10000.0) / dim_model))
pe[:, 0::2] = torch.sin(position * exp_term) # even dimensions (0, 2, 4, ...)
pe[:, 1::2] = torch.cos(position * exp_term) # odd dimensions (1, 3, 5, ...)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, input):
"""
args:
input: B x T x D
output:
            tensor: 1 x T x D
"""
return self.pe[:, :input.size(1)]
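# Illustrative sketch (not part of the original file): the encoding table is
# 1 x T x D, so it broadcasts over the batch when added to embeddings.
def _demo_positional_encoding():
    pos_enc = PositionalEncoding(dim_model=8, max_length=100)
    x = torch.zeros(2, 10, 8)                             # B x T x D
    print(pos_enc(x).shape)                               # torch.Size([1, 10, 8])
    print((x + pos_enc(x)).shape)                         # torch.Size([2, 10, 8])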
class PositionwiseFeedForward(nn.Module):
"""
Position-wise Feedforward Layer class
FFN(x) = max(0, xW1 + b1) W2+ b2
"""
def __init__(self, dim_model, dim_ff, dropout=0.1):
super(PositionwiseFeedForward, self).__init__()
self.linear_1 = nn.Linear(dim_model, dim_ff)
self.linear_2 = nn.Linear(dim_ff, dim_model)
self.dropout = nn.Dropout(dropout)
self.layer_norm = nn.LayerNorm(dim_model)
def forward(self, x):
"""
args:
x: tensor
output:
y: tensor
"""
residual = x
output = self.dropout(self.linear_2(F.relu(self.linear_1(x))))
output = self.layer_norm(output + residual)
return output
class PositionwiseFeedForwardWithConv(nn.Module):
"""
Position-wise Feedforward Layer Implementation with Convolution class
"""
def __init__(self, dim_model, dim_hidden, dropout=0.1):
super(PositionwiseFeedForwardWithConv, self).__init__()
self.conv_1 = nn.Conv1d(dim_model, dim_hidden, 1)
self.conv_2 = nn.Conv1d(dim_hidden, dim_model, 1)
self.dropout = nn.Dropout(dropout)
self.layer_norm = nn.LayerNorm(dim_model)
def forward(self, x):
residual = x
output = x.transpose(1, 2)
output = self.conv_2(F.relu(self.conv_1(output)))
output = output.transpose(1, 2)
output = self.dropout(output)
output = self.layer_norm(output + residual)
return output
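# Illustrative sketch (not part of the original file): a kernel-size-1
# Conv1d applies the same affine map at every time step, so this module is
# the Linear-based PositionwiseFeedForward above up to tensor layout
# (B x D x T instead of B x T x D), which is why forward() transposes.
def _demo_ffn_with_conv():
    ffn = PositionwiseFeedForwardWithConv(dim_model=8, dim_hidden=16)
    x = torch.randn(2, 10, 8)
    print(ffn(x).shape)                                   # torch.Size([2, 10, 8])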
class MultiHeadAttention(nn.Module):
def __init__(self, num_heads, dim_model, dim_key, dim_value, dropout=0.1):
super(MultiHeadAttention, self).__init__()
self.num_heads = num_heads
self.dim_model = dim_model
self.dim_key = dim_key
self.dim_value = dim_value
self.query_linear = nn.Linear(dim_model, num_heads * dim_key)
self.key_linear = nn.Linear(dim_model, num_heads * dim_key)
self.value_linear = nn.Linear(dim_model, num_heads * dim_value)
nn.init.normal_(self.query_linear.weight, mean=0, std=np.sqrt(2.0 / (self.dim_model + self.dim_key)))
nn.init.normal_(self.key_linear.weight, mean=0, std=np.sqrt(2.0 / (self.dim_model + self.dim_key)))
nn.init.normal_(self.value_linear.weight, mean=0, std=np.sqrt(2.0 / (self.dim_model + self.dim_value)))
self.attention = ScaledDotProductAttention(temperature=np.power(dim_key, 0.5), attn_dropout=dropout)
self.layer_norm = nn.LayerNorm(dim_model)
self.output_linear = nn.Linear(num_heads * dim_value, dim_model)
nn.init.xavier_normal_(self.output_linear.weight)
self.dropout = nn.Dropout(dropout)
def forward(self, query, key, value, mask=None):
"""
query: B x T_Q x H, key: B x T_K x H, value: B x T_V x H
mask: B x T x T (attention mask)
"""
batch_size, len_query, _ = query.size()
batch_size, len_key, _ = key.size()
batch_size, len_value, _ = value.size()
residual = query
query = self.query_linear(query).view(batch_size, len_query, self.num_heads, self.dim_key) # B x T_Q x num_heads x H_K
key = self.key_linear(key).view(batch_size, len_key, self.num_heads, self.dim_key) # B x T_K x num_heads x H_K
value = self.value_linear(value).view(batch_size, len_value, self.num_heads, self.dim_value) # B x T_V x num_heads x H_V
query = query.permute(2, 0, 1, 3).contiguous().view(-1, len_query, self.dim_key) # (num_heads * B) x T_Q x H_K
key = key.permute(2, 0, 1, 3).contiguous().view(-1, len_key, self.dim_key) # (num_heads * B) x T_K x H_K
value = value.permute(2, 0, 1, 3).contiguous().view(-1, len_value, self.dim_value) # (num_heads * B) x T_V x H_V
if mask is not None:
mask = mask.repeat(self.num_heads, 1, 1) # (B * num_head) x T x T
output, attn = self.attention(query, key, value, mask=mask)
output = output.view(self.num_heads, batch_size, len_query, self.dim_value) # num_heads x B x T_Q x H_V
output = output.permute(1, 2, 0, 3).contiguous().view(batch_size, len_query, -1) # B x T_Q x (num_heads * H_V)
output = self.dropout(self.output_linear(output)) # B x T_Q x H_O
output = self.layer_norm(output + residual)
return output, attn
class ScaledDotProductAttention(nn.Module):
''' Scaled Dot-Product Attention '''
def __init__(self, temperature, attn_dropout=0.1):
super().__init__()
self.temperature = temperature
self.dropout = nn.Dropout(attn_dropout)
self.softmax = nn.Softmax(dim=2)
def forward(self, q, k, v, mask=None):
"""
"""
attn = torch.bmm(q, k.transpose(1, 2))
attn = attn / self.temperature
if mask is not None:
attn = attn.masked_fill(mask, -np.inf)
attn = self.softmax(attn)
attn = self.dropout(attn)
output = torch.bmm(attn, v)
return output, attn
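# Illustrative sketch (not part of the original file): attention weights are
# a softmax over scaled dot products; masked positions get -inf and thus
# zero weight. eval() disables the attention dropout so rows sum to 1.
def _demo_scaled_dot_attention():
    attn_layer = ScaledDotProductAttention(temperature=np.power(4, 0.5)).eval()
    q, k, v = torch.randn(1, 3, 4), torch.randn(1, 5, 4), torch.randn(1, 5, 4)
    out, attn = attn_layer(q, k, v)
    print(out.shape, attn.shape)  # torch.Size([1, 3, 4]) torch.Size([1, 3, 5])
    print(attn.sum(dim=2))        # every row of weights sums to 1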
if __name__ == "__main__":
from data import melfuture, featurelen, uyghur_latin, SpeechDataset, _collate_fn
device = 'cuda'
    model = UFormerCTC3N(featurelen)
model.to(device)
#model.best_cer = 1.0
#model.save(0)
txt = model.predict("test3.wav", device)
print(txt)
txt = model.predict("test4.wav", device)
print(txt)
train_dataset = SpeechDataset('uyghur_thuyg20_train_small.csv', augumentation=False)
bbb = []
bbb.append(train_dataset[0])
bbb.append(train_dataset[3])
bbb.append(train_dataset[4])
inps, targs, in_lens,_,_ = _collate_fn(bbb)
model.train()
outs, trg = model(inps.to(device),in_lens, targs.to(device))
print(outs.size())
print(trg.size())

618
UFormerCTC5.py Normal file
View File

@ -0,0 +1,618 @@
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as I
import numpy as np
import math
from BaseModel import BaseModel
from data import melfuture
from uyghur import uyghur_latin
class UFormerCTC5(BaseModel):
def __init__(self, num_features_input, load_best=False):
super(UFormerCTC5, self).__init__('UFormerCTC5')
num_layers = 5 #'Number of layers'
num_heads = 8 #'Number of heads'
dim_model = 512 #'Model dimension'
dim_key = 64 #'Key dimension'
dim_value = 64 #'Value dimension'
dim_inner = 1024 #'Inner dimension'
dim_emb = 512 #'Embedding dimension'
src_max_len = 2500 #'Source max length'
tgt_max_len = 1000 #'Target max length'
dropout = 0.1
emb_trg_sharing = False
#self.future_len = num_features_input
self.flayer = UDS2W2L8(num_features_input)
self.encoder = Encoder(num_layers, num_heads=num_heads, dim_model=dim_model, dim_key=dim_key, dim_value=dim_value, dim_inner=dim_inner, src_max_length=src_max_len, dropout=dropout)
self.decoder = Decoder(num_layers=num_layers, num_heads=num_heads, dim_emb=dim_emb, dim_model=dim_model, dim_inner=dim_inner, dim_key=dim_key, dim_value=dim_value, trg_max_length=tgt_max_len, dropout=dropout, emb_trg_sharing=emb_trg_sharing)
for p in self.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
self.ctcOut = None
self.ctcLen = None
self.checkpoint = "results/" + self.ModelName
self._load(load_best)
#self._loadfrom("results/UFormerCTC1_last.pth")
#self.flayer.load()
print(" Model Name:", self.ModelName)
print(f'The model has {self.parameters_count(self):,} trainable parameters')
print(f' Future has {self.parameters_count(self.flayer):,} trainable parameters')
print(f' Encoder has {self.parameters_count(self.encoder):,} trainable parameters')
print(f' Decoder has {self.parameters_count(self.decoder):,} trainable parameters')
def forward(self, padded_input, input_lengths, padded_target):
padded_input,self.ctcOut, self.ctcLen = self.flayer(padded_input,input_lengths)
        #input must be in B x T x F format
encoder_padded_outputs, _ = self.encoder(padded_input, self.ctcLen) # BxTxH or #B x T x F
seq_in_pad, gold = self.preprocess(padded_target)
pred = self.decoder(seq_in_pad, encoder_padded_outputs, self.ctcLen)
return pred, gold
def greedydecode(self, pred, len=0):
_, pred = torch.topk(pred, 1, dim=2)
preds = pred.squeeze(2)
strs_pred = [uyghur_latin.decode(pred_id) for pred_id in preds]
return strs_pred
def predict(self,wavfile, device):
self.eval()
spec = melfuture(wavfile).unsqueeze(0).to(device)
spec_len = torch.tensor([spec.shape[2]], dtype=torch.int)
padded_input,self.ctcOut, self.ctcLen = self.flayer(spec,spec_len)
encoder_padded_outputs, _ = self.encoder(padded_input, self.ctcLen) # BxTxH or #B x T x F
strs_hyps = self.decoder.greedy_search(encoder_padded_outputs)
return strs_hyps
class ResB(nn.Module):
def __init__(self, num_filters, kernel, pad, d = 0.4):
super().__init__()
self.conv = nn.Sequential(
nn.Conv1d(num_filters, num_filters, kernel_size = kernel, stride = 1 , padding=pad, bias=False),
nn.BatchNorm1d(num_filters)
)
self.relu = nn.ReLU()
self.bn = nn.BatchNorm1d(num_filters)
        self.drop = nn.Dropout(d)
def forward(self, x):
identity = x
out = self.conv(x)
out += identity
out = self.bn(out)
out = self.relu(out)
out = self.drop(out)
return out
class UDS2W2L8(nn.Module):
def __init__(self, num_features_input):
super(UDS2W2L8, self).__init__()
self.conv = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=(41, 11), stride=(2, 2), padding=(20, 5), bias=False),
nn.BatchNorm2d(32),
nn.Hardtanh(0, 20, inplace=True),
nn.Conv2d(32, 32, kernel_size=(21, 11), stride=(2, 1), padding=(10, 5),bias=False),
nn.BatchNorm2d(32),
nn.Hardtanh(0, 20, inplace=True),
)
self.lstm1 = nn.GRU(1024, 256, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn1 = nn.Sequential(
nn.Conv1d(256, 256, 11, 2, 5,bias=False),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Dropout(0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2),
ResB(256,11,5,0.2)
)
self.lstm2 = nn.GRU(256, 384, num_layers=1 , batch_first=True, bidirectional=True)
self.cnn2 = nn.Sequential(
ResB(384,13,6,0.2),
ResB(384,13,6,0.2),
ResB(384,13,6,0.2),
nn.Conv1d(384, 512, 17, 1,8,bias=False),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.2),
ResB(512,17,8,0.3),
ResB(512,17,8,0.3),
nn.Conv1d(512, 512, 1, 1,bias=False),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.3),
ResB(512,1,0,0.0),
)
self.outlayer = nn.Conv1d(512, uyghur_latin.vocab_size, 1, 1)
self.softMax = nn.LogSoftmax(dim=1)
def forward(self, x, lengths):
out_lens = lengths//2
x.unsqueeze_(1)
out = self.conv(x)
b, c, h, w = out.size()
out = out.view(b, c*h, w).contiguous() #.permute(0,2,1)
out = out.permute(0,2,1)
#out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out, _ = self.lstm1(out)
#out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm1.hidden_size] + out[:, :, self.lstm1.hidden_size:]).contiguous()
out = self.cnn1(out.permute(0,2,1))
out_lens = out_lens//2
out = out.permute(0,2,1)
#out = nn.utils.rnn.pack_padded_sequence(out, out_lens, batch_first=True)
out,_ = self.lstm2(out)
#out, _ = nn.utils.rnn.pad_packed_sequence(out, batch_first=True)
out = (out[:, :, :self.lstm2.hidden_size] + out[:, :, self.lstm2.hidden_size:]).contiguous()
out = self.cnn2(out.permute(0,2,1))
outctc = self.softMax(self.outlayer(out))
return out.contiguous().permute(0,2,1), outctc, out_lens
def load(self):
pack = torch.load('results/UDS2W2L8_last.pth', map_location='cpu')
sdict = pack['st_dict']
news_dict = self.state_dict()
filtered_dict = {k: v for k, v in sdict.items() if k in news_dict and v.size() == news_dict[k].size()}
news_dict.update(filtered_dict)
self.load_state_dict(news_dict)
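# Illustrative sketch (not part of the original file): the Conv2d stack
# halves the time axis once (strides (2, 2) then (2, 1)) and cnn1 halves it
# again, so a T-frame utterance comes out with about T // 4 frames, matching
# out_lens = (lengths // 2) // 2 in forward().
def _demo_uds2w2l8_lengths():
    net = UDS2W2L8(128)                    # featurelen is 128 in data.py
    x = torch.randn(1, 128, 100)           # B x F x T
    feats, ctc_out, out_lens = net(x, torch.tensor([100]))
    print(feats.shape, out_lens)           # torch.Size([1, 25, 512]) tensor([25])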
class Encoder(nn.Module):
"""
Encoder Transformer class
"""
def __init__(self, num_layers, num_heads, dim_model, dim_key, dim_value, dim_inner, dropout=0.1, src_max_length=2500):
super(Encoder, self).__init__()
self.num_layers = num_layers
self.num_heads = num_heads
self.dim_model = dim_model
self.dim_key = dim_key
self.dim_value = dim_value
self.dim_inner = dim_inner
self.src_max_length = src_max_length
self.dropout = nn.Dropout(dropout)
self.dropout_rate = dropout
self.positional_encoding = PositionalEncoding(dim_model, src_max_length)
self.layers = nn.ModuleList([
EncoderLayer(num_heads, dim_model, dim_inner, dim_key, dim_value, dropout=dropout) for _ in range(num_layers)
])
def forward(self, padded_input, input_lengths):
"""
args:
padded_input: B x T x D
input_lengths: B
return:
output: B x T x H
"""
encoder_self_attn_list = []
# Prepare masks
non_pad_mask = get_non_pad_mask(padded_input, input_lengths=input_lengths) # B x T x D
seq_len = padded_input.size(1)
self_attn_mask = get_attn_pad_mask(padded_input, input_lengths, seq_len) # B x T x T
pos = self.positional_encoding(padded_input)
encoder_output = padded_input + pos
for layer in self.layers:
encoder_output, self_attn = layer(encoder_output, non_pad_mask=non_pad_mask, self_attn_mask=self_attn_mask)
encoder_self_attn_list += [self_attn]
return encoder_output, encoder_self_attn_list
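# Illustrative sketch (not part of the original file): the encoder consumes
# features already projected to dim_model (512, the channel count UDS2W2L8
# emits) together with the true lengths used to build the pad masks.
def _demo_encoder():
    enc = Encoder(num_layers=1, num_heads=8, dim_model=512, dim_key=64,
                  dim_value=64, dim_inner=1024).eval()
    x = torch.randn(2, 30, 512)
    out, attns = enc(x, torch.tensor([30, 20]))
    print(out.shape, len(attns))           # torch.Size([2, 30, 512]) 1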
class EncoderLayer(nn.Module):
"""
Encoder Layer Transformer class
"""
def __init__(self, num_heads, dim_model, dim_inner, dim_key, dim_value, dropout=0.1):
super(EncoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(num_heads, dim_model, dim_key, dim_value, dropout=dropout)
self.pos_ffn = PositionwiseFeedForwardWithConv(dim_model, dim_inner, dropout=dropout)
def forward(self, enc_input, non_pad_mask=None, self_attn_mask=None):
enc_output, self_attn = self.self_attn(enc_input, enc_input, enc_input, mask=self_attn_mask)
enc_output *= non_pad_mask
enc_output = self.pos_ffn(enc_output)
enc_output *= non_pad_mask
return enc_output, self_attn
class Decoder(nn.Module):
"""
Decoder Layer Transformer class
"""
def __init__(self, num_layers, num_heads, dim_emb, dim_model, dim_inner, dim_key, dim_value, dropout=0.1, trg_max_length=1000, emb_trg_sharing=False):
super(Decoder, self).__init__()
self.num_trg_vocab = uyghur_latin.vocab_size
self.num_layers = num_layers
self.num_heads = num_heads
self.dim_emb = dim_emb
self.dim_model = dim_model
self.dim_inner = dim_inner
self.dim_key = dim_key
self.dim_value = dim_value
self.dropout_rate = dropout
self.emb_trg_sharing = emb_trg_sharing
self.trg_max_length = trg_max_length
self.trg_embedding = nn.Embedding(self.num_trg_vocab, dim_emb, padding_idx=uyghur_latin.pad_idx)
self.positional_encoding = PositionalEncoding(dim_model, trg_max_length)
self.dropout = nn.Dropout(dropout)
self.layers = nn.ModuleList([
DecoderLayer(dim_model, dim_inner, num_heads,dim_key, dim_value, dropout=dropout)
for _ in range(num_layers)
])
self.output_linear = nn.Linear(dim_model, self.num_trg_vocab, bias=False)
nn.init.xavier_normal_(self.output_linear.weight)
if emb_trg_sharing:
self.output_linear.weight = self.trg_embedding.weight
self.x_logit_scale = (dim_model ** -0.5)
else:
self.x_logit_scale = 1.0
def forward(self, seq_in_pad, encoder_padded_outputs, encoder_input_lengths):
"""
args:
padded_input: B x T
encoder_padded_outputs: B x T x H
encoder_input_lengths: B
returns:
pred: B x T x vocab
gold: B x T
"""
decoder_self_attn_list, decoder_encoder_attn_list = [], []
# Prepare masks
non_pad_mask = get_non_pad_mask(seq_in_pad, pad_idx=uyghur_latin.pad_idx)
self_attn_mask_subseq = get_subsequent_mask(seq_in_pad)
self_attn_mask_keypad = get_attn_key_pad_mask(seq_k=seq_in_pad, seq_q=seq_in_pad, pad_idx=uyghur_latin.pad_idx)
self_attn_mask = (self_attn_mask_keypad + self_attn_mask_subseq).gt(0)
output_length = seq_in_pad.size(1)
dec_enc_attn_mask = get_attn_pad_mask(encoder_padded_outputs, encoder_input_lengths, output_length)
decoder_output = self.dropout(self.trg_embedding(seq_in_pad) * self.x_logit_scale + self.positional_encoding(seq_in_pad))
for layer in self.layers:
decoder_output, decoder_self_attn, decoder_enc_attn = layer(decoder_output, encoder_padded_outputs, non_pad_mask=non_pad_mask, self_attn_mask=self_attn_mask, dec_enc_attn_mask=dec_enc_attn_mask)
decoder_self_attn_list += [decoder_self_attn]
decoder_encoder_attn_list += [decoder_enc_attn]
seq_logit = self.output_linear(decoder_output)
return seq_logit
def greedy_search(self, encoder_padded_outputs):
"""
Greedy search, decode 1-best utterance
args:
encoder_padded_outputs: B x T x H
output:
            sent: the 1-best hypothesis decoded to a string
"""
with torch.no_grad():
device = encoder_padded_outputs.device
max_seq_len = self.trg_max_length
#ys = torch.ones(encoder_padded_outputs.size(0),1).fill_(uyghur_latin.sos_idx).long().to(device) # batch_size x 1
max_seq_len = min(max_seq_len, encoder_padded_outputs.size(1))
inps=[uyghur_latin.sos_idx]
result = []
for t in range(max_seq_len):
ys = torch.LongTensor(inps).unsqueeze(0).to(device)
non_pad_mask = torch.ones_like(ys).float().unsqueeze(-1) # batch_size x t x 1
self_attn_mask = get_subsequent_mask(ys).gt(0) # batch_size x t x t
decoder_output = self.dropout(self.trg_embedding(ys) * self.x_logit_scale + self.positional_encoding(ys))
for layer in self.layers:
decoder_output, _, _ = layer(
decoder_output, encoder_padded_outputs,
non_pad_mask=non_pad_mask,
self_attn_mask=self_attn_mask,
dec_enc_attn_mask=None
)
prob = self.output_linear(decoder_output) # batch_size x t x label_size
_, next_word = torch.max(prob[:, -1], dim=1)
next_word = next_word.item()
result.append(next_word)
if next_word == uyghur_latin.eos_idx:
break
inps.append(next_word)
sent = uyghur_latin.decode(result)
return sent
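# Illustrative sketch (not part of the original file): Decoder.forward
# consumes the shifted-in targets produced by BaseModel.preprocess (not
# shown in this commit); the ids below are hypothetical.
def _demo_decoder_forward():
    dec = Decoder(num_layers=1, num_heads=8, dim_emb=512, dim_model=512,
                  dim_inner=1024, dim_key=64, dim_value=64).eval()
    enc_out = torch.randn(2, 30, 512)                     # B x T x H
    seq_in = torch.full((2, 6), uyghur_latin.pad_idx, dtype=torch.long)
    seq_in[:, 0] = uyghur_latin.sos_idx
    seq_in[0, 1:4] = torch.tensor([5, 6, 7])              # hypothetical ids
    logits = dec(seq_in, enc_out, torch.tensor([30, 20]))
    print(logits.shape)                                   # 2 x 6 x vocab_size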
class DecoderLayer(nn.Module):
"""
Decoder Transformer class
"""
def __init__(self, dim_model, dim_inner, num_heads, dim_key, dim_value, dropout=0.1):
super(DecoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(
num_heads, dim_model, dim_key, dim_value, dropout=dropout)
self.encoder_attn = MultiHeadAttention(
num_heads, dim_model, dim_key, dim_value, dropout=dropout)
self.pos_ffn = PositionwiseFeedForwardWithConv(
dim_model, dim_inner, dropout=dropout)
def forward(self, decoder_input, encoder_output, non_pad_mask=None, self_attn_mask=None, dec_enc_attn_mask=None):
decoder_output, decoder_self_attn = self.self_attn(decoder_input, decoder_input, decoder_input, mask=self_attn_mask)
decoder_output *= non_pad_mask
decoder_output, decoder_encoder_attn = self.encoder_attn(decoder_output, encoder_output, encoder_output, mask=dec_enc_attn_mask)
decoder_output *= non_pad_mask
decoder_output = self.pos_ffn(decoder_output)
decoder_output *= non_pad_mask
return decoder_output, decoder_self_attn, decoder_encoder_attn
"""
Transformer common layers
"""
def get_non_pad_mask(padded_input, input_lengths=None, pad_idx=None):
"""
padding position is set to 0, either use input_lengths or pad_idx
"""
assert input_lengths is not None or pad_idx is not None
if input_lengths is not None:
# padded_input: N x T x ..
N = padded_input.size(0)
non_pad_mask = padded_input.new_ones(padded_input.size()[:-1]) # B x T
for i in range(N):
non_pad_mask[i, input_lengths[i]:] = 0
if pad_idx is not None:
# padded_input: N x T
assert padded_input.dim() == 2
non_pad_mask = padded_input.ne(pad_idx).float()
# unsqueeze(-1) for broadcast
return non_pad_mask.unsqueeze(-1)
def get_attn_key_pad_mask(seq_k, seq_q, pad_idx):
"""
For masking out the padding part of key sequence.
"""
# Expand to fit the shape of key query attention matrix.
len_q = seq_q.size(1)
padding_mask = seq_k.eq(pad_idx)
padding_mask = padding_mask.unsqueeze(1).expand(-1, len_q, -1).byte() # B x T_Q x T_K
return padding_mask
def get_attn_pad_mask(padded_input, input_lengths, expand_length):
"""mask position is set to 1"""
# N x Ti x 1
non_pad_mask = get_non_pad_mask(padded_input, input_lengths=input_lengths)
# N x Ti, lt(1) like not operation
pad_mask = non_pad_mask.squeeze(-1).lt(1)
attn_mask = pad_mask.unsqueeze(1).expand(-1, expand_length, -1)
return attn_mask
def get_subsequent_mask(seq):
''' For masking out the subsequent info. '''
sz_b, len_s = seq.size()
subsequent_mask = torch.triu(
torch.ones((len_s, len_s), device=seq.device, dtype=torch.uint8), diagonal=1)
subsequent_mask = subsequent_mask.unsqueeze(0).expand(sz_b, -1, -1) # b x ls x ls
return subsequent_mask
class PositionalEncoding(nn.Module):
"""
Positional Encoding class
"""
def __init__(self, dim_model, max_length=2000):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_length, dim_model, requires_grad=False)
position = torch.arange(0, max_length).unsqueeze(1).float()
exp_term = torch.exp(torch.arange(0, dim_model, 2).float() * -(math.log(10000.0) / dim_model))
        pe[:, 0::2] = torch.sin(position * exp_term) # even indices (0, 2, ...) get sin
        pe[:, 1::2] = torch.cos(position * exp_term) # odd indices (1, 3, ...) get cos
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, input):
"""
args:
input: B x T x D
output:
            tensor: 1 x T x D
"""
return self.pe[:, :input.size(1)]
class PositionwiseFeedForward(nn.Module):
"""
Position-wise Feedforward Layer class
FFN(x) = max(0, xW1 + b1) W2+ b2
"""
def __init__(self, dim_model, dim_ff, dropout=0.1):
super(PositionwiseFeedForward, self).__init__()
self.linear_1 = nn.Linear(dim_model, dim_ff)
self.linear_2 = nn.Linear(dim_ff, dim_model)
self.dropout = nn.Dropout(dropout)
self.layer_norm = nn.LayerNorm(dim_model)
def forward(self, x):
"""
args:
x: tensor
output:
y: tensor
"""
residual = x
output = self.dropout(self.linear_2(F.relu(self.linear_1(x))))
output = self.layer_norm(output + residual)
return output
class PositionwiseFeedForwardWithConv(nn.Module):
"""
Position-wise Feedforward Layer Implementation with Convolution class
"""
def __init__(self, dim_model, dim_hidden, dropout=0.1):
super(PositionwiseFeedForwardWithConv, self).__init__()
self.conv_1 = nn.Conv1d(dim_model, dim_hidden, 1)
self.conv_2 = nn.Conv1d(dim_hidden, dim_model, 1)
self.dropout = nn.Dropout(dropout)
self.layer_norm = nn.LayerNorm(dim_model)
def forward(self, x):
residual = x
output = x.transpose(1, 2)
output = self.conv_2(F.relu(self.conv_1(output)))
output = output.transpose(1, 2)
output = self.dropout(output)
output = self.layer_norm(output + residual)
return output
class MultiHeadAttention(nn.Module):
def __init__(self, num_heads, dim_model, dim_key, dim_value, dropout=0.1):
super(MultiHeadAttention, self).__init__()
self.num_heads = num_heads
self.dim_model = dim_model
self.dim_key = dim_key
self.dim_value = dim_value
self.query_linear = nn.Linear(dim_model, num_heads * dim_key)
self.key_linear = nn.Linear(dim_model, num_heads * dim_key)
self.value_linear = nn.Linear(dim_model, num_heads * dim_value)
nn.init.normal_(self.query_linear.weight, mean=0, std=np.sqrt(2.0 / (self.dim_model + self.dim_key)))
nn.init.normal_(self.key_linear.weight, mean=0, std=np.sqrt(2.0 / (self.dim_model + self.dim_key)))
nn.init.normal_(self.value_linear.weight, mean=0, std=np.sqrt(2.0 / (self.dim_model + self.dim_value)))
self.attention = ScaledDotProductAttention(temperature=np.power(dim_key, 0.5), attn_dropout=dropout)
self.layer_norm = nn.LayerNorm(dim_model)
self.output_linear = nn.Linear(num_heads * dim_value, dim_model)
nn.init.xavier_normal_(self.output_linear.weight)
self.dropout = nn.Dropout(dropout)
def forward(self, query, key, value, mask=None):
"""
query: B x T_Q x H, key: B x T_K x H, value: B x T_V x H
mask: B x T x T (attention mask)
"""
batch_size, len_query, _ = query.size()
batch_size, len_key, _ = key.size()
batch_size, len_value, _ = value.size()
residual = query
query = self.query_linear(query).view(batch_size, len_query, self.num_heads, self.dim_key) # B x T_Q x num_heads x H_K
key = self.key_linear(key).view(batch_size, len_key, self.num_heads, self.dim_key) # B x T_K x num_heads x H_K
value = self.value_linear(value).view(batch_size, len_value, self.num_heads, self.dim_value) # B x T_V x num_heads x H_V
query = query.permute(2, 0, 1, 3).contiguous().view(-1, len_query, self.dim_key) # (num_heads * B) x T_Q x H_K
key = key.permute(2, 0, 1, 3).contiguous().view(-1, len_key, self.dim_key) # (num_heads * B) x T_K x H_K
value = value.permute(2, 0, 1, 3).contiguous().view(-1, len_value, self.dim_value) # (num_heads * B) x T_V x H_V
if mask is not None:
mask = mask.repeat(self.num_heads, 1, 1) # (B * num_head) x T x T
output, attn = self.attention(query, key, value, mask=mask)
output = output.view(self.num_heads, batch_size, len_query, self.dim_value) # num_heads x B x T_Q x H_V
output = output.permute(1, 2, 0, 3).contiguous().view(batch_size, len_query, -1) # B x T_Q x (num_heads * H_V)
output = self.dropout(self.output_linear(output)) # B x T_Q x H_O
output = self.layer_norm(output + residual)
return output, attn
class ScaledDotProductAttention(nn.Module):
''' Scaled Dot-Product Attention '''
def __init__(self, temperature, attn_dropout=0.1):
super().__init__()
self.temperature = temperature
self.dropout = nn.Dropout(attn_dropout)
self.softmax = nn.Softmax(dim=2)
def forward(self, q, k, v, mask=None):
"""
"""
attn = torch.bmm(q, k.transpose(1, 2))
attn = attn / self.temperature
if mask is not None:
attn = attn.masked_fill(mask, -np.inf)
attn = self.softmax(attn)
attn = self.dropout(attn)
output = torch.bmm(attn, v)
return output, attn
if __name__ == "__main__":
from data import melfuture, featurelen, uyghur_latin, SpeechDataset, _collate_fn
device = 'cuda'
    model = UFormerCTC5(featurelen)
model.to(device)
#model.best_cer = 1.0
model.save(0)
txt = model.predict("test3.wav", device)
print(txt)
txt = model.predict("test4.wav", device)
print(txt)
train_dataset = SpeechDataset('uyghur_thuyg20_train_small.csv', augumentation=False)
bbb = []
bbb.append(train_dataset[0])
bbb.append(train_dataset[3])
bbb.append(train_dataset[4])
inps, targs, in_lens,_,_ = _collate_fn(bbb)
model.train()
outs, trg = model(inps.to(device),in_lens, targs.to(device))
print(outs.size())
print(trg.size())

278
data.py Normal file
View File

@ -0,0 +1,278 @@
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import librosa
import soundfile
from sklearn import preprocessing
import os
import random
import re
from uyghur import uyghur_latin
featurelen = 128
sample_rate = 22050
fft_len = 1024
window_len = fft_len
window = "hann"
white_noise,_=librosa.load('white.wav',sr=sample_rate, duration=15.0)
perlin_noise,_=librosa.load('perlin.wav',sr=sample_rate, duration=15.0)
cafe_noise, _ = librosa.load('cafe.wav',sr=sample_rate, duration=15.0)
radio_noise, _ = librosa.load('radionoise.wav',sr=sample_rate, duration=15.0)
def addnoise(audio):
rnd = random.random()
if len(audio) > len(white_noise):
pass
elif rnd <0.25:
audio = audio + white_noise[:len(audio)]
elif rnd <0.50:
audio = audio + perlin_noise[:audio.shape[0]]
elif rnd <0.75:
audio = audio + radio_noise[:audio.shape[0]]
else:
audio = audio + cafe_noise[:audio.shape[0]]
return audio
def randomstretch(audio):
factor = random.uniform(0.8, 1.2)
audio = librosa.core.resample(audio,sample_rate,sample_rate*factor)
return audio
def spec_augment(feat, T=50, F=13, time_mask_num=1, freq_mask_num=1):
#def spec_augment(feat, T=70, F=15, time_mask_num=1, freq_mask_num=1):
rnd = random.random()
feat_size = feat.size(0)
seq_len = feat.size(1)
if rnd< 0.33:
# time mask
for _ in range(time_mask_num):
t = random.randint(0, T)
t0 = random.randint(0, seq_len - t)
feat[:, t0 : t0 + t] = 0
elif rnd <0.66:
# freq mask
for _ in range(freq_mask_num):
f = random.randint(0, F)
f0 = random.randint(0, feat_size - f)
feat[f0 : f0 + f, :] = 0
else:
# time mask
for _ in range(time_mask_num):
t = random.randint(0, T)
t0 = random.randint(0, seq_len - t)
feat[:, t0 : t0 + t] = 0
# freq mask
for _ in range(freq_mask_num):
f = random.randint(0, F)
f0 = random.randint(0, feat_size - f)
feat[f0 : f0 + f, :] = 0
return feat
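# Illustrative sketch (not part of the original file): spec_augment zeroes a
# random time stripe, a random frequency stripe, or both, on an F x T
# feature matrix (in place; stripe widths are random and may be zero).
def _demo_spec_augment():
    feat = spec_augment(torch.ones(featurelen, 300))
    print((feat == 0).sum().item(), "cells masked")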
def melfuture(wav_path, augument = False):
audio, s_r = librosa.load(wav_path, sr=sample_rate, res_type='polyphase')
if augument:
if random.random()<0.5:
audio = randomstretch(audio)
if random.random()<0.5:
audio = addnoise(audio)
audio = preprocessing.minmax_scale(audio, axis=0)
audio = librosa.effects.preemphasis(audio)
hop_len = 200
if augument and random.random()<0.5:
hop_len = random.randint(160,240)
spec = librosa.feature.melspectrogram(y=audio, sr=s_r, n_fft=fft_len, hop_length=hop_len, n_mels=featurelen, fmax=8000)
spec = librosa.power_to_db(spec)
spec = (spec - spec.mean()) / spec.std()
spec = torch.FloatTensor(spec)
if augument == True and random.random()<0.5:
spec = spec_augment(spec)
return spec
def rawfuture(wav_path, augument = False):
audio, s_r = librosa.load(wav_path, sr=sample_rate, res_type='polyphase')
audio = preprocessing.minmax_scale(audio, axis=0)
if augument:
if random.random()<0.5:
audio = addnoise(audio)
if random.random()<0.5:
audio = randomstretch(audio)
audio = librosa.effects.preemphasis(audio)
spec = torch.FloatTensor(audio)
spec.unsqueeze_(0)
spec = (spec - spec.mean()) / spec.std()
return spec
class SpeechDataset(Dataset):
def __init__(self, index_path, augumentation = False):
self.Raw = False
with open(index_path,encoding='utf_8_sig') as f:
lines = f.readlines()
self.idx = []
for x in lines:
item = x.strip().split("\t")
line = []
line.append(item[0])
char_indx = uyghur_latin.encode(item[1])
line.append(char_indx)
self.idx.append(line)
self.augument = augumentation
def __getitem__(self, index):
wav_path, char_index = self.idx[index]
        if self.Raw:
x = rawfuture(wav_path, self.augument)
else:
x = melfuture(wav_path, self.augument)
return x, char_index, wav_path
def __len__(self):
return len(self.idx)
def _collate_fn(batch):
batch = sorted(batch, key=lambda sample: sample[0].size(1), reverse=True)
input_lens = [sample[0].size(1) for sample in batch]
target_lens = [len(sample[1]) for sample in batch]
inputs = torch.zeros(len(batch), batch[0][0].size(0), max(input_lens) ,dtype=torch.float32)
targets = torch.zeros(len(batch), max(target_lens),dtype=torch.long).fill_(uyghur_latin.pad_idx)
target_lens = torch.IntTensor(target_lens)
input_lens = torch.IntTensor(input_lens)
paths = []
for x, sample in enumerate(batch):
tensor = sample[0]
target = sample[1]
seq_length = tensor.size(1)
inputs[x].narrow(1, 0, seq_length).copy_(tensor)
targets[x][:len(target)] = torch.LongTensor(target)
paths.append(sample[2])
return inputs, targets, input_lens, target_lens, paths
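# Illustrative sketch (not part of the original file): _collate_fn sorts the
# batch by length, zero-pads features to the longest T, and pad-fills the
# targets, so the model receives B x F x T_max inputs plus true lengths.
def _demo_collate():
    a = (torch.randn(featurelen, 120), [1, 2, 3], 'a.wav')
    b = (torch.randn(featurelen, 80), [4, 5], 'b.wav')
    inputs, targets, in_lens, tgt_lens, paths = _collate_fn([a, b])
    print(inputs.shape, in_lens.tolist(), tgt_lens.tolist())
    # torch.Size([2, 128, 120]) [120, 80] [3, 2]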
class SpeechDataLoader(DataLoader):
def __init__(self, *args, **kwargs):
"""
Creates a data loader for AudioDatasets.
"""
super(SpeechDataLoader, self).__init__(*args, **kwargs)
self.collate_fn = _collate_fn
# The following code is from: http://hetland.org/coding/python/levenshtein.py
def levenshtein(a,b):
"Calculates the Levenshtein distance between a and b."
n, m = len(a), len(b)
if n > m:
# Make sure n <= m, to use O(min(n,m)) space
a,b = b,a
n,m = m,n
current = list(range(n+1))
for i in range(1,m+1):
previous, current = current, [i]+[0]*n
for j in range(1,n+1):
add, delete = previous[j]+1, current[j-1]+1
change = previous[j-1]
if a[j-1] != b[i-1]:
change = change + 1
current[j] = min(add, delete, change)
return current[n]
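# Illustrative sketch (not part of the original file): classic edit
# distance, e.g. levenshtein("kitten", "sitting") == 3; it also works on
# word lists, which is how wer() below uses it.
def _demo_levenshtein():
    assert levenshtein("kitten", "sitting") == 3
    assert levenshtein("salam".split(), "salam dunya".split()) == 1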
def wer(s1, src):
sw = src.split()
return levenshtein(s1.split(),sw), len(sw)
def cer(s1, src):
return levenshtein(s1,src),len(src)
def cer_wer(preds, targets):
err_c, lettercnt, err_w, wordcnt = 0,0,0,0
for pred, target in zip(preds, targets):
c_er, c_cnt = cer(pred, target)
w_er, w_cnt = wer(pred, target)
err_c += c_er
lettercnt += c_cnt
wordcnt += w_cnt
err_w += w_er
return err_c, lettercnt, err_w, wordcnt
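# Illustrative sketch (not part of the original file): cer/wer return
# (edit_errors, reference_length) pairs so rates can be accumulated over a
# whole evaluation set, as validate() in train.py does.
def _demo_error_rates():
    e_c, n_c = cer("salam dunya", "salam dunyaga")
    e_w, n_w = wer("salam dunya", "salam dunyaga")
    print(f"CER {e_c/n_c:.2%}, WER {e_w/n_w:.2%}")        # CER 15.38%, WER 50.00%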
def random_speed():
y, s_r = librosa.load("test1.wav", sr=sample_rate, res_type='polyphase')
factor = random.uniform(0.8, 1.2)
new_sr = s_r*factor
new_y = librosa.core.resample(y,s_r,new_sr)
soundfile.write("test1_1.wav",new_y, s_r)
audio = librosa.effects.time_stretch(y,factor)
soundfile.write("test1_2.wav",audio, s_r)
def sinaq():
new_y, s_r = librosa.load("test1.wav", sr=sample_rate, res_type='polyphase')
new_y = addnoise(new_y)
#new_y = librosa.effects.preemphasis(new_y)
new_y = preprocessing.minmax_scale(new_y, axis=0)
soundfile.write("test1_1.wav",new_y, s_r)
new_y, s_r = librosa.load("test2.wav", sr=sample_rate, res_type='polyphase')
new_y = preprocessing.minmax_scale(new_y, axis=0)
new_y = addnoise(new_y)
#new_y = librosa.effects.preemphasis(new_y)
soundfile.write("test2_1.wav",new_y, s_r)
new_y, s_r = librosa.load("test3.wav", sr=sample_rate, res_type='polyphase')
new_y = preprocessing.minmax_scale(new_y, axis=0)
new_y = addnoise(new_y)
#new_y = librosa.effects.preemphasis(new_y)
soundfile.write("test3_1.wav",new_y, s_r)
new_y, s_r = librosa.load("test4.wav", sr=sample_rate, res_type='polyphase')
new_y = preprocessing.minmax_scale(new_y, axis=0)
new_y = addnoise(new_y)
#new_y = librosa.effects.preemphasis(new_y)
soundfile.write("test4_1.wav",new_y, s_r)
new_y, s_r = librosa.load("test6.wav", sr=sample_rate, res_type='polyphase')
new_y = preprocessing.minmax_scale(new_y, axis=0)
new_y = addnoise(new_y)
#new_y = librosa.effects.preemphasis(new_y)
soundfile.write("test6_1.wav",new_y, s_r)
if __name__ == "__main__":
#import matplotlib.pyplot as plt
#import librosa.display
#random_speed()
sinaq()
#y, s_r = librosa.load("test6.wav", sr=sample_rate, res_type='polyphase')
#soundfile.write("test6_1.wav",addnoise(y), s_r)
#soundfile.write("test6_2.wav",addnoise(y), s_r)
#soundfile.write("test6_3.wav",addnoise(y), s_r)
#soundfile.write("test6_4.wav",addnoise(y), s_r)
#soundfile.write("test6_5.wav",addnoise(y), s_r)

75
tekshur.py Normal file
View File

@ -0,0 +1,75 @@
import torch
from data import SpeechDataset, SpeechDataLoader, featurelen, uyghur_latin, cer
from GCGCResM import GCGCResM
from uformer import UFormer
from UDS2W2L50 import UDS2W2L50
from UFormerCTC2 import UFormerCTC2
import sys
import os
import glob
from tqdm import tqdm
def tekshurctc(model, hojjet, device):
training_set = SpeechDataset(hojjet, augumentation=False)
loader = SpeechDataLoader(training_set,num_workers=4, shuffle=False, batch_size=32)
line = []
with torch.no_grad():
pbar = tqdm(iter(loader), leave=True, total=len(loader))
for inputs, targets, input_lengths, _ , paths in pbar:
inputs = inputs.to(device,non_blocking=True)
outputs, output_lengths = model(inputs, input_lengths)
preds = model.greedydecode(outputs, output_lengths)
targets = [uyghur_latin.decode(target) for target in targets]
for pred, src, wavename in zip(preds, targets, paths):
xatasani , _ = cer(pred, src)
if xatasani >= 1:
xata = f"{wavename}\t{src}\t{xatasani}\n"
#xata = f"{src}\n{pred}\n\n"
line.append(xata)
return line
def tekshurs2s(model, hojjet, device):
training_set = SpeechDataset(hojjet, augumentation=False)
loader = SpeechDataLoader(training_set,num_workers=4, shuffle=False, batch_size=20)
line = []
with torch.no_grad():
pbar = tqdm(iter(loader), leave=True, total=len(loader))
for inputs, targets, input_lengths, _ , paths in pbar:
inputs = inputs.to(device,non_blocking=True)
targets = targets.to(device,non_blocking=True)
input_lengths = input_lengths.to(device,non_blocking=True)
outputs, _ = model(inputs, input_lengths, targets)
preds = model.greedydecode(outputs, 0)
targets = [uyghur_latin.decode(target) for target in targets]
for pred, src, wavename in zip(preds, targets, paths):
xatasani , _ = cer(pred, src)
if xatasani >= 5:
xata = f"{wavename}\t{src}\t{xatasani}\n"
#xata = f"{src}\n{pred}\n\n"
line.append(xata)
return line
if __name__ == '__main__':
device = 'cuda'
#model = GCGCResM(featurelen, load_best=False)
#model = UFormer(featurelen, load_best=False)
model = UDS2W2L50(featurelen, load_best=False)
#model = UFormerCTC2(featurelen, load_best=False)
model.to(device)
model.eval()
#'uyghur_train.csv' 'uyghur_thuyg20_train_small.csv', ''
#netije = tekshurs2s(model, 'uyghur_train.csv', device)
netije = tekshurctc(model, 'uyghur_thuyg20_test_small.csv', device)
with open('tek_test.csv','w',encoding='utf_8_sig') as f:
f.writelines(netije)

361
train.py Normal file
View File

@ -0,0 +1,361 @@
import math
import numpy as np
import os
import sys
import torch
import torch.nn.functional as F
import torch.nn as nn
from data import SpeechDataset, SpeechDataLoader, featurelen, cer_wer, cer, wer
from uyghur import uyghur_latin
from tqdm import tqdm
from GCGCResM import GCGCResM
from GCGCRes import GCGCRes
from GCGCRes1 import GCGCRes1
from GCGCRes2 import GCGCRes2
from QuartzNet import QuartzNet15x5, QuartzNet10x5, QuartzNet5x5
from UDS2W2L import UDS2W2L
from UDS2W2L3 import UDS2W2L3
from UDS2W2L5 import UDS2W2L5
from UDS2W2L50 import UDS2W2L50
from UDS2W2L8 import UDS2W2L8
from UDS2W2L80 import UDS2W2L80
#from FuncNet1 import FuncNet1
from UArilash0 import UArilash0
from UArilash1 import UArilash1
from UFormerCTC1 import UFormerCTC1
from UFormerCTC2 import UFormerCTC2
from UFormerCTC3 import UFormerCTC3
from UFormerCTC5 import UFormerCTC5
from UFormerCTC3N import UFormerCTC3N
from uformer1dgru import UFormer1DGRU
from UFormerCTC1N import UFormerCTC1N
from ConfModelN import ConfModelN
from ConfModelM import ConfModelM
from ConfModelM2D import ConfModelM2D
from tiny_wav2letter import TinyWav2Letter
from UDS2W2L050 import UDS2W2L050
from UDeepSpeech import UDeepSpeech
from Conv1D3InDS2 import Conv1D3InDS2
from UDS2W2LGLU0 import UDS2W2LGLU0
from UDS2W2LGLU import UDS2W2LGLU
from UDS2W2LGLU8 import UDS2W2LGLU8
from torch.optim.lr_scheduler import CosineAnnealingLR, CyclicLR, StepLR
import random
from torch.cuda.amp import GradScaler
# Fix seed
# seed = 17
# np.random.seed(seed)
# torch.manual_seed(seed)
# random.seed(seed)
class CustOpt:
def __init__(self, params, datalen, lr, min_lr = None):
if min_lr is None:
min_lr = lr
self.optimizer = torch.optim.Adam(params, lr=lr) #, weight_decay=0.00001
#self.optimizer = torch.optim.Adamax(params, lr=lr, weight_decay=0.00001)
#self.optimizer = torch.optim.AdamW(params, lr=lr, weight_decay = 0.00001)
#self.optimizer = torch.optim.SGD(params, lr=lr, momentum=0.9, weight_decay=0.00001)
self._step = 0
self.scheduler = CosineAnnealingLR(self.optimizer,T_max=datalen, eta_min = min_lr)
#self.scheduler = StepLR(optimizer, step_size=10, gamma=0.1)
#self.scheduler = CyclicLR(self.optimizer, T_max=datalen, eta_min = min_lr)
def step(self):
self.optimizer.step()
self.scheduler.step()
rate = self.scheduler.get_last_lr()[0]
return rate
def zero_grad(self):
self.optimizer.zero_grad()
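# Illustrative sketch (not part of the original file): CustOpt steps the
# cosine LR schedule together with the optimizer and reports the current
# rate, which train() below logs in the progress bar.
def _demo_custopt():
    p = [nn.Parameter(torch.zeros(3))]
    opt = CustOpt(p, datalen=10, lr=1e-4, min_lr=1e-5)
    p[0].grad = torch.zeros(3)
    print(opt.step())                      # learning rate after one step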
#outputs format = B x F x T
def calctc_loss(outputs, targets, output_lengths, target_lengths):
loss = F.ctc_loss(outputs.permute(2,0,1).contiguous(), targets, output_lengths, target_lengths, blank = uyghur_latin.pad_idx, reduction='mean',zero_infinity=True)
return loss
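# Illustrative sketch (not part of the original file): the CTC models emit
# B x C x T log-probabilities (LogSoftmax over dim=1), which calctc_loss
# permutes to the T x B x C layout F.ctc_loss expects; the pad index doubles
# as the CTC blank.
def _demo_ctc_loss():
    logp = torch.randn(2, uyghur_latin.vocab_size, 50).log_softmax(dim=1)
    targets = torch.randint(1, uyghur_latin.vocab_size, (2, 10))  # hypothetical ids, avoiding 0
    loss = calctc_loss(logp, targets, torch.tensor([50, 40]), torch.tensor([10, 8]))
    print(loss.item())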
def cal_loss(pred, gold):
"""
Calculate metrics
args:
pred: B x T x C
gold: B x T
input_lengths: B (for CTC)
target_lengths: B (for CTC)
"""
gold = gold.contiguous().view(-1) # (B*T)
pred = pred.contiguous().view(-1, pred.size(2)) # (B*T) x C
loss = F.cross_entropy(pred, gold, ignore_index=uyghur_latin.pad_idx, reduction="mean")
return loss
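# Illustrative sketch (not part of the original file): cal_loss flattens
# B x T x C predictions and B x T gold ids into one classification problem,
# ignoring pad positions.
def _demo_s2s_loss():
    pred = torch.randn(2, 7, uyghur_latin.vocab_size)
    gold = torch.randint(1, uyghur_latin.vocab_size, (2, 7))      # hypothetical ids
    print(cal_loss(pred, gold).item())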
def validate(model, valid_loader):
chars = 0
words = 0
e_chars = 0
e_words = 0
avg_loss = 0
iter_cnt = 0
msg = ""
cer_val = 0.0
model.eval()
with torch.no_grad():
tlen = len(valid_loader)
vbar = tqdm(iter(valid_loader), leave=True, total=tlen)
for inputs, targets, input_lengths, target_lengths, _ in vbar:
inputs = inputs.to(device)
targets = targets.to(device)
input_lengths = input_lengths.to(device)
target_lengths = target_lengths.to(device)
if model_type == 'CTC':
outputs, output_lengths = model(inputs, input_lengths)
loss = calctc_loss(outputs, targets, output_lengths, target_lengths)
elif model_type =='S2S':
output_lengths = 0
outputs, tgt = model(inputs, input_lengths, targets)
loss = cal_loss(outputs, tgt)
elif model_type == 'JOINT':
output_lengths = 0
outputs, tgt = model(inputs, input_lengths, targets)
loss1 = cal_loss(outputs, tgt)
loss_ctc= calctc_loss(model.ctcOut, targets, model.ctcLen, target_lengths)
#loss = loss1*0.6 + loss_ctc*0.4
loss = loss1*0.78 + loss_ctc*0.22
#loss = loss1*0.22 + loss_ctc*0.78
preds = model.greedydecode(outputs, output_lengths)
targets = [uyghur_latin.decode(target) for target in targets]
for pred, src in zip(preds, targets):
e_char_cnt, char_cnt = cer(pred,src)
e_word_cnt, word_cnt = wer(pred, src)
e_chars += e_char_cnt
e_words += e_word_cnt
chars += char_cnt
words += word_cnt
iter_cnt += 1
avg_loss +=loss.item()
msg = f" VALIDATION: [CER:{e_chars/chars:.2%} ({e_chars}/{chars} letters) WER:{e_words/words:.2%} ({e_words}/{words} words), Avg loss:{avg_loss/iter_cnt:4f}]"
vbar.set_description(msg)
vbar.close()
cer_val = e_chars/chars
with open(log_name,'a', encoding='utf-8') as fp:
fp.write(msg+"\n")
#Print Last 3 validation results
result =""
result_cnt = 0
chars = 0
words = 0
e_chars = 0
e_words = 0
for pred, src in zip(preds, targets):
e_char_cnt, char_cnt = cer(pred,src)
e_word_cnt, word_cnt = wer(pred, src)
e_chars += e_char_cnt
e_words += e_word_cnt
chars += char_cnt
words += word_cnt
result += f" O:{src}\n"
result += f" P:{pred}\n"
result += f" CER: {e_char_cnt/char_cnt:.2%} ({e_char_cnt}/{char_cnt} letters), WER: {e_word_cnt/word_cnt:.2%} ({e_word_cnt}/{word_cnt} words)\n"
result_cnt += 1
if result_cnt >= 3:
break
print(result)
return cer_val
def train(model, train_loader):
total_loss = 0
iter_cnt = 0
msg =''
model.train()
pbar = tqdm(iter(train_loader), leave=True, total=mini_epoch_length)
for data in pbar:
optimizer.zero_grad()
inputs, targets, input_lengths, target_lengths, _ = data
inputs = inputs.to(device)
targets = targets.to(device)
input_lengths = input_lengths.to(device)
target_lengths = target_lengths.to(device)
if model_type == 'CTC':
outputs, output_lengths = model(inputs, input_lengths)
loss = calctc_loss(outputs, targets, output_lengths, target_lengths)
elif model_type =='S2S':
output_lengths = 0
outputs, tgt = model(inputs, input_lengths, targets)
loss = cal_loss(outputs, tgt)
elif model_type == 'JOINT':
output_lengths = 0
outputs, tgt = model(inputs, input_lengths, targets)
loss1 = cal_loss(outputs, tgt)
loss_ctc = calctc_loss(model.ctcOut, targets, model.ctcLen, target_lengths)
#loss = loss1*0.6 + loss_ctc*0.4
loss = loss1*0.78 + loss_ctc*0.22
#loss = loss1*0.22 + loss_ctc*0.78
loss.backward()
lr = optimizer.step()
total_loss += loss.item()
iter_cnt += 1
msg = f'[LR: {lr: .6f} Loss: {loss.item(): .5f}, Avg loss: {(total_loss/iter_cnt): .5f}]'
pbar.set_description(msg)
#torch.cuda.empty_cache()
if iter_cnt > mini_epoch_length:
break
pbar.close()
with open(log_name,'a', encoding='utf-8') as fp:
msg = f'Epoch[{(epoch+1):d}]:\t{msg}\n'
fp.write(msg)
def GetModel():
if model_type == 'CTC':
#model = GCGCResM(num_features_input = featurelen)
#model = UDS2W2L(num_features_input = featurelen)
#model = GCGCRes2(num_features_input = featurelen)
        #model = GCGCRes(num_features_input = featurelen) # running elsewhere
        #model = GCGCRes1(num_features_input = featurelen) # running elsewhere
#model = UDS2W2L50(num_features_input = featurelen)
#model = UDS2W2L80(num_features_input = featurelen)
#model = ConfModel(num_features_input = featurelen)
#model = QuartzNet15x5(num_features_input = featurelen)
#model = QuartzNet10x5(num_features_input = featurelen)
#model = QuartzNet5x5(num_features_input = featurelen)
#model = UArilash1(num_features_input = featurelen)
#model = UDeepSpeech(num_features_input = featurelen)
#model = UDS2W2L3(num_features_input = featurelen)
#model = TinyWav2Letter(num_features_input = featurelen)
#model = ConfModelM(num_features_input = featurelen)
#model = UDS2W2L050(num_features_input = featurelen)
#model = Conv1D3InDS2(num_features_input = featurelen)
#model = UDS2W2LGLU(num_features_input = featurelen)
model = UDS2W2LGLU8(num_features_input = featurelen)
elif model_type == 'S2S':
#model = UFormer(num_features_input = featurelen)
#model = UFormer1DGRU(num_features_input = featurelen)
#model = UFormerCTC(num_features_input = featurelen)
#model = UFormerCTC3(num_features_input = featurelen)
model = UFormerCTC3N(num_features_input = featurelen)
#model = UFormerCTC1N(num_features_input = featurelen)
elif model_type =='JOINT':
#model = UFormer(num_features_input = featurelen)
#model = UFormer1DGRU(num_features_input = featurelen)
#model = UFormerCTC(num_features_input = featurelen)
#model = UFormerCTC3(num_features_input = featurelen)
#model = UFormerCTC3N(num_features_input = featurelen)
model = UFormerCTC1N(num_features_input = featurelen)
return model
#Models to test
#UFormerCTC3N
#UDS2W2L5
#GCGCRes1
if __name__ == "__main__":
device = "cuda"
os.makedirs('./results',exist_ok=True)
model_type = 'CTC' # S2S, 'JOINT', 'CTC'
#train_file = 'uyghur_train.csv'
train_file = 'uyghur_thuyg20_train_small.csv'
test_file = 'uyghur_thuyg20_test_small.csv'
train_set = SpeechDataset(train_file, augumentation=False)
train_loader = SpeechDataLoader(train_set,num_workers=5, pin_memory = True, shuffle=True, batch_size=24)
validation_set = SpeechDataset(test_file, augumentation=False)
validation_loader = SpeechDataLoader(validation_set,num_workers=5, pin_memory = True, shuffle=True, batch_size=24)
print("="*50)
msg = f" Training Set: {train_file}, {len(train_set)} samples" + "\n"
msg += f" Validation Set: {test_file}, {len(validation_set)} samples" + "\n"
msg += f" Vocab Size : {uyghur_latin.vocab_size}"
print(msg)
model = GetModel()
print("="*50)
log_name = model.checkpoint + '.log'
with open(log_name,'a', encoding='utf-8') as fp:
fp.write(msg+'\n')
    train_set.Raw = model.Raw      #if the model uses raw waveform data
    validation_set.Raw = model.Raw #if the model uses raw waveform data
model = model.to(device)
    #Start training and validation
testfile=["test1.wav","test2.wav", "test3.wav","test4.wav","test5.wav","test6.wav"]
start_epoch = model.trained_epochs
mini_epoch_length = len(train_loader)
if mini_epoch_length > 1000:
mini_epoch_length = mini_epoch_length//2
#pass
optimizer = CustOpt(model.parameters(), mini_epoch_length//2, lr = 0.0001, min_lr=0.00001)
for epoch in range(start_epoch,1000):
torch.cuda.empty_cache()
model.eval()
msg = ""
for afile in testfile:
text = model.predict(afile,device)
text = f"{afile}-->{text}\n"
print(text,end="")
msg += text
with open(log_name,'a', encoding='utf-8') as fp:
fp.write(msg+'\n')
print("="*50)
print(f"Training Epoch[{(epoch+1):d}]:")
train(model, train_loader)
if (epoch+1) % 1 == 0:
print("Validating:")
model.save((epoch+1))
curcer = validate(model,validation_loader)
if curcer < model.best_cer:
model.best_cer = curcer
model.save((epoch+1),best=True)
model.save((epoch+1))