Removed obsolete auto-encoder code

Signed-off-by: Jim Martens <github@2martens.de>
Jim Martens 2019-09-02 13:10:12 +02:00
parent afdf7c96aa
commit d58179fa2a
10 changed files with 2 additions and 1578 deletions

View File

@@ -18,7 +18,6 @@
Masterthesis package.
Subpackages:
``aae``: provides an implementation of Adversarial Auto Encoders
``ssd_keras``: provides an implementation of SSD
Modules:

View File

@@ -1,29 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2019 Jim Martens
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Provides an AAE implementation.
Modules:
``model``: provides the keras models of the AAE implementation
``train``: provides functionality to train the AAE
``util``: provides helper functionality for visualization
Todos:
- make the implementation compatible with the YCB Video dataset
"""

View File

@@ -1,110 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2019 Jim Martens
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Data functionality for my AAE implementation.
This module provides a function to prepare the training data.
Functions:
prepare_training_data(...): prepares the MNIST training data
"""
import pickle
from typing import Sequence
from typing import Tuple
import numpy as np
import tensorflow as tf
K = tf.keras.backend
def prepare_training_data(test_fold_id: int,
inlier_classes: Sequence[int],
total_classes: int,
fold_prefix: str = 'data/data_fold_',
batch_size: int = 128,
folds: int = 5) -> Tuple[tf.data.Dataset, tf.data.Dataset]:
"""
Prepares the MNIST training data.
Args:
test_fold_id: id of test fold
inlier_classes: list of class ids that are considered inliers
total_classes: total number of classes
fold_prefix: the prefix for the fold pickle files (default: 'data/data_fold_')
batch_size: size of batch (default: 128)
folds: number of folds (default: 5)
Returns:
A tuple (train dataset, valid dataset)
"""
# prepare data
mnist_train = []
mnist_valid = []
for i in range(folds):
if i != test_fold_id: # exclude testing fold, representing 20% of each class
with open(f"{fold_prefix}{i:d}.pkl", 'rb') as pkl:
fold = pickle.load(pkl)
if len(mnist_valid) == 0: # single out one fold, comprising 20% of each class
mnist_valid = fold
else: # form train set from remaining folds, comprising 60% of each class
mnist_train += fold
outlier_classes = []
for i in range(total_classes):
if i not in inlier_classes:
outlier_classes.append(i)
# keep only train classes
mnist_train = [x for x in mnist_train if x[0] in inlier_classes]
def _list_of_pairs_to_numpy(list_of_pairs: Sequence[Tuple[int, np.ndarray]]) -> Tuple[np.ndarray, np.ndarray]:
"""
Converts a list of (label, feature) pairs into separate feature and label arrays.
Args:
list_of_pairs: list of pairs
Returns:
tuple (feature array, label array)
"""
return np.asarray([x[1] for x in list_of_pairs], np.float32), np.asarray([x[0] for x in list_of_pairs], np.int)
mnist_train_x, mnist_train_y = _list_of_pairs_to_numpy(mnist_train)
mnist_valid_x, mnist_valid_y = _list_of_pairs_to_numpy(mnist_valid)
# get dataset
train_dataset = tf.data.Dataset.from_tensor_slices((mnist_train_x, mnist_train_y))
train_dataset = train_dataset.shuffle(mnist_train_x.shape[0]).batch(batch_size,
drop_remainder=True).map(_normalize)
valid_dataset = tf.data.Dataset.from_tensor_slices((mnist_valid_x, mnist_valid_y))
valid_dataset = valid_dataset.shuffle(mnist_valid_x.shape[0]).batch(batch_size,
drop_remainder=True).map(_normalize)
return train_dataset, valid_dataset
def _normalize(feature: tf.Tensor, label: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
"""
Normalizes a tensor from a 0-255 range to a 0-1 range and adds one dimension.
:param feature: tensor to be normalized
:param label: label tensor
:return: normalized tensor
"""
return K.expand_dims(tf.divide(feature, 255.0)), label
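
A minimal usage sketch for this module (a hedged example, assuming TensorFlow 1.x eager execution and that the pickled fold files data/data_fold_0.pkl through data/data_fold_4.pkl exist):

import tensorflow as tf

from twomartens.masterthesis.aae.data import prepare_training_data

tf.enable_eager_execution()

# hold out fold 0 for testing and treat digit 8 as the only inlier class
train_dataset, valid_dataset = prepare_training_data(test_fold_id=0,
                                                     inlier_classes=[8],
                                                     total_classes=10)

for features, labels in train_dataset.take(1):
    # _normalize scaled the pixels to [0, 1] and appended a channel axis
    print(features.shape)  # (128, 28, 28, 1) for the MNIST folds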

View File

@@ -1,193 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2019 Jim Martens
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Provides the models of my AAE implementation.
Classes:
``Encoder``: encodes an image input to a latent space
``Decoder``: decodes data from a latent space to resemble input data
``XDiscriminator``: differentiates between real input data and decoded input data
``ZDiscriminator``: differentiates between z values drawn from a normal distribution (real) and the encoded input
(fake)
"""
import tensorflow as tf
# shortcuts for tensorflow - quasi imports
keras = tf.keras
k = tf.keras.backend
class Encoder(keras.Model):
"""
Encodes input to a latent space.
Args:
zsize: size of the latent space
"""
def __init__(self, zsize: int) -> None:
super().__init__(name='encoder')
weight_init = keras.initializers.RandomNormal(mean=0, stddev=0.02)
self.conv1 = keras.layers.Conv2D(filters=zsize * 4, kernel_size=3, strides=2, name='conv1',
padding='same', kernel_initializer=weight_init,
activation=keras.activations.sigmoid)
self.conv2 = keras.layers.Conv2D(filters=zsize * 2, kernel_size=3, strides=2, name='conv2',
padding='same', kernel_initializer=weight_init)
self.conv2_a = keras.layers.ReLU()
self.conv3 = keras.layers.Conv2D(filters=zsize, kernel_size=3, strides=2, name='conv3',
padding='same', kernel_initializer=weight_init)
self.conv3_a = keras.layers.ReLU()
self.flatten = keras.layers.Flatten(name='flatten')
self.latent = keras.layers.Dense(units=zsize * (2 ** 5), name='latent')
def call(self, inputs: tf.Tensor, **kwargs) -> tf.Tensor:
"""See base class."""
result = self.conv1(inputs)
result = self.conv2(result)
result = self.conv2_a(result)
result = self.conv3(result)
result = self.conv3_a(result)
result = self.flatten(result)
result = self.latent(result)
return result
class Decoder(keras.Model):
"""
Generates input data from latent space values.
"""
def __init__(self, channels: int, zsize: int, image_size: int) -> None:
"""
Initializes the Decoder class.
Args:
channels: number of channels in the input image
zsize: size of the latent space
image_size: size of height/width of input image
"""
super().__init__(name='decoder')
weight_init = keras.initializers.RandomNormal(mean=0, stddev=0.02)
        # calculate the spatial size of the last encoder conv layer
        # (integer division keeps the reshape dimensions integral)
        conv_image_size = image_size // (2 ** 3)
dimensions = zsize * conv_image_size * conv_image_size
self.conv_shape = (-1, conv_image_size, conv_image_size, zsize)
self.transform = keras.layers.Dense(units=dimensions, name='input_transform')
self.deconv1 = keras.layers.Conv2DTranspose(filters=zsize, kernel_size=3, strides=1, name='deconv1',
padding='same', kernel_initializer=weight_init)
self.deconv1_a = keras.layers.ReLU()
self.deconv2 = keras.layers.Conv2DTranspose(filters=zsize * 2, kernel_size=3, strides=2, name='deconv2',
padding='same', kernel_initializer=weight_init)
self.deconv2_a = keras.layers.ReLU()
self.deconv3 = keras.layers.Conv2DTranspose(filters=zsize * 4, kernel_size=3, strides=2, name='deconv3',
padding='same', kernel_initializer=weight_init)
self.deconv3_a = keras.layers.ReLU()
self.deconv4 = keras.layers.Conv2DTranspose(filters=channels, kernel_size=3, strides=2, name='deconv4',
padding='same', kernel_initializer=weight_init)
def call(self, inputs: tf.Tensor, **kwargs) -> tf.Tensor:
"""See base class."""
result = self.transform(inputs)
result = tf.reshape(result, self.conv_shape)
result = self.deconv1(result)
result = self.deconv1_a(result)
result = self.deconv2(result)
result = self.deconv2_a(result)
result = self.deconv3(result)
result = self.deconv3_a(result)
result = self.deconv4(result)
result = k.sigmoid(result)
return result
class ZDiscriminator(keras.Model):
"""
Discriminates between encoded inputs and latent space distribution.
The latent space value is drawn from a normal distribution with ``0`` mean
and a variance of ``1``.
"""
def __init__(self) -> None:
super().__init__(name='zdiscriminator')
weight_init = keras.initializers.RandomNormal(mean=0, stddev=0.02)
self.zd1 = keras.layers.Dense(units=128, name='zd1', kernel_initializer=weight_init)
self.zd1_a = keras.layers.LeakyReLU(alpha=0.2)
self.zd2 = keras.layers.Dense(units=128, name='zd2', kernel_initializer=weight_init)
self.zd2_a = keras.layers.LeakyReLU(alpha=0.2)
self.zd3 = keras.layers.Dense(units=1, name='zd3', activation='sigmoid',
kernel_initializer=weight_init)
def call(self, inputs: tf.Tensor, **kwargs) -> tf.Tensor:
"""See base class."""
result = self.zd1(inputs)
result = self.zd1_a(result)
result = self.zd2(result)
result = self.zd2_a(result)
result = self.zd3(result)
return result
class XDiscriminator(keras.Model):
"""
Discriminates between generated inputs and the actual inputs.
"""
def __init__(self) -> None:
super().__init__(name='xdiscriminator')
weight_init = keras.initializers.RandomNormal(mean=0, stddev=0.02)
self.x_padded = keras.layers.ZeroPadding2D(padding=1)
self.xd1 = keras.layers.Conv2D(filters=64, kernel_size=4, strides=2, name='xd1',
padding='valid', kernel_initializer=weight_init)
self.xd1_a = keras.layers.LeakyReLU(alpha=0.2)
self.xd1_a_padded = keras.layers.ZeroPadding2D(padding=1)
self.xd2 = keras.layers.Conv2D(filters=256, kernel_size=4, strides=2, name='xd2',
padding='valid', kernel_initializer=weight_init)
self.xd2_bn = keras.layers.BatchNormalization()
self.xd2_a = keras.layers.LeakyReLU(alpha=0.2)
self.xd2_a_padded = keras.layers.ZeroPadding2D(padding=1)
self.xd3 = keras.layers.Conv2D(filters=512, kernel_size=4, strides=2, name='xd3',
padding='valid', kernel_initializer=weight_init)
self.xd3_bn = keras.layers.BatchNormalization()
self.xd3_a = keras.layers.LeakyReLU(alpha=0.2)
self.xd4 = keras.layers.Conv2D(filters=1, kernel_size=4, strides=1, name='xd4',
padding='valid', kernel_initializer=weight_init,
activation='sigmoid')
def call(self, inputs: tf.Tensor, **kwargs) -> tf.Tensor:
"""See base class."""
result = self.x_padded(inputs)
result = self.xd1(result)
result = self.xd1_a(result)
result = self.xd1_a_padded(result)
result = self.xd2(result)
result = self.xd2_bn(result)
result = self.xd2_a(result)
result = self.xd2_a_padded(result)
result = self.xd3(result)
result = self.xd3_bn(result)
result = self.xd3_a(result)
result = self.xd4(result)
return result
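
A short shape-check sketch for these models (assuming eager execution and 32x32 grayscale inputs; any spatial size divisible by 8 works for the Encoder/Decoder pair):

import tensorflow as tf

from twomartens.masterthesis.aae import model

tf.enable_eager_execution()

zsize, image_size, channels = 32, 32, 1
images = tf.random.uniform((16, image_size, image_size, channels))

encoder = model.Encoder(zsize)
decoder = model.Decoder(channels, zsize, image_size)

z = encoder(images)          # latent codes of shape (16, zsize * 32)
reconstruction = decoder(z)  # back to (16, 32, 32, 1) with sigmoid outputs

print(model.XDiscriminator()(reconstruction).shape)  # (16, 1, 1, 1); callers squeeze this
print(model.ZDiscriminator()(z).shape)               # (16, 1) probability of a "real" z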

View File

@@ -1,152 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2019 Jim Martens
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Functionality to run my auto-encoder implementation.
This module provides a function to run a trained simple auto-encoder.
Functions:
run_simple(...): runs a trained simple auto-encoder
"""
import os
import time
from typing import Dict, Tuple
import tensorflow as tf
from tensorflow.python.ops import summary_ops_v2
# shortcuts for tensorflow sub packages and classes
from twomartens.masterthesis.aae import model, train, util
K = tf.keras.backend
tfe = tf.contrib.eager
def run_simple(dataset: tf.data.Dataset,
iteration: int,
weights_prefix: str,
image_size: int,
channels: int = 3,
zsize: int = 64,
batch_size: int = 16,
verbose: bool = False) -> None:
"""
    Runs the trained auto-encoder for the given data set.
    This function restores the latest checkpoint for the given ``iteration``
    and logs reconstruction images and losses as summaries.
Args:
dataset: run dataset
iteration: identifier for the used training run
weights_prefix: prefix for trained weights directory
image_size: height/width of input image
channels: number of channels in input image (default: 3)
zsize: size of the intermediary z (default: 64)
batch_size: size of each batch (default: 16)
verbose: if True training progress is printed to console (default: False)
"""
# checkpointed tensors and variables
checkpointables = {
# get models
'encoder': model.Encoder(zsize),
'decoder': model.Decoder(channels, zsize, image_size),
}
global_step = tf.train.get_or_create_global_step()
# checkpoint
checkpoint_dir = os.path.join(weights_prefix, str(iteration) + '/')
os.makedirs(checkpoint_dir, exist_ok=True)
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint = tf.train.Checkpoint(**checkpointables)
checkpoint.restore(latest_checkpoint)
outputs = _run_one_epoch_simple(dataset,
batch_size=batch_size,
global_step=global_step,
**checkpointables)
if verbose:
print((
f"run time: {outputs['time']:.2f}, "
f"Encoder + Decoder loss: {outputs['enc_dec_loss']:.3f}"
))
def _run_one_epoch_simple(dataset: tf.data.Dataset,
batch_size: int,
encoder: model.Encoder,
decoder: model.Decoder,
global_step: tf.Variable) -> Dict[str, float]:
with summary_ops_v2.always_record_summaries():
start_time = time.time()
enc_dec_loss_avg = tfe.metrics.Mean(name='encoder_decoder_loss',
dtype=tf.float32)
for x in dataset:
reconstruction_loss, x_decoded = _run_enc_dec_step_simple(encoder=encoder,
decoder=decoder,
inputs=x,
global_step=global_step)
enc_dec_loss_avg(reconstruction_loss)
if int(global_step % train.LOG_FREQUENCY) == 0:
comparison = K.concatenate([x[:int(batch_size / 2)], x_decoded[:int(batch_size / 2)]], axis=0)
grid = util.prepare_image(comparison.cpu(), nrow=int(batch_size / 2))
summary_ops_v2.image(name='reconstruction',
tensor=K.expand_dims(grid, axis=0), max_images=1,
step=global_step)
global_step.assign_add(1)
end_time = time.time()
run_time = end_time - start_time
# final losses of epoch
outputs = {
'enc_dec_loss': enc_dec_loss_avg.result(False),
'run_time': run_time
}
return outputs
def _run_enc_dec_step_simple(encoder: model.Encoder, decoder: model.Decoder,
inputs: tf.Tensor,
global_step: tf.Variable) -> Tuple[tf.Tensor, tf.Tensor]:
"""
Runs the encoder and decoder jointly for one step (one batch).
Args:
encoder: instance of encoder model
decoder: instance of decoder model
inputs: inputs from data set
global_step: the global step variable
Returns:
        tuple of reconstruction loss and reconstructed input
"""
z = encoder(inputs)
x_decoded = decoder(z)
reconstruction_loss = tf.losses.log_loss(inputs, x_decoded)
if int(global_step % train.LOG_FREQUENCY) == 0:
summary_ops_v2.scalar(name='reconstruction_loss', tensor=reconstruction_loss,
step=global_step)
return reconstruction_loss, x_decoded
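
The CLI code removed further below wired this function up roughly as follows (a hedged sketch; the COCO path and category are placeholders, and a checkpoint produced by train.train_simple must exist under the weights prefix):

import tensorflow as tf
from tensorflow.python.ops import summary_ops_v2

from twomartens.masterthesis import data
from twomartens.masterthesis.aae import run

tf.enable_eager_execution()

coco_data = data.load_coco_val('/path/to/coco', 1, num_epochs=1,
                               batch_size=16, resized_shape=(256, 256))
writer = summary_ops_v2.create_file_writer('summaries/auto_encoder/val/category-1/0')
with writer.as_default():
    run.run_simple(coco_data, iteration=0,
                   weights_prefix='weights/auto_encoder/category-1',
                   image_size=256, channels=3, zsize=16, batch_size=16,
                   verbose=True)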

View File

@@ -1,247 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2019 Jim Martens
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training functionality for my AAE implementation.
This module provides a function to train a simple auto-encoder.
Attributes:
LOG_FREQUENCY: number of steps that must pass before logging happens
Functions:
train_simple(...): trains a simple auto-encoder only with reconstruction loss
"""
import os
import time
from typing import Dict
from typing import Tuple
import tensorflow as tf
from tensorflow.python.ops import summary_ops_v2
from twomartens.masterthesis.aae import model
from twomartens.masterthesis.aae import util
# shortcuts for tensorflow sub packages and classes
K = tf.keras.backend
tfe = tf.contrib.eager
LOG_FREQUENCY: int = 10
def train_simple(dataset: tf.data.Dataset,
iteration: int,
weights_prefix: str,
image_size: int,
channels: int = 3,
zsize: int = 64,
lr: float = 0.0001,
train_epoch: int = 1,
batch_size: int = 16,
verbose: bool = False) -> None:
"""
    Trains the auto-encoder on the given data set.
    This function creates checkpoints after every epoch as well as after
    finishing training. When this function is started again with the same
    ``iteration``, training continues where it left off by restoring the
    latest saved checkpoint.
The loss values are provided as scalar summaries. Reconstruction images are
provided as summary images.
Args:
dataset: train dataset
iteration: identifier for the current training run
weights_prefix: prefix for weights directory
image_size: height/width of input image
channels: number of channels in input image (default: 3)
zsize: size of the intermediary z (default: 64)
lr: initial learning rate (default: 0.0001)
train_epoch: number of epochs to train (default: 1)
batch_size: size of each batch (default: 16)
verbose: if True training progress is printed to console (default: False)
"""
# checkpointed tensors and variables
checkpointables = {
'learning_rate_var': K.variable(lr),
}
checkpointables.update({
# get models
'encoder': model.Encoder(zsize),
'decoder': model.Decoder(channels, zsize, image_size),
# define optimizers
'enc_dec_optimizer': tf.train.AdamOptimizer(learning_rate=checkpointables['learning_rate_var']),
# global step counter
'epoch_var': K.variable(-1, dtype=tf.int64),
'global_step': tf.train.get_or_create_global_step(),
'global_step_enc_dec': K.variable(0, dtype=tf.int64),
})
# checkpoint
checkpoint_dir = os.path.join(weights_prefix, str(iteration) + '/')
os.makedirs(checkpoint_dir, exist_ok=True)
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint = tf.train.Checkpoint(**checkpointables)
checkpoint.restore(latest_checkpoint)
def _get_last_epoch(epoch_var: tf.Variable, **kwargs) -> int:
return int(epoch_var)
last_epoch = _get_last_epoch(**checkpointables)
previous_epochs = 0
if last_epoch != -1:
previous_epochs = last_epoch + 1
with summary_ops_v2.always_record_summaries():
summary_ops_v2.scalar(name='learning_rate', tensor=checkpointables['learning_rate_var'],
step=checkpointables['global_step'])
for epoch in range(train_epoch - previous_epochs):
_epoch = epoch + previous_epochs
outputs = _train_one_epoch_simple(_epoch, dataset,
verbose=verbose,
batch_size=batch_size,
**checkpointables)
if verbose:
print((
f"[{_epoch + 1:d}/{train_epoch:d}] - "
f"train time: {outputs['per_epoch_time']:.2f}, "
f"Encoder + Decoder loss: {outputs['enc_dec_loss']:.3f}"
))
# save weights at end of epoch
checkpoint.save(checkpoint_prefix)
if verbose:
print("Training finish!... save model weights")
# save trained models
checkpoint.save(checkpoint_prefix)
def _train_one_epoch_simple(epoch: int,
dataset: tf.data.Dataset,
verbose: bool,
batch_size: int,
learning_rate_var: tf.Variable,
decoder: model.Decoder,
encoder: model.Encoder,
enc_dec_optimizer: tf.train.Optimizer,
global_step: tf.Variable,
global_step_enc_dec: tf.Variable,
epoch_var: tf.Variable) -> Dict[str, float]:
with summary_ops_v2.always_record_summaries():
epoch_var.assign(epoch)
epoch_start_time = time.time()
# define loss variables
enc_dec_loss_avg = tfe.metrics.Mean(name='encoder_decoder_loss', dtype=tf.float32)
# update learning rate
if (epoch + 1) % 30 == 0:
learning_rate_var.assign(learning_rate_var.value() / 4)
summary_ops_v2.scalar(name='learning_rate', tensor=learning_rate_var,
step=global_step)
if verbose:
print("learning rate change!")
for x in dataset:
reconstruction_loss, x_decoded = _train_enc_dec_step_simple(encoder=encoder,
decoder=decoder,
optimizer=enc_dec_optimizer,
inputs=x,
global_step_enc_dec=global_step_enc_dec,
global_step=global_step)
enc_dec_loss_avg(reconstruction_loss)
if int(global_step % LOG_FREQUENCY) == 0:
comparison = K.concatenate([x[:int(batch_size / 2)], x_decoded[:int(batch_size / 2)]], axis=0)
grid = util.prepare_image(comparison.cpu(), nrow=int(batch_size/2))
summary_ops_v2.image(name='reconstruction',
tensor=K.expand_dims(grid, axis=0), max_images=1,
step=global_step)
global_step.assign_add(1)
epoch_end_time = time.time()
per_epoch_time = epoch_end_time - epoch_start_time
# final losses of epoch
outputs = {
'enc_dec_loss': enc_dec_loss_avg.result(False),
'per_epoch_time': per_epoch_time,
}
return outputs
def _train_enc_dec_step_simple(encoder: model.Encoder, decoder: model.Decoder,
optimizer: tf.train.Optimizer,
inputs: tf.Tensor,
global_step: tf.Variable,
global_step_enc_dec: tf.Variable) -> Tuple[tf.Tensor, tf.Tensor]:
"""
Trains the encoder and decoder jointly for one step (one batch).
Args:
encoder: instance of encoder model
decoder: instance of decoder model
optimizer: instance of chosen optimizer
inputs: inputs from data set
global_step: the global step variable
global_step_enc_dec: global step variable for enc_dec
Returns:
        tuple of reconstruction loss and reconstructed input
"""
with tf.GradientTape() as tape:
z = encoder(inputs)
x_decoded = decoder(z)
reconstruction_loss = tf.losses.log_loss(inputs, x_decoded)
enc_dec_grads = tape.gradient(reconstruction_loss,
encoder.trainable_variables + decoder.trainable_variables)
if int(global_step % LOG_FREQUENCY) == 0:
summary_ops_v2.scalar(name='reconstruction_loss', tensor=reconstruction_loss,
step=global_step)
for grad, variable in zip(enc_dec_grads, encoder.trainable_variables + decoder.trainable_variables):
summary_ops_v2.histogram(name='gradients/' + variable.name, tensor=tf.math.l2_normalize(grad),
step=global_step)
summary_ops_v2.histogram(name='variables/' + variable.name, tensor=tf.math.l2_normalize(variable),
step=global_step)
optimizer.apply_gradients(zip(enc_dec_grads,
encoder.trainable_variables + decoder.trainable_variables),
global_step=global_step_enc_dec)
return reconstruction_loss, x_decoded
if __name__ == "__main__":
from twomartens.masterthesis.aae.data import prepare_training_data
tf.enable_eager_execution()
inlier_classes = [8]
iteration = 2
train_dataset, _ = prepare_training_data(test_fold_id=0, inlier_classes=inlier_classes,
total_classes=10)
train_summary_writer = summary_ops_v2.create_file_writer(
'./summaries/train/number-' + str(inlier_classes[0]) + '/' + str(iteration))
with train_summary_writer.as_default():
        train_simple(dataset=train_dataset, iteration=iteration,
                     weights_prefix='weights/' + str(inlier_classes[0]) + '/',
                     # image_size and channels are assumed here: train_simple
                     # requires them, and the Decoder needs a size divisible by 8
                     image_size=32, channels=1)

View File

@@ -1,569 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2019 Jim Martens
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training functionality for my AAE implementation.
This module provides functions to train the Adversarial Auto Encoder.
Attributes:
GRACE: specifies the number of epochs that the training loss can stagnate or worsen
before the training is stopped early
    TOTAL_LOSS_GRACE_CAP: upper limit for the total loss; the grace countdown only runs while the total loss exceeds this value
Functions:
train(...): trains the AAE models
Todos:
- fix early stopping
- fix losses reaching exactly zero
"""
import functools
import os
import time
from typing import Callable
from typing import Dict
from typing import Tuple
import math
import tensorflow as tf
from tensorflow.python.ops import summary_ops_v2
from twomartens.masterthesis.aae import model
from twomartens.masterthesis.aae import util
from twomartens.masterthesis.aae.train import LOG_FREQUENCY
# shortcuts for tensorflow sub packages and classes
K = tf.keras.backend
tfe = tf.contrib.eager
GRACE: int = 10
TOTAL_LOSS_GRACE_CAP: int = 6
def train(dataset: tf.data.Dataset,
iteration: int,
weights_prefix: str,
channels: int = 1,
zsize: int = 32,
lr: float = 0.002,
batch_size: int = 128,
train_epoch: int = 80,
verbose: bool = True,
early_stopping: bool = False) -> None:
"""
    Trains the AAE on the given data set.
    This function provides early stopping and creates checkpoints after every
    epoch as well as after finishing training (or stopping early). When this
    function is started again with the same ``iteration``, training continues
    where it left off by restoring the latest saved checkpoint.
The loss values are provided as scalar summaries. Reconstruction and sample
images are provided as summary images.
Args:
dataset: train dataset
iteration: identifier for the current training run
weights_prefix: prefix for weights directory
channels: number of channels in input image (default: 1)
zsize: size of the intermediary z (default: 32)
lr: initial learning rate (default: 0.002)
batch_size: the size of each batch (default: 128)
train_epoch: number of epochs to train (default: 80)
verbose: if True prints train progress info to console (default: True)
early_stopping: if True the early stopping mechanic is enabled (default: False)
    Notes:
        The training stops early if the loss does not decrease for ``GRACE``
        epochs. Specifically, all individual losses are tracked and any one of
        them not decreasing triggers a ``strike``. If the total loss, the sum of
        all individual losses, is also not decreasing and exceeds
        ``TOTAL_LOSS_GRACE_CAP``, the counter for the remaining grace period is
        decreased. If in any later epoch all losses decrease, the grace period
        is reset to ``GRACE``. The training loop stops early once the grace
        counter reaches ``0`` at the end of an epoch; a distilled sketch of this
        rule follows at the end of this module.
"""
# non-preserved tensors
y_real = K.ones(batch_size)
y_fake = K.zeros(batch_size)
sample = K.expand_dims(K.expand_dims(K.random_normal((64, zsize)), axis=1), axis=1)
# z generator function
z_generator = functools.partial(_get_z_variable, batch_size=batch_size, zsize=zsize)
# non-preserved python variables
encoder_lowest_loss = math.inf
decoder_lowest_loss = math.inf
enc_dec_lowest_loss = math.inf
zd_lowest_loss = math.inf
xd_lowest_loss = math.inf
total_lowest_loss = math.inf
grace_period = GRACE
# checkpointed tensors and variables
checkpointables = {
'learning_rate_var': K.variable(lr),
}
checkpointables.update({
# get models
'encoder': model.Encoder(zsize),
        'decoder': model.Decoder(channels, zsize, image_size=32),  # size assumed: Decoder requires an image size divisible by 8
'z_discriminator': model.ZDiscriminator(),
'x_discriminator': model.XDiscriminator(),
# define optimizers
'decoder_optimizer': tf.train.AdamOptimizer(learning_rate=checkpointables['learning_rate_var'],
beta1=0.5, beta2=0.999),
'enc_dec_optimizer': tf.train.AdamOptimizer(learning_rate=checkpointables['learning_rate_var'],
beta1=0.5, beta2=0.999),
'z_discriminator_optimizer': tf.train.AdamOptimizer(learning_rate=checkpointables['learning_rate_var'],
beta1=0.5, beta2=0.999),
'x_discriminator_optimizer': tf.train.AdamOptimizer(learning_rate=checkpointables['learning_rate_var'],
beta1=0.5, beta2=0.999),
# global step counter
'epoch_var': K.variable(-1, dtype=tf.int64),
'global_step': tf.train.get_or_create_global_step(),
'global_step_decoder': K.variable(0, dtype=tf.int64),
'global_step_enc_dec': K.variable(0, dtype=tf.int64),
'global_step_xd': K.variable(0, dtype=tf.int64),
'global_step_zd': K.variable(0, dtype=tf.int64),
})
# checkpoint
checkpoint_dir = os.path.join(weights_prefix, str(iteration) + '/')
os.makedirs(checkpoint_dir, exist_ok=True)
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint = tf.train.Checkpoint(**checkpointables)
checkpoint.restore(latest_checkpoint)
def _get_last_epoch(epoch_var: tf.Variable, **kwargs) -> int:
return int(epoch_var)
last_epoch = _get_last_epoch(**checkpointables)
previous_epochs = 0
if last_epoch != -1:
previous_epochs = last_epoch + 1
with summary_ops_v2.always_record_summaries():
summary_ops_v2.scalar(name='learning_rate', tensor=checkpointables['learning_rate_var'],
step=checkpointables['global_step'])
for epoch in range(train_epoch - previous_epochs):
_epoch = epoch + previous_epochs
outputs = _train_one_epoch(_epoch, dataset, targets_real=y_real,
targets_fake=y_fake, z_generator=z_generator,
verbose=verbose, batch_size=batch_size,
**checkpointables)
if verbose:
print((
f"[{_epoch + 1:d}/{train_epoch:d}] - "
f"train time: {outputs['per_epoch_time']:.2f}, "
f"Decoder loss: {outputs['decoder_loss']:.3f}, "
f"X Discriminator loss: {outputs['xd_loss']:.3f}, "
f"Z Discriminator loss: {outputs['zd_loss']:.3f}, "
f"Encoder + Decoder loss: {outputs['enc_dec_loss']:.3f}, "
f"Encoder loss: {outputs['encoder_loss']:.3f}"
))
# save sample image summary
def _save_sample(decoder: model.Decoder, global_step: tf.Variable, **kwargs) -> None:
resultsample = decoder(sample).cpu()
grid = util.prepare_image(resultsample)
summary_ops_v2.image(name='sample', tensor=K.expand_dims(grid, axis=0),
max_images=1, step=global_step)
with summary_ops_v2.always_record_summaries():
_save_sample(**checkpointables)
# save weights at end of epoch
checkpoint.save(checkpoint_prefix)
# check for improvements in error reduction - otherwise early stopping
if early_stopping:
strike = False
total_strike = False
total_loss = outputs['encoder_loss'] + outputs['decoder_loss'] + outputs['enc_dec_loss'] + \
outputs['xd_loss'] + outputs['zd_loss']
if total_loss < total_lowest_loss:
total_lowest_loss = total_loss
elif total_loss > TOTAL_LOSS_GRACE_CAP:
total_strike = True
if outputs['encoder_loss'] < encoder_lowest_loss:
encoder_lowest_loss = outputs['encoder_loss']
else:
strike = True
if outputs['decoder_loss'] < decoder_lowest_loss:
decoder_lowest_loss = outputs['decoder_loss']
else:
strike = True
if outputs['enc_dec_loss'] < enc_dec_lowest_loss:
enc_dec_lowest_loss = outputs['enc_dec_loss']
else:
strike = True
if outputs['xd_loss'] < xd_lowest_loss:
xd_lowest_loss = outputs['xd_loss']
else:
strike = True
if outputs['zd_loss'] < zd_lowest_loss:
zd_lowest_loss = outputs['zd_loss']
else:
strike = True
if strike and total_strike:
grace_period -= 1
elif strike:
pass
else:
grace_period = GRACE
if grace_period == 0:
break
if verbose:
if grace_period > 0:
print("Training finish!... save model weights")
if grace_period == 0:
print("Training stopped early!... save model weights")
# save trained models
checkpoint.save(checkpoint_prefix)
def _train_one_epoch(epoch: int,
dataset: tf.data.Dataset,
targets_real: tf.Tensor,
verbose: bool,
batch_size: int,
targets_fake: tf.Tensor,
z_generator: Callable[[], tf.Variable],
learning_rate_var: tf.Variable,
decoder: model.Decoder,
encoder: model.Encoder,
x_discriminator: model.XDiscriminator,
z_discriminator: model.ZDiscriminator,
decoder_optimizer: tf.train.Optimizer,
x_discriminator_optimizer: tf.train.Optimizer,
z_discriminator_optimizer: tf.train.Optimizer,
enc_dec_optimizer: tf.train.Optimizer,
global_step: tf.Variable,
global_step_xd: tf.Variable,
global_step_zd: tf.Variable,
global_step_decoder: tf.Variable,
global_step_enc_dec: tf.Variable,
epoch_var: tf.Variable) -> Dict[str, float]:
with summary_ops_v2.always_record_summaries():
epoch_var.assign(epoch)
epoch_start_time = time.time()
# define loss variables
encoder_loss_avg = tfe.metrics.Mean(name='encoder_loss', dtype=tf.float32)
decoder_loss_avg = tfe.metrics.Mean(name='decoder_loss', dtype=tf.float32)
enc_dec_loss_avg = tfe.metrics.Mean(name='encoder_decoder_loss', dtype=tf.float32)
zd_loss_avg = tfe.metrics.Mean(name='z_discriminator_loss', dtype=tf.float32)
xd_loss_avg = tfe.metrics.Mean(name='x_discriminator_loss', dtype=tf.float32)
# update learning rate
if (epoch + 1) % 30 == 0:
learning_rate_var.assign(learning_rate_var.value() / 4)
summary_ops_v2.scalar(name='learning_rate', tensor=learning_rate_var,
step=global_step)
if verbose:
print("learning rate change!")
for x, _ in dataset:
# x discriminator
_xd_train_loss = _train_xdiscriminator_step(x_discriminator=x_discriminator,
decoder=decoder,
optimizer=x_discriminator_optimizer,
inputs=x,
targets_real=targets_real,
targets_fake=targets_fake,
global_step_xd=global_step_xd,
global_step=global_step,
z_generator=z_generator)
xd_loss_avg(_xd_train_loss)
# --------
# decoder
_decoder_train_loss = _train_decoder_step(decoder=decoder,
x_discriminator=x_discriminator,
optimizer=decoder_optimizer,
targets=targets_real,
global_step_decoder=global_step_decoder,
global_step=global_step,
z_generator=z_generator)
decoder_loss_avg(_decoder_train_loss)
# ---------
# z discriminator
_zd_train_loss = _train_zdiscriminator_step(z_discriminator=z_discriminator,
encoder=encoder,
optimizer=z_discriminator_optimizer,
inputs=x,
targets_real=targets_real,
targets_fake=targets_fake,
global_step_zd=global_step_zd,
global_step=global_step,
z_generator=z_generator)
zd_loss_avg(_zd_train_loss)
# -----------
# encoder + decoder
encoder_loss, reconstruction_loss, x_decoded = _train_enc_dec_step(encoder=encoder,
decoder=decoder,
z_discriminator=z_discriminator,
optimizer=enc_dec_optimizer,
inputs=x,
targets=targets_real,
global_step_enc_dec=global_step_enc_dec,
global_step=global_step)
enc_dec_loss_avg(reconstruction_loss)
encoder_loss_avg(encoder_loss)
if int(global_step % LOG_FREQUENCY) == 0:
                comparison = K.concatenate([x[:int(batch_size / 2)], x_decoded[:int(batch_size / 2)]], axis=0)
grid = util.prepare_image(comparison.cpu(), nrow=int(batch_size/2))
summary_ops_v2.image(name='reconstruction',
tensor=K.expand_dims(grid, axis=0), max_images=1,
step=global_step)
global_step.assign_add(1)
epoch_end_time = time.time()
per_epoch_time = epoch_end_time - epoch_start_time
# final losses of epoch
outputs = {
'decoder_loss': decoder_loss_avg.result(False),
'encoder_loss': encoder_loss_avg.result(False),
'enc_dec_loss': enc_dec_loss_avg.result(False),
'xd_loss': xd_loss_avg.result(False),
'zd_loss': zd_loss_avg.result(False),
'per_epoch_time': per_epoch_time,
}
return outputs
def _train_xdiscriminator_step(x_discriminator: model.XDiscriminator,
decoder: model.Decoder,
optimizer: tf.train.Optimizer,
inputs: tf.Tensor,
targets_real: tf.Tensor,
targets_fake: tf.Tensor,
global_step: tf.Variable,
global_step_xd: tf.Variable,
z_generator: Callable[[], tf.Variable]) -> tf.Tensor:
"""
Trains the x discriminator model for one step (one batch).
:param x_discriminator: instance of x discriminator model
:param decoder: instance of decoder model
:param optimizer: instance of chosen optimizer
:param inputs: inputs from dataset
:param targets_real: target tensor for real loss calculation
:param targets_fake: target tensor for fake loss calculation
:param global_step: the global step variable
:param global_step_xd: global step variable for xd
:param z_generator: callable function that returns a z variable
:return: the calculated loss
"""
with tf.GradientTape() as tape:
xd_result_1 = tf.squeeze(x_discriminator(inputs))
xd_real_loss = tf.losses.log_loss(targets_real, xd_result_1)
z = z_generator()
x_fake = decoder(z)
xd_result_2 = tf.squeeze(x_discriminator(x_fake))
xd_fake_loss = tf.losses.log_loss(targets_fake, xd_result_2)
_xd_train_loss = xd_real_loss + xd_fake_loss
xd_grads = tape.gradient(_xd_train_loss, x_discriminator.trainable_variables)
if int(global_step % LOG_FREQUENCY) == 0:
summary_ops_v2.scalar(name='x_discriminator_real_loss', tensor=xd_real_loss,
step=global_step)
summary_ops_v2.scalar(name='x_discriminator_fake_loss', tensor=xd_fake_loss,
step=global_step)
summary_ops_v2.scalar(name='x_discriminator_loss', tensor=_xd_train_loss,
step=global_step)
for grad, variable in zip(xd_grads, x_discriminator.trainable_variables):
summary_ops_v2.histogram(name='gradients/' + variable.name, tensor=tf.math.l2_normalize(grad),
step=global_step)
summary_ops_v2.histogram(name='variables/' + variable.name, tensor=tf.math.l2_normalize(variable),
step=global_step)
optimizer.apply_gradients(zip(xd_grads, x_discriminator.trainable_variables),
global_step=global_step_xd)
return _xd_train_loss
def _train_decoder_step(decoder: model.Decoder,
x_discriminator: model.XDiscriminator,
optimizer: tf.train.Optimizer,
targets: tf.Tensor,
global_step: tf.Variable,
global_step_decoder: tf.Variable,
z_generator: Callable[[], tf.Variable]) -> tf.Tensor:
"""
Trains the decoder model for one step (one batch).
:param decoder: instance of decoder model
:param x_discriminator: instance of the x discriminator model
:param optimizer: instance of chosen optimizer
:param targets: target tensor for loss calculation
:param global_step: the global step variable
:param global_step_decoder: global step variable for decoder
:param z_generator: callable function that returns a z variable
:return: the calculated loss
"""
with tf.GradientTape() as tape:
z = z_generator()
x_fake = decoder(z)
xd_result = tf.squeeze(x_discriminator(x_fake))
_decoder_train_loss = tf.losses.log_loss(targets, xd_result)
grads = tape.gradient(_decoder_train_loss, decoder.trainable_variables)
if int(global_step % LOG_FREQUENCY) == 0:
summary_ops_v2.scalar(name='decoder_loss', tensor=_decoder_train_loss,
step=global_step)
for grad, variable in zip(grads, decoder.trainable_variables):
summary_ops_v2.histogram(name='gradients/' + variable.name, tensor=tf.math.l2_normalize(grad),
step=global_step)
summary_ops_v2.histogram(name='variables/' + variable.name, tensor=tf.math.l2_normalize(variable),
step=global_step)
optimizer.apply_gradients(zip(grads, decoder.trainable_variables),
global_step=global_step_decoder)
return _decoder_train_loss
def _train_zdiscriminator_step(z_discriminator: model.ZDiscriminator,
encoder: model.Encoder,
optimizer: tf.train.Optimizer,
inputs: tf.Tensor,
targets_real: tf.Tensor,
targets_fake: tf.Tensor,
global_step: tf.Variable,
global_step_zd: tf.Variable,
z_generator: Callable[[], tf.Variable]) -> tf.Tensor:
"""
Trains the z discriminator one step (one batch).
:param z_discriminator: instance of z discriminator model
:param encoder: instance of encoder model
:param optimizer: instance of chosen optimizer
:param inputs: inputs from dataset
:param targets_real: target tensor for real loss calculation
:param targets_fake: target tensor for fake loss calculation
:param global_step: the global step variable
:param global_step_zd: global step variable for zd
:param z_generator: callable function that returns a z variable
:return: the calculated loss
"""
with tf.GradientTape() as tape:
z = z_generator()
zd_result = tf.squeeze(z_discriminator(z))
zd_real_loss = tf.losses.log_loss(targets_real, zd_result)
z = tf.squeeze(encoder(inputs))
zd_result = tf.squeeze(z_discriminator(z))
zd_fake_loss = tf.losses.log_loss(targets_fake, zd_result)
_zd_train_loss = zd_real_loss + zd_fake_loss
zd_grads = tape.gradient(_zd_train_loss, z_discriminator.trainable_variables)
if int(global_step % LOG_FREQUENCY) == 0:
summary_ops_v2.scalar(name='z_discriminator_real_loss', tensor=zd_real_loss,
step=global_step)
summary_ops_v2.scalar(name='z_discriminator_fake_loss', tensor=zd_fake_loss,
step=global_step)
summary_ops_v2.scalar(name='z_discriminator_loss', tensor=_zd_train_loss,
step=global_step)
for grad, variable in zip(zd_grads, z_discriminator.trainable_variables):
summary_ops_v2.histogram(name='gradients/' + variable.name, tensor=tf.math.l2_normalize(grad),
step=global_step)
summary_ops_v2.histogram(name='variables/' + variable.name, tensor=tf.math.l2_normalize(variable),
step=global_step)
optimizer.apply_gradients(zip(zd_grads, z_discriminator.trainable_variables),
global_step=global_step_zd)
return _zd_train_loss
def _train_enc_dec_step(encoder: model.Encoder, decoder: model.Decoder,
z_discriminator: model.ZDiscriminator,
optimizer: tf.train.Optimizer,
inputs: tf.Tensor,
targets: tf.Tensor,
global_step: tf.Variable,
global_step_enc_dec: tf.Variable) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
"""
Trains the encoder and decoder jointly for one step (one batch).
:param encoder: instance of encoder model
:param decoder: instance of decoder model
:param z_discriminator: instance of z discriminator model
:param optimizer: instance of chosen optimizer
:param inputs: inputs from dataset
:param targets: target tensor for loss calculation
:param global_step: the global step variable
:param global_step_enc_dec: global step variable for enc_dec
:return: tuple of encoder loss, reconstruction loss, reconstructed input
"""
with tf.GradientTape() as tape:
z = encoder(inputs)
x_decoded = decoder(z)
zd_result = tf.squeeze(z_discriminator(tf.squeeze(z)))
encoder_loss = tf.losses.log_loss(targets, zd_result) * 2.0
reconstruction_loss = tf.losses.log_loss(inputs, x_decoded)
_enc_dec_train_loss = encoder_loss + reconstruction_loss
enc_dec_grads = tape.gradient(_enc_dec_train_loss,
encoder.trainable_variables + decoder.trainable_variables)
if int(global_step % LOG_FREQUENCY) == 0:
summary_ops_v2.scalar(name='encoder_loss', tensor=encoder_loss,
step=global_step)
summary_ops_v2.scalar(name='reconstruction_loss', tensor=reconstruction_loss,
step=global_step)
summary_ops_v2.scalar(name='encoder_decoder_loss', tensor=_enc_dec_train_loss,
step=global_step)
for grad, variable in zip(enc_dec_grads, encoder.trainable_variables + decoder.trainable_variables):
summary_ops_v2.histogram(name='gradients/' + variable.name, tensor=tf.math.l2_normalize(grad),
step=global_step)
summary_ops_v2.histogram(name='variables/' + variable.name, tensor=tf.math.l2_normalize(variable),
step=global_step)
optimizer.apply_gradients(zip(enc_dec_grads,
encoder.trainable_variables + decoder.trainable_variables),
global_step=global_step_enc_dec)
return encoder_loss, reconstruction_loss, x_decoded
def _get_z_variable(batch_size: int, zsize: int) -> tf.Variable:
"""
Creates and returns a z variable taken from a normal distribution.
:param batch_size: size of the batch
:param zsize: size of the z latent space
:return: created variable
"""
z = K.reshape(K.random_normal((batch_size, zsize)), (-1, 1, 1, zsize))
return K.variable(z)
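
The grace-period rule from the ``Notes`` above, distilled into a standalone sketch (the loss histories are illustrative; the real loop reads these values from ``outputs``):

import math

GRACE = 10
TOTAL_LOSS_GRACE_CAP = 6

def stop_epoch(epoch_losses):
    """Returns the index of the epoch after which training would stop, or None."""
    lowest = {}                # lowest value seen so far per individual loss
    total_lowest = math.inf
    grace_period = GRACE
    for epoch, losses in enumerate(epoch_losses):
        # any individual loss that fails to improve triggers a strike
        strike = any(value >= lowest.get(name, math.inf)
                     for name, value in losses.items())
        for name, value in losses.items():
            lowest[name] = min(lowest.get(name, math.inf), value)
        total = sum(losses.values())
        total_strike = total > TOTAL_LOSS_GRACE_CAP and total >= total_lowest
        total_lowest = min(total_lowest, total)
        if strike and total_strike:
            grace_period -= 1      # both individual and total losses stagnated
        elif not strike:
            grace_period = GRACE   # every loss improved: reset the grace period
        if grace_period == 0:
            return epoch
    return None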

View File

@@ -1,172 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2019 Jim Martens
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utility functionality for visualizing predictions.
Functions:
prepare_image(...): prepares a tensor to be visualized as an image
"""
import math
from typing import Sequence
from typing import Tuple
from typing import Union
import numpy as np
import tensorflow as tf
k = tf.keras.backend
def prepare_image(tensor: Union[tf.Tensor, Sequence[tf.Tensor]], nrow: int = 8,
padding: int = 2,
normalize: bool = False, range_value: Tuple[float, float] = None,
scale_each: bool = False, pad_value: float = 0.0) -> tf.Tensor:
"""
Prepares a tensor to be saved as image and returns it.
Args:
        tensor: image to be prepared; given a mini-batch tensor, it is
            arranged as a grid of images by calling ``_make_grid``.
        nrow: number of images displayed in each row of the grid.
            The final grid size is (B / nrow, nrow). Default is 8.
padding: amount of padding. Default is 2.
normalize: If True, shift the image to the range (0, 1),
by subtracting the minimum and dividing by the maximum pixel value.
range_value: tuple (min, max) where min and max are numbers,
then these numbers are used to normalize the image. By default, min and max
are computed from the tensor.
scale_each: If True, scale each image in the batch of
images separately rather than the (min, max) over all images.
pad_value: Value for the padded pixels.
Returns:
the prepared tensor
"""
grid = _make_grid(tensor, nrow, padding, normalize, range_value,
scale_each, pad_value)
min_pixel_value = 0
max_pixel_value = 255
grid *= max_pixel_value
grid = tf.clip_by_value(grid, min_pixel_value, max_pixel_value)
grid = tf.cast(grid, tf.uint8)
return grid
def _make_grid(tensor: Union[tf.Tensor, Sequence[tf.Tensor]], nrow: int = 8, padding: int = 2,
normalize: bool = False, range_value: Tuple[float, float] = None,
scale_each: bool = False, pad_value: float = 0.0) -> tf.Tensor:
"""
Make a grid of images.
Example:
See this notebook `here <https://gist.github.com/anonymous/bf16430f7750c023141c562f3e9f2a91>`_
    :param tensor: 4D mini-batch tensor of shape (B x H x W x C)
or a list of images all of the same size.
:param nrow: Number of images displayed in each row of the grid.
        The final grid size is (B / nrow, nrow). Default is 8.
:param padding: amount of padding. Default is 2.
:param normalize: If True, shift the image to the range (0, 1),
by subtracting the minimum and dividing by the maximum pixel value.
:param range_value: tuple (min, max) where min and max are numbers,
then these numbers are used to normalize the image. By default, min and max
are computed from the tensor.
:param scale_each: If True, scale each image in the batch of
images separately rather than the (min, max) over all images.
:param pad_value: Value for the padded pixels.
:return: tensor containing image grid
"""
if not (tf.contrib.framework.is_tensor(tensor) or
(isinstance(tensor, list) and all(tf.contrib.framework.is_tensor(t) for t in tensor))):
raise TypeError('tensor or list of tensors expected, got {}'.format(type(tensor)))
# if list of tensors, convert to a 4D mini-batch Tensor
if isinstance(tensor, list):
tensor = k.stack(tensor, axis=0)
tensor_shape = tf.shape(tensor).numpy()
tensor_rank = tf.rank(tensor).numpy()
if tensor_rank == 2: # single image H x W
tensor = k.reshape(tensor, (tensor_shape[0], tensor_shape[1], 1))
if tensor_rank == 3: # single image
if tensor_shape[2] == 1: # if single-channel, convert to 3-channel
tensor = k.concatenate((tensor, tensor, tensor), axis=2)
tensor = k.reshape(tensor, (1, tensor_shape[0], tensor_shape[1], tensor_shape[2]))
if tensor_rank == 4 and tensor_shape[3] == 1: # single-channel images
tensor = k.concatenate((tensor, tensor, tensor), axis=3)
if normalize is True:
if range_value is not None:
assert isinstance(range_value, tuple), \
"range_value has to be a tuple (min, max) if specified. min and max are numbers"
def norm_ip(img: tf.Tensor, min_v: float, max_v: float) -> tf.Tensor:
"""
Internal function to clip given tensor to given min and max values.
:param img: tensor to be clipped
:param min_v: min value
:param max_v: max value
:return: clipped tensor
"""
img = tf.clip_by_value(img, min_v, max_v)
img = tf.add(img, -min_v)
return tf.divide(img, max_v - min_v + 1e-5)
def norm_range(t: tf.Tensor, range_v: Tuple[float, float] = None) -> tf.Tensor:
"""
Internal function to normalize a tensor to a given range.
:param t: tensor to be normalized
:param range_v: tuple with (min, max) range values
:return: normalized tensor
"""
if range_v is not None:
return norm_ip(t, range_v[0], range_v[1])
else:
return norm_ip(t, float(k.min(t)), float(k.max(t)))
if scale_each is True:
updated_tensors = []
for t in tensor: # loop over mini-batch dimension
updated_tensors.append(norm_range(t, range_value))
tensor = k.constant(np.array(updated_tensors))
else:
tensor = norm_range(tensor, range_value)
if tensor_shape[0] == 1:
return tf.squeeze(tensor)
# make the mini-batch of images into a grid
nmaps = tensor_shape[0]
xmaps = min(nrow, nmaps)
ymaps = int(math.ceil(float(nmaps) / xmaps))
height, width = int(tensor_shape[1] + padding), int(tensor_shape[2] + padding)
grid = tf.fill((height * ymaps + padding, width * xmaps + padding, 3), pad_value).numpy()
tensor_numpy = tensor.numpy()
i = 0
for y in range(ymaps):
for x in range(xmaps):
if i >= nmaps:
break
start_height = y * height + padding
start_width = x * width + padding
np.copyto(grid[start_height: start_height + height - padding,
start_width:start_width + width - padding], tensor_numpy[i, :, :, :])
i = i + 1
return k.constant(grid)
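
A short usage sketch for the grid helper (assuming eager execution; the values are already in [0, 1], so no normalization is needed):

import tensorflow as tf

from twomartens.masterthesis.aae import util

tf.enable_eager_execution()

batch = tf.random.uniform((8, 28, 28, 1))  # eight single-channel images
grid = util.prepare_image(batch, nrow=4)   # arranged as 2 rows x 4 columns
print(grid.shape, grid.dtype)              # (62, 122, 3) uint8; callers add a batch axis before logging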

View File

@@ -69,14 +69,12 @@ def prepare(args: argparse.Namespace) -> None:
def train(args: argparse.Namespace) -> None:
_train_execute_action(args, _ssd_train, _auto_encoder_train)
_train_execute_action(args, _ssd_train)
def test(args: argparse.Namespace) -> None:
if args.network == "ssd" or args.network == "bayesian_ssd":
_ssd_test(args)
elif args.network == "auto_encoder":
_auto_encoder_test(args)
def evaluate(args: argparse.Namespace) -> None:
@@ -152,11 +150,9 @@ def _config_execute_action(args: argparse.Namespace, on_get: callable,
on_list()
def _train_execute_action(args: argparse.Namespace, on_ssd: callable, on_auto_encoder: callable) -> None:
def _train_execute_action(args: argparse.Namespace, on_ssd: callable) -> None:
if args.network == "ssd" or args.network == "bayesian_ssd":
on_ssd(args)
elif args.network == "auto_encoder":
on_auto_encoder(args)
def _ssd_train(args: argparse.Namespace) -> None:
@@ -1060,81 +1056,3 @@ def _visualise_ose_f1(open_set_error: np.ndarray, f1_scores: np.ndarray,
pyplot.savefig(f"{output_path}/ose-f1-{file_suffix}.png")
pyplot.close(figure)
def _auto_encoder_train(args: argparse.Namespace) -> None:
import os
from tensorflow.python.ops import summary_ops_v2
from twomartens.masterthesis import data
from twomartens.masterthesis.aae import train
tf.enable_eager_execution()
coco_path = args.coco_path
category = args.category
batch_size = 16
image_size = 256
coco_data = data.load_coco_train(coco_path, category, num_epochs=args.num_epochs, batch_size=batch_size,
resized_shape=(image_size, image_size))
summary_path = conf.get_property("Paths.summary")
summary_path = f"{summary_path}/{args.network}/train/category-{category}/{args.iteration}"
train_summary_writer = summary_ops_v2.create_file_writer(
summary_path
)
os.makedirs(summary_path, exist_ok=True)
weights_path = conf.get_property("Paths.weights")
weights_path = f"{weights_path}/{args.network}/category-{category}"
os.makedirs(weights_path, exist_ok=True)
if args.debug:
with train_summary_writer.as_default():
train.train_simple(coco_data, iteration=args.iteration,
weights_prefix=weights_path,
zsize=16, lr=0.0001, verbose=args.verbose, image_size=image_size,
channels=3, train_epoch=args.num_epochs, batch_size=batch_size)
else:
train.train_simple(coco_data, iteration=args.iteration,
weights_prefix=weights_path,
zsize=16, lr=0.0001, verbose=args.verbose, image_size=image_size,
channels=3, train_epoch=args.num_epochs, batch_size=batch_size)
def _auto_encoder_test(args: argparse.Namespace) -> None:
import os
from tensorflow.python.ops import summary_ops_v2
from twomartens.masterthesis import data
from twomartens.masterthesis.aae import run
tf.enable_eager_execution()
coco_path = conf.get_property("Paths.coco")
category = args.category
category_trained = args.category_trained
batch_size = 16
image_size = 256
coco_data = data.load_coco_val(coco_path, category, num_epochs=1,
batch_size=batch_size, resized_shape=(image_size, image_size))
summary_path = conf.get_property("Paths.summary")
summary_path = f"{summary_path}/{args.network}/val/category-{category}/{args.iteration}"
os.makedirs(summary_path, exist_ok=True)
use_summary_writer = summary_ops_v2.create_file_writer(
summary_path
)
weights_path = conf.get_property("Paths.weights")
weights_path = f"{weights_path}/{args.network}/category-{category_trained}"
os.makedirs(weights_path, exist_ok=True)
if args.debug:
with use_summary_writer.as_default():
run.run_simple(coco_data, iteration=args.iteration_trained,
weights_prefix=weights_path,
zsize=16, verbose=args.verbose, channels=3, batch_size=batch_size,
image_size=image_size)
else:
run.run_simple(coco_data, iteration=args.iteration_trained,
weights_prefix=weights_path,
zsize=16, verbose=args.verbose, channels=3, batch_size=batch_size,
image_size=image_size)

View File

@@ -155,13 +155,9 @@ def _build_train(parser: argparse.ArgumentParser) -> None:
sub_parsers.required = True
ssd_parser = sub_parsers.add_parser("ssd", help="SSD")
# ssd_bayesian_parser = sub_parsers.add_parser("bayesian_ssd", help="SSD with dropout layers")
auto_encoder_parser = sub_parsers.add_parser("auto_encoder", help="Auto-encoder network")
# build sub parsers
_build_ssd_train(ssd_parser)
# _build_bayesian_ssd(ssd_bayesian_parser)
_build_auto_encoder_train(auto_encoder_parser)
def _build_ssd_train(parser: argparse.ArgumentParser) -> None:
@@ -169,36 +165,21 @@ def _build_ssd_train(parser: argparse.ArgumentParser) -> None:
parser.add_argument("iteration", type=int, help="the training iteration")
def _build_auto_encoder_train(parser: argparse.ArgumentParser) -> None:
parser.add_argument("category", type=int, help="the COCO category to use")
parser.add_argument("num_epochs", type=int, help="the number of epochs to train", default=80)
parser.add_argument("iteration", type=int, help="the training iteration")
def _build_test(parser: argparse.ArgumentParser) -> None:
sub_parsers = parser.add_subparsers(dest="network")
sub_parsers.required = True
ssd_bayesian_parser = sub_parsers.add_parser("bayesian_ssd", help="SSD with dropout layers")
ssd_parser = sub_parsers.add_parser("ssd", help="SSD")
auto_encoder_parser = sub_parsers.add_parser("auto_encoder", help="Auto-encoder network")
# build sub parsers
_build_ssd_test(ssd_bayesian_parser)
_build_ssd_test(ssd_parser)
_build_auto_encoder_test(auto_encoder_parser)
def _build_ssd_test(parser: argparse.ArgumentParser) -> None:
parser.add_argument("iteration", type=int, help="the validation iteration")
parser.add_argument("train_iteration", type=int, help="the train iteration")
def _build_auto_encoder_test(parser: argparse.ArgumentParser) -> None:
parser.add_argument("category", type=int, help="the COCO category to validate")
parser.add_argument("category_trained", type=int, help="the trained COCO category")
parser.add_argument("iteration", type=int, help="the validation iteration")
parser.add_argument("iteration_trained", type=int, help="the training iteration")
def _build_evaluate(parser: argparse.ArgumentParser) -> None:
@@ -228,11 +209,9 @@ def _build_visualise_metrics(parser: argparse.ArgumentParser) -> None:
ssd_bayesian_parser = sub_parsers.add_parser("bayesian_ssd", help="SSD with dropout layers")
ssd_parser = sub_parsers.add_parser("ssd", help="SSD")
auto_encoder_parser = sub_parsers.add_parser("auto_encoder", help="Auto-encoder network")
ssd_bayesian_parser.add_argument("iteration", type=int, help="the validation iteration to use")
ssd_parser.add_argument("iteration", type=int, help="the validation iteration to use")
auto_encoder_parser.add_argument("iteration", type=int, help="the validation iteration to use")
def _build_measure(parser: argparse.ArgumentParser) -> None: