Commit 181ad96e authored by Oleh Astappiev

feat: create Alexnet model class

parent 00708055
from common import get_modeldir, get_logdir, target_shape
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks


class AlexNet:
    def __init__(self):
        self.model = None

    def get_model(self):
        # Build the AlexNet architecture lazily on first access.
        if self.model is None:
            self.model = models.Sequential([
                layers.Conv2D(filters=96, kernel_size=(11, 11), strides=(4, 4), activation='relu', input_shape=target_shape + (3,)),
                layers.BatchNormalization(),
                layers.MaxPool2D(pool_size=(3, 3), strides=(2, 2)),
                layers.Conv2D(filters=256, kernel_size=(5, 5), strides=(1, 1), activation='relu', padding="same"),
                layers.BatchNormalization(),
                layers.MaxPool2D(pool_size=(3, 3), strides=(2, 2)),
                layers.Conv2D(filters=384, kernel_size=(3, 3), strides=(1, 1), activation='relu', padding="same"),
                layers.BatchNormalization(),
                layers.Conv2D(filters=384, kernel_size=(3, 3), strides=(1, 1), activation='relu', padding="same"),
                layers.BatchNormalization(),
                layers.Conv2D(filters=256, kernel_size=(3, 3), strides=(1, 1), activation='relu', padding="same"),
                layers.BatchNormalization(),
                layers.MaxPool2D(pool_size=(3, 3), strides=(2, 2)),
                layers.Flatten(),
                layers.Dense(4096, activation='relu'),
                layers.Dropout(0.5),
                layers.Dense(4096, activation='relu'),
                layers.Dropout(0.5),
                layers.Dense(10, activation='softmax')
            ])
        return self.model

    def train_model(self, train_ds, validation_ds, test_ds):
        tensorboard_cb = callbacks.TensorBoard(get_logdir("alexnet/fit"))
        # optimizer='adam' is an alternative to SGD
        self.get_model()
        self.model.compile(loss='sparse_categorical_crossentropy', optimizer=tf.optimizers.SGD(learning_rate=0.001), metrics=['accuracy'])
        self.model.summary()
        self.model.fit(train_ds, epochs=50, validation_data=validation_ds, validation_freq=1, callbacks=[tensorboard_cb])
        self.model.evaluate(test_ds)

    def save_model(self, name):
        self.model.save(get_modeldir(name))

    def load_model(self, name):
        self.model = models.load_model(get_modeldir(name))
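A quick smoke test of the wrapper above can use random tensors in place of the real CIFAR-10 pipeline. Everything below (the random data, the one-epoch fit) is illustrative only; it assumes target_shape = (227, 227) as defined in common:

import numpy as np

# Random data shaped like the real pipeline's batched output.
images = np.random.rand(8, 227, 227, 3).astype('float32')
labels = np.random.randint(0, 10, size=(8, 1))
ds = tf.data.Dataset.from_tensor_slices((images, labels)).batch(4)

net = AlexNet()
model = net.get_model()
model.compile(loss='sparse_categorical_crossentropy', optimizer='sgd', metrics=['accuracy'])
model.fit(ds, epochs=1)  # one epoch on random data, just to confirm the graph builds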
from model.alexnet import AlexNetModel
from common import get_modeldir

model_name = 'alexnet_cifar10-new'
alexnet = AlexNetModel()

# train
train_ds, test_ds, validation_ds = alexnet.x_dataset()
alexnet.x_train(train_ds, validation_ds)  # validate during training on the validation split
alexnet.evaluate(test_ds)  # final evaluation on the held-out test split

# save
alexnet.save_weights(get_modeldir(model_name + '.h5'))
alexnet.save(get_modeldir(model_name + '.tf'))

# print('evaluate')
# res = alexnet.predict(validation_ds)
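Since the script saves both the weights and a full SavedModel, a later session can restore the model without rebuilding the class. A sketch using standard Keras calls, assuming the run above completed and the '.tf' directory exists:

from tensorflow.keras import models
from common import get_modeldir

# Reload the full saved model for inference; the path mirrors the save above.
restored = models.load_model(get_modeldir('alexnet_cifar10-new.tf'))
restored.summary()
# predictions = restored.predict(validation_ds)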
@@ -3,6 +3,8 @@ from os import path, curdir
 import time
 import matplotlib.pyplot as plt
+# buffer_size = 5000
+buffer_size = 50000
 
 target_shape = (227, 227)
 CIFAR10_CLASS_NAMES = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
@@ -19,23 +21,36 @@ def process_images(image):
     return image
 
-def plot_first5_fig(dataset):
+def _show_subplot(nrows, ncols, index, image, title=None):
+    ax = plt.subplot(nrows, ncols, index)
+    ax.imshow(image)
+    if title is not None:
+        ax.set_title(title)
+    ax.axis('off')
+
+def plot_grid25(dataset):
     plt.figure(figsize=(20, 20))
-    for i, (image, label) in enumerate(dataset.take(5)):
-        ax = plt.subplot(5, 5, i + 1)
-        plt.imshow(image)
-        plt.title(CIFAR10_CLASS_NAMES[label.numpy()[0]])
-        plt.axis('off')
+    for i, (image, label) in enumerate(dataset.take(25)):
+        _show_subplot(5, 5, i + 1, image, CIFAR10_CLASS_NAMES[label.numpy()[0]])
+    plt.show()
+
+def plot_tuple(anchor, positive, negative):
+    plt.figure(figsize=(9, 3))
+    _show_subplot(1, 3, 1, anchor)
+    _show_subplot(1, 3, 2, positive)
+    _show_subplot(1, 3, 3, negative)
     plt.show()
 
 def get_logdir(subfolder):
-    return path.join(path.join(path.join(curdir, "logs"), subfolder), time.strftime("run_%Y_%m_%d-%H_%M_%S"))
+    return path.join(path.join(path.join(curdir, "../logs"), subfolder), time.strftime("run_%Y_%m_%d-%H_%M_%S"))
 
 def get_modeldir(name):
-    return path.join(path.join(curdir, "models"), name)
+    return path.join(path.join(curdir, "../models"), name)
 
 def get_datadir(name):
-    return path.join(path.join(curdir, "data"), name)
+    return path.join(path.join(curdir, "../data"), name)
 import numpy as np
 import _pickle as pickle
 import matplotlib.pyplot as plt
-from common import get_datadir, process_images
+from src.common import get_datadir, process_images
 from tensorflow.keras import datasets
 from tensorflow import data
@@ -73,7 +73,7 @@ def produce_tuples():
             negative_labels[i * tuples_per_label + j] = i
             negative_images[i * tuples_per_label + j] = labels_train[i, tuples_per_label * 2 + j]
 
-    # we need to ensure we use random kind of negative images, but without images from anchor label
+    # we need to ensure we use random labels, but without images from anchor label
     shuffle_arrays([negative_labels, negative_images])
 
     for i in range(total_labels):
@@ -116,11 +116,7 @@ def load_tuples():
 
 def prepare_dataset():
-    (anchor_images, anchor_labels), (positive_images, positive_labels), (negative_images, negative_labels) = load_tuples()
-    # anchor_ds = data.Dataset.from_tensor_slices((anchor_images, anchor_labels))
-    # positive_ds = data.Dataset.from_tensor_slices((positive_images, positive_labels))
-    # negative_ds = data.Dataset.from_tensor_slices((negative_images, negative_labels))
+    (anchor_images, anchor_labels), (positive_images, positive_labels), (negative_images, negative_labels) = produce_tuples()
     anchor_ds = data.Dataset.from_tensor_slices(anchor_images)
     positive_ds = data.Dataset.from_tensor_slices(positive_images)
@@ -131,7 +127,7 @@ def prepare_dataset():
     negative_ds = (negative_ds.map(process_images).batch(batch_size=32, drop_remainder=True))
 
     dataset = data.Dataset.zip((anchor_ds, positive_ds, negative_ds))
-    dataset = dataset.shuffle(buffer_size=1024)
+    # dataset = dataset.shuffle(buffer_size=1024)
     return dataset
...
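shuffle_arrays is called in the hunk above but its body falls outside the diff. A typical implementation shuffles every array in place with the same seed so corresponding entries (labels and images) keep their pairing; this is a sketch of that pattern, not necessarily the repository's version:

import numpy as np

def shuffle_arrays(arrays, seed=None):
    # Shuffle each array in place using one shared seed, so the same
    # permutation is applied to every array and pairings stay aligned.
    seed = np.random.randint(0, 2**31) if seed is None else seed
    for arr in arrays:
        rng = np.random.default_rng(seed)
        rng.shuffle(arr)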
from src.common import *
import tensorflow as tf
from tensorflow.keras import layers, callbacks, datasets, Sequential


class AlexNetModel(Sequential):
    def __init__(self):
        super(AlexNetModel, self).__init__([
            layers.Conv2D(filters=96, kernel_size=(11, 11), strides=(4, 4), activation='relu', input_shape=target_shape + (3,)),
            layers.BatchNormalization(),
            layers.MaxPool2D(pool_size=(3, 3), strides=(2, 2)),
            layers.Conv2D(filters=256, kernel_size=(5, 5), strides=(1, 1), activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPool2D(pool_size=(3, 3), strides=(2, 2)),
            layers.Conv2D(filters=384, kernel_size=(3, 3), strides=(1, 1), activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.Conv2D(filters=384, kernel_size=(3, 3), strides=(1, 1), activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.Conv2D(filters=256, kernel_size=(3, 3), strides=(1, 1), activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPool2D(pool_size=(3, 3), strides=(2, 2)),
            layers.Flatten(),
            layers.Dense(4096, activation='relu'),
            layers.Dropout(rate=0.5),
            layers.Dense(4096, activation='relu'),
            layers.Dropout(rate=0.5),
            layers.Dense(name='unfreeze', units=10, activation='softmax')
        ])

    @staticmethod
    def x_dataset():
        # Split CIFAR-10: first 5000 training images become the validation set.
        (train_images, train_labels), (test_images, test_labels) = datasets.cifar10.load_data()
        validation_images, validation_labels = train_images[:5000], train_labels[:5000]
        train_images, train_labels = train_images[5000:], train_labels[5000:]

        train_ds = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
        test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
        validation_ds = tf.data.Dataset.from_tensor_slices((validation_images, validation_labels))

        train_ds_size = tf.data.experimental.cardinality(train_ds).numpy()
        test_ds_size = tf.data.experimental.cardinality(test_ds).numpy()
        validation_ds_size = tf.data.experimental.cardinality(validation_ds).numpy()
        print("Training data size:", train_ds_size)
        print("Test data size:", test_ds_size)
        print("Validation data size:", validation_ds_size)

        # plot_grid25(train_ds)
        # plot_grid25(test_ds)
        # plot_grid25(validation_ds)

        # Shuffle each split with a buffer matching its own size.
        train_ds = (train_ds.map(process_images_couple).shuffle(buffer_size=train_ds_size).batch(batch_size=32, drop_remainder=True))
        test_ds = (test_ds.map(process_images_couple).shuffle(buffer_size=test_ds_size).batch(batch_size=32, drop_remainder=True))
        validation_ds = (validation_ds.map(process_images_couple).shuffle(buffer_size=validation_ds_size).batch(batch_size=32, drop_remainder=True))

        return train_ds, test_ds, validation_ds

    def x_train(self, train_ds, validation_ds):
        tensorboard_cb = callbacks.TensorBoard(get_logdir("alexnet/fit"))
        # optimizer='adam' is an alternative to SGD
        self.compile(loss='sparse_categorical_crossentropy', optimizer=tf.optimizers.SGD(learning_rate=0.001), metrics=['accuracy'])
        self.summary()
        self.fit(train_ds, epochs=50, validation_data=validation_ds, validation_freq=1, callbacks=[tensorboard_cb])
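The commented hint in x_train leaves Adam as an alternative to plain SGD. Swapping optimizers is a one-line change in compile; the learning rate below is illustrative, not a value tuned in this commit:

model = AlexNetModel()
# Same compile call with Adam substituted for SGD.
model.compile(loss='sparse_categorical_crossentropy',
              optimizer=tf.optimizers.Adam(learning_rate=0.001),
              metrics=['accuracy'])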