- "Provides convenient callbacks for Learners that write model images, metrics/losses, stats and histograms to Tensorboard"
- from fastai.basic_train import Learner
- from fastai.basic_data import DatasetType, DataBunch
- from fastai.vision import Image
- from fastai.callbacks import LearnerCallback
- from fastai.core import *
- from fastai.torch_core import *
- from threading import Thread, Event
- from time import sleep
- from queue import Queue
- import statistics
- import torchvision.utils as vutils
- from abc import ABC, abstractmethod
- from tensorboardX import SummaryWriter
- __all__=['LearnerTensorboardWriter', 'GANTensorboardWriter', 'ImageGenTensorboardWriter']
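
# Example usage (a sketch, not part of the module): attach the callback to an existing
# Learner via `callback_fns`. The `learn` object, project path and run name below are
# assumptions for illustration.
#
#   from functools import partial
#   tboard_path = Path('data/tensorboard/my_project')
#   learn.callback_fns.append(partial(LearnerTensorboardWriter,
#                                     base_dir=tboard_path, name='run1'))
#
# The results can then be viewed with: tensorboard --logdir=data/tensorboard/my_project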

class LearnerTensorboardWriter(LearnerCallback):
    def __init__(self, learn:Learner, base_dir:Path, name:str, loss_iters:int=25, hist_iters:int=500,
                 stats_iters:int=100):
        super().__init__(learn=learn)
        self.base_dir = base_dir
        self.name = name
        log_dir = base_dir/name
        self.tbwriter = SummaryWriter(log_dir=str(log_dir))
        self.loss_iters = loss_iters
        self.hist_iters = hist_iters
        self.stats_iters = stats_iters
        self.hist_writer = HistogramTBWriter()
        self.stats_writer = ModelStatsTBWriter()
        self.data = None
        self.metrics_root = '/metrics/'
        self._update_batches_if_needed()

    def _update_batches_if_needed(self):
        # `one_batch` is extremely slow with large datasets, so caching the batches is an optimization.
        # Note that we also always want to show the same batches, so that changes are visible in Tensorboard.
        update_batches = self.data is not self.learn.data
        if update_batches:
            self.data = self.learn.data
            self.trn_batch = self.learn.data.one_batch(
                ds_type=DatasetType.Train, detach=True, denorm=False, cpu=False)
            self.val_batch = self.learn.data.one_batch(
                ds_type=DatasetType.Valid, detach=True, denorm=False, cpu=False)

    def _write_model_stats(self, iteration:int):
        self.stats_writer.write(
            model=self.learn.model, iteration=iteration, tbwriter=self.tbwriter)

    def _write_training_loss(self, iteration:int, last_loss:Tensor):
        scalar_value = to_np(last_loss)
        tag = self.metrics_root + 'train_loss'
        self.tbwriter.add_scalar(tag=tag, scalar_value=scalar_value, global_step=iteration)

    def _write_weight_histograms(self, iteration:int):
        self.hist_writer.write(
            model=self.learn.model, iteration=iteration, tbwriter=self.tbwriter)

    # TODO: Relying on a specific hardcoded start_idx here isn't great. Is there a better solution?
    # (start_idx=2 skips 'epoch' and 'train_loss' in recorder.names, so that the remaining names
    # line up with the values in last_metrics.)
    def _write_metrics(self, iteration:int, last_metrics:MetricsList, start_idx:int=2):
        recorder = self.learn.recorder
        for i, name in enumerate(recorder.names[start_idx:]):
            if len(last_metrics) < i + 1: return
            scalar_value = last_metrics[i]
            tag = self.metrics_root + name
            self.tbwriter.add_scalar(tag=tag, scalar_value=scalar_value, global_step=iteration)

    def on_batch_end(self, last_loss:Tensor, iteration:int, **kwargs):
        if iteration == 0: return
        self._update_batches_if_needed()
        if iteration % self.loss_iters == 0:
            self._write_training_loss(iteration=iteration, last_loss=last_loss)
        if iteration % self.hist_iters == 0:
            self._write_weight_histograms(iteration=iteration)

    # Gradient stats must be written here, because gradients get zeroed out later in the training loop.
    def on_backward_end(self, iteration:int, **kwargs):
        if iteration == 0: return
        self._update_batches_if_needed()
        if iteration % self.stats_iters == 0:
            self._write_model_stats(iteration=iteration)

    def on_epoch_end(self, last_metrics:MetricsList, iteration:int, **kwargs):
        self._write_metrics(iteration=iteration, last_metrics=last_metrics)

# TODO: We're overriding almost everything here. Seems like a good idea to question that ("is a" vs "has a").
class GANTensorboardWriter(LearnerTensorboardWriter):
    def __init__(self, learn:Learner, base_dir:Path, name:str, loss_iters:int=25, hist_iters:int=500,
                 stats_iters:int=100, visual_iters:int=100):
        super().__init__(learn=learn, base_dir=base_dir, name=name, loss_iters=loss_iters,
                         hist_iters=hist_iters, stats_iters=stats_iters)
        self.visual_iters = visual_iters
        self.img_gen_vis = ImageTBWriter()
        self.gen_stats_updated = True
        self.crit_stats_updated = True

    # override
    def _write_weight_histograms(self, iteration:int):
        trainer = self.learn.gan_trainer
        generator = trainer.generator
        critic = trainer.critic
        self.hist_writer.write(
            model=generator, iteration=iteration, tbwriter=self.tbwriter, name='generator')
        self.hist_writer.write(
            model=critic, iteration=iteration, tbwriter=self.tbwriter, name='critic')

    # override
    def _write_model_stats(self, iteration:int):
        trainer = self.learn.gan_trainer
        generator = trainer.generator
        critic = trainer.critic
        # Don't write stats for the model that is not currently being trained, since its gradients are zeroed out.
        gen_mode = trainer.gen_mode
        if gen_mode and not self.gen_stats_updated:
            self.stats_writer.write(
                model=generator, iteration=iteration, tbwriter=self.tbwriter, name='gen_model_stats')
            self.gen_stats_updated = True
        if not gen_mode and not self.crit_stats_updated:
            self.stats_writer.write(
                model=critic, iteration=iteration, tbwriter=self.tbwriter, name='crit_model_stats')
            self.crit_stats_updated = True

    # override
    def _write_training_loss(self, iteration:int, last_loss:Tensor):
        trainer = self.learn.gan_trainer
        recorder = trainer.recorder
        if len(recorder.losses) > 0:
            scalar_value = to_np(recorder.losses[-1])
            tag = self.metrics_root + 'train_loss'
            self.tbwriter.add_scalar(tag=tag, scalar_value=scalar_value, global_step=iteration)

    def _write(self, iteration:int):
        trainer = self.learn.gan_trainer
        # TODO: Temporarily switching gen_mode here seems a bit hacky, and is certainly not a good
        # side effect. Is there a better way?
        gen_mode = trainer.gen_mode
        try:
            trainer.switch(gen_mode=True)
            self.img_gen_vis.write(learn=self.learn, trn_batch=self.trn_batch, val_batch=self.val_batch,
                                   iteration=iteration, tbwriter=self.tbwriter)
        finally:
            trainer.switch(gen_mode=gen_mode)

    # override
    def on_batch_end(self, iteration:int, **kwargs):
        super().on_batch_end(iteration=iteration, **kwargs)
        if iteration == 0: return
        if iteration % self.visual_iters == 0:
            self._write(iteration=iteration)

    # override
    def on_backward_end(self, iteration:int, **kwargs):
        if iteration == 0: return
        self._update_batches_if_needed()
        # TODO: This could perhaps be implemented as a queue of requests instead, but that seemed
        # like overkill. Not a big fan of maintaining these boolean flags either, though. Review, please.
        if iteration % self.stats_iters == 0:
            self.gen_stats_updated = False
            self.crit_stats_updated = False
        if not (self.gen_stats_updated and self.crit_stats_updated):
            self._write_model_stats(iteration=iteration)
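
# Example usage with a GAN (a sketch; `GANLearner.wgan` is fastai's WGAN constructor, while
# `data`, `generator`, `critic` and `tboard_path` are assumed to exist):
#
#   from functools import partial
#   learn = GANLearner.wgan(data, generator, critic)
#   learn.callback_fns.append(partial(GANTensorboardWriter, base_dir=tboard_path,
#                                     name='gan_run', visual_iters=100))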

class ImageGenTensorboardWriter(LearnerTensorboardWriter):
    def __init__(self, learn:Learner, base_dir:Path, name:str, loss_iters:int=25, hist_iters:int=500,
                 stats_iters:int=100, visual_iters:int=100):
        super().__init__(learn=learn, base_dir=base_dir, name=name, loss_iters=loss_iters, hist_iters=hist_iters,
                         stats_iters=stats_iters)
        self.visual_iters = visual_iters
        self.img_gen_vis = ImageTBWriter()

    def _write(self, iteration:int):
        self.img_gen_vis.write(learn=self.learn, trn_batch=self.trn_batch, val_batch=self.val_batch,
                               iteration=iteration, tbwriter=self.tbwriter)

    # override
    def on_batch_end(self, iteration:int, **kwargs):
        super().on_batch_end(iteration=iteration, **kwargs)
        if iteration == 0: return
        if iteration % self.visual_iters == 0:
            self._write(iteration=iteration)
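
# Example usage for image-generation models (a sketch; assumes `learn` is an image-to-image
# Learner, e.g. from unet_learner, and that `tboard_path` exists):
#
#   learn.callback_fns.append(partial(ImageGenTensorboardWriter, base_dir=tboard_path,
#                                     name='imagegen_run', visual_iters=100))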

#------PRIVATE-----------

class TBWriteRequest(ABC):
    def __init__(self, tbwriter:SummaryWriter, iteration:int):
        super().__init__()
        self.tbwriter = tbwriter
        self.iteration = iteration

    @abstractmethod
    def write(self):
        pass

# SummaryWriter writes tend to block quite a bit, and pushing them onto a background thread gets
# around that and greatly boosts performance. Not all Tensorboard writes go through this; just the
# ones that take a long time. Note that SummaryWriter itself ultimately uses a threadsafe
# consumer/producer design to write to Tensorboard, so writes done outside of this async loop
# should be fine.
class AsyncTBWriter():
    def __init__(self):
        super().__init__()
        self.stop_request = Event()
        self.queue = Queue()
        self.thread = Thread(target=self._queue_processor, daemon=True)
        self.thread.start()

    def request_write(self, request:TBWriteRequest):
        if self.stop_request.is_set():
            raise Exception('Close was already called! Cannot perform this operation.')
        self.queue.put(request)

    def _queue_processor(self):
        # Drain the queue, then sleep briefly before polling it again.
        while not self.stop_request.is_set():
            while not self.queue.empty():
                request = self.queue.get()
                request.write()
            sleep(0.2)

    # Provided to stop the thread explicitly or via context management (with statement), but the
    # thread should end on its own upon program exit, since it is a daemon. So using this is
    # probably unnecessary.
    def close(self):
        self.stop_request.set()
        self.thread.join()

    def __enter__(self):
        # Nothing to do; the thread is already started. We could start it here to enforce use of a
        # context manager, but that seems unwieldy and unnecessary for actual usage.
        pass

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

asyncTBWriter = AsyncTBWriter()
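
# Example (a sketch, not part of the module): any slow write can be made asynchronous by
# subclassing TBWriteRequest and submitting it to the shared asyncTBWriter; the worker thread
# drains the queue and calls write() on each request. The class and tag below are hypothetical.
#
#   class ScalarTBRequest(TBWriteRequest):
#       def __init__(self, value:float, iteration:int, tbwriter:SummaryWriter):
#           super().__init__(tbwriter=tbwriter, iteration=iteration)
#           self.value = value
#       def write(self):
#           self.tbwriter.add_scalar(tag='custom/value', scalar_value=self.value,
#                                    global_step=self.iteration)
#
#   asyncTBWriter.request_write(ScalarTBRequest(value=0.5, iteration=100, tbwriter=tbwriter))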

class ModelImageSet():
    @staticmethod
    def get_list_from_model(learn:Learner, ds_type:DatasetType, batch:Tuple)->list:
        image_sets = []
        x, y = batch[0], batch[1]
        preds = learn.pred_batch(ds_type=ds_type, batch=(x, y), reconstruct=True)
        for orig_px, real_px, gen in zip(x, y, preds):
            orig = Image(px=orig_px)
            real = Image(px=real_px)
            image_set = ModelImageSet(orig=orig, real=real, gen=gen)
            image_sets.append(image_set)
        return image_sets

    def __init__(self, orig:Image, real:Image, gen:Image):
        self.orig = orig
        self.real = real
        self.gen = gen

class HistogramTBRequest(TBWriteRequest):
    def __init__(self, model:nn.Module, iteration:int, tbwriter:SummaryWriter, name:str):
        super().__init__(tbwriter=tbwriter, iteration=iteration)
        # Clone the parameters to the CPU up front so that the async write doesn't race with training updates.
        self.params = [(name, values.clone().detach().cpu()) for (name, values) in model.named_parameters()]
        self.name = name

    # override
    def write(self):
        try:
            for param_name, values in self.params:
                tag = self.name + '/weights/' + param_name
                self.tbwriter.add_histogram(tag=tag, values=values, global_step=self.iteration)
        except Exception as e:
            print("Failed to write model histograms to Tensorboard: {0}".format(e))

# If this isn't done async then this is sloooooow.
class HistogramTBWriter():
    def __init__(self):
        super().__init__()

    def write(self, model:nn.Module, iteration:int, tbwriter:SummaryWriter, name:str='model'):
        request = HistogramTBRequest(model=model, iteration=iteration, tbwriter=tbwriter, name=name)
        asyncTBWriter.request_write(request)

class ModelStatsTBRequest(TBWriteRequest):
    def __init__(self, model:nn.Module, iteration:int, tbwriter:SummaryWriter, name:str):
        super().__init__(tbwriter=tbwriter, iteration=iteration)
        # Clone the gradients to the CPU up front so that the async write doesn't race with training updates.
        self.gradients = [x.grad.clone().detach().cpu() for x in model.parameters() if x.grad is not None]
        self.name = name
        self.gradients_root = '/gradients/'

    # override
    def write(self):
        try:
            if len(self.gradients) == 0: return
            gradient_nps = [to_np(x.data) for x in self.gradients]
            # Compute the per-parameter gradient norms once and reuse them for all the norm stats.
            norms = [x.data.norm() for x in self.gradients]
            avg_norm = sum(norms)/len(norms)
            self.tbwriter.add_scalar(
                tag=self.name + self.gradients_root + 'avg_norm', scalar_value=avg_norm, global_step=self.iteration)
            median_norm = statistics.median(norms)
            self.tbwriter.add_scalar(
                tag=self.name + self.gradients_root + 'median_norm', scalar_value=median_norm, global_step=self.iteration)
            max_norm = max(norms)
            self.tbwriter.add_scalar(
                tag=self.name + self.gradients_root + 'max_norm', scalar_value=max_norm, global_step=self.iteration)
            min_norm = min(norms)
            self.tbwriter.add_scalar(
                tag=self.name + self.gradients_root + 'min_norm', scalar_value=min_norm, global_step=self.iteration)
            num_zeros = sum((np.asarray(x) == 0.0).sum() for x in gradient_nps)
            self.tbwriter.add_scalar(
                tag=self.name + self.gradients_root + 'num_zeros', scalar_value=num_zeros, global_step=self.iteration)
            avg_gradient = sum(x.data.mean() for x in self.gradients)/len(self.gradients)
            self.tbwriter.add_scalar(
                tag=self.name + self.gradients_root + 'avg_gradient', scalar_value=avg_gradient, global_step=self.iteration)
            median_gradient = statistics.median(x.data.median() for x in self.gradients)
            self.tbwriter.add_scalar(
                tag=self.name + self.gradients_root + 'median_gradient', scalar_value=median_gradient, global_step=self.iteration)
            max_gradient = max(x.data.max() for x in self.gradients)
            self.tbwriter.add_scalar(
                tag=self.name + self.gradients_root + 'max_gradient', scalar_value=max_gradient, global_step=self.iteration)
            min_gradient = min(x.data.min() for x in self.gradients)
            self.tbwriter.add_scalar(
                tag=self.name + self.gradients_root + 'min_gradient', scalar_value=min_gradient, global_step=self.iteration)
        except Exception as e:
            print("Failed to write model stats to Tensorboard: {0}".format(e))

class ModelStatsTBWriter():
    def write(self, model:nn.Module, iteration:int, tbwriter:SummaryWriter, name:str='model_stats'):
        request = ModelStatsTBRequest(model=model, iteration=iteration, tbwriter=tbwriter, name=name)
        asyncTBWriter.request_write(request)

class ImageTBRequest(TBWriteRequest):
    def __init__(self, learn:Learner, batch:Tuple, iteration:int, tbwriter:SummaryWriter, ds_type:DatasetType):
        super().__init__(tbwriter=tbwriter, iteration=iteration)
        self.image_sets = ModelImageSet.get_list_from_model(learn=learn, batch=batch, ds_type=ds_type)
        self.ds_type = ds_type

    # override
    def write(self):
        try:
            orig_images, gen_images, real_images = [], [], []
            for image_set in self.image_sets:
                orig_images.append(image_set.orig.px)
                gen_images.append(image_set.gen.px)
                real_images.append(image_set.real.px)
            prefix = self.ds_type.name
            self.tbwriter.add_image(
                tag=prefix + ' orig images', img_tensor=vutils.make_grid(orig_images, normalize=True),
                global_step=self.iteration)
            self.tbwriter.add_image(
                tag=prefix + ' gen images', img_tensor=vutils.make_grid(gen_images, normalize=True),
                global_step=self.iteration)
            self.tbwriter.add_image(
                tag=prefix + ' real images', img_tensor=vutils.make_grid(real_images, normalize=True),
                global_step=self.iteration)
        except Exception as e:
            print("Failed to write images to Tensorboard: {0}".format(e))

# If this isn't done async then this is noticeably slower.
class ImageTBWriter():
    def __init__(self):
        super().__init__()

    def write(self, learn:Learner, trn_batch:Tuple, val_batch:Tuple, iteration:int, tbwriter:SummaryWriter):
        self._write_for_dstype(learn=learn, batch=val_batch, iteration=iteration,
                               tbwriter=tbwriter, ds_type=DatasetType.Valid)
        self._write_for_dstype(learn=learn, batch=trn_batch, iteration=iteration,
                               tbwriter=tbwriter, ds_type=DatasetType.Train)

    def _write_for_dstype(self, learn:Learner, batch:Tuple, iteration:int, tbwriter:SummaryWriter, ds_type:DatasetType):
        request = ImageTBRequest(learn=learn, batch=batch, iteration=iteration, tbwriter=tbwriter, ds_type=ds_type)
        asyncTBWriter.request_write(request)