Cloud Composer(Airflow)のREST APIを叩いてDAGを走らせる
Cloud ComposerのDAGの実行方法
Cloud Composer(Apache AirflowのGoogle Cloudによるマネージドサービス)では、DAG(Directed Acyclic Graph)をPythonによるファイルに定義して、依存関係をもとにデータに関するワークフローを実行できます。
基本的にはそのファイル内に書かれたスケジュール設定に基づいて自動で実行されるようにDAGを設定することが多いです。
例えば…
dag = DAG( dag_id="foo", default_args=default_args, start_date=days_ago(1), schedule_interval="0 0 * * *", catchup=False, )
のような指定の場合は、UTCの0時 = JSTの9時に毎日実行…とこんな感じです。
この schedule_interval
が実行スケジュールを握っているわけですね。
これ自体は None
を指定することができます。
dag = DAG( dag_id="bar", default_args=default_args, start_date=days_ago(1), schedule_interval=None, catchup=False, )
こうするとどうなるか?というと、スケジュール実行されなくなります。
つまり、 手動でDAGを実行する状況になります。
Dailyでなくまとめてのimport処理などにはこのようなDAGを準備することになるかと思います。
AirflowのWeb UIでも、スケジュール・Next RunともにNoneとなります。
どうやって実行するかというと、この画像のとおり、再生ボタンから実行可能です。
が、 何らかの理由でローカルのPythonスクリプト等から実行したいときが出てくるかもしれません。 今回はその話です。
Airflow REST API
AirflowにはREST APIがありますので、今回はこれをDAG実行に使います。
Google Cloudの公式ドキュメントはこれです: Airflow REST APIにアクセスする
今回扱うDAG実行の他にも接続の管理などもできるのでぜひ知っておくべきものの一つかと思います。
Apache Airflowの Airflow REST API の仕様書 を見ていただけるとどんなことが出来るかがわかりますので必読です。
実際のAirflow REST APIの叩き方
では実際にどのようにAirflow REST APIを叩くかについてまとめます。
先ほどの公式ドキュメントにもコード付きでありますが、やり方は極めてシンプルです。
認証情報の設定
まずは認証情報を用意しなくてはなりません。
公式ドキュメントにあるような
import google.auth AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform" CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])
でデフォルトの認証情報(ADC)をとってくる方法もありますし
サービスアカウントの認証情報のJSONファイルがあれば
from google.oauth2 import service_account CREDENTIALS = service_account.Credentials.from_service_account_file( "service-account.json", scopes=["https://www.googleapis.com/auth/cloud-platform"], )
のように google.oauth2
から認証情報を立てても良いです。
いずれの認証情報でも、それをもとにセッションを立ち上げます。
import google.auth.transport.requests authed_session = google.auth.transport.requests.AuthorizedSession(CREDENTIALS)
APIの実行
APIの実行は、作成したセッションをもとにメソッドとURLを指定して行うのみです
response = authed_session.request(method, url, json=data)
このような形です。
ここで指定する method
と url
は Airflow REST APIの仕様書ページ を参照することでわかります。
ベースとなるAirflowのURLはCloud Composerの画面からも確認出来ます。
ここに /api/v1
を足したものをベースとして、あとは仕様書通りにPathを指定するだけですね。
たとえばDAGの実行であれば、「Trigger a new DAG run」の仕様通り、
https://[WEB UIのホスト名]/api/v1/dags/{dag_id}/dagRuns
にPOSTするだけです。簡単ですね。
まとめ
今回はCloud ComposerのDAGをREST API経由で実行する方法についてまとめました。
これを知っているとデータに関する処理をCloud Composerに集約していく発想にも繋がるかもしれません。
例えば、Dataformでターゲットを指定して実行したい場合に、Pythonから直接Dataformを叩こうとおもうとまだDataform用のライブラリ( google-cloud-dataform
)がBetaだったりしますので、Cloud Composerで DataformCreateWorkflowInvocationOperator
などを使うDAGを作ってそれを今回の方法で実行するほうが楽だったりします。
ちょっとニッチめな話ですが、Cloud Composerでデータ基盤管理をしている場合はREST APIについても知っておきたいところです。