且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

如何在Azure数据工厂中使用此Rest API

更新时间:2022-03-17 00:37:20

以下说明将逐步创建一个类似于以下内容的管道.注意,它使用存储过程活动,Web活动和每个活动.

The following explanation will walk through creating a pipeline that looks like the following. Notice it uses Stored Procedure activities, Web Activities, and For Each activities.

首先提供Azure SQL DB,设置AAD管理员,然后按照

First provision an Azure SQL DB, setup the AAD administrator, then grant the ADF MSI permissions in the database as described here. Then create the following table and two stored procedures:

CREATE TABLE [dbo].[People](
    [id] [int] NULL,
    [email] [varchar](255) NULL,
    [first_name] [varchar](100) NULL,
    [last_name] [varchar](100) NULL,
    [avatar] [nvarchar](1000) NULL
)

GO
/*
sample call:
exec uspInsertPeople @json = '{"page":1,"per_page":3,"total":12,"total_pages":4,"data":[{"id":1,"email":"george.bluth@reqres.in","first_name":"George","last_name":"Bluth","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/calebogden/128.jpg"},{"id":2,"email":"janet.weaver@reqres.in","first_name":"Janet","last_name":"Weaver","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/josephstein/128.jpg"},{"id":3,"email":"emma.wong@reqres.in","first_name":"Emma","last_name":"Wong","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/olegpogodaev/128.jpg"}]}'
*/
create proc uspInsertPeople @json nvarchar(max)
as
begin
insert into People (id, email, first_name, last_name, avatar)
select d.*
from OPENJSON(@json)
WITH (
        [data] nvarchar(max) '$.data' as JSON
)
CROSS APPLY OPENJSON([data], '$')
    WITH (
        id int '$.id',
        email varchar(255) '$.email',
        first_name varchar(100) '$.first_name',
        last_name varchar(100) '$.last_name',
        avatar nvarchar(1000) '$.avatar'
    ) d;
end

GO

create proc uspTruncatePeople
as
truncate table People


接下来,在Azure Data Factory v2中创建一个新管道,将其重命名为ForEachPage,然后转到代码"视图并粘贴以下JSON:

Next, in Azure Data Factory v2 create a new pipeline, rename it to ForEachPage then go to the Code view and paste in the following JSON:

{
    "name": "ForEachPage",
    "properties": {
        "activities": [
            {
                "name": "GetTotalPages",
                "type": "WebActivity",
                "dependsOn": [
                    {
                        "activity": "Truncate SQL Table",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "url": {
                        "value": "https://reqres.in/api/users?page=1",
                        "type": "Expression"
                    },
                    "method": "GET"
                }
            },
            {
                "name": "ForEachPage",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "GetTotalPages",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@range(1,activity('GetTotalPages').output.total_pages)",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "GetPage",
                            "type": "WebActivity",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "url": {
                                    "value": "@concat('https://reqres.in/api/users?page=',item())",
                                    "type": "Expression"
                                },
                                "method": "GET"
                            }
                        },
                        {
                            "name": "uspInsertPeople stored procedure",
                            "type": "SqlServerStoredProcedure",
                            "dependsOn": [
                                {
                                    "activity": "GetPage",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "storedProcedureName": "[dbo].[uspInsertPeople]",
                                "storedProcedureParameters": {
                                    "json": {
                                        "value": {
                                            "value": "@string(activity('GetPage').output)",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    }
                                }
                            },
                            "linkedServiceName": {
                                "referenceName": "lsAzureDB",
                                "type": "LinkedServiceReference"
                            }
                        }
                    ]
                }
            },
            {
                "name": "Truncate SQL Table",
                "type": "SqlServerStoredProcedure",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "storedProcedureName": "[dbo].[uspTruncatePeople]"
                },
                "linkedServiceName": {
                    "referenceName": "lsAzureDB",
                    "type": "LinkedServiceReference"
                }
            }
        ],
        "annotations": []
    }
}

创建与Azure SQL DB的lsAzureDB链接服务,将其设置为使用MSI进行身份验证.

Create a lsAzureDB linked service to Azure SQL DB setting it to use the MSI for authentication.

此管道调用示例分页API (目前可以,但是不是我管理的API,因此可能会在某个时候停止工作),以演示如何循环以及如何获取Web活动的结果,并通过存储过程调用和存储过程中的JSON解析将它们插入到SQL表中.该循环将以并行方式运行,但是当然您可以更改ForEachPage活动的设置以使其以串行方式运行.

This pipeline calls a sample paged API (which works at the moment but it not an API I manage so may stop working at some point) to demonstrate how to loop and how to take the results of the Web Activities and insert them to a SQL table via a stored procedure call and JSON parsing in the stored procedure. The loop will run with parallelism but certainly you could change settings on the ForEachPage activity to make it run in serial.