최근에 tensorflow를 공부하던 도중 keras를 사용해서 숫자 인식하는 코드가 많은것을 보고.. tensorflow도 공부해볼겸 다시 도전했다. 바로 이전글에서 이것저것 삽질하면서, 또 스스로 데이터를 수집하고 트레이닝을 시켜보면서 깨달은건 어떤 알고리즘을 사용하느냐보다는, 좋은 데이터가 '많이' 들어갔을때 좋은 결과를 낸다는거다.
엄청난 삽질을 해가며 데이터를 가공하고, 넣고, 예측 테스트를 했는데 예측률이 90%가 넘었을때의 기쁨은 진짜 어떤걸로도 표현할 수가 없을것 같다. 최근 몇달동안중 프로그래밍으로 이뤄낸 가장 기뻤던 성과인것 같다. wechall의 crackcha문제의 링크는 아래에 있다.(안보일시 로그인하면됨.)
https://www.wechall.net/challenge/crackcha/answer.php?answer=DZUHZ 처럼 보내주면 된다. 30분안에 468개의 캡챠를 깨야 한다.(틀려도 상관없음) 1800초에 468개니 약 3.8초에 1개씩 잘 보내면 된다. 문제는 1문제당 문자가 5개라는거다. 3.8초당 한번씩 5개를 동시에 정확하게 인식해야한다는게 어렵다. 이미 3년전에 엄청난 삽질을 하고도 실패했기에, 장기적이지만 정확도와 효율성을 높여서 문제에 접근하기로 결심했다.
첫번째 할 일은 데이터 수집이다. 저번에는 생각도 하기싫지만 반 수작업으로 어거지로 데이터를 모았었다. 이번에는 데이터 수집 자동화부터 제대로 만들어보기로 결심했다.
import numpy as np
import cv2
import requests
import queue
import os
import string
download_image_name = './image/problem.png'
first_preprocessed_name = './image/problem1.png'
after_bfs_dot_remove_name = './image/problem2.png'
class SiteHandler:
def __init__(self):
self.header = {'Cookie': 'WC=본인 쿠키'}
def init_site(self):
print('init site!')
requests.get('https://www.wechall.net/challenge/crackcha/reset.php', stream=True,
headers=self.header)
def download_photo(self, name):
file_path = '{}'.format(name)
while True:
try:
r = requests.get('http://www.wechall.net/challenge/crackcha/problem.php', stream=True,
headers=self.header)
if r.status_code == 200:
with open(file_path, 'wb') as f:
for chunk in r:
f.write(chunk)
break
except:
print('download fail. request again.')
def get_right_answer(self):
try:
page = requests.get('https://www.wechall.net/challenge/crackcha/answer.php?answer=aaaaa',
headers=self.header).text
answer = page.split('have been ')[1][:-1]
return answer
except:
return -1
def send_predicted_answer(self, answer):
try:
page = requests.get('https://www.wechall.net/challenge/crackcha/answer.php?answer={}'.format(answer),
headers=self.header).text
return page
except:
return -1
첫번째로 Wechall에서 reset이나 답을 보내거나 문제를 다운받기위해 SiteHandler이라는 클래스를 만들었다. 아래의 WC에는 로그인후 본인의 쿠키값을 넣어준다.init_site()에서는 reset.php를 호출해서 문제 진행상황을 리셋시켜준다.
download_photo에서는 문제 이미지를 다운로드해서 저장한다. get_right_answer에서는 일부러 틀린 답(aaaaa)를 보내 맞는 답을 가져온다. 틀릴 경우 Your answer (NXBNN) was wrong. Correct would have been NXHNN.와 같은 포맷으로 답이 출력되기에, 맞는 부분을 정규식이나 그런걸로 잘라서 가져오면 된다. send_predicted_answer에서는 나중에 예측 후 맞는 값을 보낼거다.
class BfsDenoiser:
def __init__(self):
self.bfs_dict = {}
self.cnt = 0
def denoising_small_dot(self, img, each_image):
# global img
for i in range(42): # 작은 필요없는 점들을 없애기 위한 작업.
for j in range(42):
if img[i][each_image*42 + j] == self.biggest_key:
img[i][each_image*42 + j] = 0
else:
img[i][each_image*42 + j] = 255
return img
def is_grouping_success(self):
if not self.bfs_dict: # 글자가 하나씩 전부 지워지는 경우가 있음. 그런 경우 다시수집.
return False
self.biggest_key, self.biggest_value = map(int, max(self.bfs_dict.items(), key=lambda x: x[1]))
if self.biggest_value < 40: # 한 글자당 40개미만의 픽셀로 이루어져있으면 학습에 부적합하다고 생각해서 재수집.
return False
return True
def grouping_with_bfs(self, img, each_image):
# global img
for i in range(42):
for j in range(42):
if img[i][each_image*42 + j] == 0:
self.bfs(img, i, each_image*42 + j, each_image)
return img
def bfs(self, img, y, x, each_image):
# global img
self.cnt += 1
img[y][x] = self.cnt
self.bfs_dict[str(self.cnt)] = 1
visited = [[0 for j in range(42 * (each_image+1))] for i in range(42)]
q = queue.Queue()
q.put([y,x])
visited[y][x] = 1
while not q.empty():
p = q.get()
for i in range(8):
ty, tx = p[0]+[1,1,1,0,0,-1,-1,-1][i], p[1]+[1,0,-1,1,-1,1,0,-1][i]
if ty < 0 or tx < 0 or ty > 41 or tx > (each_image+1)*42-1:
continue
if visited[ty][tx] == 0 and img[ty][tx] != 255:
q.put([ty, tx])
visited[ty][tx] = 1
img[ty][tx] = self.cnt
self.bfs_dict[str(self.cnt)] += 1
그다음으로 bfsDenoiser이다. 나중에 돌릴때는 속도가 더 중요해서 빼버렸고, 빼도 결과 차이가 크게 안났지만.. 데이터 수집할때 이 코드가 있으므로 그냥 썼다. 첫번째로 grouping_with_bfs에서는 bfs알고리즘을 사용해서 점들을 그룹핑한다. 위,아래,좌,우,대각선 4방향안에 점이 있으면 같은 그룹으로 인식한다. 간혹 이미지를 preprocess한 후에 이미지 끝부분에 점같은게 있어서.. 이걸 지워버리려고 이 방법을 썼다. denoising_small_dot에서는 가장 큰 그룹의 이미지가 중앙에 있는 숫자라고 가정하고, 나머지 작은 그룹들은 그냥 배경과 같은 0으로 칠해버렸다.
class ImageDataMaker:
def __init__(self):
self.sitehandler = SiteHandler()
def make_empty_image(self, name, row, column):
empty_png = np.zeros((42 * row, 42 * column))
empty_png.fill(255) # 흰색 바탕
self.save_image(name, empty_png)
def save_image(self, name, img):
cv2.imwrite(name, img)
def get_image_object(self, name):
return cv2.imread(name, 0)
def get_coordinate(self, name):
if not os.path.exists(name):
return -1, -1
with open(name, 'r') as f:
return map(int, f.readline().split(', '))
def set_coordinate(self, name, y, x):
with open(name, 'w') as f:
f.write('{}, {}'.format(y, x))
def is_data_collecting_finish(self, last_row):
for character in string.ascii_uppercase:
row, column = self.get_coordinate('./data/{}.txt'.format(character))
if row != last_row:
return False
return True
def write_character_to_empty_page(self, png_img, img, e, row, column):
for i in range(42):
for j in range(42):
if img[i][e * 42 + j] != 255:
png_img[row * 42 + i][column * 42 + j] = img[i][e * 42 + j]
else:
png_img[row * 42 + i][column * 42 + j] = 255
return png_img
def image_denoising(self, img):
denoised_image = cv2.fastNlMeansDenoising(img, 10, 30, 2, 100)
kernel = np.ones((2, 2), np.uint8)
denoised_image = cv2.morphologyEx(denoised_image, cv2.MORPH_OPEN, kernel)
return denoised_image
def image_thresholding(self, img):
ret, img = cv2.threshold(img, 120, 255, cv2.THRESH_BINARY)
return img
def preprocess_image(self, img):
img = self.image_denoising(img)
img = self.image_thresholding(img)
return img
def download_and_process_image(self):
while True:
self.sitehandler.download_photo(download_image_name)
img = self.get_image_object(download_image_name)
if img is None:
print('download fail. download again')
continue
preprocessed_image = self.preprocess_image(img) # 이미지를 받아와서 기본적인 전처리를 함.
#return preprocessed_image
self.save_image(first_preprocessed_name, preprocessed_image)
img = self.get_image_object(first_preprocessed_name)
if img is None:
print('None')
continue
fail_flag = False
for each_image in range(5): # 각각의 글자에 대해서 작업.
bfs_denoiser = BfsDenoiser()
img = bfs_denoiser.grouping_with_bfs(img, each_image) # bfs로 점들을 그룹핑함.
if not bfs_denoiser.is_grouping_success():
fail_flag = True
break
img = bfs_denoiser.denoising_small_dot(img, each_image)
if not fail_flag:
self.save_image(after_bfs_dot_remove_name, img)
return
def classify_and_save_image(self, answer, last_row, last_column):
for e, each_character in enumerate(answer):
character_png = './data/{}.png'.format(each_character)
character_txt = './data/{}.txt'.format(each_character)
if not os.path.exists(character_png):
self.make_empty_image(character_png, last_row, last_column)
self.set_coordinate(character_txt, 0, 0)
row, column = self.get_coordinate(character_txt)
if row > last_row - 1:
print(e, each_character, 'already finish. continue!')
continue
png_img = self.get_image_object(character_png)
img = self.get_image_object(after_bfs_dot_remove_name)
png_img = self.write_character_to_empty_page(png_img, img, e, row, column)
self.save_image(character_png, png_img)
column += 1
if column > last_column - 1:
column = 0
row += 1
self.set_coordinate(character_txt, row, column)
print('{} : {} {}'.format(each_character, row, column))
def combine_all_images(self):
result = './data/combined.png'
self.make_empty_image(result, 20 * 26, 100)
combined_image = []
for character in string.ascii_uppercase:
img = self.get_image_object('./data/{}.png'.format(character))
if combined_image == []:
combined_image = img
else:
combined_image = np.concatenate((combined_image, img))
self.save_image(result, combined_image)
그리고 메인 클래스인 ImageDataMaker이다. 생성자에서 위의 SiteHandler()을 만들어두고 쓴다. make_empty_image에서는 새로 저장할 경로의 이미지 이름을 받아서 42*row, 42*column의 크기로 만든다. 42*42로 만드는 이유는 Wechall의 한 글자가 42*42의 크기이기 때문이다.
get_coordinate에서는 이미지를 수집해서 종류별로 저장할건데,(A는 A대로, B는 B대로 등.) 첫번째 자리에 이미 이미지가 있으면 그 다음 자리에 이미지를 붙여야 하기 때문에 좌표를 저장해 둔다. 좌표는 파일로 쓰여지며, get_coordinate에서는 이미지를 쓸 좌표의 row와 column을 가져온다. is_data_collecting_finish에서는 모든 대문자를 돌면서 이미지의 좌표가 마지막 라인까지 다 쓰여졌는지를 확인한다. 모든 A~Z까지 이미지가 저장되었으면 True를 리턴한다.
write_character_to_empty_page에서는 해당 이미지의 좌표에 다운로드한 이미지를 그린다.
image_denoising, image_thresholding, preprocess_image함수는 이전에 삽질했던 링크의 preprocessing을 그대로 가져와서 클래스화했다.
나머지 함수는 main과 같이 설명하겠다.
def main():
datamaker = ImageDataMaker()
datamaker.sitehandler.init_site()
while True:
datamaker.download_and_process_image()
answer = datamaker.sitehandler.get_right_answer()
if answer == -1:
continue
print(answer)
last_row, last_column = 20, 100 # 10*100개의 이미지를 저장할거임.
datamaker.classify_and_save_image(answer, last_row, last_column)
if datamaker.is_data_collecting_finish(last_row):
break
datamaker.combine_all_images()
if __name__ == '__main__':
main()
main()함수에서는 Wechall사이트를 초기화한다. 그러고 위에서 설명한 is_data_collecting_finish.즉 모든 알파벳 데이터가 원하는 만큼 수집되기전까지 반복문이 계속 돈다.
download_and_process_image에서는 이미지를 다운로드받아 전처리를 하고 전처리에 성공하면 전처리된 이미지를 저장해둔다. 그후 이 이미지에 맞는 답을 가져온 후(get_right_answer) 좌표를 가져와서 하나하나 페이지에 그린다. last_row, last_column이 20,100이므로 각각 알파벳을 20행 * 100열개(2000개)만큼 가져온다는 뜻이다. ./data폴더에 저장되며, 다 저장된 후 아래처럼 저장된다. (다 모으는데 이틀 걸렸다..)
예시로 A.png를 열어보면 다음과 같다. row, column은 위에서 설정한대로 20, 100. 즉 2000개다.
A.txt는 마지막까지 갔으므로 20, 0으로 기록되어 있다. 이 이미지들은 전부 모아주는 함수가 combine_all_images이다. 저 함수를 실행시키면 A부터 Z까지 모든 이미지를 합친후 새로 그린다. 그린 후 아래와 같다. (일부만 캡쳐해서 올림.)