Breaking Wechall Crackcha using tensorflow(keras) 2. training and testing the data.
http://qkqhxla1.tistory.com/988 에 이은 글.
이젠 데이터를 트레이닝하고 테스트할 차례이다. 그전에 이전 글의 클래스를 가져왔다.
import numpy as np import cv2 import requests import queue import os import string download_image_name = './image/problem.png' first_preprocessed_name = './image/problem1.png' after_bfs_dot_remove_name = './image/problem2.png' class SiteHandler: def __init__(self): self.header = {'Cookie': 'WC=11107310-13590-TcoWToqv7YGPrVbC'} def init_site(self): print('init site!') requests.get('https://www.wechall.net/challenge/crackcha/reset.php', stream=True, headers=self.header) def download_photo(self, name): file_path = '{}'.format(name) while True: try: r = requests.get('http://www.wechall.net/challenge/crackcha/problem.php', stream=True, headers=self.header) if r.status_code == 200: with open(file_path, 'wb') as f: for chunk in r: f.write(chunk) break except: print('download fail. request again.') def get_right_answer(self): try: page = requests.get('https://www.wechall.net/challenge/crackcha/answer.php?answer=aaaaa', headers=self.header).text answer = page.split('have been ')[1][:-1] return answer except: return -1 def send_predicted_answer(self, answer): try: page = requests.get('https://www.wechall.net/challenge/crackcha/answer.php?answer={}'.format(answer), headers=self.header).text return page except: return -1 class BfsDenoiser: def __init__(self): self.bfs_dict = {} self.cnt = 0 def denoising_small_dot(self, img, each_image): for i in range(42): # 작은 필요없는 점들을 없애기 위한 작업. for j in range(42): if img[i][each_image*42 + j] == self.biggest_key: img[i][each_image*42 + j] = 0 else: img[i][each_image*42 + j] = 255 return img def is_grouping_success(self): if not self.bfs_dict: # 글자가 하나씩 전부 지워지는 경우가 있음. 그런 경우 다시수집. return False self.biggest_key, self.biggest_value = map(int, max(self.bfs_dict.items(), key=lambda x: x[1])) if self.biggest_value < 40: # 한 글자당 40개미만의 픽셀로 이루어져있으면 학습에 부적합하다고 생각해서 재수집. return False return True def grouping_with_bfs(self, img, each_image): for i in range(42): for j in range(42): if img[i][each_image*42 + j] == 0: self.bfs(img, i, each_image*42 + j, each_image) return img def bfs(self, img, y, x, each_image): # global img self.cnt += 1 img[y][x] = self.cnt self.bfs_dict[str(self.cnt)] = 1 visited = [[0 for j in range(42 * (each_image+1))] for i in range(42)] q = queue.Queue() q.put([y,x]) visited[y][x] = 1 while not q.empty(): p = q.get() for i in range(8): ty, tx = p[0]+[1,1,1,0,0,-1,-1,-1][i], p[1]+[1,0,-1,1,-1,1,0,-1][i] if ty < 0 or tx < 0 or ty > 41 or tx > (each_image+1)*42-1: continue if visited[ty][tx] == 0 and img[ty][tx] != 255: q.put([ty, tx]) visited[ty][tx] = 1 img[ty][tx] = self.cnt self.bfs_dict[str(self.cnt)] += 1 class ImageDataMaker: def __init__(self): self.sitehandler = SiteHandler() def make_empty_image(self, name, row, column): empty_png = np.zeros((42 * row, 42 * column)) empty_png.fill(255) # 흰색 바탕 self.save_image(name, empty_png) def save_image(self, name, img): cv2.imwrite(name, img) def get_image_object(self, name): return cv2.imread(name, 0) def get_coordinate(self, name): if not os.path.exists(name): return -1, -1 with open(name, 'r') as f: return map(int, f.readline().split(', ')) def set_coordinate(self, name, y, x): with open(name, 'w') as f: f.write('{}, {}'.format(y, x)) def is_data_collecting_finish(self, last_row): for character in string.ascii_uppercase: row, column = self.get_coordinate('./data/{}.txt'.format(character)) if row != last_row: return False return True def write_character_to_empty_page(self, png_img, img, e, row, column): for i in range(42): for j in range(42): if img[i][e * 42 + j] != 255: png_img[row * 42 + i][column * 42 + j] = img[i][e * 42 + j] else: png_img[row * 42 + i][column * 42 + j] = 255 return png_img def image_denoising(self, img): denoised_image = cv2.fastNlMeansDenoising(img, 10, 30, 2, 100) kernel = np.ones((2, 2), np.uint8) denoised_image = cv2.morphologyEx(denoised_image, cv2.MORPH_OPEN, kernel) return denoised_image def image_thresholding(self, img): ret, img = cv2.threshold(img, 120, 255, cv2.THRESH_BINARY) return img def preprocess_image(self, img): img = self.image_denoising(img) img = self.image_thresholding(img) return img def download_and_process_image(self): while True: self.sitehandler.download_photo(download_image_name) img = self.get_image_object(download_image_name) if img is None: print('download fail. download again') continue preprocessed_image = self.preprocess_image(img) # 이미지를 받아와서 기본적인 전처리를 함. return preprocessed_image self.save_image(first_preprocessed_name, preprocessed_image) img = self.get_image_object(first_preprocessed_name) if img is None: print('None') continue fail_flag = False for each_image in range(5): # 각각의 글자에 대해서 작업. bfs_denoiser = BfsDenoiser() img = bfs_denoiser.grouping_with_bfs(img, each_image) # bfs로 점들을 그룹핑함. if not bfs_denoiser.is_grouping_success(): fail_flag = True break img = bfs_denoiser.denoising_small_dot(img, each_image) if not fail_flag: self.save_image(after_bfs_dot_remove_name, img) return img def classify_and_save_image(self, answer, last_row, last_column): for e, each_character in enumerate(answer): character_png = './data/{}.png'.format(each_character) character_txt = './data/{}.txt'.format(each_character) if not os.path.exists(character_png): self.make_empty_image(character_png, last_row, last_column) self.set_coordinate(character_txt, 0, 0) row, column = self.get_coordinate(character_txt) if row > last_row - 1: print(e, each_character, 'already finish. continue!') continue png_img = self.get_image_object(character_png) img = self.get_image_object(after_bfs_dot_remove_name) png_img = self.write_character_to_empty_page(png_img, img, e, row, column) self.save_image(character_png, png_img) column += 1 if column > last_column - 1: column = 0 row += 1 self.set_coordinate(character_txt, row, column) print('{} : {} {}'.format(each_character, row, column)) def combine_all_images(self): result = './data/combined.png' self.make_empty_image(result, 20 * 26, 100) combined_image = [] for character in string.ascii_uppercase: img = self.get_image_object('./data/{}.png'.format(character)) if combined_image == []: combined_image = img else: combined_image = np.concatenate((combined_image, img)) self.save_image(result, combined_image) def main(): datamaker = ImageDataMaker() datamaker.sitehandler.init_site() while True: datamaker.download_and_process_image() answer = datamaker.sitehandler.get_right_answer() if answer == -1: continue print(answer) last_row, last_column = 20, 100 # 10*100개의 이미지를 저장할거임. datamaker.classify_and_save_image(answer, last_row, last_column) if datamaker.is_data_collecting_finish(last_row): break datamaker.combine_all_images() if __name__ == '__main__': main()
이전글의 클래스에서 달라진건 download_and process_image에서 return preprocessed_image로 그냥 리턴시켜서 뒤의 작업들을 더 진행하지 않는다는 것이다.(있으나 마나 시간만 늘어나서 그냥 없앰.)
import numpy as np import tensorflow as tf from tensorflow import keras import cv2 import dataMaker import datetime class DataPreProcessor: def get_image_object(self, name): return cv2.imread(name, 0) def get_divided_image(self, img, row, column): return np.array([np.hsplit(each_row, column) for each_row in np.vsplit(img, row)]) def make_labeled_data(self, row, column, answer=''): label_data_count = 26 if answer == '': img = np.array([np.array([np.array([0 for i in range(label_data_count)]) for j in range(column)]) for k in range(row * label_data_count)]) for i in range(label_data_count): for j in range(row): for k in range(column): img[i * 20 + j][k][ord('A') - 65 + i] = 1 else: img = np.array([np.array([0 for i in range(label_data_count)]) for j in range(column)]) for i, c in enumerate(map(lambda x: ord(x) - 65, answer)): img[i][c] = 1 return img.reshape(-1, label_data_count) def get_divided_image_data(self, img, row, column): img = self.get_divided_image(img, row, column) img = img.reshape(-1, 42 * 42).astype('float32')/255/255 return img def get_train_test_data(self): combined_image = self.get_image_object('./data/combined.png') image_data = self.get_divided_image_data(combined_image, 20 * 26, 100) # row는 20행*알파벳26개, column은 100개임. image_label = self.make_labeled_data(20, 100) print("image_data shape = {}".format(image_data.shape)) print("image_label shape = {}\n".format(image_label.shape)) r = np.arange(len(image_data)) np.random.shuffle(r) randomized_image_data = image_data[r] randomized_image_label = image_label[r] validation_range = 47000 train_data = randomized_image_data[:validation_range] # ~validation_range까지는 트레이닝 train_label = randomized_image_label[:validation_range] test_data = randomized_image_data[validation_range:] # validation_range~끝까지는 검증용. test_label = randomized_image_label[validation_range:] # test_data = self.get_divided_image_data('./image/problem2.png', 1, 5) # test_label = self.make_labeled_data(1, 5, 'JIUTE') print("train_data shape = {}".format(train_data.shape)) print("train_label shape = {}".format(train_label.shape)) print("test_data shape = {}".format(test_data.shape)) print("test_label shape = {}".format(test_label.shape)) return train_data, train_label, test_data, test_label
일단 첫번째 import하는 부분과 기본적으로 데이터를 가져와서 preprocessing하는 클래스다. make_labeled_image에서는 요기에서 설명했던 keras.utils.to_categorical함수까지 적용이 된 형태의 label된 데이터를 만들어준다.
label_data_count는 영어 문자 갯수인 26개이고, 이걸 기초로 label의 갯수를 만든다. 그리고 아래쪽 for문에서 해당 레이블에 맞는 영어숫자의 인덱스에 1을 할당한다.
get_train_test_data에서는 내가 조합한 combined.png를 가져와서 나눈다. 26개의 영어 단어를 20행, 100열로만들었으므로 20*26행, 100열로 나눈다. 그리고 트레이닝 데이터와 테스트 데이터를 나누기 편하도록 데이터의 순서를 랜덤화한다.
validation_range변수로 ~47000개 까지는 트레이닝용, 47000~52000까지는 검증용으로 사용해서 훈련이 얼마나 잘 되는지 검증하도록 하였다. validation_range 를 정할때 얼마나 검증용으로 사용해야 할지 결정하는대 애먹었는데, 그냥 이것저것 여러번 돌리다보니 검증용으로는 10%가 적당한것같았다. 어느정도가 적합한지는 아직 잘 모르겠다.
class TensorflowModel: def build_new_model(self): # building a linear stack of layers with the sequential model model = keras.Sequential() model.add(keras.layers.Dense(512, input_shape=(42 * 42,))) model.add(keras.layers.Activation('relu')) model.add(keras.layers.Dropout(0.2)) model.add(keras.layers.Dense(512)) model.add(keras.layers.Activation('relu')) model.add(keras.layers.Dropout(0.2)) model.add(keras.layers.Dense(26)) model.add(keras.layers.Activation('softmax')) model.compile(optimizer=tf.train.AdamOptimizer(), loss='categorical_crossentropy', metrics=['accuracy']) return model def train_model(self, model, train_data, train_label, test_data, test_label, batch_size=128, epochs=20): model.fit(train_data, train_label, batch_size=batch_size, epochs=epochs, verbose=2, validation_data=(test_data, test_label)) return model def save_model(self, model, path): model.save(path) def load_model_and_compile(self, path): model = keras.models.load_model(path) model.compile(optimizer=tf.train.AdamOptimizer(), loss='categorical_crossentropy', metrics=['accuracy']) return model def load_model_and_test(self, test_data, test_label): loss_and_metrics = self.model.evaluate(test_data, test_label, verbose=2) print("Test Loss", loss_and_metrics[0]) print("Test Accuracy", loss_and_metrics[1]) def predict_text(self, model, test_data): return ''.join(map(lambda x: chr(np.argmax(x) + 65), model.predict(test_data)))
텐서플로우의 모델 관련 정보를 가지고 있는 클래스이다. 앞에 나온 코드를 그냥 간단히 감쌌다. 나머지는 다 똑같고 predct_text에서는 해당 모델을 가지고 예측한후 가장 가능성이 높은 값을 알파벳으로 만들어 리턴한다.
datapreprocessor = DataPreProcessor() train_data, train_label, test_data, test_label = datapreprocessor.get_train_test_data() batch_size, epochs = 256, 90 t = TensorflowModel() model = t.build_new_model() model = t.train_model(model, train_data, train_label, test_data, test_label, batch_size, epochs) t.save_model(model, './results/wechall_model_{}_{}.h5'.format(batch_size, epochs))
훈련하는 부분이다. batch_size와 epochs는 각각 256, 90으로 설정한다. batch_size 와 epochs를 많이 변경해봤는데 batch_size는 바꿔도 크게 차이가 없는거같고, epochs가 높아질수록 정확도가 올라갔다. epochs가 높아질수록 학습이 많이 되니 그럼 epochs를 최대한 많이 하는게 좋지 않을까? 생각해서 찾아봤었는데 어떤 스택 오버플로우 글에서 검증값의 정확도가 더 올라가지 않을 정도까지만 epochs를 올리라고 하였다. epochs를 50,70,90,120까지 올리면서 계속 트레이닝을 해봤는데 90으로 했을때 검증 데이터의 정확도가 97%정도가 나오고 더이상 올라가지 않아서 90에서 그만두었다. (아래 사진에는 80부터 일정한데 내가돌릴때는 90정도부터 일정했었다..)
아래는 훈련이 끝난후 정확도를 캡쳐한것이다.
97.7%면 한문제당 글자가 5개니까, 5글자를 동시에 맞추는데 0.977**5 == 89%정도의 정확도가 나온다는 소리이다.
sitehandler = dataMaker.SiteHandler() sitehandler.init_site() datamaker = dataMaker.ImageDataMaker() t = TensorflowModel() batch_size, epochs = 256, 90 model = t.load_model_and_compile("./results/wechall_model_{}_{}.h5".format(batch_size, epochs)) success, fail = 0, 0 start = datetime.datetime.now() while True: img = datamaker.download_and_process_image() datapreprocessor = DataPreProcessor() test_data = datapreprocessor.get_divided_image_data(img, 1, 5) predict = t.predict_text(model, test_data) page = sitehandler.send_predicted_answer(predict) if page != -1 and page.find('wrong') != -1: fail += 1 else: success += 1 progress = datetime.datetime.now() - start print('{} 경과. 성공률 = {:.2f}%. 결과 = {}'.format(progress, (success/float(success+fail))*100, page)) if progress > datetime.timedelta(minutes=30): # 경과 시간이 끝날때까지 break print('finish')
그리고 이제 드디어 테스트하는 코드이다. siteHandler를 dataMaker에서 가져왔고 사이트를 초기화한다. 그리고 batch_size, epochs가 각각 256, 90으로 트레이닝했던 모델을 가져와서 로드하고, 시간을 측정하면서 계속 캡챠를 다운로드받은 후 predict_text로 예측한다. 성공과 실패를 계산해서 성공률을 찍어준다. 30분이 지나면 종료하도록 하고 돌린 결과..
성공할수 있었다. 28분만에 성공했고, 성공률은 예측했던것과 비슷한 88%정도 나왔다. 사이트에서도 정확도가 높은 편이었다.
아니 98%나온분은 어떤걸 사용해서 저렇게 나왔는지 도저히 모르겠다...