OK, a few observations:

1) ID Generation Method: How are you generating unique IDs (UUIDs, sequential numbers, etc.)?
2) Data Inconsistencies: Have you checked for missing values impacting ID generation?
3) Join Verification: If relevant, can you share the code for joining data points during ID creation? Are joins matching columns correctly?
4) Specific Edge Issues: Can you share examples of vertex IDs with incorrect connections? Is this related to ID generation or edge creation logic?
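On point 1, one option worth comparing against positional IDs is to derive each ID from the value itself, so that recomputing the DataFrame cannot hand the same value a different ID. Below is a minimal pure-Python sketch of the idea. It assumes a SHA-256 hex digest is acceptable as an ID, and `stable_id` is a hypothetical helper, not an existing API. In PySpark the corresponding column expression would be something like psf.concat(psf.lit("p_"), psf.sha2(psf.col("prefix"), 256)). That expression also yields null for a null input, which is relevant to point 2.

```python
import hashlib
from typing import Optional


def stable_id(tag: str, value: Optional[str]) -> Optional[str]:
    """Derive a vertex ID from the value itself (hypothetical helper).

    The same value always yields the same ID, regardless of partitioning,
    row order or how many times the data is recomputed.
    Missing values yield None instead of an ID.
    """
    if value is None:
        return None
    # Hex digest of the UTF-8 bytes, namespaced by the vertex-type tag
    digest = hashlib.sha256(value.encode("utf-8")).hexdigest()
    return f"{tag}{digest}"


# Same value, same ID, no matter where or when it is computed
a = stable_id("p_", "130.89.0.0/16")
b = stable_id("p_", "130.89.0.0/16")
print(a == b)
print(stable_id("m_", None))
```

The trade-off is 64-character string IDs instead of longs. A 64-bit hash such as psf.xxhash64 would be more compact, but it raises the (still small) chance of two values colliding on one ID.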
HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI, FinCrime
London
United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS) <j.g.w.nijl...@student.utwente.nl> wrote:

> tags: pyspark,spark-graphframes
>
> Hello,
>
> I am running pyspark in a podman container and I have issues with
> incorrect edges when I build my graph.
> I start by loading a source dataframe from a parquet directory on my
> server. The source dataframe has the following columns:
>
> +-------+-----+-------------+------+------+-------------+------+---------------+
> |created|descr|last_modified|mnt_by|origin|start_address|prefix|external_origin|
> +-------+-----+-------------+------+------+-------------+------+---------------+
>
> I aim to build a graph connecting prefix, mnt_by, origin and descr with
> edges storing the created and last_modified values.
> I start by generating IDs for the prefix, mnt_by, origin and descr using
> monotonically_increasing_id(). [1]
> These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are
> unique across the dataframe.
>
> Then I construct the vertices dataframe by collecting the ID, value and
> whether they are external for each vertex. [2]
> These vertices are then unioned together.
> Following the vertices, I construct the edges dataframe by selecting the
> IDs that I want to be the src and the dst and union those together. [3]
> These edges store the created and last_modified values.
>
> Now I am ready to construct the graph. Here is where I run into my issue.
>
> When verifying my graph, I looked at a couple of vertices to see if they
> have the correct edges.
> I looked at the Utwente prefix, origin, descr and mnt_by and found that it
> generates incorrect edges.
>
> I saw edges going out to vertices that are not associated with the Utwente
> values at all.
> The methods to find the vertices, edges and the output can be found in [4].
> We can already observe inconsistencies by viewing the prefix -> maintainer
> and origin -> prefix edges. [5]
> Depending on which column I filter on, the results are inconsistent.
> To make matters worse, some edges contain IDs that are not connected to the
> original values in the source dataframe at all.
>
> What I have tried to resolve my issue:
>
> - Write a checker that verifies the edges created against the source
>   dataframe. [6]
>   The aim of this checker was to determine where the inconsistency comes
>   from, to locate the bug and resolve it.
>   I ran this checker on limited graphs from n=10 upwards to n=1000000 (or
>   1m).
>   This felt close enough, as there are only ~6.5m records in my source
>   dataframe.
>   This ran correctly; near 1m it did experience significant slowdown, and
>   on the full dataframe it errors/times out.
>   I blamed this on the large joins that it performs on the source
>   dataframe.
> - I found a GitHub issue from someone with significantly larger graphs
>   having similar issues.
>   One suggestion there blamed indexing using strings rather than ints or
>   longs.
>   I rewrote my system to use int for IDs but I ran into the same issue.
>   The number of incorrect edges was the same, and so were the incorrect
>   vertices they link to.
> - I re-ordered my source dataframe to see what the impact was.
>   This results in considerably more incorrect edges using the checker in
>   [6].
>   If helpful I can post the output of this checker as well.
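The re-ordering result above fits how monotonically_increasing_id() builds its values. Per the PySpark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The same documentation notes that the function is non-deterministic because its result depends on partition IDs. An ID therefore encodes where a row sat when the ID column was evaluated, not what the row contains. If the distinct() result feeding generate_ids() is evaluated more than once with a different partitioning or row order, the same value may receive a different ID in each evaluation. The small pure-Python sketch below decodes a few IDs from the posted output. The helper names are hypothetical, and the (3, 5) slot in the last line is an invented position used only for comparison.

```python
from typing import Tuple

ROW_BITS = 33                      # lower 33 bits: record number within the partition
ROW_MASK = (1 << ROW_BITS) - 1


def decode_mono_id(label: str) -> Tuple[int, int]:
    """Split an ID such as 'p_60129612354' into (partition_id, row_in_partition)."""
    number = int(label.rsplit("_", 1)[1])   # drop the 'p_' / 'm_' / 'o_' / 'org_' tag
    return number >> ROW_BITS, number & ROW_MASK


def encode_mono_id(partition_id: int, row_in_partition: int) -> int:
    """Rebuild the raw value monotonically_increasing_id() would emit for that slot."""
    return (partition_id << ROW_BITS) + row_in_partition


for label in ["p_60129612354", "m_2897", "o_5130", "o_8589936949", "org_197568516576"]:
    print(label, decode_mono_id(label))

# The same prefix value landing in a different (hypothetical) slot gets a different ID
print(encode_mono_id(7, 70210), encode_mono_id(3, 5))
```

For example, p_60129612354 decodes to (7, 70210): row 70210 of partition 7. The small IDs such as m_2897 and o_5130 are simply rows of partition 0.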
>
> Can you give me any pointers on what I can try, or what I can do to clarify
> my situation better?
> Thanks in advance for your time.
>
> Kind regards,
> Jelle Nijland
>
>
> [1]
> import pyspark.sql.functions as psf
> from pyspark.sql import DataFrame
>
> # ID labels
> PREFIX_ID = "prefix_id"
> MAINTAINER_ID = "mnt_by_id"
> ORIGIN_ID = "origin_id"
> ORGANISATION_ID = "organisation_id"
>
> # Source dataframe column names
> MNT_BY = "mnt_by"
> PREFIX = "prefix"
> ORIGIN = "origin"
> DESCR = "descr"
> EXTERNAL_O = "external_origin"
> CREATED = "created"
> LAST_MODIFIED = "last_modified"
>
> # Vertex/edge column names. GraphFrames requires "id", "src" and "dst";
> # VALUE and EXTERNAL are assumed names, they were not shown in the post.
> ID = "id"
> VALUE = "value"
> EXTERNAL = "external"
> SRC = "src"
> DST = "dst"
>
>
> def generate_ids(df: DataFrame) -> DataFrame:
>     """
>     Generates a unique ID for each distinct maintainer, prefix, origin and
>     organisation
>
>     Parameters
>     ----------
>     df : DataFrame
>         DataFrame to generate IDs for
>     """
>     mnt_by_id = df.select(MNT_BY).distinct().withColumn(
>         MAINTAINER_ID, psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
>     prefix_id = df.select(PREFIX).distinct().withColumn(
>         PREFIX_ID, psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
>     origin_id = df.select(ORIGIN).distinct().withColumn(
>         ORIGIN_ID, psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
>     organisation_id = df.select(DESCR).distinct().withColumn(
>         ORGANISATION_ID, psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))
>
>     df = (df.join(mnt_by_id, on=MNT_BY, how="left")
>             .join(prefix_id, on=PREFIX, how="left")
>             .join(origin_id, on=ORIGIN, how="left")
>             .join(organisation_id, on=DESCR, how="left"))
>     return df
>
> [2]
> def create_vertices(df: DataFrame) -> DataFrame:
>     """
>     Creates vertices from a DataFrame with IDs
>     Vertices have the format:
>     ID (str) | VALUE (str) | EXTERNAL (bool)
>
>     ID follows the format [p_|o_|m_|org_][0-9]+
>
>     Parameters
>     ----------
>     df : DataFrame
>         DataFrame to generate vertices for
>     """
>     prefixes = df.select(PREFIX_ID, PREFIX, psf.lit(False))
>     maintainers = df.select(MAINTAINER_ID, MNT_BY, psf.lit(False))
>     origins = df.select(ORIGIN_ID, ORIGIN, EXTERNAL_O)
>     organisations = df.select(ORGANISATION_ID, DESCR, psf.lit(False))
>
>     # union() matches columns by position, so the result keeps the first
>     # DataFrame's column names (prefix_id, prefix, false), renamed below
>     result_df = prefixes.union(maintainers).union(origins).union(organisations)
>     result_df = result_df.dropDuplicates()
>     result_df = result_df.withColumnRenamed("false", EXTERNAL)
>     result_df = result_df.withColumnRenamed(PREFIX_ID, ID)
>     result_df = result_df.withColumnRenamed(PREFIX, VALUE)
>     return result_df
>
> [3]
> def create_edges(df: DataFrame) -> DataFrame:
>     """
>     Creates edges from DataFrame with IDs
>     Edges have the format:
>     SRC (str) | DST (str) | Created (str) | Last_modified (str)
>
>     Parameters
>     ----------
>     df : DataFrame
>         DataFrame to generate edges for
>     """
>     p_to_mnt = df.select(PREFIX_ID, MAINTAINER_ID, CREATED, LAST_MODIFIED)
>     m_to_o = df.select(MAINTAINER_ID, ORIGIN_ID, CREATED, LAST_MODIFIED)
>     o_to_org = df.select(ORIGIN_ID, ORGANISATION_ID, CREATED, LAST_MODIFIED)
>     o_to_p = df.select(ORIGIN_ID, PREFIX_ID, CREATED, LAST_MODIFIED)
>
>     # Positional union again: columns end up named after p_to_mnt's columns
>     edges = p_to_mnt.union(m_to_o).union(o_to_org).union(o_to_p)
>     # result_df = edges
>     result_df = edges.dropDuplicates()
>     result_df = result_df.withColumnRenamed(PREFIX_ID, SRC)
>     result_df = result_df.withColumnRenamed(MAINTAINER_ID, DST)
>     return result_df
>
> [4]
> from graphframes import GraphFrame
>
> # Demonstrating bug with the edges, using UT's prefix/mnt/origin/org as example
> # How-to-use: get the IDs and plug them in bug_show_related_edges()
> def bug_gather_ids(g: GraphFrame):
>     vertex = "130.89.0.0/16"
>     filtered_v = g.vertices.filter(psf.col(VALUE) == vertex)
>     filtered_v.show(truncate=False)
>
>     mnt = "SN-LIR-MNT RIPE-NCC-LEGACY-MNT"
>     filtered_m = g.vertices.filter(psf.col(VALUE) == mnt)
>     filtered_m.show(truncate=False)
>
>     origin = "1133"
>     filtered_o = g.vertices.filter(psf.col(VALUE) == origin)
>     filtered_o.show(truncate=False)
>
>     org = "Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede"
>     filtered_org = g.vertices.filter(psf.col(VALUE) == org)
>     filtered_org.show(truncate=False)
>
> def bug_show_related_edges(g: GraphFrame, p_id: str, m_id: str, o_id: str, org_id: str):
>     con1_m = psf.col(DST) == m_id
>     con2_m = psf.col(SRC) == m_id
>     edg1_m = g.edges.filter(con1_m)
>     edg1_m.show(truncate=False)
>     edg2_m = g.edges.filter(con2_m)
>     edg2_m.show(truncate=False)
>
>     con1_p = psf.col(DST) == p_id
>     con2_p = psf.col(SRC) == p_id
>     edg1_p = g.edges.filter(con1_p)
>     edg1_p.show(truncate=False)
>     edg2_p = g.edges.filter(con2_p)
>     edg2_p.show(truncate=False)
>
>     con1_o = psf.col(DST) == o_id
>     con2_o = psf.col(SRC) == o_id
>     edg1_o = g.edges.filter(con1_o)
>     edg1_o.show(truncate=False)
>     edg2_o = g.edges.filter(con2_o)
>     edg2_o.show(truncate=False)
>
>     con1_org = psf.col(DST) == org_id
>     con2_org = psf.col(SRC) == org_id
>     edg1_org = g.edges.filter(con1_org)
>     edg1_org.show(truncate=False)
>     edg2_org = g.edges.filter(con2_org)
>     edg2_org.show(truncate=False)
>
> # prefix 'p_60129612354' corresponds with 130.89.0.0/16
> # maintainer 'm_2897' corresponds with SN-LIR-MNT RIPE-NCC-LEGACY-MNT
> # origin 'o_5130' corresponds with 1133
> # organisation 'org_197568516576' corresponds with Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede
>
> [5]
> Output of bug_show_related_edges(g, "p_60129612354", "m_2897", "o_5130", "org_197568516576")
>
> # prefix -> maintainer edges (filtered on dst = maintainer)
> +--------------+------+----------+-------------+
> |src           |dst   |created   |last_modified|
> +--------------+------+----------+-------------+
> |p_197568533425|m_2897|0         |0            |
> |p_94489347499 |m_2897|0         |0            |
> |p_25769898645 |m_2897|1020678697|1020678697   |
> |p_128849058299|m_2897|0         |0            |
> |p_68719514870 |m_2897|0         |0            |
> |p_146028965124|m_2897|1020267786|1020267786   |
> |p_60129579570 |m_2897|0         |0            |
> +--------------+------+----------+-------------+
>
> # maintainer -> origin edges (filtered on src = maintainer)
> +------+------------+----------+-------------+
> |src   |dst         |created   |last_modified|
> +------+------------+----------+-------------+
> |m_2897|o_8589936949|0         |0            |
> |m_2897|o_5130      |0         |0            |
> |m_2897|o_8589936949|1020267786|1020267786   |
> |m_2897|o_8589936949|1020678697|1020678697   |
> +------+------------+----------+-------------+
>
> # origin -> prefix edges (filtered on dst = prefix)
> +-----+-------------+----------+-------------+
> |src  |dst          |created   |last_modified|
> +-----+-------------+----------+-------------+
> |o_380|p_60129612354|1220513130|1220513130   |
> +-----+-------------+----------+-------------+
>
> # prefix -> maintainer edges (filtered on src = prefix)
> +-------------+-----+----------+-------------+
> |src          |dst  |created   |last_modified|
> +-------------+-----+----------+-------------+
> |p_60129612354|m_533|1220513130|1220513130   |
> +-------------+-----+----------+-------------+
>
> # maintainer -> origin edges (filtered on dst = origin)
> +------+------+-------+-------------+
> |src   |dst   |created|last_modified|
> +------+------+-------+-------------+
> |m_2897|o_5130|0      |0            |
> +------+------+-------+-------------+
>
> # origin -> prefix / organisation edges (filtered on src = origin)
> +------+----------------+-------+-------------+
> |src   |dst             |created|last_modified|
> +------+----------------+-------+-------------+
> |o_5130|p_94489347499   |0      |0            |
> |o_5130|org_283467856326|0      |0            |
> +------+----------------+-------+-------------+
>
> # origin -> organisation edges (filtered on dst = organisation)
> +------------+----------------+----------+-------------+
> |src         |dst             |created   |last_modified|
> +------------+----------------+----------+-------------+
> |o_8589936415|org_197568516576|1320919317|1320919317   |
> |o_8589936415|org_197568516576|1285464368|1285464368   |
> +------------+----------------+----------+-------------+
>
> # organisation outgoing edges (filtered on src = organisation)
> # NB: this is intended, there are no outgoing edges from organisations
> +---+---+-------+-------------+
> |src|dst|created|last_modified|
> +---+---+-------+-------------+
> +---+---+-------+-------------+
>
> [6]
> from typing import Tuple
>
> DEBUG = False  # assumed flag (not shown in the post); True prints intermediate results
>
> # Determines which rows are missing from edges
> # Determines which rows in edges are not in src_df (superfluous)
> def checker_df(edges: DataFrame, src_df: DataFrame, src: str, dst: str,
>                src_id: str, dst_id: str) -> Tuple[DataFrame, DataFrame]:
>     con1 = psf.col(SRC).startswith(src)
>     con2 = psf.col(DST).startswith(dst)
>     filtered_edges = edges.filter(con1 & con2)
>     if filtered_edges.count() == 0:
>         print(f"[Warning] No edges found after filtering! [checker_df] was checking {src_id} to {dst_id}")
>     if DEBUG:
>         print("[Checker_df] Printing Filtered edges:")
>         filtered_edges.show(truncate=False)
>         print(f"Records: {filtered_edges.count()}")
>     # Shows if there are rows which are not represented in the edge df
>     missing_edges = src_df.join(filtered_edges,
>                                 ((filtered_edges[SRC] == src_df[src_id]) &
>                                  (filtered_edges[DST] == src_df[dst_id])),
>                                 "left_anti")
>     missing_edges = missing_edges.dropDuplicates()
>     if DEBUG:
>         print("[Checker_df] Printing missing edges:")
>         missing_edges.show(truncate=False)
>         print(f"Records: {missing_edges.count()}")
>     # Shows if there are edges which are not represented in the src df
>     superfluous_edges = filtered_edges.join(src_df,
>                                             ((filtered_edges[SRC] == src_df[src_id]) &
>                                              (filtered_edges[DST] == src_df[dst_id])),
>                                             "left")
>     superfluous_edges2 = superfluous_edges.filter(superfluous_edges[src_id].isNull() |
>                                                   superfluous_edges[dst_id].isNull())
>
>     if DEBUG:
>         print("[Checker_df] Printing superfluous edges:")
>         superfluous_edges2.show(truncate=False)
>         print(f"Records: {superfluous_edges2.count()}")
>     return missing_edges, superfluous_edges2
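Since the full-size checker times out on the large joins, spot checks can be done with set arithmetic on a bounded sample. Both of the checker's questions (missing and superfluous edges) reduce to set differences once the sample is collected to the driver, for instance by filtering both sides to the UT IDs and calling .collect(). Below is a minimal pure-Python sketch. It assumes rows are available as dicts (Spark Row objects offer asDict()), and `check_edges` is a hypothetical name. Two caveats apply. First, the sample must contain every source row for the IDs being checked, or genuine edges will show up as superfluous. Second, the result is only meaningful if the source IDs and the edges come from the same materialised evaluation of generate_ids(), for example a cached or checkpointed DataFrame.

```python
from typing import Dict, Iterable, Optional, Set, Tuple

Edge = Tuple[str, str]


def check_edges(
    source_rows: Iterable[Dict[str, Optional[str]]],
    edges: Iterable[Dict[str, str]],
    src_id: str,
    dst_id: str,
    src_tag: str,
    dst_tag: str,
) -> Tuple[Set[Edge], Set[Edge]]:
    """Return (missing, superfluous) edges for one edge type, e.g. prefix -> maintainer.

    source_rows: rows of the source DataFrame including the generated ID columns.
    edges:       rows of the edge DataFrame with 'src' and 'dst' keys.
    src_tag/dst_tag ('p_', 'm_', ...) select the edge type, like checker_df's src/dst.
    Rows with a null ID are skipped here; checker_df's left_anti join would
    report them as missing instead.
    """
    expected = {
        (row[src_id], row[dst_id])
        for row in source_rows
        if row.get(src_id) is not None and row.get(dst_id) is not None
    }
    produced = {
        (e["src"], e["dst"])
        for e in edges
        if e["src"].startswith(src_tag) and e["dst"].startswith(dst_tag)
    }
    missing = expected - produced       # in the source, but no edge was built
    superfluous = produced - expected   # an edge exists, but no source row backs it
    return missing, superfluous


rows = [
    {"prefix_id": "p_1", "mnt_by_id": "m_1"},
    {"prefix_id": "p_2", "mnt_by_id": "m_1"},
]
edges = [
    {"src": "p_1", "dst": "m_1"},
    {"src": "p_3", "dst": "m_1"},   # not backed by any source row
    {"src": "o_1", "dst": "p_1"},   # different edge type, ignored
]
missing, superfluous = check_edges(rows, edges, "prefix_id", "mnt_by_id", "p_", "m_")
print(missing)      # {('p_2', 'm_1')}
print(superfluous)  # {('p_3', 'm_1')}
```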