Feature description and background
Currently, the Kubernetes job operator supports producing XCom output from job output. For receiving XCom input, however, it only provides the most basic approach: argument templating with TaskInstance.xcom_pull() calls inside the template.
Fighting with Jinja templating can be cumbersome, especially when accessing XCom output from multiple upstream tasks. For instance, to run a Kubernetes job whose dynamic arguments (the arguments parameter) depend on several upstream tasks, one needs to build a Jinja template that renders the whole argument list, and remember to enable the render_template_as_native_obj flag on the DAG it runs in.
Given that Airflow introduced the TaskFlow concept and has been promoting decorated tasks since the 2.0 release, adopting the new paradigm and adding a decorated form of KubernetesJobOperator (e.g. @task.kubernetes_job) would be helpful: use cases like the one described above could be handled more smoothly by passing upstream task outputs to the decorated task function directly.
Proposed Solution
For an operator that only runs Kubernetes jobs, it may be difficult to mirror the implementation of @task.kubernetes (a wrapper of Airflow's KubernetesPodOperator) and move the task's program logic into the decorated function. However, I think it's feasible to make the decorated function return arguments for the KubernetesJobOperator instead. The dynamically generated arguments can then be merged with the KubernetesJobOperator's defaults and with the arguments passed to the decorator header.
For example, suppose we define a decorated task function like this:
@task.kubernetes_job(task_id="my_task_id", image="myimage")
def my_job(options: list[str] | None = None):
    positional_args = ["a", "b", "c"]
    return {"arguments": (options + positional_args) if options else positional_args}
When the list ["-n", "-f", "source_data.csv"] gets passed to my_job to create a task instance within a DAG:
my_task = my_job(options=["-n", "-f", "source_data.csv"])
then the created task is equivalent to:
KubernetesJobOperator(
    task_id="my_task_id",
    image="myimage",
    arguments=["-n", "-f", "source_data.csv", "a", "b", "c"],
)
When a task output gets passed to the options argument, the value of arguments will depend on the value of that output.
# Task created from an operator class
upstream_task = PythonOperator(python_callable=upstream_task_f, task_id="upstream")
downstream_task = my_job(options=upstream_task.output)

# Task created from a decorated task (TaskFlow)
upstream_task = task(upstream_task_f, task_id="upstream")()
downstream_task = my_job(options=upstream_task)
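To make the intended merge behavior concrete, below is a minimal sketch of how the decorator could combine the header kwargs with the dict returned by the decorated function. Everything here (the kubernetes_job_task name, the merge order) is a hypothetical illustration rather than the operator's actual API; in a real implementation the function would be invoked at execute time, after upstream XComs have been resolved.

# Hypothetical sketch of the proposed merge semantics (not actual package code).
# Header kwargs act as static defaults; the dict returned by the decorated
# function supplies the dynamic fields and wins on conflicts.
from typing import Any, Callable


def kubernetes_job_task(**header_kwargs: Any):
    def decorator(func: Callable[..., dict[str, Any]]):
        def build_operator_kwargs(*args: Any, **kwargs: Any) -> dict[str, Any]:
            # In the real operator this call would happen inside execute(),
            # after Airflow resolves any XComArg inputs in args/kwargs.
            dynamic_kwargs = func(*args, **kwargs)
            return {**header_kwargs, **dynamic_kwargs}

        return build_operator_kwargs

    return decorator

Applied to the my_job example above, build_operator_kwargs(options=["-n", "-f", "source_data.csv"]) yields {"task_id": "my_task_id", "image": "myimage", "arguments": [...]}, i.e. exactly the KubernetesJobOperator call shown earlier.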
Example DAG implementation before introducing the feature
Say we have a container image containing a program that transfers data from several sources to a destination at once, where the image's entry point accepts several optional arguments to customize the program:
- --sources option to only pull data from the specified entries.
- --enable-high-loading-mode and --disable-high-loading-mode flags to decide whether the program should run in a special mode to endure high-volume loading.
- --bucket-name option to specify the location for intermediate data dumps.
Now we want to design a DAG pipeline that creates a Kubernetes job and runs the image when triggered, and we decide to collect the argument information through three separate upstream tasks:
- A task that checks which sources have incoming data, for setting up the --sources option.
- A task that estimates the incoming data volume to decide whether --enable-high-loading-mode should be turned on.
- A task that creates a temporary cloud storage bucket and returns a URI accepted by --bucket-name.
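Putting these together, the transfer task ultimately needs to receive an argument list like the one below (the source names and bucket URI are made-up placeholders for illustration):

# Example of the fully assembled entry-point arguments
# (values are illustrative placeholders):
arguments = [
    "--sources", "source_a", "source_b",
    "--enable-high-loading-mode",
    "--bucket-name", "s3://example-intermediate-dumps",
]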
To pass information from these upstream tasks into the final KubernetesJobOperator task directly, one has to create a Jinja template that generates the arguments parameter, pulling the outputs from upstream tasks with TaskInstance.xcom_pull() calls inside the template:
from airflow.decorators import dag, task
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
import pendulum


@dag(
    dag_id="transfer_pipeline",
    schedule=None,
    start_date=pendulum.datetime(2000, 1, 1, tz="UTC"),
    catchup=False,
    render_template_as_native_obj=True,
)
def example_dag():
    # Upstream task that evaluates sources with new incoming data
    @task(task_id="get_sources")
    def get_sources_f() -> list[str]:
        from my_package.tasks.example_dag import GetSources

        incoming_data_sources = GetSources().run()
        return incoming_data_sources

    # Upstream task that evaluates the amount of data to be pulled
    @task(task_id="get_incoming_data_amount")
    def get_incoming_data_amount_f() -> int:
        from my_package.tasks.example_dag import GetIncomingDataSummary

        data_amount = GetIncomingDataSummary().run()
        return data_amount

    # Decides whether the job should run in high-loading mode
    @task(task_id="evaluate_high_loading")
    def evaluate_high_loading_f(amount: int) -> bool:
        threshold = 1_000_000
        return amount > threshold

    # Creates a temporary storage (e.g. S3 bucket) to store intermediate data dumps
    @task(task_id="get_bucket_name")
    def get_bucket_name_f() -> str:
        from my_package.tasks.example_dag import GetBucketName

        bucket_name = GetBucketName().run()
        return bucket_name

    incoming_data_sources = get_sources_f()
    incoming_data_amount = get_incoming_data_amount_f()
    high_loading = evaluate_high_loading_f(incoming_data_amount)
    bucket_name = get_bucket_name_f()

    # The whole argument list is rendered as a single string; with
    # render_template_as_native_obj=True Airflow parses it back into a Python list
    transfer_job = KubernetesJobOperator(
        task_id="transfer",
        image="myimage",
        jinja_job_arg=True,
        arguments=(
            "["
            "\"--sources\", {{ ti.xcom_pull('get_sources') | map('tojson') | join(', ') }}, "
            "\"{{ '--enable-high-loading-mode' if ti.xcom_pull('evaluate_high_loading') else '--disable-high-loading-mode' }}\", "
            "\"--bucket-name\", \"{{ ti.xcom_pull('get_bucket_name') }}\""
            "]"
        ),
    )

    # xcom_pull() inside a template does not create task dependencies,
    # so they have to be declared explicitly
    [incoming_data_sources, high_loading, bucket_name] >> transfer_job


dag = example_dag()
As an alternative, one can insert an additional task in between that organizes the upstream outputs into a single argument list, reducing the templating to a single xcom_pull() call:
(2024/06/14 update) Updated the operator usage, because the operator does not support TaskFlow yet.
from airflow.decorators import dag, task
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
import pendulum


@dag(
    dag_id="transfer_pipeline",
    schedule=None,
    start_date=pendulum.datetime(2000, 1, 1, tz="UTC"),
    catchup=False,
    render_template_as_native_obj=True,
)
def example_dag():
    # Upstream task that evaluates sources with new incoming data
    @task(task_id="get_sources")
    def get_sources_f() -> list[str]:
        from my_package.tasks.example_dag import GetSources

        incoming_data_sources = GetSources().run()
        return incoming_data_sources

    # Upstream task that evaluates the amount of data to be pulled
    @task(task_id="get_incoming_data_amount")
    def get_incoming_data_amount_f() -> int:
        from my_package.tasks.example_dag import GetIncomingDataSummary

        data_amount = GetIncomingDataSummary().run()
        return data_amount

    # Decides whether the job should run in high-loading mode
    @task(task_id="evaluate_high_loading")
    def evaluate_high_loading_f(amount: int) -> bool:
        threshold = 1_000_000
        return amount > threshold

    # Creates a temporary storage (e.g. S3 bucket) to store intermediate data dumps
    @task(task_id="get_bucket_name")
    def get_bucket_name_f() -> str:
        from my_package.tasks.example_dag import GetBucketName

        bucket_name = GetBucketName().run()
        return bucket_name

    # Organizes upstream information and generates the final argument list
    @task(task_id="organize_arguments")
    def organize_arguments_f(data_sources: list[str], high_loading: bool, bucket_name: str) -> list[str]:
        return [
            "--sources",
            *data_sources,
            (
                "--enable-high-loading-mode"
                if high_loading
                else "--disable-high-loading-mode"
            ),
            "--bucket-name",
            bucket_name,
        ]

    incoming_data_sources = get_sources_f()
    incoming_data_amount = get_incoming_data_amount_f()
    high_loading = evaluate_high_loading_f(incoming_data_amount)
    bucket_name = get_bucket_name_f()
    organized_arguments = organize_arguments_f(
        data_sources=incoming_data_sources,
        high_loading=high_loading,
        bucket_name=bucket_name,
    )

    transfer_job = KubernetesJobOperator(
        task_id="transfer",
        image="myimage",
        jinja_job_arg=True,
        arguments="{{ ti.xcom_pull('organize_arguments') }}",
        # If TaskFlow were supported we could pass the task output instead:
        # arguments=organized_arguments
    )

    # The template alone does not create a dependency, so declare it explicitly
    organized_arguments >> transfer_job


dag = example_dag()
Example DAG implementation after introducing the feature
The sample code below replicates the example above with the proposed approach. The task ID and image name are assumed to be "fixed" and passed to the decorator header, while the arguments field is dynamically generated within the decorated function:
from airflow.decorators import dag, task
import pendulum


@dag(
    dag_id="transfer_pipeline",
    schedule=None,
    start_date=pendulum.datetime(2000, 1, 1, tz="UTC"),
    catchup=False,
)
def example_dag():
    # Upstream task that evaluates sources with new incoming data
    @task(task_id="get_sources")
    def get_sources_f() -> list[str]:
        from my_package.tasks.example_dag import GetSources

        incoming_data_sources = GetSources().run()
        return incoming_data_sources

    # Upstream task that evaluates the amount of data to be pulled
    @task(task_id="get_incoming_data_amount")
    def get_incoming_data_amount_f() -> int:
        from my_package.tasks.example_dag import GetIncomingDataSummary

        data_amount = GetIncomingDataSummary().run()
        return data_amount

    # Decides whether the job should run in high-loading mode
    @task(task_id="evaluate_high_loading")
    def evaluate_high_loading_f(amount: int) -> bool:
        threshold = 1_000_000
        return amount > threshold

    # Creates a temporary storage (e.g. S3 bucket) to store intermediate data dumps
    @task(task_id="get_bucket_name")
    def get_bucket_name_f() -> str:
        from my_package.tasks.example_dag import GetBucketName

        bucket_name = GetBucketName().run()
        return bucket_name

    # Proposed decorated form: the function returns operator arguments,
    # computed directly from the already-resolved upstream outputs
    @task.kubernetes_job(task_id="transfer", image="myimage")
    def transfer_f(data_sources: list[str], high_loading: bool, bucket_name: str):
        return {
            "arguments": [
                "--sources",
                *data_sources,
                (
                    "--enable-high-loading-mode"
                    if high_loading
                    else "--disable-high-loading-mode"
                ),
                "--bucket-name",
                bucket_name,
            ]
        }

    incoming_data_sources = get_sources_f()
    incoming_data_amount = get_incoming_data_amount_f()
    high_loading = evaluate_high_loading_f(incoming_data_amount)
    bucket_name = get_bucket_name_f()
    transfer_job = transfer_f(
        data_sources=incoming_data_sources,
        high_loading=high_loading,
        bucket_name=bucket_name,
    )


dag = example_dag()