且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

处理 Spark Streaming rdd 并存储到单个 HDFS 文件

更新时间:2022-02-02 19:10:14

您可以使用一个函数来"合并" saveAsTextFile 的结果,像这样:

You can use a function to "merge" the result of saveAsTextFile. Like this:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.rdd.RDD

/** Writes `rdd` as text part-files into a staging directory on HDFS, then
  * merges those part-files into a single file named `fileName` in the
  * destination directory.
  *
  * @param hdfsServer base HDFS URI, e.g. "hdfs://namenode:8020"
  * @param fileName   name of the single merged output file
  * @param rdd        the RDD whose string form is persisted
  * @param tmpDir     staging directory appended to `hdfsServer`; defaults to
  *                   "/tmp/" (the original hard-coded value)
  * @param dstDir     destination directory appended to `hdfsServer`; defaults
  *                   to "/final/" (the original hard-coded value)
  *
  * NOTE(review): `saveAsTextFile` throws if the staging directory already
  * exists, and `merge` does not delete it — callers must clean the staging
  * directory between runs.
  */
def saveAsTextFileAndMerge[T](
    hdfsServer: String,
    fileName: String,
    rdd: RDD[T],
    tmpDir: String = "/tmp/",
    dstDir: String = "/final/"
): Unit = {
  val stagingPath = hdfsServer + tmpDir
  rdd.saveAsTextFile(stagingPath)
  val destPath = hdfsServer + dstDir
  merge(stagingPath, destPath, fileName)
}

  /** Merges every file under `srcPath` into the single file
    * `dstPath/fileName`, creating the destination directory if needed.
    *
    * @param srcPath  directory containing the part-files to merge
    * @param dstPath  destination directory for the merged file
    * @param fileName name of the merged output file inside `dstPath`
    */
  def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
    val hadoopConfig = new Configuration()
    // Resolve the FileSystem from each path instead of FileSystem.get(conf):
    // the latter returns only the *default* filesystem (fs.defaultFS), which
    // is wrong when the paths are fully-qualified URIs on another filesystem.
    val srcFs = new Path(srcPath).getFileSystem(hadoopConfig)
    val dstFs = new Path(dstPath).getFileSystem(hadoopConfig)
    val destinationDir = new Path(dstPath)
    if (!dstFs.exists(destinationDir)) {
      dstFs.mkdirs(destinationDir)
    }
    // deleteSource = false keeps the staging dir; addString = null inserts no
    // separator between merged files.
    // NOTE(review): FileUtil.copyMerge was removed in Hadoop 3.x — replace
    // with a manual FSDataOutputStream concat loop when upgrading.
    FileUtil.copyMerge(srcFs, new Path(srcPath), dstFs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
  }