본문 바로가기

Kubernetes

Airflow DAG와 Kubeflow Pipeline에 대한 오해(kfp pipeline 파헤쳐보기)

Kubeflow에 대해 찾아보다, 어떤 블로그에서 이런 내용을 봤다.

 

Airflow DAG와 Kubeflow Pipeline을 비교하면서,  Kubeflow pipeline의 task는 up/downstream의 task를 하나밖에 가지지 못한다는 내용이었다.

 

차이점..?

 

이에 대해 결론만 말하면 사실이 아니다.

 

Kubeflow task도 부모, 자식 task를 여러개 가질 수 있다.

 

이는 Kubeflow 공식문서의 예시만 봐도 알 수 있는 내용이다.

https://www.kubeflow.org/docs/components/pipelines/v1/concepts/graph/

 

Graph

Conceptual overview of graphs in Kubeflow Pipelines

www.kubeflow.org

 

 

Kubeflow Pipelines vs Airflow DAG

그래서 뭔가 이상해서 이렇게 생각해보기도 하였다.

Kubeflow의 한 task는 여러개의 task에 의해 의존될 수 있지만, 그 task 자체는 하나의 task에 대해서만 의존성 설정이 가능하다?

 

이것도 결론만 따지면 아니다.

 

부모 task들이 모두 완료가 되어야 하위에 존재하는 자식 task가 실행될 수 있다.

 


 

검증을 위한 실습을 진행하였고, 환경은 다음과 같다.

 

Container Runtime / docker 24.0.7
WSL2
Ubuntu 20.04.3 LTS
Minikube 1.32.0
Kubectl 1.28.2
Kubeflow 1.8.0
Kubeflow dashboard : 2.6.1
Kustomize 5.2.1
Miniconda (가상환경)

 

일단, 쿠버네티스 클러스터 구축을 미리 해놓지 않은 상태라면, 간단한 실습을 위한 Minikube 환경을 추천한다.

 

(하지만, 가장 좋은 것은 직접 Kubernetes 클러스터를 구축해보는 것이다. (Kubespray, EKS, GKE 등...) )

 

그러나 현실적으로, 여러 인스턴스를 가지고 클러스터 구축이 어렵기 때문에 Minikube를 사용한다.

 

 

  1. Minikube 설치
  2. Kubeflow 설치 (종속성 중요)
  3. Kubernetes Dashboard 배포
  4. Kubeflow Pipeline 작성
  5. Kubeflow Dashboard에서 실습
  6. Kubeflow Pipeline 실행
    1. Upload Pipeline
    2. Create Experiment
    3. Pipeline run

 

설치 과정 일부분을 적어보면,

 

미니큐브 설치

minikube start --memory=16g --cpus=6 --disk-size 40GB --kubernetes-version=1.21.12

docker context use default

minikube config set profile mk

 

 

Kubeflow 설치 

알맞은 버전의 kubeflow git clone 후에

#  단일 명령어 설치
while ! kustomize build example | kubectl apply -f -; do echo "Retrying to apply resources"; sleep 10; done

 

 

Kubeflow Dashboard 접속

export NAMESPACE=istio-system

kubectl port-forward -n istio-system svc/istio-ingressgateway 8080:80

 

접속
http://127.0.0.1:8080/

초기 dex
user@example.com
12341234

 

 


 

  • Kubeflow Pipeline 작성시에, KFP 버전 1과 버전 2의 코드는 상이하고, 버전에 따른 마이그레이션에 유의한다.
  • Yaml파일 작성 없이, KFP SDK를 이용하여, 파이프라인(.py) 작성 후에, 컴파일을 통해 yaml이나 다른 형식으로 추출할 수 있다.

 

POC를 위한 빈 깡통 Kubeflow pipeline을 생성하기 위해 다음과 같이 파이프라인을 작성하였다. (KFP v1)

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: example-pipeline-
  annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.0, pipelines.kubeflow.org/pipeline_compilation_time: '2023-11-27T16:28:34.852950',
    pipelines.kubeflow.org/pipeline_spec: '{"description": "A pipeline with multiple
      dependencies", "name": "example-pipeline"}'}
  labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.0}
spec:
  entrypoint: example-pipeline
  templates:
  - name: example-pipeline
    dag:
      tasks:
      - {name: task-a, template: task-a}
      - name: task-b
        template: task-b
        dependencies: [task-a]
      - name: task-c
        template: task-c
        dependencies: [task-a]
      - name: task-d
        template: task-d
        dependencies: [task-a]
      - name: task-e
        template: task-e
        dependencies: [task-b, task-c]
      - name: task-f
        template: task-f
        dependencies: [task-d]
      - name: task-g
        template: task-g
        dependencies: [task-d, task-e]
  - name: task-a
    container:
      command: [echo, Task A]
      image: wath1457/test-image
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.0
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
  - name: task-b
    container:
      command: [echo, Task B]
      image: wath1457/test-image
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.0
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
  - name: task-c
    container:
      command: [echo, Task C]
      image: wath1457/test-image
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.0
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
  - name: task-d
    container:
      command: [echo, Task D]
      image: wath1457/test-image
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.0
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
  - name: task-e
    container:
      command: [echo, Task E]
      image: wath1457/test-image
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.0
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
  - name: task-f
    container:
      command: [echo, Task F]
      image: wath1457/test-image
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.0
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
  - name: task-g
    container:
      command: [echo, Task G]
      image: wath1457/test-image
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.0
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
  arguments:
    parameters: []
  serviceAccountName: pipeline-runner

 

 

 

 

Kubeflow Dashboard에서 이런식으로 Pipeline을 돌려볼 수 있다.

 

 

 

 

원하는 task의 로그와 실행 정보를 확인할 수 있다.

 


파이프라인을 바꿔가며(의존성) 실험을 해본 결과,

 

  • 의존성이 없는 task들은 비동기적으로 실행된다.
  • task의 up/downstream은 1개 이상 가질 수 있다.
  • task의 의존성은 다양한 task에 대해 가질 수 있다.

 

 

아직 의문인 점, 모르는 부분들도 많다.

 

내가 실험해본 내용에 대해 정리해 보았다.

 

after() 의존성(부모) 리스트에 들어간 작업들이 앞에서부터 큐의 형태로 처리되는가?


=> 의존성 순서를 바꿔도 생성되는 yaml파일은 동일하다.

 

A          B
 |          |
  -- C --
      |
      D

일때,

A -> C -> D인가
B -> C -> D 인가
A,B -> C -> D 인가


=> set_upstream()이면 A나 B 둘 중 하나라도 완료되면 C가 시작한다.
=> after()이면 A와 B 모두 완료되어야 C가 시작한다.

 

after(task_a, task_b)와

after(task_a)
after(task_b)

모두 a,b task 모두 완료되어야 다음 task가 실행된다.

 

 

이번 포스팅은 틀린 내용이 있을 가능성(?)이 높으니.. 피드백 주시면 감사하겠습니다!