data engineering

spark udf

qkqhxla1 2021. 5. 30. 23:25

udf(user define function) in spark 초간단히 정리... 
parquet에서 데이터를 읽어서 dataframe으로 작업을 좀 하고 mysql로 insert를 하는데 필드에 이모찌가 포함이 되어있어서 mysql로 삽입중 에러가 난다. dataframe의 특정 필드에서 이모찌를 지우는 작업을 해주었다.

pyspark.
https://spark.apache.org/docs/3.0.1/api/python/pyspark.sql.html#pyspark.sql.functions.udf

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

import re

spark = SparkSession\
    .builder\
    .appName("myApp")\
    .getOrCreate()
df = spark.read.parquet('./data/*')

@udf(returnType=StringType())
def remove_emoji(text):
    regrex_pattern = re.compile(pattern = "["
        u"\U0001F600-\U0001F64F"  # emoticons
        u"\U0001F300-\U0001F5FF"  # symbols & pictographs
        u"\U0001F680-\U0001F6FF"  # transport & map symbols
        u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                           "]+", flags = re.UNICODE)
    return regrex_pattern.sub(r'', text)

df = df.withColumn('name2', remove_emoji(df.name))
df.select('name', 'name2').limit(10).show()

pyspark의 dataframe은 조금 써보면 알겠지만 파이썬 안에 뭔가 dataframe만을 다루기 위한 스칼라 언어를 이식해서 넣은 느낌이다. udf를 등록할때 위처럼 데코레이터를 넣어주는데(위의 공식 홈페이지 링크를 보면 다른 방법도 있다.) 파이썬에서는 안하는, 데코레이터에 리턴 타입을 미리 지정해줘야 한다.n udf말고도 dataframe의 각 필드들에 어떤 변형 작업을 할 때마다 pyspark.sql.functions에서 해당 함수를 가져와서 쓰거나, udf를 만든후 파이썬으로 가공을 해서 넣어야 한다.

굳이 udf말고도 pyspark.sql.functions 안에있는 regexp_replace를 사용해서도 이모찌 제거는 가능할것 같은데, 아직 dataframe을 다루는것보다는 raw python이 더 쉽게느껴져서 udf사용이 편하다. 그런데 dataframe용으로 만들어져있는 함수를 쓰는게 성능상 더 좋은것같으니 왠만해서는 udf는 필요한경우가 아니면 지양하는게 맞는것같다.
https://stackoverflow.com/questions/45491339/udfs-vs-spark-sql-vs-column-expressions-performance-optimization


scala
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-udfs.html

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf


object scalaApp3 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("get data").getOrCreate()
    val df = spark.read.parquet("./src/main/data")
    val remove_emoji = udf {name: String => name.replaceAll("[^\u0000-\uFFFF]", "")}
    // val remove_emoji = udf {name: String => """[😩😨]""".replaceAll("[^\u0000-\uFFFF]", "")} // test
    /*val remove_emoji = udf((name: String) => {
      if ("이름".equals("이름")) {
        name + "_rr"
      }
      else {
        name + "_concated!"
      }
    })*/ // multi line
    df.withColumn("name2", remove_emoji(df("name"))).select("name", "name2", "owner").show()
  }
}

스칼라는 스택오버플로우에서 한줄로 이모찌를 제거하는 코드를 찾아서 arrow function으로 한줄로 udf를 만들어주었다.파이썬에서도 위의 공식 링크를 들어가보면 한줄짜리 코드의 경우 lambda를 사용해서 한줄로 udf 만드는게 가능하다.