Commit 5283d190 authored by Oleh Astappiev's avatar Oleh Astappiev
Browse files

Initial commit

parents
# garbage
.idea
__pycache__
# non-persistent data
data
models
logs
from common import get_modeldir, get_logdir, target_shape
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks
class AlexNet:
    """AlexNet-style CNN classifying ``target_shape`` RGB images into 10 classes.

    The Keras model is built lazily on first access via ``get_model`` so that
    ``load_model`` can replace it without ever constructing the default graph.
    """

    def __init__(self):
        # NOTE: the original called super(AlexNet, self).__init__() even though
        # this class has no base class; the no-op call has been removed.
        self.model = None

    def get_model(self):
        """Return the underlying Keras Sequential model, building it on first call."""
        if self.model is None:
            self.model = models.Sequential([
                layers.Conv2D(filters=96, kernel_size=(11, 11), strides=(4, 4), activation='relu', input_shape=target_shape + (3,)),
                layers.BatchNormalization(),
                layers.MaxPool2D(pool_size=(3, 3), strides=(2, 2)),
                layers.Conv2D(filters=256, kernel_size=(5, 5), strides=(1, 1), activation='relu', padding="same"),
                layers.BatchNormalization(),
                layers.MaxPool2D(pool_size=(3, 3), strides=(2, 2)),
                layers.Conv2D(filters=384, kernel_size=(3, 3), strides=(1, 1), activation='relu', padding="same"),
                layers.BatchNormalization(),
                layers.Conv2D(filters=384, kernel_size=(3, 3), strides=(1, 1), activation='relu', padding="same"),
                layers.BatchNormalization(),
                layers.Conv2D(filters=256, kernel_size=(3, 3), strides=(1, 1), activation='relu', padding="same"),
                layers.BatchNormalization(),
                layers.MaxPool2D(pool_size=(3, 3), strides=(2, 2)),
                layers.Flatten(),
                layers.Dense(4096, activation='relu'),
                layers.Dropout(0.5),
                layers.Dense(4096, activation='relu'),
                layers.Dropout(0.5),
                layers.Dense(10, activation='softmax')
            ])
        return self.model

    def train_model(self, train_ds, validation_ds, test_ds):
        """Compile the model, fit it on ``train_ds`` with TensorBoard logging,
        validating every epoch, then evaluate on ``test_ds``."""
        tensorboard_cb = callbacks.TensorBoard(get_logdir("alexnet/fit"))
        self.get_model()
        # SGD with a small learning rate was kept over Adam (earlier run note).
        self.model.compile(loss='sparse_categorical_crossentropy', optimizer=tf.optimizers.SGD(learning_rate=0.001), metrics=['accuracy'])
        self.model.summary()
        self.model.fit(train_ds, epochs=50, validation_data=validation_ds, validation_freq=1, callbacks=[tensorboard_cb])
        self.model.evaluate(test_ds)

    def save_model(self, name):
        """Persist the current model under models/<name>."""
        self.model.save(get_modeldir(name))

    def load_model(self, name):
        """Replace the current model with one loaded from models/<name>."""
        self.model = models.load_model(get_modeldir(name))
from common import *
from cifar10_tuples import *
from alexnet import AlexNet
from tensorflow.keras import datasets, layers, models, losses, callbacks, applications, optimizers, metrics, Model
from tensorflow.keras.applications import resnet
# Suffix appended to saved-model names to distinguish training runs.
run_suffix = '-04'

(train_images, train_labels), (test_images, test_labels) = datasets.cifar10.load_data()
# Hold out the first 5000 training images/labels as a validation split.
validation_images, validation_labels = train_images[:5000], train_labels[:5000]
train_images, train_labels = train_images[5000:], train_labels[5000:]

alexnet_train_ds = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
alexnet_test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
alexnet_validation_ds = tf.data.Dataset.from_tensor_slices((validation_images, validation_labels))

# Cardinalities are materialized as ints for the shuffle buffers and logging.
train_ds_size = tf.data.experimental.cardinality(alexnet_train_ds).numpy()
test_ds_size = tf.data.experimental.cardinality(alexnet_test_ds).numpy()
validation_ds_size = tf.data.experimental.cardinality(alexnet_validation_ds).numpy()
print("Training data size:", train_ds_size)
print("Test data size:", test_ds_size)
print("Validation data size:", validation_ds_size)
# Preprocess (standardize + resize) and batch each split. Each dataset now
# shuffles with its OWN cardinality as the buffer size; the original reused
# train_ds_size (45000) for the 10000/5000-element test and validation splits,
# over-allocating their shuffle buffers.
alexnet_train_ds = (alexnet_train_ds.map(process_images_couple).shuffle(buffer_size=train_ds_size).batch(batch_size=32, drop_remainder=True))
alexnet_test_ds = (alexnet_test_ds.map(process_images_couple).shuffle(buffer_size=test_ds_size).batch(batch_size=32, drop_remainder=True))
alexnet_validation_ds = (alexnet_validation_ds.map(process_images_couple).shuffle(buffer_size=validation_ds_size).batch(batch_size=32, drop_remainder=True))
# plot_first5_fig(alexnet_train_ds)
# plot_first5_fig(alexnet_test_ds)
# plot_first5_fig(alexnet_validation_ds)

alexnet = AlexNet()
# Training is disabled for this run; a previously trained model is loaded instead.
# alexnet.train_model(alexnet_train_ds, alexnet_validation_ds, alexnet_test_ds)
# alexnet.save_model('alexnet_cifar10' + run_suffix)
alexnet.load_model('alexnet_cifar10' + run_suffix)

print('alexnet evaluate')
alexnet.get_model().evaluate(alexnet_test_ds)

# Triplet generation is done once and cached to disk (see produce_tuples/save_tuples).
# (anchor_images, anchor_labels), (positive_images, positive_labels), (negative_images, negative_labels) = produce_tuples()
# save_tuples(anchor_images, anchor_labels, positive_images, positive_labels, negative_images, negative_labels)
# (anchor_images, anchor_labels), (positive_images, positive_labels), (negative_images, negative_labels) = load_tuples()

tuples_ds = prepare_dataset()
tuples_ds_size = tf.data.experimental.cardinality(tuples_ds).numpy()

# sample = next(iter(tuples_ds))
# visualize(*sample)

# Let's now split our dataset in train and validation.
siamese_train_ds = tuples_ds.take(round(tuples_ds_size * 0.8))
siamese_validation_ds = tuples_ds.skip(round(tuples_ds_size * 0.8))
class DistanceLayer(layers.Layer):
    """Layer producing the pair of squared L2 distances
    (anchor-positive, anchor-negative) over the last axis."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        def squared_l2(lhs, rhs):
            # Squared Euclidean distance along the embedding dimension.
            return tf.reduce_sum(tf.square(lhs - rhs), -1)

        return (squared_l2(anchor, positive), squared_l2(anchor, negative))
# One symbolic input per triplet member; shape matches the AlexNet input (H, W, 3).
anchor_input = layers.Input(name="anchor", shape=target_shape + (3,))
positive_input = layers.Input(name="positive", shape=target_shape + (3,))
negative_input = layers.Input(name="negative", shape=target_shape + (3,))

# The SAME AlexNet instance embeds all three images, so weights are shared
# across the three branches of the siamese network.
alexnet_model = alexnet.get_model()
distances = DistanceLayer()(
    alexnet_model(resnet.preprocess_input(anchor_input)),
    alexnet_model(resnet.preprocess_input(positive_input)),
    alexnet_model(resnet.preprocess_input(negative_input)),
)

siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input], outputs=distances
)

"""
## Putting everything together
We now need to implement a model with custom training loop so we can compute
the triplet loss using the three embeddings produced by the Siamese network.
Let's create a `Mean` metric instance to track the loss of the training process.
"""
class SiameseModel(Model):
    """The Siamese Network model with a custom training and testing loops.

    Computes the triplet loss using the three embeddings produced by the
    Siamese Network. The triplet loss is defined as:

        L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    """

    def __init__(self, siamese_network, margin=0.5):
        super(SiameseModel, self).__init__()
        # The wrapped network maps an (anchor, positive, negative) triplet to
        # the distance pair (ap_distance, an_distance).
        self.siamese_network = siamese_network
        # Minimum separation enforced between the two distances.
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        # Forward pass delegates straight to the wrapped network.
        return self.siamese_network(inputs)

    def train_step(self, data):
        # GradientTape is a context manager that records every operation that
        # you do inside. We are using it here to compute the loss so we can get
        # the gradients and apply them using the optimizer specified in
        # `compile()`.
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        # Storing the gradients of the loss function with respect to the
        # weights/parameters.
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)

        # Applying the gradients on the model using the specified optimizer
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )

        # Let's update and return the training loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        # Same as train_step but without gradient computation.
        loss = self._compute_loss(data)
        # Let's update and return the loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        # The output of the network is a tuple containing the distances
        # between the anchor and the positive example, and the anchor and
        # the negative example.
        ap_distance, an_distance = self.siamese_network(data)

        # Computing the Triplet Loss by subtracting both distances and
        # making sure we don't get a negative value.
        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss

    @property
    def metrics(self):
        # We need to list our metrics here so the `reset_states()` can be
        # called automatically.
        return [self.loss_tracker]
"""
## Training
We are now ready to train our model.
"""

tensorboard_cb = callbacks.TensorBoard(get_logdir("siamese/fit"))

siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adam(0.0001))
siamese_model.fit(siamese_train_ds, epochs=10, validation_data=siamese_validation_ds, callbacks=[tensorboard_cb])

# Saving the SiameseModel wrapper itself fails (see the ValueError kept below),
# so only the fine-tuned embedding network is persisted.
# print('saving siamese')
# siamese_model.save(get_modeldir('siamese_cifar10' + run_suffix))
# ValueError: Model <__main__.SiameseModel object at 0x7f9070531730> cannot be saved because the input shapes have not been set. Usually, input shapes are automatically determined from calling `.fit()` or `.predict()`. To manually set the shapes, call `model.build(input_shape)`.

print('saving alexnet2')
alexnet_model.save(get_modeldir('alexnet2_cifar10' + run_suffix))

# Evaluating the siamese model on the classification dataset fails: it expects
# a triplet of image inputs, not (image, label) pairs (error kept below).
# print('siamese evaluate')
# siamese_model.evaluate(alexnet_test_ds)
# ValueError: Layer model expects 3 input(s), but it received 2 input tensors. Inputs received: [<tf.Tensor 'IteratorGetNext:0' shape=(32, 227, 227, 3) dtype=float32>, <tf.Tensor 'IteratorGetNext:1' shape=(32, 1) dtype=uint8>]

print('alexnet evaluate')
alexnet_model.evaluate(alexnet_test_ds)
"""
## Inspecting what the network has learned
At this point, we can check how the network learned to separate the embeddings
depending on whether they belong to similar images.
We can use [cosine similarity](https://en.wikipedia.org/wiki/Cosine_similarity) to measure the
similarity between embeddings.
Let's pick a sample from the dataset to check the similarity between the
embeddings generated for each image.
"""

sample = next(iter(siamese_train_ds))
# visualize(*sample)

anchor, positive, negative = sample
# Embed each batch with the (now fine-tuned) shared AlexNet.
anchor_embedding, positive_embedding, negative_embedding = (
    alexnet_model(resnet.preprocess_input(anchor)),
    alexnet_model(resnet.preprocess_input(positive)),
    alexnet_model(resnet.preprocess_input(negative)),
)

"""
Finally, we can compute the cosine similarity between the anchor and positive
images and compare it with the similarity between the anchor and the negative
images.
We should expect the similarity between the anchor and positive images to be
larger than the similarity between the anchor and the negative images.
"""

cosine_similarity = metrics.CosineSimilarity()
positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
print("Positive similarity:", positive_similarity.numpy())

# NOTE(review): CosineSimilarity is a stateful metric, so this second call
# appears to average over both pairs rather than measuring the negative pair
# in isolation — confirm whether a reset/new instance was intended.
negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
print("Negative similarity", negative_similarity.numpy())
import numpy as np
import _pickle as pickle
import matplotlib.pyplot as plt
from common import get_datadir, process_images
from tensorflow.keras import datasets
from tensorflow import data
def shuffle_arrays(arrays, set_seed=-1):
    """Shuffle several arrays in-place along axis 0 with one shared permutation.

    Parameters:
    -----------
    arrays : List of NumPy arrays, all of the same length.
    set_seed : Seed value if int >= 0, else seed is random.
    """
    assert all(len(arr) == len(arrays[0]) for arr in arrays)
    if set_seed < 0:
        seed = np.random.randint(0, 2**(32 - 1) - 1)
    else:
        seed = set_seed
    for arr in arrays:
        # Re-seeding per array guarantees every array gets the same permutation.
        np.random.RandomState(seed).shuffle(arr)
def produce_tuples():
    """Build (anchor, positive, negative) triplets from the full CIFAR-10 set.

    All 60 000 images (train + test) are regrouped by label; each label's 6000
    images are split into three equal thirds used as anchors, positives and
    negatives respectively. Negatives are then shuffled and repaired so that no
    triplet's negative image shares its anchor's label.

    Returns three (images, labels) pairs, each holding 20 000 uint8 images.
    """
    (train_images, train_labels), (test_images, test_labels) = datasets.cifar10.load_data()

    total_labels = 10
    images_per_label = 6000  # CIFAR-10 has exactly 6000 images per class
    tuples_per_label = int(images_per_label / 3)
    total_tuples = int(tuples_per_label * total_labels)

    # re arrange whole dataset by labels
    labels_stat = np.zeros((total_labels), dtype='uint16')  # images stored per label so far
    labels_train = np.empty((total_labels, images_per_label, 32, 32, 3), dtype='uint8')
    for i in range(len(train_images)):
        tr_leb = train_labels[i]
        tr_img = train_images[i]
        labels_train[tr_leb, labels_stat[tr_leb]] = tr_img
        labels_stat[tr_leb] += 1
    for i in range(len(test_images)):
        tr_leb = test_labels[i]
        tr_img = test_images[i]
        labels_train[tr_leb, labels_stat[tr_leb]] = tr_img
        labels_stat[tr_leb] += 1

    # shuffle all
    for i in range(total_labels):
        np.random.shuffle(labels_train[i])

    # create tuples: the first third of each label's images become anchors ...
    anchor_images = np.empty((total_tuples, 32, 32, 3), dtype='uint8')
    anchor_labels = np.empty((total_tuples), dtype='uint8')
    for i in range(total_labels):
        for j in range(tuples_per_label):
            anchor_labels[i * tuples_per_label + j] = i
            anchor_images[i * tuples_per_label + j] = labels_train[i, j]

    # ... the second third become positives (same label as the anchor) ...
    positive_images = np.empty((total_tuples, 32, 32, 3), dtype='uint8')
    positive_labels = np.empty((total_tuples), dtype='uint8')
    for i in range(total_labels):
        for j in range(tuples_per_label):
            positive_labels[i * tuples_per_label + j] = i
            positive_images[i * tuples_per_label + j] = labels_train[i, tuples_per_label + j]

    # ... and the last third are the negative candidates.
    negative_images = np.empty((total_tuples, 32, 32, 3), dtype='uint8')
    negative_labels = np.empty((total_tuples), dtype='uint8')
    for i in range(total_labels):
        for j in range(tuples_per_label):
            negative_labels[i * tuples_per_label + j] = i
            negative_images[i * tuples_per_label + j] = labels_train[i, tuples_per_label * 2 + j]

    # we need to ensure we use random kind of negative images, but without images from anchor label
    shuffle_arrays([negative_labels, negative_images])
    for i in range(total_labels):
        # k starts at the next label's slice (wrapping to 0 for the last label),
        # so replacement candidates are never drawn from slice i itself.
        k = ((i + 1) * tuples_per_label, 0)[i == 9]
        for j in range(tuples_per_label):
            c = i * tuples_per_label + j
            tmp_label = negative_labels[c]
            if tmp_label == i:
                # Conflict: this negative carries the anchor's label. Swap it
                # with the next candidate at k whose label differs from i.
                tmp_image = negative_images[c]
                while negative_labels[k] == i:
                    k += 1
                negative_labels[c] = negative_labels[k]
                negative_images[c] = negative_images[k]
                negative_labels[k] = tmp_label
                negative_images[k] = tmp_image

    # randomize them one more time
    for i in range(total_labels):
        shuffle_arrays([
            negative_labels[i * tuples_per_label:(i + 1) * tuples_per_label],
            negative_images[i * tuples_per_label:(i + 1) * tuples_per_label]
        ])

    return (anchor_images, anchor_labels), (positive_images, positive_labels), (negative_images, negative_labels)
def save_tuples(anchor_images, anchor_labels, positive_images, positive_labels, negative_images, negative_labels):
    """Pickle the six triplet arrays to data/cifar10_tuples.pkl (highest protocol)."""
    payload = [anchor_images, anchor_labels, positive_images, positive_labels, negative_images, negative_labels]
    with open(get_datadir('cifar10_tuples.pkl'), 'wb') as outfile:
        pickle.dump(payload, outfile, -1)
def load_tuples():
    """Load the pickled triplet arrays and regroup them into three (images, labels) pairs."""
    with open(get_datadir('cifar10_tuples.pkl'), 'rb') as infile:
        a_img, a_lbl, p_img, p_lbl, n_img, n_lbl = pickle.load(infile)
    return (a_img, a_lbl), (p_img, p_lbl), (n_img, n_lbl)
def prepare_dataset():
    """Build a shuffled tf.data pipeline of (anchor, positive, negative) image batches."""
    (anchor_images, _), (positive_images, _), (negative_images, _) = load_tuples()

    def to_batched_ds(images):
        # Standardize/resize each image, then batch; the ragged last batch is dropped.
        ds = data.Dataset.from_tensor_slices(images)
        return ds.map(process_images).batch(batch_size=32, drop_remainder=True)

    dataset = data.Dataset.zip((
        to_batched_ds(anchor_images),
        to_batched_ds(positive_images),
        to_batched_ds(negative_images),
    ))
    return dataset.shuffle(buffer_size=1024)
def visualize(anchor, positive, negative):
    """Visualize a few triplets from the supplied batches in a 3x3 grid."""
    def show(axis, image):
        axis.imshow(image)
        axis.xaxis.set_visible(False)
        axis.yaxis.set_visible(False)

    figure = plt.figure(figsize=(9, 9))
    grid = figure.subplots(3, 3)
    # One row per triplet; columns are anchor / positive / negative.
    for row in range(3):
        for col, batch in enumerate((anchor, positive, negative)):
            show(grid[row, col], batch[0][row])
    plt.show()
import tensorflow as tf
from os import path, curdir
import time
import matplotlib.pyplot as plt
# AlexNet input resolution; images are resized to this (height, width).
target_shape = (227, 227)
# Human-readable CIFAR-10 class names, indexed by label id.
CIFAR10_CLASS_NAMES = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
def process_images_couple(image, label):
    """Preprocess the image while passing the label through unchanged."""
    processed = process_images(image)
    return processed, label
def process_images(image):
    """Standardize a single image and resize it to ``target_shape``."""
    # Normalize images to have a mean of 0 and standard deviation of 1
    image = tf.image.per_image_standardization(image)
    # Resize images from 32x32 to 227x227 (the AlexNet input size)
    image = tf.image.resize(image, target_shape)
    return image
def plot_first5_fig(dataset):
    """Plot the first five (image, label) pairs of a dataset, titled with class names."""
    plt.figure(figsize=(20, 20))
    position = 0
    for image, label in dataset.take(5):
        position += 1
        axis = plt.subplot(5, 5, position)
        plt.imshow(image)
        plt.title(CIFAR10_CLASS_NAMES[label.numpy()[0]])
        plt.axis('off')
    plt.show()
def get_logdir(subfolder):
    """Return a fresh timestamped run directory path under ./logs/<subfolder>."""
    # path.join is variadic; the original nested three separate join calls.
    return path.join(curdir, "logs", subfolder, time.strftime("run_%Y_%m_%d-%H_%M_%S"))
def get_modeldir(name):
    """Return the path of a saved model named *name* under ./models."""
    # path.join is variadic; the original nested two join calls.
    return path.join(curdir, "models", name)
def get_datadir(name):
    """Return the path of a data file named *name* under ./data."""
    # path.join is variadic; the original nested two join calls.
    return path.join(curdir, "data", name)
"""
Title: Image similarity estimation using a Siamese Network with a triplet loss
Authors: [Hazem Essam](https://twitter.com/hazemessamm) and [Santiago L. Valdarrama](https://twitter.com/svpino)
Date created: 2021/03/25
Last modified: 2021/03/25
Description: Training a Siamese Network to compare the similarity of images using a triplet loss function.
"""
"""
## Introduction
A [Siamese Network](https://en.wikipedia.org/wiki/Siamese_neural_network) is a type of network architecture that
contains two or more identical subnetworks used to generate feature vectors for each input and compare them.
Siamese Networks can be applied to different use cases, like detecting duplicates, finding anomalies, and face recognition.
This example uses a Siamese Network with three identical subnetworks. We will provide three images to the model, where
two of them will be similar (_anchor_ and _positive_ samples), and the third will be unrelated (a _negative_ example.)
Our goal is for the model to learn to estimate the similarity between images.
For the network to learn, we use a triplet loss function. You can find an introduction to triplet loss in the
[FaceNet paper](https://arxiv.org/pdf/1503.03832.pdf) by Schroff et al., 2015. In this example, we define the triplet
loss function as follows:
`L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)`
This example uses the [Totally Looks Like dataset](https://sites.google.com/view/totally-looks-like-dataset)
by [Rosenfeld et al., 2018](https://arxiv.org/pdf/1803.01485v3.pdf).
"""
"""
## Setup
"""
import matplotlib.pyplot as plt
import numpy as np
import os
import random
import tensorflow as tf
from pathlib import Path
from tensorflow.keras import applications, layers, losses, optimizers, metrics, Model
from tensorflow.keras.applications import resnet
# Target (height, width) every image in this pipeline is resized to.
target_shape = (200, 200)
"""
## Load the dataset
We are going to load the *Totally Looks Like* dataset and unzip it inside the `~/.keras` directory
in the local environment.
The dataset consists of two separate files:
* `left.zip` contains the images that we will use as the anchor.
* `right.zip` contains the images that we will use as the positive sample (an image that looks like the anchor).
"""
# The dataset archives are unzipped into ~/.keras (see the shell block below):
# left/ holds anchors, right/ holds their look-alike positives.
cache_dir = Path(Path.home()) / ".keras"
anchor_images_path = cache_dir / "left"
positive_images_path = cache_dir / "right"
"""shell
gdown --id 1jvkbTr_giSP3Ru8OwGNCg6B4PvVbcO34
gdown --id 1EzBZUb_mh_Dp_FKD0P4XiYYSd0QBH5zW
unzip -oq left.zip -d $cache_dir
unzip -oq right.zip -d $cache_dir
"""
"""
## Preparing the data
We are going to use a `tf.data` pipeline to load the data and generate the triplets that we
need to train the Siamese network.
We'll set up the pipeline using a zipped list with anchor, positive, and negative filenames as
the source. The pipeline will load and preprocess the corresponding images.
"""
def preprocess_image(filename):
    """Load *filename* as a JPEG, convert it to float32 and resize it to ``target_shape``."""
    raw = tf.io.read_file(filename)
    decoded = tf.image.decode_jpeg(raw, channels=3)
    as_float = tf.image.convert_image_dtype(decoded, tf.float32)
    return tf.image.resize(as_float, target_shape)
def preprocess_triplets(anchor, positive, negative):
    """Load and preprocess the three images named by a triplet of filenames."""
    return (
        preprocess_image(anchor),
        preprocess_image(positive),
        preprocess_image(negative),
    )
"""
Let's setup our data pipeline using a zipped list with an anchor, positive,
and negative image filename as the source. The output of the pipeline
contains the same triplet with every image loaded and preprocessed.
"""
# We need to make sure both the anchor and positive images are loaded in
# sorted order so we can match them together.
anchor_images = sorted(
    [str(anchor_images_path / f) for f in os.listdir(anchor_images_path)]
)
positive_images = sorted(
    [str(positive_images_path / f) for f in os.listdir(positive_images_path)]
)

image_count = len(anchor_images)

anchor_dataset = tf.data.Dataset.from_tensor_slices(anchor_images)
positive_dataset = tf.data.Dataset.from_tensor_slices(positive_images)

# To generate the list of negative images, let's randomize the list of
# available images and concatenate them together.
# Fixed seed keeps the negative pairing reproducible across runs.
rng = np.random.RandomState(seed=42)
rng.shuffle(anchor_images)
rng.shuffle(positive_images)

negative_images = anchor_images + positive_images