ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • airflow의 새로운 경쟁자 오케스트레이션 플랫폼 dagster
    Programming/python 2021. 12. 23. 23:42

    orchestration platform을 강조하고 있다.

     

    Open Source Data Stack Conference

    아래 내용은 위 동영상의 내용을 번안하고 정리한 내용입니다.

    위 사진의 기술들은 다양한 방식들로 서로 의존성이 존재한다. 예를들어 스노우플로우나 다른 데이터 웨어하우스에서 데이터가 만들어지기 전까지 dbt를 실행할 수 없다. 그리고 dbt가 그것의 모델을 업데이트하기 전까지 Grouparoo같은 리버스 ETL은 새로운 정보를 적재할 수 없다.

    이런 의존성들간에 Ordering과 Excute를 조율하는 것이 오케스트레이션의 목적이다.

    왜 dagster여야 하는가?


    전통적인 오케스트레이션은 스케줄링과 실행을 통합하는 시스템과 가시적이고 알림을 운영할 수 있어야 한다. 여기에 더해져서 덱스터는

    • 유용한 metadata를 생성하는 과정에서 데이터가 어디를 통과하는지 충분히 수치화해서 보여준다.
    • 오케이트레이션 레이어에서 생산성을 늘리기 위해서 덱스터가 코드를 실행하기 위해 편리하게 만들어진 프로덕션 레벨의 데이터베이스를 건들지 않는 api로 테스트를 할 수 있다.
    • 파이프라인을 디버그하고 작업하는 ui가 쉽고 재밌다.
    • 중앙 집중형으로 시스템들을 기록할 수 있다.

    dagster의 기본적인 개념


    • Dagster에서 Job들은 solid로 표현되며 workflow는 pieline으로 구현
      • 최근 버전에서 solid가 op로 pipeline이 job으로 명칭이 변경되면서 하나의 낱개 task들은 op로 이어놓는 workflow들은 job으로 구현한다.
    • 각각 op와 job은 데코레이터로 정의된다.
    • 하나의 스크립트에 다수의 pipeline을 포함할 수 있으며 이를 통해 여러 workflow를 구현 가능하다→이런 스크립트 하나를 repository라 정의
    • 각각의 op는 별도의 input과 output을 가질 수 있으며, solid간 변수 상속이 가능하다.

     

    airflow와 비교 및 차별점


    • dagster는 오케스트레이션을 표방하고 airflow는 monitor workflow를 표방한다.
    • airflow는 추상화된 클래스 객체를 오버라이딩하거나 상속받아서 dag를 작성하는데 사용하지만 dagster는 데코레이터를 사용해서 함수를 기반으로 job을 작성한다.
    • airflow는 dag의 워크플로우를 구성할 때 >> 를 통해 방향성을 결정하지만 dagster는 정의해놓은 함수에 인자를 태움으로써 방향성을 결정한다.
    • airflow에서는 task간 변수 상속이 x-com을 통해서 가능하긴 하지만 메모리 등 어느정도 제약점이 존재하지만 dagster는 op간 변수 상속이 용이한 점
    • airflow에서는 비교적 힘들게 UTC기준으로 된 타임존을 변경했었는데 dagster에서는 timezone을 지원해서 쉽게 데코레이터의 파라미터로 타임존을 입력하면 쉽게 스케줄링을 걸수있다.

    DEMO


    dagster는 python 3.6이상의 환경에서 원활히 활용할 수 있으며 3.8, 3.7, 3.6에서 정상적으로 작동되는 것이 확인됨

    pip install dagster dagit

    python pip를 통해서 설치

    • Dagster: 싱글 노드, 싱글 프로세스와 멀티 프로세스 실행 엔진, 엔진들을 실행하기 위한 CLI 툴
    • Dagit: DAG browser를 포함한 개발, jobs 운영 UI. config editor, 라이브 실행 인터페이스

     

    Single-Op Job

    op는 job의 한 부분이다. 일반적으로 op들은 @op decorator와 함께 python function을 어노테이팅하는 걸로 정의할 수 있다.

    아래 op는 CSV를 다운로드하고 CSV의 row들과 로그들의 row들을 딕셔너리 리스트로 불러서 읽는 작업을 한다.

    import requests
    import csv
    from dagster import job, op, get_dagster_logger
    
    
    @op
    def hello_cereal():
        response = requests.get("https://docs.dagster.io/assets/cereal.csv")
        lines = response.text.split("\n")
        cereals = [row for row in csv.DictReader(lines)]
        get_dagster_logger().info(f"Found {len(cereals)} cereals")
    
        return cereal
    
    @job
    def hello_cereal_job():
        hello_cereal()
    dagit -f 파일명.py

    콘솔에서 위 명령어를 통해서 job을 비주얼라이즈할 수 있다.

    dagster job execute -f 파일명.py

    위 명령어로 이벤트를 스트림하게 처리하며 내부적인 로그를 확인할 수 있다.

    Complex DAG

    import csv
    
    import requests
    from dagster import get_dagster_logger, job, op
    
    
    @op
    def download_cereals():
        response = requests.get("https://docs.dagster.io/assets/cereal.csv")
        lines = response.text.split("\n")
        return [row for row in csv.DictReader(lines)]
    
    
    @op
    def find_highest_calorie_cereal(cereals):
        sorted_cereals = list(
            sorted(cereals, key=lambda cereal: cereal["calories"])
        )
        return sorted_cereals[-1]["name"]
    
    
    @op
    def find_highest_protein_cereal(cereals):
        sorted_cereals = list(
            sorted(cereals, key=lambda cereal: cereal["protein"])
        )
        return sorted_cereals[-1]["name"]
    
    
    @op
    def display_results(most_calories, most_protein):
        logger = get_dagster_logger()
        logger.info(f"Most caloric cereal: {most_calories}")
        logger.info(f"Most protein-rich cereal: {most_protein}")
    
    
    @job
    def diamond():
        cereals = download_cereals()
        display_results(
            most_calories=find_highest_calorie_cereal(cereals),
            most_protein=find_highest_protein_cereal(cereals),
        )

    • 순차적이지 않은 다이아몬드형태의 복잡한 DAG구성의 예제
    • 이 예제를 실행하면 donwload_cereals가 가장 먼저 실행되고 뒤이어서 find_highest_calorie_cereal과 find_highest_protein_cereal이 동시에 실행된고 display_result가 마지막으로 실행된다

     

    dag+ster


    dagster란 네이밍에서도 알 수 있듯이 dag와 dag를 실행시킨다는 접미사를 붙인다는 네이밍으로 합쳐져서 덱스터라는 이름이 등장한 것 같다.

    프로덕션 레벨에서는 위와 같이 튜토리얼 처럼 cli로 실행하는게 아니라 yaml파일로 제공하는 데몬을 띄워서 웹서버와 스케줄러 등을 msa로 나누어서 실행하고 레포지토리들을 storage에서 불러와서 실행하게 된다.

    단순히 컨퍼런스의 데모영상을 보고 직접 튜토리얼을 진행하면서 느낀점은 airflow에 비해서 UI가 깔끔하고 리얼타임으로 진행상황을 간트차트로 확인할 수 있다는 정도, 나머지는 좀 불편하긴 하지만 airflow에서도 다 구현이 가능한 부분이라서 굳이...?라는 생각이 든다.

    함수를 기반으로 dag의 종속관계를 짜야한다는 것도 조금 한 몫하는 것 같다. 다만 dbt나 멜타노 같은 환경들을 같이 사용한다면 좀더 시너지있고 통합적으로 활용 가능성이 많다는 점은 앞으로 두고봐야할 것

    댓글

Copyright 2023. 은유 All rights reserved.