Hi TD,

I pulled your commit that is listed on this ticket:
https://issues.apache.org/jira/browse/SPARK-23406
Specifically, I did the following steps, and self-joins work after I
cherry-picked your commit! Good job! I was hoping it would be part of
2.3.0, but it looks like it is targeted for 2.3.1 :(

git clone https://github.com/apache/spark.git
cd spark
git fetch
git checkout branch-2.3
git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
./build/mvn -DskipTests compile
./dev/make-distribution.sh --name custom-spark --pip --r --tgz \
  -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
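
For reference, here is roughly how I verified the fix from the new build (a
minimal sketch, run in the spark-shell of the rebuilt distribution; the topic,
bootstrap server, and query are the same ones from the repro further down in
this thread):

import org.apache.spark.sql.streaming.Trigger

// Same Kafka source as in the original repro below.
val jdf = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()

jdf.createOrReplaceTempView("table")

// The raw-SQL self-join that used to fail with the `x.offset` AnalysisException.
val resultdf = spark.sql(
  "select * from table as x inner join table as y on x.offset = y.offset")

// Stream-stream inner joins only support append output mode.
resultdf.writeStream.outputMode("append").format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1000))
  .start()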


On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <tathagata.das1...@gmail.com
> wrote:

> Hey,
>
> Thanks for testing out stream-stream joins and reporting this issue. I am
> going to take a look at this.
>
> TD
>
>
>
> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> If I change it to the code below, it works. However, I don't believe it is
>> the solution I am looking for. I want to be able to do it in raw SQL, and
>> moreover, if a user gives a big chained raw Spark SQL join query, I am not
>> even sure how to make copies of the DataFrame to achieve the self-join. Is
>> there any other way here?
>>
>>
>>
>> import org.apache.spark.sql.streaming.Trigger
>>
>> val jdf = spark.readStream.format("kafka")
>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>   .option("subscribe", "join_test")
>>   .option("startingOffsets", "earliest")
>>   .load()
>>
>> val jdf1 = spark.readStream.format("kafka")
>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>   .option("subscribe", "join_test")
>>   .option("startingOffsets", "earliest")
>>   .load()
>>
>> jdf.createOrReplaceTempView("table")
>> jdf1.createOrReplaceTempView("table1")
>>
>> val resultdf = spark.sql(
>>   "select * from table inner join table1 on table.offset = table1.offset")
>>
>> resultdf.writeStream.outputMode("append").format("console")
>>   .option("truncate", false)
>>   .trigger(Trigger.ProcessingTime(1000))
>>   .start()
>>
>>
>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> If I change it to this
>>>
>>>
>>>
>>>
>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have the following code
>>>>
>>>> import org.apache.spark.sql.streaming.Trigger
>>>>
>>>> val jdf = spark.readStream.format("kafka")
>>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>>   .option("subscribe", "join_test")
>>>>   .option("startingOffsets", "earliest")
>>>>   .load()
>>>>
>>>> jdf.createOrReplaceTempView("table")
>>>>
>>>> val resultdf = spark.sql(
>>>>   "select * from table as x inner join table as y on x.offset = y.offset")
>>>>
>>>> resultdf.writeStream.outputMode("update").format("console")
>>>>   .option("truncate", false)
>>>>   .trigger(Trigger.ProcessingTime(1000))
>>>>   .start()
>>>>
>>>> and I get the following exception.
>>>>
>>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
>>>> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
>>>> x.timestamp, x.partition]; line 1 pos 50;
>>>> 'Project [*]
>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>    :- SubqueryAlias x
>>>>    :  +- SubqueryAlias table
>>>>    :     +- StreamingRelation 
>>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>>>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>>>> offset#32L, timestamp#33, timestampType#34]
>>>>    +- SubqueryAlias y
>>>>       +- SubqueryAlias table
>>>>          +- StreamingRelation 
>>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>>>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>>>> offset#32L, timestamp#33, timestampType#34]
>>>>
>>>> Any idea what's wrong here?
>>>>
>>>> Thanks!
>>>>
>>>
>>
>
