study 974

airflow Kubernetes Executor, how helm values.yaml works

바로 이전글(https://qkqhxla1.tistory.com/1159) 에서는 kubernetes에서 helm을 사용해 airflow를 구성하는데 초점을 두었었다. 그런데 내가 kubernetes와 helm에 익숙하지 않아서 가장 쉽고 익숙한 airflow를 CeleryExecutor로 구성했었다. airflow-worker에 spark를 통째로 올려놓고, docker-in-docker로 docker를 마운트해놓은다음 대부분의 배치 잡은 DockerOperator로 실행하고, spark잡을 실행할 때는 airflow-worker내에 설치된 스파크를 직접 호출해서 사용한다. 간단한 구성도이다. CeleryExecutor이 기본 설정으로 되어있기도 하고 구성도 쉬워서 이렇게 만들었었는데, 구성하고 이것저..

data engineering 2021.09.05

customizing helm chart (bitnami/airflow)

이전에 다른 제목으로 비슷한 글을 쓴적이 있는데 지금 보니까 정말 별다른 영양가가 없는 글이었다. 그래서 이전 글은 지워버리고 이번에 새로 얻은 지식까지 합쳐서 다시 정리한다. 이전에도 적은것같지만 bitnami라는 회사가 있는데 오픈소스로 helm chart를 상당히 많이 만들었고, 상당히 자주 업데이트해주면서 종류도 많고 완성도도 높아서 여기서 주로 가져다가 app을 만들어서 쓴다. k8s와 helm자체에 익숙하지 않았기에 여태까지는 기본으로 제공해주는 기능만 썼는데 더 필요한 기능이 있어도 수정할줄을 모르고.. 이러다보니 반쪽짜리 지식이 되버렸다. 빡쳐서 주말동안 커스터마이징을 하기 위해 공부와 삽질하고, 얻은 지식을 정리한다. 우선 목적을 적자면 airflow helm chart를 설치해서, ai..

data engineering 2021.08.19

leetcode 1638(브루트포스), 1387(dp), 35(이분 탐색)

1638 https://leetcode.com/problems/count-substrings-that-differ-by-one-character/ 제한조건이 s와 t가 둘다 길이가 100이하이니 그냥 되는대로 구현하면 된다. s의 substring이 t의 substring안에 속하는데 하나만 달라야 한다. class Solution(object): def countSubstrings(self, s, t): #s = "aba" #t = "baba" len_s, len_t = len(s), len(t) ret = 0 for i in range(len_s): for j in range(len_t): diff, k = 0, 0 #print i,j while k+i < len_s and k+j < len_t: #..

haproxy tutorial (basic loadbalancing example)

haproxy에서 라운드로빈으로 로드밸런싱하는 예제를 올린다. 추가적으로 커스터마이징같은거는 한번 돌아가는걸 확인하면 이것저것 해보면서 테스트가 가능하기에 올리지 않는다. haproxy란? : https://leffept.tistory.com/309에 잘 설명되어있다. 몇가지만 가져왔다. 1. HAProxy는 기존의 하드웨어 스위치를 대체하는 소프트웨어 로드 밸런서로, 네트워크 스위치에서 제공하는 L4, L7 기능 및 로드 밸런서 기능을 제공한다. 설치가 쉽고 빠르기에 서비스 이중화(HA- High Availability)를 구성하는데 주로 사용한다. 2. HAProxy는 기본적으로 reverse proxy 형태로 동작한다. (reverse proxy는 서버로 들어오는 요청을 대신 받아서 서버에 전달하고..

data engineering 2021.07.12

leetcode 1823(조세퍼스 문제, 구현), 1448(트리, 재귀), 1396(구현)

1823 https://leetcode.com/problems/find-the-winner-of-the-circular-game/ 딱봐도 문제가 원형 이중연결리스트처럼 생겨서 이걸 만들어서 풀었다. 근데 시간이 많이 느리다.. 뭐지 하고 다른 답을 봤는데 아주 예전에 풀었던 조세퍼스(요세푸스) 문제였다. https://qkqhxla1.tistory.com/707 본지 하두 오래돼서 잊어먹고 있었다. 원형 이중연결리스트. class Node: def __init__(self, v): self.val = v self.next = None self.prev = None class Solution(object): def findTheWinner(self, n, k): start_node = cur_node = ..

setting k8s using rke + aws ec2

예전에 https://qkqhxla1.tistory.com/1026 에서 rke(rancher kubernetes engine)환경에서 aws cluster를 추가하는 글을 썼었는데 처음부터 만드는 글이 아니라 단순하게 클러스터 추가와 aws 설정에 초점이 맞추어져있었다. 이번에는 처음에 구성하는 글을 적으려고 한다. 근데 aws를 사용하는데 eks가 아니라 단순히 여러대 있는 서버에 설치하듯이 ec2 ubuntu에 세팅할 예정이다. eks로 세팅하는건 아직 잘 몰라서 익숙한 방법인 이 방법으로 세팅한다. 아마 나중에 eks로 세팅할 일이 있으면 다시 글을 적을것 같다. 왠만해서는 처음부터 다시 다 세팅할 일이 없는데.... 인증서 관련 장애로 rancher ui가 다 날라가버렸다. 어떻게든 살리려 노력..

data engineering 2021.06.23

leetcode 617(트리) 1351(구현), 1104(트리), 1472(구현)

617 https://leetcode.com/problems/merge-two-binary-trees/ 재귀 연습하기 좋은 기초문제다. 이런 비슷한 문제를 앞에서 여러번 풀어봤지만 그래도 좋다고 생각해서 가져온다.. class Solution(object): def mergeTrees(self, t1, t2): if not t1 or not t2: # 둘중 하나가 없으면 있는 노드 리턴 return t1 or t2 node = TreeNode(t1.val + t2.val) node.left = self.mergeTrees(t1.left, t2.left) node.right = self.mergeTrees(t1.right, t2.right) return node 1351 https://leetcode.co..

python create parquet format for hive, map type column.

1. make parquet file spark를 사용하는 경우 dataframe.write.parquet처럼 간편하게 하면 되니 패스하고, raw python의 경우는 아래처럼 pandas를 사용해서 pandas dataframe으로 만든다음 parquet로 쓴다. https://www.mikulskibartosz.name/how-to-write-parquet-file-in-python/를 참고한다. pandas + pyarrow조합으로 아래처럼 리스트로 잘 구성해준다. import pandas as pd import pyarrow as pa import pyarrow.parquet as pq column_name = ['title', 'age', 'name', 'score'] datalist = [[..

data engineering 2021.06.09

leetcode 1261(트리), 1395(구현), 1829(bit 비트연산)

1261 https://leetcode.com/problems/find-elements-in-a-contaminated-binary-tree/ 이진 트리가 오염되었다고 하고 복구를 먼저 한다음 풀라고 한다. 복구 조건은 루트값은 0부터 시작해서 왼쪽 자식은 2*부모+1, 오른쪽 자식은 2*부모+2이다. init에서 복구하면서 존재하는 값들을 set에 넣어준 후 계산하면된다. class FindElements(object): def __init__(self, root): self.val_set = set() queue = [[root, 0]] while queue: n, n_value = queue.pop(0) n.val = n_value self.val_set.add(n_value) if n.left: ..

spark udf

udf(user define function) in spark 초간단히 정리... parquet에서 데이터를 읽어서 dataframe으로 작업을 좀 하고 mysql로 insert를 하는데 필드에 이모찌가 포함이 되어있어서 mysql로 삽입중 에러가 난다. dataframe의 특정 필드에서 이모찌를 지우는 작업을 해주었다. pyspark. https://spark.apache.org/docs/3.0.1/api/python/pyspark.sql.html#pyspark.sql.functions.udf from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import StringType im..

data engineering 2021.05.30

leetcode 1845(heap), 1557(graph, dfs), 1325(트리)

1845 https://leetcode.com/problems/seat-reservation-manager/ 예약, 예약취소기능을 구현해야하고, 예약시에는 현재 예약가능한 좌석중에서 가장 번호가 낮은 좌석을 예약한다. == 현재 리스트에서 항상 가장 번호가 낮은 좌석을 가져오려면 힙을 만들어서 pop만 해주면 된다. 예약취소시에 새로 들어온 숫자까지 합쳐서 매번 정렬하는것보다는 힙으로 구현하는게 더 시간복잡도가 적게걸린다. import heapq class SeatManager(object): def __init__(self, n): self.available_seat = range(1, n+1) heapq.heapify(self.available_seat) def reserve(self): return..

spark + scala mongo to mysql 8+(jdbc), 궁금증 몇개

공부용으로 간단하게 만든 코드를 올린다. 코드를 짜면서 어떤걸 유의해야 되었는지 등을 더 적는다. 제목 그대로 몽고에서 데이터를 가져와서 가공했다고 가정하고, jdbc를 사용해서 mysql(8+버전)으로 삽입하는 예제이다. scalaApp2.scala import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SaveMode import java.util.Properties object scalaApp2 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder.appName("get data").getOrCreate() val (mongo_user, mongo_pa..

data engineering 2021.05.17

spark + scala 세팅.

spark를 공부하면서 scala라는 언어 자체를 깊게 파지는 않더라도 알아둘 필요가 있어서 정리해놓는다. 내 기준으로 이해하기 쉽게 정리해놓은거라서 다른 사람들은 환경이 다를 수 있음. 환경은 spark 3.0.1 + scala 2.12로 하려고 한다. 사실 저번글 qkqhxla1.tistory.com/1143?category=698045 에서 스파크 스트리밍을 가볍게 만지면서 pyspark로 가벼운 예제 코드도 아래에 더 추가하려고 삽질했었다. 그런데 글 끝에 적었듯이 스파크 스트리밍은 아직 pyspark가 지원 안하는게 많다. 예로 mongo로 스트리밍으로 쓰는것도 scala만 지원한다. 데이터 처리를 진짜 끝까지 하려면 스칼라가 그래도 필요하다는 생각에 공부겸 + 업무와 연관해서 조금씩 정리하려고..

data engineering 2021.05.08

spark streaming(dstream, structured streaming) 정리 + 삽질

spark streaming 에 대해 읽고 간단히 정리한다. spark streaming에는 dstream과 structured streaming의 두가지가 있다. 1. dstream (spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers) dstream의 input으로는 파일스트림, 소켓 연결, 카프카로의 입력이 기본적으로 제공된다. custom receiver를 구현할 수도 있는데 scala나 java를 사용해 구현해야 한다. spark.apache.org/docs/latest/streaming-custom-receivers.html 흐름을 파악하기 위해 소켓 연결 워드카운트 예제를 가볍게 정..

data engineering 2021.05.06

kafka관련 좋은글 링크

글 대부분을 유용한 팁을 적어주시고, 필요한 것 위주로 이해하기 쉽게 적어주심. 몇년전 글이지만 여러번 읽어볼것 같다. 카프카 운영자가 말하는 처음 접하는 카프카 : www.popit.kr/kafka-%EC%9A%B4%EC%98%81%EC%9E%90%EA%B0%80-%EB%A7%90%ED%95%98%EB%8A%94-%EC%B2%98%EC%9D%8C-%EC%A0%91%ED%95%98%EB%8A%94-kafka/ 카프카 운영자가 말하는 카프카 컨슈밍 그룹 : www.popit.kr/kafka-consumer-group/ 카프카 설치시 가장 중요한 config : www.popit.kr/%ec%b9%b4%ed%94%84%ec%b9%b4-%ec%84%a4%ec%b9%98-%ec%8b%9c-%ea%b0%80%ec..

data engineering 2021.05.03

leetcode 110(이진 트리), 1079(백트래킹, subset), 1286(백트래킹), 1641(dp)

110 leetcode.com/problems/balanced-binary-tree/ 문제를 이해하는데 시간이 좀 걸렸다. 모든 노드에서 각각의 서브트리의 높이 차가 1 초과로 나면 안된다. 1까지의 높이차이는 balanced되었다고 판별한다. 각각의 서브트리를 판별해야 하므로 재귀적으로 풀어야 한다. class Solution(object): def isBalanced(self, root): if not root: return True def check(root): if not root: return 0 l = check(root.left) r = check(root.right) if l == -1 or r == -1 or abs(l - r) > 1: # 좌,우 자식의 높이차가 1 초과로 나면 -1을 ..

Spark Architecture, Job exectution flow

www.edureka.co/blog/spark-architecture/#:~:text=Scala%20and%20Python.-,Spark%20Architecture%20Overview,Resilient%20Distributed%20Dataset%20(RDD) 에서 필요한 정보만 가져다가 정리합니다. 바로 앞 글처럼 스파크가 어떻게 동작하는지는 알아두고싶어서 정리합니다. Spark Architecture Overview 아파치 스파크는 모든 컴포넌트와 레이어가 느슨하게 결합된, 구조적으로 잘 정의된 아키텍쳐이며, 다양한 라이브러리가 통합되어 있습니다. 스파크 아키텍쳐는 두가지 개념을 기반으로 만들어졌습니다. Resilient Distributed Dataset(RDD) Directed Acyclic Gra..

data engineering 2021.04.17

Hive Architecture, Job exectution flow

www.guru99.com/introduction-hive.html 에서 가져옴. 자주 사용하는 하이브(apache hive)가 어떻게 동작하는지는 알아두고 싶어서 필요한 부분만 번역해놓습니다. Hive Architecture. 하이브는 3개의 코어 파트를 유지합니다. 1. Hive Clients 2. Hive Services 3. Hive Storage and Computing Hive Clients 하이브는 여러 타입의 어플리케이션들과 통신하기 위해 여러가지의 드라이버를 제공합니다. 예로 Thrift기반의 어플리케이션은 Thrift 클라이언트를 통신에 사용합니다. 자바 기반의 어플리케이션은 JDBC 드라이버를 사용할 것이고, 그 외의 다른 타입은 전부 ODBC 드라이버를 사용합니다. 이 클라이언트와 ..

data engineering 2021.04.11

spark installation using helm, spark-submit docker

앞의 글에서 계속 언급을 했는데 우리 팀 자체적으로 쿠버네티스 서버가 있다. 근데 쿠버네티스는 순수하게 cli로만 사용하면 러닝커브가 커서 rancher라는 gui를 설치해서 쓴다고 언급했었다. App설치도 간편하다. qkqhxla1.tistory.com/1105 에서 언급한것처럼 helm chart를 추가해주고(bitnami만 추가해주면 왠만해서 다 있다..) qkqhxla1.tistory.com/1133 에서 언급했듯이 클릭만 하면 airflow같은 앱이 간단하게 만들어진다. 그런데 spark chart현 최신버전인 5.4.0을 설치해보면 제대로 되지 않았다. 앱을 지우고 재설치해도 계속 뜨고 버전을 조금 낮춰봐도 에러가 뜬다. 구글링을 몇일동안 죽어라 해도 원인을 찾을수 없어서 포기했다. chart..

data engineering 2021.04.06

leetcode 763(구현), 1551(구현), 1305(트리), 1035(dp, lcs)

763 leetcode.com/problems/partition-labels/ 글자들이 한묶음씩 묶여있다. 이걸 최대한 나눠줘야 한다. 모든 단어의 가장 오른쪽 인덱스를 저장해놓고, 하나씩 반복하면서 '여태까지 나온 글자중에 가장 오른쪽에 있는 글자' 가 현재 위치이면 이 인덱스가 하나의 파티션의 끝이 되므로, 이것과 현재 인덱스가 같으면 인덱스를 계산해서 넣어준다. class Solution(object): def partitionLabels(self, S): ret = [0] index_dict = {c:i for i,c in enumerate(S)} max_last_index = 0 for i, c in enumerate(S): max_last_index = max(max_last_index, in..

parquet vs orc vs avro (big data file format )

1. www.datanami.com/2018/05/16/big-data-file-formats-demystified/ 공통점. 3개 타입은 전부 하둡에 저장하는데에 최적화되어있다. orc, parquet, avro 3개 전부 기계가 읽을수 있는 바이너리 포맷이다. orc, parquet, avro는 여러개의 디스크로 나뉘어질수 있으며 이 특징으로 인해 확장성과 동시처리가 가능해진다. 반면에 json이나 xml은 나뉘어서 저장이 불가능하며, 그로 인해 확장성과 동시성에 제한이 있다. 3개 포맷 모두 스키마를 파일이 가지고 있어서 다른곳으로 옮겨서 처리하는게 가능하다. 차이점. orc, avro, parquet의 가장 큰 차이점은 데이터를 어떻게 저장하느냐이다. parquet과 orc는 데이터를 column..

data engineering 2021.03.25

leetcode 605(구현), 1302(트리), 56,57(구현, 그리디같은거), 654(트리, 재귀)

605 leetcode.com/problems/can-place-flowers/ 꽃을 심는데 반드시 꽃 사이사이에 한칸씩 공간이 있어야 한다. class Solution(object): def canPlaceFlowers(self, fb, n): i = 0 while i 0: if i == len(fb) -1 and fb[i-1] == 0 and fb[i] == 0: fb[i] = 1 n -= 1 break elif i < len(fb) -1 and fb[i] == 0 and fb[i+1] == 0: fb[i] = 1 n -= 1 i += 2 elif i < len(fb) -1 and fb[i] == 1 and fb[i+1] == 0: i += 2 else: i += 1 ..

leetcode 1769(prefix sum같은거), 1614, 1290(구현), 1315(트리)

1769 leetcode.com/problems/minimum-number-of-operations-to-move-all-balls-to-each-box/ boxes배열이 있고, 1이면 공이 있고 0이면 공이 없다. i번째 배열에 모든 공을 모으려면 몇번 움직여야 하는지 합을 리턴하면 된다. 근데 당연히 무식하게 O(n^2)으로 하는게 의도가 아니라 O(n)으로 가능하다. 좋은 해설 : leetcode.com/problems/minimum-number-of-operations-to-move-all-balls-to-each-box/discuss/1075895/Easy-Python-beats-100-time-and-space class Solution(object): def minOperations(self..

elastic kibana basic visualization

kibana에 대해서는 qkqhxla1.tistory.com/1024 에서 정리를 가볍게 했었는데... 저 글에서도 적었지만 겨우 한 글에 너무 전반적인 내용을 다 적으려고 하다가 이도 저도 아닌 글이 되버렸다. 이번에는 실제로 사용하는 데이터를 이용하여 가장 헷갈리는 부분인 키바나 gui를 이용해 시각화하는 부분만 다룰 예정이다.키바나 gui를 사용해서도 만들 수 있는 그래프가 상당히 많은데, 그중에 많이 사용하는, 간단한 패턴만 몇개 적는다. 다른 복잡한 작업들도 할수 있을건데 내 입장에서는 대충 아래 패턴정도로 키바나 파악이 끝나면 대충 이리저리 만져서 만들어낼 수 있다고 생각한다. '이렇게 넣으면 이렇게 그려지겠지' 감을 잡는게 가장 어려웠다. 1. 실시간 데이터의 카운트를 그래프로 모니터링하는 ..

data engineering 2021.03.03

leetcode 509, 1137(피보나치), 746(계단, dp), 698(dfs)

509 leetcode.com/problems/fibonacci-number/ 알고리즘 공부 시작하는 처음에 가장 먼저 배우는 쉬운거 class Solution(object): def fib(self, n): if n == 0: return 0 f = [0] * (n+1) f[1] = 1 for i in xrange(2, n + 1): f[i] = f[i-1] + f[i-2] return f[n] 복습용으로 읽어보는게 더 좋다 : leetcode.com/problems/fibonacci-number/discuss/215992/Java-Solutions 1137 leetcode.com/problems/n-th-tribonacci-number/ 위랑 똑같음. class Solution(object): def..

leetcode 1041(구현), 96(이진트리 dp)

1041 https://leetcode.com/problems/robot-bounded-in-circle/ 명령어들을 반복했을때 로봇의 이동경로가 서클을 이루는지를 판별하는 문제이다. 오랫만에 푸느라 그런지 어떻게 풀어야 할지 감이 안잡혔는데... discussion을 보니 1. 명령을 다 실행하고 0,0으로 되돌아오면 서클을 이룸. 2. 또는 명령을 다 실행했는데 이동 방향이 북쪽이 아니면 명령을 반복하면 다시 처음으로 되돌아옴. 이라고 한다. 2번을 떠올리려면 직관력이 뛰어놔야 할듯 싶다. 명령이 다 끝났는데도 0,0이 아니고 이동 방향이 북쪽이면 위로만 계속 올라가게되니 아닌거고, 다 끝나고 방향이 왼쪽이면 시계 반대방향으로 돌면서 0,0으로 돌아오고, 오른쪽이면 시계 방향으로 돌면서 0,0으로 오..