from .core import *
from .layers import *
from .learner import *
from .initializers import *

# Each entry maps an architecture to [cut, lr_cut]: where to cut the pretrained
# model before attaching a new head, and where to split it into layer groups
# for differential learning rates.
model_meta = {
    resnet18:[8,6], resnet34:[8,6], resnet50:[8,6], resnet101:[8,6], resnet152:[8,6],
    vgg16:[0,22], vgg19:[0,22],
    resnext50:[8,6], resnext101:[8,6], resnext101_64:[8,6],
    wrn:[8,6], inceptionresnet_2:[-2,9], inception_4:[-1,9],
    dn121:[0,7], dn161:[0,7], dn169:[0,7], dn201:[0,7],
}
model_features = {inception_4: 3072, dn121: 2048, dn161: 4416,} # nasnetalarge: 4032*2
class ConvnetBuilder():
    """Class representing a convolutional network.
    Arguments:
        f: a model creation function (e.g. resnet34, vgg16, etc)
        c (int): size of the last layer
        is_multi (bool): is this a multi-label classification problem?
            (see http://scikit-learn.org/stable/modules/multiclass.html for a definition)
        is_reg (bool): is this a regression problem?
        ps (float or list of floats): dropout probabilities for the fully connected head
        xtra_fc (list of ints): sizes of the extra fully connected hidden layers added to the head
        xtra_cut (int): number of layers to cut off the model, in addition to the default cut; default is 0
        custom_head: a custom module (inheriting from nn.Module) appended to the model
            created from `f`, replacing the default head
    A usage sketch follows the class definition.
    """
    def __init__(self, f, c, is_multi, is_reg, ps=None, xtra_fc=None, xtra_cut=0, custom_head=None, pretrained=True):
        self.f,self.c,self.is_multi,self.is_reg,self.xtra_cut = f,c,is_multi,is_reg,xtra_cut
        if xtra_fc is None: xtra_fc = [512]
        if ps is None: ps = [0.25]*len(xtra_fc) + [0.5]
        self.ps,self.xtra_fc = ps,xtra_fc
        if f in model_meta: cut,self.lr_cut = model_meta[f]
        else: cut,self.lr_cut = 0,0
        cut-=xtra_cut
        # Cut the (optionally pretrained) backbone at the architecture's cut point.
        layers = cut_model(f(pretrained), cut)
        # Input features to the head: taken from model_features if known, otherwise
        # doubled because AdaptiveConcatPool2d concatenates average and max pooling.
        self.nf = model_features[f] if f in model_features else (num_features(layers)*2)
        if not custom_head: layers += [AdaptiveConcatPool2d(), Flatten()]
        self.top_model = nn.Sequential(*layers)
        n_fc = len(self.xtra_fc)+1
        if not isinstance(self.ps, list): self.ps = [self.ps]*n_fc
        if custom_head: fc_layers = [custom_head]
        else: fc_layers = self.get_fc_layers()
        self.n_fc = len(fc_layers)
        self.fc_model = to_gpu(nn.Sequential(*fc_layers))
        if not custom_head: apply_init(self.fc_model, kaiming_normal)
        self.model = to_gpu(nn.Sequential(*(layers+fc_layers)))
    @property
    def name(self): return f'{self.f.__name__}_{self.xtra_cut}'
    def create_fc_layer(self, ni, nf, p, actn=None):
        res=[nn.BatchNorm1d(num_features=ni)]
        if p: res.append(nn.Dropout(p=p))
        res.append(nn.Linear(in_features=ni, out_features=nf))
        if actn: res.append(actn)
        return res
    def get_fc_layers(self):
        res=[]
        ni=self.nf
        for i,nf in enumerate(self.xtra_fc):
            res += self.create_fc_layer(ni, nf, p=self.ps[i], actn=nn.ReLU())
            ni=nf
        final_actn = nn.Sigmoid() if self.is_multi else nn.LogSoftmax(dim=1)
        if self.is_reg: final_actn = None
        res += self.create_fc_layer(ni, self.c, p=self.ps[-1], actn=final_actn)
        return res
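    # With the defaults (xtra_fc=[512], ps=[0.25, 0.5]) the head built above is, schematically:
    #   BatchNorm1d(nf) -> Dropout(0.25) -> Linear(nf, 512) -> ReLU()
    #   -> BatchNorm1d(512) -> Dropout(0.5) -> Linear(512, c) -> final activation
    # where the final activation is Sigmoid (multi-label), LogSoftmax (single-label)
    # or nothing (regression).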
    def get_layer_groups(self, do_fc=False):
        if do_fc:
            return [self.fc_model]
        idxs = [self.lr_cut]
        c = children(self.top_model)
        if len(c)==3: c = children(c[0])+c[1:]
        lgs = list(split_by_idxs(c,idxs))
        return lgs+[self.fc_model]
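
# A minimal usage sketch; it assumes the fastai 0.7 wildcard imports above provide
# resnet34, and the name `builder` and the argument values are illustrative only:
#
#   builder = ConvnetBuilder(resnet34, c=10, is_multi=False, is_reg=False)
#   builder.model               # cut backbone + AdaptiveConcatPool2d/Flatten + FC head
#   builder.fc_model            # the head alone, used when activations are precomputed
#   builder.get_layer_groups()  # backbone split at lr_cut plus the head, for per-group learning rates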
class ConvLearner(Learner):
    """
    Class used to train a supported convnet model, e.g. ResNet-34.
    Arguments:
        data: training data for the model
        models: a ConvnetBuilder instance providing the model to train
        precompute (bool): whether to reuse precomputed activations
        **kwargs: parameters passed through to the Learner() class
    """
    def __init__(self, data, models, precompute=False, **kwargs):
        self.precompute = False
        super().__init__(data, models, **kwargs)
        if hasattr(data, 'is_multi') and not data.is_reg and self.metrics is None:
            self.metrics = [accuracy_thresh(0.5)] if self.data.is_multi else [accuracy]
        if precompute: self.save_fc1()
        self.freeze()
        self.precompute = precompute
    def _get_crit(self, data):
        if not hasattr(data, 'is_multi'): return super()._get_crit(data)
        return F.l1_loss if data.is_reg else F.binary_cross_entropy if data.is_multi else F.nll_loss
    @classmethod
    def pretrained(cls, f, data, ps=None, xtra_fc=None, xtra_cut=0, custom_head=None, precompute=False,
                   pretrained=True, **kwargs):
        models = ConvnetBuilder(f, data.c, data.is_multi, data.is_reg,
            ps=ps, xtra_fc=xtra_fc, xtra_cut=xtra_cut, custom_head=custom_head, pretrained=pretrained)
        return cls(data, models, precompute, **kwargs)
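    # Typical entry point, sketched on the assumption that `data` is an
    # ImageClassifierData object (e.g. built via ImageClassifierData.from_paths);
    # the learning rate and epoch count are illustrative:
    #
    #   learn = ConvLearner.pretrained(resnet34, data, precompute=True)
    #   learn.fit(1e-2, 2)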
    @classmethod
    def lsuv_learner(cls, f, data, ps=None, xtra_fc=None, xtra_cut=0, custom_head=None, precompute=False,
                     needed_std=1.0, std_tol=0.1, max_attempts=10, do_orthonorm=False, **kwargs):
        models = ConvnetBuilder(f, data.c, data.is_multi, data.is_reg,
            ps=ps, xtra_fc=xtra_fc, xtra_cut=xtra_cut, custom_head=custom_head, pretrained=False)
        convlearn = cls(data, models, precompute, **kwargs)
        convlearn.lsuv_init()
        return convlearn

    @property
    def model(self): return self.models.fc_model if self.precompute else self.models.model

    def half(self):
        if self.fp16: return
        self.fp16 = True
        if not isinstance(self.models.model, FP16): self.models.model = FP16(self.models.model)
        if not isinstance(self.models.fc_model, FP16): self.models.fc_model = FP16(self.models.fc_model)
    def float(self):
        if not self.fp16: return
        self.fp16 = False
        if isinstance(self.models.model, FP16): self.models.model = self.models.model.module.float()
        if isinstance(self.models.fc_model, FP16): self.models.fc_model = self.models.fc_model.module.float()
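    # Mixed-precision sketch: `half` wraps both the full model and the head in FP16
    # and `float` unwraps them (assumes a GPU with fast half-precision arithmetic):
    #
    #   learn.half()     # train in FP16
    #   learn.fit(1e-2, 1)
    #   learn.float()    # restore FP32 weights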
    @property
    def data(self): return self.fc_data if self.precompute else self.data_
    def create_empty_bcolz(self, n, name):
        return bcolz.carray(np.zeros((0,n), np.float32), chunklen=1, mode='w', rootdir=name)
    def set_data(self, data, precompute=False):
        super().set_data(data)
        if precompute:
            self.unfreeze()
            self.save_fc1()
            self.freeze()
            self.precompute = True
        else:
            self.freeze()
    def get_layer_groups(self):
        return self.models.get_layer_groups(self.precompute)
    def summary(self):
        precompute = self.precompute
        self.precompute = False
        res = super().summary()
        self.precompute = precompute
        return res
    def get_activations(self, force=False):
        tmpl = f'_{self.models.name}_{self.data.sz}.bc'
        # TODO: Somehow check that directory names haven't changed (e.g. added test set)
        names = [os.path.join(self.tmp_path, p+tmpl) for p in ('x_act', 'x_act_val', 'x_act_test')]
        if os.path.exists(names[0]) and not force:
            self.activations = [bcolz.open(p) for p in names]
        else:
            self.activations = [self.create_empty_bcolz(self.models.nf,n) for n in names]
    def save_fc1(self):
        self.get_activations()
        act, val_act, test_act = self.activations
        m = self.models.top_model
        if len(self.activations[0])!=len(self.data.trn_ds):
            predict_to_bcolz(m, self.data.fix_dl, act)
        if len(self.activations[1])!=len(self.data.val_ds):
            predict_to_bcolz(m, self.data.val_dl, val_act)
        if self.data.test_dl and (len(self.activations[2])!=len(self.data.test_ds)):
            predict_to_bcolz(m, self.data.test_dl, test_act)
        self.fc_data = ImageClassifierData.from_arrays(self.data.path,
            (act, self.data.trn_y), (val_act, self.data.val_y), self.data.bs, classes=self.data.classes,
            test = test_act if self.data.test_dl else None, num_workers=8)
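    # Precompute workflow, sketched: save_fc1 runs the frozen backbone (top_model) once
    # over the fixed training, validation and optional test sets, caching activations as
    # bcolz arrays under tmp_path; with precompute=True, training then fits only fc_model
    # on these cached features:
    #
    #   learn.set_data(data, precompute=True)   # cache activations, train the head only
    #   learn.set_data(data, precompute=False)  # train through the full network again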
    def freeze(self):
        """ Freeze all but the last layer group.
        Make every layer group except the last (the fully connected head) untrainable.
        Returns:
            None
        """
        self.freeze_to(-1)
    def unfreeze(self):
        """ Unfreeze all layers.
        Make all layer groups trainable. This also sets `precompute` to `False`, since we
        can no longer rely on precomputed activations from frozen layers.
        Returns:
            None
        """
        self.freeze_to(0)
        self.precompute = False
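
# Typical fine-tuning loop, sketched (the learning rates and epoch counts are illustrative):
#
#   learn = ConvLearner.pretrained(resnet34, data, precompute=True)
#   learn.fit(1e-2, 2)                  # train the new head on cached activations
#   learn.precompute = False
#   learn.unfreeze()                    # make every layer group trainable
#   learn.fit([1e-4, 1e-3, 1e-2], 1)    # differential learning rates per layer group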