spark_frame.nested_functions

Like with pyspark.sql.functions, the methods in this module all return Column expressions and can be used to build operations on Spark DataFrames using select, withColumn, etc.
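For instance, any expression built with this module can be passed to select or withColumn like a regular Column (a minimal sketch on a hypothetical DataFrame df with a repeated field items!.amount; the documented examples below use a real sample dataset):

>>> from pyspark.sql import functions as f
>>> from spark_frame import nested_functions as nf
>>> df.select(nf.sum("items!.amount").alias("total_amount"))  # doctest: +SKIP
>>> df.withColumn("total_amount", nf.sum("items!.amount"))  # doctest: +SKIP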
aggregate(field_name: str, initial_value: StringOrColumn, merge: Callable[[Column, Column], Column], start: Optional[Callable[[Column], Column]] = None, finish: Optional[Callable[[Column], Column]] = None, starting_level: Union[Column, DataFrame, None] = None) -> Column
Recursively compute an aggregation of all elements in the given repeated field.
Limitation: Dots, percents, and exclamation marks are not supported in field names

Given the syntax used, every method defined in the spark_frame.nested module assumes that all field names in DataFrames do not contain any dot `.`, percent `%`, or exclamation mark `!`. This can be worked around using the transformation spark_frame.transformations.transform_all_field_names.
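For example, one could rename every field before using this module (a sketch, assuming transform_all_field_names takes a DataFrame and a function mapping each field name to its new name; the replacement characters are illustrative):

>>> from spark_frame.transformations import transform_all_field_names
>>> clean_df = transform_all_field_names(
...     df, lambda name: name.replace(".", "_").replace("%", "_").replace("!", "_")
... )  # doctest: +SKIP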
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`field_name` | `str` | Name of the repeated field to aggregate over. It may be repeated multiple times. | required |
`initial_value` | `StringOrColumn` | Name of a column or Column expression giving the initial value of the aggregation. | required |
`merge` | `Callable[[Column, Column], Column]` | A binary function `(acc, x)` that merges each element `x` into the accumulator `acc`. | required |
`start` | `Optional[Callable[[Column], Column]]` | An optional unary function. | `None` |
`finish` | `Optional[Callable[[Column], Column]]` | An optional unary function. | `None` |
`starting_level` | `Union[Column, DataFrame, None]` | Nesting level from which the aggregation is started. | `None` |
Returns:

Type | Description |
---|---|
`Column` | A Column expression |
Examples:
>>> from pyspark.sql import functions as f
>>> from spark_frame.nested_functions_impl.aggregate import _get_sample_data
>>> from spark_frame import nested
>>> from spark_frame import nested_functions as nf
>>> employee_df = _get_sample_data()
>>> nested.print_schema(employee_df)
root
|-- employee_id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- projects!.name: string (nullable = true)
|-- projects!.client: string (nullable = true)
|-- projects!.tasks!.name: string (nullable = true)
|-- projects!.tasks!.estimate: long (nullable = true)
>>> employee_df.withColumn("projects", f.to_json("projects.tasks")).show(truncate=False)
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
|employee_id|name |age|projects |
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
|1 |John Smith|30 |[[{"name":"Task 1","estimate":8},{"name":"Task 2","estimate":5}],[{"name":"Task 3","estimate":13},{"name":"Task 4","estimate":3}]] |
|1 |Jane Doe |25 |[[{"name":"Task 5","estimate":20},{"name":"Task 6","estimate":13}],[{"name":"Task 7","estimate":8},{"name":"Task 8","estimate":5}]]|
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
>>> employee_df.transform(nested.select, {
... "employee_id": None,
... "name": None,
... "age": None,
... "projects!.tasks!.estimate": None
... }).show(truncate=False)
+-----------+----------+---+------------------------------+
|employee_id|name |age|projects |
+-----------+----------+---+------------------------------+
|1 |John Smith|30 |[{[{8}, {5}]}, {[{13}, {3}]}] |
|1 |Jane Doe |25 |[{[{20}, {13}]}, {[{8}, {5}]}]|
+-----------+----------+---+------------------------------+
>>> employee_df.transform(
... nested.select,
... {
... "employee_id": None,
... "name": None,
... "age": None,
... "total_task_estimate": nf.aggregate(
... field_name="projects!.tasks!.estimate",
... initial_value=f.lit(0).cast("BIGINT"),
... merge=lambda acc, x: acc + x
... ),
... "projects!.task_estimate_per_project": lambda project: nf.aggregate(
... field_name="tasks!.estimate",
... initial_value=f.lit(0).cast("BIGINT"),
... merge=lambda acc, x: acc + x,
... starting_level=project,
... ),
... },
... ).show(truncate=False)
+-----------+----------+---+-------------------+------------+
|employee_id|name |age|total_task_estimate|projects |
+-----------+----------+---+-------------------+------------+
|1 |John Smith|30 |29 |[{13}, {16}]|
|1 |Jane Doe |25 |46 |[{33}, {13}]|
+-----------+----------+---+-------------------+------------+
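The examples above only use merge; as a rough sketch of how finish could compose with it (assuming, by analogy with pyspark.sql.functions.aggregate, that finish is applied to the final accumulated value):

>>> max_estimate_as_string = nf.aggregate(
...     field_name="projects!.tasks!.estimate",
...     initial_value=f.lit(0).cast("BIGINT"),
...     merge=lambda acc, x: f.greatest(acc, x),  # keep the largest estimate seen so far
...     finish=lambda acc: acc.cast("STRING"),  # cast the final accumulator
... )  # doctest: +SKIP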
Source code in spark_frame/nested_functions_impl/aggregate.py
average(field_name: str, starting_level: Union[Column, DataFrame, None] = None) -> Column
Recursively compute the average of all elements in the given repeated field.
Limitation: Dots, percents, and exclamation marks are not supported in field names

Given the syntax used, every method defined in the spark_frame.nested module assumes that all field names in DataFrames do not contain any dot `.`, percent `%`, or exclamation mark `!`. This can be worked around using the transformation spark_frame.transformations.transform_all_field_names.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`field_name` | `str` | Name of the repeated field to average. It may be repeated multiple times. | required |
`starting_level` | `Union[Column, DataFrame, None]` | Nesting level from which the aggregation is started. | `None` |
Returns:

Type | Description |
---|---|
`Column` | A Column expression |
Example 1
>>> from pyspark.sql import functions as f
>>> from spark_frame.nested_functions_impl.aggregate import _get_sample_data
>>> from spark_frame import nested
>>> from spark_frame import nested_functions as nf
>>> employee_df = _get_sample_data()
>>> nested.print_schema(employee_df)
root
|-- employee_id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- projects!.name: string (nullable = true)
|-- projects!.client: string (nullable = true)
|-- projects!.tasks!.name: string (nullable = true)
|-- projects!.tasks!.estimate: long (nullable = true)
>>> employee_df.withColumn("projects", f.to_json("projects.tasks")).show(truncate=False)
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
|employee_id|name |age|projects |
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
|1 |John Smith|30 |[[{"name":"Task 1","estimate":8},{"name":"Task 2","estimate":5}],[{"name":"Task 3","estimate":13},{"name":"Task 4","estimate":3}]] |
|1 |Jane Doe |25 |[[{"name":"Task 5","estimate":20},{"name":"Task 6","estimate":13}],[{"name":"Task 7","estimate":8},{"name":"Task 8","estimate":5}]]|
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
>>> employee_df.transform(nested.select, {
... "employee_id": None,
... "name": None,
... "age": None,
... "projects!.tasks!.estimate": None
... }).show(truncate=False)
+-----------+----------+---+------------------------------+
|employee_id|name |age|projects |
+-----------+----------+---+------------------------------+
|1 |John Smith|30 |[{[{8}, {5}]}, {[{13}, {3}]}] |
|1 |Jane Doe |25 |[{[{20}, {13}]}, {[{8}, {5}]}]|
+-----------+----------+---+------------------------------+
>>> employee_df.transform(nested.select, {
... "employee_id": None,
... "name": None,
... "age": None,
... "average_task_estimate": nf.average("projects!.tasks!.estimate"),
... "projects!.average_task_estimate_per_project":
... lambda project: nf.average("tasks!.estimate", starting_level=project),
... }).show(truncate=False)
+-----------+----------+---+---------------------+---------------+
|employee_id|name |age|average_task_estimate|projects |
+-----------+----------+---+---------------------+---------------+
|1 |John Smith|30 |7.25 |[{6.5}, {8.0}] |
|1 |Jane Doe |25 |11.5 |[{16.5}, {6.5}]|
+-----------+----------+---+---------------------+---------------+
Example 2: with all kinds of nested structures
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.sql('''SELECT
... 1 as id,
... ARRAY(STRUCT(1 as a), STRUCT(2 as a)) as s1,
... ARRAY(ARRAY(1, 2), ARRAY(3, 4)) as s2,
... ARRAY(ARRAY(STRUCT(1 as a)), ARRAY(STRUCT(2 as a))) as s3,
... ARRAY(STRUCT(ARRAY(1, 2) as a), STRUCT(ARRAY(3, 4) as a)) as s4,
... ARRAY(
... STRUCT(ARRAY(STRUCT(STRUCT(1 as c) as b), STRUCT(STRUCT(2 as c) as b)) as a),
... STRUCT(ARRAY(STRUCT(STRUCT(3 as c) as b), STRUCT(STRUCT(4 as c) as b)) as a)
... ) as s5
... ''')
>>> nested.print_schema(df)
root
|-- id: integer (nullable = false)
|-- s1!.a: integer (nullable = false)
|-- s2!!: integer (nullable = false)
|-- s3!!.a: integer (nullable = false)
|-- s4!.a!: integer (nullable = false)
|-- s5!.a!.b.c: integer (nullable = false)
>>> df.show(truncate=False)
+---+----------+----------------+--------------+--------------------+------------------------------------+
|id |s1 |s2 |s3 |s4 |s5 |
+---+----------+----------------+--------------+--------------------+------------------------------------+
|1 |[{1}, {2}]|[[1, 2], [3, 4]]|[[{1}], [{2}]]|[{[1, 2]}, {[3, 4]}]|[{[{{1}}, {{2}}]}, {[{{3}}, {{4}}]}]|
+---+----------+----------------+--------------+--------------------+------------------------------------+
>>> df.select(nf.average("s1!.a").alias("average")).show()
+-------+
|average|
+-------+
| 1.5|
+-------+
>>> df.select(nf.average("s2!!").alias("average")).show()
+-------+
|average|
+-------+
| 2.5|
+-------+
>>> df.select(nf.average("s3!!.a").alias("average")).show()
+-------+
|average|
+-------+
| 1.5|
+-------+
>>> df.select(nf.average("s4!.a!").alias("average")).show()
+-------+
|average|
+-------+
| 2.5|
+-------+
>>> df.select(nf.average("s5!.a!.b.c").alias("average")).show()
+-------+
|average|
+-------+
| 2.5|
+-------+
Source code in spark_frame/nested_functions_impl/average.py
sum(field_name: str, starting_level: Union[Column, DataFrame, None] = None) -> Column
Recursively compute the sum of all elements in the given repeated field.
Limitation: Dots, percents, and exclamation marks are not supported in field names

Given the syntax used, every method defined in the spark_frame.nested module assumes that all field names in DataFrames do not contain any dot `.`, percent `%`, or exclamation mark `!`. This can be worked around using the transformation spark_frame.transformations.transform_all_field_names.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`field_name` | `str` | Name of the repeated field to sum. It may be repeated multiple times. | required |
`starting_level` | `Union[Column, DataFrame, None]` | Nesting level from which the aggregation is started. | `None` |
Returns:

Type | Description |
---|---|
`Column` | A Column expression |
Example 1
>>> from pyspark.sql import functions as f
>>> from spark_frame.nested_functions_impl.aggregate import _get_sample_data
>>> from spark_frame import nested
>>> from spark_frame import nested_functions as nf
>>> employee_df = _get_sample_data()
>>> nested.print_schema(employee_df)
root
|-- employee_id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- projects!.name: string (nullable = true)
|-- projects!.client: string (nullable = true)
|-- projects!.tasks!.name: string (nullable = true)
|-- projects!.tasks!.estimate: long (nullable = true)
>>> employee_df.withColumn("projects", f.to_json("projects.tasks")).show(truncate=False)
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
|employee_id|name |age|projects |
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
|1 |John Smith|30 |[[{"name":"Task 1","estimate":8},{"name":"Task 2","estimate":5}],[{"name":"Task 3","estimate":13},{"name":"Task 4","estimate":3}]] |
|1 |Jane Doe |25 |[[{"name":"Task 5","estimate":20},{"name":"Task 6","estimate":13}],[{"name":"Task 7","estimate":8},{"name":"Task 8","estimate":5}]]|
+-----------+----------+---+-----------------------------------------------------------------------------------------------------------------------------------+
>>> employee_df.transform(nested.select, {
... "employee_id": None,
... "name": None,
... "age": None,
... "projects!.tasks!.estimate": None
... }).show(truncate=False)
+-----------+----------+---+------------------------------+
|employee_id|name |age|projects |
+-----------+----------+---+------------------------------+
|1 |John Smith|30 |[{[{8}, {5}]}, {[{13}, {3}]}] |
|1 |Jane Doe |25 |[{[{20}, {13}]}, {[{8}, {5}]}]|
+-----------+----------+---+------------------------------+
>>> employee_df.transform(nested.select, {
... "employee_id": None,
... "name": None,
... "age": None,
... "total_task_estimate": nf.sum("projects!.tasks!.estimate"),
... "projects!.task_estimate_per_project":
... lambda project: nf.sum("tasks!.estimate", starting_level=project),
... }).show(truncate=False)
+-----------+----------+---+-------------------+------------+
|employee_id|name |age|total_task_estimate|projects |
+-----------+----------+---+-------------------+------------+
|1 |John Smith|30 |29 |[{13}, {16}]|
|1 |Jane Doe |25 |46 |[{33}, {13}]|
+-----------+----------+---+-------------------+------------+
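Since nf.sum returns an ordinary Column expression, the same aggregate can also be attached with withColumn instead of nested.select (a sketch):

>>> employee_df.withColumn(
...     "total_task_estimate", nf.sum("projects!.tasks!.estimate")
... )  # doctest: +SKIP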
Example 2: with all kinds of nested structures
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
>>> df = spark.sql('''SELECT
... 1 as id,
... ARRAY(STRUCT(1 as a), STRUCT(2 as a)) as s1,
... ARRAY(ARRAY(1, 2), ARRAY(3, 4)) as s2,
... ARRAY(ARRAY(STRUCT(1 as a)), ARRAY(STRUCT(2 as a))) as s3,
... ARRAY(STRUCT(ARRAY(1, 2) as a), STRUCT(ARRAY(3, 4) as a)) as s4,
... ARRAY(
... STRUCT(ARRAY(STRUCT(STRUCT(1 as c) as b), STRUCT(STRUCT(2 as c) as b)) as a),
... STRUCT(ARRAY(STRUCT(STRUCT(3 as c) as b), STRUCT(STRUCT(4 as c) as b)) as a)
... ) as s5
... ''')
>>> nested.print_schema(df)
root
|-- id: integer (nullable = false)
|-- s1!.a: integer (nullable = false)
|-- s2!!: integer (nullable = false)
|-- s3!!.a: integer (nullable = false)
|-- s4!.a!: integer (nullable = false)
|-- s5!.a!.b.c: integer (nullable = false)
>>> df.show(truncate=False)
+---+----------+----------------+--------------+--------------------+------------------------------------+
|id |s1 |s2 |s3 |s4 |s5 |
+---+----------+----------------+--------------+--------------------+------------------------------------+
|1 |[{1}, {2}]|[[1, 2], [3, 4]]|[[{1}], [{2}]]|[{[1, 2]}, {[3, 4]}]|[{[{{1}}, {{2}}]}, {[{{3}}, {{4}}]}]|
+---+----------+----------------+--------------+--------------------+------------------------------------+
>>> df.select(nf.sum("s1!.a").alias("sum")).show()
+---+
|sum|
+---+
| 3|
+---+
>>> df.select(nf.sum("s2!!").alias("sum")).show()
+---+
|sum|
+---+
| 10|
+---+
>>> df.select(nf.sum("s3!!.a").alias("sum")).show()
+---+
|sum|
+---+
| 3|
+---+
>>> df.select(nf.sum("s4!.a!").alias("sum")).show()
+---+
|sum|
+---+
| 10|
+---+
>>> df.select(nf.sum("s5!.a!.b.c").alias("sum")).show()
+---+
|sum|
+---+
| 10|
+---+
Source code in spark_frame/nested_functions_impl/sum.py