Big Broadcast Hash Join with Dynamic Partition Pruning gives wrong results

2021-04-07 Thread Tomas Bartalos
When I try to do a Broadcast Hash Join on a bigger table (6M rows) I get an incorrect result of 0 rows. val rightDF = spark.read.format("parquet").load("table-a") val leftDF = spark.read.format("parquet").load("table-b") //needed to activate dynamic pruning subquery .where('part_ts === 2021…
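
A minimal sketch reconstructing the reported setup, assuming `spark` is the active SparkSession; the join key `id` and the partition value are hypothetical, since the original message is truncated. The DPP config flag is a standard Spark 3.x setting useful for cross-checking:

```scala
import org.apache.spark.sql.functions.{broadcast, col}

val rightDF = spark.read.format("parquet").load("table-a")
val leftDF = spark.read.format("parquet").load("table-b")
  .where(col("part_ts") === 20210101L) // partition filter that triggers the DPP subquery

// Reported symptom: with a broadcast hint on the ~6M-row side, the join
// returns 0 rows. Disabling DPP is one way to cross-check the result:
// spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")
val joined = leftDF.join(broadcast(rightDF), Seq("id"))
joined.count()
```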

Re: Parquet read performance for different schemas

2019-09-20 Thread Tomas Bartalos
I forgot to mention an important part: I'm issuing the same query to both parquets, selecting only one column: df.select(sum('amount)). BR, Tomas. On Thu, 19 Sep 2019 at 18:10, Tomas Bartalos wrote: > Hello, > > I have 2 parquets (each containing 1 file): > > - parquet…
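
For reference, a sketch of the comparison described above, with paths named after the post; with Parquet's columnar layout, a single-column aggregate should only scan the `amount` column chunks in both cases:

```scala
import org.apache.spark.sql.functions.sum

// Same single-column aggregate against both layouts; Parquet column
// pruning means only the "amount" column chunks need to be read.
val wide   = spark.read.parquet("parquet-wide")
val narrow = spark.read.parquet("parquet-narrow")

wide.select(sum("amount")).show()
narrow.select(sum("amount")).show()
```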

Parquet read performance for different schemas

2019-09-19 Thread Tomas Bartalos
Hello, I have 2 parquets (each containing 1 file): - parquet-wide - schema has 25 top-level cols + 1 array - parquet-narrow - schema has 3 top-level cols. Both files have the same data for the shared columns. When I read from parquet-wide, Spark reports *read 52.6 KB*; from parquet-narrow, *only 2.6 K…

Partition pruning by IDs from another table

2019-07-12 Thread Tomas Bartalos
Hello, I have 2 parquet tables: stored - a table of 10M records; data - a table of 100K records. *This is fast:* val dataW = data.where("registration_ts in (20190516204l, 20190515143l, 20190510125l, 20190503151l)") dataW.count res44: Long = 42 //takes 3 seconds stored.join(broadcast(dataW), Seq("registr…
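
A sketch contrasting the two variants, with the truncated join column completed as `registration_ts` (an assumption based on the filter above). The fast path works because literal IDs can be pushed down as partition filters on `stored`:

```scala
import org.apache.spark.sql.functions.broadcast

// Fast: literal IDs in the filter give the planner concrete values
// to prune partitions of the 10M-row "stored" table with.
val dataW = data.where(
  "registration_ts in (20190516204L, 20190515143L, 20190510125L, 20190503151L)")
val fast = stored.join(broadcast(dataW), Seq("registration_ts"))

// Presumably the slow variant (the post is cut off): without literals,
// the join key values are only known at runtime, so nothing is pruned.
val slow = stored.join(broadcast(data), Seq("registration_ts"))
```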

Re: Access to live data of cached dataFrame

2019-05-19 Thread Tomas Bartalos
…Sean Owen wrote: > A cached DataFrame isn't supposed to change, by definition. > You can re-read each time, or consider setting up a streaming source on > the table, which provides a result that updates as new data comes in. > > On Fri, May 17, 2019 at 1:44 PM Tomas Ba…

Access to live data of cached dataFrame

2019-05-17 Thread Tomas Bartalos
Hello, I have a cached DataFrame: spark.read.format("delta").load("/data").groupBy(col("event_hour")).count.cache I would like to access the "live" data for this DataFrame without dropping the cache (via unpersist()). Whatever I do, I always get the cached data on subsequent queries. Even addi…
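
A sketch of the behaviour described, assuming `spark` is the active session: Spark's cache manager matches queries against cached data by canonicalized logical plan, so even a freshly built identical query is answered from the cache:

```scala
import org.apache.spark.sql.functions.col

// The cached aggregate from the post:
val counts = spark.read.format("delta").load("/data")
  .groupBy(col("event_hour")).count()
counts.cache()
counts.count() // materializes the cache

// A freshly constructed but plan-identical query is still served
// from the cache, which is exactly the symptom described:
spark.read.format("delta").load("/data")
  .groupBy(col("event_hour")).count()
  .show()
```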

How to force Spark to honor parquet partitioning

2019-05-03 Thread Tomas Bartalos
Hello, I have parquet files partitioned by the "event_hour" column. After reading the parquet files into Spark with spark.read.format("parquet").load("..."), files from the same parquet partition are scattered across many Spark partitions. Example mapping of Spark partition -> parquet partition: Spark partit…
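
A sketch of one workaround, under the layout from the post (the path is illustrative): explicitly repartitioning by the partition column restores the 1:1 grouping, at the cost of a shuffle; the packing of file splits can also be tuned:

```scala
import org.apache.spark.sql.functions.col

// Spark packs parquet file splits into read partitions by size, so
// directory partitions don't map 1:1 onto Spark partitions by default.
val df = spark.read.format("parquet").load("/data/events") // path is illustrative

// Workaround: shuffle so each Spark partition holds one event_hour.
val byHour = df.repartition(col("event_hour"))

// Split packing can be influenced via (value is illustrative):
// spark.conf.set("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024)
```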

thrift server in "streaming mode"

2019-02-20 Thread Tomas Bartalos
Hello, for one of our legacy workloads we use the Spark Thrift server to retrieve data from Kafka. The pipeline is: Oracle --odbc--> Spark Thrift --> Kafka. Spark does some transformations (Avro deserialize, array explode, etc.) but no aggregation. The main issue we face is that Thrift sends d…
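
A sketch of the transformation stage described (Avro deserialize + array explode), assuming the spark-avro package; the schema, topic, and broker are placeholders. Note the `from_avro` import path moved between Spark 2.4 (`org.apache.spark.sql.avro`) and 3.x (`org.apache.spark.sql.avro.functions`):

```scala
import org.apache.spark.sql.avro.from_avro // Spark 2.4; 3.x: ...avro.functions.from_avro
import org.apache.spark.sql.functions.{col, explode}

// Placeholder Avro schema with one array field to demonstrate the explode.
val avroSchema =
  """{"type":"record","name":"Event","fields":[
    |  {"name":"items","type":{"type":"array","items":"string"}}]}""".stripMargin

val events = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .select(from_avro(col("value"), avroSchema).as("event"))
  .select(explode(col("event.items")).as("item"))
```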

Re: Structured streaming from Kafka by timestamp

2019-02-01 Thread Tomas Bartalos
…is more like a one-time SQL statement. >> Kafka doesn't support predicates in the way it's integrated with Spark. What can >> be done from the Spark perspective is to look up the offset for a specific >> lowest timestamp and start reading from there. >> >> BR, >>…
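
A sketch of the offset lookup suggested above, using the plain Kafka client (broker, topic, and timestamp are placeholders); the resolved offset can then seed the source's `startingOffsets` option:

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "broker:9092")
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

// Earliest offset at or after the given timestamp, per partition.
val tp = new TopicPartition("kafka_table", 0)
val ts = java.sql.Timestamp.valueOf("2019-01-23 01:00:00").getTime
val offsets = consumer.offsetsForTimes(
  Map(tp -> java.lang.Long.valueOf(ts)).asJava)
val startOffset = offsets.get(tp).offset() // feed into startingOffsets
```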

Structured streaming from Kafka by timestamp

2019-01-24 Thread Tomas Bartalos
Hello, I'm trying to read from Kafka via Spark Structured Streaming, within a specific time range: select count(*) from kafka_table where timestamp > cast('2019-01-23 1:00' as TIMESTAMP) and timestamp < cast('2019-01-23 1:01' as TIMESTAMP); The problem is that the timestamp query i…
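
The timestamp predicate is not pushed down to Kafka, so a query like the one above scans the whole topic. Since Spark 3.0 the Kafka source can seek by timestamp directly via `startingOffsetsByTimestamp` (epoch millis per topic and partition); a sketch with a placeholder broker and a single partition:

```scala
// 1548205200000 ms = 2019-01-23 01:00 UTC (illustrative value).
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "kafka_table")
  .option("startingOffsetsByTimestamp",
    """{"kafka_table": {"0": 1548205200000}}""")
  .load()
```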

Reading compacted Kafka topic is slow

2019-01-24 Thread Tomas Bartalos
Hello Spark folks, I'm reading a compacted Kafka topic with Spark 2.4, using the direct stream - KafkaUtils.createDirectStream(...). I have configured the necessary options for a compacted stream, so it's processed with CompactedKafkaRDDIterator. It works well; however, in case of many gaps in the topic, the pr…
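
A sketch of the direct-stream setup described, assuming a StreamingContext `ssc` and placeholder broker/topic/group; for compacted topics, `spark.streaming.kafka.allowNonConsecutiveOffsets=true` is what routes reads through `CompactedKafkaRDDIterator`:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Requires spark.streaming.kafka.allowNonConsecutiveOffsets=true in the
// Spark conf so gaps left by log compaction don't fail the iterator.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "compacted-reader")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("compacted-topic"), kafkaParams))
```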

Re: How to get all input tables of a SPARK SQL 'select' statement

2019-01-23 Thread Tomas Bartalos
This might help: show tables; On Wed, 23 Jan 2019 at 10:43, … wrote: > Hi, all, > > We need to get all input tables of several SPARK SQL 'select' statements. > > We can get that information for Hive SQL statements by using 'explain > dependency select', > but I can't find the equivalent command…
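
Beyond SHOW TABLES, one way to extract the referenced tables programmatically is to parse the statement and collect the unresolved relations — a sketch against the internal Catalyst API (field names differ between Spark 2.x, shown here, and 3.x):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

// Parse without executing, then walk the logical plan (Spark 2.x API).
val plan = spark.sessionState.sqlParser.parsePlan(
  "select a.x from db1.t1 a join db2.t2 b on a.id = b.id")
val tables = plan.collect {
  case r: UnresolvedRelation => r.tableIdentifier.unquotedString
}.distinct
// tables: Seq(db1.t1, db2.t2)
```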

cache table vs. parquet table performance

2019-01-15 Thread Tomas Bartalos
Hello, I'm using the Spark Thrift server and I'm searching for the best-performing solution for querying a hot set of data. I'm processing records with a nested structure containing subtypes and arrays; one record takes up several KB. I tried to make some improvement with cache table: cache table event_jan_01 as…
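
The truncated CACHE TABLE statement, completed with an illustrative source table and predicate (both assumptions, since the post is cut off); via the Thrift server this is plain SQL:

```scala
// Source table and predicate are illustrative -- the post is truncated.
spark.sql("""
  CACHE TABLE event_jan_01 AS
  SELECT * FROM events WHERE event_date = '2019-01-01'
""")

// Cached tables live as in-memory columnar batches; nested structs and
// arrays inflate them, which matters when comparing against parquet scans.
spark.sql("SELECT count(*) FROM event_jan_01").show()
```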