且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

使用 Apache Beam 将 BigQuery 结果以 CSV 格式写入 GCS

更新时间:2023-11-21 23:07:16

您可以使用 WriteToText 添加 .csv 后缀和 headers.考虑到您需要将查询结果解析为 CSV 格式.例如,我使用了莎士比亚公共数据集 和以下查询:

You can do so using WriteToText to add a .csv suffix and headers. Take into account that you'll need to parse the query results to CSV format. As an example, I used the Shakespeare public dataset and the following query:

SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(word) > 3 ORDER BY word_count DESC LIMIT 10

SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(word) > 3 ORDER BY word_count DESC LIMIT 10

我们现在读取查询结果:

We now read the query results with:

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))

BQ_DATA 现在包含键值对:

{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}

我们可以应用 beam.Map 函数来只产生值:

We can apply a beam.Map function to yield only values:

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

摘录BQ_VALUES:

[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]

最后再次映射以用逗号而不是列表分隔所有列值(考虑到如果双引号可以出现在字段中,则需要转义双引号):

And finally map again to have all column values separated by commas instead of a list (take into account that you would need to escape double quotes if they can appear within a field):

BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
    lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))

现在我们使用后缀和标题将结果写入 GCS:

Now we write the results to GCS with the suffix and headers:

BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
    'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv', header='word, word count, corpus')

书面结果:

$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
word, word count, corpus
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"