TXTOCR
Third-place solution for "Text OCR"
This notebook describes my approach to solving the TXTOCR challenge.
At a high level, there are two steps:
1. Detect word boundaries and trim the image.
2. Train a deep learning model with bidirectional LSTMs and a CTC layer.
In [ ]:
# Inspired by
# https://keras.io/examples/vision/captcha_ocr/
In [ ]:
# from google.colab import drive
# drive.mount('/content/drive')
Setup¶
In [ ]:
import os
import numpy as np
import matplotlib.pyplot as plt
import math
import cv2
import pandas as pd
from scipy.stats import mode
from pathlib import Path
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from google.colab import files
from tqdm.notebook import tnrange, tqdm
In [ ]:
%%time
!pip install git+https://gitlab.aicrowd.com/yoogottamk/aicrowd-cli.git > /dev/null
API_KEY = """SECRET_KEY"""  # Input your API key here; you can get it from your profile page.
!aicrowd login --api-key $API_KEY > /dev/null
In [ ]:
%%time
!aicrowd dataset download -c txtocr >/dev/null
In [ ]:
%%time
!rm -rf data
!mkdir data
!mv train.csv data/train.csv
!mv val.csv data/val.csv
!unzip train.zip -d data/ > /dev/null
!unzip val.zip -d data/ > /dev/null
!unzip test.zip -d data/ > /dev/null
In [ ]:
train_df = pd.read_csv("data/train.csv")
val_df = pd.read_csv("data/val.csv")
In [ ]:
# Adding full image path
train_df['image_id'] = "data/train/"+train_df['image_id'].astype(str)+".png"
train_df
In [ ]:
images = train_df['image_id'].astype(str).values
# https://github.com/tensorflow/tensorflow/issues/40919
raw_labels = train_df['label'].astype(str).values
max_length = max([len(label) for label in raw_labels])
labels = [label.ljust(max_length) for label in raw_labels]
characters = set(char for label in labels for char in label)
In [ ]:
print("Number of images found: ", len(images))
print("Number of labels found: ", len(labels))
print("Number of unique characters: ", len(characters))
print("Characters present: ", characters)
In [ ]:
# Batch size for training and validation
batch_size = 32
# Desired image dimensions
img_width = 235
img_height = 25
# Maximum length of any label in the dataset (all labels were padded to this above)
max_length = max([len(label) for label in labels])
max_length
Preprocessing¶
In [ ]:
# https://www.tensorflow.org/api_docs/python/tf/keras/layers/experimental/preprocessing/StringLookup
# num_oov_indices
# The number of out-of-vocabulary tokens to use
# if this value is 0, passing an OOV input will result in a '-1' being returned for that value in the output tensor.
# mask_token
# A token that represents masked values, and which is mapped to index 0.
# If set to None, no mask term will be added and the OOV tokens, if any, will be indexed from (0...num_oov_indices) instead of (1...num_oov_indices+1).
# invert
# If true, this layer will map indices to vocabulary items instead of mapping vocabulary items to indices.
# Mapping characters to integers
char_to_num = layers.experimental.preprocessing.StringLookup(vocabulary=list(characters), num_oov_indices=0, mask_token=None)
# Mapping integers back to original characters
num_to_char = layers.experimental.preprocessing.StringLookup(vocabulary=char_to_num.get_vocabulary(), mask_token=None, invert=True)
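As a quick round-trip check (illustrative, not part of the original run), encoding a label with char_to_num and decoding it with num_to_char should give the padded label back:
In [ ]:
# Illustrative round-trip: label -> integer ids -> label.
example = labels[0]
encoded = char_to_num(tf.strings.unicode_split(example, input_encoding="UTF-8"))
decoded = tf.strings.reduce_join(num_to_char(encoded)).numpy().decode("utf-8")
print(repr(example), "->", encoded.numpy(), "->", repr(decoded))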
In [ ]:
def split_data(images, labels, train_size=0.9, shuffle=True):
# 1. Get the total size of the dataset
size = len(images)
# 2. Make an indices array and shuffle it, if required
indices = np.arange(size)
if shuffle:
np.random.shuffle(indices)
# 3. Get the size of training samples
train_samples = int(size * train_size)
# 4. Split data into training and validation sets
x_train, y_train = images[indices[:train_samples]], labels[indices[:train_samples]]
x_valid, y_valid = images[indices[train_samples:]], labels[indices[train_samples:]]
return x_train, x_valid, y_train, y_valid
In [ ]:
# Splitting data into training and validation sets
x_train, x_valid, y_train, y_valid = split_data(np.array(images), np.array(labels))
Word Segmentation¶
Use word segmentation to locate the word in each image and trim the image down to it.
In [ ]:
# https://github.com/githubharald/WordDetector/blob/master/src/WordSegmentation.py
def wordSegmentation(img, kernelSize=25, sigma=11, theta=7, minArea=0):
"""Scale space technique for word segmentation proposed by R. Manmatha: http://ciir.cs.umass.edu/pubfiles/mm-27.pdf
Args:
img: grayscale uint8 image of the text-line to be segmented.
kernelSize: size of filter kernel, must be an odd integer.
sigma: standard deviation of Gaussian function used for filter kernel.
theta: approximated width/height ratio of words, filter function is distorted by this factor.
minArea: ignore word candidates smaller than specified area.
Returns:
List of tuples. Each tuple contains the bounding box and the image of the segmented word.
"""
# apply filter kernel
kernel = createKernel(kernelSize, sigma, theta)
imgFiltered = cv2.filter2D(img, -1, kernel, borderType=cv2.BORDER_REPLICATE).astype(np.uint8)
(_, imgThres) = cv2.threshold(imgFiltered, 0, 255, cv2.THRESH_BINARY+cv2.THRESH_OTSU)
imgThres = 255 - imgThres
    # find connected components. findContours returns an extra value in OpenCV 3.x compared to 2.x/4.x
if cv2.__version__.startswith('3.'):
(_, components, _) = cv2.findContours(imgThres, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
else:
(components, _) = cv2.findContours(imgThres, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
# append components to result
res = []
for c in components:
# skip small word candidates
if cv2.contourArea(c) < minArea:
continue
# append bounding box and image of word to result list
currBox = cv2.boundingRect(c) # returns (x, y, w, h)
(x, y, w, h) = currBox
currImg = img[y:y+h, x:x+w]
res.append((currBox, currImg))
# return list of words, sorted by x-coordinate
return sorted(res, key=lambda entry:entry[0][0])
def prepareImg(img, height):
"""convert given image to grayscale image (if needed) and resize to desired height"""
assert img.ndim in (2, 3)
if img.ndim == 3:
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
h = img.shape[0]
factor = height / h
return cv2.resize(img, dsize=None, fx=factor, fy=factor)
def createKernel(kernelSize, sigma, theta):
"""create anisotropic filter kernel according to given parameters"""
assert kernelSize % 2 # must be odd size
halfSize = kernelSize // 2
kernel = np.zeros([kernelSize, kernelSize])
sigmaX = sigma
sigmaY = sigma * theta
for i in range(kernelSize):
for j in range(kernelSize):
x = i - halfSize
y = j - halfSize
expTerm = np.exp(-x**2 / (2 * sigmaX) - y**2 / (2 * sigmaY))
xTerm = (x**2 - sigmaX**2) / (2 * math.pi * sigmaX**5 * sigmaY)
yTerm = (y**2 - sigmaY**2) / (2 * math.pi * sigmaY**5 * sigmaX)
kernel[i, j] = (xTerm + yTerm) * expTerm
kernel = kernel / np.sum(kernel)
return kernel
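To build intuition for the anisotropic kernel (an illustrative aside, not part of the original pipeline), you can plot it; with theta = 7 the Gaussian is stretched horizontally (sigmaY = sigma * theta), which is what merges the characters of a word into a single connected blob:
In [ ]:
# Illustrative: the kernel is much wider than it is tall (sigmaY = sigma * theta).
plt.imshow(createKernel(25, 11, 7), cmap="gray")
plt.title("Anisotropic word-segmentation kernel")
plt.show()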
In [ ]:
def trim_image(img_path, label):
test_img = cv2.imread(img_path)
    # Skip images that were already trimmed to img_height; originals are 256x256.
    if test_img.shape[0] > 25:
        # The most frequent pixel colour is the background; repaint it white so
        # the word pixels stand out for segmentation.
        test_img = test_img.reshape((256 * 256, 3))
mx = mode(test_img)[0][0]
mask = (test_img[:,0] == mx[0]) & (test_img[:,1] == mx[1]) & (test_img[:,2] == mx[2])
mask0 = np.logical_not(mask)
ixs = np.where(mask)
ixs0 = np.where(mask0)
test_img[ixs] = [255,255,255]
# test_img[ixs0] = [0,0,0]
img = test_img.reshape((256,256,3))
img = prepareImg(img, 256)
res = wordSegmentation(img, kernelSize=11, sigma=11, theta=7, minArea=100)
        # Paste the word crop onto a blank white canvas of the target size,
        # skipping any box anchored exactly at the origin (treated as background).
        img = np.full((img_height, img_width, 1), 255, dtype=np.uint8)
        for (j, w) in enumerate(res):
            (wordBox, wordImg) = w
            (x, y, w, h) = wordBox
            if x > 0 or y > 0:
                img[:h, :w, 0] = wordImg
                break
cv2.imwrite(img_path, img)
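Before trimming the whole set, you can preview the effect on one image (an illustrative check, not in the original run). Note that trim_image overwrites the file in place; re-running is safe because already-trimmed images are skipped by the height guard.
In [ ]:
# Illustrative preview of the trim on a single training image.
before = cv2.imread(x_train[0])
trim_image(x_train[0], y_train[0])
after = cv2.imread(x_train[0])
fig, axes = plt.subplots(1, 2, figsize=(10, 3))
axes[0].imshow(cv2.cvtColor(before, cv2.COLOR_BGR2RGB))
axes[0].set_title("before")
axes[1].imshow(cv2.cvtColor(after, cv2.COLOR_BGR2RGB))
axes[1].set_title("after")
plt.show()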
In [ ]:
for i in tnrange(x_train.shape[0]):
    try:
        trim_image(x_train[i], y_train[i])
    except Exception:
        print("error at index " + str(i))
In [ ]:
for i in tnrange(x_valid.shape[0]):
    try:
        trim_image(x_valid[i], y_valid[i])
    except Exception:
        print("error at index " + str(i))
In [ ]:
def encode_single_sample(img_path, label):
# 1. Read image
img = tf.io.read_file(img_path)
# 2. Decode and convert to grayscale
img = tf.io.decode_png(img, channels=1)
# 3. Convert to float32 in [0, 1] range
img = tf.image.convert_image_dtype(img, tf.float32)
# 4. Resize to the desired size
img = tf.image.resize(img, [img_height, img_width])
# 5. Transpose the image because we want the time
# dimension to correspond to the width of the image.
img = tf.transpose(img, perm=[1, 0, 2])
# 6. Map the characters in label to numbers
label = char_to_num(tf.strings.unicode_split(label, input_encoding="UTF-8"))
# 7. Return a dict as our model is expecting two inputs
return {"image": img, "label": label}
Create Dataset objects¶
In [ ]:
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = (
train_dataset.map(
encode_single_sample, num_parallel_calls=tf.data.experimental.AUTOTUNE
)
.batch(batch_size)
.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)
validation_dataset = tf.data.Dataset.from_tensor_slices((x_valid, y_valid))
validation_dataset = (
validation_dataset.map(
encode_single_sample, num_parallel_calls=tf.data.experimental.AUTOTUNE
)
.batch(batch_size)
.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)
Visualize the data¶
In [ ]:
_, ax = plt.subplots(4, 4, figsize=(15, 10))
for batch in train_dataset.take(1):
    batch_images = batch["image"]
    batch_labels = batch["label"]
    for i in range(16):
        img = (batch_images[i] * 255).numpy().astype("uint8")
        label = tf.strings.reduce_join(num_to_char(batch_labels[i])).numpy().decode("utf-8")
        ax[i // 4, i % 4].imshow(img[:, :, 0].T, cmap="gray")
        ax[i // 4, i % 4].set_title(label)
        # ax[i // 4, i % 4].axis("off")
plt.show()
In [ ]:
_, ax = plt.subplots(4, 4, figsize=(15, 10))
for batch in validation_dataset.take(1):
    batch_images = batch["image"]
    batch_labels = batch["label"]
    for i in range(16):
        img = (batch_images[i] * 255).numpy().astype("uint8")
        label = tf.strings.reduce_join(num_to_char(batch_labels[i])).numpy().decode("utf-8")
        ax[i // 4, i % 4].imshow(img[:, :, 0].T, cmap="gray")
        ax[i // 4, i % 4].set_title(label)
        # ax[i // 4, i % 4].axis("off")
plt.show()
Model¶
In [ ]:
class CTCLayer(layers.Layer):
def __init__(self, name=None):
super().__init__(name=name)
self.loss_fn = keras.backend.ctc_batch_cost
def call(self, y_true, y_pred):
# Compute the training-time loss value and add it
# to the layer using `self.add_loss()`.
batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")
input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")
loss = self.loss_fn(y_true, y_pred, input_length, label_length)
self.add_loss(loss)
# At test time, just return the computed predictions
return y_pred
def build_model():
# Inputs to the model
input_img = layers.Input(
shape=(img_width, img_height, 1), name="image", dtype="float32"
)
labels = layers.Input(name="label", shape=(None,), dtype="float32")
# First conv block
x = layers.Conv2D(
64,
(3, 3),
activation="relu",
padding="same",
name="Conv1",
)(input_img)
x = layers.MaxPooling2D((2, 2), name="pool1")(x)
    # After the single 2x2 max-pool: width 235 -> 117 time steps and
    # height 25 -> 12, with the 64 filters folded into the feature axis.
    new_shape = ((img_width // 2), (img_height // 2) * 64)
    x = layers.Reshape(target_shape=new_shape, name="reshape")(x)
x = layers.Dense(128, activation="relu", name="dense1")(x)
x = layers.Dropout(0.2)(x)
# RNNs
x = layers.Bidirectional(layers.LSTM(128, return_sequences=True, dropout=0.25))(x)
x = layers.Bidirectional(layers.LSTM(64, return_sequences=True, dropout=0.25))(x)
# Output layer
x = layers.Dense(len(characters) + 1, activation="softmax", name="dense2")(x)
# Add CTC layer for calculating CTC loss at each step
output = CTCLayer(name="ctc_loss")(labels, x)
# Define the model
model = keras.models.Model(
inputs=[input_img, labels], outputs=output, name="ocr_model_v1"
)
# Optimizer
opt = keras.optimizers.Adam()
# Compile the model and return
model.compile(optimizer=opt)
return model
# Get the model
model = build_model()
model.summary()
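As a sanity check on the CTC geometry (illustrative, not part of the original), the softmax output should have img_width // 2 = 117 time steps, matching the reshape above, and len(characters) + 1 classes, the extra one being the CTC blank.
In [ ]:
# Illustrative: one softmax vector per time step feeds the CTC loss.
print(model.get_layer("dense2").output.shape)  # expected (None, 117, len(characters) + 1)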
Training¶
In [ ]:
epochs = 400
# epochs = 3
# Add early stopping
early_stopping = keras.callbacks.EarlyStopping(
monitor="val_loss", patience=30, restore_best_weights=True
)
In [ ]:
%%time
# Train the model
history = model.fit(
train_dataset,
validation_data=validation_dataset,
epochs=epochs,
callbacks=[early_stopping],
)
In [ ]:
# model.save('/content/drive/My Drive/Colab Notebooks/txtocr.h5')
Inference¶
In [ ]:
# Get the prediction model by extracting layers till the output layer
prediction_model = keras.models.Model(
model.get_layer(name="image").input, model.get_layer(name="dense2").output
)
prediction_model.summary()
In [ ]:
# A utility function to decode the output of the network
def decode_batch_predictions(pred):
input_len = np.ones(pred.shape[0]) * pred.shape[1]
# Use greedy search. For complex tasks, you can use beam search
results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0][
:, :max_length
]
# Iterate over the results and get back the text
output_text = []
for res in results:
res = tf.strings.reduce_join(num_to_char(res)).numpy().decode("utf-8")
output_text.append(res)
return output_text
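The comment above opts for greedy search. If you want to try beam search, keras.backend.ctc_decode supports it directly; this variant is a sketch, with beam_width=50 an arbitrary choice to tune.
In [ ]:
# Sketch: beam-search variant of the decoder (beam_width is an assumption).
def decode_batch_predictions_beam(pred, beam_width=50):
    input_len = np.ones(pred.shape[0]) * pred.shape[1]
    results = keras.backend.ctc_decode(
        pred, input_length=input_len, greedy=False, beam_width=beam_width
    )[0][0][:, :max_length]
    return [
        tf.strings.reduce_join(num_to_char(res)).numpy().decode("utf-8")
        for res in results
    ]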
In [ ]:
# Let's check results on some validation samples
for batch in validation_dataset.take(3):
batch_images = batch["image"]
batch_labels = batch["label"]
preds = prediction_model.predict(batch_images)
pred_texts = decode_batch_predictions(preds)
orig_texts = []
for label in batch_labels:
label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
orig_texts.append(label)
_, ax = plt.subplots(4, 4, figsize=(15, 5))
for i in range(16):
img = (batch_images[i, :, :, 0] * 255).numpy().astype(np.uint8)
img = img.T
title = f"Prediction: {pred_texts[i]}"
ax[i // 4, i % 4].imshow(img, cmap="gray")
ax[i // 4, i % 4].set_title(title)
ax[i // 4, i % 4].axis("off")
plt.show()
Prediction¶
In [ ]:
path = "data/test"
test_imgs_paths = os.listdir(path)
test_imgs_paths.sort()
image_ids = []
labels = []
visualize = False  # set True to eyeball predictions as they are made
for test_img_path in tqdm(test_imgs_paths):
image_id = test_img_path.split(".")[0]
image_ids.append(image_id)
test_img_path = path+"/"+test_img_path
test_img = cv2.imread(test_img_path)
if test_img.shape[0] > 25:
trim_image(test_img_path, "")
test_img = cv2.imread(test_img_path)
enc = encode_single_sample(test_img_path, "label")
preds = prediction_model.predict(tf.expand_dims(enc['image'], axis=0))
pred_texts = decode_batch_predictions(preds)
label = pred_texts[0].strip()
labels.append(label)
if visualize:
print(test_img_path)
fig, ax = plt.subplots()
im = ax.imshow(test_img, cmap="gray")
ax.set_title(label)
ax.axis('off')
plt.show()
test_df = pd.DataFrame.from_dict({'image_id': image_ids, 'label': labels})
In [ ]:
test_df.sort_values('image_id', inplace=True)
test_df.to_csv("submission.csv", index=False)
files.download('submission.csv')
Well done! 👍 We are all set to make a submission and see your name on the leaderboard. Let's navigate to the challenge page and make one.¶