코드 작성 없이 데이터 파이프라인 운영하기

Che Son
Team QANDA
Published in
8 min readDec 13, 2021

--

서비스가 성장하면 자연스럽게 데이터도 많아지는데요. 늘어나는 데이터들을 잘 수집해서 쓰기 편하게 정리해 두는 것이 데이터 파이프라인의 역할입니다. 데이터 파이프라인 작업들의 생산성을 높이고, 데이터 엔지니어가 아니더라도 누구나 데이터 파이프라인 작업을 추가할 수 있게 만들기 위한 고민과 이뤄낸 개선사항을 공유해보고자 합니다.

데이터 인프라

배치 파이프라인에서 사용하는 데이터 인프라 구성 요소들입니다. AWS ECS 서비스 위에 Spark가 실행이 되고 DW는 GCP BigQuery, 잡 스케쥴러는 GCP Composer(Airflow)를 사용하고 있습니다.

배치 파이프라인 인프라 구성도

ETL 파이프라인

RDB, 서버 로그, 클라이언트 로그 등 다양한 곳에서 데이터가 발생합니다. 이러한 데이터들을 한곳에 모아둘 때 Extract(추출), Transformation(변환), Load(적재)의 일련의 작업들이 필요한데 이를 ETL 파이프라인이라고 합니다.

반복되는 작업

새로운 서비스나 기능이 프로덕트에 배포가 되고 나면 자연스럽게 ETL 작업이 필요합니다. 새로운 테이블이나 로그에 맞춰 Spark 코드와 Airflow DAG 파일이 작성하게 되는데요. 새로운 서비스 배포나 서비스 변화가 자주 일어나는 시기에는 ETL 작업에 많은 시간을 쓰게 됩니다.

추상화

코드 추상화를 잘해두면 재사용이 가능합니다. RDB ETL 작업은 처리 단계에서 3가지로 추상화할 수 있습니다. Stitch 같은 Saas ETL 서비스에서도 3가지 개념을 사용해서 서비스를 제공하고 있습니다. Full Table Replication(전체 복제), Append-Only Incremental Replication(부분 추가 복제), Updated At Incremental Replication(부분 갱신 복제) 입니다. Spark ETL 클래스를 Update, Append, Snapshot으로 정의하고요. Spark job으로 제출할 때 3가지 중 하나를 선택하고 서비스와 테이블 정보만 넘겨 처리할 수 있습니다. AWS ECS 서비스 위에 Spark를 띄워서 사용하고 있습니다. 실제로 Airflow에서 ECSOperator의 파라미터를 정의하는 부분인데요. 서비스, 테이블, ETL 타입 정도만 정의합니다.

ELT

서버 로그와 클라이언트 로그는 BigQuery를 활용하여 ELT로 처리합니다. ELT의 장점은 변환 (Transformation)만 신경 쓰면 됩니다. 통합 로그를 하나의 table로 관리하고 적당한 SQL로 원하는 데이터를 추출해서 대상 테이블에 적재합니다. 통합 로그는 Google Cloud Storage에 두고 external table로 관리 할 수도 있습니다. 클라이언트 로그는 테이블 정의 파일을 관리하고 시스템 로직에서 SQL 만들어서 적재하고 있습니다. 서버 로그는 SQL 파일들을 관리하고 있습니다.

클라이언트 로그 수집 테이블 정의

태스크 자동 생성

DAG 내에 반복되는 각 태스크들을 뽑아서 동적으로 태스크 코드를 생성하게 준비해 둡니다. 테이블 목록 파일을 읽어서 variable에 저장하는 태스크를 Root 태스크로 만들고요. DAG 코드에서 variable을 얻어와서 테이블 단위로 ETL에 필요한 태스크들을 만들어 냅니다. (DAG 탑 레벨에서 variable에 접근하기 때문에 DAG 파싱 시간에 영향을 줍니다. 공식 Github에서 여러 방법들을 제안하고 있고요. 개인적으로는 CI 단계에서 DAG 파일들을 생성해서 배포하는 방법을 추천드립니다.)

좋아진 점

데이터 수집 요청 업무가 당일에 반영이 되면서 분석 업무에 지연이 많이 줄었습니다. ETL 업무에 쓰는 시간을 다른 문제를 고민하는데 사용할 수 있게 되었어요.

마트 파이프라인

마트 테이블이란 자주 접근하거나 중요한 데이터들을 잘 연결하여 만들어둔 테이블입니다. 이렇게 만들어두면 1) 필요한 데이터들이 어느 테이블들에 있는지 찾아보지 않아도 되고 2) 원하는 데이터를 얻기 위해 복잡한 조인 연산을 하지 않아도 되는 장점이 있습니다. 하지만 이렇게 파생된 데이터들은 관리 리소스가 필요합니다. 몇 가지 규칙를 따르고 태스크 생성을 자동화하면 이러한 리소스도 없앨 수 있습니다.

반복적인 작업

데이터 분석가분들이 SQL과 함께 마트 테이블 요청을 하게 되면 데이터 엔지니어가 Airflow에 마트 태스크를 추가합니다. SQL에서 사용하는 소스 테이블들을 보고 디펜던시를 잘 걸어주고 쿼리가 변경될 때 반복적인 작업이 일어납니다.

태스크 생성

SQL 파일과 적재할 마트 테이블이 1:1로 대응되게 사용합니다. SQL 파일명을 가지고 마트 태스크명을 결정하고 SQL 파일 수만큼 마트 태스크들이 만들어집니다. 그리고 마트 DAG에서 태스크들을 동적으로 생성합니다.

디펜던시 생성

SQL 파일을 파싱해서 소스 테이블들만 추출합니다. 어떤 소스 테이블들을 사용하는지 보고 디펜던시 코드를 만들 수 있습니다. 소스 테이블을 부모 태스크로 치환하고 업스트림으로 연결해 줍니다. 없으면 start 태스크(시작 태스크)에 연결합니다. 그리고 자신을 의존하는 태스크가 없으면 done 태스크(종료 태스크)로 연결합니다. root 태스크에서 SQL 폴더를 읽고 이러한 연결 정보들을 만들어서 json 형태로 variable에 저장합니다. ETL과 마찬가지로 DAG 탑 레벨에서 variable를 읽어 DAG를 만듭니다. start 태스크는 원천 데이터 소스(RDB, 서버/클라이언트 로그 등)의 적재 DAG가 모두 완료되었을 때 실행이 됩니다. done 태스크는 마트 태스크들이 모두 완료가 되었을 때 실행이 되고 metric DAG를 트리거 합니다.

완성된 마트 DAG의 모습

CI 점검

추가한 SQL 파일이 마트 태스크로 잘 만들어질 수 있는지 점검이 필요합니다. github actions를 활용해서 점검하고 있습니다.

점검 단계에서는

  1. SQL의 문법이 올바른지와 필수로 지정된 파티션 컬럼을 잊지 않았는지
  2. 순환 참조가 없는(acyclic) 유효한 SQL인지 점검합니다.

BigQuery API를 활용해서 SQL 문법과 파티션 컬럼을 점검합니다. 쿼리 파서 오픈 라이브러리를 활용하는 것도 좋지만 BigQuery API를 사용하는 것이 더 정확합니다. 파티션 컬럼 점검은 SQL에 LIMIT 0을 붙여서 실행하면 스키마 정보만 받아 올 수 있습니다.

DAG(Directed Acyclic Graph)는 방향이 존재하고 순환이 없는 그래프인데요. SQL로 태스크를 만들 때 순환 점검을 같이 해줘야 합니다. Graph 자료구조를 만들고 DFS 알고리즘으로 순환을 판별합니다. 순환이 판별되면 에러를 내고 SQL 파일 배포를 막습니다.

점검 에러가 나면 슬랙으로 알람을 오고 작성자가 문제를 파악하고 sql 파일을 수정해서 다시 제출합니다.

점검 실패시 발송되는 슬랙 메세지
링크를 통해 보여지는 에러 메세지

수동 싱크

마트 태스크 생성 후 수동 싱크를 원할 때가 있는데요.

테이블명을 선언하여 싱크하는 DAG도 제공하고 있습니다.

좋아진 점

데이터 분석가 분들이 데이터 엔지니어의 도움 없이 DAG에 직접 마트 태스크를 추가할 수 있게 되었습니다.

훌륭한 데이터 분석가 분들과 함께 일하는데요.

반응이 너무 좋습니다. 😄

앞으로의 계획

데이터팀이 아니더라도 누구나 쉽게 배치 파이프라인을 만들 수 있으면 좋겠다는 생각입니다. 클릭 몇 번 또는 SQL 작성 만으로 배치 파이프라인을 만들 수 있게 백오피스 서비스를 제공할 계획입니다.

🌏글로벌 Top 교육 앱 QANDA(콴다)를 함께 만들어 갈 Data Engineers, Data Analysts들을 기다리고 있습니다!

🖥️콴다 데이터팀의 문화가 궁금하다면? ➡️ Inside QANDA: 그 많은 데이터를 누가, 어떻게 왜

🕹️콴다 데이터팀은 채용 중 입니다!➡️ 공고 확인하기

--

--