tags: pyspark,spark-graphframes
Hello,
I am running pyspark in a podman container and I have issues with incorrect
edges when I build my graph.
I start with loading a source dataframe from a parquet directory on my server.
The source dataframe has the following columns:
+-------+-----+-------------+------+------+-------------+------+---------------+
|created|descr|last_modified|mnt_by|origin|start_address|prefix|external_origin|
+-------+-----+-------------+------+------+-------------+------+---------------+
I aim to build a graph connecting prefix, mnt_by, origin and descr with edges
storing the created and last_modified values.
I start by generating an ID for each distinct prefix, mnt_by, origin and descr
value using monotonically_increasing_id(). [1]
These IDs are prefixed with "p_", "m_", "o_" or "org_" respectively, to ensure
they are unique across the whole dataframe.
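For reference, the numeric part of these IDs is not a simple counter. The Spark
documentation for monotonically_increasing_id() states that the current
implementation puts the partition ID in the upper 31 bits and the record number
within each partition in the lower 33 bits. Below is a minimal plain-Python
sketch that splits the IDs from this post under that assumption; the helper
name decode_id is made up for illustration and is not part of my pipeline.

```python
# Sketch only: assumes the bit layout described in the Spark docs for
# monotonically_increasing_id() (partition ID in the upper 31 bits,
# record number within the partition in the lower 33 bits).
RECORD_BITS = 33


def decode_id(prefixed_id: str) -> tuple[str, int, int]:
    """Split e.g. 'p_60129612354' into (label, partition, record number)."""
    label, number = prefixed_id.rsplit("_", 1)
    value = int(number)
    partition = value >> RECORD_BITS
    record = value & ((1 << RECORD_BITS) - 1)
    return label, partition, record


# The four IDs that are listed under [4]
for vertex_id in ("p_60129612354", "m_2897", "o_5130", "org_197568516576"):
    print(vertex_id, decode_id(vertex_id))
```

Under that layout the generated number depends on which partition a value lands
in and on its position there, so it is tied to how the dataframe is partitioned
and ordered at the moment the ID column is evaluated.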
Then I construct the vertices dataframe by collecting the ID, value and whether
they are external for each vertex. [2]
These vertices are then unioned together.
After the vertices, I construct the edges dataframe by selecting the pairs of
IDs that should become src and dst, and union those together. [3]
Each edge stores the created and last_modified values of its source row.
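To make the intended result concrete, here is a small plain-Python sketch (not
the Spark code in [3]) of the four edges a single source row should contribute.
The function name expected_edges and the created/last_modified values are
placeholders for illustration. The IDs are the ones listed under [4], assuming
they all belong to the same source row.

```python
# Sketch only: plain Python, mirrors the four selects in create_edges.
def expected_edges(row: dict) -> list[tuple]:
    """Return the (src, dst, created, last_modified) edges for one source row."""
    stamps = (row["created"], row["last_modified"])
    return [
        (row["prefix_id"], row["mnt_by_id"], *stamps),        # prefix -> maintainer
        (row["mnt_by_id"], row["origin_id"], *stamps),        # maintainer -> origin
        (row["origin_id"], row["organisation_id"], *stamps),  # origin -> organisation
        (row["origin_id"], row["prefix_id"], *stamps),        # origin -> prefix
    ]


row = {
    "prefix_id": "p_60129612354",
    "mnt_by_id": "m_2897",
    "origin_id": "o_5130",
    "organisation_id": "org_197568516576",
    "created": 0,        # placeholder value
    "last_modified": 0,  # placeholder value
}
for edge in expected_edges(row):
    print(edge)
```

Every edge in the graph should be explainable by at least one source row in
this way.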
Now I am ready to construct the graph. Here is where I run into my issue.
When verifying my graph, I looked at a couple of vertices to see if they have
the correct edges.
I looked at the Utwente prefix, origin, descr and mnt_by and found that the
graph contains incorrect edges for them.
I saw edges going out to vertices that are not associated with the Utwente
values at all.
The methods to find the vertices and edges, and their output, can be found in [4].
We can already observe inconsistencies by viewing the prefix -> maintainer and
origin -> prefix edges. [5]
Depending on which column I filter on, the results are inconsistent.
To make matters worse, some edges contain IDs that are not connected to the
original values in the source dataframe at all.
What I have tried to resolve my issue:
* Write a checker that verifies the created edges against the source dataframe. [6]
  The aim of this checker was to determine where the inconsistency comes from, to
  locate the bug and resolve it.
  I ran this checker on limited graphs from n=10 upwards to n=1000000 (or 1m).
  This felt close enough, as there are only ~6.5m records in my source dataframe.
  This ran correctly, although near 1m it slowed down significantly, and on the
  full dataframe it errors or times out.
  I blamed this on the large joins that it performs on the source dataframe.
* I found a GitHub issue from someone with significantly larger graphs who has
  similar issues.
  One suggestion there blamed indexing using strings rather than ints or longs.
  I rewrote my system to use int for IDs but I ran into the same issue.
  The number of incorrect edges was the same, and the incorrect vertices they
  link to were the same too.
* I re-ordered my source dataframe to see what the impact was.
  This results in considerably more incorrect edges according to the checker in [6].
  If helpful I can post the output of this checker as well.
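The idea behind the checker can be summarised in plain Python with sets. This is
only a sketch of the logic, not the Spark implementation in [6]; the function
name check_edges and the sample pairs are made up for illustration.

```python
# Sketch only: plain Python sets instead of Spark joins.
def check_edges(expected: set, actual: set) -> tuple[set, set]:
    """Compare (src, dst) pairs derived from the source rows with the graph's edges."""
    # Pairs present in the source rows but absent from the edges
    # (the role of the left_anti join in [6])
    missing = expected - actual
    # Pairs present in the edges that no source row accounts for
    superfluous = actual - expected
    return missing, superfluous


# Made-up sample data
expected = {("p_1", "m_1"), ("p_2", "m_1")}
actual = {("p_1", "m_1"), ("p_3", "m_1")}

missing, superfluous = check_edges(expected, actual)
print("missing:", missing)
print("superfluous:", superfluous)
```

A correct graph should give two empty sets for every edge type.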
Can you give me any pointers on what I can try, or on what I can do to clarify
my situation better?
Thanks in advance for your time.
Kind regards,
Jelle Nijland
[1]
import pyspark.sql.functions as psf
from pyspark.sql import DataFrame

# ID labels
PREFIX_ID = "prefix_id"
MAINTAINER_ID = "mnt_by_id"
ORIGIN_ID = "origin_id"
ORGANISATION_ID = "organisation_id"
# Source dataframe column names
MNT_BY = "mnt_by"
PREFIX = "prefix"
ORIGIN = "origin"
DESCR = "descr"
EXTERNAL_O = "external_origin"
CREATED = "created"
LAST_MODIFIED = "last_modified"
# Vertex/edge column names used in [2], [3], [4] and [6].
# Their definitions are not part of this listing; the values here are assumed.
ID = "id"
VALUE = "value"
EXTERNAL = "external"
SRC = "src"
DST = "dst"

def generate_ids(df: DataFrame) -> DataFrame:
    """
    Generates a unique ID for each distinct maintainer, prefix, origin and
    organisation

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate IDs for
    """
    # One lookup table per column: distinct value -> prefixed ID
    mnt_by_id = df.select(MNT_BY).distinct().withColumn(
        MAINTAINER_ID,
        psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
    prefix_id = df.select(PREFIX).distinct().withColumn(
        PREFIX_ID,
        psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
    origin_id = df.select(ORIGIN).distinct().withColumn(
        ORIGIN_ID,
        psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
    organisation_id = df.select(DESCR).distinct().withColumn(
        ORGANISATION_ID,
        psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))
    # Attach the four ID columns to every source row
    df = (df.join(mnt_by_id, on=MNT_BY, how="left")
            .join(prefix_id, on=PREFIX, how="left")
            .join(origin_id, on=ORIGIN, how="left")
            .join(organisation_id, on=DESCR, how="left"))
    return df
[2]
def create_vertices(df: DataFrame) -> DataFrame:
    """
    Creates vertices from a DataFrame with IDs
    Vertices have the format:
    ID (str) | VALUE (str) | EXTERNAL (bool)
    ID follows the format [p_|o_|m_|org_][0-9]+

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate vertices for
    """
    prefixes = df.select(PREFIX_ID, PREFIX, psf.lit(False))
    maintainers = df.select(MAINTAINER_ID, MNT_BY, psf.lit(False))
    origins = df.select(ORIGIN_ID, ORIGIN, EXTERNAL_O)
    organisations = df.select(ORGANISATION_ID, DESCR, psf.lit(False))
    # union() matches columns by position; the result keeps the column names
    # of the first dataframe (prefixes)
    result_df = prefixes.union(maintainers).union(origins).union(organisations)
    result_df = result_df.dropDuplicates()
    # The psf.lit(False) column of prefixes is named "false"
    result_df = result_df.withColumnRenamed("false", EXTERNAL)
    result_df = result_df.withColumnRenamed(PREFIX_ID, ID)
    result_df = result_df.withColumnRenamed(PREFIX, VALUE)
    return result_df
[3]
def create_edges(df: DataFrame) -> DataFrame:
    """
    Creates edges from DataFrame with IDs
    Edges have the format:
    SRC (str) | DST (str) | Created (str) | Last_modified (str)

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate edges for
    """
    p_to_mnt = df.select(PREFIX_ID, MAINTAINER_ID, CREATED, LAST_MODIFIED)
    m_to_o = df.select(MAINTAINER_ID, ORIGIN_ID, CREATED, LAST_MODIFIED)
    o_to_org = df.select(ORIGIN_ID, ORGANISATION_ID, CREATED, LAST_MODIFIED)
    o_to_p = df.select(ORIGIN_ID, PREFIX_ID, CREATED, LAST_MODIFIED)
    # union() matches columns by position; the result keeps the column names
    # of the first dataframe (p_to_mnt)
    edges = p_to_mnt.union(m_to_o).union(o_to_org).union(o_to_p)
    # result_df = edges
    result_df = edges.dropDuplicates()
    result_df = result_df.withColumnRenamed(PREFIX_ID, SRC)
    result_df = result_df.withColumnRenamed(MAINTAINER_ID, DST)
    return result_df
[4]
from graphframes import GraphFrame

# Demonstrating bug with the edges, using UT's prefix/mnt/origin/org as example
# How-to-use: get the IDs and plug them in bug_show_related_edges()
def bug_gather_ids(g: GraphFrame):
    # Look up each vertex by its value to find the generated ID
    vertex = "130.89.0.0/16"
    filtered_v = g.vertices.filter(psf.col(VALUE)==vertex)
    filtered_v.show(truncate=False)
    mnt = "SN-LIR-MNT RIPE-NCC-LEGACY-MNT"
    filtered_m = g.vertices.filter(psf.col(VALUE)==mnt)
    filtered_m.show(truncate=False)
    origin = "1133"
    filtered_o = g.vertices.filter(psf.col(VALUE)==origin)
    filtered_o.show(truncate=False)
    org = "Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede"
    filtered_org = g.vertices.filter(psf.col(VALUE)==org)
    filtered_org.show(truncate=False)
def bug_show_related_edges(g: GraphFrame, p_id: str, m_id: str, o_id: str,
                           org_id: str):
    # Maintainer: incoming edges (dst), then outgoing edges (src)
    con1_m = psf.col(DST)==m_id
    con2_m = psf.col(SRC)==m_id
    edg1_m = g.edges.filter(con1_m)
    edg1_m.show(truncate=False)
    edg2_m = g.edges.filter(con2_m)
    edg2_m.show(truncate=False)
    # Prefix: incoming edges, then outgoing edges
    con1_p = psf.col(DST)==p_id
    con2_p = psf.col(SRC)==p_id
    edg1_p = g.edges.filter(con1_p)
    edg1_p.show(truncate=False)
    edg2_p = g.edges.filter(con2_p)
    edg2_p.show(truncate=False)
    # Origin: incoming edges, then outgoing edges
    con1_o = psf.col(DST)==o_id
    con2_o = psf.col(SRC)==o_id
    edg1_o = g.edges.filter(con1_o)
    edg1_o.show(truncate=False)
    edg2_o = g.edges.filter(con2_o)
    edg2_o.show(truncate=False)
    # Organisation: incoming edges, then outgoing edges
    con1_org = psf.col(DST)==org_id
    con2_org = psf.col(SRC)==org_id
    edg1_org = g.edges.filter(con1_org)
    edg1_org.show(truncate=False)
    edg2_org = g.edges.filter(con2_org)
    edg2_org.show(truncate=False)
# prefix 'p_60129612354' corresponds with 130.89.0.0/16
# maintainer 'm_2897' corresponds with SN-LIR-MNT RIPE-NCC-LEGACY-MNT
# origin 'o_5130' corresponds with 1133
# organisation 'org_197568516576' corresponds with Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede
[5]
Output of bug_show_related_edges(g, "p_60129612354", "m_2897", "o_5130", "org_197568516576"):
# prefix -> maintainer edges (filtered on dst = maintainer)
+--------------+------+----------+-------------+
|src           |dst   |created   |last_modified|
+--------------+------+----------+-------------+
|p_197568533425|m_2897|0         |0            |
|p_94489347499 |m_2897|0         |0            |
|p_25769898645 |m_2897|1020678697|1020678697   |
|p_128849058299|m_2897|0         |0            |
|p_68719514870 |m_2897|0         |0            |
|p_146028965124|m_2897|1020267786|1020267786   |
|p_60129579570 |m_2897|0         |0            |
+--------------+------+----------+-------------+
# maintainer -> origin edges (filtered on src = maintainer)
+------+------------+----------+-------------+
|src   |dst         |created   |last_modified|
+------+------------+----------+-------------+
|m_2897|o_8589936949|0         |0            |
|m_2897|o_5130      |0         |0            |
|m_2897|o_8589936949|1020267786|1020267786   |
|m_2897|o_8589936949|1020678697|1020678697   |
+------+------------+----------+-------------+
# origin -> prefix edges (filtered on dst = prefix)
+-----+-------------+----------+-------------+
|src  |dst          |created   |last_modified|
+-----+-------------+----------+-------------+
|o_380|p_60129612354|1220513130|1220513130   |
+-----+-------------+----------+-------------+
# prefix -> maintainer edges (filtered on src = prefix)
+-------------+-----+----------+-------------+
|src          |dst  |created   |last_modified|
+-------------+-----+----------+-------------+
|p_60129612354|m_533|1220513130|1220513130   |
+-------------+-----+----------+-------------+
# maintainer -> origin edges (filtered on dst = origin)
+------+------+-------+-------------+
|src   |dst   |created|last_modified|
+------+------+-------+-------------+
|m_2897|o_5130|0      |0            |
+------+------+-------+-------------+
# origin -> prefix and origin -> organisation edges (filtered on src = origin)
+------+----------------+-------+-------------+
|src   |dst             |created|last_modified|
+------+----------------+-------+-------------+
|o_5130|p_94489347499   |0      |0            |
|o_5130|org_283467856326|0      |0            |
+------+----------------+-------+-------------+
# origin -> organisation edges (filtered on dst = organisation)
+------------+----------------+----------+-------------+
|src         |dst             |created   |last_modified|
+------------+----------------+----------+-------------+
|o_8589936415|org_197568516576|1320919317|1320919317   |
|o_8589936415|org_197568516576|1285464368|1285464368   |
+------------+----------------+----------+-------------+
# origin -> organisation edges (filtered on src = organisation)
# NB: this is intended, there are no outgoing edges from organisations
+---+---+-------+-------------+
|src|dst|created|last_modified|
+---+---+-------+-------------+
+---+---+-------+-------------+
[6]
DEBUG = False  # set to True to print the intermediate dataframes

# Determines which rows are missing from edges
# Determines which rows in edges are not in src_df (superfluous)
def checker_df(edges: DataFrame, src_df: DataFrame, src: str, dst: str,
               src_id: str, dst_id: str) -> tuple[DataFrame, DataFrame]:
    # Keep only the edges whose src/dst IDs start with the given prefixes
    con1 = psf.col(SRC).startswith(src)
    con2 = psf.col(DST).startswith(dst)
    filtered_edges = edges.filter(con1 & con2)
    if filtered_edges.count() == 0:
        print(f"[Warning] No edges found after filtering! [checker_df] was "
              f"checking {src_id} to {dst_id}")
    if DEBUG:
        print("[Checker_df] Printing Filtered edges:")
        filtered_edges.show(truncate=False)
        print(f"Records: {filtered_edges.count()}")
    # Shows if there are rows which are not represented in the edge df
    missing_edges = src_df.join(filtered_edges,
                                ((filtered_edges[SRC] == src_df[src_id]) &
                                 (filtered_edges[DST] == src_df[dst_id])),
                                "left_anti")
    missing_edges = missing_edges.dropDuplicates()
    if DEBUG:
        print("[Checker_df] Printing missing edges:")
        missing_edges.show(truncate=False)
        print(f"Records: {missing_edges.count()}")
    # Shows if there are edges which are not represented in the src df
    superfluous_edges = filtered_edges.join(src_df,
                                            ((filtered_edges[SRC] == src_df[src_id]) &
                                             (filtered_edges[DST] == src_df[dst_id])),
                                            "left")
    # After the left join, unmatched edges have NULL in the src_df ID columns
    superfluous_edges2 = superfluous_edges.filter(
        superfluous_edges[src_id].isNull() |
        superfluous_edges[dst_id].isNull())
    if DEBUG:
        print("[Checker_df] Printing superfluous edges:")
        superfluous_edges2.show(truncate=False)
        print(f"Records: {superfluous_edges2.count()}")
    return missing_edges, superfluous_edges2