Airflow variables getting updated even if the DAG is not running(即使DAG未运行,气流变量也会更新)
本文介绍了即使DAG未运行,气流变量也会更新的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我从气流变量中读取一个整数变量,然后在每次DAG运行时将该值加1,并再次将其设置为该变量。
但在下面的代码之后,每次刷新页面时,UI处的变量都会更改。 了解导致此类行为的原因
counter = Variable.get('counter')
s = BashOperator(
task_id='echo_start_variable',
bash_command='echo ' + counter,
dag=dag,
)
Variable.set("counter", int(counter) + 1)
sql_query = "SELECT * FROM UNNEST(SEQUENCE({start}, {end}))"
sql_query = sql_query.replace('{start}', start).replace('{end}', end)
submit_query = PythonOperator(
task_id='submit_athena_query',
python_callable=run_athena_query,
op_kwargs={'query': sql_query, 'db': 'db',
's3_output': 's3://s3-path/rohan/date=' + current_date + '/'},
dag=dag)
e = BashOperator(
task_id='echo_end_variable',
bash_command='echo ' + counter,
dag=dag,
)
s >> submit_query >> e
Airflow每30秒处理一次推荐答案文件(默认设置为)这意味着您所有顶级代码都是每30秒运行一次,因此
将导致变量计数器每30秒递增1。
在顶级代码中与变量交互是一种糟糕的做法(不管值的增加问题如何)。它每隔30秒打开一个到Metore数据库的连接,这可能会导致严重问题并使数据库不堪重负。
要获取变量的值,可以使用JJJA:
e = BashOperator(
task_id='echo_end_variable',
bash_command='echo {{ var.value.counter }}',
dag=dag,
)
这是使用变量的一种安全方式,因为只有在执行运算符时才会检索值。
如果要将变量的值增加1,则使用PythonOpeartor
:
def increase():
counter = Variable.get('counter')
Variable.set("counter", int(counter) + 1)
increase_op = PythonOperator(
task_id='increase_task',
python_callable=increase,
dag=dag)
只有在运算符运行时,才会执行可调用的python。
这篇关于即使DAG未运行,气流变量也会更新的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
沃梦达教程
本文标题为:即使DAG未运行,气流变量也会更新
基础教程推荐
猜你喜欢
- 哪些 Python 包提供独立的事件系统? 2022-01-01
- 将 YAML 文件转换为 python dict 2022-01-01
- 如何在 Python 中检测文件是否为二进制(非文本)文 2022-01-01
- Python 的 List 是如何实现的? 2022-01-01
- 如何在Python中绘制多元函数? 2022-01-01
- 使用 Google App Engine (Python) 将文件上传到 Google Cloud Storage 2022-01-01
- 合并具有多索引的两个数据帧 2022-01-01
- 使用Python匹配Stata加权xtil命令的确定方法? 2022-01-01
- 症状类型错误:无法确定关系的真值 2022-01-01
- 使 Python 脚本在 Windows 上运行而不指定“.py";延期 2022-01-01