# This file is based on the StyleGAN Keras example by Soon-Yau Cheong
# https://keras.io/examples/generative/stylegan/
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow_addons.layers import InstanceNormalization


def log2(x):
    return int(np.log2(x))


# We use a different batch size for each resolution so that larger images still
# fit into GPU memory. The keys are image resolutions in log2.
batch_sizes = {2: 16, 3: 16, 4: 16, 5: 16, 6: 16, 7: 8, 8: 4, 9: 2, 10: 1}
# We adjust the number of train steps accordingly.
train_step_ratio = {k: batch_sizes[2] / v for k, v in batch_sizes.items()}
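# For example, train_step_ratio[8] == 16 / 4 == 4.0: at 256x256 the batch is
# 4x smaller than at 4x4, so 4x as many steps are run to show the model the
# same number of images per epoch.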

def fade_in(alpha, a, b):
    return alpha * a + (1.0 - alpha) * b


def wasserstein_loss(y_true, y_pred):
    return -tf.reduce_mean(y_true * y_pred)


def pixel_norm(x, epsilon=1e-8):
    return x / tf.math.sqrt(tf.reduce_mean(x ** 2, axis=-1, keepdims=True) + epsilon)
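
# Illustrative sketch (not part of the original file): fade_in blends the new
# high-resolution branch into the old one as alpha goes from 0 to 1, and
# pixel_norm rescales each feature vector to roughly unit norm along channels.
def _demo_fade_in_and_pixel_norm():
    a = tf.ones((1, 4, 4, 3))
    b = tf.zeros((1, 4, 4, 3))
    half = fade_in(0.5, a, b)  # every value is 0.5
    normed = pixel_norm(tf.random.normal((2, 8)))
    # the mean of squares per row is ~1 after normalization
    return half, tf.reduce_mean(normed ** 2, axis=-1)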

def minibatch_std(input_tensor, epsilon=1e-8):
    n, h, w, c = tf.shape(input_tensor)
    group_size = tf.minimum(4, n)
    x = tf.reshape(input_tensor, [group_size, -1, h, w, c])
    group_mean, group_var = tf.nn.moments(x, axes=(0), keepdims=False)
    group_std = tf.sqrt(group_var + epsilon)
    avg_std = tf.reduce_mean(group_std, axis=[1, 2, 3], keepdims=True)
    x = tf.tile(avg_std, [group_size, h, w, 1])
    return tf.concat([input_tensor, x], axis=-1)
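
# Illustrative sketch (not part of the original file): minibatch_std appends one
# extra feature map holding the average per-group standard deviation, so the
# channel count grows by 1 (the ProGAN trick against mode collapse).
def _demo_minibatch_std():
    x = tf.random.normal((4, 8, 8, 16))
    y = minibatch_std(x)
    assert y.shape == (4, 8, 8, 17)
    return y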

class EqualizedConv(layers.Layer):
    def __init__(self, out_channels, kernel=3, gain=2, **kwargs):
        super(EqualizedConv, self).__init__(**kwargs)
        self.kernel = kernel
        self.out_channels = out_channels
        self.gain = gain
        self.pad = kernel != 1

    def build(self, input_shape):
        self.in_channels = input_shape[-1]
        initializer = keras.initializers.RandomNormal(mean=0.0, stddev=1.0)
        self.w = self.add_weight(
            shape=[self.kernel, self.kernel, self.in_channels, self.out_channels],
            initializer=initializer,
            trainable=True,
            name="kernel",
        )
        self.b = self.add_weight(
            shape=(self.out_channels,), initializer="zeros", trainable=True, name="bias"
        )
        fan_in = self.kernel * self.kernel * self.in_channels
        self.scale = tf.sqrt(self.gain / fan_in)

    def call(self, inputs):
        if self.pad:
            x = tf.pad(inputs, [[0, 0], [1, 1], [1, 1], [0, 0]], mode="REFLECT")
        else:
            x = inputs
        output = (
            tf.nn.conv2d(x, self.scale * self.w, strides=1, padding="VALID") + self.b
        )
        return output
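
# Illustrative sketch (not part of the original file): the "equalized" layers
# keep weights at N(0, 1) and instead scale activations by sqrt(gain / fan_in)
# at runtime, so every layer sees the same effective learning rate.
def _demo_equalized_conv():
    x = tf.random.normal((1, 16, 16, 8))
    conv = EqualizedConv(out_channels=32, kernel=3)
    y = conv(x)  # reflect-padded, so the spatial size is preserved
    assert y.shape == (1, 16, 16, 32)
    return y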

class EqualizedDense(layers.Layer):
    def __init__(self, units, gain=2, learning_rate_multiplier=1, **kwargs):
        super(EqualizedDense, self).__init__(**kwargs)
        self.units = units
        self.gain = gain
        self.learning_rate_multiplier = learning_rate_multiplier

    def build(self, input_shape):
        self.in_channels = input_shape[-1]
        initializer = keras.initializers.RandomNormal(
            mean=0.0, stddev=1.0 / self.learning_rate_multiplier
        )
        self.w = self.add_weight(
            shape=[self.in_channels, self.units],
            initializer=initializer,
            trainable=True,
            name="kernel",
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="zeros", trainable=True, name="bias"
        )
        fan_in = self.in_channels
        self.scale = tf.sqrt(self.gain / fan_in)

    def call(self, inputs):
        output = tf.add(tf.matmul(inputs, self.scale * self.w), self.b)
        return output * self.learning_rate_multiplier
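
# Illustrative sketch (not part of the original file): with
# learning_rate_multiplier < 1 (used in the mapping network below), weights are
# initialized with a larger stddev and the output is scaled back down, which
# lowers the effective learning rate of those layers.
def _demo_equalized_dense():
    x = tf.random.normal((2, 512))
    dense = EqualizedDense(512, learning_rate_multiplier=0.01)
    return dense(x)  # shape (2, 512)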

class AddNoise(layers.Layer):
    def build(self, input_shape):
        n, h, w, c = input_shape[0]
        initializer = keras.initializers.RandomNormal(mean=0.0, stddev=1.0)
        self.b = self.add_weight(
            shape=[1, 1, 1, c], initializer=initializer, trainable=True, name="kernel"
        )

    def call(self, inputs):
        x, noise = inputs
        output = x + self.b * noise
        return output


class AdaIN(layers.Layer):
    def __init__(self, gain=1, **kwargs):
        super(AdaIN, self).__init__(**kwargs)
        self.gain = gain

    def build(self, input_shapes):
        x_shape = input_shapes[0]
        w_shape = input_shapes[1]
        self.w_channels = w_shape[-1]
        self.x_channels = x_shape[-1]
        self.dense_1 = EqualizedDense(self.x_channels, gain=1)
        self.dense_2 = EqualizedDense(self.x_channels, gain=1)

    def call(self, inputs):
        x, w = inputs
        ys = tf.reshape(self.dense_1(w), (-1, 1, 1, self.x_channels))
        yb = tf.reshape(self.dense_2(w), (-1, 1, 1, self.x_channels))
        return ys * x + yb
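
# Illustrative sketch (not part of the original file): AdaIN turns the style
# vector w into a per-channel scale (ys) and bias (yb) and applies them to the
# instance-normalized feature maps: out = ys * x + yb.
def _demo_adain():
    x = tf.random.normal((2, 8, 8, 64))
    w = tf.random.normal((2, 512))
    return AdaIN()([x, w])  # shape (2, 8, 8, 64)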

def Mapping(num_stages, input_shape=512):
    z = layers.Input(shape=(input_shape,))
    w = pixel_norm(z)
    class_embedding = layers.Input(shape=(512,))
    for i in range(8):
        w = EqualizedDense(512, learning_rate_multiplier=0.01)(w)
        w = w + class_embedding
        w = layers.LeakyReLU(0.2)(w)
    w = tf.tile(tf.expand_dims(w, 1), (1, num_stages, 1))
    return keras.Model([z, class_embedding], w, name="mapping")
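
# Illustrative sketch (not part of the original file): the mapping network sends
# z through 8 equalized dense layers, adding the class embedding at every layer
# (the conditional twist on the keras.io example), then repeats the result once
# per resolution stage.
def _demo_mapping():
    mapping = Mapping(num_stages=5)
    z = tf.random.normal((2, 512))
    emb = tf.random.normal((2, 512))
    w = mapping([z, emb])
    assert w.shape == (2, 5, 512)
    return w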

class Generator:
    def __init__(self, start_res_log2, target_res_log2):
        self.start_res_log2 = start_res_log2
        self.target_res_log2 = target_res_log2
        self.num_stages = target_res_log2 - start_res_log2 + 1
        # list of generator blocks at increasing resolution
        self.g_blocks = []
        # list of layers to convert g_block activations to RGB
        self.to_rgb = []
        # list of noise inputs of different resolutions into g_blocks
        self.noise_inputs = []
        # filter count to use at each stage, keys are log2(resolution)
        self.filter_nums = {
            0: 512,
            1: 512,
            2: 512,  # 4x4
            3: 512,  # 8x8
            4: 512,  # 16x16
            5: 512,  # 32x32
            6: 256,  # 64x64
            7: 128,  # 128x128
            8: 64,  # 256x256
            9: 32,  # 512x512
            10: 16,  # 1024x1024
        }
        start_res = 2 ** start_res_log2
        self.input_shape = (start_res, start_res, self.filter_nums[start_res_log2])
        self.g_input = layers.Input(self.input_shape, name="generator_input")

        for i in range(start_res_log2, target_res_log2 + 1):
            filter_num = self.filter_nums[i]
            res = 2 ** i
            self.noise_inputs.append(
                layers.Input(shape=(res, res, 1), name=f"noise_{res}x{res}")
            )
            to_rgb = Sequential(
                [
                    layers.InputLayer(input_shape=(res, res, filter_num)),
                    EqualizedConv(7, 1, gain=1),  # 7 output channels instead of the usual 3 (RGB)
                ],
                name=f"to_rgb_{res}x{res}",
            )
            self.to_rgb.append(to_rgb)
            is_base = i == self.start_res_log2
            if is_base:
                input_shape = (res, res, self.filter_nums[i - 1])
            else:
                input_shape = (2 ** (i - 1), 2 ** (i - 1), self.filter_nums[i - 1])
            g_block = self.build_block(
                filter_num, res=res, input_shape=input_shape, is_base=is_base
            )
            self.g_blocks.append(g_block)

    def build_block(self, filter_num, res, input_shape, is_base):
        input_tensor = layers.Input(shape=input_shape, name=f"g_{res}")
        noise = layers.Input(shape=(res, res, 1), name=f"noise_{res}")
        w = layers.Input(shape=(512,))
        x = input_tensor
        if not is_base:
            x = layers.UpSampling2D((2, 2))(x)
            x = EqualizedConv(filter_num, 3)(x)
        x = AddNoise()([x, noise])
        x = layers.LeakyReLU(0.2)(x)
        x = InstanceNormalization()(x)
        x = AdaIN()([x, w])
        x = EqualizedConv(filter_num, 3)(x)
        x = AddNoise()([x, noise])
        x = layers.LeakyReLU(0.2)(x)
        x = InstanceNormalization()(x)
        x = AdaIN()([x, w])
        return keras.Model([input_tensor, w, noise], x, name=f"genblock_{res}x{res}")

    def grow(self, res_log2):
        res = 2 ** res_log2
        num_stages = res_log2 - self.start_res_log2 + 1
        w = layers.Input(shape=(self.num_stages, 512), name="w")
        alpha = layers.Input(shape=(1,), name="g_alpha")
        x = self.g_blocks[0]([self.g_input, w[:, 0], self.noise_inputs[0]])
        if num_stages == 1:
            rgb = self.to_rgb[0](x)
        else:
            for i in range(1, num_stages - 1):
                x = self.g_blocks[i]([x, w[:, i], self.noise_inputs[i]])
            old_rgb = self.to_rgb[num_stages - 2](x)
            old_rgb = layers.UpSampling2D((2, 2))(old_rgb)
            i = num_stages - 1
            x = self.g_blocks[i]([x, w[:, i], self.noise_inputs[i]])
            new_rgb = self.to_rgb[i](x)
            rgb = fade_in(alpha[0], new_rgb, old_rgb)
        return keras.Model(
            [self.g_input, w, self.noise_inputs, alpha],
            rgb,
            name=f"generator_{res}_x_{res}",
        )
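
# Illustrative sketch (not part of the original file): grow() wires the blocks
# up to the requested resolution; during a TRANSITION phase the output is a
# fade_in blend of the new block's RGB output and the upsampled previous one.
def _demo_generator_grow():
    g_builder = Generator(start_res_log2=2, target_res_log2=4)
    g_8 = g_builder.grow(3)  # progressive model producing 8x8 images
    return g_8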

class Discriminator:
    def __init__(self, start_res_log2, target_res_log2):
        self.start_res_log2 = start_res_log2
        self.target_res_log2 = target_res_log2
        self.num_stages = target_res_log2 - start_res_log2 + 1
        # filter count to use at each stage, keys are log2(resolution)
        self.filter_nums = {
            0: 512,
            1: 512,
            2: 512,  # 4x4
            3: 512,  # 8x8
            4: 512,  # 16x16
            5: 512,  # 32x32
            6: 256,  # 64x64
            7: 128,  # 128x128
            8: 64,  # 256x256
            9: 32,  # 512x512
            10: 16,  # 1024x1024
        }
        # list of discriminator blocks at increasing resolution
        self.d_blocks = []
        # list of layers to convert RGB into activations for d_blocks inputs
        self.from_rgb = []
        # Conditional embedding
        # self.embedding = layers.Embedding(5, 256)

        for res_log2 in range(self.start_res_log2, self.target_res_log2 + 1):
            res = 2 ** res_log2
            filter_num = self.filter_nums[res_log2]
            from_rgb = Sequential(
                [
                    layers.InputLayer(
                        # 7 input channels instead of the usual 3 (RGB)
                        input_shape=(res, res, 7), name=f"from_rgb_input_{res}"
                    ),
                    EqualizedConv(filter_num, 1),
                    layers.LeakyReLU(0.2),
                ],
                name=f"from_rgb_{res}",
            )
            self.from_rgb.append(from_rgb)
            input_shape = (res, res, filter_num)
            if len(self.d_blocks) == 0:
                d_block = self.build_base(filter_num, res)
            else:
                d_block = self.build_block(
                    filter_num, self.filter_nums[res_log2 - 1], res
                )
            self.d_blocks.append(d_block)

    def build_base(self, filter_num, res):
        input_tensor = layers.Input(shape=(res, res, filter_num), name=f"d_{res}")
        x = minibatch_std(input_tensor)
        x = EqualizedConv(filter_num, 3)(x)
        x = layers.LeakyReLU(0.2)(x)
        x = layers.Flatten()(x)
        x = EqualizedDense(filter_num)(x)
        x = layers.LeakyReLU(0.2)(x)
        x = EqualizedDense(1)(x)
        return keras.Model(input_tensor, x, name=f"d_{res}")

    def build_block(self, filter_num_1, filter_num_2, res):
        input_tensor = layers.Input(shape=(res, res, filter_num_1), name=f"d_{res}")
        x = EqualizedConv(filter_num_1, 3)(input_tensor)
        x = layers.LeakyReLU(0.2)(x)
        x = EqualizedConv(filter_num_2)(x)
        x = layers.LeakyReLU(0.2)(x)
        x = layers.AveragePooling2D((2, 2))(x)
        return keras.Model(input_tensor, x, name=f"d_{res}")

    def grow(self, res_log2):
        res = 2 ** res_log2
        idx = res_log2 - self.start_res_log2
        alpha = layers.Input(shape=(1,), name="d_alpha")
        # 7 input channels instead of the usual 3 (RGB)
        input_image = layers.Input(shape=(res, res, 7), name="input_image")
        class_embedding = layers.Input(shape=(512,), name="class_embedding")
        x = self.from_rgb[idx](input_image)
        x = AdaIN()([x, class_embedding])
        x = self.d_blocks[idx](x)
        if idx > 0:
            idx -= 1
            downsized_image = layers.AveragePooling2D((2, 2))(input_image)
            y = self.from_rgb[idx](downsized_image)
            x = fade_in(alpha[0], x, y)
            for i in range(idx, -1, -1):
                x = AdaIN()([x, class_embedding])
                x = self.d_blocks[i](x)
        return keras.Model(
            [input_image, class_embedding, alpha],
            x,
            name=f"discriminator_{res}_x_{res}",
        )
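
# Illustrative sketch (not part of the original file): the discriminator mirrors
# the generator; the class embedding is injected through AdaIN before each
# block, which is this file's conditional extension of the keras.io example.
def _demo_discriminator_grow():
    d_builder = Discriminator(start_res_log2=2, target_res_log2=4)
    d_16 = d_builder.grow(4)  # takes (16, 16, 7) images plus embedding and alpha
    return d_16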

class cStyleGAN(tf.keras.Model):
    def __init__(self, z_dim=512, target_res=64, start_res=4):
        super(cStyleGAN, self).__init__()
        self.z_dim = z_dim
        self.target_res_log2 = log2(target_res)
        self.start_res_log2 = log2(start_res)
        self.current_res_log2 = self.target_res_log2
        self.num_stages = self.target_res_log2 - self.start_res_log2 + 1
        self.alpha = tf.Variable(1.0, dtype=tf.float32, trainable=False, name="alpha")
        self.mapping = Mapping(num_stages=self.num_stages)
        self.embedding = layers.Embedding(5, 512)
        self.d_builder = Discriminator(self.start_res_log2, self.target_res_log2)
        self.g_builder = Generator(self.start_res_log2, self.target_res_log2)
        self.g_input_shape = self.g_builder.input_shape
        self.phase = None
        self.train_step_counter = tf.Variable(0, dtype=tf.int32, trainable=False)
        self.loss_weights = {"gradient_penalty": 10, "drift": 0.001}

    def grow_model(self, res):
        tf.keras.backend.clear_session()
        res_log2 = log2(res)
        self.generator = self.g_builder.grow(res_log2)
        self.discriminator = self.d_builder.grow(res_log2)
        self.current_res_log2 = res_log2
        print(f"\nModel resolution: {res}x{res}")

    def compile(
        self, steps_per_epoch, phase, res, d_optimizer, g_optimizer, *args, **kwargs
    ):
        self.loss_weights = kwargs.pop("loss_weights", self.loss_weights)
        self.steps_per_epoch = steps_per_epoch
        if res != 2 ** self.current_res_log2:
            self.grow_model(res)
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.train_step_counter.assign(0)
        self.phase = phase
        self.d_loss_metric = keras.metrics.Mean(name="d_loss")
        self.g_loss_metric = keras.metrics.Mean(name="g_loss")
        super(cStyleGAN, self).compile(*args, **kwargs)

    @property
    def metrics(self):
        return [self.d_loss_metric, self.g_loss_metric]

    def generate_noise(self, batch_size):
        noise = [
            tf.random.normal((batch_size, 2 ** res, 2 ** res, 1))
            for res in range(self.start_res_log2, self.target_res_log2 + 1)
        ]
        return noise

    def gradient_loss(self, grad):
        loss = tf.square(grad)
        loss = tf.reduce_sum(loss, axis=tf.range(1, tf.size(tf.shape(loss))))
        loss = tf.sqrt(loss)
        loss = tf.reduce_mean(tf.square(loss - 1))
        return loss

    def train_step(self, data_tuple):
        real_images, class_label = data_tuple
        self.train_step_counter.assign_add(1)

        if self.phase == "TRANSITION":
            self.alpha.assign(
                tf.cast(self.train_step_counter / self.steps_per_epoch, tf.float32)
            )
        elif self.phase == "STABLE":
            self.alpha.assign(1.0)
        else:
            raise NotImplementedError
        alpha = tf.expand_dims(self.alpha, 0)
        batch_size = tf.shape(real_images)[0]
        real_labels = tf.ones(batch_size)
        fake_labels = -tf.ones(batch_size)
        z = tf.random.normal((batch_size, self.z_dim))
        const_input = tf.ones(tuple([batch_size] + list(self.g_input_shape)))
        noise = self.generate_noise(batch_size)

        # generator
        with tf.GradientTape() as g_tape:
            class_embedding = self.embedding(class_label)
            w = self.mapping([z, class_embedding])
            fake_images = self.generator([const_input, w, noise, alpha])
            pred_fake = self.discriminator([fake_images, class_embedding, alpha])
            g_loss = wasserstein_loss(real_labels, pred_fake)
            trainable_weights = (
                self.embedding.trainable_weights
                + self.mapping.trainable_weights
                + self.generator.trainable_weights
            )
            gradients = g_tape.gradient(g_loss, trainable_weights)
            self.g_optimizer.apply_gradients(zip(gradients, trainable_weights))

        # discriminator
        with tf.GradientTape() as gradient_tape, tf.GradientTape() as total_tape:
            # class_embedding = self.embedding(class_label)
            # forward pass
            pred_fake = self.discriminator([fake_images, class_embedding, alpha])
            pred_real = self.discriminator([real_images, class_embedding, alpha])

            epsilon = tf.random.uniform((batch_size, 1, 1, 1))
            interpolates = epsilon * real_images + (1 - epsilon) * fake_images
            gradient_tape.watch(interpolates)
            pred_fake_grad = self.discriminator([interpolates, class_embedding, alpha])

            # calculate losses
            loss_fake = wasserstein_loss(fake_labels, pred_fake)
            loss_real = wasserstein_loss(real_labels, pred_real)
            loss_fake_grad = wasserstein_loss(fake_labels, pred_fake_grad)

            # gradient penalty
            gradients_fake = gradient_tape.gradient(loss_fake_grad, [interpolates])
            gradient_penalty = self.loss_weights[
                "gradient_penalty"
            ] * self.gradient_loss(gradients_fake)

            # drift loss
            all_pred = tf.concat([pred_fake, pred_real], axis=0)
            drift_loss = self.loss_weights["drift"] * tf.reduce_mean(all_pred ** 2)

            d_loss = loss_fake + loss_real + gradient_penalty + drift_loss

            gradients = total_tape.gradient(
                d_loss, self.discriminator.trainable_weights
            )
            self.d_optimizer.apply_gradients(
                zip(gradients, self.discriminator.trainable_weights)
            )

        # Update metrics
        self.d_loss_metric.update_state(d_loss)
        self.g_loss_metric.update_state(g_loss)
        return {
            "d_loss": self.d_loss_metric.result(),
            "g_loss": self.g_loss_metric.result(),
        }

    def call(self, inputs: dict):
        style_code = inputs.get("style_code", None)
        z = inputs.get("z", None)
        noise = inputs.get("noise", None)
        class_label = inputs.get("class_label", 0)
        batch_size = inputs.get("batch_size", 1)
        alpha = inputs.get("alpha", 1.0)
        alpha = tf.expand_dims(alpha, 0)
        class_embedding = self.embedding(class_label)

        if style_code is None:
            if z is None:
                z = tf.random.normal((batch_size, self.z_dim))
            style_code = self.mapping([z, class_embedding])
        if noise is None:
            noise = self.generate_noise(batch_size)

        # self.alpha.assign(alpha)
        const_input = tf.ones(tuple([batch_size] + list(self.g_input_shape)))
        images = self.generator([const_input, style_code, noise, alpha])
        # images = np.clip((images * 0.5 + 0.5) * 255, 0, 255).astype(np.uint8)
        images = tf.clip_by_value((images * 0.5 + 0.5) * 255, 0, 255)
        return images
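
# Illustrative sketch (not part of the original file): a minimal progressive
# training setup in the style of the keras.io example, assuming a hypothetical
# get_dataset(res, batch_size) helper that yields (image, class_label) batches
# at the requested resolution. Phase names ("TRANSITION", "STABLE") and the
# growing logic come from the code above.
def _demo_training_setup():
    model = cStyleGAN(z_dim=512, target_res=16, start_res=4)
    for res_log2 in range(model.start_res_log2, model.target_res_log2 + 1):
        res = 2 ** res_log2
        steps = int(train_step_ratio[res_log2] * 100)  # toy step count
        for phase in ["TRANSITION", "STABLE"]:
            if res == 2 ** model.start_res_log2 and phase == "TRANSITION":
                continue  # the base resolution has no transition phase
            model.compile(
                steps_per_epoch=steps,
                phase=phase,
                res=res,
                d_optimizer=keras.optimizers.Adam(1e-3),
                g_optimizer=keras.optimizers.Adam(1e-3),
            )
            # dataset = get_dataset(res, batch_sizes[res_log2])  # hypothetical
            # model.fit(dataset, epochs=1, steps_per_epoch=steps)
    # inference: generate one image for class 3
    return model({"class_label": tf.constant([3]), "batch_size": 1, "alpha": 1.0})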