Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- git 기본명령어
- 오라클 데이터 처리방식
- 데이터 분석
- Oracle ASSM
- airflow 정리
- 리눅스 환경변수
- Oracle 논리적 저장 구조
- eda
- 네트워크
- BFS
- 추천시스템
- 랜덤포레스트
- 데이터분석
- Python
- 앙상블
- Collaborative filtering
- CF
- Decision Tree
- Linux
- 알고리즘
- SQL
- enq: FB - contention
- git stash
- Spark Data Read
- git init
- 의사결정나무
- Spark jdbc parallel read
- Spark 튜닝
- 통계분석
- 배깅
Archives
- Today
- Total
[Alex] 데이터 장인의 블로그
[Airflow] DAGs 사용 방법 정리 본문
DAGs
airflow에서는 워크플로우를 DAG(Directed Acyclic Graph)로 관리.
DAGs : 비순환, 방향성을 가지고 있는 그래프를 뜻함.
Airflow 웹서버에서 DAGs Task 확인.
DAGs - Operator의 모음
DAGs는 Operator(Task) 의 모음이다.
개별 Task 상태 값을 확인하여 재실행, Failed 마킹, Success 마킹 등의 여러가지 개별 작업을 수행할 수 있다.
DAGs 작성방법 (주의점)
Scope
파이썬 파일 내에 DAG는 전역 스코프에 존재해야 함.
dag_1 = DAG('this_dag_will_be_discovered')
def my_function():
dag_2 = DAG('this_dag_will_not')
my_function()
위와 같이 작성하게 되면 dag_1만 airflow에 로드. dag2는 로드되지 않음.
Default arguments
파라미터로 default_arg를 입력하여 operator에 적용.
default_args = {
'start_date' : datetime(2016, 1, 1),
'owner' : 'airflow'
}
dag = DAG(
dag_id='example_dag_tag',
default_args=default_args,
schedule_interval='0 0 * * *',
tags=['example'] # airflow 1.10.8, 1.10.9 버전부터 tag 기능 생김.
)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow
Operators
DAG는 Operator의 모음(전후 관계 설정)
즉, airflow의 작업은 [ 1. DAG 객체 생성 -> 2. Operator 활용한 Task 작성 -> 3. Task를 연결 ] 이뤄져 있음.
Airflows의 Operator 종류
- BashOperator : bash 명령 실행
- PythonOperator : 임의의 python function 호출
- EmailOperator : Email 전송
- SimpleHttpOperator : HTTP request 전송
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. : SQL 명령을 실행
- Sensor : 특정 시간, 파일 db row를 Poll (wait)하는 오퍼레이터
반응형
'Airflow' 카테고리의 다른 글
[Airflow] Why Airflow? (0) | 2022.06.11 |
---|
Comments