data engineering

spark + scala mongo to mysql 8+(jdbc), 궁금증 몇개

qkqhxla1 2021. 5. 17. 21:19

공부용으로 간단하게 만든 코드를 올린다. 코드를 짜면서 어떤걸 유의해야 되었는지 등을 더 적는다.
제목 그대로 몽고에서 데이터를 가져와서 가공했다고 가정하고, jdbc를 사용해서 mysql(8+버전)으로 삽입하는 예제이다.

scalaApp2.scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import java.util.Properties


object scalaApp2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("get data").getOrCreate()

    val (mongo_user, mongo_password, mongo_server, mongo_db, mongo_collection) = ("", "", "", "", "")
    val day = "20210513"
    val pipeline = s"{'$$match': {'day': '$day'}}"
    val mongo_uri_format = s"mongodb://${mongo_user}:${mongo_password}@${mongo_server}:27017/${mongo_db}.${mongo_collection}?authSource=admin"
    val df = spark.read.format("com.mongodb.spark.sql.DefaultSource")
      .option("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
      .option("spark.mongodb.input.partitionerOptions.partitionKey", "_id")
      .option("spark.mongodb.input.partitionerOptions.partitionSizeMB", 128)
      .option("spark.mongodb.input.partitionerOptions.samplesPerPartition", 10)
      .option("spark.mongodb.input.uri", mongo_uri_format)
      .option("pipeline", pipeline).load()

    val (username, password, host, db) = ("", "", "", "")
    val prop = new Properties()
    prop.setProperty("user", username)
    prop.setProperty("password", password)
    prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")  // com.mysql.jdbc.Driver는 deprecated되었다고 나온다.

    df.limit(10)
      .selectExpr("CAST(ctg1 AS STRING)", "CAST(ctg2 AS STRING)", "CAST(ctg3 AS STRING)",
        "CAST(Title AS STRING) as title", "CAST(selId AS STRING) as selid", "CAST(optionName AS STRING) as option_name")
      .write
      .mode(SaveMode.Append)
      .jdbc(s"jdbc:mysql://${host}:3306/${db}?serverTimezone=UTC", "test_table", prop)  // timezone을 UTCf로 설정해줘야한다.. KST는 인식못하는 버그가 있다함. https://devuna.tistory.com/47
  }
}

build.sbt

name := "scala-sbt-test"

version := "0.1"

scalaVersion := "2.12.13"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.1"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.0.1"
libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.11"

1. mongo의 aggregate pipeline을 작성하기위해 $라는 문자 자체를 써야했는데, 스칼라에서 $는 변수값을 쓴다는 의미이다. 문자 그대로 $를 쓰려면 $$를 써야 했다.

2. 삽질하다가 발견한건데 위처럼 s""안에 $a처럼 쓰면 a라는 변수의 값을 쓸수 있다. 근데 s""안에 "를 문자 자체로 쓰려고했는데.. 보통 \"처럼 이스케이핑 처리를 해주면 되는데 스칼라는 그게 안된다. 그래서 아래와 같이 이상하게 해야 한다.
https://stackoverflow.com/questions/21086263/how-to-insert-double-quotes-into-string-with-interpolation-in-scala(뭐 이런게 -_-;)

3. 현재 스칼라에서 sbt로 빌드하고있다. 만약 내가 어떤 새로운걸 import한다고 하면.. build.sbt에서 libraryDependencies라인을 추가해줘야 한다.(아닐시 classNotFound에러가 뜸.) 현재 위의 build.sbt에는 다른 클래스에서 쓰는것까지 다 추가해놔서 좀 많은데, 클래스별로 분리할수 있는지는 찾아봐야겠다.(한 프로젝트에 한 클래스만 넣는건 말이 안되니)

4. 몽고에서 mysql로 데이터를 가져올때

.selectExpr("CAST(ctg1 AS STRING)", "CAST(ctg2 AS STRING)", "CAST(ctg3 AS STRING)",
"CAST(Title AS STRING) as title", "CAST(selId AS STRING) as selid", "CAST(optionName AS STRING) as option_name")

처럼 가져왔는데, CAST(Title AS STRING) as title에서 Title은 몽고의 필드명이고, 다시 title이라는 컬럼으로 재변환해주는 이유는 mysql에서의 필드네임이 title이기 때문이다. 순서만 맞으면 컬럼명이 달라도 알아서 알아서 들어갈줄 알았는데, 컬럼명이 틀리면 에러가 발생한다. dataframe의 컬럼명과 들어갈 mysql table의 컬럼명이 동일해야 한다.

5. 이건 아직 이해가 잘 안가는건데 mysql jdbc의 dependency를 위해
libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.11"
라인을 build.sbt에 추가해줬음에도 불구하고 실행하면 에러가 나서, 스파크를 실행시킬때 명시적으로 --package옵션으로 추가를 해 줘야 한다.

./spark-3.0.1-bin-hadoop3.2/bin/spark-submit --packages mysql:mysql-connector-java:8.0.11 --class "scalaApp2" --master local[4] target/scala-2.12/scala-sbt-test_2.12-0.1.jar


6. dependency를 추가할때, %%와 %를 쓴다. 이게 무슨 차이일까?
-> https://stackoverflow.com/questions/17461453/build-scala-and-symbols-meaning

스칼라 공부하면서 느낀점 : 단순히 스파크를 위해서 입문용으로는 쉽다. 깊게 들어가서는 아직 깊게 못들어가봐서 어찌될지 모르겠다.