[Alex] 데이터 장인의 블로그

[Spark] 성능 튜닝(1) - Data Ingestion (Feat. JDBC Parallel Read) 본문

Hadoop & Spark

[Spark] 성능 튜닝(1) - Data Ingestion (Feat. JDBC Parallel Read)

Alex, Yoon 2023. 1. 5. 13:53

Data Ingestion 단계에서 튜닝이 필요한 이유 .

내가 겪어본 케이스 중에서는 Data Ingestion 단계에서부터 튜닝이 필요한 경우는 다음과 같다. 

 

1. Spark Job의 Task 중 데이터를 메모리에 올리는 시간이 생각보다 오래 걸린다. (비용, 쉬는 자원 발생)

때문에 병렬로 데이터를 한꺼번에 빠르게 가져와야 구동 시간을 그만큼 줄일 수 있다. 

 

2. Data Ingestion 를 수행하는 task가 하나의 executor에 몰려서 GC Time (정확히 꼽자면 Major GC가 극악으로 발생하여 어마어마한 Job 실행시간을 늘려버리는 경우가 발생한다.) 이 케이스는 간헐적으로 발생하는 케이스였어서 배치로 실행시킨 spark job 이 이런 경우를 보였을때는 진짜 멘붕 그 자체 였다... 

 

하나의 executor 에 data ingestion 이 동시에 발생하여 발생하는 지연 문제에도 parallel read 문제가 효과적으로 도움을 준다. task를 나눠서 read 하기 때문에 메모리를 나눠서 부담하는 효과를 주기 때문이다. 

Big Data를 메모리에 올리는데 시간이 오래걸리는 CASE 

stage id : 12 

아래 예시(stage : 12)는 하나의 task(JDBC read) 작업이 약 8분째 시간을 점유하고 있어 그만큼의 JOB 구동 시간이 지체 되고 있는 것을 확인할 수 있다. (총 5개의 테이블을 가져오는 task가 존재하지만 4개 task는 수행 완료, 가장 무거운 12 stage의 수행만 기다리고 있다.) 

 

Data 를 전부 메모리에 가져오기 전까지 그 다음 task를 진행시키지 않고 기다리기 때문에 그만큼 비용 손해가 발생한다. 

나머지 시간동안 executor의 core/thread 가 놀고있다거나... 쉬고 있다거나..

이런 경우 JDBC READ 의 경우에는 Parallel 로 데이터를 읽어와 시간 단축을 노릴 수 있다. 

How to Read JDBC in Parallel

partitionColumn : 파티션을 나눌 컬럼 (Numeric, Date Type : Only numeric (integer or decimal), date or timestamp type.)

lowerBound, upperBound : partitionColumn의 최소, 최대값 

numPartitions : 파티션을 몇개로 나눌건지. 경험적으로 partitionColumn의 nunique 값으로 설정하는 것이 균등하게 데이터를 가져가는 방법이었다.  

fetchsize : 패치 사이즈, 몇 줄씩 데이터를 가져올건지에 대한 내용. 이 값을 default 값으로 사용하거나 너무 낮은 경우는 parallel 시간이 안 줄어드는 경우도 있다. 적당히 조절해서 사용해야 한다. 

df = spark.read.format("jdbc") \
    .option("driver","com.mysql.jdbc.Driver") \
    .option("url", "[MYSQL ADDRESS]") \
    .option("dbtable", qry) \
    .option("fetchsize", 1000)\
    .option("partitionColumn", '[COL]').option("lowerBound", 0).option("upperBound", 10).option("numPartitions", 10)\
    .option("user", "").option("password", "") \
    .load()

 

위 케이스의 경우에는 partitionColumn으로 지정한 컬럼의 구성이 [0,1,2,3,4 ..... 9]까지 이었기 때문에 이 값들 중 최소값(lowerBound)을 0으로, 최대값 (upperBound)을 9로 지정하였다. 때문에 최종 파티션 수인 numPartitions 을 10개로 지정하였다. 

 

이렇게 수행한 경우 기존 JOB (numPartitions 5개 기준) 

JOB 수행 시간이 15분 -> 8분으로 줄었다. 

파티션 수(10개)가 너무 부담스러운 경우

파티션 10개는 task 10개를 사용한다는 것과 동일한 작업이므로 부담스러울 수 있다. 이런 경우에는 numPartitions을 5개로 임의 지정하여 그대로 data ingestion을 수행하면 된다. 하지만, 경험적으로 봤을 땐 lowerBound ~ upperBound 의 유니크 값의 수와 numPartitions 이 정확히 일치하는 경우가 정확히 parallel하게 데이터를 읽어와 속도 향상에 도움을 준다.  때문에 데이터를 불러오는 쿼리에서부터 전처리하여 10개의 값을 5개로 줄여주는 것이 좋다. numPartitions%5 -> 10개인 값을 5개로 줄여서 가져옴. 이 내용도 상황에 따라서 달라질 수 있기 때문에 실행계획, spark UI 를 확인한 후 조정 필요. 

파티션 컬럼이 마땅치 않을 때.

파티션을 정확히 나눌 수 있는 컬럼이 있다면 좋겠지만, 그렇지 않은 경우가 많을 수 있다. 나 또한 Key로 잡은 ID가 String 타입이라 난감했다. 하지만 이런 경우에도 방법은 있다. Partition을 나누어 줄 수 있도록 임의로 컬럼을 만드는 것이다. BLOCK 정보를 Int로 변환하여 사용하거나, 파티션 구성이 되어있는 컬럼을 숫자로 변환(HASH, CASE 문)하여 사용하는 것이다. 

-- KEY가 NUMBER로 구성되어 있을 때, 예시 
SELECT  CAST(RIGHT(NVR_PROD_ID, 1) as unsigned) partitionColumn 
		, A.*
  FROM  TABLE A
 GROUP
 	BY  CAST(RIGHT(NVR_PROD_ID, 1) as unsigned)
    
-- KEY가 마땅히 없을 때 
SELECT  A.R%10 partitionColumn 
		, A.* 
  FROM  ( 
		select @rownum := @rownum + 1 R,
		       T.*
		from LST_NAVER_LOWEST_PRICE_ORGI T,
		(select @rownum := 0) rownum
		) A

 

Read JDBC in Parallel CASE 주의할 점 

경우에 따라 다르겠지만, Parallel Read 대상이 되는 테이블에 Cache 나 Broadcast 조인을 수행하게 된다면, 나눠진 파티션 수 만큼 Storage Memory를 점유하게 된다. 만약 내가 8개 파티션으로 데이터를 Read 한다고 가정하면, 150MB의 broadcast 테이블이 모든 8개의 파티션 메모리에 저장되어 처리된다. 이런 경우로 점유 메모리로 인한 Major GC가 발생할 수도 있다. (오히려 총 구동시간이 늘어나는 최악의 상황도 경험할 수 있다.) 때문에 뭐든 풀로 파티션 줘서 땡겨야지! 이런 생각보다는 Spark UI 에서 구동되는 방식들을 확인한 후 파티션 수, 방식을 고려하는 것을 추천한다. 

 

GC option으로 G1GC를 사용하는 것도 추천.(기본으로 parallel GC가 채택되어 있을 확률이 높다) 어느 정도 메모리(GC TIME) 때문에 발생하는 정체 현상을 한방에 해결해 준다. 앗, 다만 내가 메모리를 최적으로 설정했다는 가정하에만 사용하길.. G1GC에 대한 내용은 다음 챕터에서 정리해보도록 하겠다. 

참고

https://luminousmen.com/post/spark-tips-optimizing-jdbc-data-source-reads

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

반응형
Comments