data engineering

spark로 hive 쿼리 실행시키기.

qkqhxla1 2017. 7. 14. 15:26

하이브보다 스파크에서 하이브 쿼리를 실행시켰을때 속도가 더 빠르다.(당연하겠지만) 대신 스파크는 리턴값의 크기 등을 모르기 때문에 프로그래머도 모르게 메모리 초과 오류 등이 날수 있다는점이 안좋다.


시간이 그닥 안중요하고, 안정성이 중요 -> 하이브 쿼리.

시간 단축(성능 향상)이 목표, 자원은 충분, 그대신 안정성을 약간 포기 -> 스파크로 돌리기.


아래는 코드만.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import urllib2
from redis import Redis
import sys
import re
import json
from datetime import datetime
import time
from slacker import Slacker
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, Row
from pyspark.sql.types import StringType
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.functions import *
from pyspark.sql.readwriter import DataFrameWriter

sconf = SparkConf().setAppName("PySpark")
sconf.set('spark.kryoserializer.buffer.max', '1024')
sparkContext = SparkContext(conf=sconf)
hiveContext = HiveContext(sparkContext)
hiveContext.setConf("hive.vectorized.execution.enabled", "true")
hiveContext.setConf("hive.vectorized.execution.reduce.enabled", "nonstrict")
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hiveContext.setConf("hive.cbo.enable", "true")
hiveContext.setConf("hive.compute.query.using.stats", "true")
hiveContext.setConf("hive.stats.fetch.column.stats", "true")
hiveContext.setConf("hive.stats.fetch.partition.stats", "true")
hiveContext.setConf("hive.exec.parallel", "true")

reload(sys);
sys.setdefaultencoding("utf-8")

# 나중 참고.
# https://stackoverflow.com/questions/38249624/how-to-increase-partitions-of-the-sql-result-from-hivecontext-in-spark-sql

# 하이브 blocksize 옵션 적용 관련
sparkContext._jsc.hadoopConfiguration().set("dfs.block.size", "104857600")

SQL = '''
~~~~~~~~ # sql문을 적는다. 끝에 ;를 붙이지 말자.
'''

dfs = hiveContext.sql(SQL)
dfs.write.csv('~~~~/sparktestttt22.csv')