
How to use Azure Data Factory to copy CosmosDb documents to Blob storage (one document per JSON file)

Updated: 2023-02-14 09:15:02

Since your Cosmos DB documents contain arrays, and ADF doesn't support serializing arrays for Cosmos DB, this is the workaround I can offer.

First, export all your documents to JSON files using "Export JSON as-is" (to Blob storage, ADLS, a file system, or any other file store). I assume you already know how to do this. This way, each collection ends up in one JSON file.

Second, process each JSON file to extract every row (document) in it into a separate file.
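For a small export, what step 2 does can be approximated locally. The Python sketch below makes two assumptions. First, as the Lookup dataset's arrayOfObjects setting suggests, the exported file is a single JSON array. Second, every document has id and PartitionKey fields. split_export is a hypothetical helper, not part of ADF.

```python
import json
from pathlib import Path


def split_export(export_file: Path, out_dir: Path) -> list[Path]:
    """Write each document of an exported JSON array to its own file.

    Assumption: the export is one JSON array of objects, and each object
    has "id" and "PartitionKey" fields.
    """
    documents = json.loads(export_file.read_text(encoding="utf-8"))
    out_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for doc in documents:
        # Same naming scheme as the destination dataset: <PartitionKey>-<id>.json
        target = out_dir / f"{doc['PartitionKey']}-{doc['id']}.json"
        target.write_text(json.dumps(doc), encoding="utf-8")
        written.append(target)
    return written
```

Arrays inside a document are written unchanged, because each document is re-serialized as a whole.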

I only provide the pipeline for step 2. You could use an Execute Pipeline activity to chain steps 1 and 2, and you could even process all the collections in step 2 with a ForEach activity.

Pipeline JSON:

{
    "name": "pipeline27",
    "properties": {
        "activities": [
            {
                "name": "Lookup1",
                "type": "Lookup",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "BlobSource",
                        "recursive": true
                    },
                    "dataset": {
                        "referenceName": "AzureBlob7",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Lookup1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Lookup1').output.value",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "Copy1",
                            "type": "Copy",
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false
                            },
                            "typeProperties": {
                                "source": {
                                    "type": "DocumentDbCollectionSource",
                                    "query": {
                                        "value": "select @{item()}",
                                        "type": "Expression"
                                    },
                                    "nestingSeparator": "."
                                },
                                "sink": {
                                    "type": "BlobSink"
                                },
                                "enableStaging": false,
                                "cloudDataMovementUnits": 0
                            },
                            "inputs": [
                                {
                                    "referenceName": "DocumentDbCollection1",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "AzureBlob6",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "id": {
                                            "value": "@item().id",
                                            "type": "Expression"
                                        },
                                        "PartitionKey": {
                                            "value": "@item().PartitionKey",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ]
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}
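To see what each ForEach iteration works with, here is a rough Python simulation of the expressions in the pipeline above. It rests on two assumptions. First, with firstRowOnly set to false, the Lookup output has the shape {"count": N, "value": [...]}. Second, interpolating an object with @{item()} yields its JSON text; the exact whitespace ADF produces may differ. resolve_iterations is a hypothetical name.

```python
import json


def resolve_iterations(lookup_output: dict) -> list[dict]:
    """Approximate the values resolved in each ForEach iteration.

    lookup_output mimics a Lookup activity output with firstRowOnly=false.
    The compact JSON serialization of item() is an assumption.
    """
    iterations = []
    # items: @activity('Lookup1').output.value
    for item in lookup_output["value"]:
        iterations.append({
            # query: "select @{item()}" -> the row itself, as a JSON literal
            "query": "select " + json.dumps(item, separators=(",", ":")),
            # dataset parameters passed to AzureBlob6
            "id": item["id"],                      # @item().id
            "PartitionKey": item["PartitionKey"],  # @item().PartitionKey
        })
    return iterations
```

In this simulation, the query only echoes the row that Lookup already read, so the Copy activity never actually reads from the collection. That matches the note below that the source dataset serves only to host the query.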

Dataset JSON for the Lookup activity:

{
    "name": "AzureBlob7",
    "properties": {
        "linkedServiceName": {
            "referenceName": "bloblinkedservice",
            "type": "LinkedServiceReference"
        },
        "type": "AzureBlob",
        "typeProperties": {
            "format": {
                "type": "JsonFormat",
                "filePattern": "arrayOfObjects"
            },
            "fileName": "cosmos.json",
            "folderPath": "aaa"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}
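The Lookup dataset and the destination dataset use different filePattern values. The Lookup reads one file holding a JSON array (arrayOfObjects). Each output file holds a single document (setOfObjects, which also allows several objects that are line-delimited or simply concatenated). The Python sketch below illustrates how the two layouts differ when parsed. The reader functions are illustrative only, not ADF code.

```python
import json


def read_array_of_objects(text: str) -> list[dict]:
    """Layout of filePattern "arrayOfObjects": one JSON array per file."""
    data = json.loads(text)
    if not isinstance(data, list):
        raise ValueError("expected a JSON array of objects")
    return data


def read_set_of_objects(text: str) -> list[dict]:
    """Layout of filePattern "setOfObjects": a single object, or several
    objects that are line-delimited or concatenated."""
    decoder = json.JSONDecoder()
    objects, pos = [], 0
    while True:
        # raw_decode does not skip leading whitespace, so skip it here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos == len(text):
            return objects
        obj, pos = decoder.raw_decode(text, pos)
        objects.append(obj)
```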

Source dataset for the copy. This dataset isn't actually used for anything; it only hosts the query (select @{item()}).

{
    "name": "DocumentDbCollection1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "CosmosDB-r8c",
            "type": "LinkedServiceReference"
        },
        "type": "DocumentDbCollection",
        "typeProperties": {
            "collectionName": "test"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

Destination dataset. Its two parameters also take care of your file name requirement.

{
    "name": "AzureBlob6",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureStorage-eastus",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "id": {
                "type": "String"
            },
            "PartitionKey": {
                "type": "String"
            }
        },
        "type": "AzureBlob",
        "typeProperties": {
            "format": {
                "type": "JsonFormat",
                "filePattern": "setOfObjects"
            },
            "fileName": {
                "value": "@{dataset().PartitionKey}-@{dataset().id}.json",
                "type": "Expression"
            },
            "folderPath": "aaacosmos"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}
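The fileName expression joins the two dataset parameters with a hyphen. Below is a hedged Python mirror of it, plus a hypothetical collision check. Because "-" may also appear inside the values, two different (PartitionKey, id) pairs can render to the same file name, and both copies would then target the same blob. Both function names are made up for illustration.

```python
from collections import Counter


def render_file_name(partition_key: str, doc_id: str) -> str:
    """Mirror of "@{dataset().PartitionKey}-@{dataset().id}.json"."""
    return f"{partition_key}-{doc_id}.json"


def find_name_collisions(items: list[dict]) -> list[str]:
    """Return file names that more than one document would be written to."""
    counts = Counter(
        render_file_name(item["PartitionKey"], item["id"]) for item in items
    )
    return sorted(name for name, n in counts.items() if n > 1)
```

Running the check over the Lookup rows before the copy is cheap. If it reports anything, a separator that cannot occur in either value would avoid the clash.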

Please also note the limitations of the Lookup activity: only certain data sources are supported for lookup, the maximum number of rows it can return is 5000, up to 2 MB in size, and at the time of writing the maximum duration of a Lookup activity before it times out is one hour.
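Given these limits, it can help to check an export before relying on a single Lookup. Alternatively, split the export into several files and run the step 2 pipeline once per file, for example from a ForEach. The Python sketch below makes three assumptions. The byte count is only an estimate based on the serialized JSON. "2 MB" is taken to mean 2 MiB. The limits are the ones quoted above and may have changed since. The function names are hypothetical.

```python
import json

# Limits as quoted above (assumption: 2 MB means 2 MiB); check current docs.
MAX_LOOKUP_ROWS = 5000
MAX_LOOKUP_BYTES = 2 * 1024 * 1024


def serialized_size(obj) -> int:
    """Estimate payload size as the UTF-8 length of the JSON text."""
    return len(json.dumps(obj).encode("utf-8"))


def fits_single_lookup(documents: list[dict]) -> bool:
    """Rough check of whether one Lookup could return every document."""
    return (len(documents) <= MAX_LOOKUP_ROWS
            and serialized_size(documents) <= MAX_LOOKUP_BYTES)


def chunk_for_lookup(documents: list[dict],
                     max_rows: int = MAX_LOOKUP_ROWS,
                     max_bytes: int = MAX_LOOKUP_BYTES) -> list[list[dict]]:
    """Greedily split documents into batches under both limits.

    A single document larger than max_bytes still ends up alone in a batch,
    and that batch would still exceed the size limit.
    """
    batches: list[list[dict]] = []
    current: list[dict] = []
    current_bytes = 2  # the enclosing "[]"
    for doc in documents:
        doc_bytes = serialized_size(doc) + 2  # plus a ", " separator
        if current and (len(current) >= max_rows
                        or current_bytes + doc_bytes > max_bytes):
            batches.append(current)
            current, current_bytes = [], 2
        current.append(doc)
        current_bytes += doc_bytes
    if current:
        batches.append(current)
    return batches
```

Each batch could then be written as its own arrayOfObjects file in the Lookup dataset's folder.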