Hi TD,

I agree I think we are better off either with a full fix or no fix. I am ok
with the complete fix being available in master or some branch. I guess the
solution for me is to just build from the source.

On a similar note, I am not finding any JIRA tickets related to full outer
joins and update mode for maybe say Spark 2.3. I wonder how hard is it two
implement both of these? It turns out the update mode and full outer join
is very useful and required in my case, therefore, I'm just asking.

Thanks!

On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> I thought about it.
> I am not 100% sure whether this fix should go into 2.3.1.
>
> There are two parts to this bug fix to enable self-joins.
>
> 1. Enabling deduping of leaf logical nodes by extending
> MultInstanceRelation
>   - This is safe to be backported into the 2.3 branch as it does not touch
> production code paths.
>
> 2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch
> plan is spliced into the streaming plan.
>   - This touches core production code paths and therefore, may not safe to
> backport.
>
> Part 1 enables self-joins in all but a small fraction of self-join
> queries. That small fraction can produce incorrect results, and part 2
> avoids that.
>
> So for 2.3.1, we can enable self-joins by merging only part 1, but it can
> give wrong results in some cases. I think that is strictly worse than no
> fix.
>
> TD
>
>
>
> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi TD,
>>
>> I pulled your commit that is listed on this ticket
>> https://issues.apache.org/jira/browse/SPARK-23406 specifically I did the
>> following steps and self joins work after I cherry-pick your commit!
>> Good Job! I was hoping it will be part of 2.3.0 but looks like it is
>> targeted for 2.3.1 :(
>>
>> git clone https://github.com/apache/spark.gitcd spark
>> git fetch
>> git checkout branch-2.3
>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>> ./build/mvn -DskipTests compile
>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr 
>> -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>>
>>
>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Hey,
>>>
>>> Thanks for testing out stream-stream joins and reporting this issue. I
>>> am going to take a look at this.
>>>
>>> TD
>>>
>>>
>>>
>>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> if I change it to the below code it works. However, I don't believe it
>>>> is the solution I am looking for. I want to be able to do it in raw SQL and
>>>> moreover, If a user gives a big chained raw spark SQL join query I am not
>>>> even sure how to make copies of the dataframe to achieve the self-join. Is
>>>> there any other way here?
>>>>
>>>>
>>>>
>>>> import org.apache.spark.sql.streaming.Trigger
>>>>
>>>> val jdf = 
>>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>>> "localhost:9092").option("subscribe", 
>>>> "join_test").option("startingOffsets", "earliest").load();
>>>> val jdf1 = 
>>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>>> "localhost:9092").option("subscribe", 
>>>> "join_test").option("startingOffsets", "earliest").load();
>>>>
>>>> jdf.createOrReplaceTempView("table")
>>>> jdf1.createOrReplaceTempView("table")
>>>>
>>>> val resultdf = spark.sql("select * from table inner join table1 on 
>>>> table.offset=table1.offset")
>>>>
>>>> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>>>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>
>>>>
>>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> If I change it to this
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have the following code
>>>>>>
>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>
>>>>>> val jdf = 
>>>>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>>>>> "localhost:9092").option("subscribe", 
>>>>>> "join_test").option("startingOffsets", "earliest").load();
>>>>>>
>>>>>> jdf.createOrReplaceTempView("table")
>>>>>>
>>>>>> val resultdf = spark.sql("select * from table as x inner join table as y 
>>>>>> on x.offset=y.offset")
>>>>>>
>>>>>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>>>>>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>
>>>>>> and I get the following exception.
>>>>>>
>>>>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' 
>>>>>> given input columns: [x.value, x.offset, x.key, x.timestampType, 
>>>>>> x.topic, x.timestamp, x.partition]; line 1 pos 50;
>>>>>> 'Project [*]
>>>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>>>    :- SubqueryAlias x
>>>>>>    :  +- SubqueryAlias table
>>>>>>    :     +- StreamingRelation 
>>>>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>>>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>>>>>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>>>>>> offset#32L, timestamp#33, timestampType#34]
>>>>>>    +- SubqueryAlias y
>>>>>>       +- SubqueryAlias table
>>>>>>          +- StreamingRelation 
>>>>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>>>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>>>>>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>>>>>> offset#32L, timestamp#33, timestampType#34]
>>>>>>
>>>>>> any idea whats wrong here?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to