This notebook describes my approach to solving the TXTOCR challenge. It combines some favourable data preparation with an already well-developed OCR algorithm that I adapted and trained on this specific dataset.
(Colab only) Set up AIcrowd to get the data¶
!pip install git+https://gitlab.aicrowd.com/yoogottamk/aicrowd-cli.git
API_KEY = """SECRET_KEY"""
!aicrowd login --api-key $API_KEY
!aicrowd dataset download -c txtocr >/dev/null
!rm -rf data
!mkdir data
!mv train.csv data/train.csv
!mv val.csv data/val.csv
!unzip train.zip -d data/
!unzip val.zip -d data/
!unzip test.zip -d data/
!rm -rf *zip sample_data
Step 1 - Getting data into shape¶
Import modules¶
import numpy as np
import pandas as pd
from glob import glob
from os.path import join, basename
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from skimage.color import rgb2gray
from sklearn.cluster import KMeans
# Load datasets
df_train = pd.read_csv('data/train.csv', index_col=0).sort_index()
df_val = pd.read_csv('data/val.csv', index_col=0).sort_index()
df_test = pd.read_csv('data/sample_submission.csv', index_col=0).sort_index()
df_train
Images¶
# Get filenames
names_train = sorted(glob(join('data', 'train', '*')))
names_val = sorted(glob(join('data', 'val', '*')))
names_test = sorted(glob(join('data', 'test', '*')))
# Combine filenames with label
label_train = [int(basename(n)[:-4]) for n in names_train]
label_train = pd.DataFrame(np.transpose([label_train, names_train]), columns=['idx', 'name'])
label_train.idx = label_train.idx.astype('int')
label_train = label_train.set_index('idx').sort_index()
label_train['label'] = df_train.label
label_val = [int(basename(n)[:-4]) for n in names_val]
label_val = pd.DataFrame(np.transpose([label_val, names_val]), columns=['idx', 'name'])
label_val.idx = label_val.idx.astype('int')
label_val= label_val.set_index('idx').sort_index()
label_val['label'] = df_val.label
label_test = [int(basename(n)[:-4]) for n in names_test]
label_test = pd.DataFrame(np.transpose([label_test, names_test]), columns=['idx', 'name'])
label_test.idx = label_test.idx.astype('int')
label_test = label_test.set_index('idx').sort_index();
# Check content
label_train.head()
label_train.sample().values
# Show random image
from PIL import Image
sample = np.ravel(label_train.sample().values)
im = Image.open(sample[0])
im
Preprocess images¶
def prepare_text(df_label):
"""This functions takes the original image, finds where the text is,
cuts it out, adds a certain padding and transforms colors to grayscale."""
images = []
for i in tqdm(df_label.name):
# Load image as grayscale
img = np.array(Image.open(i))
try:
# Compute scope of text (with padding)
padding = 4
x_scope = np.ravel(np.argwhere(np.any(img.std(1)>=1e-8, axis=1)))[[0, -1]] - [padding, -(1+padding)]
x_scope = np.clip(x_scope, 0, 255)
y_scope = np.ravel(np.argwhere(np.any(img.std(0)>=1e-8, axis=1)))[[0, -1]] - [padding, -(1+padding)]
y_scope = np.clip(y_scope, 0, 255)
# Crop image
img = img[x_scope[0]:x_scope[1], y_scope[0]:y_scope[1]]
# Compute color centers
n_clusters = 2
k_means = KMeans(n_clusters=n_clusters).fit(img.reshape((-1, 3)))
centers = k_means.cluster_centers_
labels = k_means.labels_
# Make sure that dominant background color is center_0
if labels.mean() > 0.5:
centers = centers[::-1]
# Compute distance between pixels and dominant color
new_img = np.linalg.norm(img.reshape(-1, 3)-centers[0], axis=1).reshape(img.shape[:2])
# Clip and rescale image to main colors
plow = np.percentile(new_img, 5)
phigh = np.percentile(new_img, 95)
new_img = np.clip(new_img, plow, phigh)
new_img = new_img - new_img.min()
new_img = new_img / new_img.max()
# Flip colors
new_img = 1. - new_img
except:
# If it fails make image empty
new_img = np.ones((21, 80))
new_img = np.array(new_img*255, dtype='uint8')
images.append(new_img)
return images
Example of the preprocessing¶
# Plot a random image before and after preprocessing
fidx = np.random.choice(len(label_train))
img = Image.open(label_train.name[fidx])
img_preproc = prepare_text(label_train.iloc[fidx:fidx+1])
plt.title('before')
plt.imshow(img)
plt.show()
plt.title('after')
plt.imshow(img_preproc[0], cmap='gray')
plt.show();
# Plot an image before and after preprocessing
fidx = 63
img = Image.open(label_train.name[fidx])
img_preproc = prepare_text(label_train.iloc[fidx:fidx+1])
plt.title('before')
plt.imshow(img)
plt.show()
plt.title('after')
plt.imshow(img_preproc[0], cmap='gray')
plt.show();
The trick with this preprocessing is twofold:
- Cropping the image to the text region alone reduces the data size. This is simply done by looking for pixel variance in the x and y directions. Given that this dataset doesn't contain any extra noise, the text can be located very easily.
- Given that all images consist of only two colors, plus JPEG compression noise, the color information is not relevant and the images can be converted to grayscale. However, the background and text colors are sometimes very close to each other, so a simple threshold doesn't work here. The solution I found was to take the dominant color (i.e. the background) and compute the Euclidean distance (in RGB space) from it to every other pixel. I then used this distance as the "gray" value, clipped it at the 5th and 95th percentiles, and rescaled it to achieve a strong black/white contrast, independent of the original colors (see the sketch below).
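To make the second point concrete, here is a minimal, self-contained sketch of the distance-to-background step on a small synthetic two-color image (the colors and size are made up for illustration; the actual preprocessing is the prepare_text function above).
import numpy as np
from sklearn.cluster import KMeans
# Synthetic 10x20 RGB image: background (60, 60, 65) with a "text" block in a very similar color (75, 70, 60)
img = np.zeros((10, 20, 3))
img[:, :] = (60, 60, 65)
img[3:7, 5:15] = (75, 70, 60)
# Cluster the pixels into two colors; the larger cluster is the background
k_means = KMeans(n_clusters=2, n_init=10).fit(img.reshape(-1, 3))
background = k_means.cluster_centers_[np.bincount(k_means.labels_).argmax()]
# "Gray" value = Euclidean distance (in RGB) from each pixel to the background color
gray = np.linalg.norm(img.reshape(-1, 3) - background, axis=1).reshape(img.shape[:2])
# Clip at the 5th/95th percentiles, rescale to [0, 1], and flip so the text becomes dark on white
gray = np.clip(gray, np.percentile(gray, 5), np.percentile(gray, 95))
gray = (gray - gray.min()) / (gray.max() - gray.min())
gray = 1. - gray
print(gray.round(2))  # text pixels ~0, background ~1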
Run Preprocessing for all images¶
# Preprocess training set and save data on disk
img_train = prepare_text(label_train)
np.savez_compressed('img_train',
data=np.array(img_train,dtype=object),
labels=df_train.label.values)
# Preprocess validation set and save data on disk
img_val = prepare_text(label_val)
np.savez_compressed('img_val',
data=np.array(img_val,dtype=object),
labels=df_val.label.values)
# Preprocess test set and save data on disk
img_test = prepare_text(label_test)
np.savez_compressed('img_test', data=np.array(img_test,dtype=object))
Load data (helps with rerunning analysis multiple times)¶
# Load train data
npy_train = np.load('img_train.npz', allow_pickle=True)
img_train = npy_train['data']
label_train = npy_train['labels']
# Load val data
npy_val = np.load('img_val.npz', allow_pickle=True)
img_val = npy_val['data']
label_val = npy_val['labels']
# Load test data
npy_test = np.load('img_test.npz', allow_pickle=True)
img_test = npy_test['data']
Step 2 - EDA: Look at data¶
Exploration of labels¶
# Explore words in labels
words = np.array(list(df_train.label.values) + list(df_val.label.values))
words.shape, words
# Explore characters in labels
letters = []
for i, w in enumerate(words):
for c in w:
letters.append(c)
letters = np.array(letters)
letters.shape, letters
# Plot unique characters
pd.value_counts(letters).plot.bar(figsize=(14, 3));
# Print which letters
unique_letters = ''.join(np.unique(letters))
unique_letters
# Max length of text
text_length = [len(w) for w in words]
plt.hist(text_length, bins=100);
print('Max:', np.max(text_length))
Exploration of images¶
# Plot example image
img = img_train[1]
label = label_train[1]
plt.title(label)
plt.imshow(img);
# Width of the word images (in pixels)
word_size = []
for i, e in tqdm(enumerate(img_train)):
word_size.append(e.shape[1])
plt.hist(word_size, bins=100);
# Height of the word images (in pixels)
word_height = []
for i, e in tqdm(enumerate(img_train)):
word_height.append(e.shape[0])
plt.hist(word_height, bins=100);
Create a word corpus from the training and validation sets - used later for typo correction¶
# Get words from training and validation set
corpus = np.array(list(label_train) + list(label_val), dtype='str')
word_list_txt = []
for w in tqdm(corpus):
word_list_txt = word_list_txt + w.split()
word_list_txt = list(np.unique(word_list_txt))
len(word_list_txt), word_list_txt[:10]
# Keep only the unique words from the dataset as the word corpus
word_corpus = np.unique(word_list_txt)
len(word_corpus)
Step 3 - Prepare dataset for ML model¶
The ML model that we want to use expects the text to be centered in the image and all images to have the same extent. Therefore we create a standard canvas and place the word in the middle of it.
def create_canvas(img):
# Target size
height = 32
length = 256
# Load image and extract size
h, w = img.shape
# Prepare canvas
canvas = np.array(np.ones((height, length))*255, dtype='uint8')
    # Find the middle row of the text (rows darker than the 30th percentile)
weigth = np.mean(img, axis=1)
try:
middle_idx = int(np.median(np.argwhere(weigth<np.percentile(weigth, 30))))
except:
middle_idx = int(h/2)
# Fill canvas with image
offset_x = length//2 - w//2
offset_y = height//2 - middle_idx
canvas[offset_y:offset_y+h, offset_x:offset_x+w] = img
# Convert to range [-1, 1]
canvas = canvas / 127.5 - 1
return canvas
# Prepare datasets
x_train = np.array([create_canvas(i) for i in tqdm(img_train)])
x_val = np.array([create_canvas(i) for i in tqdm(img_val)])
x_test = np.array([create_canvas(i) for i in tqdm(img_test)])
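As a quick sanity check (a small sketch; it assumes the x_train, x_val and x_test arrays from the cell above), every canvas should be 32x256 with values in [-1, 1]:
# Verify canvas shape and value range
for name, arr in [('train', x_train), ('val', x_val), ('test', x_test)]:
    assert arr.shape[1:] == (32, 256), f'{name}: unexpected canvas shape {arr.shape[1:]}'
    assert -1.0 <= arr.min() and arr.max() <= 1.0, f'{name}: values outside [-1, 1]'
    print(name, arr.shape, float(arr.min()), float(arr.max()))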
# Plot a few examples
for i in range(10):
plt.imshow(x_train[i], cmap='gray')
plt.show()
# Plot mean image
plt.imshow(x_train.mean(0), cmap='gray');
# Plot median image
plt.imshow(np.median(x_train, axis=0), cmap='gray');
# Plot std image
plt.imshow(np.std(x_train, axis=0), cmap='gray');
Step 4 - Setup SimpleHTR¶
The code for this model is taken from SimpleHTR, a CNN-RNN model that is able to read handwritten text. I've adapted the code slightly (to make it work with the new image dimensions) and added a correction algorithm that does spell checking based on the words in the training set.
To run SimpleHTR, we need some additional libraries.
!pip install -U editdistance opencv-python tensorflow==2.4.0
# Data needs to be transposed for SimpleHTR (del command is to save memory)
x_htr_tr = np.rollaxis(x_train, 2, 1)
del x_train
x_htr_va = np.rollaxis(x_val, 2, 1)
del x_val
x_htr_te = np.rollaxis(x_test, 2, 1)
del x_test
Save images to npy¶
To make the dataset work with SimpleHTR, I stored it in a format similar to the one the model was originally trained on.
import os
for d in ['data_tr', 'data_val', 'data_te']:
if not os.path.exists(d):
os.makedirs(d)
%%time
for i, x in enumerate(x_htr_tr):
np.savez_compressed('data_tr/%05d' % (i + 1), img=x)
np.savetxt('data_tr/%05d.txt' % (i + 1), [label_train[i]], fmt='%s')
del x_htr_tr
%%time
for i, x in enumerate(x_htr_va):
np.savez_compressed('data_val/%05d' % (i + 1), img=x)
np.savetxt('data_val/%05d.txt' % (i + 1), [label_val[i]], fmt='%s')
del x_htr_va
%%time
for i, x in enumerate(x_htr_te):
np.savez_compressed('data_te/%05d' % (i + 1), img=x)
del x_htr_te
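As a quick check (a small sketch; 00001 is simply the first training sample written above), one image/label pair can be read back to verify the stored format:
# Read back one training sample: the image is stored under the 'img' key, the label in a matching .txt file
sample_img = np.load('data_tr/00001.npz', allow_pickle=True)['img']
with open('data_tr/00001.txt') as f:
    sample_label = f.readline().strip()
print(sample_img.shape, sample_label)  # expected shape: (256, 32)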
Adaptation 1 - of code in DataLoader.py¶
import random
from os.path import join as opj
class Sample:
"sample from the dataset"
def __init__(self, gtText, filePath):
self.gtText = gtText
self.filePath = filePath
class Batch:
"batch containing images and ground truth texts"
def __init__(self, gtTexts, imgs):
self.imgs = np.stack(imgs, axis=0)
self.gtTexts = gtTexts
The following class was adapted to support the test and prediction routines. I've also changed it to read the images and texts directly from the stored files, instead of going through the original's rather cumbersome data framework.
class DataLoader:
def __init__(self, batchSize, imgSize, maxTextLen, nEpoch=20000):
"load images and texts at given location"
self.currIdx = 0
self.batchSize = batchSize
self.imgSize = imgSize
self.samples = []
self.tests = []
self.predictions = []
# Load training files
files_tr = sorted(glob(opj('data_tr', '*npz')))
chars = set()
for fileName in files_tr:
with open(fileName.replace('.npz', '.txt')) as ftxt:
gtText = ftxt.readline().strip()
chars = chars.union(set(list(gtText)))
# put sample into list
self.samples.append(Sample(gtText, fileName))
# Load validation files
files_val = sorted(glob(opj('data_val', '*npz')))
for fileName in files_val:
with open(fileName.replace('.npz', '.txt')) as ftxt:
gtText = ftxt.readline().strip()
# put sample into list
self.tests.append(Sample(gtText, fileName))
# Load prediction files
files_pred = sorted(glob(opj('data_te', '*npz')))
for fileName in files_pred:
# put sample into list
self.predictions.append(Sample('test', fileName))
# split into training and validation set: 80% - 20%
splitIdx = int(0.8 * len(self.samples))
self.trainSamples = self.samples[:splitIdx]
self.validationSamples = self.samples[splitIdx:]
self.testSamples = self.tests
self.predSamples = self.predictions
# put words into lists
self.trainWords = [x.gtText for x in self.trainSamples]
self.validationWords = [x.gtText for x in self.validationSamples]
# number of randomly chosen samples per epoch for training
self.numTrainSamplesPerEpoch = nEpoch
# start with train set
self.trainSet()
# list of all chars in dataset
self.charList = sorted(list(chars))
def truncateLabel(self, text, maxTextLen):
# ctc_loss can't compute loss if it cannot find a mapping between text label and input
# labels. Repeat letters cost double because of the blank symbol needing to be inserted.
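        # Example: 'aab' costs 1 + 2 + 1 = 4, because the repeated 'a' needs a CTC blank inserted between its two occurrences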
# If a too-long label is provided, ctc_loss returns an infinite gradient
cost = 0
for i in range(len(text)):
if i != 0 and text[i] == text[i - 1]:
cost += 2
else:
cost += 1
if cost > maxTextLen:
return text[:i]
return text
def trainSet(self):
"switch to randomly chosen subset of training set"
self.currIdx = 0
random.shuffle(self.trainSamples)
self.samples = self.trainSamples[:self.numTrainSamplesPerEpoch]
self.currSet = 'train'
def validationSet(self):
"switch to validation set"
self.currIdx = 0
self.samples = self.validationSamples
self.currSet = 'val'
def testSet(self):
"switch to test set"
self.currIdx = 0
self.samples = self.testSamples
self.currSet = 'tes'
def predictionSet(self):
"switch to pred set"
self.currIdx = 0
self.samples = self.predSamples
self.currSet = 'pred'
def getIteratorInfo(self):
"current batch index and overall number of batches"
if self.currSet == 'train':
numBatches = int(np.floor(len(self.samples) / self.batchSize)) # train set: only full-sized batches
else:
numBatches = int(np.ceil(len(self.samples) / self.batchSize)) # val set: allow last batch to be smaller
currBatch = self.currIdx // self.batchSize + 1
return currBatch, numBatches
def hasNext(self):
"iterator"
if self.currSet == 'train':
return self.currIdx + self.batchSize <= len(self.samples) # train set: only full-sized batches
else:
return self.currIdx < len(self.samples) # val set: allow last batch to be smaller
def getNext(self):
"iterator"
batchRange = range(self.currIdx, min(self.currIdx + self.batchSize, len(self.samples)))
gtTexts = [self.samples[i].gtText for i in batchRange]
imgs = []
for i in batchRange:
img = np.load(self.samples[i].filePath, allow_pickle=True)['img']
imgs.append(img)
self.currIdx += self.batchSize
return Batch(gtTexts, imgs)
Adaptation 2 - of code in Model.py¶
import os
for d in ['model', 'dump']:
if not os.path.exists(d):
os.makedirs(d)
# Create list of all words
words = np.array(list(label_train) + list(label_val))
np.savetxt('model/corpus.txt', [' '.join(words)], fmt='%s')
# Create list of all characters
np.savetxt('model/charList.txt', [unique_letters], fmt='%s')
np.savetxt('model/wordCharList.txt', [unique_letters], fmt='%s')
import os
import tensorflow as tf
# Disable eager mode
tf.compat.v1.disable_eager_execution()
class DecoderType:
BestPath = 0
BeamSearch = 1
WordBeamSearch = 2
class Model:
"minimalistic TF model for HTR"
# model constants
imgSize = (256, 32)
maxTextLen = 32
def __init__(self, charList, decoderType=DecoderType.BestPath, mustRestore=False, dump=False, corpus=None):
"init model: add CNN, RNN and CTC and initialize TF"
self.dump = dump
self.charList = charList
self.decoderType = decoderType
self.mustRestore = mustRestore
self.snapID = 0
self.corpus = corpus
# Whether to use normalization over a batch or a population
self.is_train = tf.compat.v1.placeholder(tf.bool, name='is_train')
# input image batch
self.inputImgs = tf.compat.v1.placeholder(tf.float32, shape=(None, Model.imgSize[0], Model.imgSize[1]))
# setup CNN, RNN and CTC
self.setupCNN()
self.setupRNN()
self.setupCTC()
# setup optimizer to train NN
self.batchesTrained = 0
self.update_ops = tf.compat.v1.get_collection(tf.compat.v1.GraphKeys.UPDATE_OPS)
with tf.control_dependencies(self.update_ops):
self.optimizer = tf.compat.v1.train.AdamOptimizer().minimize(self.loss)
# initialize TF
(self.sess, self.saver) = self.setupTF()
def setupCNN(self):
"create CNN layers and return output of these layers"
cnnIn4d = tf.expand_dims(input=self.inputImgs, axis=3)
# list of parameters for the layers
kernelVals = [5, 5, 3, 3, 3]
featureVals = [1, 32, 64, 128, 128, 256]
strideVals = poolVals = [(2, 2), (2, 2), (1, 2), (1, 2), (1, 2)]
numLayers = len(strideVals)
# create layers
pool = cnnIn4d # input to first CNN layer
for i in range(numLayers):
kernel = tf.Variable(
tf.random.truncated_normal([kernelVals[i], kernelVals[i], featureVals[i], featureVals[i + 1]],
stddev=0.1))
conv = tf.nn.conv2d(input=pool, filters=kernel, padding='SAME', strides=(1, 1, 1, 1))
conv_norm = tf.compat.v1.layers.batch_normalization(conv, training=self.is_train)
relu = tf.nn.relu(conv_norm)
pool = tf.nn.max_pool2d(input=relu, ksize=(1, poolVals[i][0], poolVals[i][1], 1),
strides=(1, strideVals[i][0], strideVals[i][1], 1), padding='VALID')
self.cnnOut4d = pool
def setupRNN(self):
"create RNN layers and return output of these layers"
rnnIn3d = tf.squeeze(self.cnnOut4d, axis=[2])
        # basic cells which are used to build the RNN
numHidden = 256
cells = [tf.compat.v1.nn.rnn_cell.LSTMCell(num_units=numHidden, state_is_tuple=True) for _ in
range(2)] # 2 layers
# stack basic cells
stacked = tf.compat.v1.nn.rnn_cell.MultiRNNCell(cells, state_is_tuple=True)
# bidirectional RNN
# BxTxF -> BxTx2H
((fw, bw), _) = tf.compat.v1.nn.bidirectional_dynamic_rnn(cell_fw=stacked, cell_bw=stacked, inputs=rnnIn3d,
dtype=rnnIn3d.dtype)
# BxTxH + BxTxH -> BxTx2H -> BxTx1X2H
concat = tf.expand_dims(tf.concat([fw, bw], 2), 2)
# project output to chars (including blank): BxTx1x2H -> BxTx1xC -> BxTxC
kernel = tf.Variable(tf.random.truncated_normal([1, 1, numHidden * 2, len(self.charList) + 1], stddev=0.1))
self.rnnOut3d = tf.squeeze(tf.nn.atrous_conv2d(value=concat,
filters=kernel, rate=1, padding='SAME'), axis=[2])
def setupCTC(self):
"create CTC loss and decoder and return them"
# BxTxC -> TxBxC
self.ctcIn3dTBC = tf.transpose(a=self.rnnOut3d, perm=[1, 0, 2])
# ground truth text as sparse tensor
self.gtTexts = tf.SparseTensor(tf.compat.v1.placeholder(tf.int64, shape=[None, 2]),
tf.compat.v1.placeholder(tf.int32, [None]),
tf.compat.v1.placeholder(tf.int64, [2]))
# calc loss for batch
self.seqLen = tf.compat.v1.placeholder(tf.int32, [None])
self.loss = tf.reduce_mean(input_tensor=tf.compat.v1.nn.ctc_loss(labels=self.gtTexts, inputs=self.ctcIn3dTBC,
sequence_length=self.seqLen,
ctc_merge_repeated=True))
# calc loss for each element to compute label probability
self.savedCtcInput = tf.compat.v1.placeholder(tf.float32,
shape=[Model.maxTextLen, None, len(self.charList) + 1])
self.lossPerElement = tf.compat.v1.nn.ctc_loss(labels=self.gtTexts, inputs=self.savedCtcInput,
sequence_length=self.seqLen, ctc_merge_repeated=True)
# decoder: either best path decoding or beam search decoding
if self.decoderType == DecoderType.BestPath:
self.decoder = tf.nn.ctc_greedy_decoder(inputs=self.ctcIn3dTBC, sequence_length=self.seqLen)
elif self.decoderType == DecoderType.BeamSearch:
self.decoder = tf.nn.ctc_beam_search_decoder(inputs=self.ctcIn3dTBC, sequence_length=self.seqLen,
beam_width=50)
elif self.decoderType == DecoderType.WordBeamSearch:
# import compiled word beam search operation (see https://github.com/githubharald/CTCWordBeamSearch)
word_beam_search_module = tf.load_op_library('TFWordBeamSearch.so')
# prepare information about language (dictionary, characters in dataset, characters forming words)
chars = str().join(self.charList)
wordChars = open('model/wordCharList.txt').read().splitlines()[0]
corpus = open('model/corpus.txt').read()
# decode using the "Words" mode of word beam search
self.decoder = word_beam_search_module.word_beam_search(
tf.nn.softmax(self.ctcIn3dTBC, axis=2), 50, 'Words',
0.0, corpus.encode('utf8'), chars.encode('utf8'),
wordChars.encode('utf8'))
def setupTF(self):
"initialize TF"
print('Tensorflow: ' + tf.__version__)
sess = tf.compat.v1.Session() # TF session
saver = tf.compat.v1.train.Saver(max_to_keep=1) # saver saves model to file
modelDir = 'model/'
latestSnapshot = tf.train.latest_checkpoint(modelDir) # is there a saved model?
# if model must be restored (for inference), there must be a snapshot
if self.mustRestore and not latestSnapshot:
raise Exception('No saved model found in: ' + modelDir)
# load saved model if available
if latestSnapshot:
print('Init with stored values from ' + latestSnapshot)
saver.restore(sess, latestSnapshot)
else:
print('Init with new values')
sess.run(tf.compat.v1.global_variables_initializer())
return (sess, saver)
def toSparse(self, texts):
"put ground truth texts into sparse tensor for ctc_loss"
indices = []
values = []
shape = [len(texts), 0] # last entry must be max(labelList[i])
# go over all texts
for (batchElement, text) in enumerate(texts):
# convert to string of label (i.e. class-ids)
labelStr = [self.charList.index(c) for c in text]
# sparse tensor must have size of max. label-string
if len(labelStr) > shape[1]:
shape[1] = len(labelStr)
# put each label into sparse tensor
for (i, label) in enumerate(labelStr):
indices.append([batchElement, i])
values.append(label)
return (indices, values, shape)
def decoderOutputToText(self, ctcOutput, batchSize):
"extract texts from output of CTC decoder"
# contains string of labels for each batch element
encodedLabelStrs = [[] for i in range(batchSize)]
# word beam search: label strings terminated by blank
if self.decoderType == DecoderType.WordBeamSearch:
blank = len(self.charList)
for b in range(batchSize):
for label in ctcOutput[b]:
if label == blank:
break
encodedLabelStrs[b].append(label)
# TF decoders: label strings are contained in sparse tensor
else:
# ctc returns tuple, first element is SparseTensor
decoded = ctcOutput[0][0]
# go over all indices and save mapping: batch -> values
idxDict = {b: [] for b in range(batchSize)}
for (idx, idx2d) in enumerate(decoded.indices):
label = decoded.values[idx]
batchElement = idx2d[0] # index according to [b,t]
encodedLabelStrs[batchElement].append(label)
# map labels to chars for all batch elements
return [str().join([self.charList[c] for c in labelStr]) for labelStr in encodedLabelStrs]
def trainBatch(self, batch):
"feed a batch into the NN to train it"
numBatchElements = len(batch.imgs)
sparse = self.toSparse(batch.gtTexts)
evalList = [self.optimizer, self.loss]
feedDict = {self.inputImgs: batch.imgs,
self.gtTexts: sparse,
self.seqLen: [Model.maxTextLen] * numBatchElements,
self.is_train: True}
_, lossVal = self.sess.run(evalList, feedDict)
self.batchesTrained += 1
return lossVal
def dumpNNOutput(self, rnnOutput):
"dump the output of the NN to CSV file(s)"
dumpDir = 'dump/'
if not os.path.isdir(dumpDir):
os.mkdir(dumpDir)
# iterate over all batch elements and create a CSV file for each one
maxT, maxB, maxC = rnnOutput.shape
for b in range(maxB):
csv = ''
for t in range(maxT):
for c in range(maxC):
csv += str(rnnOutput[t, b, c]) + ';'
csv += '\n'
fn = dumpDir + 'rnnOutput_' + str(b) + '.csv'
print('Write dump of NN to file: ' + fn)
with open(fn, 'w') as f:
f.write(csv)
def inferBatch(self, batch, calcProbability=False, probabilityOfGT=False):
"feed a batch into the NN to recognize the texts"
# decode, optionally save RNN output
numBatchElements = len(batch.imgs)
evalRnnOutput = self.dump or calcProbability
evalList = [self.decoder] + ([self.ctcIn3dTBC] if evalRnnOutput else [])
feedDict = {self.inputImgs: batch.imgs, self.seqLen: [Model.maxTextLen] * numBatchElements,
self.is_train: False}
evalRes = self.sess.run(evalList, feedDict)
decoded = evalRes[0]
texts = self.decoderOutputToText(decoded, numBatchElements)
# feed RNN output and recognized text into CTC loss to compute labeling probability
probs = None
if calcProbability:
sparse = self.toSparse(batch.gtTexts) if probabilityOfGT else self.toSparse(texts)
ctcInput = evalRes[1]
evalList = self.lossPerElement
feedDict = {self.savedCtcInput: ctcInput, self.gtTexts: sparse,
self.seqLen: [Model.maxTextLen] * numBatchElements, self.is_train: False}
lossVals = self.sess.run(evalList, feedDict)
probs = np.exp(-lossVals)
# dump the output of the NN to CSV file(s)
if self.dump:
self.dumpNNOutput(evalRes[1])
return (texts, probs)
def save(self):
"save model to file"
self.snapID += 1
self.saver.save(self.sess, 'model/snapshot', global_step=self.snapID)
Adaptation 3 - of code in Main.py¶
import cv2
import editdistance
class FilePaths:
"filenames and paths to data"
fnCharList = 'model/charList.txt'
fnAccuracy = 'model/accuracy.txt'
fnInfer = 'data/test.png'
fnCorpus = 'model/corpus.txt'
Here I've added a function that looks through the corpus of all words used in the training dataset and suggests a typo correction if a predicted word is at most 2 edits (i.e. thresh=2) away from a word in the corpus.
def find_closest(word, corpus, thresh=2):
"""Correct typos in text based on corpus"""
answer = []
for check in word.split():
# Compute distance to words in corpus
dist = np.array([editdistance.eval(check, w) for w in corpus])
# Replace words if close ones can be found, else leave them
if len(np.argwhere(dist<=thresh)):
check = corpus[dist.argmin()]
answer.append(check)
return ' '.join(answer)
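A quick illustrative check of this correction (the misspellings and the tiny corpus are made up for this example):
# Made-up example: words within 2 edits of a corpus entry get replaced, everything else is left untouched
print(find_closest('aple pie', ['apple', 'pie', 'cake']))  # -> 'apple pie'
print(find_closest('zzzzzz', ['apple', 'pie', 'cake']))    # -> 'zzzzzz' (no corpus word within 2 edits)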
def train(model, loader):
"train NN"
epoch = 0 # number of training epochs since start
    bestCharErrorRate = float('inf')  # best validation character error rate
    noImprovementSince = 0  # number of epochs without improvement of the character error rate
earlyStopping = 25 # stop training after this number of epochs without improvement
while True:
epoch += 1
print('Epoch:', epoch)
# train
print('Train NN')
loader.trainSet()
while loader.hasNext():
iterInfo = loader.getIteratorInfo()
batch = loader.getNext()
loss = model.trainBatch(batch)
print(f'Epoch: {epoch} Batch: {iterInfo[0]}/{iterInfo[1]} Loss: {loss}')
# validate
charErrorRate = validate(model, loader)
# if best validation accuracy so far, save model parameters
if charErrorRate < bestCharErrorRate:
print('Character error rate improved, save model')
bestCharErrorRate = charErrorRate
noImprovementSince = 0
model.save()
open(FilePaths.fnAccuracy, 'w').write(
f'Validation character error rate of saved model: {charErrorRate * 100.0}%')
else:
print(f'Character error rate not improved, best so far: {bestCharErrorRate * 100.0}%')
noImprovementSince += 1
# stop training if no more improvement in the last x epochs
if noImprovementSince >= earlyStopping:
print(f'No more improvement since {earlyStopping} epochs. Training stopped.')
break
def validate(model, loader):
"validate NN"
print('Validate NN')
loader.validationSet()
numCharErr = 0
numCharTotal = 0
numWordOK = 0
numWordTotal = 0
while loader.hasNext():
iterInfo = loader.getIteratorInfo()
print(f'Batch: {iterInfo[0]} / {iterInfo[1]}')
batch = loader.getNext()
(recognized, _) = model.inferBatch(batch)
print('Ground truth -> Recognized')
for i in range(len(recognized)):
numWordOK += 1 if batch.gtTexts[i] == recognized[i] else 0
numWordTotal += 1
dist = editdistance.eval(recognized[i], batch.gtTexts[i])
numCharErr += dist
numCharTotal += len(batch.gtTexts[i])
#print('[OK]' if dist == 0 else '[ERR:%d]' % dist, '"' + batch.gtTexts[i] + '"', '->', '"' + recognized[i] + '"')
# print validation result
charErrorRate = numCharErr / numCharTotal
wordAccuracy = numWordOK / numWordTotal
print(f'Character error rate: {charErrorRate * 100.0}%. Word accuracy: {wordAccuracy * 100.0}%.')
return charErrorRate
def testtest(model, loader):
"test NN"
print('Test NN')
loader.testSet()
numCharErr = 0
numCharTotal = 0
numWordOK = 0
numWordTotal = 0
while loader.hasNext():
iterInfo = loader.getIteratorInfo()
print(f'Batch: {iterInfo[0]} / {iterInfo[1]}')
batch = loader.getNext()
(recognized, _) = model.inferBatch(batch)
print('Ground truth -> Recognized')
for i in range(len(recognized)):
# Typo correction
recognized[i] = find_closest(recognized[i], model.corpus)
numWordOK += 1 if batch.gtTexts[i] == recognized[i] else 0
numWordTotal += 1
dist = editdistance.eval(recognized[i], batch.gtTexts[i])
numCharErr += dist
numCharTotal += len(batch.gtTexts[i])
print('[OK]' if dist == 0 else '[ERR:%d]' % dist, '"' + batch.gtTexts[i] + '"', '->', '"' + recognized[i] + '"')
# print test result
charErrorRate = numCharErr / numCharTotal
wordAccuracy = numWordOK / numWordTotal
print(f'Character error rate: {charErrorRate * 100.0}%. Word accuracy: {wordAccuracy * 100.0}%.')
return charErrorRate
def predpred(model, loader):
# Collect response
response = []
"pred NN"
print('Prediction NN')
loader.predictionSet()
while loader.hasNext():
iterInfo = loader.getIteratorInfo()
print(f'Batch: {iterInfo[0]} / {iterInfo[1]}')
batch = loader.getNext()
(recognized, _) = model.inferBatch(batch)
for i in range(len(recognized)):
# Typo correction
recognized[i] = find_closest(recognized[i], model.corpus, thresh=2)
response.append(recognized[i])
return response
Step 5 - Run the model¶
Finding the optimal batch size and number of epochs took some time and was based purely on trial and error.
# Specify model specific parameters
batch_size = 500
imgSize = (256, 32)
maxTextLen = 32
nEpoch = 25000
Train the model¶
# Specify parameters
decoderType = DecoderType.BeamSearch
loader = DataLoader(batch_size, imgSize, maxTextLen, nEpoch=nEpoch)
# Reset graph
tf.compat.v1.reset_default_graph()
# save characters of model for inference mode
open(FilePaths.fnCharList, 'w').write(str().join(loader.charList))
# save words contained in dataset into file
open(FilePaths.fnCorpus, 'w').write(str(' ').join(loader.trainWords + loader.validationWords))
# create the model and run training
model = Model(loader.charList, decoderType, corpus=word_corpus)
train(model, loader)
Validation¶
# Reset graph
tf.compat.v1.reset_default_graph()
# Specify parameters
decoderType = DecoderType.BeamSearch
loader = DataLoader(batch_size, imgSize, maxTextLen, nEpoch=nEpoch)
# save characters of model for inference mode
open(FilePaths.fnCharList, 'w').write(str().join(loader.charList))
# save words contained in dataset into file
open(FilePaths.fnCorpus, 'w').write(str(' ').join(loader.trainWords + loader.validationWords))
# restore the model and run validation
model = Model(loader.charList, decoderType, mustRestore=True, corpus=word_corpus)
validate(model, loader)
Test on validation set¶
# Reset graph
tf.compat.v1.reset_default_graph()
# Specify parameters
decoderType = DecoderType.BeamSearch
loader = DataLoader(batch_size, imgSize, maxTextLen, nEpoch=nEpoch)
# save characters of model for inference mode
open(FilePaths.fnCharList, 'w').write(str().join(loader.charList))
# save words contained in dataset into file
open(FilePaths.fnCorpus, 'w').write(str(' ').join(loader.trainWords + loader.validationWords))
# execute test
model = Model(loader.charList, decoderType, mustRestore=True, corpus=word_corpus)
testtest(model, loader)
Prediction¶
# Reset graph
tf.compat.v1.reset_default_graph()
# Specify parameters
decoderType = DecoderType.BeamSearch
loader = DataLoader(batch_size, imgSize, maxTextLen, nEpoch=nEpoch)
# save characters of model for inference mode
open(FilePaths.fnCharList, 'w').write(str().join(loader.charList))
# save words contained in dataset into file
open(FilePaths.fnCorpus, 'w').write(str(' ').join(loader.trainWords + loader.validationWords))
# restore the model and run prediction on the test set
model = Model(loader.charList, decoderType, mustRestore=True, corpus=word_corpus)
response = predpred(model, loader)
Store response in CSV file¶
df_res = pd.read_csv('data/sample_submission.csv', index_col=0)
df_res.label[:] = response
df_res.to_csv('answer_123.csv')