Streaming data to S3 with iteratees and enumerators in Play Scala

There might be multiple problems with your code. It is a bit hard to read because of the map method calls. You might have a problem with your future composition. Another problem might be caused by the fact that all chunks (except for the last) should be at least 5MB.

The code below has not been tested, but shows a different approach. The iteratee approach is one where you can create small building blocks and compose them into a pipe of operations.

To make the code compile I added a trait and a few methods:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import play.api.libs.iteratee._

trait BucketFilePartUploadTicket
val uploadPart: (Int, Array[Byte]) => Future[BucketFilePartUploadTicket] = ???
val completeUpload: Seq[BucketFilePartUploadTicket] => Future[Unit] = ???
val body: Enumerator[Array[Byte]] = ???

Here we create a few parts

// Create 5MB chunks
val chunked = {
  val take5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5)
  Enumeratee.grouped(take5MB transform Iteratee.consume())
}

// Add a counter, used as part number later on
val zipWithIndex = Enumeratee.scanLeft[Array[Byte]](0 -> Array.empty[Byte]) {
  case ((counter, _), bytes) => (counter + 1) -> bytes
}

// Map the (Int, Array[Byte]) tuple to a BucketFilePartUploadTicket
val uploadPartTickets = Enumeratee.mapM[(Int, Array[Byte])](uploadPart.tupled)

// Construct the pipe to connect to the enumerator
// the ><> operator is an alias for compose; it is more intuitive because of
// its arrow-like structure
val pipe = chunked ><> zipWithIndex ><> uploadPartTickets

// Create a consumer that ends by finishing the upload
val consumeAndComplete = 
  Iteratee.getChunks[BucketFilePartUploadTicket] mapM completeUpload

Just connect the parts to run it

// This is the result, a Future[Unit]
val result = body through pipe run consumeAndComplete 
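
For example, you could attach a completion handler to the resulting future to see whether the upload succeeded (a minimal sketch; `result` is the Future[Unit] from above and an implicit ExecutionContext is assumed to be in scope):

import scala.util.{Failure, Success}

result.onComplete {
  case Success(_)  => println("S3 upload completed")
  case Failure(ex) => println(s"S3 upload failed: ${ex.getMessage}")
}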

Note that I did not test any code and might have made some mistakes in my approach. This however shows a different way of dealing with the problem and should probably help you to find a good solution.

Note that this approach waits for one part to complete its upload before it takes on the next part. If the connection from your server to Amazon is slower than the connection from the browser to your server, this mechanism will slow down the input.

You could take another approach where you do not wait for the Future of the part upload to complete. This would result in another step where you use Future.sequence to convert the sequence of upload futures into a single future containing a sequence of the results. The result would be a mechanism that sends a part to Amazon as soon as you have enough data.
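
A minimal, untested sketch of that idea, reusing the chunked, zipWithIndex, uploadPart and completeUpload definitions from above (the names eagerPipe and eagerResult are mine, and an implicit ExecutionContext is assumed to be in scope). Instead of Enumeratee.mapM it uses Enumeratee.map, so each part only starts its upload; the futures are collected and sequenced at the end:

// Start the upload of each part, but do not wait for it to finish
val startUploads = Enumeratee.map[(Int, Array[Byte])](uploadPart.tupled)

val eagerPipe = chunked ><> zipWithIndex ><> startUploads

// Collect the futures of the individual part uploads
val collectUploads = Iteratee.getChunks[Future[BucketFilePartUploadTicket]]

// Wait for all parts to finish, then complete the multipart upload
val eagerResult: Future[Unit] =
  (body through eagerPipe run collectUploads)
    .flatMap(uploads => Future.sequence(uploads))
    .flatMap(tickets => completeUpload(tickets))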