spark_frame.nested_functions

As with pyspark.sql.functions, the methods in this module all return Column expressions and can be used to build operations on Spark DataFrames using select, withColumn, etc.
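
A minimal sketch of that usage (assuming a running SparkSession; the id and values columns here are made up for illustration):

from pyspark.sql import SparkSession
from spark_frame import nested_functions as nf

spark = SparkSession.builder.appName("example").getOrCreate()
df = spark.sql("SELECT 1 as id, ARRAY(1, 2, 3) as values")

# nf.sum returns a Column expression, so it composes with select/withColumn
# like any function from pyspark.sql.functions.
df.withColumn("total", nf.sum("values!")).show()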


aggregate(field_name: str, initial_value: StringOrColumn, merge: Callable[[Column, Column], Column], start: Optional[Callable[[Column], Column]] = None, finish: Optional[Callable[[Column], Column]] = None, starting_level: Union[Column, DataFrame, None] = None) -> Column

Recursively compute an aggregation of all elements in the given repeated field.

Limitation: Dots, percents, and exclamation marks are not supported in field names

Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`. This can be worked around using the transformation `spark_frame.transformations.transform_all_field_names`.
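
A hedged sketch of that workaround (assuming transform_all_field_names(df, fun) applies the renaming function fun to every field name in the schema):

from spark_frame.transformations import transform_all_field_names

# Replace every unsupported character in all (possibly nested) field names
df_clean = transform_all_field_names(
    df,
    lambda name: name.replace(".", "_").replace("%", "_").replace("!", "_"),
)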

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `field_name` | `str` | Name of the repeated field to aggregate. It may be repeated multiple times. | *required* |
| `initial_value` | `StringOrColumn` | Name of column or Column expression. | *required* |
| `merge` | `Callable[[Column, Column], Column]` | A binary function `(acc: Column, x: Column) -> Column` returning an expression of the same type as `initial_value`. | *required* |
| `start` | `Optional[Callable[[Column], Column]]` | An optional unary function `(x: Column) -> Column` that transforms the values to aggregate into the same type as `initial_value`. | `None` |
| `finish` | `Optional[Callable[[Column], Column]]` | An optional unary function `(x: Column) -> Column` used to convert the accumulated value into the final result. | `None` |
| `starting_level` | `Union[Column, DataFrame, None]` | Nesting level from which the aggregation is started. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Column` | A Column expression. |

Examples:

>>> from spark_frame.nested_functions_impl.aggregate import _get_sample_data
>>> from spark_frame import nested
>>> from spark_frame import nested_functions as nf
>>> from pyspark.sql import functions as f
>>> employee_df = _get_sample_data()
>>> nested.print_schema(employee_df)
root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- projects!.name: string (nullable = true)
 |-- projects!.client: string (nullable = true)
 |-- projects!.tasks!.name: string (nullable = true)
 |-- projects!.tasks!.estimate: long (nullable = true)

>>> employee_df.withColumn("projects", f.to_json("projects.tasks")).show(truncate=False)
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
|employee_id|name      |age|projects                                                                                                                           |
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
|1          |John Smith|30 |[[{"name":"Task 1","estimate":8},{"name":"Task 2","estimate":5}],[{"name":"Task 3","estimate":13},{"name":"Task 4","estimate":3}]] |
|1          |Jane Doe  |25 |[[{"name":"Task 5","estimate":20},{"name":"Task 6","estimate":13}],[{"name":"Task 7","estimate":8},{"name":"Task 8","estimate":5}]]|
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+

>>> employee_df.transform(nested.select, {
...     "employee_id": None,
...     "name": None,
...     "age": None,
...     "projects!.tasks!.estimate": None
... }).show(truncate=False)
+-----------+----------+---+------------------------------+
|employee_id|name      |age|projects                      |
+-----------+----------+---+------------------------------+
|1          |John Smith|30 |[{[{8}, {5}]}, {[{13}, {3}]}] |
|1          |Jane Doe  |25 |[{[{20}, {13}]}, {[{8}, {5}]}]|
+-----------+----------+---+------------------------------+

>>> employee_df.transform(
...     nested.select,
...     {
...         "employee_id": None,
...         "name": None,
...         "age": None,
...         "total_task_estimate": nf.aggregate(
...             field_name="projects!.tasks!.estimate",
...             initial_value=f.lit(0).cast("BIGINT"),
...             merge=lambda acc, x: acc + x
...         ),
...         "projects!.task_estimate_per_project": lambda project: nf.aggregate(
...             field_name="tasks!.estimate",
...             initial_value=f.lit(0).cast("BIGINT"),
...             merge=lambda acc, x: acc + x,
...             starting_level=project,
...         ),
...     },
... ).show(truncate=False)
+-----------+----------+---+-------------------+------------+
|employee_id|name      |age|total_task_estimate|projects    |
+-----------+----------+---+-------------------+------------+
|1          |John Smith|30 |29                 |[{13}, {16}]|
|1          |Jane Doe  |25 |46                 |[{33}, {13}]|
+-----------+----------+---+-------------------+------------+
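
The start and finish parameters are not needed above, because the values already have the same type as initial_value. For an illustration of all four callbacks, this is how average (defined further down on this page) builds on aggregate, accumulating a (sum, count) struct and dividing at the end:

avg_estimate = nf.aggregate(
    field_name="projects!.tasks!.estimate",
    initial_value=f.struct(
        f.lit(0).cast("BIGINT").alias("sum"),
        f.lit(0).cast("BIGINT").alias("count"),
    ),
    # start: lift each element into the accumulator type
    start=lambda x: f.struct(x.alias("sum"), f.lit(1).alias("count")),
    # merge: combine two accumulators
    merge=lambda acc, x: f.struct(
        (acc["sum"] + x["sum"]).alias("sum"),
        (acc["count"] + x["count"]).alias("count"),
    ),
    # finish: turn the final accumulator into the result
    finish=lambda acc: f.when(acc["count"] > 0, acc["sum"] / acc["count"]),
)
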
Source code in spark_frame/nested_functions_impl/aggregate.py
def aggregate(
    field_name: str,
    initial_value: StringOrColumn,
    merge: Callable[[Column, Column], Column],
    start: Optional[Callable[[Column], Column]] = None,
    finish: Optional[Callable[[Column], Column]] = None,
    starting_level: Union[Column, DataFrame, None] = None,
) -> Column:
    """Recursively compute an aggregation of all elements in the given repeated field.

    !!! warning "Limitation: Dots, percents, and exclamation marks are not supported in field names"
        Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field
        names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`.
        This can be worked around using the transformation
        [`spark_frame.transformations.transform_all_field_names`]
        [spark_frame.transformations_impl.transform_all_field_names.transform_all_field_names].

    Args:
        field_name: Name of the repeated field to aggregate. It may be repeated multiple times.
        initial_value: Name of column or Column expression.
        merge: A binary function `(acc: Column, x: Column) -> Column` returning an expression
            of the same type as `initial_value`.
        start: An optional unary function `(x: Column) -> Column` that transforms the values to aggregate into the
            same type as `initial_value`.
        finish: An optional unary function `(x: Column) -> Column` used to convert the accumulated value into the final
            result.
        starting_level: Nesting level from which the aggregation is started.

    Returns:
        A Column expression

    Examples:
        >>> from spark_frame.nested_functions_impl.aggregate import _get_sample_data
        >>> from spark_frame import nested
        >>> from spark_frame import nested_functions as nf
        >>> from pyspark.sql import functions as f
        >>> employee_df = _get_sample_data()
        >>> nested.print_schema(employee_df)
        root
         |-- employee_id: integer (nullable = true)
         |-- name: string (nullable = true)
         |-- age: long (nullable = true)
         |-- projects!.name: string (nullable = true)
         |-- projects!.client: string (nullable = true)
         |-- projects!.tasks!.name: string (nullable = true)
         |-- projects!.tasks!.estimate: long (nullable = true)
        <BLANKLINE>
        >>> employee_df.withColumn("projects", f.to_json("projects.tasks")).show(truncate=False)
        +-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
        |employee_id|name      |age|projects                                                                                                                           |
        +-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
        |1          |John Smith|30 |[[{"name":"Task 1","estimate":8},{"name":"Task 2","estimate":5}],[{"name":"Task 3","estimate":13},{"name":"Task 4","estimate":3}]] |
        |1          |Jane Doe  |25 |[[{"name":"Task 5","estimate":20},{"name":"Task 6","estimate":13}],[{"name":"Task 7","estimate":8},{"name":"Task 8","estimate":5}]]|
        +-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
        <BLANKLINE>
        >>> employee_df.transform(nested.select, {
        ...     "employee_id": None,
        ...     "name": None,
        ...     "age": None,
        ...     "projects!.tasks!.estimate": None
        ... }).show(truncate=False)
        +-----------+----------+---+------------------------------+
        |employee_id|name      |age|projects                      |
        +-----------+----------+---+------------------------------+
        |1          |John Smith|30 |[{[{8}, {5}]}, {[{13}, {3}]}] |
        |1          |Jane Doe  |25 |[{[{20}, {13}]}, {[{8}, {5}]}]|
        +-----------+----------+---+------------------------------+
        <BLANKLINE>
        >>> employee_df.transform(
        ...     nested.select,
        ...     {
        ...         "employee_id": None,
        ...         "name": None,
        ...         "age": None,
        ...         "total_task_estimate": nf.aggregate(
        ...             field_name="projects!.tasks!.estimate",
        ...             initial_value=f.lit(0).cast("BIGINT"),
        ...             merge=lambda acc, x: acc + x
        ...         ),
        ...         "projects!.task_estimate_per_project": lambda project: nf.aggregate(
        ...             field_name="tasks!.estimate",
        ...             initial_value=f.lit(0).cast("BIGINT"),
        ...             merge=lambda acc, x: acc + x,
        ...             starting_level=project,
        ...         ),
        ...     },
        ... ).show(truncate=False)
        +-----------+----------+---+-------------------+------------+
        |employee_id|name      |age|total_task_estimate|projects    |
        +-----------+----------+---+-------------------+------------+
        |1          |John Smith|30 |29                 |[{13}, {16}]|
        |1          |Jane Doe  |25 |46                 |[{33}, {13}]|
        +-----------+----------+---+-------------------+------------+
        <BLANKLINE>
    """  # noqa: E501
    validate_nested_field_names(field_name, allow_maps=False)
    agg_merge = PrintableFunction(
        lambda a: f.aggregate(a, initial_value, merge),
        lambda s: f"f.aggregate({s}, initial_value, merge)",
    )
    if finish is not None:
        agg_finish = PrintableFunction(
            lambda a: f.aggregate(f.array(a), initial_value, merge, finish),
            lambda s: f"f.aggregate(f.array({s}), initial_value, merge, finish))",
        )
    else:
        agg_finish = higher_order.identity
    if start is not None:
        agg_start = PrintableFunction(start, lambda s: f"start({s})")
    else:
        agg_start = higher_order.identity

    field_parts = _split_field_name(field_name)

    def recurse_item(parts: List[str], prefix: str = "") -> PrintableFunction:
        key = parts[0]
        is_struct = key == STRUCT_SEPARATOR
        is_repeated = key == REPETITION_MARKER
        has_children = len(parts) > 1
        if has_children:
            child_transformation = recurse_item(parts[1:], prefix + key)
        else:
            child_transformation = agg_start
        if is_struct:
            assert_true(
                has_children,
                "Error, this should not happen: struct without children",
            )
            return child_transformation
        elif is_repeated:
            return fp.compose(agg_merge, higher_order.transform(child_transformation))
        else:
            return fp.compose(child_transformation, higher_order.struct_get(key))

    root_transformation = recurse_item(field_parts)
    root_transformation = fp.compose(agg_finish, root_transformation)
    return root_transformation(starting_level)
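
For intuition, the transformation built for "projects!.tasks!.estimate" with merge=lambda acc, x: acc + x unrolls to roughly the following raw PySpark expression (a sketch, not the exact generated code):

total_task_estimate = f.aggregate(
    f.transform(
        f.col("projects"),
        lambda project: f.aggregate(
            f.transform(project["tasks"], lambda task: task["estimate"]),
            f.lit(0).cast("BIGINT"),
            lambda acc, x: acc + x,
        ),
    ),
    f.lit(0).cast("BIGINT"),
    lambda acc, x: acc + x,
)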

average(field_name: str, starting_level: Union[Column, DataFrame, None] = None) -> Column

Recursively compute the average of all elements in the given repeated field.

Limitation: Dots, percents, and exclamation marks are not supported in field names

Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`. This can be worked around using the transformation `spark_frame.transformations.transform_all_field_names`.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `field_name` | `str` | Name of the repeated field to average. It may be repeated multiple times. | *required* |
| `starting_level` | `Union[Column, DataFrame, None]` | Nesting level from which the aggregation is started. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Column` | A Column expression. |

Example 1

>>> from spark_frame.nested_functions_impl.aggregate import _get_sample_data
>>> from spark_frame import nested
>>> from spark_frame import nested_functions as nf
>>> from pyspark.sql import functions as f
>>> employee_df = _get_sample_data()
>>> nested.print_schema(employee_df)
root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- projects!.name: string (nullable = true)
 |-- projects!.client: string (nullable = true)
 |-- projects!.tasks!.name: string (nullable = true)
 |-- projects!.tasks!.estimate: long (nullable = true)

>>> employee_df.withColumn("projects", f.to_json("projects.tasks")).show(truncate=False)
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
|employee_id|name      |age|projects                                                                                                                           |
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
|1          |John Smith|30 |[[{"name":"Task 1","estimate":8},{"name":"Task 2","estimate":5}],[{"name":"Task 3","estimate":13},{"name":"Task 4","estimate":3}]] |
|1          |Jane Doe  |25 |[[{"name":"Task 5","estimate":20},{"name":"Task 6","estimate":13}],[{"name":"Task 7","estimate":8},{"name":"Task 8","estimate":5}]]|
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+

>>> employee_df.transform(nested.select, {
...     "employee_id": None,
...     "name": None,
...     "age": None,
...     "projects!.tasks!.estimate": None
... }).show(truncate=False)
+-----------+----------+---+------------------------------+
|employee_id|name      |age|projects                      |
+-----------+----------+---+------------------------------+
|1          |John Smith|30 |[{[{8}, {5}]}, {[{13}, {3}]}] |
|1          |Jane Doe  |25 |[{[{20}, {13}]}, {[{8}, {5}]}]|
+-----------+----------+---+------------------------------+

>>> employee_df.transform(nested.select, {
...     "employee_id": None,
...     "name": None,
...     "age": None,
...     "average_task_estimate": nf.average("projects!.tasks!.estimate"),
...     "projects!.average_task_estimate_per_project":
...         lambda project: nf.average("tasks!.estimate", starting_level=project),
... }).show(truncate=False)
+-----------+----------+---+---------------------+---------------+
|employee_id|name      |age|average_task_estimate|projects       |
+-----------+----------+---+---------------------+---------------+
|1          |John Smith|30 |7.25                 |[{6.5}, {8.0}] |
|1          |Jane Doe  |25 |11.5                 |[{16.5}, {6.5}]|
+-----------+----------+---+---------------------+---------------+

Example 2: with all kinds of nested structures

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.sql('''SELECT
...     1 as id,
...     ARRAY(STRUCT(1 as a), STRUCT(2 as a)) as s1,
...     ARRAY(ARRAY(1, 2), ARRAY(3, 4)) as s2,
...     ARRAY(ARRAY(STRUCT(1 as a)), ARRAY(STRUCT(2 as a))) as s3,
...     ARRAY(STRUCT(ARRAY(1, 2) as a), STRUCT(ARRAY(3, 4) as a)) as s4,
...     ARRAY(
...         STRUCT(ARRAY(STRUCT(STRUCT(1 as c) as b), STRUCT(STRUCT(2 as c) as b)) as a),
...         STRUCT(ARRAY(STRUCT(STRUCT(3 as c) as b), STRUCT(STRUCT(4 as c) as b)) as a)
...     ) as s5
... ''')
>>> nested.print_schema(df)
root
 |-- id: integer (nullable = false)
 |-- s1!.a: integer (nullable = false)
 |-- s2!!: integer (nullable = false)
 |-- s3!!.a: integer (nullable = false)
 |-- s4!.a!: integer (nullable = false)
 |-- s5!.a!.b.c: integer (nullable = false)

>>> df.show(truncate=False)
+---+----------+----------------+--------------+--------------------+------------------------------------+
|id |s1        |s2              |s3            |s4                  |s5                                  |
+---+----------+----------------+--------------+--------------------+------------------------------------+
|1  |[{1}, {2}]|[[1, 2], [3, 4]]|[[{1}], [{2}]]|[{[1, 2]}, {[3, 4]}]|[{[{{1}}, {{2}}]}, {[{{3}}, {{4}}]}]|
+---+----------+----------------+--------------+--------------------+------------------------------------+

>>> df.select(nf.average("s1!.a").alias("average")).show()
+-------+
|average|
+-------+
|    1.5|
+-------+

>>> df.select(nf.average("s2!!").alias("average")).show()
+-------+
|average|
+-------+
|    2.5|
+-------+

>>> df.select(nf.average("s3!!.a").alias("average")).show()
+-------+
|average|
+-------+
|    1.5|
+-------+

>>> df.select(nf.average("s4!.a!").alias("average")).show()
+-------+
|average|
+-------+
|    2.5|
+-------+

>>> df.select(nf.average("s5!.a!.b.c").alias("average")).show()
+-------+
|average|
+-------+
|    2.5|
+-------+
Source code in spark_frame/nested_functions_impl/average.py
def average(
    field_name: str,
    starting_level: Union[Column, DataFrame, None] = None,
) -> Column:
    """Recursively compute the average of all elements in the given repeated field.

    !!! warning "Limitation: Dots, percents, and exclamation marks are not supported in field names"
        Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field
        names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`.
        This can be worked around using the transformation
        [`spark_frame.transformations.transform_all_field_names`]
        [spark_frame.transformations_impl.transform_all_field_names.transform_all_field_names].

    Args:
        field_name: Name of the repeated field to average. It may be repeated multiple times.
        starting_level: Nesting level from which the aggregation is started.

    Returns:
        A Column expression

    Examples: Example 1
        >>> from spark_frame.nested_functions_impl.aggregate import _get_sample_data
        >>> from spark_frame import nested
        >>> from spark_frame import nested_functions as nf
        >>> from pyspark.sql import functions as f
        >>> employee_df = _get_sample_data()
        >>> nested.print_schema(employee_df)
        root
         |-- employee_id: integer (nullable = true)
         |-- name: string (nullable = true)
         |-- age: long (nullable = true)
         |-- projects!.name: string (nullable = true)
         |-- projects!.client: string (nullable = true)
         |-- projects!.tasks!.name: string (nullable = true)
         |-- projects!.tasks!.estimate: long (nullable = true)
        <BLANKLINE>
        >>> employee_df.withColumn("projects", f.to_json("projects.tasks")).show(truncate=False)
        +-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
        |employee_id|name      |age|projects                                                                                                                           |
        +-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
        |1          |John Smith|30 |[[{"name":"Task 1","estimate":8},{"name":"Task 2","estimate":5}],[{"name":"Task 3","estimate":13},{"name":"Task 4","estimate":3}]] |
        |1          |Jane Doe  |25 |[[{"name":"Task 5","estimate":20},{"name":"Task 6","estimate":13}],[{"name":"Task 7","estimate":8},{"name":"Task 8","estimate":5}]]|
        +-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
        <BLANKLINE>
        >>> employee_df.transform(nested.select, {
        ...     "employee_id": None,
        ...     "name": None,
        ...     "age": None,
        ...     "projects!.tasks!.estimate": None
        ... }).show(truncate=False)
        +-----------+----------+---+------------------------------+
        |employee_id|name      |age|projects                      |
        +-----------+----------+---+------------------------------+
        |1          |John Smith|30 |[{[{8}, {5}]}, {[{13}, {3}]}] |
        |1          |Jane Doe  |25 |[{[{20}, {13}]}, {[{8}, {5}]}]|
        +-----------+----------+---+------------------------------+
        <BLANKLINE>
        >>> employee_df.transform(nested.select, {
        ...     "employee_id": None,
        ...     "name": None,
        ...     "age": None,
        ...     "average_task_estimate": nf.average("projects!.tasks!.estimate"),
        ...     "projects!.average_task_estimate_per_project":
        ...         lambda project: nf.average("tasks!.estimate", starting_level=project),
        ... }).show(truncate=False)
        +-----------+----------+---+---------------------+---------------+
        |employee_id|name      |age|average_task_estimate|projects       |
        +-----------+----------+---+---------------------+---------------+
        |1          |John Smith|30 |7.25                 |[{6.5}, {8.0}] |
        |1          |Jane Doe  |25 |11.5                 |[{16.5}, {6.5}]|
        +-----------+----------+---+---------------------+---------------+
        <BLANKLINE>

    Examples: Example 2: with all kinds of nested structures
        >>> from pyspark.sql import SparkSession
        >>> spark = SparkSession.builder.appName("doctest").getOrCreate()
        >>> df = spark.sql('''SELECT
        ...     1 as id,
        ...     ARRAY(STRUCT(1 as a), STRUCT(2 as a)) as s1,
        ...     ARRAY(ARRAY(1, 2), ARRAY(3, 4)) as s2,
        ...     ARRAY(ARRAY(STRUCT(1 as a)), ARRAY(STRUCT(2 as a))) as s3,
        ...     ARRAY(STRUCT(ARRAY(1, 2) as a), STRUCT(ARRAY(3, 4) as a)) as s4,
        ...     ARRAY(
        ...         STRUCT(ARRAY(STRUCT(STRUCT(1 as c) as b), STRUCT(STRUCT(2 as c) as b)) as a),
        ...         STRUCT(ARRAY(STRUCT(STRUCT(3 as c) as b), STRUCT(STRUCT(4 as c) as b)) as a)
        ...     ) as s5
        ... ''')
        >>> nested.print_schema(df)
        root
         |-- id: integer (nullable = false)
         |-- s1!.a: integer (nullable = false)
         |-- s2!!: integer (nullable = false)
         |-- s3!!.a: integer (nullable = false)
         |-- s4!.a!: integer (nullable = false)
         |-- s5!.a!.b.c: integer (nullable = false)
        <BLANKLINE>
        >>> df.show(truncate=False)
        +---+----------+----------------+--------------+--------------------+------------------------------------+
        |id |s1        |s2              |s3            |s4                  |s5                                  |
        +---+----------+----------------+--------------+--------------------+------------------------------------+
        |1  |[{1}, {2}]|[[1, 2], [3, 4]]|[[{1}], [{2}]]|[{[1, 2]}, {[3, 4]}]|[{[{{1}}, {{2}}]}, {[{{3}}, {{4}}]}]|
        +---+----------+----------------+--------------+--------------------+------------------------------------+
        <BLANKLINE>
        >>> df.select(nf.average("s1!.a").alias("average")).show()
        +-------+
        |average|
        +-------+
        |    1.5|
        +-------+
        <BLANKLINE>
        >>> df.select(nf.average("s2!!").alias("average")).show()
        +-------+
        |average|
        +-------+
        |    2.5|
        +-------+
        <BLANKLINE>
        >>> df.select(nf.average("s3!!.a").alias("average")).show()
        +-------+
        |average|
        +-------+
        |    1.5|
        +-------+
        <BLANKLINE>
        >>> df.select(nf.average("s4!.a!").alias("average")).show()
        +-------+
        |average|
        +-------+
        |    2.5|
        +-------+
        <BLANKLINE>
        >>> df.select(nf.average("s5!.a!.b.c").alias("average")).show()
        +-------+
        |average|
        +-------+
        |    2.5|
        +-------+
        <BLANKLINE>
    """  # noqa: E501
    initial_value = f.struct(
        f.lit(0).cast("BIGINT").alias("sum"),
        f.lit(0).cast("BIGINT").alias("count"),
    )

    def start(x: Column) -> Column:
        return f.struct(x.alias("sum"), f.lit(1).alias("count"))

    def merge(acc: Column, x: Column) -> Column:
        return f.struct(
            (acc["sum"] + x["sum"]).alias("sum"),
            (acc["count"] + x["count"]).alias("count"),
        )

    def finish(acc: Column) -> Column:
        return f.when(acc["count"] > 0, acc["sum"] / acc["count"])

    return aggregate(
        field_name,
        initial_value=initial_value,
        merge=merge,
        start=start,
        finish=finish,
        starting_level=starting_level,
    )
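
Since finish only returns a value when count > 0 (and f.when without otherwise yields NULL), averaging an empty repeated field gives NULL rather than a division by zero. A hypothetical check (the empty typed array below is made up for illustration):

df_empty = spark.sql("SELECT CAST(ARRAY() AS ARRAY<BIGINT>) as xs")
df_empty.select(nf.average("xs!").alias("avg")).show()
# avg is NULL for the empty array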

sum(field_name: str, starting_level: Union[Column, DataFrame, None] = None) -> Column

Recursively compute the sum of all elements in the given repeated field.

Limitation: Dots, percents, and exclamation marks are not supported in field names

Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`. This can be worked around using the transformation `spark_frame.transformations.transform_all_field_names`.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `field_name` | `str` | Name of the repeated field to sum. It may be repeated multiple times. | *required* |
| `starting_level` | `Union[Column, DataFrame, None]` | Nesting level from which the aggregation is started. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Column` | A Column expression. |

Example 1

>>> from spark_frame.nested_functions_impl.aggregate import _get_sample_data
>>> from spark_frame import nested
>>> from spark_frame import nested_functions as nf
>>> from pyspark.sql import functions as f
>>> employee_df = _get_sample_data()
>>> nested.print_schema(employee_df)
root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- projects!.name: string (nullable = true)
 |-- projects!.client: string (nullable = true)
 |-- projects!.tasks!.name: string (nullable = true)
 |-- projects!.tasks!.estimate: long (nullable = true)

>>> employee_df.withColumn("projects", f.to_json("projects.tasks")).show(truncate=False)
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
|employee_id|name      |age|projects                                                                                                                           |
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
|1          |John Smith|30 |[[{"name":"Task 1","estimate":8},{"name":"Task 2","estimate":5}],[{"name":"Task 3","estimate":13},{"name":"Task 4","estimate":3}]] |
|1          |Jane Doe  |25 |[[{"name":"Task 5","estimate":20},{"name":"Task 6","estimate":13}],[{"name":"Task 7","estimate":8},{"name":"Task 8","estimate":5}]]|
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+

>>> employee_df.transform(nested.select, {
...     "employee_id": None,
...     "name": None,
...     "age": None,
...     "projects!.tasks!.estimate": None
... }).show(truncate=False)
+-----------+----------+---+------------------------------+
|employee_id|name      |age|projects                      |
+-----------+----------+---+------------------------------+
|1          |John Smith|30 |[{[{8}, {5}]}, {[{13}, {3}]}] |
|1          |Jane Doe  |25 |[{[{20}, {13}]}, {[{8}, {5}]}]|
+-----------+----------+---+------------------------------+

>>> employee_df.transform(nested.select, {
...     "employee_id": None,
...     "name": None,
...     "age": None,
...     "total_task_estimate": nf.sum("projects!.tasks!.estimate"),
...     "projects!.task_estimate_per_project":
...         lambda project: nf.sum("tasks!.estimate", starting_level=project),
... }).show(truncate=False)
+-----------+----------+---+-------------------+------------+
|employee_id|name      |age|total_task_estimate|projects    |
+-----------+----------+---+-------------------+------------+
|1          |John Smith|30 |29                 |[{13}, {16}]|
|1          |Jane Doe  |25 |46                 |[{33}, {13}]|
+-----------+----------+---+-------------------+------------+

Example 2: with all kinds of nested structures

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.sql('''SELECT
...     1 as id,
...     ARRAY(STRUCT(1 as a), STRUCT(2 as a)) as s1,
...     ARRAY(ARRAY(1, 2), ARRAY(3, 4)) as s2,
...     ARRAY(ARRAY(STRUCT(1 as a)), ARRAY(STRUCT(2 as a))) as s3,
...     ARRAY(STRUCT(ARRAY(1, 2) as a), STRUCT(ARRAY(3, 4) as a)) as s4,
...     ARRAY(
...         STRUCT(ARRAY(STRUCT(STRUCT(1 as c) as b), STRUCT(STRUCT(2 as c) as b)) as a),
...         STRUCT(ARRAY(STRUCT(STRUCT(3 as c) as b), STRUCT(STRUCT(4 as c) as b)) as a)
...     ) as s5
... ''')
>>> nested.print_schema(df)
root
 |-- id: integer (nullable = false)
 |-- s1!.a: integer (nullable = false)
 |-- s2!!: integer (nullable = false)
 |-- s3!!.a: integer (nullable = false)
 |-- s4!.a!: integer (nullable = false)
 |-- s5!.a!.b.c: integer (nullable = false)

>>> df.show(truncate=False)
+---+----------+----------------+--------------+--------------------+------------------------------------+
|id |s1        |s2              |s3            |s4                  |s5                                  |
+---+----------+----------------+--------------+--------------------+------------------------------------+
|1  |[{1}, {2}]|[[1, 2], [3, 4]]|[[{1}], [{2}]]|[{[1, 2]}, {[3, 4]}]|[{[{{1}}, {{2}}]}, {[{{3}}, {{4}}]}]|
+---+----------+----------------+--------------+--------------------+------------------------------------+

>>> df.select(nf.sum("s1!.a").alias("sum")).show()
+---+
|sum|
+---+
|  3|
+---+

>>> df.select(nf.sum("s2!!").alias("sum")).show()
+---+
|sum|
+---+
| 10|
+---+

>>> df.select(nf.sum("s3!!.a").alias("sum")).show()
+---+
|sum|
+---+
|  3|
+---+

>>> df.select(nf.sum("s4!.a!").alias("sum")).show()
+---+
|sum|
+---+
| 10|
+---+

>>> df.select(nf.sum("s5!.a!.b.c").alias("sum")).show()
+---+
|sum|
+---+
| 10|
+---+
Source code in spark_frame/nested_functions_impl/sum.py
def sum(  # noqa: A001
    field_name: str,
    starting_level: Union[Column, DataFrame, None] = None,
) -> Column:
    """Recursively compute the sum of all elements in the given repeated field.

    !!! warning "Limitation: Dots, percents, and exclamation marks are not supported in field names"
        Given the syntax used, every method defined in the `spark_frame.nested` module assumes that all field
        names in DataFrames do not contain any dot `.`, percent `%` or exclamation mark `!`.
        This can be worked around using the transformation
        [`spark_frame.transformations.transform_all_field_names`]
        [spark_frame.transformations_impl.transform_all_field_names.transform_all_field_names].

    Args:
        field_name: Name of the repeated field to sum. It may be repeated multiple times.
        starting_level: Nesting level from which the aggregation is started.

    Returns:
        A Column expression

    Examples: Example 1
        >>> from spark_frame.nested_functions_impl.aggregate import _get_sample_data
        >>> from spark_frame import nested
        >>> from spark_frame import nested_functions as nf
        >>> from pyspark.sql import functions as f
        >>> employee_df = _get_sample_data()
        >>> nested.print_schema(employee_df)
        root
         |-- employee_id: integer (nullable = true)
         |-- name: string (nullable = true)
         |-- age: long (nullable = true)
         |-- projects!.name: string (nullable = true)
         |-- projects!.client: string (nullable = true)
         |-- projects!.tasks!.name: string (nullable = true)
         |-- projects!.tasks!.estimate: long (nullable = true)
        <BLANKLINE>
        >>> employee_df.withColumn("projects", f.to_json("projects.tasks")).show(truncate=False)
        +-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
        |employee_id|name      |age|projects                                                                                                                           |
        +-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
        |1          |John Smith|30 |[[{"name":"Task 1","estimate":8},{"name":"Task 2","estimate":5}],[{"name":"Task 3","estimate":13},{"name":"Task 4","estimate":3}]] |
        |1          |Jane Doe  |25 |[[{"name":"Task 5","estimate":20},{"name":"Task 6","estimate":13}],[{"name":"Task 7","estimate":8},{"name":"Task 8","estimate":5}]]|
        +-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
        <BLANKLINE>
        >>> employee_df.transform(nested.select, {
        ...     "employee_id": None,
        ...     "name": None,
        ...     "age": None,
        ...     "projects!.tasks!.estimate": None
        ... }).show(truncate=False)
        +-----------+----------+---+------------------------------+
        |employee_id|name      |age|projects                      |
        +-----------+----------+---+------------------------------+
        |1          |John Smith|30 |[{[{8}, {5}]}, {[{13}, {3}]}] |
        |1          |Jane Doe  |25 |[{[{20}, {13}]}, {[{8}, {5}]}]|
        +-----------+----------+---+------------------------------+
        <BLANKLINE>
        >>> employee_df.transform(nested.select, {
        ...     "employee_id": None,
        ...     "name": None,
        ...     "age": None,
        ...     "total_task_estimate": nf.sum("projects!.tasks!.estimate"),
        ...     "projects!.task_estimate_per_project":
        ...         lambda project: nf.sum("tasks!.estimate", starting_level=project),
        ... }).show(truncate=False)
        +-----------+----------+---+-------------------+------------+
        |employee_id|name      |age|total_task_estimate|projects    |
        +-----------+----------+---+-------------------+------------+
        |1          |John Smith|30 |29                 |[{13}, {16}]|
        |1          |Jane Doe  |25 |46                 |[{33}, {13}]|
        +-----------+----------+---+-------------------+------------+
        <BLANKLINE>

    Examples: Example 2: with all kinds of nested structures
        >>> from pyspark.sql import SparkSession
        >>> spark = SparkSession.builder.appName("doctest").getOrCreate()
        >>> df = spark.sql('''SELECT
        ...     1 as id,
        ...     ARRAY(STRUCT(1 as a), STRUCT(2 as a)) as s1,
        ...     ARRAY(ARRAY(1, 2), ARRAY(3, 4)) as s2,
        ...     ARRAY(ARRAY(STRUCT(1 as a)), ARRAY(STRUCT(2 as a))) as s3,
        ...     ARRAY(STRUCT(ARRAY(1, 2) as a), STRUCT(ARRAY(3, 4) as a)) as s4,
        ...     ARRAY(
        ...         STRUCT(ARRAY(STRUCT(STRUCT(1 as c) as b), STRUCT(STRUCT(2 as c) as b)) as a),
        ...         STRUCT(ARRAY(STRUCT(STRUCT(3 as c) as b), STRUCT(STRUCT(4 as c) as b)) as a)
        ...     ) as s5
        ... ''')
        >>> nested.print_schema(df)
        root
         |-- id: integer (nullable = false)
         |-- s1!.a: integer (nullable = false)
         |-- s2!!: integer (nullable = false)
         |-- s3!!.a: integer (nullable = false)
         |-- s4!.a!: integer (nullable = false)
         |-- s5!.a!.b.c: integer (nullable = false)
        <BLANKLINE>
        >>> df.show(truncate=False)
        +---+----------+----------------+--------------+--------------------+------------------------------------+
        |id |s1        |s2              |s3            |s4                  |s5                                  |
        +---+----------+----------------+--------------+--------------------+------------------------------------+
        |1  |[{1}, {2}]|[[1, 2], [3, 4]]|[[{1}], [{2}]]|[{[1, 2]}, {[3, 4]}]|[{[{{1}}, {{2}}]}, {[{{3}}, {{4}}]}]|
        +---+----------+----------------+--------------+--------------------+------------------------------------+
        <BLANKLINE>
        >>> df.select(nf.sum("s1!.a").alias("sum")).show()
        +---+
        |sum|
        +---+
        |  3|
        +---+
        <BLANKLINE>
        >>> df.select(nf.sum("s2!!").alias("sum")).show()
        +---+
        |sum|
        +---+
        | 10|
        +---+
        <BLANKLINE>
        >>> df.select(nf.sum("s3!!.a").alias("sum")).show()
        +---+
        |sum|
        +---+
        |  3|
        +---+
        <BLANKLINE>
        >>> df.select(nf.sum("s4!.a!").alias("sum")).show()
        +---+
        |sum|
        +---+
        | 10|
        +---+
        <BLANKLINE>
        >>> df.select(nf.sum("s5!.a!.b.c").alias("sum")).show()
        +---+
        |sum|
        +---+
        | 10|
        +---+
        <BLANKLINE>
    """  # noqa: E501
    initial_value = f.lit(0).cast("BIGINT")

    def merge(acc: Column, x: Column) -> Column:
        return acc + x

    return aggregate(
        field_name,
        initial_value=initial_value,
        merge=merge,
        starting_level=starting_level,
    )
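
For a single level of nesting, the result matches Spark's built-in higher-order AGGREGATE function; e.g., with the s1 column from Example 2 above (a sketch of the equivalence, not the code this module generates):

df.select(
    f.expr("AGGREGATE(s1, CAST(0 AS BIGINT), (acc, x) -> acc + x.a)").alias("sum"),
).show()
# Same result as nf.sum("s1!.a") above: 3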