OK, a few observations:

1) ID Generation Method: How are you generating unique IDs (UUIDs,
sequential numbers, etc.)?
2) Data Inconsistencies: Have you checked for missing values that could
impact ID generation?
3) Join Verification: If relevant, can you share the code for joining data
points during ID creation? Are the joins matching columns correctly?
(See the diagnostic sketch after this list for points 2 and 3.)
4) Specific Edge Issues: Can you share examples of vertex IDs with
incorrect connections? Is this related to ID generation or to the edge
creation logic?
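
A minimal diagnostic sketch for points 2 and 3, assuming the column names
from your snippets below (mnt_by, prefix, origin, descr and their *_id
counterparts); diagnose_keys is a hypothetical helper, not part of your code:

import pyspark.sql.functions as psf

def diagnose_keys(df, key_col, id_col):
    # Point 2: null key values would all collapse onto a single generated ID
    nulls = df.filter(psf.col(key_col).isNull()).count()
    print(f"{key_col}: {nulls} null values")
    # Point 3: after the ID join, every key value should map to exactly one ID
    multi = (df.groupBy(key_col)
               .agg(psf.countDistinct(id_col).alias("n_ids"))
               .filter(psf.col("n_ids") > 1))
    print(f"{key_col}: {multi.count()} values mapped to more than one {id_col}")

# e.g. diagnose_keys(df_with_ids, "mnt_by", "mnt_by_id")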

HTH
Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI, FinCrime
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice: "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Wed, 24 Apr 2024 at 12:24, Nijland, J.G.W. (Jelle, Student M-CS) <
j.g.w.nijl...@student.utwente.nl> wrote:

> tags: pyspark,spark-graphframes
>
> Hello,
>
> I am running pyspark in a podman container and I have issues with
> incorrect edges when I build my graph.
> I start by loading a source dataframe from a parquet directory on my
> server. The source dataframe has the following columns:
>
> +-------+-----+-------------+------+------+-------------+------+---------------+
> |created|descr|last_modified|mnt_by|origin|start_address|prefix|external_origin|
> +-------+-----+-------------+------+------+-------------+------+---------------+
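>
> For completeness, the load itself is just a plain parquet read. A sketch (the path and app name below are placeholders, not the real ones):
>
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.appName("build-graph").getOrCreate()
> # placeholder path; the real data lives in a parquet directory on my server
> df = spark.read.parquet("/path/to/parquet_dir")
> df.printSchema()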
>
> I aim to build a graph connecting prefix, mnt_by, origin and descr with
> edges storing the created and last_modified values.
> I start by generating IDs for the prefix, mnt_by, origin and descr using
> monotonically_increasing_id(). [1]
> These IDs are prefixed with "m_", "p_", "o_" or "org_" to ensure they are
> unique across the dataframe.
>
> Then I construct the vertices dataframe by collecting the ID, value and
> whether they are external for each vertex. [2]
> These vertices are then unioned together.
> Following the vertices, I construct the edges dataframe by selecting the
> IDs that I want to be the src and the dst and union those together. [3]
> These edges store the created and last_modified.
>
> Now I am ready to construct the graph. Here is where I run into my issue.
>
> When verifying my graph, I looked at a couple of vertices to see if they
> have the correct edges.
> I looked at the Utwente prefix, origin, descr and mnt_by and found that it
> generates incorrect edges.
>
> I saw edges going out to vertices that are not associated with the utwente
> values at all.
> The methods to find the vertices, edges and the output can be found in [4]
> We can already observe inconsistencies by viewing the prefix -> maintainer
> and origin -> prefix edges. [5]
> Depending on which column I filter on, the results are inconsistent.
> To make matters worse some edges contain IDs that are not connected to the
> original values in the source dataframe at all.
>
> What I have tried to resolve my issue:
>
>    - Write a checker that verifies the created edges against the source
>    dataframe. [6]
>    The aim of this checker was to determine where the inconsistency comes
>    from, so I could locate the bug and resolve it.
>    I ran this checker on limited graphs from n=10 up to n=1,000,000 (or 1m).
>    This felt close enough, as there are only ~6.5m records in my source
>    dataframe.
>    This ran correctly; near 1m it experienced significant slowdown, and on
>    the full dataframe it errors out or times out.
>    I blamed this on the large joins that it performs on the source
>    dataframe.
>    - I found a GitHub issue from someone with significantly larger graphs
>    having similar issues.
>    One suggestion there blamed indexing using strings rather than ints or
>    longs.
>    I rewrote my system to use ints for IDs, but I ran into the same issue.
>    The number of incorrect edges was the same, and they linked to the same
>    incorrect vertices too.
>    (A simplified sketch of such an int-based ID scheme follows this list.)
>    - I re-ordered my source dataframe to see what the impact was.
>    This resulted in considerably more incorrect edges when using the checker
>    in [4].
>    If helpful I can post the output of this checker as well.
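>
> A simplified illustration of the kind of int/long ID scheme meant here (a sketch only, not my actual rewrite): each vertex type gets a disjoint numeric range instead of a string prefix.
>
> import pyspark.sql.functions as psf
>
> # Type codes replace the "p_" / "m_" / "o_" / "org_" string prefixes
> TYPE_CODE = {"prefix": 0, "mnt_by": 1, "origin": 2, "descr": 3}
>
> def int_ids(df, key_col, id_col):
>     # id * 4 + type_code keeps the ID ranges of the four vertex types disjoint
>     return (df.select(key_col).distinct()
>               .withColumn(id_col,
>                           psf.monotonically_increasing_id() * 4
>                           + psf.lit(TYPE_CODE[key_col])))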
>
>
> Can you give me any pointers on what I can try, or what I can do to clarify
> my situation better?
> Thanks in advance for your time.
>
> Kind regards,
> Jelle Nijland
>
>
>
>
> [1]
> from pyspark.sql import DataFrame
> import pyspark.sql.functions as psf
>
> # ID labels
> PREFIX_ID = "prefix_id"
> MAINTAINER_ID = "mnt_by_id"
> ORIGIN_ID = "origin_id"
> ORGANISATION_ID = "organisation_id"
>
> # Source dataframe column names
> MNT_BY = "mnt_by"
> PREFIX = "prefix"
> ORIGIN = "origin"
> DESCR = "descr"
> EXTERNAL_O = "external_origin"
>
>
> def generate_ids(df: DataFrame) -> DataFrame:
>     """
>     Generates a unique ID for each distinct maintainer, prefix, origin and organisation
>
>     Parameters
>     ----------
>     df : DataFrame
>         DataFrame to generate IDs for
>     """
>     mnt_by_id = df.select(MNT_BY).distinct().withColumn(
>         MAINTAINER_ID, psf.concat(psf.lit('m_'), psf.monotonically_increasing_id()))
>     prefix_id = df.select(PREFIX).distinct().withColumn(
>         PREFIX_ID, psf.concat(psf.lit('p_'), psf.monotonically_increasing_id()))
>     origin_id = df.select(ORIGIN).distinct().withColumn(
>         ORIGIN_ID, psf.concat(psf.lit('o_'), psf.monotonically_increasing_id()))
>     organisation_id = df.select(DESCR).distinct().withColumn(
>         ORGANISATION_ID, psf.concat(psf.lit('org_'), psf.monotonically_increasing_id()))
>
>     df = (df.join(mnt_by_id, on=MNT_BY, how="left")
>             .join(prefix_id, on=PREFIX, how="left")
>             .join(origin_id, on=ORIGIN, how="left")
>             .join(organisation_id, on=DESCR, how="left"))
>     return df
>
> [2]
> def create_vertices(df: DataFrame) -> DataFrame:
>     """
>     Creates vertices from a DataFrame with IDs
>     Vertices have the format:
>     ID (str) | VALUE (str) | EXTERNAL (bool)
>
>     ID follows the format [p_|o_|m_|org_][0-9]+
>
>     Parameters
>     ----------
>     df : DataFrame
>         DataFrame to generate vertices for
>     """
>     # ID, VALUE and EXTERNAL are column-name constants defined elsewhere
>     prefixes = df.select(PREFIX_ID, PREFIX, psf.lit(False))
>     maintainers = df.select(MAINTAINER_ID, MNT_BY, psf.lit(False))
>     origins = df.select(ORIGIN_ID, ORIGIN, EXTERNAL_O)
>     organisations = df.select(ORGANISATION_ID, DESCR, psf.lit(False))
>
>     # union is by position; the column names of `prefixes` carry through
>     result_df = prefixes.union(maintainers).union(origins).union(organisations)
>     result_df = result_df.dropDuplicates()
>     result_df = result_df.withColumnRenamed("false", EXTERNAL)
>     result_df = result_df.withColumnRenamed(PREFIX_ID, ID)
>     result_df = result_df.withColumnRenamed(PREFIX, VALUE)
>     return result_df
>
> [3]
> def create_edges(df: DataFrame) -> DataFrame:
>     """
>     Creates edges from DataFrame with IDs
>     Edges have the format:
>     SRC (str) | DST (str) | Created (str) | Last_modified (str)
>
>     Parameters
>     ----------
>     df : DataFrame
>         DataFrame to generate edges for
>     """
>     # CREATED, LAST_MODIFIED, SRC and DST are column-name constants defined elsewhere
>     p_to_mnt = df.select(PREFIX_ID, MAINTAINER_ID, CREATED, LAST_MODIFIED)
>     m_to_o = df.select(MAINTAINER_ID, ORIGIN_ID, CREATED, LAST_MODIFIED)
>     o_to_org = df.select(ORIGIN_ID, ORGANISATION_ID, CREATED, LAST_MODIFIED)
>     o_to_p = df.select(ORIGIN_ID, PREFIX_ID, CREATED, LAST_MODIFIED)
>
>     # union is by position; all edges end up under the PREFIX_ID/MAINTAINER_ID column names
>     edges = p_to_mnt.union(m_to_o).union(o_to_org).union(o_to_p)
>     result_df = edges.dropDuplicates()
>     result_df = result_df.withColumnRenamed(PREFIX_ID, SRC)
>     result_df = result_df.withColumnRenamed(MAINTAINER_ID, DST)
>     return result_df
>
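> For context, the pieces above are assembled roughly as follows (a sketch of the driver code, not the exact script; it assumes the usual GraphFrame column conventions for id/src/dst):
>
> from graphframes import GraphFrame
>
> df_ids = generate_ids(df)
> vertices = create_vertices(df_ids)
> edges = create_edges(df_ids)
> g = GraphFrame(vertices, edges)
>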
> [4]
> # Demonstrating bug with the edges, using UT's prefix/mnt/origin/org as example
> # How-to-use: get the IDs and plug them in bug_show_related_edges()
> def bug_gather_ids(g: GraphFrame):
>     vertex = "130.89.0.0/16"
>     filtered_v = g.vertices.filter(psf.col(VALUE)==vertex)
>     filtered_v.show(truncate=False)
>
>     mnt = "SN-LIR-MNT RIPE-NCC-LEGACY-MNT"
>     filtered_m = g.vertices.filter(psf.col(VALUE)==mnt)
>     filtered_m.show(truncate=False)
>
>     origin = "1133"
>     filtered_o = g.vertices.filter(psf.col(VALUE)==origin)
>     filtered_o.show(truncate=False)
>
>     org = "Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede"
>     filtered_org = g.vertices.filter(psf.col(VALUE)==org)
>     filtered_org.show(truncate=False)
>
> def bug_show_related_edges(g: GraphFrame, p_id: str, m_id: str, o_id: str, org_id: str):
>     con1_m = psf.col(DST)==m_id
>     con2_m = psf.col(SRC)==m_id
>     edg1_m = g.edges.filter(con1_m)
>     edg1_m.show(truncate=False)
>     edg2_m = g.edges.filter(con2_m)
>     edg2_m.show(truncate=False)
>
>     con1_p = psf.col(DST)==p_id
>     con2_p = psf.col(SRC)==p_id
>     edg1_p = g.edges.filter(con1_p)
>     edg1_p.show(truncate=False)
>     edg2_p = g.edges.filter(con2_p)
>     edg2_p.show(truncate=False)
>
>     con1_o = psf.col(DST)==o_id
>     con2_o = psf.col(SRC)==o_id
>     edg1_o = g.edges.filter(con1_o)
>     edg1_o.show(truncate=False)
>     edg2_o = g.edges.filter(con2_o)
>     edg2_o.show(truncate=False)
>
>     con1_org = psf.col(DST)==org_id
>     con2_org = psf.col(SRC)==org_id
>     edg1_org = g.edges.filter(con1_org)
>     edg1_org.show(truncate=False)
>     edg2_org = g.edges.filter(con2_org)
>     edg2_org.show(truncate=False)
>
> # prefix 'p_60129612354' corresponds with 130.89.0.0/16
> # maintainer 'm_2897' corresponds with SN-LIR-MNT RIPE-NCC-LEGACY-MNT
> # origin 'o_5130' corresponds with 1133
> # organisation 'org_197568516576' corresponds with Drienerlolaan 5 P.O. Box 217 NL - 7500 AE Enschede
>
> Output of bug_show_related_edges(g, "p_60129612354", "m_2897", "o_5130", "org_197568516576"):
> # prefix -> maintainer edges (filtered on dst = maintainer)
> +--------------+------+----------+-------------+
> |src           |dst   |created   |last_modified|
> +--------------+------+----------+-------------+
> |p_197568533425|m_2897|0         |0            |
> |p_94489347499 |m_2897|0         |0            |
> |p_25769898645 |m_2897|1020678697|1020678697   |
> |p_128849058299|m_2897|0         |0            |
> |p_68719514870 |m_2897|0         |0            |
> |p_146028965124|m_2897|1020267786|1020267786   |
> |p_60129579570 |m_2897|0         |0            |
> +--------------+------+----------+-------------+
>
> # maintainer to origin edges (filtered on src = maintainer)
> +------+------------+----------+-------------+
> |src   |dst         |created   |last_modified|
> +------+------------+----------+-------------+
> |m_2897|o_8589936949|0         |0            |
> |m_2897|o_5130      |0         |0            |
> |m_2897|o_8589936949|1020267786|1020267786   |
> |m_2897|o_8589936949|1020678697|1020678697   |
> +------+------------+----------+-------------+
>
> # origin to prefix edges (filtered on dst = prefix)
> +-----+-------------+----------+-------------+
> |src  |dst          |created   |last_modified|
> +-----+-------------+----------+-------------+
> |o_380|p_60129612354|1220513130|1220513130   |
> +-----+-------------+----------+-------------+
>
> # prefix to maintainer edges (filtered on src = prefix)
> +-------------+-----+----------+-------------+
> |src          |dst  |created   |last_modified|
> +-------------+-----+----------+-------------+
> |p_60129612354|m_533|1220513130|1220513130   |
> +-------------+-----+----------+-------------+
>
> # maintainer to origin edges (filtered on dst = origin)
> +------+------+-------+-------------+
> |src   |dst   |created|last_modified|
> +------+------+-------+-------------+
> |m_2897|o_5130|0      |0            |
> +------+------+-------+-------------+
>
> # origin to prefix edges (filtered on src = origin)
> +------+----------------+-------+-------------+
> |src   |dst             |created|last_modified|
> +------+----------------+-------+-------------+
> |o_5130|p_94489347499   |0      |0            |
> |o_5130|org_283467856326|0      |0            |
> +------+----------------+-------+-------------+
>
> # origin to organisation edges (filtered on dst = organisation)
> +------------+----------------+----------+-------------+
> |src         |dst             |created   |last_modified|
> +------------+----------------+----------+-------------+
> |o_8589936415|org_197568516576|1320919317|1320919317   |
> |o_8589936415|org_197568516576|1285464368|1285464368   |
> +------------+----------------+----------+-------------+
>
> # origin to organisation edges (filtered on src = organisation)
> # NB: this is intended, there are no outgoing edges from organisations
> +---+---+-------+-------------+
> |src|dst|created|last_modified|
> +---+---+-------+-------------+
> +---+---+-------+-------------+
>
> [6]
> # Determines which rows are missing from edges
> # Determines which rows in edges are not in src_df (superfluous)
> def checker_df(edges: DataFrame, src_df: DataFrame, src: str, dst: str,
>                src_id: str, dst_id: str) -> tuple[DataFrame, DataFrame]:
>     con1 = psf.col(SRC).startswith(src)
>     con2 = psf.col(DST).startswith(dst)
>     filtered_edges = edges.filter(con1 & con2)
>     if filtered_edges.count() == 0:
>         print(f"[Warning] No edges found after filtering! [checker_df] was checking {src_id} to {dst_id}")
>     if DEBUG:
>         print("[Checker_df] Printing Filtered edges:")
>         filtered_edges.show(truncate=False)
>         print(f"Records: {filtered_edges.count()}")
>     # Shows if there are rows which are not represented in the edge df
>     missing_edges = src_df.join(filtered_edges,
>             ((filtered_edges[SRC] == src_df[src_id]) &
>              (filtered_edges[DST] == src_df[dst_id])),
>              "left_anti")
>     missing_edges = missing_edges.dropDuplicates()
>     if DEBUG:
>         print("[Checker_df] Printing missing edges:")
>         missing_edges.show(truncate=False)
>         print(f"Records: {missing_edges.count()}")
>     #  Shows if there are edges which are not represented in the src df
>     superfluous_edges = filtered_edges.join(src_df,
>             ((filtered_edges[SRC] == src_df[src_id]) &
>              (filtered_edges[DST] == src_df[dst_id])),
>              "left")
>     superfluous_edges2 = superfluous_edges.filter(
>         superfluous_edges[src_id].isNull() | superfluous_edges[dst_id].isNull())
>
>     if DEBUG:
>         print("[Checker_df] Printing superfluous edges:")
>         superfluous_edges2.show(truncate=False)
>         print(f"Records: {superfluous_edges2.count()}")
>     return missing_edges, superfluous_edges2
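>
> An illustrative call of this checker for the prefix -> maintainer edges (not the exact driver code; df_ids stands for the source dataframe with the generated IDs):
>
> missing, superfluous = checker_df(g.edges, df_ids, "p_", "m_", PREFIX_ID, MAINTAINER_ID)
> print(f"missing rows: {missing.count()}, superfluous edges: {superfluous.count()}")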
>
>
