요약
- Airflow 오퍼레이터로 외부 시스템에서 태스크 수행
- 오퍼레이터를 구현하여 A-to-B 작업(시스템 간 데이터 이동) 구현
- 외부 시스템에 연결하는 태스크 테스트
1. 클라우드 서비스에 연결하기
에어플로우에서는 클라우드 서비스에 요청을 보낼 수 있도록 오퍼레이터를 제공함.
- 내부적으로 클라우드의 sdk를 사용해서 호출
1.1 추가 의존성 패키지 설치하기
서비스 연결을 위해서는 패키지 설치가 필요함.
- aws
pip install apache-airflow-providers-amazon
- gcp
pip install apache-airflow-providers-google
- Azure
pip install apache-airflow-providers-microsoft-azure
- 이외 다른 외부 서비스도 해당(https://airflow.apache.org/docs)
1.2 머신러닝 모델 개발하기
- Airflow 워크플로우는 일반적으로 모델의 오프라인 학습을 처리
- 오프라인 학습: 모델을 학습시킨 뒤 더 이상의 학습 없이 제품 시스템에 적용하는 것.
- 온라인 학습 : 온라인 학습: 학습된 모델이 제품 시스템에 적용된 상태에서도 계속해서 모델을 학습할 수 있는 것.
예제:
여러 AWS 오퍼레이터를 이용해 손글씨 숫자 분류 모델을 학습
- 샘플 데이터를 s3 버킷에 복사 (
S3CopyObjectOperator
) - 모델 입력 형식으로 데이터 변환 (직접 로직 구현하여 PythonOperator로 호출)
- binary 데이터 다운 & RecordIO 레코드 포맷으로 변환 & S3 upload
- 모델 학습 (
SageMakerTrainingOperator
)- config에 SageMaker를 사용하는데 필요한 설정값들을 넣어서 사용가능
- 모델 배포 (
SageMakerEndpointOperator
)
- 스케줄링을 통해 주기적으로 새롭게 훈련된 모델을 배포할 수 있음.
1.3 외부 시스템을 사용하여 로컬에서 개발하기
airflow task test
통해서 단일 태스크 실행 가능
예제: 로컬에서 각 테스크 실행.
- AWS 오퍼레이터 - 인증을 위한 환경변수 등 config 설정
- SageMaker 오퍼레이터 - config설정 및 유의사항
- 모델을 재학습하기 위해서는 AWS 계정과 리전 내에서 고유해야함
→ execution_date를 이용해 고유 이름을 부여할 수 있음. - AWS 작업 완료 여부를 알 수 없음.
→ wait_for_completion=True, check_interval 설정
- 모델을 재학습하기 위해서는 AWS 계정과 리전 내에서 고유해야함
- 엔드포인트 서비스 제공을 위한 API는 파이프라인과 별도로 구성
- 한번만 배포하면 되기 때문에 파이프라인에 포함할 필요가 없음.
2. 시스템 간 데이터 이동하기
- Airflow는 다양한 시스템 간 데이터 이동 작업을 관리
예제: Airbnb의 데이터 ETL
- DB에 쿼리를 보내 얻은 결과를 서버에 저장
SqlToGoogleCloudStorageOperator
,SFTPToS3Operator
,SimpleHttpOperator
- 없는 경우 직접 구현 - “PostgresToS3Operator”
- Postgres에 쿼리 실행
- csv파일로 변환
- s3에 csv파일 업로드
2.2 큰 작업을 외부에서 수행하기
Airflow, a Task Orchestration System? also a Task Execution System?
“큰 작업을 수행해야하는 경우 Airflow의 오케스트레이션과 실행이 분리되어있어야한다.”
(= Airflow는 테스크 트리거 시스템으로서 사용되어야한다)
'데이터 엔지니어링' 카테고리의 다른 글
pandas 통해서 csv 파일로 mysql에 테이블 생성하기 (0) | 2022.08.18 |
---|---|
[빅데이터를 지탱하는 기술] 1장 빅데이터 기초 지식과 Druid (0) | 2022.05.20 |