Working with nested data

Transforming nested fields

Let's take a sample DataFrame with a deeply nested schema:

>>> from spark_frame.examples.working_with_nested_data import _get_sample_employee_data
>>> from pyspark.sql import functions as f
>>> df = _get_sample_employee_data()
>>> df.printSchema()
root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- level: string (nullable = true)
 |-- projects: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- client: string (nullable = true)
 |    |    |-- tasks: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- status: string (nullable = true)
 |    |    |    |    |-- estimate: long (nullable = true)

>>> df.show(truncate=False)
+-----------+----------+---+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|employee_id|name      |age|skills                                       |projects                                                                                                                                                                                                                          |
+-----------+----------+---+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1          |John Smith|30 |[{Java, expert}, {Python, intermediate}]     |[{Project A, Acme Inc, [{Task 1, Implement feature X, completed, 8}, {Task 2, Fix bug Y, in progress, 5}]}, {Project B, Beta Corp, [{Task 3, Implement feature Z, pending, 13}, {Task 4, Improve performance, in progress, 3}]}]  |
|2          |Jane Doe  |25 |[{JavaScript, advanced}, {PHP, intermediate}]|[{Project C, Gamma Inc, [{Task 5, Implement feature W, completed, 20}, {Task 6, Fix bug V, in progress, 13}]}, {Project D, Delta Ltd, [{Task 7, Implement feature U, pending, 8}, {Task 8, Improve performance, in progress, 5}]}]|
+-----------+----------+---+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

As we can see, the schema has two top-level columns of type ARRAY (skills and projects), and the projects array contains a second level of repetition: projects.tasks.

Manipulating this DataFrame with Spark can quickly become really painful, and it is still quite simple compared to what engineers may encounter while working with enterprise-grade datasets.

The task

Let's say we want to enrich this DataFrame by performing the following changes:

  • Change the skills.level to uppercase
  • Cast the projects.tasks.estimate to double

Without spark_frame.nested

Prior to Spark 3.0, we would have had only two choices:

  1. Flatten skills and projects.tasks into two separate DataFrames, perform the transformation then join the two DataFrames back together.
  2. Write a custom Python UDF to perform the changes.

Option 1 is not a good solution, as it would be quite costly and would require several shuffle operations.

Option 2 is not great either, as the Python UDF would be slow and not very reusable. Had we been using Java or Scala, this might have been a better option, as we would not incur the performance costs associated with Python UDFs, but it would still have required a lot of work to code the whole Employee data structure in Java/Scala before being able to manipulate it.

Since Spark 3.1.0, a third option is available, which consists of using pyspark.sql.functions.transform and pyspark.sql.Column.withField to achieve our goal.

However, the code that we need to write is quite complex:

>>> new_df = df.withColumn(
...     "skills",
...     f.transform(f.col("skills"), lambda skill: skill.withField("level", f.upper(skill["level"])))
... ).withColumn(
...     "projects",
...     f.transform(
...         f.col("projects"),
...         lambda project: project.withField(
...             "tasks",
...             f.transform(
...                 project["tasks"],
...                 lambda task: task.withField("estimate", task["estimate"].cast("DOUBLE"))),
...         ),
...     ),
... )
>>> new_df.printSchema()
root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- level: string (nullable = true)
 |-- projects: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- client: string (nullable = true)
 |    |    |-- tasks: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- status: string (nullable = true)
 |    |    |    |    |-- estimate: double (nullable = true)

>>> new_df.select("employee_id", "name", "age", "skills").show(truncate=False)
+-----------+----------+---+---------------------------------------------+
|employee_id|name      |age|skills                                       |
+-----------+----------+---+---------------------------------------------+
|1          |John Smith|30 |[{Java, EXPERT}, {Python, INTERMEDIATE}]     |
|2          |Jane Doe  |25 |[{JavaScript, ADVANCED}, {PHP, INTERMEDIATE}]|
+-----------+----------+---+---------------------------------------------+

As we can see, the transformation worked: the schema is the same except for projects.tasks.estimate, which is now a double, and skills.level is now in uppercase. But hopefully we can agree that the code to achieve this looks quite complex, and that its complexity would grow even more if we tried to perform more transformations at the same time.

With spark_frame.nested

The module spark_frame.nested provides several methods to help us deal with nested data structures more easily. First, let's use spark_frame.nested.print_schema to get a flat version of the DataFrame's schema:

>>> from spark_frame import nested
>>> nested.print_schema(df)
root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- skills!.name: string (nullable = true)
 |-- skills!.level: string (nullable = true)
 |-- projects!.name: string (nullable = true)
 |-- projects!.client: string (nullable = true)
 |-- projects!.tasks!.name: string (nullable = true)
 |-- projects!.tasks!.description: string (nullable = true)
 |-- projects!.tasks!.status: string (nullable = true)
 |-- projects!.tasks!.estimate: long (nullable = true)

As we can see, this is the same schema as before, but instead of being displayed as a tree, it is displayed as a flat list where each field is represented with its full name. We can also see that fields of type ARRAY can be easily identified thanks to the exclamation marks (!) added after their names. Once you get used to it, this flat representation is more compact and easier to read than the tree representation, while conveying the same amount of information.

This notation will also help us perform the target transformations more easily. As a reminder, we want to:

  • Change the skills.level to uppercase
  • Cast the projects.tasks.estimate to double

Using the spark_frame.nested.with_fields method, this can be done like this:

>>> new_df = df.transform(nested.with_fields, {
...     "skills!.level": lambda skill: f.upper(skill["level"]),
...     "projects!.tasks!.estimate": lambda task: task["estimate"].cast("DOUBLE")
... })
>>> nested.print_schema(new_df)
root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- skills!.name: string (nullable = true)
 |-- skills!.level: string (nullable = true)
 |-- projects!.name: string (nullable = true)
 |-- projects!.client: string (nullable = true)
 |-- projects!.tasks!.name: string (nullable = true)
 |-- projects!.tasks!.description: string (nullable = true)
 |-- projects!.tasks!.status: string (nullable = true)
 |-- projects!.tasks!.estimate: double (nullable = true)

>>> new_df.select("employee_id", "name", "age", "skills").show(truncate=False)
+-----------+----------+---+---------------------------------------------+
|employee_id|name      |age|skills                                       |
+-----------+----------+---+---------------------------------------------+
|1          |John Smith|30 |[{Java, EXPERT}, {Python, INTERMEDIATE}]     |
|2          |Jane Doe  |25 |[{JavaScript, ADVANCED}, {PHP, INTERMEDIATE}]|
+-----------+----------+---+---------------------------------------------+

As we can see, we obtained the same result with much simpler and cleaner code. Now let's explain what this code did:

The spark_frame.nested.with_fields method is similar to the pyspark.sql.DataFrame.withColumns method, except that it works on nested fields inside structs and arrays. We pass it a Dict(field_name, transformation) indicating the expression we want to apply for each field. The transformation must be a higher order function: a lambda expression or named function that takes a Column as argument and returns a Column. The column passed to that function will represent the struct parent of the target field. For instance, when we write "skills!.level": lambda skill: f.upper(skill["level"]), the lambda function will be applied to each struct element of the array skills.
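
To make the "struct parent" idea concrete, here is a minimal plain-Python analogy that works on dicts and lists instead of Spark Columns. The helper with_field is hypothetical and written only for this illustration. It is not how spark_frame is implemented, and it only supports targets that are struct fields (not targets ending with "!").

```python
def with_field(value, path, fn):
    """Return a copy of `value` where the field at `path` is set to fn(parent_struct).

    `path` uses the same notation as above: "." separates struct fields
    and a trailing "!" marks a repeated (list) field.
    """
    head, _, rest = path.partition(".")
    if not rest:
        # `head` is the target field, so `value` is its parent struct.
        return {**value, head: fn(value)}
    name = head.rstrip("!")
    child = value[name]
    if head.endswith("!"):
        # Repeated field: apply the rest of the path to every element.
        new_child = [with_field(item, rest, fn) for item in child]
    else:
        # Plain struct: descend one level.
        new_child = with_field(child, rest, fn)
    return {**value, name: new_child}


employee = {
    "name": "John Smith",
    "skills": [{"name": "Java", "level": "expert"}, {"name": "Python", "level": "intermediate"}],
    "projects": [{"name": "Project A", "tasks": [{"name": "Task 1", "estimate": 8}]}],
}

# The lambda receives each struct element of `skills`, just like `skill` above.
step1 = with_field(employee, "skills!.level", lambda skill: skill["level"].upper())
# The lambda receives each struct element of `projects!.tasks`.
step2 = with_field(step1, "projects!.tasks!.estimate", lambda task: float(task["estimate"]))

print(step2["skills"])
print(step2["projects"][0]["tasks"])
```

In both calls the function never sees the whole row: it is handed the innermost struct that contains the target field, which is why it reads skill["level"] rather than a full path.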

Info

The data for this example was generated by ChatGPT :-)

Methods used in this example

nested.print_schema

Print the DataFrame's flattened schema to the standard output.

  • Structs are flattened with a . after their name.
  • Arrays are flattened with a ! character after their name.
  • Maps are flattened with a %key and a %value after their name.
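
The three rules above can be sketched in plain Python on a toy schema. The encoding used here (dict for a struct, one-item list for an array, a ("map", key, value) tuple for a map) and the function flatten_schema are assumptions made for this illustration only. They are not part of spark_frame and do not reflect its implementation.

```python
def flatten_schema(schema, prefix=""):
    """Yield (flat_name, type_name) pairs for a toy nested schema.

    Toy encoding (an assumption of this sketch):
      - dict             -> struct mapping field names to sub-schemas
      - one-item list    -> array of that item's schema
      - ("map", k, v)    -> map with key schema k and value schema v
      - str              -> leaf type name
    """
    if isinstance(schema, dict):
        for name, sub_schema in schema.items():
            # Struct fields are joined to their parent with a "."
            child = f"{prefix}.{name}" if prefix else name
            yield from flatten_schema(sub_schema, child)
    elif isinstance(schema, list):
        # Arrays get a "!" appended after their name
        yield from flatten_schema(schema[0], prefix + "!")
    elif isinstance(schema, tuple):
        # Maps get "%key" and "%value" appended after their name
        _, key_schema, value_schema = schema
        yield from flatten_schema(key_schema, prefix + "%key")
        yield from flatten_schema(value_schema, prefix + "%value")
    else:
        yield prefix, schema


schema = {
    "id": "integer",
    "s1": [{"a": "integer", "b": [{"c": "integer", "d": "integer"}], "e": ["integer"]}],
    "s2": {"f": "integer"},
    "s3": [["integer"]],
    "m1": ("map", {"a": "integer"}, {"b": "integer"}),
}

for flat_name, type_name in flatten_schema(schema):
    print(f" |-- {flat_name}: {type_name}")
```

Note how an array of arrays such as s3 simply accumulates two exclamation marks (s3!!), one per level of repetition.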

Limitation: Dots, percents, and exclamation marks are not supported in field names

Given the syntax used, every method defined in the spark_frame.nested module assumes that all field names in DataFrames do not contain any dot ., percent % or exclamation mark !. This can be worked around using the transformation spark_frame.transformations.transform_all_field_names.
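
As an illustration of the kind of renaming rule such a workaround needs, here is a small plain-Python helper that replaces the three reserved characters in a field name. The function sanitize_field_name is hypothetical, and whether transform_all_field_names accepts a callable of exactly this shape is an assumption: please check that method's own documentation before relying on it.

```python
import re

# Characters that carry a special meaning in the flattened field notation.
RESERVED_CHARS = re.compile(r"[.%!]")


def sanitize_field_name(name: str, replacement: str = "_") -> str:
    """Replace every dot, percent and exclamation mark in `name` with `replacement`."""
    return RESERVED_CHARS.sub(replacement, name)


print(sanitize_field_name("price.in%"))
print(sanitize_field_name("urgent!"))
```

Keep in mind that a rule like this may map two different names to the same result (for instance "a.b" and "a!b"), so it is worth checking for collisions on real schemas.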

Parameters:

Name  Type       Description        Default
df    DataFrame  A Spark DataFrame  required

Examples:

>>> from pyspark.sql import SparkSession
>>> from spark_frame import nested
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.sql('''SELECT
...     1 as id,
...     ARRAY(STRUCT(2 as a, ARRAY(STRUCT(3 as c, 4 as d)) as b, ARRAY(5, 6) as e)) as s1,
...     STRUCT(7 as f) as s2,
...     ARRAY(ARRAY(1, 2), ARRAY(3, 4)) as s3,
...     ARRAY(ARRAY(STRUCT(1 as e, 2 as f)), ARRAY(STRUCT(3 as e, 4 as f))) as s4,
...     MAP(STRUCT(1 as a), STRUCT(2 as b)) as m1
... ''')
>>> df.printSchema()
root
 |-- id: integer (nullable = false)
 |-- s1: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: integer (nullable = false)
 |    |    |-- b: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- c: integer (nullable = false)
 |    |    |    |    |-- d: integer (nullable = false)
 |    |    |-- e: array (nullable = false)
 |    |    |    |-- element: integer (containsNull = false)
 |-- s2: struct (nullable = false)
 |    |-- f: integer (nullable = false)
 |-- s3: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: integer (containsNull = false)
 |-- s4: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- e: integer (nullable = false)
 |    |    |    |-- f: integer (nullable = false)
 |-- m1: map (nullable = false)
 |    |-- key: struct
 |    |    |-- a: integer (nullable = false)
 |    |-- value: struct (valueContainsNull = false)
 |    |    |-- b: integer (nullable = false)

>>> nested.print_schema(df)
root
 |-- id: integer (nullable = false)
 |-- s1!.a: integer (nullable = false)
 |-- s1!.b!.c: integer (nullable = false)
 |-- s1!.b!.d: integer (nullable = false)
 |-- s1!.e!: integer (nullable = false)
 |-- s2.f: integer (nullable = false)
 |-- s3!!: integer (nullable = false)
 |-- s4!!.e: integer (nullable = false)
 |-- s4!!.f: integer (nullable = false)
 |-- m1%key.a: integer (nullable = false)
 |-- m1%value.b: integer (nullable = false)

Source code in spark_frame/nested_impl/print_schema.py

def print_schema(df: DataFrame) -> None:
    """Print the DataFrame's flattened schema to the standard output.

    - Structs are flattened with a `.` after their name.
    - Arrays are flattened with a `!` character after their name.
    - Maps are flattened with a `%key` and a `%value` after their name.

    !!! warning "Limitation: Dots, percents, and exclamation marks are not supported in field names"
        Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field
        names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`.
        This can be worked around using the transformation
        [`spark_frame.transformations.transform_all_field_names`]
        [spark_frame.transformations_impl.transform_all_field_names.transform_all_field_names].

    Args:
        df: A Spark DataFrame

    Examples:
        >>> from pyspark.sql import SparkSession
        >>> from spark_frame import nested
        >>> spark = SparkSession.builder.appName("doctest").getOrCreate()
        >>> df = spark.sql('''SELECT
        ...     1 as id,
        ...     ARRAY(STRUCT(2 as a, ARRAY(STRUCT(3 as c, 4 as d)) as b, ARRAY(5, 6) as e)) as s1,
        ...     STRUCT(7 as f) as s2,
        ...     ARRAY(ARRAY(1, 2), ARRAY(3, 4)) as s3,
        ...     ARRAY(ARRAY(STRUCT(1 as e, 2 as f)), ARRAY(STRUCT(3 as e, 4 as f))) as s4,
        ...     MAP(STRUCT(1 as a), STRUCT(2 as b)) as m1
        ... ''')
        >>> df.printSchema()
        root
         |-- id: integer (nullable = false)
         |-- s1: array (nullable = false)
         |    |-- element: struct (containsNull = false)
         |    |    |-- a: integer (nullable = false)
         |    |    |-- b: array (nullable = false)
         |    |    |    |-- element: struct (containsNull = false)
         |    |    |    |    |-- c: integer (nullable = false)
         |    |    |    |    |-- d: integer (nullable = false)
         |    |    |-- e: array (nullable = false)
         |    |    |    |-- element: integer (containsNull = false)
         |-- s2: struct (nullable = false)
         |    |-- f: integer (nullable = false)
         |-- s3: array (nullable = false)
         |    |-- element: array (containsNull = false)
         |    |    |-- element: integer (containsNull = false)
         |-- s4: array (nullable = false)
         |    |-- element: array (containsNull = false)
         |    |    |-- element: struct (containsNull = false)
         |    |    |    |-- e: integer (nullable = false)
         |    |    |    |-- f: integer (nullable = false)
         |-- m1: map (nullable = false)
         |    |-- key: struct
         |    |    |-- a: integer (nullable = false)
         |    |-- value: struct (valueContainsNull = false)
         |    |    |-- b: integer (nullable = false)
        <BLANKLINE>
        >>> nested.print_schema(df)
        root
         |-- id: integer (nullable = false)
         |-- s1!.a: integer (nullable = false)
         |-- s1!.b!.c: integer (nullable = false)
         |-- s1!.b!.d: integer (nullable = false)
         |-- s1!.e!: integer (nullable = false)
         |-- s2.f: integer (nullable = false)
         |-- s3!!: integer (nullable = false)
         |-- s4!!.e: integer (nullable = false)
         |-- s4!!.f: integer (nullable = false)
         |-- m1%key.a: integer (nullable = false)
         |-- m1%value.b: integer (nullable = false)
        <BLANKLINE>
    """
    print(schema_string(df))

nested.with_fields

Return a new DataFrame by adding or replacing (when they already exist) columns.

This method is similar to the DataFrame.withColumn method, with the extra capability of working on nested and repeated fields (structs and arrays).

The syntax for field names works as follows:

  • "." is the separator for struct elements
  • "!" must be appended at the end of fields that are repeated (arrays)
  • Map keys are appended with %key
  • Map values are appended with %value

The following types of transformation are allowed:

  • String and column expressions can be used on any non-repeated field, even nested ones.
  • When working on repeated fields, transformations must be expressed as higher order functions (e.g. lambda expressions). String and column expressions can be used on repeated fields as well, but their value will be repeated multiple times.
  • When working on multiple levels of nested arrays, higher order functions may take multiple arguments, corresponding to each level of repetition (See Example 5.).
  • None can also be used to represent the identity transformation. This is useful to select a field without changing it and without having to repeat its name.

Limitation: Dots, percents, and exclamation marks are not supported in field names

Given the syntax used, every method defined in the spark_frame.nested module assumes that all field names in DataFrames do not contain any dot ., percent % or exclamation mark !. This can be worked around using the transformation spark_frame.transformations.transform_all_field_names.

Parameters:

Name    Type                                   Description                                  Default
df      DataFrame                              A Spark DataFrame                            required
fields  Mapping[str, AnyKindOfTransformation]  A Dict(field_name, transformation_to_apply)  required

Returns:

Type       Description
DataFrame  A new DataFrame with the same fields as the input DataFrame, where the specified transformations have been applied to the corresponding fields. If a field name did not exist in the input DataFrame, it will be added to the output DataFrame. If it did exist, the original value will be replaced with the new one.

Example 1: non-repeated fields

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import functions as f
>>> from spark_frame import nested
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.sql('''SELECT 1 as id, STRUCT(2 as a, 3 as b) as s''')
>>> nested.print_schema(df)
root
 |-- id: integer (nullable = false)
 |-- s.a: integer (nullable = false)
 |-- s.b: integer (nullable = false)

>>> df.show()
+---+------+
| id|     s|
+---+------+
|  1|{2, 3}|
+---+------+

Transformations on non-repeated fields may be expressed as a string representing a column name or a Column expression.

>>> new_df = nested.with_fields(df, {
...     "s.id": "id",                                 # column name (string)
...     "s.c": f.col("s.a") + f.col("s.b")            # Column expression
... })
>>> new_df.printSchema()
root
 |-- id: integer (nullable = false)
 |-- s: struct (nullable = false)
 |    |-- a: integer (nullable = false)
 |    |-- b: integer (nullable = false)
 |    |-- id: integer (nullable = false)
 |    |-- c: integer (nullable = false)

>>> new_df.show()
+---+------------+
| id|           s|
+---+------------+
|  1|{2, 3, 1, 5}|
+---+------------+

Example 2: repeated fields

>>> df = spark.sql('''
...     SELECT
...         1 as id,
...         ARRAY(STRUCT(1 as a, STRUCT(2 as c) as b), STRUCT(3 as a, STRUCT(4 as c) as b)) as s
... ''')
>>> nested.print_schema(df)
root
 |-- id: integer (nullable = false)
 |-- s!.a: integer (nullable = false)
 |-- s!.b.c: integer (nullable = false)

>>> df.show()
+---+--------------------+
| id|                   s|
+---+--------------------+
|  1|[{1, {2}}, {3, {4}}]|
+---+--------------------+

Transformations on repeated fields must be expressed as higher-order functions (lambda expressions or named functions). The value passed to this function will correspond to the last repeated element.

>>> new_df = df.transform(nested.with_fields, {
...     "s!.b.d": lambda s: s["a"] + s["b"]["c"]}
... )
>>> nested.print_schema(new_df)
root
 |-- id: integer (nullable = false)
 |-- s!.a: integer (nullable = false)
 |-- s!.b.c: integer (nullable = false)
 |-- s!.b.d: integer (nullable = false)

>>> new_df.show(truncate=False)
+---+--------------------------+
|id |s                         |
+---+--------------------------+
|1  |[{1, {2, 3}}, {3, {4, 7}}]|
+---+--------------------------+

String and column expressions can be used on repeated fields as well, but their value will be repeated multiple times.

>>> df.transform(nested.with_fields, {
...     "id": None,
...     "s!.a": "id",
...     "s!.b.c": f.lit(2)
... }).show(truncate=False)
+---+--------------------+
|id |s                   |
+---+--------------------+
|1  |[{1, {2}}, {1, {2}}]|
+---+--------------------+

Example 3: field repeated twice

>>> df = spark.sql('SELECT 1 as id, ARRAY(STRUCT(ARRAY(1, 2, 3) as e)) as s')
>>> nested.print_schema(df)
root
 |-- id: integer (nullable = false)
 |-- s!.e!: integer (nullable = false)

>>> df.show()
+---+-------------+
| id|            s|
+---+-------------+
|  1|[{[1, 2, 3]}]|
+---+-------------+

Here, the lambda expression will be applied to the last repeated element e.

>>> df.transform(nested.with_fields, {"s!.e!": lambda e : e.cast("DOUBLE")}).show()
+---+-------------------+
| id|                  s|
+---+-------------------+
|  1|[{[1.0, 2.0, 3.0]}]|
+---+-------------------+

Example 4: DataFrame with maps

>>> df = spark.sql('''
...     SELECT
...         1 as id,
...         MAP("a", STRUCT(2 as a, 3 as b)) as m1
... ''')
>>> nested.print_schema(df)
root
 |-- id: integer (nullable = false)
 |-- m1%key: string (nullable = false)
 |-- m1%value.a: integer (nullable = false)
 |-- m1%value.b: integer (nullable = false)

>>> df.show()
+---+-------------+
| id|           m1|
+---+-------------+
|  1|{a -> {2, 3}}|
+---+-------------+

>>> new_df = df.transform(nested.with_fields, {
...  "m1%key": lambda key : f.upper(key),
...  "m1%value.a": lambda value : value["a"].cast("DOUBLE")
... })
>>> nested.print_schema(new_df)
root
 |-- id: integer (nullable = false)
 |-- m1%key: string (nullable = false)
 |-- m1%value.a: double (nullable = false)
 |-- m1%value.b: integer (nullable = false)

>>> new_df.show()
+---+---------------+
| id|             m1|
+---+---------------+
|  1|{A -> {2.0, 3}}|
+---+---------------+

Example 5: Accessing multiple repetition levels

>>> df = spark.sql('''
...     SELECT
...         1 as id,
...         ARRAY(
...             STRUCT(2 as average, ARRAY(1, 2, 3) as values),
...             STRUCT(3 as average, ARRAY(1, 2, 3, 4, 5) as values)
...         ) as s1
... ''')
>>> nested.print_schema(df)
root
 |-- id: integer (nullable = false)
 |-- s1!.average: integer (nullable = false)
 |-- s1!.values!: integer (nullable = false)

>>> df.show(truncate=False)
+---+--------------------------------------+
|id |s1                                    |
+---+--------------------------------------+
|1  |[{2, [1, 2, 3]}, {3, [1, 2, 3, 4, 5]}]|
+---+--------------------------------------+

Here, the transformation applied to "s1!.values!" takes two arguments.

>>> new_df = df.transform(nested.with_fields, {
...  "s1!.values!": lambda s1, value : value - s1["average"]
... })
>>> new_df.show(truncate=False)
+---+-----------------------------------------+
|id |s1                                       |
+---+-----------------------------------------+
|1  |[{2, [-1, 0, 1]}, {3, [-2, -1, 0, 1, 2]}]|
+---+-----------------------------------------+

Extra arguments can be added to the left for each repetition level, up to the root level.

>>> new_df = df.transform(nested.with_fields, {
...  "s1!.values!": lambda root, s1, value : value - s1["average"] + root["id"]
... })
>>> new_df.show(truncate=False)
+---+---------------------------------------+
|id |s1                                     |
+---+---------------------------------------+
|1  |[{2, [0, 1, 2]}, {3, [-1, 0, 1, 2, 3]}]|
+---+---------------------------------------+
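
To show which value each argument of a multi-argument function receives, here is a plain-Python analogy of the three-argument call above, using dicts and lists instead of Spark Columns. The helper transform_values is hypothetical and hard-codes the s1!.values! path for readability. It only mirrors the semantics and is not how spark_frame evaluates the transformation.

```python
row = {
    "id": 1,
    "s1": [
        {"average": 2, "values": [1, 2, 3]},
        {"average": 3, "values": [1, 2, 3, 4, 5]},
    ],
}


def transform_values(root, fn):
    """Apply fn(root, s1, value) to every element of every s1[*]["values"] list."""
    return {
        **root,
        "s1": [
            # `s1` is the current element of the outer array,
            # `value` is the current element of the inner array.
            {**s1, "values": [fn(root, s1, value) for value in s1["values"]]}
            for s1 in root["s1"]
        ],
    }


new_row = transform_values(
    row, lambda root, s1, value: value - s1["average"] + root["id"]
)
print(new_row["s1"])
```

The rightmost argument is always the innermost repeated element, and each extra argument added to its left gives access to one enclosing level, ending with the whole row.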

Source code in spark_frame/nested_impl/with_fields.py

def with_fields(df: DataFrame, fields: Mapping[str, AnyKindOfTransformation]) -> DataFrame:
    """Return a new [DataFrame][pyspark.sql.DataFrame] by adding or replacing (when they already exist) columns.

    This method is similar to the [DataFrame.withColumn][pyspark.sql.DataFrame.withColumn] method, with the extra
    capability of working on nested and repeated fields (structs and arrays).

    The syntax for field names works as follows:

    - "." is the separator for struct elements
    - "!" must be appended at the end of fields that are repeated (arrays)
    - Map keys are appended with `%key`
    - Map values are appended with `%value`

    The following types of transformation are allowed:

    - String and column expressions can be used on any non-repeated field, even nested ones.
    - When working on repeated fields, transformations must be expressed as higher order functions
      (e.g. lambda expressions). String and column expressions can be used on repeated fields as well,
      but their value will be repeated multiple times.
    - When working on multiple levels of nested arrays, higher order functions may take multiple arguments,
      corresponding to each level of repetition (See Example 5.).
    - `None` can also be used to represent the identity transformation. This is useful to select a field without
      changing it and without having to repeat its name.

    !!! warning "Limitation: Dots, percents, and exclamation marks are not supported in field names"
        Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field
        names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`.
        This can be worked around using the transformation
        [`spark_frame.transformations.transform_all_field_names`]
        [spark_frame.transformations_impl.transform_all_field_names.transform_all_field_names].

    Args:
        df: A Spark DataFrame
        fields: A Dict(field_name, transformation_to_apply)

    Returns:
        A new DataFrame with the same fields as the input DataFrame, where the specified transformations have been
        applied to the corresponding fields. If a field name did not exist in the input DataFrame,
        it will be added to the output DataFrame. If it did exist, the original value will be replaced with the new one.

    Examples: Example 1: non-repeated fields
        >>> from pyspark.sql import SparkSession
        >>> from pyspark.sql import functions as f
        >>> from spark_frame import nested
        >>> spark = SparkSession.builder.appName("doctest").getOrCreate()
        >>> df = spark.sql('''SELECT 1 as id, STRUCT(2 as a, 3 as b) as s''')
        >>> nested.print_schema(df)
        root
         |-- id: integer (nullable = false)
         |-- s.a: integer (nullable = false)
         |-- s.b: integer (nullable = false)
        <BLANKLINE>
        >>> df.show()
        +---+------+
        | id|     s|
        +---+------+
        |  1|{2, 3}|
        +---+------+
        <BLANKLINE>

        Transformations on non-repeated fields may be expressed as a string representing a column name
        or a Column expression.
        >>> new_df = nested.with_fields(df, {
        ...     "s.id": "id",                                 # column name (string)
        ...     "s.c": f.col("s.a") + f.col("s.b")            # Column expression
        ... })
        >>> new_df.printSchema()
        root
         |-- id: integer (nullable = false)
         |-- s: struct (nullable = false)
         |    |-- a: integer (nullable = false)
         |    |-- b: integer (nullable = false)
         |    |-- id: integer (nullable = false)
         |    |-- c: integer (nullable = false)
        <BLANKLINE>
        >>> new_df.show()
        +---+------------+
        | id|           s|
        +---+------------+
        |  1|{2, 3, 1, 5}|
        +---+------------+
        <BLANKLINE>

    Examples: Example 2: repeated fields
        >>> df = spark.sql('''
        ...     SELECT
        ...         1 as id,
        ...         ARRAY(STRUCT(1 as a, STRUCT(2 as c) as b), STRUCT(3 as a, STRUCT(4 as c) as b)) as s
        ... ''')
        >>> nested.print_schema(df)
        root
         |-- id: integer (nullable = false)
         |-- s!.a: integer (nullable = false)
         |-- s!.b.c: integer (nullable = false)
        <BLANKLINE>
        >>> df.show()
        +---+--------------------+
        | id|                   s|
        +---+--------------------+
        |  1|[{1, {2}}, {3, {4}}]|
        +---+--------------------+
        <BLANKLINE>

        Transformations on repeated fields must be expressed as
        higher-order functions (lambda expressions or named functions).
        The value passed to this function will correspond to the last repeated element.
        >>> new_df = df.transform(nested.with_fields, {
        ...     "s!.b.d": lambda s: s["a"] + s["b"]["c"]}
        ... )
        >>> nested.print_schema(new_df)
        root
         |-- id: integer (nullable = false)
         |-- s!.a: integer (nullable = false)
         |-- s!.b.c: integer (nullable = false)
         |-- s!.b.d: integer (nullable = false)
        <BLANKLINE>
        >>> new_df.show(truncate=False)
        +---+--------------------------+
        |id |s                         |
        +---+--------------------------+
        |1  |[{1, {2, 3}}, {3, {4, 7}}]|
        +---+--------------------------+
        <BLANKLINE>

        String and column expressions can be used on repeated fields as well,
        but their value will be repeated multiple times.
        >>> df.transform(nested.with_fields, {
        ...     "id": None,
        ...     "s!.a": "id",
        ...     "s!.b.c": f.lit(2)
        ... }).show(truncate=False)
        +---+--------------------+
        |id |s                   |
        +---+--------------------+
        |1  |[{1, {2}}, {1, {2}}]|
        +---+--------------------+
        <BLANKLINE>

    Examples: Example 3: field repeated twice
        >>> df = spark.sql('SELECT 1 as id, ARRAY(STRUCT(ARRAY(1, 2, 3) as e)) as s')
        >>> nested.print_schema(df)
        root
         |-- id: integer (nullable = false)
         |-- s!.e!: integer (nullable = false)
        <BLANKLINE>
        >>> df.show()
        +---+-------------+
        | id|            s|
        +---+-------------+
        |  1|[{[1, 2, 3]}]|
        +---+-------------+
        <BLANKLINE>

        Here, the lambda expression will be applied to the last repeated element `e`.
        >>> df.transform(nested.with_fields, {"s!.e!": lambda e : e.cast("DOUBLE")}).show()
        +---+-------------------+
        | id|                  s|
        +---+-------------------+
        |  1|[{[1.0, 2.0, 3.0]}]|
        +---+-------------------+
        <BLANKLINE>

    Examples: Example 4: Dataframe with maps
        >>> df = spark.sql('''
        ...     SELECT
        ...         1 as id,
        ...         MAP("a", STRUCT(2 as a, 3 as b)) as m1
        ... ''')
        >>> nested.print_schema(df)
        root
         |-- id: integer (nullable = false)
         |-- m1%key: string (nullable = false)
         |-- m1%value.a: integer (nullable = false)
         |-- m1%value.b: integer (nullable = false)
        <BLANKLINE>
        >>> df.show()
        +---+-------------+
        | id|           m1|
        +---+-------------+
        |  1|{a -> {2, 3}}|
        +---+-------------+
        <BLANKLINE>

        >>> new_df = df.transform(nested.with_fields, {
        ...  "m1%key": lambda key : f.upper(key),
        ...  "m1%value.a": lambda value : value["a"].cast("DOUBLE")
        ... })
        >>> nested.print_schema(new_df)
        root
         |-- id: integer (nullable = false)
         |-- m1%key: string (nullable = false)
         |-- m1%value.a: double (nullable = false)
         |-- m1%value.b: integer (nullable = false)
        <BLANKLINE>
        >>> new_df.show()
        +---+---------------+
        | id|             m1|
        +---+---------------+
        |  1|{A -> {2.0, 3}}|
        +---+---------------+
        <BLANKLINE>

    Examples: Example 5: Accessing multiple repetition levels
        >>> df = spark.sql('''
        ...     SELECT
        ...         1 as id,
        ...         ARRAY(
        ...             STRUCT(2 as average, ARRAY(1, 2, 3) as values),
        ...             STRUCT(3 as average, ARRAY(1, 2, 3, 4, 5) as values)
        ...         ) as s1
        ... ''')
        >>> nested.print_schema(df)
        root
         |-- id: integer (nullable = false)
         |-- s1!.average: integer (nullable = false)
         |-- s1!.values!: integer (nullable = false)
        <BLANKLINE>
        >>> df.show(truncate=False)
        +---+--------------------------------------+
        |id |s1                                    |
        +---+--------------------------------------+
        |1  |[{2, [1, 2, 3]}, {3, [1, 2, 3, 4, 5]}]|
        +---+--------------------------------------+
        <BLANKLINE>

        Here, the transformation applied to "s1!.values!" takes two arguments.
        >>> new_df = df.transform(nested.with_fields, {
        ...  "s1!.values!": lambda s1, value : value - s1["average"]
        ... })
        >>> new_df.show(truncate=False)
        +---+-----------------------------------------+
        |id |s1                                       |
        +---+-----------------------------------------+
        |1  |[{2, [-1, 0, 1]}, {3, [-2, -1, 0, 1, 2]}]|
        +---+-----------------------------------------+
        <BLANKLINE>

        Extra arguments can be added to the left for each repetition level, up to the root level.
        >>> new_df = df.transform(nested.with_fields, {
        ...  "s1!.values!": lambda root, s1, value : value - s1["average"] + root["id"]
        ... })
        >>> new_df.show(truncate=False)
        +---+---------------------------------------+
        |id |s1                                     |
        +---+---------------------------------------+
        |1  |[{2, [0, 1, 2]}, {3, [-1, 0, 1, 2, 3]}]|
        +---+---------------------------------------+
        <BLANKLINE>

    """
    default_columns = {field: None for field in nested.fields(df)}
    fields = {**default_columns, **fields}
    return df.select(*resolve_nested_fields(fields, starting_level=df))
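
The three lines above suggest how with_fields differs from select: every existing field is first mapped to None (the identity transformation), and the caller's mapping is then laid on top. The following is a minimal pure-Python sketch of that merge only. The hard-coded list stands in for what nested.fields(df) would return, and the transformation values are placeholder strings; both are assumptions made for illustration, and no Spark session is involved.

```python
# Stand-in for nested.fields(df): the flattened field names of a DataFrame.
# (Hard-coded for illustration; the real call inspects a Spark schema.)
existing_fields = ["id", "s!.a", "s!.b.c"]

# What a caller might pass to with_fields (placeholder values, not real Columns).
user_fields = {"s!.b.d": "new field", "s!.a": "override"}

# Every existing field defaults to None, i.e. "keep as is".
default_columns = {field: None for field in existing_fields}

# Later keys win: user entries override the defaults in place,
# and names that did not exist yet are appended at the end.
merged = {**default_columns, **user_fields}

for name, transformation in merged.items():
    print(f"{name!r}: {transformation!r}")
```

Because dictionaries preserve insertion order, overridden fields keep their original position while new fields come last.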

Selecting nested fields

In this example, we will see how to select and rename specific elements in a nested data structure.

>>> from spark_frame.examples.working_with_nested_data import _get_sample_employee_data
>>> from pyspark.sql import functions as f
>>> from spark_frame import nested
>>> df = _get_sample_employee_data()
>>> nested.print_schema(df)
root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- skills!.name: string (nullable = true)
 |-- skills!.level: string (nullable = true)
 |-- projects!.name: string (nullable = true)
 |-- projects!.client: string (nullable = true)
 |-- projects!.tasks!.name: string (nullable = true)
 |-- projects!.tasks!.description: string (nullable = true)
 |-- projects!.tasks!.status: string (nullable = true)
 |-- projects!.tasks!.estimate: long (nullable = true)

>>> df.show(truncate=False)
+-----------+----------+---+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|employee_id|name      |age|skills                                       |projects                                                                                                                                                                                                                          |
+-----------+----------+---+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1          |John Smith|30 |[{Java, expert}, {Python, intermediate}]     |[{Project A, Acme Inc, [{Task 1, Implement feature X, completed, 8}, {Task 2, Fix bug Y, in progress, 5}]}, {Project B, Beta Corp, [{Task 3, Implement feature Z, pending, 13}, {Task 4, Improve performance, in progress, 3}]}]  |
|2          |Jane Doe  |25 |[{JavaScript, advanced}, {PHP, intermediate}]|[{Project C, Gamma Inc, [{Task 5, Implement feature W, completed, 20}, {Task 6, Fix bug V, in progress, 13}]}, {Project D, Delta Ltd, [{Task 7, Implement feature U, pending, 8}, {Task 8, Improve performance, in progress, 5}]}]|
+-----------+----------+---+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

The task

Let's say we want to select only the following fields, while keeping the same overall structure:

  • employee_id
  • projects.name
  • projects.tasks.name

Without spark_frame.nested

This forces us to write something quite complicated, using pyspark.sql.functions.transform.

>>> new_df = df.select(
...     "employee_id",
...     f.transform("projects", lambda project:
...         f.struct(project["name"].alias("name"), f.transform(project["tasks"], lambda task:
...             f.struct(task["name"].alias("name"))
...         ).alias("tasks"))
...     ).alias("projects")
... )
>>> nested.print_schema(new_df)
root
 |-- employee_id: integer (nullable = true)
 |-- projects!.name: string (nullable = true)
 |-- projects!.tasks!.name: string (nullable = true)

>>> new_df.show(truncate=False)
+-----------+----------------------------------------------------------------------+
|employee_id|projects                                                              |
+-----------+----------------------------------------------------------------------+
|1          |[{Project A, [{Task 1}, {Task 2}]}, {Project B, [{Task 3}, {Task 4}]}]|
|2          |[{Project C, [{Task 5}, {Task 6}]}, {Project D, [{Task 7}, {Task 8}]}]|
+-----------+----------------------------------------------------------------------+

With spark_frame.nested

Using spark_frame.nested.select, we can easily obtain the exact same result.

>>> new_df = df.transform(nested.select, {
...     "employee_id": None,
...     "projects!.name": None,
...     "projects!.tasks!.name": None
... })
>>> nested.print_schema(new_df)
root
 |-- employee_id: integer (nullable = true)
 |-- projects!.name: string (nullable = true)
 |-- projects!.tasks!.name: string (nullable = true)

>>> new_df.show(truncate=False)
+-----------+----------------------------------------------------------------------+
|employee_id|projects                                                              |
+-----------+----------------------------------------------------------------------+
|1          |[{Project A, [{Task 1}, {Task 2}]}, {Project B, [{Task 3}, {Task 4}]}]|
|2          |[{Project C, [{Task 5}, {Task 6}]}, {Project D, [{Task 7}, {Task 8}]}]|
+-----------+----------------------------------------------------------------------+

Here, None is used to indicate that we don't want to perform any transformation on the column, but we could also replace it with functions to perform transformations at the same time. For instance, we could convert all the names to uppercase like this:

>>> df.transform(nested.select, {
...     "employee_id": None,
...     "projects!.name": lambda project: f.upper(project["name"]),
...     "projects!.tasks!.name": lambda task: f.upper(task["name"])
... }).show(truncate=False)
+-----------+----------------------------------------------------------------------+
|employee_id|projects                                                              |
+-----------+----------------------------------------------------------------------+
|1          |[{PROJECT A, [{TASK 1}, {TASK 2}]}, {PROJECT B, [{TASK 3}, {TASK 4}]}]|
|2          |[{PROJECT C, [{TASK 5}, {TASK 6}]}, {PROJECT D, [{TASK 7}, {TASK 8}]}]|
+-----------+----------------------------------------------------------------------+
nested.print_schema

Print the DataFrame's flattened schema to the standard output.

  • Structs are flattened with a . after their name.
  • Arrays are flattened with a ! character after their name.
  • Maps are flattened with %key and %value after their name.
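
The three rules above can be sketched in plain Python. The toy schema encoding used here (dict for struct, one-element list for array, a ("map", key, value) tuple for map, string for a leaf type) is an assumption made for this illustration and is not a Spark or spark_frame type; the function name flatten is likewise hypothetical.

```python
from typing import Iterator, Union

# Toy schema encoding (an assumption for this sketch, not a Spark type):
#   dict            -> struct, mapping field name to field type
#   [element]       -> array of `element`
#   ("map", k, v)   -> map with key type `k` and value type `v`
#   str             -> leaf type such as "integer"
Schema = Union[dict, list, tuple, str]


def flatten(schema: Schema, prefix: str = "") -> Iterator[str]:
    """Yield flattened field names using the '.', '!' and '%' conventions."""
    if isinstance(schema, dict):  # struct: join children with "."
        for name, child in schema.items():
            child_prefix = f"{prefix}.{name}" if prefix else name
            yield from flatten(child, child_prefix)
    elif isinstance(schema, list):  # array: append "!"
        yield from flatten(schema[0], prefix + "!")
    elif isinstance(schema, tuple):  # map: append "%key" and "%value"
        _, key, value = schema
        yield from flatten(key, prefix + "%key")
        yield from flatten(value, prefix + "%value")
    else:  # leaf: the accumulated prefix is the flattened name
        yield prefix


schema = {
    "id": "integer",
    "s1": [{"a": "integer", "b": [{"c": "integer", "d": "integer"}], "e": ["integer"]}],
    "s2": {"f": "integer"},
    "s3": [["integer"]],
    "s4": [[{"e": "integer", "f": "integer"}]],
    "m1": ("map", {"a": "integer"}, {"b": "integer"}),
}
for name in flatten(schema):
    print(name)
```

The toy schema mirrors the DataFrame built in the Examples section of this function, so the printed names can be compared with the output of nested.print_schema there.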

Limitation: Dots, percents, and exclamation marks are not supported in field names

Given the syntax used, every method defined in the spark_frame.nested module assumes that all field names in DataFrames do not contain any dot ., percent % or exclamation mark !. This can be worked around using the transformation spark_frame.transformations.transform_all_field_names.
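
A rename of this kind needs a function that maps each offending name to a safe one. The helper below, sanitize_field_name, is hypothetical and written in plain Python; how it would be passed to transform_all_field_names is deliberately not shown, so check that function's documentation for its exact signature.

```python
import re

# Characters that carry meaning in the flattened field-name syntax.
RESERVED = re.compile(r"[.%!]")


def sanitize_field_name(name: str, replacement: str = "_") -> str:
    """Replace '.', '%' and '!' so the name no longer clashes with the syntax."""
    return RESERVED.sub(replacement, name)


print(sanitize_field_name("price.usd"))    # price_usd
print(sanitize_field_name("growth%"))      # growth_
print(sanitize_field_name("alert!level"))  # alert_level
```

Note that two distinct names can collapse into the same sanitized name (for example "a.b" and "a!b"), so a real migration may need a collision check.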

Parameters:

Name  Type       Description        Default
df    DataFrame  A Spark DataFrame  required

Examples:

>>> from pyspark.sql import SparkSession
>>> from spark_frame import nested
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.sql('''SELECT
...     1 as id,
...     ARRAY(STRUCT(2 as a, ARRAY(STRUCT(3 as c, 4 as d)) as b, ARRAY(5, 6) as e)) as s1,
...     STRUCT(7 as f) as s2,
...     ARRAY(ARRAY(1, 2), ARRAY(3, 4)) as s3,
...     ARRAY(ARRAY(STRUCT(1 as e, 2 as f)), ARRAY(STRUCT(3 as e, 4 as f))) as s4,
...     MAP(STRUCT(1 as a), STRUCT(2 as b)) as m1
... ''')
>>> df.printSchema()
root
 |-- id: integer (nullable = false)
 |-- s1: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- a: integer (nullable = false)
 |    |    |-- b: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- c: integer (nullable = false)
 |    |    |    |    |-- d: integer (nullable = false)
 |    |    |-- e: array (nullable = false)
 |    |    |    |-- element: integer (containsNull = false)
 |-- s2: struct (nullable = false)
 |    |-- f: integer (nullable = false)
 |-- s3: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: integer (containsNull = false)
 |-- s4: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- e: integer (nullable = false)
 |    |    |    |-- f: integer (nullable = false)
 |-- m1: map (nullable = false)
 |    |-- key: struct
 |    |    |-- a: integer (nullable = false)
 |    |-- value: struct (valueContainsNull = false)
 |    |    |-- b: integer (nullable = false)

>>> nested.print_schema(df)
root
 |-- id: integer (nullable = false)
 |-- s1!.a: integer (nullable = false)
 |-- s1!.b!.c: integer (nullable = false)
 |-- s1!.b!.d: integer (nullable = false)
 |-- s1!.e!: integer (nullable = false)
 |-- s2.f: integer (nullable = false)
 |-- s3!!: integer (nullable = false)
 |-- s4!!.e: integer (nullable = false)
 |-- s4!!.f: integer (nullable = false)
 |-- m1%key.a: integer (nullable = false)
 |-- m1%value.b: integer (nullable = false)
Source code in spark_frame/nested_impl/print_schema.py
def print_schema(df: DataFrame) -> None:
    """Print the DataFrame's flattened schema to the standard output.

    - Structs are flattened with a `.` after their name.
    - Arrays are flattened with a `!` character after their name.
    - Maps are flattened with `%key` and `%value` after their name.

    !!! warning "Limitation: Dots, percents, and exclamation marks are not supported in field names"
        Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field
        names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`.
        This can be worked around using the transformation
        [`spark_frame.transformations.transform_all_field_names`]
        [spark_frame.transformations_impl.transform_all_field_names.transform_all_field_names].

    Args:
        df: A Spark DataFrame

    Examples:
        >>> from pyspark.sql import SparkSession
        >>> from spark_frame import nested
        >>> spark = SparkSession.builder.appName("doctest").getOrCreate()
        >>> df = spark.sql('''SELECT
        ...     1 as id,
        ...     ARRAY(STRUCT(2 as a, ARRAY(STRUCT(3 as c, 4 as d)) as b, ARRAY(5, 6) as e)) as s1,
        ...     STRUCT(7 as f) as s2,
        ...     ARRAY(ARRAY(1, 2), ARRAY(3, 4)) as s3,
        ...     ARRAY(ARRAY(STRUCT(1 as e, 2 as f)), ARRAY(STRUCT(3 as e, 4 as f))) as s4,
        ...     MAP(STRUCT(1 as a), STRUCT(2 as b)) as m1
        ... ''')
        >>> df.printSchema()
        root
         |-- id: integer (nullable = false)
         |-- s1: array (nullable = false)
         |    |-- element: struct (containsNull = false)
         |    |    |-- a: integer (nullable = false)
         |    |    |-- b: array (nullable = false)
         |    |    |    |-- element: struct (containsNull = false)
         |    |    |    |    |-- c: integer (nullable = false)
         |    |    |    |    |-- d: integer (nullable = false)
         |    |    |-- e: array (nullable = false)
         |    |    |    |-- element: integer (containsNull = false)
         |-- s2: struct (nullable = false)
         |    |-- f: integer (nullable = false)
         |-- s3: array (nullable = false)
         |    |-- element: array (containsNull = false)
         |    |    |-- element: integer (containsNull = false)
         |-- s4: array (nullable = false)
         |    |-- element: array (containsNull = false)
         |    |    |-- element: struct (containsNull = false)
         |    |    |    |-- e: integer (nullable = false)
         |    |    |    |-- f: integer (nullable = false)
         |-- m1: map (nullable = false)
         |    |-- key: struct
         |    |    |-- a: integer (nullable = false)
         |    |-- value: struct (valueContainsNull = false)
         |    |    |-- b: integer (nullable = false)
        <BLANKLINE>
        >>> nested.print_schema(df)
        root
         |-- id: integer (nullable = false)
         |-- s1!.a: integer (nullable = false)
         |-- s1!.b!.c: integer (nullable = false)
         |-- s1!.b!.d: integer (nullable = false)
         |-- s1!.e!: integer (nullable = false)
         |-- s2.f: integer (nullable = false)
         |-- s3!!: integer (nullable = false)
         |-- s4!!.e: integer (nullable = false)
         |-- s4!!.f: integer (nullable = false)
         |-- m1%key.a: integer (nullable = false)
         |-- m1%value.b: integer (nullable = false)
        <BLANKLINE>
    """
    print(schema_string(df))
nested.select

Projects a set of expressions and returns a new DataFrame.

This method is similar to the DataFrame.select method, with the extra capability of working on nested and repeated fields (structs and arrays).

The syntax for field names works as follows:

  • "." is the separator for struct elements
  • "!" must be appended at the end of fields that are repeated (arrays)
  • Map keys are appended with %key
  • Map values are appended with %value
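
To make the syntax above concrete, here is a small plain-Python sketch that splits a flattened field name into its parts and counts how many array levels it crosses. The function names tokenize and repetition_depth are hypothetical; this is not how spark_frame parses names internally, only an illustration of the notation.

```python
import re
from typing import List

# One token per field name, array marker, or map marker; "." is only a separator.
TOKEN = re.compile(r"[^.!%]+|!|%key|%value")


def tokenize(field_name: str) -> List[str]:
    """Split a flattened field name into names and '!', '%key', '%value' markers."""
    return TOKEN.findall(field_name)


def repetition_depth(field_name: str) -> int:
    """Count the array levels ('!') that a field name passes through."""
    return tokenize(field_name).count("!")


print(tokenize("projects!.tasks!.name"))  # ['projects', '!', 'tasks', '!', 'name']
print(tokenize("m1%key.a"))               # ['m1', '%key', 'a']
print(repetition_depth("s3!!"))           # 2
```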

The following types of transformation are allowed:

  • String and column expressions can be used on any non-repeated field, even nested ones.
  • When working on repeated fields, transformations must be expressed as higher order functions (e.g. lambda expressions). String and column expressions can be used on repeated fields as well, but their value will be repeated multiple times.
  • When working on multiple levels of nested arrays, higher order functions may take multiple arguments, corresponding to each level of repetition (See Example 5.).
  • None can also be used to represent the identity transformation. This is useful to select a field without changing it and without having to repeat its name.
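
The multi-argument rule can be emulated on ordinary Python data. The sketch below uses plain dicts and lists instead of Spark columns and mirrors the data of Example 5. The helpers call_with_levels and transform_values are hypothetical and only illustrate how the arguments line up with repetition levels; they do not reflect how spark_frame resolves them on Column expressions.

```python
from inspect import signature
from typing import Any, Callable, List

# One row shaped like the DataFrame of Example 5.
row = {
    "id": 1,
    "s1": [
        {"average": 2, "values": [1, 2, 3]},
        {"average": 3, "values": [1, 2, 3, 4, 5]},
    ],
}


def call_with_levels(fn: Callable[..., Any], levels: List[Any]) -> Any:
    """Call fn with the innermost levels, as many as fn has parameters."""
    arity = len(signature(fn).parameters)
    return fn(*levels[-arity:])


def transform_values(row: dict, fn: Callable[..., Any]) -> List[List[Any]]:
    """Apply fn to every element reached by the path "s1!.values!"."""
    return [
        [call_with_levels(fn, [row, s1, value]) for value in s1["values"]]
        for s1 in row["s1"]
    ]


# One argument: only the last repeated element.
print(transform_values(row, lambda value: value * 10))
# Two arguments: the enclosing s1 element, then the value.
print(transform_values(row, lambda s1, value: value - s1["average"]))
# Three arguments: the root row, the s1 element, then the value.
print(transform_values(row, lambda root, s1, value: value - s1["average"] + root["id"]))
```

The last two calls produce the same numbers as the s1 values shown in Example 5, which is a quick way to check the argument order.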

Limitation: Dots, percents, and exclamation marks are not supported in field names

Given the syntax used, every method defined in the spark_frame.nested module assumes that all field names in DataFrames do not contain any dot ., percent % or exclamation mark !. This can be worked around using the transformation spark_frame.transformations.transform_all_field_names.

Parameters:

Name    Type                                Description                                  Default
df      DataFrame                           A Spark DataFrame                            required
fields  Mapping[str, ColumnTransformation]  A Dict(field_name, transformation_to_apply)  required

Returns:

Type       Description
DataFrame  A new DataFrame where only the specified fields have been selected and the corresponding transformations were applied to each of them.

Example 1: non-repeated fields

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import functions as f
>>> from spark_frame import nested
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.sql('''SELECT 1 as id, STRUCT(2 as a, 3 as b) as s''')
>>> df.printSchema()
root
 |-- id: integer (nullable = false)
 |-- s: struct (nullable = false)
 |    |-- a: integer (nullable = false)
 |    |-- b: integer (nullable = false)

>>> df.show()
+---+------+
| id|     s|
+---+------+
|  1|{2, 3}|
+---+------+

Transformations on non-repeated fields may be expressed as a string representing a column name, a Column expression or None. (In this example the column "id" will be dropped because it was not selected)

>>> new_df = nested.select(df, {
...     "s.a": "s.a",                        # Column name (string)
...     "s.b": None,                         # None: use to keep a column without having to repeat its name
...     "s.c": f.col("s.a") + f.col("s.b")   # Column expression
... })
>>> new_df.printSchema()
root
 |-- s: struct (nullable = false)
 |    |-- a: integer (nullable = false)
 |    |-- b: integer (nullable = false)
 |    |-- c: integer (nullable = false)

>>> new_df.show()
+---------+
|        s|
+---------+
|{2, 3, 5}|
+---------+

Example 2: repeated fields

>>> df = spark.sql('SELECT 1 as id, ARRAY(STRUCT(1 as a, 2 as b), STRUCT(3 as a, 4 as b)) as s')
>>> nested.print_schema(df)
root
 |-- id: integer (nullable = false)
 |-- s!.a: integer (nullable = false)
 |-- s!.b: integer (nullable = false)

>>> df.show()
+---+----------------+
| id|               s|
+---+----------------+
|  1|[{1, 2}, {3, 4}]|
+---+----------------+

Transformations on repeated fields must be expressed as higher-order functions (lambda expressions or named functions). The value passed to this function will correspond to the last repeated element.

>>> df.transform(nested.select, {
...     "s!.a": lambda s: s["a"],
...     "s!.b": None,
...     "s!.c": lambda s: s["a"] + s["b"]
... }).show(truncate=False)
+----------------------+
|s                     |
+----------------------+
|[{1, 2, 3}, {3, 4, 7}]|
+----------------------+

String and column expressions can be used on repeated fields as well, but their value will be repeated multiple times.

>>> df.transform(nested.select, {
...     "id": None,
...     "s!.a": "id",
...     "s!.b": f.lit(2)
... }).show(truncate=False)
+---+----------------+
|id |s               |
+---+----------------+
|1  |[{1, 2}, {1, 2}]|
+---+----------------+

Example 3: field repeated twice

>>> df = spark.sql('''
...     SELECT
...         1 as id,
...         ARRAY(STRUCT(ARRAY(1, 2, 3) as e)) as s1,
...         ARRAY(STRUCT(ARRAY(4, 5, 6) as e)) as s2
... ''')
>>> nested.print_schema(df)
root
 |-- id: integer (nullable = false)
 |-- s1!.e!: integer (nullable = false)
 |-- s2!.e!: integer (nullable = false)

>>> df.show()
+---+-------------+-------------+
| id|           s1|           s2|
+---+-------------+-------------+
|  1|[{[1, 2, 3]}]|[{[4, 5, 6]}]|
+---+-------------+-------------+

Here, the lambda expression will be applied to the last repeated element e.

>>> new_df = df.transform(nested.select, {
...  "s1!.e!": None,
...  "s2!.e!": lambda e : e.cast("DOUBLE")
... })
>>> nested.print_schema(new_df)
root
 |-- s1!.e!: integer (nullable = false)
 |-- s2!.e!: double (nullable = false)

>>> new_df.show()
+-------------+-------------------+
|           s1|                 s2|
+-------------+-------------------+
|[{[1, 2, 3]}]|[{[4.0, 5.0, 6.0]}]|
+-------------+-------------------+

Example 4: Dataframe with maps

>>> df = spark.sql('''
...     SELECT
...         1 as id,
...         MAP("a", STRUCT(2 as a, 3 as b)) as m1
... ''')
>>> nested.print_schema(df)
root
 |-- id: integer (nullable = false)
 |-- m1%key: string (nullable = false)
 |-- m1%value.a: integer (nullable = false)
 |-- m1%value.b: integer (nullable = false)

>>> df.show()
+---+-------------+
| id|           m1|
+---+-------------+
|  1|{a -> {2, 3}}|
+---+-------------+
>>> new_df = df.transform(nested.select, {
...  "id": None,
...  "m1%key": lambda key : f.upper(key),
...  "m1%value.a": lambda value : value["a"].cast("DOUBLE")
... })
>>> nested.print_schema(new_df)
root
 |-- id: integer (nullable = false)
 |-- m1%key: string (nullable = false)
 |-- m1%value.a: double (nullable = false)

>>> new_df.show()
+---+------------+
| id|          m1|
+---+------------+
|  1|{A -> {2.0}}|
+---+------------+

Example 5: Accessing multiple repetition levels

>>> df = spark.sql('''
...     SELECT
...         1 as id,
...         ARRAY(
...             STRUCT(2 as average, ARRAY(1, 2, 3) as values),
...             STRUCT(3 as average, ARRAY(1, 2, 3, 4, 5) as values)
...         ) as s1
... ''')
>>> nested.print_schema(df)
root
 |-- id: integer (nullable = false)
 |-- s1!.average: integer (nullable = false)
 |-- s1!.values!: integer (nullable = false)

>>> df.show(truncate=False)
+---+--------------------------------------+
|id |s1                                    |
+---+--------------------------------------+
|1  |[{2, [1, 2, 3]}, {3, [1, 2, 3, 4, 5]}]|
+---+--------------------------------------+

Here, the transformation applied to "s1!.values!" takes two arguments.

>>> new_df = df.transform(nested.select, {
...  "id": None,
...  "s1!.average": None,
...  "s1!.values!": lambda s1, value : value - s1["average"]
... })
>>> new_df.show(truncate=False)
+---+-----------------------------------------+
|id |s1                                       |
+---+-----------------------------------------+
|1  |[{2, [-1, 0, 1]}, {3, [-2, -1, 0, 1, 2]}]|
+---+-----------------------------------------+

Extra arguments can be added to the left for each repetition level, up to the root level.

>>> new_df = df.transform(nested.select, {
...  "id": None,
...  "s1!.average": None,
...  "s1!.values!": lambda root, s1, value : value - s1["average"] + root["id"]
... })
>>> new_df.show(truncate=False)
+---+---------------------------------------+
|id |s1                                     |
+---+---------------------------------------+
|1  |[{2, [0, 1, 2]}, {3, [-1, 0, 1, 2, 3]}]|
+---+---------------------------------------+
Source code in spark_frame/nested_impl/select_impl.py
def select(df: DataFrame, fields: Mapping[str, ColumnTransformation]) -> DataFrame:
    """Project a set of expressions and returns a new [DataFrame][pyspark.sql.DataFrame].

    This method is similar to the [DataFrame.select][pyspark.sql.DataFrame.select] method, with the extra
    capability of working on nested and repeated fields (structs and arrays).

    The syntax for field names works as follows:

    - "." is the separator for struct elements
    - "!" must be appended at the end of fields that are repeated (arrays)
    - Map keys are appended with `%key`
    - Map values are appended with `%value`

    The following types of transformation are allowed:

    - String and column expressions can be used on any non-repeated field, even nested ones.
    - When working on repeated fields, transformations must be expressed as higher order functions
      (e.g. lambda expressions). String and column expressions can be used on repeated fields as well,
      but their value will be repeated multiple times.
    - When working on multiple levels of nested arrays, higher order functions may take multiple arguments,
      corresponding to each level of repetition (See Example 5.).
    - `None` can also be used to represent the identity transformation. This is useful to select a field without
      changing it and without having to repeat its name.

    !!! warning "Limitation: Dots, percents, and exclamation marks are not supported in field names"
        Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field
        names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`.
        This can be worked around using the transformation
        [`spark_frame.transformations.transform_all_field_names`]
        [spark_frame.transformations_impl.transform_all_field_names.transform_all_field_names].

    Args:
        df: A Spark DataFrame
        fields: A Dict(field_name, transformation_to_apply)

    Returns:
        A new DataFrame where only the specified fields have been selected and the corresponding
        transformations were applied to each of them.

    Examples: Example 1: non-repeated fields
        >>> from pyspark.sql import SparkSession
        >>> from pyspark.sql import functions as f
        >>> from spark_frame import nested
        >>> spark = SparkSession.builder.appName("doctest").getOrCreate()
        >>> df = spark.sql('''SELECT 1 as id, STRUCT(2 as a, 3 as b) as s''')
        >>> df.printSchema()
        root
         |-- id: integer (nullable = false)
         |-- s: struct (nullable = false)
         |    |-- a: integer (nullable = false)
         |    |-- b: integer (nullable = false)
        <BLANKLINE>
        >>> df.show()
        +---+------+
        | id|     s|
        +---+------+
        |  1|{2, 3}|
        +---+------+
        <BLANKLINE>

        Transformations on non-repeated fields may be expressed as a string representing a column name,
        a Column expression or None.
        (In this example the column "id" will be dropped because it was not selected)
        >>> new_df = nested.select(df, {
        ...     "s.a": "s.a",                        # Column name (string)
        ...     "s.b": None,                         # None: use to keep a column without having to repeat its name
        ...     "s.c": f.col("s.a") + f.col("s.b")   # Column expression
        ... })
        >>> new_df.printSchema()
        root
         |-- s: struct (nullable = false)
         |    |-- a: integer (nullable = false)
         |    |-- b: integer (nullable = false)
         |    |-- c: integer (nullable = false)
        <BLANKLINE>
        >>> new_df.show()
        +---------+
        |        s|
        +---------+
        |{2, 3, 5}|
        +---------+
        <BLANKLINE>

    Examples: Example 2: repeated fields
        >>> df = spark.sql('SELECT 1 as id, ARRAY(STRUCT(1 as a, 2 as b), STRUCT(3 as a, 4 as b)) as s')
        >>> nested.print_schema(df)
        root
         |-- id: integer (nullable = false)
         |-- s!.a: integer (nullable = false)
         |-- s!.b: integer (nullable = false)
        <BLANKLINE>
        >>> df.show()
        +---+----------------+
        | id|               s|
        +---+----------------+
        |  1|[{1, 2}, {3, 4}]|
        +---+----------------+
        <BLANKLINE>

        Transformations on repeated fields must be expressed as higher-order
        functions (lambda expressions or named functions).
        The value passed to this function will correspond to the last repeated element.
        >>> df.transform(nested.select, {
        ...     "s!.a": lambda s: s["a"],
        ...     "s!.b": None,
        ...     "s!.c": lambda s: s["a"] + s["b"]
        ... }).show(truncate=False)
        +----------------------+
        |s                     |
        +----------------------+
        |[{1, 2, 3}, {3, 4, 7}]|
        +----------------------+
        <BLANKLINE>

        String and column expressions can be used on repeated fields as well,
        but their value will be repeated multiple times.
        >>> df.transform(nested.select, {
        ...     "id": None,
        ...     "s!.a": "id",
        ...     "s!.b": f.lit(2)
        ... }).show(truncate=False)
        +---+----------------+
        |id |s               |
        +---+----------------+
        |1  |[{1, 2}, {1, 2}]|
        +---+----------------+
        <BLANKLINE>

    Examples: Example 3: field repeated twice
        >>> df = spark.sql('''
        ...     SELECT
        ...         1 as id,
        ...         ARRAY(STRUCT(ARRAY(1, 2, 3) as e)) as s1,
        ...         ARRAY(STRUCT(ARRAY(4, 5, 6) as e)) as s2
        ... ''')
        >>> nested.print_schema(df)
        root
         |-- id: integer (nullable = false)
         |-- s1!.e!: integer (nullable = false)
         |-- s2!.e!: integer (nullable = false)
        <BLANKLINE>
        >>> df.show()
        +---+-------------+-------------+
        | id|           s1|           s2|
        +---+-------------+-------------+
        |  1|[{[1, 2, 3]}]|[{[4, 5, 6]}]|
        +---+-------------+-------------+
        <BLANKLINE>

        Here, the lambda expression will be applied to the last repeated element `e`.
        >>> new_df = df.transform(nested.select, {
        ...  "s1!.e!": None,
        ...  "s2!.e!": lambda e : e.cast("DOUBLE")
        ... })
        >>> nested.print_schema(new_df)
        root
         |-- s1!.e!: integer (nullable = false)
         |-- s2!.e!: double (nullable = false)
        <BLANKLINE>
        >>> new_df.show()
        +-------------+-------------------+
        |           s1|                 s2|
        +-------------+-------------------+
        |[{[1, 2, 3]}]|[{[4.0, 5.0, 6.0]}]|
        +-------------+-------------------+
        <BLANKLINE>

    Examples: Example 4: Dataframe with maps
        >>> df = spark.sql('''
        ...     SELECT
        ...         1 as id,
        ...         MAP("a", STRUCT(2 as a, 3 as b)) as m1
        ... ''')
        >>> nested.print_schema(df)
        root
         |-- id: integer (nullable = false)
         |-- m1%key: string (nullable = false)
         |-- m1%value.a: integer (nullable = false)
         |-- m1%value.b: integer (nullable = false)
        <BLANKLINE>
        >>> df.show()
        +---+-------------+
        | id|           m1|
        +---+-------------+
        |  1|{a -> {2, 3}}|
        +---+-------------+
        <BLANKLINE>

        >>> new_df = df.transform(nested.select, {
        ...  "id": None,
        ...  "m1%key": lambda key : f.upper(key),
        ...  "m1%value.a": lambda value : value["a"].cast("DOUBLE")
        ... })
        >>> nested.print_schema(new_df)
        root
         |-- id: integer (nullable = false)
         |-- m1%key: string (nullable = false)
         |-- m1%value.a: double (nullable = false)
        <BLANKLINE>
        >>> new_df.show()
        +---+------------+
        | id|          m1|
        +---+------------+
        |  1|{A -> {2.0}}|
        +---+------------+
        <BLANKLINE>

    Examples: Example 5: Accessing multiple repetition levels
        >>> df = spark.sql('''
        ...     SELECT
        ...         1 as id,
        ...         ARRAY(
        ...             STRUCT(2 as average, ARRAY(1, 2, 3) as values),
        ...             STRUCT(3 as average, ARRAY(1, 2, 3, 4, 5) as values)
        ...         ) as s1
        ... ''')
        >>> nested.print_schema(df)
        root
         |-- id: integer (nullable = false)
         |-- s1!.average: integer (nullable = false)
         |-- s1!.values!: integer (nullable = false)
        <BLANKLINE>
        >>> df.show(truncate=False)
        +---+--------------------------------------+
        |id |s1                                    |
        +---+--------------------------------------+
        |1  |[{2, [1, 2, 3]}, {3, [1, 2, 3, 4, 5]}]|
        +---+--------------------------------------+
        <BLANKLINE>

        Here, the transformation applied to "s1!.values!" takes two arguments:
        the enclosing element of `s1` and the current element of `values`.
        >>> new_df = df.transform(nested.select, {
        ...  "id": None,
        ...  "s1!.average": None,
        ...  "s1!.values!": lambda s1, value : value - s1["average"]
        ... })
        >>> new_df.show(truncate=False)
        +---+-----------------------------------------+
        |id |s1                                       |
        +---+-----------------------------------------+
        |1  |[{2, [-1, 0, 1]}, {3, [-2, -1, 0, 1, 2]}]|
        +---+-----------------------------------------+
        <BLANKLINE>

        One extra argument can be added on the left for each enclosing repetition level, up to the root level.
        >>> new_df = df.transform(nested.select, {
        ...  "id": None,
        ...  "s1!.average": None,
        ...  "s1!.values!": lambda root, s1, value : value - s1["average"] + root["id"]
        ... })
        >>> new_df.show(truncate=False)
        +---+---------------------------------------+
        |id |s1                                     |
        +---+---------------------------------------+
        |1  |[{2, [0, 1, 2]}, {3, [-1, 0, 1, 2, 3]}]|
        +---+---------------------------------------+
        <BLANKLINE>

    """
    return df.select(*resolve_nested_fields(fields, starting_level=df))
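The argument convention used in Example 5 can be illustrated without Spark. The following is only a plain-Python sketch of the semantics, not how `nested.select` is implemented: `apply_to_values` is a hypothetical helper, and the row is modelled as ordinary dicts and lists. It shows the lambda receiving one argument per repetition level, ordered from the root down to the innermost element.

```python
from typing import Callable


def apply_to_values(row: dict, fn: Callable[[dict, dict, int], int]) -> dict:
    """Hypothetical helper mimicking a transformation on "s1!.values!".

    `fn` is called as fn(root, s1, value): the whole row, the enclosing
    element of `s1`, and the current element of `values`.
    """
    return {
        "id": row["id"],
        "s1": [
            {
                "average": s1["average"],
                "values": [fn(row, s1, value) for value in s1["values"]],
            }
            for s1 in row["s1"]
        ],
    }


# Same data as Example 5, modelled as plain Python objects.
row = {
    "id": 1,
    "s1": [
        {"average": 2, "values": [1, 2, 3]},
        {"average": 3, "values": [1, 2, 3, 4, 5]},
    ],
}

# Uses only the enclosing `s1` element and the current value.
centered = apply_to_values(row, lambda root, s1, value: value - s1["average"])

# Also uses a root-level field.
shifted = apply_to_values(
    row, lambda root, s1, value: value - s1["average"] + root["id"]
)

print([s["values"] for s in centered["s1"]])
print([s["values"] for s in shifted["s1"]])
```

The two results match the arrays shown in Example 5. Unlike `nested.select`, this sketch always passes all three arguments; in the library, the number of lambda parameters determines how many enclosing levels are passed.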

Aggregating nested fields [TODO]

Advanced transformations with nested fields [TODO]