Working with json
Extracting json values
Sometimes, a column in a data source contains raw json strings, and you need to parse these values before you can start analyzing them.
This has happened to me in several cases, such as:
- An automatic data capture tool that wraps a payload's raw json value into an Avro file.
- A microservice event that follows a data contract, but where one of the columns contains the raw json payload that this microservice exchanged with an external API.
Let's take a sample DataFrame with two raw json columns.
>>> from spark_frame.examples.working_with_json import _get_sample_data
>>> df = _get_sample_data()
>>> df.printSchema()
root
|-- call_id: integer (nullable = true)
|-- raw_input: string (nullable = true)
|-- raw_output: string (nullable = true)
>>> df.show(truncate=False)
+-------+-----------------------------------------------------------------------------+---------------------------------------------------------+
|call_id|raw_input |raw_output |
+-------+-----------------------------------------------------------------------------+---------------------------------------------------------+
|1 |{"model_name": "bot_detector", "model_version": 3, "model_args": "some data"}|{"model_score": 0.94654, "model_parameters": "some data"}|
|2 |{"model_name": "cat_finder", "model_version": 3, "model_args": "some data"} |{"model_score": 0.4234, "model_parameters": "some data"} |
+-------+-----------------------------------------------------------------------------+---------------------------------------------------------+
This DataFrame represents the logs of an application calling a machine learning model. Keeping the "call_id" is important to link each call to other events happening in the system, and we would like to analyze these logs with typed data.
Without spark-frame
Spark does provide a from_json function that can parse a raw json column and convert it into a struct, but it requires the user to provide the schema of the json column in advance, like this:
>>> from pyspark.sql import functions as f
>>> raw_input_schema = '{"fields":[{"name":"model_name","nullable":true,"type":"string"},{"name":"model_version","nullable":true,"type":"integer"},{"name":"model_args","nullable":true,"type":"string"}],"type":"struct"}'
>>> raw_output_schema = '{"fields":[{"name":"model_score","nullable":true,"type":"double"},{"name":"model_parameters","nullable":true,"type":"string"}],"type":"struct"}'
>>> df.withColumn(
... "raw_input", f.from_json("raw_input", raw_input_schema)
... ).withColumn(
... "raw_output", f.from_json("raw_output", raw_output_schema)
... ).show(truncate=False)
+-------+----------------------------+--------------------+
|call_id|raw_input |raw_output |
+-------+----------------------------+--------------------+
|1 |{bot_detector, 3, some data}|{0.94654, some data}|
|2 |{cat_finder, 3, some data} |{0.4234, some data} |
+-------+----------------------------+--------------------+
With spark-frame
While this works, as you can see, writing the schema by hand can be quite cumbersome. Also, for some reason, from_json does not accept the "simpleString" format, unlike the SparkSession.createDataFrame method.
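For comparison, createDataFrame happily accepts a schema written as a simpleString. Here is a quick illustration (added for this comparison, not part of the original example):
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> spark.createDataFrame([("bot_detector", 3)], "model_name: STRING, model_version: INT").printSchema()
root
|-- model_name: string (nullable = true)
|-- model_version: integer (nullable = true)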
The first thing we can do to make things simpler is to use the method spark_frame.schema_utils.schema_from_simple_string, like this:
>>> from spark_frame.schema_utils import schema_from_simple_string
>>> raw_input_schema = schema_from_simple_string("model_name: STRING, model_version: INT, model_args: STRING")
>>> raw_output_schema = schema_from_simple_string("model_score: DOUBLE, model_parameters: STRING")
>>> df.withColumn(
... "raw_input", f.from_json("raw_input", raw_input_schema)
... ).withColumn(
... "raw_output", f.from_json("raw_output", raw_output_schema)
... ).show(truncate=False)
+-------+----------------------------+--------------------+
|call_id|raw_input |raw_output |
+-------+----------------------------+--------------------+
|1 |{bot_detector, 3, some data}|{0.94654, some data}|
|2 |{cat_finder, 3, some data} |{0.4234, some data} |
+-------+----------------------------+--------------------+
But if we don't know the schema, or if we know that the schema may evolve and we want to add (or at least detect) new fields automatically, we can leverage Spark's automatic json schema inference by using the method spark_frame.transformations.parse_json_columns to automatically infer the schema of these json columns.
>>> from spark_frame.transformations import parse_json_columns
>>> res = parse_json_columns(df, ["raw_input", "raw_output"])
>>> res.show(truncate=False)
+-------+----------------------------+--------------------+
|call_id|raw_input |raw_output |
+-------+----------------------------+--------------------+
|1 |{some data, bot_detector, 3}|{some data, 0.94654}|
|2 |{some data, cat_finder, 3} |{some data, 0.4234} |
+-------+----------------------------+--------------------+
>>> res.printSchema()
root
|-- call_id: integer (nullable = true)
|-- raw_input: struct (nullable = true)
| |-- model_args: string (nullable = true)
| |-- model_name: string (nullable = true)
| |-- model_version: long (nullable = true)
|-- raw_output: struct (nullable = true)
| |-- model_parameters: string (nullable = true)
| |-- model_score: double (nullable = true)
As we can see, the order of the fields is different: this is because Spark's automatic inference always sorts the json fields by name.
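Once parsed, the nested fields can be queried directly with dot notation and proper types, which is what we were after. For instance (a small query added for illustration):
>>> res.select("call_id", "raw_input.model_name", "raw_output.model_score").show(truncate=False)
+-------+------------+-----------+
|call_id|model_name  |model_score|
+-------+------------+-----------+
|1      |bot_detector|0.94654    |
|2      |cat_finder  |0.4234     |
+-------+------------+-----------+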
Methods used in this example
schema_utils.schema_from_simple_string
Parses the given data type string to a DataType. The data type string format equals pyspark.sql.types.DataType.simpleString, except that the top-level struct type can omit the struct<> wrapper.
This method requires the SparkSession to have already been instantiated.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schema_string | str | A simpleString representing a DataFrame schema. | required |
Returns:
Type | Description |
---|---|
DataType | A DataType object representing the DataFrame schema. |
Raises:
Type | Description |
---|---|
AssertionError | If no SparkContext has been instantiated first. |
Examples:
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> schema_from_simple_string("int ")
IntegerType()
>>> schema_from_simple_string("INT ")
IntegerType()
>>> schema_from_simple_string("a: byte, b: decimal( 16 , 8 ) ")
StructType([StructField('a', ByteType(), True), StructField('b', DecimalType(16,8), True)])
>>> schema_from_simple_string("a DOUBLE, b STRING")
StructType([StructField('a', DoubleType(), True), StructField('b', StringType(), True)])
>>> schema_from_simple_string("a: array< short>")
StructType([StructField('a', ArrayType(ShortType(), True), True)])
>>> schema_from_simple_string(" map<string , string > ")
MapType(StringType(), StringType(), True)
Error cases:
>>> schema_from_simple_string("blabla")
Traceback (most recent call last):
...
pyspark.sql.utils.ParseException:...
>>> schema_from_simple_string("a: int,")
Traceback (most recent call last):
...
pyspark.sql.utils.ParseException:...
>>> schema_from_simple_string("array<int")
Traceback (most recent call last):
...
pyspark.sql.utils.ParseException:...
>>> schema_from_simple_string("map<int, boolean>>")
Traceback (most recent call last):
...
pyspark.sql.utils.ParseException:...
Source code in spark_frame/schema_utils.py
spark_frame.transformations.parse_json_columns
Transform the specified columns containing json strings in the given DataFrame into structs containing the equivalent parsed information.
This method is similar to Spark's from_json function, with one main difference: from_json requires the user to pass the expected json schema, while this method performs a first pass on the DataFrame to automatically detect the json schema of each column.
By default, the output columns will have the same name as the input columns, but if you want to keep the input columns you can pass a dict(input_col_name, output_col_name) to specify different output column names.
Please be aware that automatic schema detection is not very robust, and while this method can be quite helpful for quick prototyping and data exploration, it is recommended to use a fixed schema and make sure the schema of the input json data is properly enforced, or at the very least to have a schema drift detection mechanism.
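For instance, a very rough drift-detection sketch could compare the field names inferred by this method against an expected set (illustrative only; the expected field names and the column name "raw_input" below are assumptions, not part of spark-frame):
# Rough sketch: flag fields that appear in the json data but were not expected.
expected_fields = {"model_name", "model_version", "model_args"}  # assumed expected fields
inferred_type = parse_json_columns(df, "raw_input").schema["raw_input"].dataType
new_fields = set(inferred_type.fieldNames()) - expected_fields
if new_fields:
    print(f"Schema drift detected, new fields: {new_fields}")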
Warning
This method's performance is not optimal, as it has to perform a Python operation on the executor side.
Warning
When you use this method on a column that is inside a struct (e.g. column "a.b.c"), instead of replacing that column it will create a new top-level column whose name is literally "a.b.c" (see Example 3).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df | DataFrame | A Spark DataFrame | required |
columns | Union[str, List[str], Dict[str, str]] | A column name, list of column names, or dict(column_name, parsed_column_name) | required |
Returns:
Type | Description |
---|---|
DataFrame | A new DataFrame |
Example 1
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.createDataFrame([
... (1, '[{"a": 1}, {"a": 2}]'),
... (1, '[{"a": 2}, {"a": 4}]'),
... (2, None)
... ], "id INT, json1 STRING"
... )
>>> df.show()
+---+--------------------+
| id| json1|
+---+--------------------+
| 1|[{"a": 1}, {"a": 2}]|
| 1|[{"a": 2}, {"a": 4}]|
| 2| NULL|
+---+--------------------+
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- json1: string (nullable = true)
>>> parse_json_columns(df, 'json1').printSchema()
root
|-- id: integer (nullable = true)
|-- json1: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: long (nullable = true)
Example 2: different output column name
>>> parse_json_columns(df, {'json1': 'parsed_json1'}).printSchema()
root
|-- id: integer (nullable = true)
|-- json1: string (nullable = true)
|-- parsed_json1: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: long (nullable = true)
Example 3: json inside a struct
>>> df = spark.createDataFrame([
... (1, {'json1': '[{"a": 1}, {"a": 2}]'}),
... (1, {'json1': '[{"a": 2}, {"a": 4}]'}),
... (2, None)
... ], "id INT, struct STRUCT<json1: STRING>"
... )
>>> df.show(10, False)
+---+----------------------+
|id |struct |
+---+----------------------+
|1 |{[{"a": 1}, {"a": 2}]}|
|1 |{[{"a": 2}, {"a": 4}]}|
|2 |NULL |
+---+----------------------+
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- struct: struct (nullable = true)
| |-- json1: string (nullable = true)
>>> res = parse_json_columns(df, 'struct.json1')
>>> res.printSchema()
root
|-- id: integer (nullable = true)
|-- struct: struct (nullable = true)
| |-- json1: string (nullable = true)
|-- struct.json1: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: long (nullable = true)
>>> res.show(10, False)
+---+----------------------+------------+
|id |struct |struct.json1|
+---+----------------------+------------+
|1 |{[{"a": 1}, {"a": 2}]}|[{1}, {2}] |
|1 |{[{"a": 2}, {"a": 4}]}|[{2}, {4}] |
|2 |NULL |NULL |
+---+----------------------+------------+
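Note: since the new column's name literally contains a dot, it must be referenced with backticks afterwards. A quick illustration (added here, not part of the original example):
>>> res.select("`struct.json1`").printSchema()
root
|-- struct.json1: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: long (nullable = true)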
Source code in spark_frame/transformations_impl/parse_json_columns.py