更新时间:2023-11-18 23:04:46
尝试一下:
// Sample input: a pipe-delimited table with one column per salesperson, listing
// the customers assigned to each. stripMargin removes each line's leading '|'.
val data1 =
  """
    |salesperson1 | salesperson2
    |Customer_17 |Customer_202
    |Customer_24 |Customer_130
  """.stripMargin
// Convert every text row into one comma-separated line.
// Split on the regex linebreak matcher `\R` rather than System.lineSeparator():
// the literal's line endings are fixed by the source text, not by the platform
// running the code, so a "\r\n" separator would never match an LF-only literal
// and the whole table would collapse into a single row.
val stringDS1 = data1.split("\\R")
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .filter(_.nonEmpty) // drop the blank lines surrounding the table
  .toSeq.toDS()
// Parse the CSV lines: the first line supplies the column names, column types
// are inferred, and the literal text "null" is read as a null value.
val df1 = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS1)
df1.show(false)
df1.printSchema()
/**
* +------------+------------+
* |salesperson1|salesperson2|
* +------------+------------+
* |Customer_17 |Customer_202|
* |Customer_24 |Customer_130|
* +------------+------------+
*
* root
* |-- salesperson1: string (nullable = true)
* |-- salesperson2: string (nullable = true)
*/
// Sample input: a pipe-delimited table with one row per (Place, Customer)
// occurrence. stripMargin removes each line's leading '|'.
val data2 =
  """
    |Place |Customer
    |shop |Customer_17
    |Home |Customer_17
    |shop |Customer_17
    |Home |Customer_130
    |Shop |Customer_202
  """.stripMargin
// Convert every text row into one comma-separated line.
// Split on the regex linebreak matcher `\R` rather than System.lineSeparator():
// the literal's line endings are fixed by the source text, not by the platform
// running the code, so a "\r\n" separator would never match an LF-only literal
// and the whole table would collapse into a single row.
val stringDS2 = data2.split("\\R")
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .filter(_.nonEmpty) // drop the blank lines surrounding the table
  .toSeq.toDS()
// Parse the CSV lines: the first line supplies the column names, column types
// are inferred, and the literal text "null" is read as a null value.
val df2 = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS2)
df2.show(false)
df2.printSchema()
/**
* +-----+------------+
* |Place|Customer |
* +-----+------------+
* |shop |Customer_17 |
* |Home |Customer_17 |
* |shop |Customer_17 |
* |Home |Customer_130|
* |Shop |Customer_202|
* +-----+------------+
*
* root
* |-- Place: string (nullable = true)
* |-- Customer: string (nullable = true)
*/
Unpivot 和 left join:
// Unpivot df1: for every column emit a ('<column name>', <value as string>) pair
// and let stack() turn those pairs into (Salesperson, Customer) rows.
val stringCol = df1.columns
  .map { name => s"'$name', cast(`$name` as string)" }
  .mkString(", ")
val stackExpr = s"stack(${df1.columns.length}, $stringCol) as (Salesperson, Customer)"
val processedDF = df1.selectExpr(stackExpr)
processedDF.show(false)
/**
* +------------+------------+
* |Salesperson |Customer |
* +------------+------------+
* |salesperson1|Customer_17 |
* |salesperson2|Customer_202|
* |salesperson1|Customer_24 |
* |salesperson2|Customer_130|
* +------------+------------+
*/
// Left join so that every unpivoted customer is kept, including customers that
// have no row in df2.
val joined = processedDF.join(df2, Seq("Customer"), "left")
// Per customer: count the non-null Place values (0 when nothing matched) and
// carry along the salesperson taken from the first row of the group.
val occurrences = joined
  .groupBy("Customer")
  .agg(
    count("Place").as("Occurance"),
    first("Salesperson").as("Salesperson")
  )
occurrences.show(false)
/**
* +------------+---------+------------+
* |Customer |Occurance|Salesperson |
* +------------+---------+------------+
* |Customer_130|1 |salesperson2|
* |Customer_17 |3 |salesperson1|
* |Customer_202|1 |salesperson2|
* |Customer_24 |0 |salesperson1|
* +------------+---------+------------+
*/