且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

使用 jinja 模板中的气流连接

更新时间:2023-01-05 08:32:05

For Airflow >= 2.2.0:

假设你有 conn id test_conn 你可以通过以下方式直接使用宏:

Assuming you have conn id test_conn you can use macros directly via:

{{ conn.test_conn }} 所以你得到任何连接属性,如:

{{ conn.test_conn }} so you get any connection attribute like:

{{ conn.test_conn.host }}, {{ conn.test_conn.login }}, {{ conn.test_conn.password }} 等等.

对于气流:

没有现成的宏,但您可以创建自定义宏来解决这个问题.

There is no ready to use macro however you can create custom macros to address this.

连接示例:

创建宏:

def get_host(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.host

def get_schema(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.schema

def get_login(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.login

在 DAG 中使用它们:

Using them in a DAG:

def print_function(**context):
    print(f"host={context['host']} schema={context['schema']} login={context['login']}")

user_macros = {
    'get_host': get_host,
    'get_schema': get_schema,
    'get_login': get_login,
}

with DAG(
    dag_id='connection',
    default_args=default_args,
    schedule_interval=None,
    user_defined_macros=user_macros,
) as dag:

# Example how to use as function
python_op = PythonOperator( 
    task_id='python_task',
    provide_context=True,
    python_callable=print_function,
    op_kwargs={
        'host': get_host("test_conn"),
        'schema': get_schema("test_conn"),
        'login': get_login("test_conn"),
    }
)

# Example how to use as Jinja string
bash_op = BashOperator( 
    task_id='bash_task',
    bash_command='echo {{ get_host("test_conn") }} {{ get_schema("test_conn") }} {{ get_login("test_conn") }} ',
)

PythonOperator 的渲染示例:

BashOperator 的渲染示例:

一般说明:这段代码的作用是创建一个自定义函数 func() 用作 user_defined_macros 从而提供使用它的能力,就像 Airflow 本身定义的这个宏一样.您可以通过以下方式访问模板: {{ func() }} 如示例中所示,该函数允许接受参数.

General Explnation: What this code does is creating a custom function func() to be used as user_defined_macros thus providing the ability to use it just like this macro was defined by Airflow itself. You can access the templating as: {{ func() }} as seen in the example the function allow accept parameters.

注意您可以为连接对象中的所有字段创建这样的函数.

Note you can create such functions for all fields in the connection object.

谨慎使用它,以文本形式传递密码可能不是一个好主意.

be cautious with how you use it, passing passwords as text may not be a good idea.