spark_frame.graph
This module contains implementations of graph algorithms and related methods.
ascending_forest_traversal(input_df: DataFrame, node_id: str, parent_id: str, keep_labels: bool = False) -> DataFrame
Given a DataFrame representing a labeled forest, with a node id column, a parent id column,
and other label columns, performs a graph traversal that returns a DataFrame with the same schema,
giving for each node the labels of its furthest ancestor.

In the input DataFrame, a node is considered to have no parent if its parent_id is null or equal to its node_id. In the output DataFrame, a node that has no parent will have its parent_id equal to its node_id.

Cycle protection: if the graph contains a cycle, the nodes in that cycle will have a NULL parent_id. The algorithm is protected against dependency cycles, but there is no protection against a combinatorial explosion if some nodes have more than one parent.
Parameters:

Name | Type | Description | Default
---|---|---|---
`input_df` | `DataFrame` | A Spark DataFrame | *required*
`node_id` | `str` | Name of the column that contains the node ids | *required*
`parent_id` | `str` | Name of the column that contains the parent node ids | *required*
`keep_labels` | `bool` | If set to true, adds two struct columns called "node" and "furthest_ancestor" containing the content of the row from the input DataFrame for the corresponding node and its furthest ancestor | `False`
Returns:

Type | Description
---|---
`DataFrame` | A DataFrame with two columns named according to `node_id` and `parent_id`, giving for each node the id of its furthest ancestor (in the `parent_id` column). If the option `keep_labels` is set to true, two extra struct columns called "node" and "furthest_ancestor" are added; they represent the content of the rows in the input DataFrame corresponding to the node and its furthest ancestor, respectively.
Examples:
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("doctest").getOrCreate()
Given a DataFrame with pokemon attributes and evolution links
>>> input_df = spark.sql('''
... SELECT
... col1 as `pokemon.id`,
... col2 as `pokemon.evolve_to_id`,
... col3 as `pokemon.name`,
... col4 as `pokemon.types`
... FROM VALUES
... (4, 5, 'Charmander', ARRAY('Fire')),
... (5, 6, 'Charmeleon', ARRAY('Fire')),
... (6, NULL, 'Charizard', ARRAY('Fire', 'Flying'))
... ''')
>>> input_df.show()
+----------+--------------------+------------+--------------+
|pokemon.id|pokemon.evolve_to_id|pokemon.name| pokemon.types|
+----------+--------------------+------------+--------------+
| 4| 5| Charmander| [Fire]|
| 5| 6| Charmeleon| [Fire]|
| 6| NULL| Charizard|[Fire, Flying]|
+----------+--------------------+------------+--------------+
We compute a DataFrame that gives, for each pokemon.id, the id of its highest level of evolution
>>> ascending_forest_traversal(input_df, "pokemon.id", "pokemon.evolve_to_id").orderBy("`pokemon.id`").show()
+----------+--------------------+
|pokemon.id|pokemon.evolve_to_id|
+----------+--------------------+
| 4| 6|
| 5| 6|
| 6| 6|
+----------+--------------------+
With the keep_labels option, extra joins are performed at the end of the algorithm to add two struct columns
containing the corresponding rows for the original node and its furthest ancestor.
>>> ascending_forest_traversal(input_df, "pokemon.id", "pokemon.evolve_to_id", keep_labels=True
... ).orderBy("`pokemon.id`").show(10, False)
+----------+--------------------+------------------------------------+------------------------------------+
|pokemon.id|pokemon.evolve_to_id|node |furthest_ancestor |
+----------+--------------------+------------------------------------+------------------------------------+
|4 |6 |{4, 5, Charmander, [Fire]} |{6, NULL, Charizard, [Fire, Flying]}|
|5 |6 |{5, 6, Charmeleon, [Fire]} |{6, NULL, Charizard, [Fire, Flying]}|
|6 |6 |{6, NULL, Charizard, [Fire, Flying]}|{6, NULL, Charizard, [Fire, Flying]}|
+----------+--------------------+------------------------------------+------------------------------------+
Cycle protection: to prevent the algorithm from looping indefinitely, cycles are detected, and the nodes that are part of a cycle end up with a NULL value as their furthest ancestor.
>>> input_df = spark.sql('''
... SELECT
... col1 as `node_id`,
... col2 as `parent_id`
... FROM VALUES (1, 2), (2, 3), (3, 1)
... ''')
>>> input_df.show()
+-------+---------+
|node_id|parent_id|
+-------+---------+
| 1| 2|
| 2| 3|
| 3| 1|
+-------+---------+
>>> ascending_forest_traversal(input_df, "node_id", "parent_id").orderBy("node_id").show()
+-------+---------+
|node_id|parent_id|
+-------+---------+
| 1| NULL|
| 2| NULL|
| 3| NULL|
+-------+---------+
Source code in spark_frame/graph_impl/ascending_forest_traversal.py