Hi Mich,
Thanks for your suggestions.
1) It currently runs on one server with plenty of resources assigned, but I
will keep in mind replacing monotonically_increasing_id() with uuid() once we
scale up.
2) I have replaced the null values in origin with a string
{prefix}-{mnt_by}-{organisation}:

replacement_string = psf.concat_ws("-", psf.col("prefix"), psf.col("mnt_by"), psf.col("descr"))
df = df.withColumn("origin", psf.coalesce(psf.col("origin"), replacement_string))
I have verified that my other columns have no null values.
3) This is the logic I use to generate IDs:
mnt_by_id = df.select(MNT_BY).distinct().withColumn(
    MAINTAINER_ID, psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
prefix_id = df.select(PREFIX).distinct().withColumn(
    PREFIX_ID, psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
origin_id = df.select(ORIGIN).distinct().withColumn(
    ORIGIN_ID, psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
organisation_id = df.select(DESCR).distinct().withColumn(
    ORGANISATION_ID, psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))

df = (df.join(mnt_by_id, on=MNT_BY, how="left")
        .join(prefix_id, on=PREFIX, how="left")
        .join(origin_id, on=ORIGIN, how="left")
        .join(organisation_id, on=DESCR, how="left"))
I create the IDs from the distinct values in the columns "mnt_by", "prefix",
"origin" and "descr", and then join back on those same columns.
4) This is my current resource allocation; I run it on my university's server.
It has 112 cores and 1.48 TB of RAM. I can request more resources, but in my
eyes this should be plenty.
If you think more resources would help, I will ask for them.
spark_conf = (
    SparkConf()
    .setAppName(f"pyspark-{APP_NAME}-{int(time())}")
    .set("spark.submit.deployMode", "client")
    .set("spark.sql.parquet.binaryAsString", "true")
    .set("spark.driver.bindAddress", "localhost")
    .set("spark.driver.host", "127.0.0.1")
    # .set("spark.driver.port", "0")
    .set("spark.ui.port", "4041")
    .set("spark.executor.instances", "1")
    .set("spark.executor.cores", "50")
    .set("spark.executor.memory", "128G")
    .set("spark.executor.memoryOverhead", "32G")
    .set("spark.driver.cores", "16")
    .set("spark.driver.memory", "64G")
)
I don't think b) applies, as it is a single machine.
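That said, if skew across partitions still matters on one machine, a quick
check could be (a sketch, using the mnt_by column from my schema):

# Rough skew check: the largest groups dominate the join work.
df.groupBy("mnt_by").count().orderBy(psf.col("count").desc()).show(10)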
Kind regards,
Jelle
________________________________
From: Mich Talebzadeh <[email protected]>
Sent: Wednesday, April 24, 2024 6:12 PM
To: Nijland, J.G.W. (Jelle, Student M-CS) <[email protected]>
Cc: [email protected] <[email protected]>
Subject: Re: [spark-graphframes]: Generating incorrect edges
OK, let us have a look at these:
1) You are using monotonically_increasing_id(), which is not
collision-resistant in distributed environments like Spark. Multiple hosts
can generate the same ID. I suggest switching to UUIDs (e.g., uuid.uuid4())
for guaranteed uniqueness.
2) Missing values in the Origin column lead to null IDs, potentially causing
problems downstream. You can handle missing values appropriately, say
a) Filter out rows with missing origins or b) impute missing values with a
strategy that preserves relationships (if applicable).
3) On the join code: you mentioned left-joining on the same column used for ID
creation; that is not very clear!
4) Edge issue: it appears to me that the issue occurs with larger datasets
(>100K records). Possible causes could be
a) Resource constraints: as data size increases, PySpark might struggle with
joins or computations if resources are limited (memory, CPU).
b) Data skew: uneven distribution of values in certain columns could lead to
imbalanced processing across machines. Check the Spark UI (port 4040), in
particular the Stages and Executors tabs.
HTH
Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom
view my Linkedin
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh
Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed. It is essential to note that, as with any
advice, quote "one test result is worth one-thousand expert opinions" (Wernher
von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
On Wed, 24 Apr 2024 at 16:44, Nijland, J.G.W. (Jelle, Student M-CS)
<[email protected]<mailto:[email protected]>>
wrote:
Hi Mich,
Thanks for your reply,
1) ID generation is done using monotonically_increasing_id()
<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.monotonically_increasing_id.html>.
This is then prefixed with "p_", "m_", "o_" or "org_", depending on the type
of value it identifies.
2) There are some missing values in the Origin column; these result in a null
ID.
3) The join code is present in [1]; I do a left join on the same column I
create the ID on.
4) I don't think the issue is in ID or edge generation. If I limit my input
dataframe and union it with my Utwente data row, I can verify those edges are
created correctly up to 100K records.
Once I go past that number of records, the results become inconsistent and
incorrect.
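(Roughly like this; utwente_row is a hypothetical name here for a one-row
DataFrame holding our record, with the same schema as df:)

# Sketch of the test described above: cap the input at n records and make
# sure our own row is included before rebuilding the graph.
df_test = df.limit(100000).union(utwente_row)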
Kind regards,
Jelle Nijland
________________________________
From: Mich Talebzadeh
<[email protected]<mailto:[email protected]>>
Sent: Wednesday, April 24, 2024 4:40 PM
To: Nijland, J.G.W. (Jelle, Student M-CS)
<[email protected]<mailto:[email protected]>>
Cc: [email protected]<mailto:[email protected]>
<[email protected]<mailto:[email protected]>>
Subject: Re: [spark-graphframes]: Generating incorrect edges
OK, a few observations:
1) ID Generation Method: How are you generating unique IDs (UUIDs, sequential
numbers, etc.)?
2) Data Inconsistencies: Have you checked for missing values impacting ID
generation?
3) Join Verification: If relevant, can you share the code for joining data
points during ID creation? Are joins matching columns correctly?
4) Specific Edge Issues: Can you share examples of vertex IDs with incorrect
connections? Is this related to ID generation or edge creation logic?
HTH
Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI, FinCrime
London
United Kingdom
view my Linkedin
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh
Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed. It is essential to note that, as with any
advice, quote "one test result is worth one-thousand expert opinions" (Wernher
von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS)
<[email protected]<mailto:[email protected]>>
wrote:
tags: pyspark,spark-graphframes
Hello,
I am running pyspark in a podman container and I have issues with incorrect
edges when I build my graph.
I start with loading a source dataframe from a parquet directory on my server.
The source dataframe has the following columns:
+-------+-----+-------------+------+------+-------------+------+---------------+
|created|descr|last_modified|mnt_by|origin|start_address|prefix|external_origin|
+-------+-----+-------------+------+------+-------------+------+---------------+
I aim to build a graph connecting prefix, mnt_by, origin and descr, with edges
storing the created and last_modified values.
I start by generating IDs for the prefix, mnt_by, origin and descr using
monotonically_increasing_id(). [1]
These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are
unique across the dataframe.
Then I construct the vertices dataframe by collecting the ID, value and whether
they are external for each vertex. [2]
These vertices are then unioned together.
Following the vertices, I construct the edges dataframe by selecting the IDs
that I want to be the src and the dst and union those together. [3]
These edges store the created and last_modified.
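(For completeness, the construction itself is the simple part; a sketch,
assuming vertices_df and edges_df are the frames from [2] and [3] with the
standard id/src/dst column names GraphFrames expects:)

from graphframes import GraphFrame

# GraphFrames expects an "id" column on vertices and "src"/"dst" on edges.
g = GraphFrame(vertices_df, edges_df)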
Now I am ready to construct the graph, and here is where I run into my issue.
When verifying my graph, I looked at a couple of vertices to see if they have
the correct edges.
I looked at the Utwente prefix, origin, descr and mnt_by and found that the
graph generates incorrect edges.
I saw edges going out to vertices that are not associated with the Utwente
values at all.
The methods to find the vertices and edges, and their output, can be found in
[4].
We can already observe inconsistencies by viewing the prefix -> maintainer and
origin -> prefix edges. [5]
Depending on which column I filter on, the results are inconsistent.
To make matters worse, some edges contain IDs that are not connected to the
original values in the source dataframe at all.
What I have tried to resolve my issue:
*
Writing a checker that verifies the created edges against the source
dataframe. [6]
The aim of this checker was to determine where the inconsistency comes from,
to locate the bug and resolve it.
I ran this checker on limited graphs from n=10 up to n=1000000 (or 1M).
This felt close enough, as there are only ~6.5M records in my source
dataframe.
This ran correctly; near the 1M mark it experienced significant slowdown, and
on the full dataframe it errors/times out.
I blamed this on the large joins that it performs on the source dataframe.
*
I found a GitHub issue from someone with significantly larger graphs having
similar issues.
One suggestion there blamed indexing using strings rather than ints or longs.
I rewrote my system to use ints for IDs, but I ran into the same issue.
The number of incorrect edges was the same, and the incorrect vertices they
linked to were the same too.
*
I re-ordered my source dataframe to see what the impact was.
This resulted in considerably more incorrect edges according to the checker
in [6].
If helpful, I can post the output of this checker as well.
Can you give me any pointers on what I can try, or what I can do to clarify my
situation better?
Thanks in advance for your time.
Kind regards,
Jelle Nijland
[1]
import pyspark.sql.functions as psf
from pyspark.sql import DataFrame

# ID labels
PREFIX_ID = "prefix_id"
MAINTAINER_ID = "mnt_by_id"
ORIGIN_ID = "origin_id"
ORGANISATION_ID = "organisation_id"

# Source dataframe column names
MNT_BY = "mnt_by"
PREFIX = "prefix"
ORIGIN = "origin"
DESCR = "descr"
EXTERNAL_O = "external_origin"

def generate_ids(df: DataFrame) -> DataFrame:
    """
    Generates a unique ID for each distinct maintainer, prefix, origin and
    organisation.

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate IDs for
    """
    # One distinct-value frame per column, each with a prefixed numeric ID
    mnt_by_id = df.select(MNT_BY).distinct().withColumn(
        MAINTAINER_ID, psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
    prefix_id = df.select(PREFIX).distinct().withColumn(
        PREFIX_ID, psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
    origin_id = df.select(ORIGIN).distinct().withColumn(
        ORIGIN_ID, psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
    organisation_id = df.select(DESCR).distinct().withColumn(
        ORGANISATION_ID, psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))
    # Join the IDs back on the same columns they were derived from
    df = (df.join(mnt_by_id, on=MNT_BY, how="left")
            .join(prefix_id, on=PREFIX, how="left")
            .join(origin_id, on=ORIGIN, how="left")
            .join(organisation_id, on=DESCR, how="left"))
    return df
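(A mitigation I have not applied here, noted as my own assumption: materialise
each ID frame before the joins, so that monotonically_increasing_id() is
evaluated exactly once, e.g.:)

# Hypothetical mitigation: checkpoint() forces one eager evaluation;
# it requires spark.sparkContext.setCheckpointDir(...) to be configured.
mnt_by_id = mnt_by_id.checkpoint()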
[2]
def create_vertices(df: DataFrame) -> DataFrame:
    """
    Creates vertices from a DataFrame with IDs.

    Vertices have the format:
    ID (str) | VALUE (str) | EXTERNAL (bool)
    ID follows the format [p_|o_|m_|org_][0-9]+

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate vertices for
    """
    prefixes = df.select(PREFIX_ID, PREFIX, psf.lit(False))
    maintainers = df.select(MAINTAINER_ID, MNT_BY, psf.lit(False))
    origins = df.select(ORIGIN_ID, ORIGIN, EXTERNAL_O)
    organisations = df.select(ORGANISATION_ID, DESCR, psf.lit(False))
    result_df = prefixes.union(maintainers).union(origins).union(organisations)
    result_df = result_df.dropDuplicates()
    # union() keeps the column names of the first frame, so rename them
    result_df = result_df.withColumnRenamed("false", EXTERNAL)
    result_df = result_df.withColumnRenamed(PREFIX_ID, ID)
    result_df = result_df.withColumnRenamed(PREFIX, VALUE)
    return result_df
[3]
def create_edges(df: DataFrame) -> DataFrame:
    """
    Creates edges from a DataFrame with IDs.

    Edges have the format:
    SRC (str) | DST (str) | CREATED (str) | LAST_MODIFIED (str)

    Parameters
    ----------
    df : DataFrame
        DataFrame to generate edges for
    """
    p_to_mnt = df.select(PREFIX_ID, MAINTAINER_ID, CREATED, LAST_MODIFIED)
    m_to_o = df.select(MAINTAINER_ID, ORIGIN_ID, CREATED, LAST_MODIFIED)
    o_to_org = df.select(ORIGIN_ID, ORGANISATION_ID, CREATED, LAST_MODIFIED)
    o_to_p = df.select(ORIGIN_ID, PREFIX_ID, CREATED, LAST_MODIFIED)
    edges = p_to_mnt.union(m_to_o).union(o_to_org).union(o_to_p)
    result_df = edges.dropDuplicates()
    # union() keeps the column names of the first frame, so rename them
    result_df = result_df.withColumnRenamed(PREFIX_ID, SRC)
    result_df = result_df.withColumnRenamed(MAINTAINER_ID, DST)
    return result_df
[4]
# Demonstrating the bug with the edges, using UT's prefix/mnt/origin/org as
# an example.
# How-to-use: get the IDs and plug them into bug_show_related_edges()
def bug_gather_ids(g: GraphFrame):
    vertex = "130.89.0.0/16"
    filtered_v = g.vertices.filter(psf.col(VALUE) == vertex)
    filtered_v.show(truncate=False)
    mnt = "SN-LIR-MNT RIPE-NCC-LEGACY-MNT"
    filtered_m = g.vertices.filter(psf.col(VALUE) == mnt)
    filtered_m.show(truncate=False)
    origin = "1133"
    filtered_o = g.vertices.filter(psf.col(VALUE) == origin)
    filtered_o.show(truncate=False)
    org = "Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede"
    filtered_org = g.vertices.filter(psf.col(VALUE) == org)
    filtered_org.show(truncate=False)

def bug_show_related_edges(g: GraphFrame, p_id: str, m_id: str, o_id: str,
                           org_id: str):
    # For each ID, show incoming (dst == id) and outgoing (src == id) edges
    con1_m = psf.col(DST) == m_id
    con2_m = psf.col(SRC) == m_id
    edg1_m = g.edges.filter(con1_m)
    edg1_m.show(truncate=False)
    edg2_m = g.edges.filter(con2_m)
    edg2_m.show(truncate=False)
    con1_p = psf.col(DST) == p_id
    con2_p = psf.col(SRC) == p_id
    edg1_p = g.edges.filter(con1_p)
    edg1_p.show(truncate=False)
    edg2_p = g.edges.filter(con2_p)
    edg2_p.show(truncate=False)
    con1_o = psf.col(DST) == o_id
    con2_o = psf.col(SRC) == o_id
    edg1_o = g.edges.filter(con1_o)
    edg1_o.show(truncate=False)
    edg2_o = g.edges.filter(con2_o)
    edg2_o.show(truncate=False)
    con1_org = psf.col(DST) == org_id
    con2_org = psf.col(SRC) == org_id
    edg1_org = g.edges.filter(con1_org)
    edg1_org.show(truncate=False)
    edg2_org = g.edges.filter(con2_org)
    edg2_org.show(truncate=False)

# prefix 'p_60129612354' corresponds with 130.89.0.0/16
# maintainer 'm_2897' corresponds with SN-LIR-MNT RIPE-NCC-LEGACY-MNT
# origin 'o_5130' corresponds with 1133
# organisation 'org_197568516576' corresponds with Drienerlolaan 5 P.O. Box
# 217 NL - 7500 AE Enschede

Output of bug_show_related_edges(g, "p_60129612354", "m_2897", "o_5130",
"org_197568516576"):
# prefix -> maintainer edges (filtered on dst = maintainer)
+--------------+------+----------+-------------+
|src           |dst   |created   |last_modified|
+--------------+------+----------+-------------+
|p_197568533425|m_2897|0         |0            |
|p_94489347499 |m_2897|0         |0            |
|p_25769898645 |m_2897|1020678697|1020678697   |
|p_128849058299|m_2897|0         |0            |
|p_68719514870 |m_2897|0         |0            |
|p_146028965124|m_2897|1020267786|1020267786   |
|p_60129579570 |m_2897|0         |0            |
+--------------+------+----------+-------------+
# maintainer -> origin edges (filtered on src = maintainer)
+------+------------+----------+-------------+
|src   |dst         |created   |last_modified|
+------+------------+----------+-------------+
|m_2897|o_8589936949|0         |0            |
|m_2897|o_5130      |0         |0            |
|m_2897|o_8589936949|1020267786|1020267786   |
|m_2897|o_8589936949|1020678697|1020678697   |
+------+------------+----------+-------------+
# origin -> prefix edges (filtered on dst = prefix)
+-----+-------------+----------+-------------+
|src  |dst          |created   |last_modified|
+-----+-------------+----------+-------------+
|o_380|p_60129612354|1220513130|1220513130   |
+-----+-------------+----------+-------------+
# prefix -> maintainer edges (filtered on src = prefix)
+-------------+-----+----------+-------------+
|src          |dst  |created   |last_modified|
+-------------+-----+----------+-------------+
|p_60129612354|m_533|1220513130|1220513130   |
+-------------+-----+----------+-------------+
# maintainer -> origin edges (filtered on dst = origin)
+------+------+-------+-------------+
|src   |dst   |created|last_modified|
+------+------+-------+-------------+
|m_2897|o_5130|0      |0            |
+------+------+-------+-------------+
# origin outgoing edges (filtered on src = origin)
+------+----------------+-------+-------------+
|src   |dst             |created|last_modified|
+------+----------------+-------+-------------+
|o_5130|p_94489347499   |0      |0            |
|o_5130|org_283467856326|0      |0            |
+------+----------------+-------+-------------+
# origin -> organisation edges (filtered on dst = organisation)
+------------+----------------+----------+-------------+
|src         |dst             |created   |last_modified|
+------------+----------------+----------+-------------+
|o_8589936415|org_197568516576|1320919317|1320919317   |
|o_8589936415|org_197568516576|1285464368|1285464368   |
+------------+----------------+----------+-------------+
# organisation outgoing edges (filtered on src = organisation)
# NB: this is intended, there are no outgoing edges from organisations
+---+---+-------+-------------+
|src|dst|created|last_modified|
+---+---+-------+-------------+
+---+---+-------+-------------+
[6]
from typing import Tuple

# Determines which rows are missing from edges
# Determines which rows in edges are not in src_df (superfluous)
def checker_df(edges: DataFrame, src_df: DataFrame, src: str, dst: str,
               src_id: str, dst_id: str) -> Tuple[DataFrame, DataFrame]:
    con1 = psf.col(SRC).startswith(src)
    con2 = psf.col(DST).startswith(dst)
    filtered_edges = edges.filter(con1 & con2)
    if filtered_edges.count() == 0:
        print(f"[Warning] No edges found after filtering! [checker_df] was "
              f"checking {src_id} to {dst_id}")
    if DEBUG:
        print("[Checker_df] Printing filtered edges:")
        filtered_edges.show(truncate=False)
        print(f"Records: {filtered_edges.count()}")
    # Shows if there are rows which are not represented in the edge df
    missing_edges = src_df.join(filtered_edges,
                                ((filtered_edges[SRC] == src_df[src_id]) &
                                 (filtered_edges[DST] == src_df[dst_id])),
                                "left_anti")
    missing_edges = missing_edges.dropDuplicates()
    if DEBUG:
        print("[Checker_df] Printing missing edges:")
        missing_edges.show(truncate=False)
        print(f"Records: {missing_edges.count()}")
    # Shows if there are edges which are not represented in the src df
    superfluous_edges = filtered_edges.join(src_df,
                                            ((filtered_edges[SRC] == src_df[src_id]) &
                                             (filtered_edges[DST] == src_df[dst_id])),
                                            "left")
    superfluous_edges2 = superfluous_edges.filter(
        superfluous_edges[src_id].isNull() | superfluous_edges[dst_id].isNull())
    if DEBUG:
        print("[Checker_df] Printing superfluous edges:")
        superfluous_edges2.show(truncate=False)
        print(f"Records: {superfluous_edges2.count()}")
    return missing_edges, superfluous_edges2
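Example invocation (a sketch; df here is the source dataframe after
generate_ids, and the checked pair is prefix -> maintainer):

# Check prefix -> maintainer edges against the source dataframe.
missing, superfluous = checker_df(g.edges, df, "p_", "m_", PREFIX_ID, MAINTAINER_ID)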