Working with nested data
Transforming nested fields
Let's take a sample DataFrame with a deeply nested schema:
>>> from spark_frame.examples.working_with_nested_data import _get_sample_employee_data
>>> from pyspark.sql import functions as f
>>> df = _get_sample_employee_data()
>>> df.printSchema()
root
|-- employee_id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- skills: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- level: string (nullable = true)
|-- projects: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- client: string (nullable = true)
| | |-- tasks: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- name: string (nullable = true)
| | | | |-- description: string (nullable = true)
| | | | |-- status: string (nullable = true)
| | | | |-- estimate: long (nullable = true)
>>> df.show(truncate=False)
+-----------+----------+---+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|employee_id|name |age|skills |projects |
+-----------+----------+---+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1 |John Smith|30 |[{Java, expert}, {Python, intermediate}] |[{Project A, Acme Inc, [{Task 1, Implement feature X, completed, 8}, {Task 2, Fix bug Y, in progress, 5}]}, {Project B, Beta Corp, [{Task 3, Implement feature Z, pending, 13}, {Task 4, Improve performance, in progress, 3}]}] |
|2 |Jane Doe |25 |[{JavaScript, advanced}, {PHP, intermediate}]|[{Project C, Gamma Inc, [{Task 5, Implement feature W, completed, 20}, {Task 6, Fix bug V, in progress, 13}]}, {Project D, Delta Ltd, [{Task 7, Implement feature U, pending, 8}, {Task 8, Improve performance, in progress, 5}]}]|
+-----------+----------+---+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
As we can see, the schema has two top-level columns of type ARRAY (`skills` and `projects`), and the `projects` array contains a second level of repetition, `projects.tasks`.
Manipulating this DataFrame with Spark can quickly become really painful, and it is still quite simple compared to what engineers may encounter while working with enterprise-grade datasets.
The task
Let's say we want to enrich this DataFrame by performing the following changes:
- Change `skills.level` to uppercase
- Cast `projects.tasks.estimate` to double
Without spark_frame.nested
Prior to Spark 3.1, we would have had only two choices:
- Flatten `skills` and `projects.tasks` into two separate DataFrames, perform the transformation, then join the two DataFrames back together.
- Write a custom Python UDF to perform the changes.
Option 1 is not a good solution: it would be quite costly and require several shuffle operations (a rough sketch of what it entails is shown below).
Option 2 is not great either: the Python UDF would be slow and not very reusable. Had we been using Java or Scala, this might have been a better option, as we would not incur the performance costs associated with Python UDFs, but it would still have required a lot of work to model the whole Employee data structure in Java/Scala before being able to manipulate it.
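To make the cost of option 1 concrete, here is a minimal sketch of what it could look like for the `skills` array alone (the intermediate names `pos`, `skill` and `tmp` are purely illustrative, and null or empty arrays are ignored for brevity); the `projects.tasks` array would require the same dance one level deeper:

```python
from pyspark.sql import functions as f

# Explode the array into one row per skill, keeping the position to preserve ordering
exploded = df.select("employee_id", f.posexplode("skills").alias("pos", "skill"))

# Apply the transformation on the exploded rows
transformed = exploded.withColumn(
    "skill",
    f.struct(
        f.col("skill.name").alias("name"),
        f.upper(f.col("skill.level")).alias("level"),
    ),
)

# Re-aggregate the array (sorted by position) and join it back onto the original DataFrame:
# the groupBy and the join are the shuffle operations mentioned above
regrouped = (
    transformed.groupBy("employee_id")
    .agg(f.sort_array(f.collect_list(f.struct("pos", "skill"))).alias("tmp"))
    .select("employee_id", f.col("tmp.skill").alias("skills"))
)
new_df = df.drop("skills").join(regrouped, "employee_id")
```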
Since Spark 3.1.0, a third option is available, which consists in using pyspark.sql.functions.transform and pyspark.sql.Column.withField to achieve our goal.
However, the code that we need to write is quite complex:
>>> new_df = df.withColumn(
... "skills",
... f.transform(f.col("skills"), lambda skill: skill.withField("level", f.upper(skill["level"])))
... ).withColumn(
... "projects",
... f.transform(
... f.col("projects"),
... lambda project: project.withField(
... "tasks",
... f.transform(
... project["tasks"],
... lambda task: task.withField("estimate", task["estimate"].cast("DOUBLE"))),
... ),
... ),
... )
>>> new_df.printSchema()
root
|-- employee_id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- skills: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- level: string (nullable = true)
|-- projects: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- client: string (nullable = true)
| | |-- tasks: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- name: string (nullable = true)
| | | | |-- description: string (nullable = true)
| | | | |-- status: string (nullable = true)
| | | | |-- estimate: double (nullable = true)
>>> new_df.select("employee_id", "name", "age", "skills").show(truncate=False)
+-----------+----------+---+---------------------------------------------+
|employee_id|name |age|skills |
+-----------+----------+---+---------------------------------------------+
|1 |John Smith|30 |[{Java, EXPERT}, {Python, INTERMEDIATE}] |
|2 |Jane Doe |25 |[{JavaScript, ADVANCED}, {PHP, INTERMEDIATE}]|
+-----------+----------+---+---------------------------------------------+
As we can see, the transformation worked: the schema is the same except for `projects.tasks.estimate`, which is now a double, and `skills.level` is now in uppercase. But hopefully we can agree that the code to achieve this looks quite complex, and that its complexity would grow even more if we tried to perform more transformations at the same time.
With spark_frame.nested
The module `spark_frame.nested` proposes several methods to help us deal with nested data structures more easily.
First, let's use `spark_frame.nested.print_schema` to get a flat version of the DataFrame's schema:
>>> from spark_frame import nested
>>> nested.print_schema(df)
root
|-- employee_id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- skills!.name: string (nullable = true)
|-- skills!.level: string (nullable = true)
|-- projects!.name: string (nullable = true)
|-- projects!.client: string (nullable = true)
|-- projects!.tasks!.name: string (nullable = true)
|-- projects!.tasks!.description: string (nullable = true)
|-- projects!.tasks!.status: string (nullable = true)
|-- projects!.tasks!.estimate: long (nullable = true)
As we can see, this is the same schema as before, but instead of being displayed as a tree, it is displayed
as a flat list where each field is represented with its full name. We can also see that fields of type ARRAY
can be easily identified thanks to the exclamation marks (`!`) added after their names.
Once you get used to it, this flat representation is more compact and easier to read than the
tree representation, while conveying the same amount of information.
This notation will also help us perform the target transformations more easily. As a reminder, we want to:
- Change `skills.level` to uppercase
- Cast `projects.tasks.estimate` to double
Using the `spark_frame.nested.with_fields` method, this can be done like this:
>>> new_df = df.transform(nested.with_fields, {
... "skills!.level": lambda skill: f.upper(skill["level"]),
... "projects!.tasks!.estimate": lambda task: task["estimate"].cast("DOUBLE")
... })
>>> nested.print_schema(new_df)
root
|-- employee_id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- skills!.name: string (nullable = true)
|-- skills!.level: string (nullable = true)
|-- projects!.name: string (nullable = true)
|-- projects!.client: string (nullable = true)
|-- projects!.tasks!.name: string (nullable = true)
|-- projects!.tasks!.description: string (nullable = true)
|-- projects!.tasks!.status: string (nullable = true)
|-- projects!.tasks!.estimate: double (nullable = true)
>>> new_df.select("employee_id", "name", "age", "skills").show(truncate=False)
+-----------+----------+---+---------------------------------------------+
|employee_id|name |age|skills |
+-----------+----------+---+---------------------------------------------+
|1 |John Smith|30 |[{Java, EXPERT}, {Python, INTERMEDIATE}] |
|2 |Jane Doe |25 |[{JavaScript, ADVANCED}, {PHP, INTERMEDIATE}]|
+-----------+----------+---+---------------------------------------------+
As we can see, we obtained the same result with much simpler and cleaner code. Now let's explain what this code did:
The `spark_frame.nested.with_fields` method is similar to the `pyspark.sql.DataFrame.withColumns` method, except that it works on nested fields inside structs and arrays. We pass it a `Dict(field_name, transformation)` indicating the expression we want to apply to each field. The transformation must be a higher-order function: a lambda expression or named function that takes a Column as argument and returns a Column. The column passed to that function will represent the struct parent of the target field. For instance, when we write `"skills!.level": lambda skill: f.upper(skill["level"])`, the lambda function will be applied to each struct element of the array `skills`.
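As noted above, a named function works just as well as a lambda. Here is a small sketch, equivalent to the `skills!.level` transformation shown earlier and reusing the same `df`:

```python
from pyspark.sql import functions as f
from spark_frame import nested

def uppercase_level(skill):
    # `skill` represents each struct element of the `skills` array (the parent of `level`)
    return f.upper(skill["level"])

new_df = df.transform(nested.with_fields, {"skills!.level": uppercase_level})
```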
Info
The data for this example was generated by ChatGPT :-)
Methods used in this example
nested.print_schema
Print the DataFrame's flattened schema to the standard output.
- Structs are flattened with a `.` after their name.
- Arrays are flattened with a `!` character after their name.
- Maps are flattened with a `%key` and `%value` after their name.
Limitation: Dots, percents, and exclamation marks are not supported in field names
Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`.
This can be worked around using the transformation `spark_frame.transformations.transform_all_field_names`.
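As a hedged illustration of that workaround (assuming `transform_all_field_names` takes a DataFrame and a function mapping each field name to a new name, which is our reading of its signature rather than something shown on this page), one could rename the offending characters before using the `nested` module:

```python
from spark_frame.transformations import transform_all_field_names

# Assumed usage: apply the given function to every field name, including nested ones,
# replacing the characters that the `nested` module does not support with underscores.
sanitized_df = transform_all_field_names(
    df, lambda name: name.replace(".", "_").replace("%", "_").replace("!", "_")
)
```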
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | A Spark DataFrame | required |
Examples:
>>> from pyspark.sql import SparkSession
>>> from spark_frame import nested
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.sql('''SELECT
... 1 as id,
... ARRAY(STRUCT(2 as a, ARRAY(STRUCT(3 as c, 4 as d)) as b, ARRAY(5, 6) as e)) as s1,
... STRUCT(7 as f) as s2,
... ARRAY(ARRAY(1, 2), ARRAY(3, 4)) as s3,
... ARRAY(ARRAY(STRUCT(1 as e, 2 as f)), ARRAY(STRUCT(3 as e, 4 as f))) as s4,
... MAP(STRUCT(1 as a), STRUCT(2 as b)) as m1
... ''')
>>> df.printSchema()
root
|-- id: integer (nullable = false)
|-- s1: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- a: integer (nullable = false)
| | |-- b: array (nullable = false)
| | | |-- element: struct (containsNull = false)
| | | | |-- c: integer (nullable = false)
| | | | |-- d: integer (nullable = false)
| | |-- e: array (nullable = false)
| | | |-- element: integer (containsNull = false)
|-- s2: struct (nullable = false)
| |-- f: integer (nullable = false)
|-- s3: array (nullable = false)
| |-- element: array (containsNull = false)
| | |-- element: integer (containsNull = false)
|-- s4: array (nullable = false)
| |-- element: array (containsNull = false)
| | |-- element: struct (containsNull = false)
| | | |-- e: integer (nullable = false)
| | | |-- f: integer (nullable = false)
|-- m1: map (nullable = false)
| |-- key: struct
| | |-- a: integer (nullable = false)
| |-- value: struct (valueContainsNull = false)
| | |-- b: integer (nullable = false)
>>> nested.print_schema(df)
root
|-- id: integer (nullable = false)
|-- s1!.a: integer (nullable = false)
|-- s1!.b!.c: integer (nullable = false)
|-- s1!.b!.d: integer (nullable = false)
|-- s1!.e!: integer (nullable = false)
|-- s2.f: integer (nullable = false)
|-- s3!!: integer (nullable = false)
|-- s4!!.e: integer (nullable = false)
|-- s4!!.f: integer (nullable = false)
|-- m1%key.a: integer (nullable = false)
|-- m1%value.b: integer (nullable = false)
Source code in spark_frame/nested_impl/print_schema.py
nested.with_fields
Return a new DataFrame by adding or replacing (when they already exist) columns.
This method is similar to the DataFrame.withColumn method, with the extra capability of working on nested and repeated fields (structs and arrays).
The syntax for field names works as follows:
- "." is the separator for struct elements
- "!" must be appended at the end of fields that are repeated (arrays)
- Map keys are appended with `%key`
- Map values are appended with `%value`
The following types of transformation are allowed:
- String and column expressions can be used on any non-repeated field, even nested ones.
- When working on repeated fields, transformations must be expressed as higher-order functions (e.g. lambda expressions). String and column expressions can be used on repeated fields as well, but their value will be repeated multiple times.
- When working on multiple levels of nested arrays, higher-order functions may take multiple arguments, corresponding to each level of repetition (see Example 5).
- `None` can also be used to represent the identity transformation; this is useful to select a field without changing it and without having to repeat its name.
Limitation: Dots, percents, and exclamation marks are not supported in field names
Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`.
This can be worked around using the transformation `spark_frame.transformations.transform_all_field_names`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | A Spark DataFrame | required |
| `fields` | `Mapping[str, AnyKindOfTransformation]` | A Dict(field_name, transformation_to_apply) | required |
Returns:

| Type | Description |
|---|---|
| `DataFrame` | A new DataFrame with the same fields as the input DataFrame, where the specified transformations have been applied to the corresponding fields. If a field name did not exist in the input DataFrame, it will be added to the output DataFrame. If it did exist, the original value will be replaced with the new one. |
Example 1: non-repeated fields
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import functions as f
>>> from spark_frame import nested
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.sql('''SELECT 1 as id, STRUCT(2 as a, 3 as b) as s''')
>>> nested.print_schema(df)
root
|-- id: integer (nullable = false)
|-- s.a: integer (nullable = false)
|-- s.b: integer (nullable = false)
>>> df.show()
+---+------+
| id| s|
+---+------+
| 1|{2, 3}|
+---+------+
Transformations on non-repeated fields may be expressed as a string representing a column name or a Column expression.
>>> new_df = nested.with_fields(df, {
... "s.id": "id", # column name (string)
... "s.c": f.col("s.a") + f.col("s.b") # Column expression
... })
>>> new_df.printSchema()
root
|-- id: integer (nullable = false)
|-- s: struct (nullable = false)
| |-- a: integer (nullable = false)
| |-- b: integer (nullable = false)
| |-- id: integer (nullable = false)
| |-- c: integer (nullable = false)
>>> new_df.show()
+---+------------+
| id| s|
+---+------------+
| 1|{2, 3, 1, 5}|
+---+------------+
Example 2: repeated fields
>>> df = spark.sql('''
... SELECT
... 1 as id,
... ARRAY(STRUCT(1 as a, STRUCT(2 as c) as b), STRUCT(3 as a, STRUCT(4 as c) as b)) as s
... ''')
>>> nested.print_schema(df)
root
|-- id: integer (nullable = false)
|-- s!.a: integer (nullable = false)
|-- s!.b.c: integer (nullable = false)
>>> df.show()
+---+--------------------+
| id| s|
+---+--------------------+
| 1|[{1, {2}}, {3, {4}}]|
+---+--------------------+
Transformations on repeated fields must be expressed as higher-order functions (lambda expressions or named functions). The value passed to this function will correspond to the last repeated element.
>>> new_df = df.transform(nested.with_fields, {
... "s!.b.d": lambda s: s["a"] + s["b"]["c"]}
... )
>>> nested.print_schema(new_df)
root
|-- id: integer (nullable = false)
|-- s!.a: integer (nullable = false)
|-- s!.b.c: integer (nullable = false)
|-- s!.b.d: integer (nullable = false)
>>> new_df.show(truncate=False)
+---+--------------------------+
|id |s |
+---+--------------------------+
|1 |[{1, {2, 3}}, {3, {4, 7}}]|
+---+--------------------------+
String and column expressions can be used on repeated fields as well, but their value will be repeated multiple times.
>>> df.transform(nested.with_fields, {
... "id": None,
... "s!.a": "id",
... "s!.b.c": f.lit(2)
... }).show(truncate=False)
+---+--------------------+
|id |s |
+---+--------------------+
|1 |[{1, {2}}, {1, {2}}]|
+---+--------------------+
Example 3: field repeated twice
>>> df = spark.sql('SELECT 1 as id, ARRAY(STRUCT(ARRAY(1, 2, 3) as e)) as s')
>>> nested.print_schema(df)
root
|-- id: integer (nullable = false)
|-- s!.e!: integer (nullable = false)
>>> df.show()
+---+-------------+
| id| s|
+---+-------------+
| 1|[{[1, 2, 3]}]|
+---+-------------+
Here, the lambda expression will be applied to the last repeated element `e`.
>>> df.transform(nested.with_fields, {"s!.e!": lambda e : e.cast("DOUBLE")}).show()
+---+-------------------+
| id| s|
+---+-------------------+
| 1|[{[1.0, 2.0, 3.0]}]|
+---+-------------------+
Example 4: DataFrame with maps
>>> df = spark.sql('''
... SELECT
... 1 as id,
... MAP("a", STRUCT(2 as a, 3 as b)) as m1
... ''')
>>> nested.print_schema(df)
root
|-- id: integer (nullable = false)
|-- m1%key: string (nullable = false)
|-- m1%value.a: integer (nullable = false)
|-- m1%value.b: integer (nullable = false)
>>> df.show()
+---+-------------+
| id| m1|
+---+-------------+
| 1|{a -> {2, 3}}|
+---+-------------+
>>> new_df = df.transform(nested.with_fields, {
... "m1%key": lambda key : f.upper(key),
... "m1%value.a": lambda value : value["a"].cast("DOUBLE")
... })
>>> nested.print_schema(new_df)
root
|-- id: integer (nullable = false)
|-- m1%key: string (nullable = false)
|-- m1%value.a: double (nullable = false)
|-- m1%value.b: integer (nullable = false)
>>> new_df.show()
+---+---------------+
| id| m1|
+---+---------------+
| 1|{A -> {2.0, 3}}|
+---+---------------+
Example 5: Accessing multiple repetition levels
>>> df = spark.sql('''
... SELECT
... 1 as id,
... ARRAY(
... STRUCT(2 as average, ARRAY(1, 2, 3) as values),
... STRUCT(3 as average, ARRAY(1, 2, 3, 4, 5) as values)
... ) as s1
... ''')
>>> nested.print_schema(df)
root
|-- id: integer (nullable = false)
|-- s1!.average: integer (nullable = false)
|-- s1!.values!: integer (nullable = false)
>>> df.show(truncate=False)
+---+--------------------------------------+
|id |s1 |
+---+--------------------------------------+
|1 |[{2, [1, 2, 3]}, {3, [1, 2, 3, 4, 5]}]|
+---+--------------------------------------+
Here, the transformation applied to "s1!.values!" takes two arguments.
>>> new_df = df.transform(nested.with_fields, {
... "s1!.values!": lambda s1, value : value - s1["average"]
... })
>>> new_df.show(truncate=False)
+---+-----------------------------------------+
|id |s1 |
+---+-----------------------------------------+
|1 |[{2, [-1, 0, 1]}, {3, [-2, -1, 0, 1, 2]}]|
+---+-----------------------------------------+
Extra arguments can be added to the left for each repetition level, up to the root level.
>>> new_df = df.transform(nested.with_fields, {
... "s1!.values!": lambda root, s1, value : value - s1["average"] + root["id"]
... })
>>> new_df.show(truncate=False)
+---+---------------------------------------+
|id |s1 |
+---+---------------------------------------+
|1 |[{2, [0, 1, 2]}, {3, [-1, 0, 1, 2, 3]}]|
+---+---------------------------------------+
Source code in spark_frame/nested_impl/with_fields.py
Selecting nested fields
In this example, we will see how to select and rename specific elements in a nested data structure:
>>> from spark_frame.examples.working_with_nested_data import _get_sample_employee_data
>>> from pyspark.sql import functions as f
>>> from spark_frame import nested
>>> df = _get_sample_employee_data()
>>> nested.print_schema(df)
root
|-- employee_id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- skills!.name: string (nullable = true)
|-- skills!.level: string (nullable = true)
|-- projects!.name: string (nullable = true)
|-- projects!.client: string (nullable = true)
|-- projects!.tasks!.name: string (nullable = true)
|-- projects!.tasks!.description: string (nullable = true)
|-- projects!.tasks!.status: string (nullable = true)
|-- projects!.tasks!.estimate: long (nullable = true)
>>> df.show(truncate=False)
+-----------+----------+---+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|employee_id|name |age|skills |projects |
+-----------+----------+---+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1 |John Smith|30 |[{Java, expert}, {Python, intermediate}] |[{Project A, Acme Inc, [{Task 1, Implement feature X, completed, 8}, {Task 2, Fix bug Y, in progress, 5}]}, {Project B, Beta Corp, [{Task 3, Implement feature Z, pending, 13}, {Task 4, Improve performance, in progress, 3}]}] |
|2 |Jane Doe |25 |[{JavaScript, advanced}, {PHP, intermediate}]|[{Project C, Gamma Inc, [{Task 5, Implement feature W, completed, 20}, {Task 6, Fix bug V, in progress, 13}]}, {Project D, Delta Ltd, [{Task 7, Implement feature U, pending, 8}, {Task 8, Improve performance, in progress, 5}]}]|
+-----------+----------+---+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
The task
Let's say we want to select only the following fields, while keeping the same overall structure:
- `employee_id`
- `projects.name`
- `projects.tasks.name`
Without spark_frame.nested
This forces us to do something quite complicated, using `pyspark.sql.functions.transform`:
>>> new_df = df.select(
... "employee_id",
... f.transform("projects", lambda project:
... f.struct(project["name"].alias("name"), f.transform(project["tasks"], lambda task:
... f.struct(task["name"].alias("name"))
... ).alias("tasks"))
... ).alias("projects")
... )
>>> nested.print_schema(new_df)
root
|-- employee_id: integer (nullable = true)
|-- projects!.name: string (nullable = true)
|-- projects!.tasks!.name: string (nullable = true)
>>> new_df.show(truncate=False)
+-----------+----------------------------------------------------------------------+
|employee_id|projects |
+-----------+----------------------------------------------------------------------+
|1 |[{Project A, [{Task 1}, {Task 2}]}, {Project B, [{Task 3}, {Task 4}]}]|
|2 |[{Project C, [{Task 5}, {Task 6}]}, {Project D, [{Task 7}, {Task 8}]}]|
+-----------+----------------------------------------------------------------------+
With spark_frame.nested
Using `spark_frame.nested.select`, we can easily obtain the exact same result.
>>> new_df = df.transform(nested.select, {
... "employee_id": None,
... "projects!.name": None,
... "projects!.tasks!.name": None
... })
>>> nested.print_schema(new_df)
root
|-- employee_id: integer (nullable = true)
|-- projects!.name: string (nullable = true)
|-- projects!.tasks!.name: string (nullable = true)
>>> new_df.show(truncate=False)
+-----------+----------------------------------------------------------------------+
|employee_id|projects |
+-----------+----------------------------------------------------------------------+
|1 |[{Project A, [{Task 1}, {Task 2}]}, {Project B, [{Task 3}, {Task 4}]}]|
|2 |[{Project C, [{Task 5}, {Task 6}]}, {Project D, [{Task 7}, {Task 8}]}]|
+-----------+----------------------------------------------------------------------+
Here, `None` is used to indicate that we don't want to perform any transformation on a field, but we could also replace it with a function to perform a transformation at the same time. For instance, we could convert all the names to uppercase like this:
>>> df.transform(nested.select, {
... "employee_id": None,
... "projects!.name": lambda project: f.upper(project["name"]),
... "projects!.tasks!.name": lambda task: f.upper(task["name"])
... }).show(truncate=False)
+-----------+----------------------------------------------------------------------+
|employee_id|projects |
+-----------+----------------------------------------------------------------------+
|1 |[{PROJECT A, [{TASK 1}, {TASK 2}]}, {PROJECT B, [{TASK 3}, {TASK 4}]}]|
|2 |[{PROJECT C, [{TASK 5}, {TASK 6}]}, {PROJECT D, [{TASK 7}, {TASK 8}]}]|
+-----------+----------------------------------------------------------------------+
nested.print_schema
Print the DataFrame's flattened schema to the standard output.
- Structs are flattened with a `.` after their name.
- Arrays are flattened with a `!` character after their name.
- Maps are flattened with a `%key` and `%value` after their name.
Limitation: Dots, percents, and exclamation marks are not supported in field names
Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`.
This can be worked around using the transformation `spark_frame.transformations.transform_all_field_names`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | A Spark DataFrame | required |
Examples:
>>> from pyspark.sql import SparkSession
>>> from spark_frame import nested
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.sql('''SELECT
... 1 as id,
... ARRAY(STRUCT(2 as a, ARRAY(STRUCT(3 as c, 4 as d)) as b, ARRAY(5, 6) as e)) as s1,
... STRUCT(7 as f) as s2,
... ARRAY(ARRAY(1, 2), ARRAY(3, 4)) as s3,
... ARRAY(ARRAY(STRUCT(1 as e, 2 as f)), ARRAY(STRUCT(3 as e, 4 as f))) as s4,
... MAP(STRUCT(1 as a), STRUCT(2 as b)) as m1
... ''')
>>> df.printSchema()
root
|-- id: integer (nullable = false)
|-- s1: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- a: integer (nullable = false)
| | |-- b: array (nullable = false)
| | | |-- element: struct (containsNull = false)
| | | | |-- c: integer (nullable = false)
| | | | |-- d: integer (nullable = false)
| | |-- e: array (nullable = false)
| | | |-- element: integer (containsNull = false)
|-- s2: struct (nullable = false)
| |-- f: integer (nullable = false)
|-- s3: array (nullable = false)
| |-- element: array (containsNull = false)
| | |-- element: integer (containsNull = false)
|-- s4: array (nullable = false)
| |-- element: array (containsNull = false)
| | |-- element: struct (containsNull = false)
| | | |-- e: integer (nullable = false)
| | | |-- f: integer (nullable = false)
|-- m1: map (nullable = false)
| |-- key: struct
| | |-- a: integer (nullable = false)
| |-- value: struct (valueContainsNull = false)
| | |-- b: integer (nullable = false)
>>> nested.print_schema(df)
root
|-- id: integer (nullable = false)
|-- s1!.a: integer (nullable = false)
|-- s1!.b!.c: integer (nullable = false)
|-- s1!.b!.d: integer (nullable = false)
|-- s1!.e!: integer (nullable = false)
|-- s2.f: integer (nullable = false)
|-- s3!!: integer (nullable = false)
|-- s4!!.e: integer (nullable = false)
|-- s4!!.f: integer (nullable = false)
|-- m1%key.a: integer (nullable = false)
|-- m1%value.b: integer (nullable = false)
Source code in spark_frame/nested_impl/print_schema.py
nested.select
Project a set of expressions and return a new DataFrame.
This method is similar to the DataFrame.select method, with the extra capability of working on nested and repeated fields (structs and arrays).
The syntax for field names works as follows:
- "." is the separator for struct elements
- "!" must be appended at the end of fields that are repeated (arrays)
- Map keys are appended with `%key`
- Map values are appended with `%value`
The following types of transformation are allowed:
- String and column expressions can be used on any non-repeated field, even nested ones.
- When working on repeated fields, transformations must be expressed as higher-order functions (e.g. lambda expressions). String and column expressions can be used on repeated fields as well, but their value will be repeated multiple times.
- When working on multiple levels of nested arrays, higher-order functions may take multiple arguments, corresponding to each level of repetition (see Example 5).
- `None` can also be used to represent the identity transformation; this is useful to select a field without changing it and without having to repeat its name.
Limitation: Dots, percents, and exclamation marks are not supported in field names
Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`.
This can be worked around using the transformation `spark_frame.transformations.transform_all_field_names`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | A Spark DataFrame | required |
| `fields` | `Mapping[str, ColumnTransformation]` | A Dict(field_name, transformation_to_apply) | required |
Returns:

| Type | Description |
|---|---|
| `DataFrame` | A new DataFrame where only the specified fields have been selected and the corresponding transformations were applied to each of them. |
Example 1: non-repeated fields
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import functions as f
>>> from spark_frame import nested
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.sql('''SELECT 1 as id, STRUCT(2 as a, 3 as b) as s''')
>>> df.printSchema()
root
|-- id: integer (nullable = false)
|-- s: struct (nullable = false)
| |-- a: integer (nullable = false)
| |-- b: integer (nullable = false)
>>> df.show()
+---+------+
| id| s|
+---+------+
| 1|{2, 3}|
+---+------+
Transformations on non-repeated fields may be expressed as a string representing a column name, a Column expression, or None. (In this example, the column "id" will be dropped because it was not selected.)
>>> new_df = nested.select(df, {
... "s.a": "s.a", # Column name (string)
... "s.b": None, # None: use to keep a column without having to repeat its name
... "s.c": f.col("s.a") + f.col("s.b") # Column expression
... })
>>> new_df.printSchema()
root
|-- s: struct (nullable = false)
| |-- a: integer (nullable = false)
| |-- b: integer (nullable = false)
| |-- c: integer (nullable = false)
>>> new_df.show()
+---------+
| s|
+---------+
|{2, 3, 5}|
+---------+
Example 2: repeated fields
>>> df = spark.sql('SELECT 1 as id, ARRAY(STRUCT(1 as a, 2 as b), STRUCT(3 as a, 4 as b)) as s')
>>> nested.print_schema(df)
root
|-- id: integer (nullable = false)
|-- s!.a: integer (nullable = false)
|-- s!.b: integer (nullable = false)
>>> df.show()
+---+----------------+
| id| s|
+---+----------------+
| 1|[{1, 2}, {3, 4}]|
+---+----------------+
Transformations on repeated fields must be expressed as higher-order functions (lambda expressions or named functions). The value passed to this function will correspond to the last repeated element.
>>> df.transform(nested.select, {
... "s!.a": lambda s: s["a"],
... "s!.b": None,
... "s!.c": lambda s: s["a"] + s["b"]
... }).show(truncate=False)
+----------------------+
|s |
+----------------------+
|[{1, 2, 3}, {3, 4, 7}]|
+----------------------+
String and column expressions can be used on repeated fields as well, but their value will be repeated multiple times.
>>> df.transform(nested.select, {
... "id": None,
... "s!.a": "id",
... "s!.b": f.lit(2)
... }).show(truncate=False)
+---+----------------+
|id |s |
+---+----------------+
|1 |[{1, 2}, {1, 2}]|
+---+----------------+
Example 3: field repeated twice
>>> df = spark.sql('''
... SELECT
... 1 as id,
... ARRAY(STRUCT(ARRAY(1, 2, 3) as e)) as s1,
... ARRAY(STRUCT(ARRAY(4, 5, 6) as e)) as s2
... ''')
>>> nested.print_schema(df)
root
|-- id: integer (nullable = false)
|-- s1!.e!: integer (nullable = false)
|-- s2!.e!: integer (nullable = false)
>>> df.show()
+---+-------------+-------------+
| id| s1| s2|
+---+-------------+-------------+
| 1|[{[1, 2, 3]}]|[{[4, 5, 6]}]|
+---+-------------+-------------+
Here, the lambda expression will be applied to the last repeated element `e`.
>>> new_df = df.transform(nested.select, {
... "s1!.e!": None,
... "s2!.e!": lambda e : e.cast("DOUBLE")
... })
>>> nested.print_schema(new_df)
root
|-- s1!.e!: integer (nullable = false)
|-- s2!.e!: double (nullable = false)
>>> new_df.show()
+-------------+-------------------+
| s1| s2|
+-------------+-------------------+
|[{[1, 2, 3]}]|[{[4.0, 5.0, 6.0]}]|
+-------------+-------------------+
Example 4: DataFrame with maps
>>> df = spark.sql('''
... SELECT
... 1 as id,
... MAP("a", STRUCT(2 as a, 3 as b)) as m1
... ''')
>>> nested.print_schema(df)
root
|-- id: integer (nullable = false)
|-- m1%key: string (nullable = false)
|-- m1%value.a: integer (nullable = false)
|-- m1%value.b: integer (nullable = false)
>>> df.show()
+---+-------------+
| id| m1|
+---+-------------+
| 1|{a -> {2, 3}}|
+---+-------------+
>>> new_df = df.transform(nested.select, {
... "id": None,
... "m1%key": lambda key : f.upper(key),
... "m1%value.a": lambda value : value["a"].cast("DOUBLE")
... })
>>> nested.print_schema(new_df)
root
|-- id: integer (nullable = false)
|-- m1%key: string (nullable = false)
|-- m1%value.a: double (nullable = false)
>>> new_df.show()
+---+------------+
| id| m1|
+---+------------+
| 1|{A -> {2.0}}|
+---+------------+
Example 5: Accessing multiple repetition levels
>>> df = spark.sql('''
... SELECT
... 1 as id,
... ARRAY(
... STRUCT(2 as average, ARRAY(1, 2, 3) as values),
... STRUCT(3 as average, ARRAY(1, 2, 3, 4, 5) as values)
... ) as s1
... ''')
>>> nested.print_schema(df)
root
|-- id: integer (nullable = false)
|-- s1!.average: integer (nullable = false)
|-- s1!.values!: integer (nullable = false)
>>> df.show(truncate=False)
+---+--------------------------------------+
|id |s1 |
+---+--------------------------------------+
|1 |[{2, [1, 2, 3]}, {3, [1, 2, 3, 4, 5]}]|
+---+--------------------------------------+
Here, the transformation applied to "s1!.values!" takes two arguments.
>>> new_df = df.transform(nested.select, {
... "id": None,
... "s1!.average": None,
... "s1!.values!": lambda s1, value : value - s1["average"]
... })
>>> new_df.show(truncate=False)
+---+-----------------------------------------+
|id |s1 |
+---+-----------------------------------------+
|1 |[{2, [-1, 0, 1]}, {3, [-2, -1, 0, 1, 2]}]|
+---+-----------------------------------------+
Extra arguments can be added to the left for each repetition level, up to the root level.
>>> new_df = df.transform(nested.select, {
... "id": None,
... "s1!.average": None,
... "s1!.values!": lambda root, s1, value : value - s1["average"] + root["id"]
... })
>>> new_df.show(truncate=False)
+---+---------------------------------------+
|id |s1 |
+---+---------------------------------------+
|1 |[{2, [0, 1, 2]}, {3, [-1, 0, 1, 2, 3]}]|
+---+---------------------------------------+
Source code in spark_frame/nested_impl/select_impl.py