
Working with json

Extracting json values

Sometimes, a column in a data source contains raw json strings, and you need to parse these values before you can start analyzing them.

This has happened to me in several cases, such as:

- An automatic data capture tool that wraps a payload's raw json value into an Avro file.
- A microservice event that follows a data contract, but where one of the columns contains the raw json payload that this microservice exchanged with an external API.

Let's take a sample DataFrame with two raw json columns.

>>> from spark_frame.examples.working_with_json import _get_sample_data
>>> df = _get_sample_data()
>>> df.printSchema()
root
 |-- call_id: integer (nullable = true)
 |-- raw_input: string (nullable = true)
 |-- raw_output: string (nullable = true)

>>> df.show(truncate=False)
+-------+-----------------------------------------------------------------------------+---------------------------------------------------------+
|call_id|raw_input                                                                    |raw_output                                               |
+-------+-----------------------------------------------------------------------------+---------------------------------------------------------+
|1      |{"model_name": "bot_detector", "model_version": 3, "model_args": "some data"}|{"model_score": 0.94654, "model_parameters": "some data"}|
|2      |{"model_name": "cat_finder", "model_version": 3, "model_args": "some data"}  |{"model_score": 0.4234, "model_parameters": "some data"} |
+-------+-----------------------------------------------------------------------------+---------------------------------------------------------+

This DataFrame represents the logs of an application calling a machine learning model. Keeping the "call_id" is important to link each call to other events that happen in the system, and we would like to analyze these logs with properly typed data.

Without spark-frame

Spark does provide a from_json function that can parse a raw json column and convert it into a struct, but it requires the user to provide the schema of the json column in advance, like this:

>>> from pyspark.sql import functions as f
>>> raw_input_schema = '{"fields":[{"name":"model_name","nullable":true,"type":"string"},{"name":"model_version","nullable":true,"type":"integer"},{"name":"model_args","nullable":true,"type":"string"}],"type":"struct"}'
>>> raw_output_schema = '{"fields":[{"name":"model_score","nullable":true,"type":"double"},{"name":"model_parameters","nullable":true,"type":"string"}],"type":"struct"}'
>>> df.withColumn(
...     "raw_input", f.from_json("raw_input", raw_input_schema)
... ).withColumn(
...     "raw_output", f.from_json("raw_output", raw_output_schema)
... ).show(truncate=False)
+-------+----------------------------+--------------------+
|call_id|raw_input                   |raw_output          |
+-------+----------------------------+--------------------+
|1      |{bot_detector, 3, some data}|{0.94654, some data}|
|2      |{cat_finder, 3, some data}  |{0.4234, some data} |
+-------+----------------------------+--------------------+

With spark-frame

While this works, writing the schema by hand can be quite heavy, as you can see. Also, for some reason, from_json does not accept the "simpleString" format, unlike the SparkSession.createDataFrame method. The first thing we can do to make things simpler is to use the method spark_frame.schema_utils.schema_from_simple_string, like this:

>>> from spark_frame.schema_utils import schema_from_simple_string
>>> raw_input_schema = schema_from_simple_string("model_name: STRING, model_version: INT, model_args: STRING")
>>> raw_output_schema = schema_from_simple_string("model_score: DOUBLE, model_parameters: STRING")
>>> df.withColumn(
...     "raw_input", f.from_json("raw_input", raw_input_schema)
... ).withColumn(
...     "raw_output", f.from_json("raw_output", raw_output_schema)
... ).show(truncate=False)
+-------+----------------------------+--------------------+
|call_id|raw_input                   |raw_output          |
+-------+----------------------------+--------------------+
|1      |{bot_detector, 3, some data}|{0.94654, some data}|
|2      |{cat_finder, 3, some data}  |{0.4234, some data} |
+-------+----------------------------+--------------------+

But if we don't know the schema, or if we know that the schema may evolve and we want to add (or at least detect) new fields automatically, we can leverage Spark's automatic json schema inference by using the method spark_frame.transformations.parse_json_columns to infer the schema of these json columns automatically.

>>> from spark_frame.transformations import parse_json_columns
>>> res = parse_json_columns(df, ["raw_input", "raw_output"])
>>> res.show(truncate=False)
+-------+----------------------------+--------------------+
|call_id|raw_input                   |raw_output          |
+-------+----------------------------+--------------------+
|1      |{some data, bot_detector, 3}|{some data, 0.94654}|
|2      |{some data, cat_finder, 3}  |{some data, 0.4234} |
+-------+----------------------------+--------------------+

>>> res.printSchema()
root
 |-- call_id: integer (nullable = true)
 |-- raw_input: struct (nullable = true)
 |    |-- model_args: string (nullable = true)
 |    |-- model_name: string (nullable = true)
 |    |-- model_version: long (nullable = true)
 |-- raw_output: struct (nullable = true)
 |    |-- model_parameters: string (nullable = true)
 |    |-- model_score: double (nullable = true)

As we can see, the order of the fields is different: this is because Spark's automatic schema inference always sorts the json fields by name.
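
Since the inferred schema is an ordinary Spark DataType, another useful pattern is to run the inference once during exploration, export the detected schema, and then pin it with a plain from_json call in recurring jobs. This is a minimal sketch using plain Spark (not a spark-frame feature), reusing df, res and f from above:

>>> raw_input_type = res.schema["raw_input"].dataType
>>> raw_input_type.simpleString()
'struct<model_args:string,model_name:string,model_version:bigint>'
>>> df.withColumn("raw_input", f.from_json("raw_input", raw_input_type)).printSchema()
root
 |-- call_id: integer (nullable = true)
 |-- raw_input: struct (nullable = true)
 |    |-- model_args: string (nullable = true)
 |    |-- model_name: string (nullable = true)
 |    |-- model_version: long (nullable = true)
 |-- raw_output: string (nullable = true)

The exported simpleString can then be stored alongside the job and parsed back with schema_from_simple_string, which brings us back to the fixed-schema approach shown above.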

Methods used in this example

schema_utils.schema_from_simple_string

Parses the given data type string to a DataType. The data type string format equals pyspark.sql.types.DataType.simpleString, except that the top-level struct type can omit the struct<>. This method requires the SparkSession to have already been instantiated.

Parameters:

- schema_string (str): A simpleString representing a DataFrame schema. (required)

Returns:

- DataType: A DataType object representing the DataFrame schema.

Raises:

- AssertionError: If no SparkContext has been instantiated first.

Examples:

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> schema_from_simple_string("int ")
IntegerType()
>>> schema_from_simple_string("INT ")
IntegerType()
>>> schema_from_simple_string("a: byte, b: decimal(  16 , 8   ) ")
StructType([StructField('a', ByteType(), True), StructField('b', DecimalType(16,8), True)])
>>> schema_from_simple_string("a DOUBLE, b STRING")
StructType([StructField('a', DoubleType(), True), StructField('b', StringType(), True)])
>>> schema_from_simple_string("a: array< short>")
StructType([StructField('a', ArrayType(ShortType(), True), True)])
>>> schema_from_simple_string(" map<string , string > ")
MapType(StringType(), StringType(), True)

Error cases:

>>> schema_from_simple_string("blabla")
Traceback (most recent call last):
  ...
pyspark.sql.utils.ParseException:...
>>> schema_from_simple_string("a: int,")
Traceback (most recent call last):
  ...
pyspark.sql.utils.ParseException:...
>>> schema_from_simple_string("array<int")
Traceback (most recent call last):
  ...
pyspark.sql.utils.ParseException:...
>>> schema_from_simple_string("map<int, boolean>>")
Traceback (most recent call last):
  ...
pyspark.sql.utils.ParseException:...
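
If the schema string comes from configuration or user input, the ParseException shown above can be caught to fail gracefully. A minimal sketch; try_parse_schema is a hypothetical helper, not part of spark_frame:

>>> from pyspark.sql.utils import ParseException
>>> def try_parse_schema(schema_string):
...     """Return the parsed DataType, or None if the string is not a valid schema."""
...     try:
...         return schema_from_simple_string(schema_string)
...     except ParseException:
...         return None
>>> try_parse_schema("a: int, b: string")
StructType([StructField('a', IntegerType(), True), StructField('b', StringType(), True)])
>>> print(try_parse_schema("blabla"))
None
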
Source code in spark_frame/schema_utils.py
def schema_from_simple_string(schema_string: str) -> DataType:
    """Parses the given data type string to a :class:`DataType`. The data type string format equals
    [pyspark.sql.types.DataType.simpleString][], except that the top level struct type can omit
    the ``struct<>``.
    This method requires the SparkSession to have already been instantiated.

    Args:
        schema_string: A simpleString representing a DataFrame schema.

    Returns:
        A DataType object representing the DataFrame schema.

    Raises:
        AssertionError: If no SparkContext has been instantiated first.

    Examples:
        >>> from pyspark.sql import SparkSession
        >>> spark = SparkSession.builder.appName("doctest").getOrCreate()
        >>> schema_from_simple_string("int ")
        IntegerType()
        >>> schema_from_simple_string("INT ")
        IntegerType()
        >>> schema_from_simple_string("a: byte, b: decimal(  16 , 8   ) ")
        StructType([StructField('a', ByteType(), True), StructField('b', DecimalType(16,8), True)])
        >>> schema_from_simple_string("a DOUBLE, b STRING")
        StructType([StructField('a', DoubleType(), True), StructField('b', StringType(), True)])
        >>> schema_from_simple_string("a: array< short>")
        StructType([StructField('a', ArrayType(ShortType(), True), True)])
        >>> schema_from_simple_string(" map<string , string > ")
        MapType(StringType(), StringType(), True)

        **Error cases:**

        >>> schema_from_simple_string("blabla") # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
          ...
        pyspark.sql.utils.ParseException:...
        >>> schema_from_simple_string("a: int,") # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
          ...
        pyspark.sql.utils.ParseException:...
        >>> schema_from_simple_string("array<int") # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
          ...
        pyspark.sql.utils.ParseException:...
        >>> schema_from_simple_string("map<int, boolean>>") # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
          ...
        pyspark.sql.utils.ParseException:...

    """
    sc = SparkContext._active_spark_context  # noqa: SLF001
    assert_true(sc is not None, "No SparkContext has been instantiated yet")
    return _parse_datatype_string(schema_string)
spark_frame.transformations.parse_json_columns

Transform the specified columns containing json strings in the given DataFrame into structs containing the equivalent parsed information.

This method is similar to Spark's from_json function, with one main difference: from_json requires the user to pass the expected json schema, while this method performs a first pass on the DataFrame to detect automatically the json schema of each column.

By default, the output columns will have the same name as the input columns, but if you want to keep the input columns you can pass a dict(input_col_name, output_col_name) to specify different output column names.

Please be aware that automatic schema detection is not very robust, and while this method can be quite helpful for quick prototyping and data exploration, it is recommended to use a fixed schema and make sure the schema of the input json data is properly enforced, or at the very least to have a schema drift detection mechanism.

Warning

This method's performance is not optimal, as it has to perform a Python operation on the executors' side.

Warning

When you use this method on a column that is inside a struct (e.g. column "a.b.c"), instead of replacing that column it will create a new column outside the struct whose name must be escaped with backticks (e.g. "`a.b.c`") (see Example 3).

Parameters:

- df (DataFrame): A Spark DataFrame. (required)
- columns (Union[str, List[str], Dict[str, str]]): A column name, list of column names, or dict(column_name, parsed_column_name). (required)

Returns:

- DataFrame: A new DataFrame.

Example 1

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.createDataFrame([
...         (1, '[{"a": 1}, {"a": 2}]'),
...         (1, '[{"a": 2}, {"a": 4}]'),
...         (2, None)
...     ], "id INT, json1 STRING"
... )
>>> df.show()
+---+--------------------+
| id|               json1|
+---+--------------------+
|  1|[{"a": 1}, {"a": 2}]|
|  1|[{"a": 2}, {"a": 4}]|
|  2|                NULL|
+---+--------------------+

>>> df.printSchema()
root
 |-- id: integer (nullable = true)
 |-- json1: string (nullable = true)

>>> parse_json_columns(df, 'json1').printSchema()
root
 |-- id: integer (nullable = true)
 |-- json1: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)

Example 2: different output column name

>>> parse_json_columns(df, {'json1': 'parsed_json1'}).printSchema()
root
 |-- id: integer (nullable = true)
 |-- json1: string (nullable = true)
 |-- parsed_json1: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)

Example 3: json inside a struct

>>> df = spark.createDataFrame([
...         (1, {'json1': '[{"a": 1}, {"a": 2}]'}),
...         (1, {'json1': '[{"a": 2}, {"a": 4}]'}),
...         (2, None)
...     ], "id INT, struct STRUCT<json1: STRING>"
... )
>>> df.show(10, False)
+---+----------------------+
|id |struct                |
+---+----------------------+
|1  |{[{"a": 1}, {"a": 2}]}|
|1  |{[{"a": 2}, {"a": 4}]}|
|2  |NULL                  |
+---+----------------------+

>>> df.printSchema()
root
 |-- id: integer (nullable = true)
 |-- struct: struct (nullable = true)
 |    |-- json1: string (nullable = true)

>>> res = parse_json_columns(df, 'struct.json1')
>>> res.printSchema()
root
 |-- id: integer (nullable = true)
 |-- struct: struct (nullable = true)
 |    |-- json1: string (nullable = true)
 |-- struct.json1: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)

>>> res.show(10, False)
+---+----------------------+------------+
|id |struct                |struct.json1|
+---+----------------------+------------+
|1  |{[{"a": 1}, {"a": 2}]}|[{1}, {2}]  |
|1  |{[{"a": 2}, {"a": 4}]}|[{2}, {4}]  |
|2  |NULL                  |NULL        |
+---+----------------------+------------+
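
Because the new column's name literally contains a dot, it has to be referenced with backticks in later expressions. A minimal sketch of how it could be cleaned up afterwards (the name parsed_json1 is purely illustrative):

>>> from pyspark.sql import functions as f
>>> res.select("id", f.col("`struct.json1`").alias("parsed_json1")).printSchema()
root
 |-- id: integer (nullable = true)
 |-- parsed_json1: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)

Passing a dict, as in Example 2, is another way to choose a dot-free output name directly.
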
Source code in spark_frame/transformations_impl/parse_json_columns.py
def parse_json_columns(df: DataFrame, columns: Union[str, List[str], Dict[str, str]]) -> DataFrame:
    """Transform the specified columns containing json strings in the given DataFrame into structs containing
    the equivalent parsed information.

    This method is similar to Spark's `from_json` function, with one main difference: `from_json` requires the user
    to pass the expected json schema, while this method performs a first pass on the DataFrame to detect automatically
    the json schema of each column.

    By default, the output columns will have the same name as the input columns, but if you want to keep the input
    columns you can pass a dict(input_col_name, output_col_name) to specify different output column names.

    Please be aware that automatic schema detection is not very robust, and while this method can be quite helpful
    for quick prototyping and data exploration, it is recommended to use a fixed schema and make sure the schema
    of the input json data is properly enforced, or at the very least have a schema drift detection mechanism.

    !!! warning
        This method's performance is not optimal, as it has to perform a Python operation on the executors' side.

    !!! warning
        When you use this method on a column that is inside a struct (e.g. column "a.b.c"),
        instead of replacing that column it will create a new column outside the struct (e.g. "`a.b.c`")
        (See Example 3).

    Args:
        df: A Spark DataFrame
        columns: A column name, list of column names, or dict(column_name, parsed_column_name)

    Returns:
        A new DataFrame

    Examples: Example 1
        >>> from pyspark.sql import SparkSession
        >>> spark = SparkSession.builder.appName("doctest").getOrCreate()
        >>> df = spark.createDataFrame([
        ...         (1, '[{"a": 1}, {"a": 2}]'),
        ...         (1, '[{"a": 2}, {"a": 4}]'),
        ...         (2, None)
        ...     ], "id INT, json1 STRING"
        ... )
        >>> df.show()
        +---+--------------------+
        | id|               json1|
        +---+--------------------+
        |  1|[{"a": 1}, {"a": 2}]|
        |  1|[{"a": 2}, {"a": 4}]|
        |  2|                NULL|
        +---+--------------------+
        <BLANKLINE>
        >>> df.printSchema()
        root
         |-- id: integer (nullable = true)
         |-- json1: string (nullable = true)
        <BLANKLINE>
        >>> parse_json_columns(df, 'json1').printSchema()
        root
         |-- id: integer (nullable = true)
         |-- json1: array (nullable = true)
         |    |-- element: struct (containsNull = true)
         |    |    |-- a: long (nullable = true)
        <BLANKLINE>

    Examples: Example 2 : different output column name
        >>> parse_json_columns(df, {'json1': 'parsed_json1'}).printSchema()
        root
         |-- id: integer (nullable = true)
         |-- json1: string (nullable = true)
         |-- parsed_json1: array (nullable = true)
         |    |-- element: struct (containsNull = true)
         |    |    |-- a: long (nullable = true)
        <BLANKLINE>

    Examples: Example 3 : json inside a struct
        >>> df = spark.createDataFrame([
        ...         (1, {'json1': '[{"a": 1}, {"a": 2}]'}),
        ...         (1, {'json1': '[{"a": 2}, {"a": 4}]'}),
        ...         (2, None)
        ...     ], "id INT, struct STRUCT<json1: STRING>"
        ... )
        >>> df.show(10, False)
        +---+----------------------+
        |id |struct                |
        +---+----------------------+
        |1  |{[{"a": 1}, {"a": 2}]}|
        |1  |{[{"a": 2}, {"a": 4}]}|
        |2  |NULL                  |
        +---+----------------------+
        <BLANKLINE>
        >>> df.printSchema()
        root
         |-- id: integer (nullable = true)
         |-- struct: struct (nullable = true)
         |    |-- json1: string (nullable = true)
        <BLANKLINE>
        >>> res = parse_json_columns(df, 'struct.json1')
        >>> res.printSchema()
        root
         |-- id: integer (nullable = true)
         |-- struct: struct (nullable = true)
         |    |-- json1: string (nullable = true)
         |-- struct.json1: array (nullable = true)
         |    |-- element: struct (containsNull = true)
         |    |    |-- a: long (nullable = true)
        <BLANKLINE>
        >>> res.show(10, False)
        +---+----------------------+------------+
        |id |struct                |struct.json1|
        +---+----------------------+------------+
        |1  |{[{"a": 1}, {"a": 2}]}|[{1}, {2}]  |
        |1  |{[{"a": 2}, {"a": 4}]}|[{2}, {4}]  |
        |2  |NULL                  |NULL        |
        +---+----------------------+------------+
        <BLANKLINE>

    """
    if isinstance(columns, str):
        columns = [columns]
    if isinstance(columns, list):
        columns = {col: col for col in columns}

    wrapped_df = __wrap_json_columns(df, columns)
    schema_per_col = __infer_schema_per_column(wrapped_df, list(columns.values()))
    res = __parse_json_columns(wrapped_df, schema_per_col)
    return res