即使DAG未运行,气流变量也会更新

Airflow variables getting updated even if the DAG is not running(即使DAG未运行,气流变量也会更新)

本文介绍了即使DAG未运行,气流变量也会更新的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我从气流变量中读取一个整数变量,然后在每次DAG运行时将该值加1,并再次将其设置为该变量。

但在下面的代码之后,每次刷新页面时,UI处的变量都会更改。 了解导致此类行为的原因

counter = Variable.get('counter')
s = BashOperator(
    task_id='echo_start_variable',
    bash_command='echo ' + counter,
    dag=dag,
)
Variable.set("counter", int(counter) + 1)

sql_query = "SELECT * FROM UNNEST(SEQUENCE({start}, {end}))"
sql_query = sql_query.replace('{start}', start).replace('{end}', end)
submit_query = PythonOperator(
    task_id='submit_athena_query',
    python_callable=run_athena_query,
    op_kwargs={'query': sql_query, 'db': 'db',
               's3_output': 's3://s3-path/rohan/date=' + current_date + '/'},
    dag=dag)

e = BashOperator(
    task_id='echo_end_variable',
    bash_command='echo ' + counter,
    dag=dag,
)

s >> submit_query >> e
Airflow每30秒处理一次推荐答案文件(默认设置为)这意味着您所有顶级代码都是每30秒运行一次,因此 将导致变量计数器每30秒递增1。

在顶级代码中与变量交互是一种糟糕的做法(不管值的增加问题如何)。它每隔30秒打开一个到Metore数据库的连接,这可能会导致严重问题并使数据库不堪重负。

要获取变量的值,可以使用JJJA:

e = BashOperator(
    task_id='echo_end_variable',
    bash_command='echo {{ var.value.counter }}',
    dag=dag,
)

这是使用变量的一种安全方式,因为只有在执行运算符时才会检索值。

如果要将变量的值增加1,则使用PythonOpeartor

def increase():
    counter = Variable.get('counter')
    Variable.set("counter", int(counter) + 1)

increase_op = PythonOperator(
    task_id='increase_task',
    python_callable=increase,
    dag=dag)

只有在运算符运行时,才会执行可调用的python。

这篇关于即使DAG未运行,气流变量也会更新的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:即使DAG未运行,气流变量也会更新

基础教程推荐