data engineering

spark streaming(dstream, structured streaming) 정리 + 삽질

qkqhxla1 2021. 5. 6. 23:49

spark streaming 에 대해 읽고 간단히 정리한다.
spark streaming에는 dstream과 structured streaming의 두가지가 있다. 

1. dstream (spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers)

dstream

dstream의 input으로는 파일스트림, 소켓 연결, 카프카로의 입력이 기본적으로 제공된다.
custom receiver를 구현할 수도 있는데 scala나 java를 사용해 구현해야 한다.
spark.apache.org/docs/latest/streaming-custom-receivers.html


흐름을 파악하기 위해 소켓 연결 워드카운트 예제를 가볍게 정리해놓는다.
1) 'nc -lk 9999'로 localhost의 9999번 포트를 열어놓고 입력을 받기를 기다린다.
2) 아래 코드를 실행시킨다.(spark-submit없이 로컬에서 실행된다.)

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

localhost:9999로의 스트림을 받는 소켓을 만들었고, StreamingContext의 두번째 인자 1은 1초마다 배치로 확인한다는 뜻이다. 나머지는 별다른 설명이 필요 없다. 실행시키면 1초마다 시간이 나온다.

3) 이제 nc -lk 9999를 실행시켰던 쉘로 돌아가서 hi there같은걸 입력후 엔터치고, 위의 파이썬 코드가 돌아가고있는걸 확인해보자. 

-------------------------------------------
Time: 2021-05-03 21:57:49
-------------------------------------------
('there', 1)
('hi', 1)

-------------------------------------------
Time: 2021-05-03 21:57:50
-------------------------------------------
wordcount 예제가 잘 출력된다. 나머지는 공식문서를 참조하자.



2. structured streaming (spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources)

structured streaming

structured streaming도 input은 dstream과 거의 동일하다. 파일, 카프카, 소켓 등으로 데이터를 입력받는게 가능하다. dstream과 같은 스트리밍이지만 structured streaming이 더 최근에 나왔다. 동일한 소켓 예제를 돌려보자. dstream과 마찬가지고 nc -lk 9999로 9999를 열어놓고, 아래 소스코드를 실행하고, nc -lk 9999를 킨 터미널에서 값을 입력하면 된다.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

 # Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

실행되는 결과를 비교해보자면, structured streaming이 확실히 더 실시간 streaming같다. dstream에서는 입력값이 있던 없던 1초마다 결과값이 나왔지만, structured streaming은 입력값이 있을때만 출력값이 있다.

Batch: 0
-------------------------------------------
+----+-----+
|word|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
| word|count|
+-----+-----+
|there|    1|
|   hi|    1|
+-----+-----+

그리고 dstream이 rdd로 결과가 나오는데 반해 structured streaming은 data frame으로 나와서 이후에 활용하기가 더 편하다. 아래 링크에서 차이점을 읽어보자.
blog.knoldus.com/spark-streaming-vs-structured-streaming/#:~:text=Structured%20Streaming%20works%20on%20the,no%20concept%20of%20a%20batch.

요약하자면 왠만해서는 최근에 나온 structured streaming이 더 좋다고 한다.

그런데! spark에서의 streaming은 pyspark가 아직 지원하지 않는 부분이 많아서, 지원하더라도 문서가 없어서(심지어 공식문서도)작업이 필요하면 scala로 작업하는게 권장된다.


실전 예제를 하나 추가한다. 스파크 스트리밍으로 카프카에서 데이터를 가져온 후, 중간 처리를 하고 몽고로 쓰는 예제를 만들어 보고자 한다. 현재 환경. kafka : 2.4.1, spark : 3.0.1 hadoop 3.2, python 3.6

현재 카프카에 security_protocol : sasl_plaintext등 여러가지 config가 설정되어 있다. 전부 설정해줘야 하는데 아래 링크에서 좋은 예제를 찾았다.
stackoverflow.com/questions/61481628/spark-structured-streaming-with-kafka-sasl-plain-authentication
설정후 콘솔로 확인하기위해 출력해주었다. 코드를 짜는 도중 디펜던시 에러가 발생해 이것저것 찾아본 후 아래의 패키지 3개를 추가해주었다.
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.kafka:kafka-clients:2.4.0,org.apache.spark:spark-token-provider-kafka-0-10_2.12:3.0.1


그리고 만들어진 코드.
structured_streaming_kafka_example.py

from pyspark.sql import SparkSession
from common_function import get_mongo_info
import urllib

spark = SparkSession \
    .builder \
    .appName("kafka_streaming") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")  # 이거 없으면 INFO메시지가 너무 많이 뜬다. 궁금하면 주석처리하고 실행해보자.

kafka_username, kafka_password = '', ''
kafka_topic_name = ''
kafka_bootstrap_server = ''
options = {
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{}" password="{}";'.format(kafka_username, kafka_password),
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_PLAINTEXT",
    "kafka.bootstrap.servers": kafka_bootstrap_server,
    "group.id": 'test',
    "subscribe": kafka_topic_name,
}
df = spark.readStream.format("kafka").options(**options).load()

def write_mongo_row(df, epoch_id):
    if df.rdd.isEmpty():
        return
    # df.show()  # 테스트용

    mongo_host, mongo_port, mongo_id, mongo_password, mongo_uri_format = get_mongo_info()
    mongo_uri_format = mongo_uri_format.format(mongo_id, urllib.parse.quote(mongo_password), mongo_host, mongo_port,
                                               'db.streaming_test')
    # mongo_uri_format = "mongodb://{몽고 id}:{몽고 pw}@{몽고 address}:{몽고 port}/{몽고 db name}.{몽고 collection name}"  # uri포맷은 이와 같다.
    df.write \
        .format("com.mongodb.spark.sql.DefaultSource") \
        .option("spark.mongodb.output.uri", '{}?authSource=admin'.format(mongo_uri_format)) \
        .mode("append") \
        .save()

query = df \
    .selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .foreachBatch(write_mongo_row) \
    .start()

# query = df \
#     .selectExpr("*") \
#     .writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .start()  # 콘솔로 출력하는 예제. kafka같은 스트리밍으로 들어오는 데이터의 outputMode를 complete로 하면 에러뜨면서 안됨.

query.awaitTermination()

structured_streaming_kafka_example.sh

#!/bin/bash
SPARK_SERVER_ADDRESS=$SPARK_SERVER_ADDRESS

./spark-3.0.1-bin-hadoop3.2/bin/spark-submit --master ${SPARK_SERVER_ADDRESS} \
--total-executor-cores 2 --executor-cores 1 --driver-memory 2g \
--executor-memory 8g --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.kafka:kafka-clients:2.4.0,\
org.apache.spark:spark-token-provider-kafka-0-10_2.12:3.0.1 \
./structured_streaming_kafka_example.py

kafka로 읽어오는 부분은 pyspark로 구현은 했다. mongo로 쓰는 부분은 공식홈페이지에 없는데, foreachbatch로 구현가능하다. stackoverflow.com/questions/62125482/how-to-write-spark-structure-stream-into-mongodb-collection

structured streaming으로 읽어오는 공식 도큐먼트에서는 structured streaming이지만 dstream처럼 배치성으로 읽어오는 read와 정말 스트리밍으로 읽어오는 readstream 두개가 있는데, 카프카를 읽어오면 동작이 약간 다르다. 테스트용 카프카로 매분 100개씩 데이터를 쏘고있는데, read로 읽어오면 출력시 여러개를 읽어온다.(20개 이상) 그런데 readstream으로 읽어올경우 100개씩 프로듀싱되어도 1,2개씩만 읽어온다. 왜그러는지는 모르겠다.. 원래 이러는건지 버그인지. 이후 알게되면 더 추가예정

'data engineering' 카테고리의 다른 글

(비공개) Hive performance tuning guide  (0) 2021.05.13
spark + scala 세팅.  (0) 2021.05.08
kafka관련 좋은글 링크  (0) 2021.05.03
Spark Architecture, Job exectution flow  (0) 2021.04.17
Hive Architecture, Job exectution flow  (0) 2021.04.11