data engineering

pyspark에서 jar파일 사용하는방법. (py4j)

qkqhxla1 2017. 7. 20. 11:06

이것저것 자료를 찾아보고 통합해서 적었다. 

pyspark에서 jar파일을 사용해야 하는 상황이 왔다. 원래는 서버에 요청을 보내는 방식이었는데 네트워크상의 부하가 너무 크고, jar파일을 가져와서 로컬에서 돌리면 네트워크 부하도 줄고 속도도 빨라져서 이걸로 돌리기로 했다. 문제는 한번도 해본적이 없다는거다.


py4j (https://www.py4j.org/getting_started.html)가 이런 작업에 좋다고 들어서 이걸로 하기로 결정했다. py4j에 나온 예제는 자바 소스를 만들때 py4j라이브러리를 연결해서, import py4j.GatewayServer; 등으로 임포트 한 후 메인소스에 GatewayServer gatewayServer = new GatewayServer(new StackEntryPoint()); 

gatewayServer.start();처럼 삽입해서 실행시켜놓은후, 파이썬에서 py4j를 이용한 소스를 돌리면 저기를 통해서 자바 함수에 접근 가능하다고 나와있다. http://lingua.blog.me/220880055851 도 잘 나와있다.


하지만 내가 원하는건 '이미 만들어져 나온' jar파일 내부의 함수 하나를 그냥 파이썬에서 접근해서 실행시키는거다. 

예제랑 다른 케이스여서 안될줄 알았는데 구글링을 하다보니 되는것같아 보이는 글을 하나 발견했다 : https://stackoverflow.com/questions/29986922/using-py4j-to-invoke-a-method-that-takes-a-javasparkcontext-and-return-a-javardd

그래서 아래처럼 테스트를 하였다.



첫번째로 예제로 사용할 jar파일을 만들어야 한다. https://www.youtube.com/watch?v=3Xo6zSBgdgk 을 참고해서 jar파일을 만든다. 내가 사용한 자바 소스는 아래와 같다.

public class jartest {
    public static void _print(int a){
        System.out.printf("Hello world! a = %d", a);
    }
}

만든 후 위 동영상을 참고해서 jar를 만들었다. 다음과 같은 구조가 만들어지며 jar가 나왔다.

jartest.jar 내부의 _print함수는 int형을 인자로 받아 hello world와 함께 실행시킨다.


이젠 spark부분이다. 위의 스택오버플로우 링크를 참조해서 아래처럼 스파크 소스를 만들었다. 

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from py4j.java_gateway import java_import
import subprocess
import urllib2
from redis import Redis
import sys
import re
import json
from datetime import datetime
import time
from slacker import Slacker
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, Row
from pyspark.sql.types import StringType
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.functions import *
from pyspark.sql.readwriter import DataFrameWriter

sconf = SparkConf().setAppName("PySpark")
sconf.set('spark.kryoserializer.buffer.max', '1024')
sparkContext = SparkContext(conf=sconf)

java_import(sparkContext._gateway.jvm, "jartest")

func = sparkContext._gateway.jvm.jartest()
func._print(10)

그리고 저 스파크 소스를 워크플로우에 등록 시킨 후 위에서 만든 jartest.jar파일을 서버에 업로드한 후 워크플로우의 options에서 다음과 같은 옵션으로 jar을 주었다. --py-files ${nameNode}~~경로~~/jartest.jar 그리고 실행 결과..