import numpy as np
import cv2
import requests
import queue
import os
import string
download_image_name = './image/problem.png'
first_preprocessed_name = './image/problem1.png'
after_bfs_dot_remove_name = './image/problem2.png'
class SiteHandler:
def __init__(self):
self.header = {'Cookie': 'WC=11107310-13590-TcoWToqv7YGPrVbC'}
def init_site(self):
print('init site!')
requests.get('https://www.wechall.net/challenge/crackcha/reset.php', stream=True,
headers=self.header)
def download_photo(self, name):
file_path = '{}'.format(name)
while True:
try:
r = requests.get('http://www.wechall.net/challenge/crackcha/problem.php', stream=True,
headers=self.header)
if r.status_code == 200:
with open(file_path, 'wb') as f:
for chunk in r:
f.write(chunk)
break
except:
print('download fail. request again.')
def get_right_answer(self):
try:
page = requests.get('https://www.wechall.net/challenge/crackcha/answer.php?answer=aaaaa',
headers=self.header).text
answer = page.split('have been ')[1][:-1]
return answer
except:
return -1
def send_predicted_answer(self, answer):
try:
page = requests.get('https://www.wechall.net/challenge/crackcha/answer.php?answer={}'.format(answer),
headers=self.header).text
return page
except:
return -1
class BfsDenoiser:
def __init__(self):
self.bfs_dict = {}
self.cnt = 0
def denoising_small_dot(self, img, each_image):
for i in range(42): # 작은 필요없는 점들을 없애기 위한 작업.
for j in range(42):
if img[i][each_image*42 + j] == self.biggest_key:
img[i][each_image*42 + j] = 0
else:
img[i][each_image*42 + j] = 255
return img
def is_grouping_success(self):
if not self.bfs_dict: # 글자가 하나씩 전부 지워지는 경우가 있음. 그런 경우 다시수집.
return False
self.biggest_key, self.biggest_value = map(int, max(self.bfs_dict.items(), key=lambda x: x[1]))
if self.biggest_value < 40: # 한 글자당 40개미만의 픽셀로 이루어져있으면 학습에 부적합하다고 생각해서 재수집.
return False
return True
def grouping_with_bfs(self, img, each_image):
for i in range(42):
for j in range(42):
if img[i][each_image*42 + j] == 0:
self.bfs(img, i, each_image*42 + j, each_image)
return img
def bfs(self, img, y, x, each_image):
# global img
self.cnt += 1
img[y][x] = self.cnt
self.bfs_dict[str(self.cnt)] = 1
visited = [[0 for j in range(42 * (each_image+1))] for i in range(42)]
q = queue.Queue()
q.put([y,x])
visited[y][x] = 1
while not q.empty():
p = q.get()
for i in range(8):
ty, tx = p[0]+[1,1,1,0,0,-1,-1,-1][i], p[1]+[1,0,-1,1,-1,1,0,-1][i]
if ty < 0 or tx < 0 or ty > 41 or tx > (each_image+1)*42-1:
continue
if visited[ty][tx] == 0 and img[ty][tx] != 255:
q.put([ty, tx])
visited[ty][tx] = 1
img[ty][tx] = self.cnt
self.bfs_dict[str(self.cnt)] += 1
class ImageDataMaker:
def __init__(self):
self.sitehandler = SiteHandler()
def make_empty_image(self, name, row, column):
empty_png = np.zeros((42 * row, 42 * column))
empty_png.fill(255) # 흰색 바탕
self.save_image(name, empty_png)
def save_image(self, name, img):
cv2.imwrite(name, img)
def get_image_object(self, name):
return cv2.imread(name, 0)
def get_coordinate(self, name):
if not os.path.exists(name):
return -1, -1
with open(name, 'r') as f:
return map(int, f.readline().split(', '))
def set_coordinate(self, name, y, x):
with open(name, 'w') as f:
f.write('{}, {}'.format(y, x))
def is_data_collecting_finish(self, last_row):
for character in string.ascii_uppercase:
row, column = self.get_coordinate('./data/{}.txt'.format(character))
if row != last_row:
return False
return True
def write_character_to_empty_page(self, png_img, img, e, row, column):
for i in range(42):
for j in range(42):
if img[i][e * 42 + j] != 255:
png_img[row * 42 + i][column * 42 + j] = img[i][e * 42 + j]
else:
png_img[row * 42 + i][column * 42 + j] = 255
return png_img
def image_denoising(self, img):
denoised_image = cv2.fastNlMeansDenoising(img, 10, 30, 2, 100)
kernel = np.ones((2, 2), np.uint8)
denoised_image = cv2.morphologyEx(denoised_image, cv2.MORPH_OPEN, kernel)
return denoised_image
def image_thresholding(self, img):
ret, img = cv2.threshold(img, 120, 255, cv2.THRESH_BINARY)
return img
def preprocess_image(self, img):
img = self.image_denoising(img)
img = self.image_thresholding(img)
return img
def download_and_process_image(self):
while True:
self.sitehandler.download_photo(download_image_name)
img = self.get_image_object(download_image_name)
if img is None:
print('download fail. download again')
continue
preprocessed_image = self.preprocess_image(img) # 이미지를 받아와서 기본적인 전처리를 함.
return preprocessed_image
self.save_image(first_preprocessed_name, preprocessed_image)
img = self.get_image_object(first_preprocessed_name)
if img is None:
print('None')
continue
fail_flag = False
for each_image in range(5): # 각각의 글자에 대해서 작업.
bfs_denoiser = BfsDenoiser()
img = bfs_denoiser.grouping_with_bfs(img, each_image) # bfs로 점들을 그룹핑함.
if not bfs_denoiser.is_grouping_success():
fail_flag = True
break
img = bfs_denoiser.denoising_small_dot(img, each_image)
if not fail_flag:
self.save_image(after_bfs_dot_remove_name, img)
return img
def classify_and_save_image(self, answer, last_row, last_column):
for e, each_character in enumerate(answer):
character_png = './data/{}.png'.format(each_character)
character_txt = './data/{}.txt'.format(each_character)
if not os.path.exists(character_png):
self.make_empty_image(character_png, last_row, last_column)
self.set_coordinate(character_txt, 0, 0)
row, column = self.get_coordinate(character_txt)
if row > last_row - 1:
print(e, each_character, 'already finish. continue!')
continue
png_img = self.get_image_object(character_png)
img = self.get_image_object(after_bfs_dot_remove_name)
png_img = self.write_character_to_empty_page(png_img, img, e, row, column)
self.save_image(character_png, png_img)
column += 1
if column > last_column - 1:
column = 0
row += 1
self.set_coordinate(character_txt, row, column)
print('{} : {} {}'.format(each_character, row, column))
def combine_all_images(self):
result = './data/combined.png'
self.make_empty_image(result, 20 * 26, 100)
combined_image = []
for character in string.ascii_uppercase:
img = self.get_image_object('./data/{}.png'.format(character))
if combined_image == []:
combined_image = img
else:
combined_image = np.concatenate((combined_image, img))
self.save_image(result, combined_image)
def main():
datamaker = ImageDataMaker()
datamaker.sitehandler.init_site()
while True:
datamaker.download_and_process_image()
answer = datamaker.sitehandler.get_right_answer()
if answer == -1:
continue
print(answer)
last_row, last_column = 20, 100 # 10*100개의 이미지를 저장할거임.
datamaker.classify_and_save_image(answer, last_row, last_column)
if datamaker.is_data_collecting_finish(last_row):
break
datamaker.combine_all_images()
if __name__ == '__main__':
main()
이전글의 클래스에서 달라진건 download_and process_image에서 return preprocessed_image로 그냥 리턴시켜서 뒤의 작업들을 더 진행하지 않는다는 것이다.(있으나 마나 시간만 늘어나서 그냥 없앰.)
import numpy as np
import tensorflow as tf
from tensorflow import keras
import cv2
import dataMaker
import datetime
class DataPreProcessor:
def get_image_object(self, name):
return cv2.imread(name, 0)
def get_divided_image(self, img, row, column):
return np.array([np.hsplit(each_row, column) for each_row in np.vsplit(img, row)])
def make_labeled_data(self, row, column, answer=''):
label_data_count = 26
if answer == '':
img = np.array([np.array([np.array([0 for i in range(label_data_count)])
for j in range(column)]) for k in range(row * label_data_count)])
for i in range(label_data_count):
for j in range(row):
for k in range(column):
img[i * 20 + j][k][ord('A') - 65 + i] = 1
else:
img = np.array([np.array([0 for i in range(label_data_count)]) for j in range(column)])
for i, c in enumerate(map(lambda x: ord(x) - 65, answer)):
img[i][c] = 1
return img.reshape(-1, label_data_count)
def get_divided_image_data(self, img, row, column):
img = self.get_divided_image(img, row, column)
img = img.reshape(-1, 42 * 42).astype('float32')/255/255
return img
def get_train_test_data(self):
combined_image = self.get_image_object('./data/combined.png')
image_data = self.get_divided_image_data(combined_image, 20 * 26, 100) # row는 20행*알파벳26개, column은 100개임.
image_label = self.make_labeled_data(20, 100)
print("image_data shape = {}".format(image_data.shape))
print("image_label shape = {}\n".format(image_label.shape))
r = np.arange(len(image_data))
np.random.shuffle(r)
randomized_image_data = image_data[r]
randomized_image_label = image_label[r]
validation_range = 47000
train_data = randomized_image_data[:validation_range] # ~validation_range까지는 트레이닝
train_label = randomized_image_label[:validation_range]
test_data = randomized_image_data[validation_range:] # validation_range~끝까지는 검증용.
test_label = randomized_image_label[validation_range:]
# test_data = self.get_divided_image_data('./image/problem2.png', 1, 5)
# test_label = self.make_labeled_data(1, 5, 'JIUTE')
print("train_data shape = {}".format(train_data.shape))
print("train_label shape = {}".format(train_label.shape))
print("test_data shape = {}".format(test_data.shape))
print("test_label shape = {}".format(test_label.shape))
return train_data, train_label, test_data, test_label
일단 첫번째 import하는 부분과 기본적으로 데이터를 가져와서 preprocessing하는 클래스다. make_labeled_image에서는 요기에서 설명했던 keras.utils.to_categorical함수까지 적용이 된 형태의 label된 데이터를 만들어준다.
label_data_count는 영어 문자 갯수인 26개이고, 이걸 기초로 label의 갯수를 만든다. 그리고 아래쪽 for문에서 해당 레이블에 맞는 영어숫자의 인덱스에 1을 할당한다.
get_train_test_data에서는 내가 조합한 combined.png를 가져와서 나눈다. 26개의 영어 단어를 20행, 100열로만들었으므로 20*26행, 100열로 나눈다. 그리고 트레이닝 데이터와 테스트 데이터를 나누기 편하도록 데이터의 순서를 랜덤화한다.
validation_range변수로 ~47000개 까지는 트레이닝용, 47000~52000까지는 검증용으로 사용해서 훈련이 얼마나 잘 되는지 검증하도록 하였다. validation_range 를 정할때 얼마나 검증용으로 사용해야 할지 결정하는대 애먹었는데, 그냥 이것저것 여러번 돌리다보니 검증용으로는 10%가 적당한것같았다. 어느정도가 적합한지는 아직 잘 모르겠다.
class TensorflowModel:
def build_new_model(self):
# building a linear stack of layers with the sequential model
model = keras.Sequential()
model.add(keras.layers.Dense(512, input_shape=(42 * 42,)))
model.add(keras.layers.Activation('relu'))
model.add(keras.layers.Dropout(0.2))
model.add(keras.layers.Dense(512))
model.add(keras.layers.Activation('relu'))
model.add(keras.layers.Dropout(0.2))
model.add(keras.layers.Dense(26))
model.add(keras.layers.Activation('softmax'))
model.compile(optimizer=tf.train.AdamOptimizer(),
loss='categorical_crossentropy',
metrics=['accuracy'])
return model
def train_model(self, model, train_data, train_label, test_data, test_label, batch_size=128, epochs=20):
model.fit(train_data, train_label,
batch_size=batch_size, epochs=epochs,
verbose=2,
validation_data=(test_data, test_label))
return model
def save_model(self, model, path):
model.save(path)
def load_model_and_compile(self, path):
model = keras.models.load_model(path)
model.compile(optimizer=tf.train.AdamOptimizer(),
loss='categorical_crossentropy',
metrics=['accuracy'])
return model
def load_model_and_test(self, test_data, test_label):
loss_and_metrics = self.model.evaluate(test_data, test_label, verbose=2)
print("Test Loss", loss_and_metrics[0])
print("Test Accuracy", loss_and_metrics[1])
def predict_text(self, model, test_data):
return ''.join(map(lambda x: chr(np.argmax(x) + 65), model.predict(test_data)))
텐서플로우의 모델 관련 정보를 가지고 있는 클래스이다. 앞에 나온 코드를 그냥 간단히 감쌌다. 나머지는 다 똑같고 predct_text에서는 해당 모델을 가지고 예측한후 가장 가능성이 높은 값을 알파벳으로 만들어 리턴한다.
훈련하는 부분이다. batch_size와 epochs는 각각 256, 90으로 설정한다. batch_size 와 epochs를 많이 변경해봤는데 batch_size는 바꿔도 크게 차이가 없는거같고, epochs가 높아질수록 정확도가 올라갔다. epochs가 높아질수록 학습이 많이 되니 그럼 epochs를 최대한 많이 하는게 좋지 않을까? 생각해서 찾아봤었는데 어떤 스택 오버플로우 글에서 검증값의 정확도가 더 올라가지 않을 정도까지만 epochs를 올리라고 하였다. epochs를 50,70,90,120까지 올리면서 계속 트레이닝을 해봤는데 90으로 했을때 검증 데이터의 정확도가 97%정도가 나오고 더이상 올라가지 않아서 90에서 그만두었다. (아래 사진에는 80부터 일정한데 내가돌릴때는 90정도부터 일정했었다..)
아래는 훈련이 끝난후 정확도를 캡쳐한것이다.
97.7%면 한문제당 글자가 5개니까, 5글자를 동시에 맞추는데 0.977**5 == 89%정도의 정확도가 나온다는 소리이다.
sitehandler = dataMaker.SiteHandler()
sitehandler.init_site()
datamaker = dataMaker.ImageDataMaker()
t = TensorflowModel()
batch_size, epochs = 256, 90
model = t.load_model_and_compile("./results/wechall_model_{}_{}.h5".format(batch_size, epochs))
success, fail = 0, 0
start = datetime.datetime.now()
while True:
img = datamaker.download_and_process_image()
datapreprocessor = DataPreProcessor()
test_data = datapreprocessor.get_divided_image_data(img, 1, 5)
predict = t.predict_text(model, test_data)
page = sitehandler.send_predicted_answer(predict)
if page != -1 and page.find('wrong') != -1:
fail += 1
else:
success += 1
progress = datetime.datetime.now() - start
print('{} 경과. 성공률 = {:.2f}%. 결과 = {}'.format(progress, (success/float(success+fail))*100, page))
if progress > datetime.timedelta(minutes=30): # 경과 시간이 끝날때까지
break
print('finish')
그리고 이제 드디어 테스트하는 코드이다. siteHandler를 dataMaker에서 가져왔고 사이트를 초기화한다. 그리고 batch_size, epochs가 각각 256, 90으로 트레이닝했던 모델을 가져와서 로드하고, 시간을 측정하면서 계속 캡챠를 다운로드받은 후 predict_text로 예측한다. 성공과 실패를 계산해서 성공률을 찍어준다. 30분이 지나면 종료하도록 하고 돌린 결과..
성공할수 있었다. 28분만에 성공했고, 성공률은 예측했던것과 비슷한 88%정도 나왔다. 사이트에서도 정확도가 높은 편이었다.