Hello,

  I have two Dataframes I want to join using a condition such that each
record from each Dataframe may be joined with multiple records from the
other Dataframe. This means the original records should appear multiple
times in the resulting joined Dataframe if the condition is fulfilled for
multiple pairings of that record and records from the other Dataframe.

  Is this possible, and if so, how? I tried "inner", "outer", "cross" and
"full", but never got the desired result.

  One Dataframe contains genetic variants with chromosome and position, the
other genes with chromosome, start and end. I want to obtain each
variant-gene pair where the variant is sufficiently close to the gene,
which means on the same chromosome and between start and end or within a
padding. The join is this:





    cond = (genes.chromosome == variants.chromosome) & \
           (genes.start - padding <= variants.position) & \
           (genes.end + padding >= variants.position)
    gene_variants = genes.join(variants.alias('variants'), cond, "left_outer")

  The entire file looks like this:



























































import argparse

from pyspark.sql import SparkSession

# Default distance (in base positions) a variant may lie outside a gene's
# [start, end] interval and still be paired with that gene.
DEFAULT_PADDING = 100000

# Number of rows printed as a preview after each step.
PREVIEW_ROWS = 42


def _print_rows(dataframe, limit=PREVIEW_ROWS):
    """Print up to `limit` rows of `dataframe`, one row per line."""
    for row in dataframe.take(limit):
        print(row)


def main():
    """
    Pair genes with nearby variants and print counts and previews.

    Reads gene regions, gene associations and variants as JSON from the
    locations given on the command line, joins gene regions with their
    p-values, then joins the result with every variant on the same
    chromosome whose position lies within `padding` of the gene's
    start/end interval.

    Arguments: none (options are read from the command line).
    """
    arg_parser = argparse.ArgumentParser(prog='huge.py')
    arg_parser.add_argument("--phenotype", help="The phenotype", required=True)
    arg_parser.add_argument("--genes", help="Gene data with regions", required=True)
    arg_parser.add_argument("--gene-associations", help="Gene data with p-values", required=True)
    arg_parser.add_argument("--variants", help="Variant data", required=True)
    # argparse always sets the attribute (to None when the flag is omitted),
    # so a getattr() fallback is never used; and without type=int a supplied
    # value is a string. Declare both the type and the default here instead.
    arg_parser.add_argument("--padding", type=int, default=DEFAULT_PADDING,
                            help="Variants are considered this far away from the gene")
    cli_args = arg_parser.parse_args()

    phenotype = cli_args.phenotype
    files_glob = 'part-*'
    # The glob is appended directly, so each location is expected to end with
    # a path separator.
    genes_glob = cli_args.genes + files_glob
    genes_assoc_glob = cli_args.gene_associations + files_glob
    variants_glob = cli_args.variants + files_glob
    padding = cli_args.padding

    print('Phenotype: ' + phenotype)
    print('Genes data with regions: ' + genes_glob)
    print('Gene data with p-values: ' + genes_assoc_glob)
    print('Variant data: ' + variants_glob)

    spark = SparkSession.builder.appName('huge').getOrCreate()

    print('Genes from ' + genes_glob + ':')
    genes_regions_raw = spark.read.json(genes_glob)
    # Keep only rows whose source is 'symbol', then drop the source column.
    gene_regions = genes_regions_raw.select('chromosome', 'start', 'end', 'source', 'name')\
        .filter(genes_regions_raw.source == 'symbol').drop(genes_regions_raw.source)
    print('There are ' + str(gene_regions.count()) + ' gene regions:')
    _print_rows(gene_regions)

    gene_p_values = spark.read.json(genes_assoc_glob).select('gene', 'pValue')
    print('There are ' + str(gene_p_values.count()) + ' gene associations')
    _print_rows(gene_p_values)

    genes = gene_regions.join(gene_p_values, gene_regions.name == gene_p_values.gene)
    print("Joined gene data gives " + str(genes.count()) + ' rows:')
    _print_rows(genes)

    variants = spark.read.json(variants_glob).select('chromosome', 'position',
                                                     'reference', 'alt', 'pValue')
    print('There is data from ' + str(variants.count()) + ' variants:')
    _print_rows(variants)

    # A variant matches a gene when it is on the same chromosome and its
    # position lies in [start - padding, end + padding].
    cond = (genes.chromosome == variants.chromosome) & \
           (genes.start - padding <= variants.position) & \
           (genes.end + padding >= variants.position)
    # "left_outer" yields one row per matching (gene, variant) pair and also
    # keeps genes without any matching variant (variant columns are null);
    # use "inner" instead if only matched pairs are wanted.
    gene_variants = genes.join(variants.alias('variants'), cond, "left_outer")
    print('Joining genes and variants give ' + str(gene_variants.count()) + ' pairs:')
    _print_rows(gene_variants)

    print('Stopping Spark')
    spark.stop()


if __name__ == '__main__':
    main()

  Thanks!

     Best, Oliver

-- 
Oliver Ruebenacker, Ph.D. (he)
Senior Software Engineer, Knowledge Portal Network
<http://kp4cd.org/>, Flannick
Lab <http://www.flannicklab.org/>, Broad Institute
<http://www.broadinstitute.org/>

Reply via email to