Commit 542dfe7d authored by Oleh Astappiev's avatar Oleh Astappiev
Browse files

feat: add HSV and SIFT dataset export

parent d997f56f
"""
Title: Image similarity estimation using a Siamese Network with a triplet loss
Authors: [Hazem Essam](https://twitter.com/hazemessamm) and [Santiago L. Valdarrama](https://twitter.com/svpino)
Date created: 2021/03/25
Last modified: 2021/03/25
Description: Training a Siamese Network to compare the similarity of images using a triplet loss function.
"""
"""
## Introduction
A [Siamese Network](https://en.wikipedia.org/wiki/Siamese_neural_network) is a type of network architecture that
contains two or more identical subnetworks used to generate feature vectors for each input and compare them.
Siamese Networks can be applied to different use cases, like detecting duplicates, finding anomalies, and face recognition.
This example uses a Siamese Network with three identical subnetworks. We will provide three images to the model, where
two of them will be similar (_anchor_ and _positive_ samples), and the third will be unrelated (a _negative_ example.)
Our goal is for the model to learn to estimate the similarity between images.
For the network to learn, we use a triplet loss function. You can find an introduction to triplet loss in the
[FaceNet paper](https://arxiv.org/pdf/1503.03832.pdf) by Schroff et al,. 2015. In this example, we define the triplet
loss function as follows:
`L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)`
This example uses the [Totally Looks Like dataset](https://sites.google.com/view/totally-looks-like-dataset)
by [Rosenfeld et al., 2018](https://arxiv.org/pdf/1803.01485v3.pdf).
"""
"""
## Setup
"""
import matplotlib.pyplot as plt
import numpy as np
import os
import random
import tensorflow as tf
from pathlib import Path
from tensorflow.keras import applications, layers, losses, optimizers, metrics, Model
from tensorflow.keras.applications import resnet
target_shape = (200, 200)
"""
## Load the dataset
We are going to load the *Totally Looks Like* dataset and unzip it inside the `~/.keras` directory
in the local environment.
The dataset consists of two separate files:
* `left.zip` contains the images that we will use as the anchor.
* `right.zip` contains the images that we will use as the positive sample (an image that looks like the anchor).
"""
cache_dir = Path(Path.home()) / ".keras"
anchor_images_path = cache_dir / "left"
positive_images_path = cache_dir / "right"
"""shell
gdown --id 1jvkbTr_giSP3Ru8OwGNCg6B4PvVbcO34
gdown --id 1EzBZUb_mh_Dp_FKD0P4XiYYSd0QBH5zW
unzip -oq left.zip -d $cache_dir
unzip -oq right.zip -d $cache_dir
"""
"""
## Preparing the data
We are going to use a `tf.data` pipeline to load the data and generate the triplets that we
need to train the Siamese network.
We'll set up the pipeline using a zipped list with anchor, positive, and negative filenames as
the source. The pipeline will load and preprocess the corresponding images.
"""
def preprocess_image(filename):
"""
Load the specified file as a JPEG image, preprocess it and
resize it to the target shape.
"""
image_string = tf.io.read_file(filename)
image = tf.image.decode_jpeg(image_string, channels=3)
image = tf.image.convert_image_dtype(image, tf.float32)
image = tf.image.resize(image, target_shape)
return image
def preprocess_triplets(anchor, positive, negative):
"""
Given the filenames corresponding to the three images, load and
preprocess them.
"""
return (
preprocess_image(anchor),
preprocess_image(positive),
preprocess_image(negative),
)
"""
Let's setup our data pipeline using a zipped list with an anchor, positive,
and negative image filename as the source. The output of the pipeline
contains the same triplet with every image loaded and preprocessed.
"""
# We need to make sure both the anchor and positive images are loaded in
# sorted order so we can match them together.
anchor_images = sorted(
[str(anchor_images_path / f) for f in os.listdir(anchor_images_path)]
)
positive_images = sorted(
[str(positive_images_path / f) for f in os.listdir(positive_images_path)]
)
image_count = len(anchor_images)
anchor_dataset = tf.data.Dataset.from_tensor_slices(anchor_images)
positive_dataset = tf.data.Dataset.from_tensor_slices(positive_images)
# To generate the list of negative images, let's randomize the list of
# available images and concatenate them together.
rng = np.random.RandomState(seed=42)
rng.shuffle(anchor_images)
rng.shuffle(positive_images)
negative_images = anchor_images + positive_images
np.random.RandomState(seed=32).shuffle(negative_images)
negative_dataset = tf.data.Dataset.from_tensor_slices(negative_images)
negative_dataset = negative_dataset.shuffle(buffer_size=4096)
dataset = tf.data.Dataset.zip((anchor_dataset, positive_dataset, negative_dataset))
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.map(preprocess_triplets)
# Let's now split our dataset in train and validation.
train_dataset = dataset.take(round(image_count * 0.8))
val_dataset = dataset.skip(round(image_count * 0.8))
train_dataset = train_dataset.batch(32, drop_remainder=False)
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(32, drop_remainder=False)
val_dataset = val_dataset.prefetch(tf.data.AUTOTUNE)
"""
Let's take a look at a few examples of triplets. Notice how the first two images
look alike while the third one is always different.
"""
def visualize(anchor, positive, negative):
"""Visualize a few triplets from the supplied batches."""
def show(ax, image):
ax.imshow(image)
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
fig = plt.figure(figsize=(9, 9))
axs = fig.subplots(3, 3)
for i in range(3):
show(axs[i, 0], anchor[i])
show(axs[i, 1], positive[i])
show(axs[i, 2], negative[i])
visualize(*list(train_dataset.take(1).as_numpy_iterator())[0])
"""
## Setting up the embedding generator model
Our Siamese Network will generate embeddings for each of the images of the
triplet. To do this, we will use a ResNet50 model pretrained on ImageNet and
connect a few `Dense` layers to it so we can learn to separate these
embeddings.
We will freeze the weights of all the layers of the model up until the layer `conv5_block1_out`.
This is important to avoid affecting the weights that the model has already learned.
We are going to leave the bottom few layers trainable, so that we can fine-tune their weights
during training.
"""
base_cnn = resnet.ResNet50(
weights="imagenet", input_shape=target_shape + (3,), include_top=False
)
flatten = layers.Flatten()(base_cnn.output)
dense1 = layers.Dense(512, activation="relu")(flatten)
dense1 = layers.BatchNormalization()(dense1)
dense2 = layers.Dense(256, activation="relu")(dense1)
dense2 = layers.BatchNormalization()(dense2)
output = layers.Dense(256)(dense2)
embedding = Model(base_cnn.input, output, name="Embedding")
trainable = False
for layer in base_cnn.layers:
if layer.name == "conv5_block1_out":
trainable = True
layer.trainable = trainable
"""
## Setting up the Siamese Network model
The Siamese network will receive each of the triplet images as an input,
generate the embeddings, and output the distance between the anchor and the
positive embedding, as well as the distance between the anchor and the negative
embedding.
To compute the distance, we can use a custom layer `DistanceLayer` that
returns both values as a tuple.
"""
class DistanceLayer(layers.Layer):
"""
This layer is responsible for computing the distance between the anchor
embedding and the positive embedding, and the anchor embedding and the
negative embedding.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
def call(self, anchor, positive, negative):
ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
return (ap_distance, an_distance)
anchor_input = layers.Input(name="anchor", shape=target_shape + (3,))
positive_input = layers.Input(name="positive", shape=target_shape + (3,))
negative_input = layers.Input(name="negative", shape=target_shape + (3,))
distances = DistanceLayer()(
embedding(resnet.preprocess_input(anchor_input)),
embedding(resnet.preprocess_input(positive_input)),
embedding(resnet.preprocess_input(negative_input)),
)
siamese_network = Model(
inputs=[anchor_input, positive_input, negative_input], outputs=distances
)
"""
## Putting everything together
We now need to implement a model with custom training loop so we can compute
the triplet loss using the three embeddings produced by the Siamese network.
Let's create a `Mean` metric instance to track the loss of the training process.
"""
class SiameseModel(Model):
"""The Siamese Network model with a custom training and testing loops.
Computes the triplet loss using the three embeddings produced by the Siamese Network.
The triplet loss is defined as:
L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
"""
def __init__(self, siamese_network, margin=0.5):
super(SiameseModel, self).__init__()
self.siamese_network = siamese_network
self.margin = margin
self.loss_tracker = metrics.Mean(name="loss")
def call(self, inputs):
return self.siamese_network(inputs)
def train_step(self, data):
# GradientTape is a context manager that records every operation that
# you do inside. We are using it here to compute the loss so we can get
# the gradients and apply them using the optimizer specified in
# `compile()`.
with tf.GradientTape() as tape:
loss = self._compute_loss(data)
# Storing the gradients of the loss function with respect to the
# weights/parameters.
gradients = tape.gradient(loss, self.siamese_network.trainable_weights)
# Applying the gradients on the model using the specified optimizer
self.optimizer.apply_gradients(
zip(gradients, self.siamese_network.trainable_weights)
)
# Let's update and return the training loss metric.
self.loss_tracker.update_state(loss)
return {"loss": self.loss_tracker.result()}
def test_step(self, data):
loss = self._compute_loss(data)
# Let's update and return the loss metric.
self.loss_tracker.update_state(loss)
return {"loss": self.loss_tracker.result()}
def _compute_loss(self, data):
# The output of the network is a tuple containing the distances
# between the anchor and the positive example, and the anchor and
# the negative example.
ap_distance, an_distance = self.siamese_network(data)
# Computing the Triplet Loss by subtracting both distances and
# making sure we don't get a negative value.
loss = ap_distance - an_distance
loss = tf.maximum(loss + self.margin, 0.0)
return loss
@property
def metrics(self):
# We need to list our metrics here so the `reset_states()` can be
# called automatically.
return [self.loss_tracker]
"""
## Training
We are now ready to train our model.
"""
siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adam(0.0001))
siamese_model.fit(train_dataset, epochs=10, validation_data=val_dataset)
"""
## Inspecting what the network has learned
At this point, we can check how the network learned to separate the embeddings
depending on whether they belong to similar images.
We can use [cosine similarity](https://en.wikipedia.org/wiki/Cosine_similarity) to measure the
similarity between embeddings.
Let's pick a sample from the dataset to check the similarity between the
embeddings generated for each image.
"""
sample = next(iter(train_dataset))
visualize(*sample)
anchor, positive, negative = sample
anchor_embedding, positive_embedding, negative_embedding = (
embedding(resnet.preprocess_input(anchor)),
embedding(resnet.preprocess_input(positive)),
embedding(resnet.preprocess_input(negative)),
)
"""
Finally, we can compute the cosine similarity between the anchor and positive
images and compare it with the similarity between the anchor and the negative
images.
We should expect the similarity between the anchor and positive images to be
larger than the similarity between the anchor and the negative images.
"""
cosine_similarity = metrics.CosineSimilarity()
positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
print("Positive similarity:", positive_similarity.numpy())
negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
print("Negative similarity", negative_similarity.numpy())
"""
## Summary
1. The `tf.data` API enables you to build efficient input pipelines for your model. It is
particularly useful if you have a large dataset. You can learn more about `tf.data`
pipelines in [tf.data: Build TensorFlow input pipelines](https://www.tensorflow.org/guide/data).
2. In this example, we use a pre-trained ResNet50 as part of the subnetwork that generates
the feature embeddings. By using [transfer learning](https://www.tensorflow.org/guide/keras/transfer_learning?hl=en),
we can significantly reduce the training time and size of the dataset.
3. Notice how we are [fine-tuning](https://www.tensorflow.org/guide/keras/transfer_learning?hl=en#fine-tuning)
the weights of the final layers of the ResNet50 network but keeping the rest of the layers untouched.
Using the name assigned to each layer, we can freeze the weights to a certain point and keep the last few layers open.
4. We can create custom layers by creating a class that inherits from `tf.keras.layers.Layer`,
as we did in the `DistanceLayer` class.
5. We used a cosine similarity metric to measure how to 2 output embeddings are similar to each other.
6. You can implement a custom training loop by overriding the `train_step()` method. `train_step()` uses
[`tf.GradientTape`](https://www.tensorflow.org/api_docs/python/tf/GradientTape),
which records every operation that you perform inside it. In this example, we use it to access the
gradients passed to the optimizer to update the model weights at every step. For more details, check out the
[Intro to Keras for researchers](https://keras.io/getting_started/intro_to_keras_for_researchers/)
and [Writing a training loop from scratch](https://www.tensorflow.org/guide/keras/writing_a_training_loop_from_scratch?hl=en).
"""
\ No newline at end of file
import sys
sys.path.append("..")
import csv
from src.utils.hsv import *
from src.utils.sift import *
import tensorflow as tf
from utils.common import *
from utils.distance import *
from src.data.embeddings import *
from src.model.alexnet import AlexNetModel
from tensorflow.keras import layers, Model, models, datasets
# Load dataset
(train_images, train_labels), (test_images, test_labels) = datasets.cifar10.load_data()
cifar10_images = np.concatenate([train_images, test_images])
cifar10_labels = np.concatenate([train_labels, test_labels])
cifar10_vds = tf.data.Dataset.from_tensor_slices((cifar10_images, cifar10_labels))
def export_hsv():
header = ['ID', 'Label', 'HSV vector']
with open('../data/hsv.csv', 'w', encoding='UTF8', newline='') as f:
writer = csv.writer(f, delimiter=";")
# write the header
writer.writerow(header)
for i, (image, label) in enumerate(cifar10_vds):
a, b, c, hist_array = extract_hsv(image.numpy())
label_str = ','.join(map(str, label.numpy()))
value_str = ','.join(map(str, hist_array))
writer.writerow([i, label_str, value_str])
def export_sift():
header = ['ID', 'Label', 'SIFT descriptors']
with open('../data/sift.csv', 'w', encoding='UTF8', newline='') as f:
writer = csv.writer(f, delimiter=";")
# write the header
writer.writerow(header)
for i, (image, label) in enumerate(cifar10_vds):
# from smaller image only smaller number of key points can be extracted
img = cv2.resize(image.numpy(), (230, 230))
keypoints, features = extract_sift(img)
label_str = ','.join(map(str, label.numpy()))
if features is not None:
value_str = ','.join(map(str, features.flatten()))
else:
value_str = 'None'
print('Unable to extract SIFT from image', i)
writer.writerow([i, label_str, value_str])
def export_embeddings():
header = ['ID', 'Label', 'Siamese Embeddings']
with open('../data/siamese.csv', 'w', encoding='UTF8', newline='') as f:
writer = csv.writer(f, delimiter=";")
# write the header
writer.writerow(header)
seamese = models.load_model(get_modeldir('seamese1.tf'))
embedding_vds = (cifar10_vds.map(process_images_couple).batch(batch_size=32, drop_remainder=False))
print('predicting embeddings')
embeddings = seamese.predict(embedding_vds)
print('embeddings done')
for i, (label) in enumerate(cifar10_labels):
label_str = label
value_str = ','.join(map(str, embeddings[i]))
writer.writerow([i, label_str, value_str])
# export_hsv()
export_sift()
# export_embeddings()
print('done')
import sys
sys.path.append("..")
from src.utils.hsv import *
from src.utils.sift import *
import tensorflow as tf
from tensorflow.keras import datasets
# Load dataset
(train_images, train_labels), (test_images, test_labels) = datasets.cifar10.load_data()
cifar10_vds = tf.data.Dataset.from_tensor_slices((np.concatenate([train_images, test_images]), np.concatenate([train_labels, test_labels])))
# test HSV
print('test HSV')
plot_hsv(cifar10_vds)
print('test SIFT')
plot_sift(cifar10_vds)
print('done')
numpy~=1.19.5
tensorflow~=2.6.1
matplotlib~=3.4.3
opencv-python~=4.5.5.62
...@@ -21,7 +21,7 @@ def process_images(image): ...@@ -21,7 +21,7 @@ def process_images(image):
return image return image
def _show_subplot(nrows, ncols, index, image, title = None): def subplot_image(nrows, ncols, index, image, title = None):
ax = plt.subplot(nrows, ncols, index) ax = plt.subplot(nrows, ncols, index)
ax.imshow(image) ax.imshow(image)
if title is not None: if title is not None:
...@@ -32,15 +32,15 @@ def _show_subplot(nrows, ncols, index, image, title = None): ...@@ -32,15 +32,15 @@ def _show_subplot(nrows, ncols, index, image, title = None):
def plot_grid25(dataset): def plot_grid25(dataset):
plt.figure(figsize=(20, 20)) plt.figure(figsize=(20, 20))
for i, (image, label) in enumerate(dataset.take(25)): for i, (image, label) in enumerate(dataset.take(25)):
_show_subplot(5, 5, i + 1, image, CIFAR10_CLASS_NAMES[label.numpy()[0]]) subplot_image(5, 5, i + 1, image, CIFAR10_CLASS_NAMES[label.numpy()[0]])
plt.show() plt.show()
def plot_tuple(anchor, positive, negative): def plot_tuple(anchor, positive, negative):
plt.figure(figsize=(9, 3)) plt.figure(figsize=(9, 3))
_show_subplot(1, 3, 1, anchor) subplot_image(1, 3, 1, anchor)
_show_subplot(1, 3, 2, positive) subplot_image(1, 3, 2, positive)
_show_subplot(1, 3, 3, negative) subplot_image(1, 3, 3, negative)
plt.show() plt.show()
......
import matplotlib.pyplot as plt
import numpy as np
import cv2
from src.utils.common import subplot_image
def extract_hsv(image):
"""Extract a 3 color channels histogram from the HSV"""
hsv = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)
# The ranges of the 3 HSV channels in opencv are 0-180, 0-256, 0-256 respectively
# Bins is set to 1365, so that each picture can be represented by a 4000-dimensional vector
histh = cv2.calcHist([hsv], [0], None, [1365], [0, 180])
hists = cv2.calcHist([hsv], [1], None, [1365], [0, 256])
histv = cv2.calcHist([hsv], [2], None, [1365], [0, 256])
# normalize the histogram
histh /= histh.sum()
hists /= hists.sum()
histv /= histv.sum()
hist_array = np.array([histh, hists, histv])
# return the flattened histogram as the feature vector
return histh, hists, histv, hist_array.flatten()
def plot_hsv(dataset):
plt.figure(figsize=(20, 20))
for i, (image, label) in enumerate(dataset.take(3)):
subplot_image(3, 2, i * 2 + 1, image, "Original image")
hist0_s, hist1_s, hist2_s, hist_s = extract_hsv(image.numpy())
# print('the length of histogram of the sample', len(hist_s))
# subplot_image(3, 2, i * 2 + 2, image, "HSV Histogram")
ax = plt.subplot(3, 2, i * 2 + 2)
# ax.imshow(image)
ax.set_title("HSV Histogram")
ax.plot(hist0_s, label='H')
ax.plot(hist1_s, label='S')
ax.plot(hist2_s, label='V')
plt.xlabel("Bins")
plt.ylabel("percentage of Pixels")
plt.legend()
# ax.axis('off')
plt.show()
import matplotlib.pyplot as plt
import cv2
from src.utils.common import subplot_image
def extract_sift(image):
sift = cv2.SIFT_create(32)
# Calculate the keypoint and each point description of the image
keypoints, features = sift.detectAndCompute(image, None)
return keypoints, features
def plot_sift(dataset):
plt.figure(figsize=(20, 20))
for i, (image, label) in enumerate(dataset.take(3)):
subplot_image(3, 2, i * 2 + 1, image, "Original image")
# from smaller image only smaller number of key points can be extracted
img = cv2.resize(image.numpy(), (230, 230))
keypoints, features = extract_sift(img)
img_kp = img.copy()
cv2.drawKeypoints(img_kp, keypoints, img_kp, color=(255, 0, 0))