import warnings

from .imports import *
from .torch_imports import *
from .rnn_reg import LockedDropout,WeightDrop,EmbeddingDropout
from .model import Stepper
from .core import set_grad_enabled

IS_TORCH_04 = LooseVersion(torch.__version__) >= LooseVersion('0.4')
def seq2seq_reg(output, xtra, loss, alpha=0, beta=0):
    hs,dropped_hs = xtra
    if alpha:  # Activation Regularization
        loss = loss + (alpha * dropped_hs[-1].pow(2).mean()).sum()
    if beta:   # Temporal Activation Regularization (slowness)
        h = hs[-1]
        if len(h)>1: loss = loss + (beta * (h[1:] - h[:-1]).pow(2).mean()).sum()
    return loss
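
# --- Illustrative sketch (not part of the original module) -------------------
# seq2seq_reg is meant to be applied on top of the usual cross-entropy loss:
# `xtra` is the (raw_outputs, outputs) pair returned by the language model, so
# AR penalizes the magnitude of the final layer's activations and TAR penalizes
# differences between consecutive timesteps. The names `crit`, `preds` and
# `targets`, and the alpha/beta values, are illustrative assumptions.
def _example_seq2seq_reg(crit, preds, raw_outputs, outputs, targets):
    raw_loss = crit(preds, targets)                       # plain cross-entropy
    return seq2seq_reg(preds, [raw_outputs, outputs], raw_loss, alpha=2, beta=1)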
def repackage_var(h):
    """Wraps h in new Variables, to detach them from their history."""
    if IS_TORCH_04: return h.detach() if type(h) == torch.Tensor else tuple(repackage_var(v) for v in h)
    else: return Variable(h.data) if type(h) == Variable else tuple(repackage_var(v) for v in h)
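
# --- Illustrative sketch (not part of the original module) -------------------
# Typical use: detach an (h, c) hidden-state pair between BPTT windows so that
# gradients do not flow back through earlier batches. The shapes are arbitrary
# example values and assume torch >= 0.4.
def _example_repackage_var():
    import torch
    h = (torch.zeros(1, 4, 1150), torch.zeros(1, 4, 1150))   # (n_dir, bs, n_hid)
    return repackage_var(h)                                   # same values, cut from the graph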
class RNN_Encoder(nn.Module):
    """A custom RNN encoder network that uses
        - an embedding matrix to encode input,
        - a stack of LSTM or QRNN layers to drive the network, and
        - variational dropouts in the embedding and LSTM/QRNN layers

        The architecture for this network was inspired by the work done in
        "Regularizing and Optimizing LSTM Language Models".
        (https://arxiv.org/pdf/1708.02182.pdf)
    """
    initrange=0.1

    def __init__(self, ntoken, emb_sz, n_hid, n_layers, pad_token, bidir=False,
                 dropouth=0.3, dropouti=0.65, dropoute=0.1, wdrop=0.5, qrnn=False):
- """ Default constructor for the RNN_Encoder class
- Args:
- bs (int): batch size of input data
- ntoken (int): number of vocabulary (or tokens) in the source dataset
- emb_sz (int): the embedding size to use to encode each token
- n_hid (int): number of hidden activation per LSTM layer
- n_layers (int): number of LSTM layers to use in the architecture
- pad_token (int): the int value used for padding text.
- dropouth (float): dropout to apply to the activations going from one LSTM layer to another
- dropouti (float): dropout to apply to the input layer.
- dropoute (float): dropout to apply to the embedding layer.
- wdrop (float): dropout used for a LSTM's internal (or hidden) recurrent weights.
- Returns:
- None
- """
        super().__init__()
        self.ndir = 2 if bidir else 1
        self.bs, self.qrnn = 1, qrnn
        self.encoder = nn.Embedding(ntoken, emb_sz, padding_idx=pad_token)
        self.encoder_with_dropout = EmbeddingDropout(self.encoder)
        if self.qrnn:
            # Using QRNN requires cupy: https://github.com/cupy/cupy
            from .torchqrnn.qrnn import QRNNLayer
            self.rnns = [QRNNLayer(emb_sz if l == 0 else n_hid, (n_hid if l != n_layers - 1 else emb_sz)//self.ndir,
                save_prev_x=True, zoneout=0, window=2 if l == 0 else 1, output_gate=True) for l in range(n_layers)]
            if wdrop:
                for rnn in self.rnns:
                    rnn.linear = WeightDrop(rnn.linear, wdrop, weights=['weight'])
        else:
            self.rnns = [nn.LSTM(emb_sz if l == 0 else n_hid, (n_hid if l != n_layers - 1 else emb_sz)//self.ndir,
                1, bidirectional=bidir) for l in range(n_layers)]
            if wdrop: self.rnns = [WeightDrop(rnn, wdrop) for rnn in self.rnns]
        self.rnns = torch.nn.ModuleList(self.rnns)
        self.encoder.weight.data.uniform_(-self.initrange, self.initrange)

        self.emb_sz,self.n_hid,self.n_layers,self.dropoute = emb_sz,n_hid,n_layers,dropoute
        self.dropouti = LockedDropout(dropouti)
        self.dropouths = nn.ModuleList([LockedDropout(dropouth) for l in range(n_layers)])
    def forward(self, input):
        """ Invoked during the forward propagation of the RNN_Encoder module.

            Args:
                input (Tensor): input of shape (sentence length x batch_size)

            Returns:
                (tuple(list(Tensor), list(Tensor))): raw_outputs, the outputs of each RNN layer before
                dropouth is applied, and outputs, the same activations after dropouth.
        """
        sl,bs = input.size()
        if bs!=self.bs:
            self.bs=bs
            self.reset()
        with set_grad_enabled(self.training):
            emb = self.encoder_with_dropout(input, dropout=self.dropoute if self.training else 0)
            emb = self.dropouti(emb)
            raw_output = emb
            new_hidden,raw_outputs,outputs = [],[],[]
            for l, (rnn,drop) in enumerate(zip(self.rnns, self.dropouths)):
                current_input = raw_output
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    raw_output, new_h = rnn(raw_output, self.hidden[l])
                new_hidden.append(new_h)
                raw_outputs.append(raw_output)
                if l != self.n_layers - 1: raw_output = drop(raw_output)
                outputs.append(raw_output)
            self.hidden = repackage_var(new_hidden)
        return raw_outputs, outputs
    def one_hidden(self, l):
        nh = (self.n_hid if l != self.n_layers - 1 else self.emb_sz)//self.ndir
        if IS_TORCH_04: return Variable(self.weights.new(self.ndir, self.bs, nh).zero_())
        else: return Variable(self.weights.new(self.ndir, self.bs, nh).zero_(), volatile=not self.training)

    def reset(self):
        if self.qrnn: [r.reset() for r in self.rnns]
        self.weights = next(self.parameters()).data
        if self.qrnn: self.hidden = [self.one_hidden(l) for l in range(self.n_layers)]
        else: self.hidden = [(self.one_hidden(l), self.one_hidden(l)) for l in range(self.n_layers)]
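
# --- Illustrative sketch (not part of the original module) -------------------
# A minimal end-to-end run of RNN_Encoder, assuming torch >= 0.4 and the
# module's own imports. All sizes (vocab of 1000, 3 layers, a 10x4 batch of
# token ids) are arbitrary example values.
def _example_rnn_encoder():
    import torch
    enc = RNN_Encoder(ntoken=1000, emb_sz=400, n_hid=1150, n_layers=3, pad_token=1)
    enc.reset()                                  # allocate the per-layer hidden state
    inp = torch.zeros(10, 4).long()              # (sequence length, batch size) of token ids
    raw_outputs, outputs = enc(inp)
    # one tensor per layer: (10, 4, 1150) for the first two layers, (10, 4, 400) for the last
    return raw_outputs, outputs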
class MultiBatchRNN(RNN_Encoder):
    def __init__(self, bptt, max_seq, *args, **kwargs):
        self.max_seq,self.bptt = max_seq,bptt
        super().__init__(*args, **kwargs)

    def concat(self, arrs):
        return [torch.cat([l[si] for l in arrs]) for si in range(len(arrs[0]))]

    def forward(self, input):
        sl,bs = input.size()
        for l in self.hidden:
            for h in l: h.data.zero_()
        raw_outputs, outputs = [],[]
        for i in range(0, sl, self.bptt):
            r, o = super().forward(input[i: min(i+self.bptt, sl)])
            if i>(sl-self.max_seq):
                raw_outputs.append(r)
                outputs.append(o)
        return self.concat(raw_outputs), self.concat(outputs)
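
# --- Illustrative sketch (not part of the original module) -------------------
# MultiBatchRNN walks a long sequence in bptt-sized windows but only keeps the
# windows that start within the final max_seq tokens. With the example values
# below (sl=200, bptt=70, max_seq=140) the windows start at 0, 70 and 140, and
# only the last two are kept, so the concatenated outputs cover tokens 70-199.
def _example_multibatch_rnn():
    import torch
    enc = MultiBatchRNN(70, 140, 1000, 400, 1150, 3, pad_token=1)
    enc.reset()
    raw_outputs, outputs = enc(torch.zeros(200, 4).long())
    # outputs[-1] has shape (130, 4, 400): windows of 70 and 60 tokens concatenated
    return outputs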
class LinearDecoder(nn.Module):
    initrange=0.1
    def __init__(self, n_out, n_hid, dropout, tie_encoder=None, bias=False):
        super().__init__()
        self.decoder = nn.Linear(n_hid, n_out, bias=bias)
        self.decoder.weight.data.uniform_(-self.initrange, self.initrange)
        self.dropout = LockedDropout(dropout)
        if bias: self.decoder.bias.data.zero_()
        if tie_encoder: self.decoder.weight = tie_encoder.weight

    def forward(self, input):
        raw_outputs, outputs = input
        output = self.dropout(outputs[-1])
        decoded = self.decoder(output.view(output.size(0)*output.size(1), output.size(2)))
        result = decoded.view(-1, decoded.size(1))
        return result, raw_outputs, outputs
class LinearBlock(nn.Module):
    def __init__(self, ni, nf, drop):
        super().__init__()
        self.lin = nn.Linear(ni, nf)
        self.drop = nn.Dropout(drop)
        self.bn = nn.BatchNorm1d(ni)

    def forward(self, x): return self.lin(self.drop(self.bn(x)))
class PoolingLinearClassifier(nn.Module):
    def __init__(self, layers, drops):
        super().__init__()
        self.layers = nn.ModuleList([
            LinearBlock(layers[i], layers[i + 1], drops[i]) for i in range(len(layers) - 1)])

    def pool(self, x, bs, is_max):
        f = F.adaptive_max_pool1d if is_max else F.adaptive_avg_pool1d
        return f(x.permute(1,2,0), (1,)).view(bs,-1)

    def forward(self, input):
        raw_outputs, outputs = input
        output = outputs[-1]
        sl,bs,_ = output.size()
        avgpool = self.pool(output, bs, False)
        mxpool = self.pool(output, bs, True)
        x = torch.cat([output[-1], mxpool, avgpool], 1)
        for l in self.layers:
            l_x = l(x)
            x = F.relu(l_x)
        return l_x, raw_outputs, outputs
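
# --- Illustrative sketch (not part of the original module) -------------------
# The classifier head uses "concat pooling": the final timestep is concatenated
# with max- and mean-pooled features over time, so the first linear layer must
# take 3 * (encoder output size) inputs. The shapes below are example values.
def _example_concat_pooling():
    import torch
    import torch.nn.functional as F
    sl, bs, nf = 10, 4, 400                          # seq len, batch size, features
    output = torch.randn(sl, bs, nf)                 # stands in for outputs[-1]
    avgpool = F.adaptive_avg_pool1d(output.permute(1,2,0), (1,)).view(bs,-1)
    mxpool  = F.adaptive_max_pool1d(output.permute(1,2,0), (1,)).view(bs,-1)
    x = torch.cat([output[-1], mxpool, avgpool], 1)  # shape (bs, 3*nf)
    return x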
class SequentialRNN(nn.Sequential):
    def reset(self):
        for c in self.children():
            if hasattr(c, 'reset'): c.reset()
def get_language_model(n_tok, emb_sz, n_hid, n_layers, pad_token,
                 dropout=0.4, dropouth=0.3, dropouti=0.5, dropoute=0.1, wdrop=0.5, tie_weights=True, qrnn=False, bias=False):
    """Returns a SequentialRNN language model.

    An RNN_Encoder layer is instantiated using the parameters provided, followed by a LinearDecoder layer.

    By default (i.e. tie_weights=True) the embedding matrix used in the RNN_Encoder is reused as the
    weight matrix of the LinearDecoder layer (weight tying).

    SequentialRNN is a thin wrapper around torch's nn.Sequential that chains the RNN_Encoder and
    LinearDecoder layers in the model.

    Args:
        n_tok (int): number of unique vocabulary words (or tokens) in the source dataset
        emb_sz (int): the embedding size to use to encode each token
        n_hid (int): number of hidden activations per LSTM layer
        n_layers (int): number of LSTM layers to use in the architecture
        pad_token (int): the int value used for padding text.
        dropout (float): dropout to apply to the activations fed to the decoder.
        dropouth (float): dropout to apply to the activations going from one LSTM layer to another
        dropouti (float): dropout to apply to the input layer.
        dropoute (float): dropout to apply to the embedding layer.
        wdrop (float): dropout used for an LSTM's internal (or hidden) recurrent weights.
        tie_weights (bool): decide if the weights of the embedding matrix in the RNN encoder should be tied to the
            weights of the LinearDecoder layer.
        qrnn (bool): decide if the model is composed of LSTMs (False) or QRNNs (True).
        bias (bool): decide if the decoder should have a bias layer or not.

    Returns:
        A SequentialRNN model
    """
    rnn_enc = RNN_Encoder(n_tok, emb_sz, n_hid=n_hid, n_layers=n_layers, pad_token=pad_token,
                 dropouth=dropouth, dropouti=dropouti, dropoute=dropoute, wdrop=wdrop, qrnn=qrnn)
    enc = rnn_enc.encoder if tie_weights else None
    return SequentialRNN(rnn_enc, LinearDecoder(n_tok, emb_sz, dropout, tie_encoder=enc, bias=bias))
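
# --- Illustrative sketch (not part of the original module) -------------------
# Building a weight-tied language model and running one forward pass. The
# hyperparameter values follow a common AWD-LSTM setup but are only examples.
def _example_language_model():
    import torch
    lm = get_language_model(n_tok=1000, emb_sz=400, n_hid=1150, n_layers=3, pad_token=1)
    lm.reset()
    inp = torch.zeros(10, 4).long()              # (bptt, batch size) of token ids
    decoded, raw_outputs, outputs = lm(inp)      # decoded: (10*4, 1000) rows of logits over the vocab
    return decoded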
def get_rnn_classifier(bptt, max_seq, n_class, n_tok, emb_sz, n_hid, n_layers, pad_token, layers, drops, bidir=False,
                       dropouth=0.3, dropouti=0.5, dropoute=0.1, wdrop=0.5, qrnn=False):
    rnn_enc = MultiBatchRNN(bptt, max_seq, n_tok, emb_sz, n_hid, n_layers, pad_token=pad_token, bidir=bidir,
                            dropouth=dropouth, dropouti=dropouti, dropoute=dropoute, wdrop=wdrop, qrnn=qrnn)
    return SequentialRNN(rnn_enc, PoolingLinearClassifier(layers, drops))

get_rnn_classifer = get_rnn_classifier  # alias that keeps the original, misspelled name importable
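
# --- Illustrative sketch (not part of the original module) -------------------
# Building the pooling classifier on top of a MultiBatchRNN backbone. Note that
# `layers` must start at 3*emb_sz (because of concat pooling) and end at the
# number of classes; everything below is an arbitrary example configuration.
def _example_rnn_classifier():
    import torch
    model = get_rnn_classifier(bptt=70, max_seq=1400, n_class=2, n_tok=1000, emb_sz=400,
                               n_hid=1150, n_layers=3, pad_token=1,
                               layers=[400*3, 50, 2], drops=[0.4, 0.1])
    model.reset()
    inp = torch.zeros(200, 4).long()             # a document of 200 tokens, batch of 4
    logits, raw_outputs, outputs = model(inp)    # logits: (4, 2)
    return logits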