Hello, I have two DataFrames I want to join using a condition, such that each record from either DataFrame may be joined with multiple records from the other. This means an original record should appear multiple times in the resulting joined DataFrame if the condition holds for multiple pairings of that record with records from the other DataFrame.
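(For context, here is the pairing behavior I mean, sketched in plain Python with made-up toy data — the gene names, coordinates, and padding value below are purely illustrative, not from the real datasets:)

```python
# Toy data, purely illustrative: two overlapping gene regions and two variants.
genes = [
    {'chromosome': '1', 'start': 1000, 'end': 2000, 'name': 'GENE_A'},
    {'chromosome': '1', 'start': 1500, 'end': 2500, 'name': 'GENE_B'},
]
variants = [
    {'chromosome': '1', 'position': 1600},  # inside both gene regions
    {'chromosome': '2', 'position': 1600},  # wrong chromosome, matches nothing
]
padding = 100

# Same logic as the join condition: same chromosome, and the position
# within [start - padding, end + padding]. Each (gene, variant) pair
# satisfying the condition produces one output row, so a variant that
# matches two genes appears twice.
pairs = [
    (g['name'], v['position'])
    for g in genes
    for v in variants
    if g['chromosome'] == v['chromosome']
    and g['start'] - padding <= v['position'] <= g['end'] + padding
]
print(pairs)  # [('GENE_A', 1600), ('GENE_B', 1600)]
```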
Is this possible, and if so, how? I tried "inner", "outer", "cross" and "full", but never got the desired result.

One DataFrame contains genetic variants with chromosome and position; the other contains genes with chromosome, start and end. I want to obtain each variant-gene pair where the variant is sufficiently close to the gene, meaning on the same chromosome and between start and end, or within a given padding of that region.

The join is this:

cond = (genes.chromosome == variants.chromosome) & \
       (genes.start - padding <= variants.position) & \
       (genes.end + padding >= variants.position)
gene_variants = genes.join(variants.alias('variants'), cond, "left_outer")

The entire file looks like this:

import argparse

from pyspark.sql import SparkSession


def main():
    """
    Arguments: none
    """
    arg_parser = argparse.ArgumentParser(prog='huge.py')
    arg_parser.add_argument("--phenotype", help="The phenotype", required=True)
    arg_parser.add_argument("--genes", help="Gene data with regions", required=True)
    arg_parser.add_argument("--gene-associations", help="Gene data with p-values", required=True)
    arg_parser.add_argument("--variants", help="Variant data", required=True)
    # Parse the padding as an int and let argparse supply the default;
    # getattr(cli_args, 'padding', 100000) would never fall back to the
    # default, because argparse sets the attribute (to None) even when
    # the option is absent, and it would leave the value a string.
    arg_parser.add_argument("--padding", type=int, default=100000,
                            help="Variants are considered this far away from the gene")
    cli_args = arg_parser.parse_args()
    phenotype = cli_args.phenotype
    files_glob = 'part-*'
    genes_glob = cli_args.genes + files_glob
    genes_assoc_glob = cli_args.gene_associations + files_glob
    variants_glob = cli_args.variants + files_glob
    padding = cli_args.padding
    print('Phenotype: ' + phenotype)
    print('Genes data with regions: ' + genes_glob)
    print('Gene data with p-values: ' + genes_assoc_glob)
    print('Variant data: ' + variants_glob)
    spark = SparkSession.builder.appName('huge').getOrCreate()
    print('Genes from ' + genes_glob + ':')
    genes_regions_raw = spark.read.json(genes_glob)
    gene_regions = genes_regions_raw.select('chromosome', 'start', 'end', 'source', 'name') \
        .filter(genes_regions_raw.source == 'symbol').drop(genes_regions_raw.source)
    print('There are ' + str(gene_regions.count()) + ' gene regions:')
    for row in gene_regions.take(42):
        print(row)
    gene_p_values = spark.read.json(genes_assoc_glob).select('gene', 'pValue')
    print('There are ' + str(gene_p_values.count()) + ' gene associations')
    for row in gene_p_values.take(42):
        print(row)
    genes = gene_regions.join(gene_p_values, gene_regions.name == gene_p_values.gene)
    print('Joined gene data gives ' + str(genes.count()) + ' rows:')
    for row in genes.take(42):
        print(row)
    variants = spark.read.json(variants_glob).select('chromosome', 'position', 'reference', 'alt', 'pValue')
    print('There is data from ' + str(variants.count()) + ' variants:')
    for row in variants.take(42):
        print(row)
    cond = (genes.chromosome == variants.chromosome) & \
           (genes.start - padding <= variants.position) & \
           (genes.end + padding >= variants.position)
    gene_variants = genes.join(variants.alias('variants'), cond, "left_outer")
    print('Joining genes and variants gives ' + str(gene_variants.count()) + ' pairs:')
    for row in gene_variants.take(42):
        print(row)
    print('Stopping Spark')
    spark.stop()


if __name__ == '__main__':
    main()

Thanks!

     Best, Oliver

--
Oliver Ruebenacker, Ph.D. (he)
Senior Software Engineer, Knowledge Portal Network <http://kp4cd.org/>, Flannick Lab <http://www.flannicklab.org/>, Broad Institute <http://www.broadinstitute.org/>