J.Y.S
¿ whats my interest ?
J.Y.S
전체 방문자
오늘
어제
  • 분류 전체보기 (59)
    • 트러블슈팅 기록 (7)
    • Java&Kotlin (5)
    • Server (22)
    • 데이터 엔지니어링 (3)
    • Architecture& Design Patter.. (1)
    • Daily (11)
    • 알고리즘 공부 (9)

블로그 메뉴

  • 홈
  • 태그

공지사항

인기 글

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
J.Y.S

¿ whats my interest ?

Airflow: 외부 시스템과 통신 | 시스템 간 데이터 이동
데이터 엔지니어링

Airflow: 외부 시스템과 통신 | 시스템 간 데이터 이동

2022. 9. 7. 01:44

요약

  • Airflow 오퍼레이터로 외부 시스템에서 태스크 수행
  • 오퍼레이터를 구현하여 A-to-B 작업(시스템 간 데이터 이동) 구현
  • 외부 시스템에 연결하는 태스크 테스트

1. 클라우드 서비스에 연결하기

에어플로우에서는 클라우드 서비스에 요청을 보낼 수 있도록 오퍼레이터를 제공함.

  • 내부적으로 클라우드의 sdk를 사용해서 호출

1.1 추가 의존성 패키지 설치하기

서비스 연결을 위해서는 패키지 설치가 필요함.

  • aws    pip install apache-airflow-providers-amazon
  • gcp    pip install apache-airflow-providers-google
  • Azure pip install apache-airflow-providers-microsoft-azure
  • 이외 다른 외부 서비스도 해당(https://airflow.apache.org/docs)

1.2 머신러닝 모델 개발하기

  • Airflow 워크플로우는 일반적으로 모델의 오프라인 학습을 처리
    • 오프라인 학습: 모델을 학습시킨 뒤 더 이상의 학습 없이 제품 시스템에 적용하는 것.
    • 온라인 학습 : 온라인 학습: 학습된 모델이 제품 시스템에 적용된 상태에서도 계속해서 모델을 학습할 수 있는 것.

예제:

여러 AWS 오퍼레이터를 이용해 손글씨 숫자 분류 모델을 학습

  1. 샘플 데이터를 s3 버킷에 복사 (S3CopyObjectOperator)
  2. 모델 입력 형식으로 데이터 변환 (직접 로직 구현하여 PythonOperator로 호출)
    • binary 데이터 다운 & RecordIO 레코드 포맷으로 변환 & S3 upload
  3. 모델 학습 (SageMakerTrainingOperator)
    • config에 SageMaker를 사용하는데 필요한 설정값들을 넣어서 사용가능
  4. 모델 배포 (SageMakerEndpointOperator)

  • 스케줄링을 통해 주기적으로 새롭게 훈련된 모델을 배포할 수 있음.

1.3 외부 시스템을 사용하여 로컬에서 개발하기

airflow task test 통해서 단일 태스크 실행 가능

예제: 로컬에서 각 테스크 실행.

  • AWS 오퍼레이터 - 인증을 위한 환경변수 등 config 설정
  • SageMaker 오퍼레이터 - config설정 및 유의사항
    • 모델을 재학습하기 위해서는 AWS 계정과 리전 내에서 고유해야함
      → execution_date를 이용해 고유 이름을 부여할 수 있음.
    • AWS 작업 완료 여부를 알 수 없음.
      → wait_for_completion=True, check_interval 설정
  • 엔드포인트 서비스 제공을 위한 API는 파이프라인과 별도로 구성
    • 한번만 배포하면 되기 때문에 파이프라인에 포함할 필요가 없음.

2. 시스템 간 데이터 이동하기

  • Airflow는 다양한 시스템 간 데이터 이동 작업을 관리

예제: Airbnb의 데이터 ETL

  • DB에 쿼리를 보내 얻은 결과를 서버에 저장
    • SqlToGoogleCloudStorageOperator, SFTPToS3Operator, SimpleHttpOperator
    • 없는 경우 직접 구현 - “PostgresToS3Operator”
      • Postgres에 쿼리 실행
      • csv파일로 변환
      • s3에 csv파일 업로드

2.2 큰 작업을 외부에서 수행하기

Airflow, a Task Orchestration System? also a Task Execution System?

“큰 작업을 수행해야하는 경우 Airflow의 오케스트레이션과 실행이 분리되어있어야한다.”

(= Airflow는 테스크 트리거 시스템으로서 사용되어야한다)

저작자표시 (새창열림)

'데이터 엔지니어링' 카테고리의 다른 글

pandas 통해서 csv 파일로 mysql에 테이블 생성하기  (0) 2022.08.18
[빅데이터를 지탱하는 기술] 1장 빅데이터 기초 지식과 Druid  (0) 2022.05.20
    '데이터 엔지니어링' 카테고리의 다른 글
    • pandas 통해서 csv 파일로 mysql에 테이블 생성하기
    • [빅데이터를 지탱하는 기술] 1장 빅데이터 기초 지식과 Druid
    J.Y.S
    J.Y.S

    티스토리툴바