data engineering

spark 기초.

qkqhxla1 2017. 6. 8. 12:08

python기반의 pyspark기준으로 설명하겠다. 처음에 기본적인 sparkContext를 선언해주자. 

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("PySpark") # 내가 만들 앱의 이름.
sparkContext = SparkContext(conf=conf) # 설정

간단하게 선언이 가능하다.

스파크를 다운받으면 있는 파이썬 쉘에는 기본적으로 sc로 SparkContext가 선언되어있다. 그러므로 쉘에서는 굳이 선언이 필요없고 sc를 가져다가 쓰면 된다.


RDD란 분산되어 존재하는 데이터 요소들의 모임이다.(스파크에서 사용하는 자료구조라고 생각하자.) 스파크에서의 모든 작업은 새로운 RDD를 만들거나, 존재하는 RDD를 변형하거나 연산을 호출하는것중에 하나이다. RDD는 클러스터의 서로 다른 노드들에서 연산 가능하도록 여러 개의 파티션으로 나뉜다. 병렬 처리를 위한 자료구조라고 생각하면 편하다.

스파크 쉘에서 기본적으로 파일을 읽어오는 예제인데 RDD임을 알수 있다. RDD연산이 실행될 때는 익스큐터라고 하는 컨테이너에 하나씩 알아서 할당되서 실행되는데, 각각이 하나의 머신?프로세스?라고 보면 된다.(익스큐터 갯수도 조절할수 있다.) 

원래 프로그래밍시 일반적으로 하나의 프로세스에서 돌아가는것과 달리 하나의 프로세스가 할 일을 여러개의 프로세스가 나눠서 처리한다고 생각하면 된다. 

예를 들어 큰 하나의 파일을 열어서 내용을 가져오거나 연산을 할 때, 파일이 애초에 병렬처리연산을 하기 쉽도록 쪼개져서 저장되어있다.(이런게 RDD구조인것같다) 이걸 익스큐터 10개로 처리한다고 가정하면 큰 하나의 파일을 10개씩 구획을 나눠서 처리한다고 생각하면 된다. (나중에 보면 신기하다.) 


RDD관련 함수나 연산들은 구글링해보자.


RDD는 두 가지 방법으로 만들수 있다. 위처럼 외부 데이터를 불러오거나, parallelize메소드를 쓰면 된다.

ex) lines = sc.parallelize(["1",'rara'])


RDD는 두 가지 타입의 연산 작업이 있다. 트랜스포메이션과 액션인데 트랜스포메이션은 새로운 RDD를 만들어 내는 연산이고, 액션은 결과를 되돌려주거나 스토리지에 결과를 써넣는다. 스파크는 트랜스포메이션과 액션을 매우 다르게 취급하므로 실행하는 연산의 타입에 대한 이해가 중요하다.

트랜스포메이션은 RDD를 리턴하지만 액션은 그 외의 타입을 리턴한다.


트랜스포메이션 예제.

map함수가 있는데 파이썬의 map과 비슷하다.

파이썬의 map과 같다. 다만 map으로 감싸는게 아니라 .map으로 호출한다는 점과 데이터를 보려면 collect()를 호출해줘야 한다는점. RDD는 트랜스포메이션들이 생성한 데이터를 어떻게 계산할지에 대한 명령어들을 갖고 있는 구조이다. 실제로 실행되는건 진짜 데이터가 필요한 시점이다. 

아래에서는 collect가 호출되기 전까지는 어떤 잘못을 해도 넘어간다. collect()함수가 호출될때 실제 연산이 일어나므로 map에서 어떤 잘못된 연산을 해도 일단은 그냥 넘어간다.

나머지 연산들은 구글링하자.


또 클래스 내부에서 호출시 참조 영역에 주의해야 한다. 객체의 맴버인 함수를 전달하거나 객체의 필드 참조를 갖고 있는 함수를 전달할 때 스파크는 작업 노드들에 전체 객체를 전달하므로 필요로 하는 정보에 비해 상당히 커질 수 있다. 


아래 코드를 돌려보자. isodd함수가 전달될 때, isodd함수 정보만 전달되는게 아니라 전체 객체가 보내진다. 이 경우 에러가 발생한다.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("PySpark")
sc = SparkContext(conf=conf)

class myclass():
	def isodd(self, num):
		return num%2 == 1
	def spark(self):
		lines = sc.parallelize([1,2,3,4]).map(lambda x:self.isodd(x)).collect()
		return lines

m = myclass()
m.spark()

이 경우 isodd함수를 전역변수로 선언하는 로직으로 변경하면 해결된다. 마찬가지로 아래 코드를 보자.

주석처리된 self.t가 전달될때 정보 전체가 전달되므로 에러가 발생하지만 주석을 풀고 isodd함수에 self.t대신 val을 전달하면 에러가 발생하지 않는다.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("PySpark")
sc = SparkContext(conf=conf)

class temp():
	pass
def isodd(num):
	pass
class myclass():
	def __init__(self):
		self.t = temp()
	def spark(self):
		# val = self.t
		lines = sc.parallelize([1,2,3,4]).map(lambda x:isodd(self.t)).collect()
		return lines

m = myclass()
m.spark() 

책에서는 이게 파이썬에서 피클링되는 문제라고 하는데 깊데 들어가보진 않아서 모르겠다. 이밖에 RDD에서만 쓸수 있는 액션 등등이 있으니 참고하자.


영속화.

때때로 결과값이 나온 RDD를 여러번 사용하고 싶을 때가 있는데, 재연산하면 너무 비효율적이므로 결과값 영속화 요청을 할수 있다. ex) 

r = ~~~.map(lambda x:x)

r.persist(옵션)

옵션에 따라 몇개의 디스크에 저장할지 등등을 결정할수 있다.

'data engineering' 카테고리의 다른 글

hadoop shell 관련.  (0) 2017.06.25
aws 개발 관련 기본, 설정.  (0) 2017.06.24
oozie workflow 기본과 삽질한 내용들  (0) 2017.06.04
mapreduce(맵리듀스)란?  (0) 2017.05.20
docker 삽질.  (0) 2017.04.25