[ 
https://issues.apache.org/jira/browse/SPARK-16950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15550562#comment-15550562
 ] 

Russell Jurney commented on SPARK-16950:
----------------------------------------

I may be doing something wrong, but I'm getting an exception when trying to 
create a direct stream with a starting offset of 0, and I'm wondering whether 
this patch worked. It is probably my mistake, but I don't know where else to 
report it.

{code:title=test.py}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, OffsetRange, TopicAndPartition

# Minimal reproduction: build a local streaming context and request a Kafka
# direct stream that starts from an explicit offset.
sc = SparkContext("local[2]", "activity_summary")
ssc = StreamingContext(sc, 1)

BROKERS = 'localhost:9092'
TOPIC = 'push'
PARTITION = 0

topicAndPartition = TopicAndPartition(TOPIC, PARTITION)
# NOTE: PARTITION (0) is reused here as the starting offset value, so the
# stream is asked to start at offset 0 of partition 0.
fromOffsets = {topicAndPartition: PARTITION}
# This is the call that raises the Py4JJavaError shown in the traceback below.
directKafkaStream = KafkaUtils.createDirectStream(ssc, [TOPIC], 
{"metadata.broker.list": BROKERS}, fromOffsets=fromOffsets)
{code}

{code:title=error.java}
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-6-3ed224d6a1eb> in <module>()
----> 1 directKafkaStream = KafkaUtils.createDirectStream(ssc, [TOPIC], 
{"metadata.broker.list": BROKERS}, fromOffsets=fromOffsets)

/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/streaming/kafka.py
 in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder, 
valueDecoder, messageHandler)
    128             func = funcWithoutMessageHandler
    129             jstream = helper.createDirectStreamWithoutMessageHandler(
--> 130                 ssc._jssc, kafkaParams, set(topics), jfromOffsets)
    131         else:
    132             ser = AutoBatchedSerializer(PickleSerializer())

/Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling 
o65.createDirectStreamWithoutMessageHandler.
: java.lang.ClassCastException: java.lang.Integer cannot be cast to 
java.lang.Long
        at 
org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper$$anonfun$17.apply(KafkaUtils.scala:717)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
        at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at 
scala.collection.TraversableOnce$class.copyToBuffer(TraversableOnce.scala:275)
        at 
scala.collection.AbstractTraversable.copyToBuffer(Traversable.scala:104)
        at scala.collection.MapLike$class.toBuffer(MapLike.scala:326)
        at scala.collection.AbstractMap.toBuffer(Map.scala:59)
        at scala.collection.MapLike$class.toSeq(MapLike.scala:323)
        at scala.collection.AbstractMap.toSeq(Map.scala:59)
        at 
org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:717)
        at 
org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)
{code}

> fromOffsets parameter in Kafka's Direct Streams does not work in python3
> ------------------------------------------------------------------------
>
>                 Key: SPARK-16950
>                 URL: https://issues.apache.org/jira/browse/SPARK-16950
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.0.0, 2.0.1, 2.1.0
>            Reporter: Mariusz Strzelecki
>             Fix For: 2.0.1, 2.1.0
>
>
> KafkaUtils.createDirectStream does not work in python3 when you set the 
> fromOffsets parameter (the starting offsets of the stream on Kafka). This is 
> because the {{long}} type was removed in python3 and py4j maps numeric 
> values to {{java.lang.Integer}} or {{java.lang.Long}} depending on the size 
> of the number, which causes a ClassCastException for small offset values.
> This behaviour was noticed before and tests for this functionality are 
> disabled in python3: 
> https://github.com/apache/spark/blob/89e67d6667d5f8be9c6fb6c120fbcd350ae2950d/python/pyspark/streaming/tests.py#L1061



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to