且构网


apache_beam.transforms.util.Reshuffle() not available for GCP Dataflow

Updated: 2023-11-21 23:11:46



I have upgraded to the latest apache_beam[gcp] package via pip install --upgrade apache_beam[gcp]. However, I noticed that Reshuffle() does not appear in the [gcp] distribution. Does this mean that I will not be able to use Reshuffle() in any Dataflow pipelines? Is there any way around this? Or is it possible that the pip package is just not up to date, and that if Reshuffle() is in master on GitHub, it will become available on Dataflow?

Based on the response to this question, I am trying to read data from BigQuery and then randomize it before writing it to CSV files in a GCP storage bucket. I have noticed that the sharded .csv files I use to train my GCMLE model are not truly random. Within TensorFlow I can randomize the batches, but that only randomizes the rows within each file as it is built up in the queue; my issue is that the files currently being generated are biased in some way. Any suggestions for other ways to shuffle the data in Dataflow right before writing to CSV would be much appreciated.
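To make the concern above concrete, here is a small plain-Python illustration with no Beam or TensorFlow involved. It assumes, hypothetically, that the input arrives ordered by label and that the writer splits it into contiguous shards; `contiguous_shards`, `label_counts` and the toy `rows` data are made up for the example. Under those assumptions, shuffling inside each shard leaves every shard's label mix unchanged, whereas shuffling everything before sharding mixes the labels across files.

```python
import random
from collections import Counter


def contiguous_shards(rows, num_shards):
    """Hypothetical writer: split rows into num_shards contiguous chunks."""
    size = -(-len(rows) // num_shards)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def label_counts(shard):
    """Count how many rows of each label a shard holds."""
    return Counter(label for label, _ in shard)


rng = random.Random(0)
# Toy input whose order is correlated with the label (e.g. a sorted query result).
rows = [(0, i) for i in range(50)] + [(1, i) for i in range(50)]

# Shuffle only inside each shard, like per-file shuffling in the input queue.
within = contiguous_shards(rows, 2)
for shard in within:
    rng.shuffle(shard)

# Shuffle the whole dataset first, then shard it.
shuffled = rows[:]
rng.shuffle(shuffled)
across = contiguous_shards(shuffled, 2)

print([label_counts(s) for s in within])  # each shard still holds a single label
print([label_counts(s) for s in across])  # both labels appear in each shard
```

The takeaway is only that the shuffle has to happen before the data is split into files; how the real CSV shards end up biased depends on the pipeline.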

One approach is to recreate the shuffle myself:

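import random

from apache_beam import FlatMap, GroupByKey, Map

# Pair each element with a random 32-bit key, group on that key to force a
# shuffle, then drop the keys and flatten the groups back into single elements.
shuffled_data = (unshuffled_pcoll
        | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
        | 'GroupByKey' >> GroupByKey()
        | 'RemoveRandomKeys' >> FlatMap(lambda t: t[1]))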
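A rough way to see why the random-key trick above shuffles the data is to simulate its three steps in plain Python. This is only a model: `simulate_random_key_shuffle` is a hypothetical helper, and it assumes the grouping step hands groups back ordered by key. Beam itself makes no ordering guarantees; the relevant point for Dataflow is that where an element lands after GroupByKey depends on its random key rather than on its original position.

```python
import random
from collections import defaultdict


def simulate_random_key_shuffle(elements, rng):
    """Plain-Python stand-in for AddRandomKeys -> GroupByKey -> RemoveRandomKeys."""
    # AddRandomKeys: pair every element with a random 32-bit key.
    keyed = [(rng.getrandbits(32), e) for e in elements]
    # GroupByKey: collect the elements that share a key.
    groups = defaultdict(list)
    for key, value in keyed:
        groups[key].append(value)
    # Assumption for this model: groups come back ordered by key, not by
    # input position, which is what breaks the original ordering.
    grouped = sorted(groups.items())
    # RemoveRandomKeys: like FlatMap(lambda t: t[1]), emit each group's values.
    return [value for _, values in grouped for value in values]


rng = random.Random(42)
data = list(range(20))
out = simulate_random_key_shuffle(data, rng)
print(out)  # a permutation of 0..19
```

Because the keys come from `getrandbits(32)`, collisions are rare, so most groups hold a single element and the final flatten step simply unwraps them.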

My remaining question is whether I need to worry about the windowing or ExpandIterable sections of the Reshuffle() source code.