如何在没有数据扫描的情况下覆盖 pyspark DataFrame 架构?

这个问题与 https://stackoverflow.com/a/37090151/1661491 有关。假设我有一个具有特定模式的 pyspark DataFrame,并且我想用我知道兼容的新模式覆盖该模式,我可以这样做:

df: DataFrame
new_schema = ...

df.rdd.toDF(schema=new_schema)

不幸的是,这会触发上面链接中描述的计算。有没有办法在元数据级别(或惰性)做到这一点,而不急切地触发计算或转换?

编辑,注意:

  • 架构可以任意复杂(嵌套等)
  • 新模式包括对描述、可空性和其他元数据的更新(类型更新的奖励积分)
  • 我想避免编写自定义查询表达式生成器,除非 Spark 中已经内置了一个可以生成基于 schema/ StructType 的查询
stack overflow How to overwrite pyspark DataFrame schema without data scan?
原文答案
author avatar

接受的答案

我自己最终对此进行了一些研究,我很好奇您对我的解决方法/ POC 的看法。参见 https://github.com/ravwojdyla/spark-schema-utils 。它转换表达式并更新属性。

假设我有两个模式,第一个没有任何元数据,让我们调用 schema_wo_metadata

{
  "fields": [
    {
      "metadata": {},
      "name": "oa",
      "nullable": false,
      "type": {
        "containsNull": true,
        "elementType": {
          "fields": [
            {
              "metadata": {},
              "name": "ia",
              "nullable": false,
              "type": "long"
            },
            {
              "metadata": {},
              "name": "ib",
              "nullable": false,
              "type": "string"
            }
          ],
          "type": "struct"
        },
        "type": "array"
      }
    },
    {
      "metadata": {},
      "name": "ob",
      "nullable": false,
      "type": "double"
    }
  ],
  "type": "struct"
}

第二个在内部 ( ia ) 字段和外部 ( ob ) 字段上有额外的元数据,我们称之为 schema_wi_metadata

{
  "fields": [
    {
      "metadata": {},
      "name": "oa",
      "nullable": false,
      "type": {
        "containsNull": true,
        "elementType": {
          "fields": [
            {
              "metadata": {
                "description": "this is ia desc"
              },
              "name": "ia",
              "nullable": false,
              "type": "long"
            },
            {
              "metadata": {},
              "name": "ib",
              "nullable": false,
              "type": "string"
            }
          ],
          "type": "struct"
        },
        "type": "array"
      }
    },
    {
      "metadata": {
        "description": "this is ob desc"
      },
      "name": "ob",
      "nullable": false,
      "type": "double"
    }
  ],
  "type": "struct"
}

现在假设我有一个具有 schema_wo_metadata 模式的数据集,并希望将模式与 schema_wi_metadata 交换:

from pyspark.sql import SparkSession
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StructType

# I assume these get generate/specified somewhere
schema_wo_metadata: StructType = ...
schema_wi_metadata: StructType = ...

# You need my extra package
spark = SparkSession.builder 
    .config("spark.jars.packages", "io.github.ravwojdyla:spark-schema-utils_2.12:0.1.0") 
    .getOrCreate()

# Dummy data with `schema_wo_metadata` schema:
df = spark.createDataFrame(data=[Row(oa=[Row(ia=0, ib=1)], ob=3.14),
                                 Row(oa=[Row(ia=2, ib=3)], ob=42.0)],
                           schema=schema_wo_metadata)

_jdf = spark._sc._jvm.io.github.ravwojdyla.SchemaUtils.update(df._jdf, schema.json())
new_df = DataFrame(_jdf, df.sql_ctx)

现在 new_dfschema_wi_metadata ,例如:

new_df.schema["oa"].dataType.elementType["ia"].metadata
# -> {'description': 'this is ia desc'}

有什么意见吗?


答案:

作者头像

仅供参考,快速更新,此功能已通过 https://github.com/apache/spark/pull/37011 添加到 Spark,并将在 3.4.0 版中发布。