且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

处理Spark数据帧中的非均匀JSON列

更新时间:2023-11-18 22:25:52

我认为您的尝试和总体构想都朝着正确的方向发展.这是另外两种基于数据帧API的内置选项aka get_json_object/from_json的方法,并通过RDD API将map转换与python的json.dumps()json.loads()一起使用.

I think your attempt and the overall idea is in the right direction. Here are two more approaches based on the build-in options aka get_json_object/from_json via dataframe API and using map transformation along with python's json.dumps() and json.loads() via the RDD API.

选项1: get_json_object()/ from_json()

首先,我们尝试使用不需要架构的get_json_object():

First let's try with get_json_object() which doesn't require a schema:

import pyspark.sql.functions as f

df = spark.createDataFrame([
  ('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
  ('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
  ('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
], StringType())

df.select(f.get_json_object("value", "$.id").alias("id"), \
          f.get_json_object("value", "$.type").alias("type"), \
           f.get_json_object("value", "$.data").alias("data"))

# +---+----+-----------------------------+
# |id |type|data                         |
# +---+----+-----------------------------+
# |1  |foo |{"key0":"foo","key2":"meh"}  |
# |2  |bar |{"key2":"poo","key3":"pants"}|
# |3  |baz |{"key3":"moo"}               |
# +---+----+-----------------------------+

相反,from_json()需要架构定义:

from pyspark.sql.types import StringType, StructType, StructField
import pyspark.sql.functions as f

df = spark.createDataFrame([
  ('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
  ('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
  ('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
], StringType())

schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("data", StringType(), True)
])

df.select(f.from_json("value", schema).getItem("id").alias("id"), \
         f.from_json("value", schema).getItem("type").alias("type"), \
         f.from_json("value", schema).getItem("data").alias("data"))

# +---+----+-----------------------------+
# |id |type|data                         |
# +---+----+-----------------------------+
# |1  |foo |{"key0":"foo","key2":"meh"}  |
# |2  |bar |{"key2":"poo","key3":"pants"}|
# |3  |baz |{"key3":"moo"}               |
# +---+----+-----------------------------+

选项2:map/RDD API + json.dumps()

from pyspark.sql.types import StringType, StructType, StructField
import json

df = spark.createDataFrame([
  '{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}',
  '{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}',
  '{"id": 3, "type": "baz", "data": {"key3": "moo"}}'
], StringType())

def from_json(data):
  row = json.loads(data[0])
  return (row['id'], row['type'], json.dumps(row['data']))

json_rdd = df.rdd.map(from_json)

schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("data", StringType(), True)
])

spark.createDataFrame(json_rdd, schema).show(10, False)

# +---+----+--------------------------------+
# |id |type|data                            |
# +---+----+--------------------------------+
# |1  |foo |{"key2": "meh", "key0": "foo"}  |
# |2  |bar |{"key2": "poo", "key3": "pants"}|
# |3  |baz |{"key3": "moo"}                 |
# +---+----+--------------------------------+

函数from_json会将字符串行转换为(id, type, data)的元组. json.loads()将解析json字符串并返回一个字典,通过该字典我们可以生成并返回最后的元组.

Function from_json will transform the string row into a tuple of (id, type, data). json.loads() will parse the json string and return a dictionary through which we generate and return the final tuple.