[
https://issues.apache.org/jira/browse/SPARK-16950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15550562#comment-15550562
]
Russell Jurney commented on SPARK-16950:
----------------------------------------
Probably doing something wrong, but I'm getting an exception when trying to
create a direct stream with offset 0, and I'm wondering if this patch worked.
Probably me, but I don't know where else to complain.
{code:title=test.py}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, OffsetRange, TopicAndPartition
sc = SparkContext("local[2]", "activity_summary")
ssc = StreamingContext(sc, 1)
BROKERS = 'localhost:9092'
TOPIC = 'push'
PARTITION = 0
topicAndPartition = TopicAndPartition(TOPIC, PARTITION)
fromOffsets = {topicAndPartition: PARTITION}
directKafkaStream = KafkaUtils.createDirectStream(ssc, [TOPIC],
{"metadata.broker.list": BROKERS}, fromOffsets=fromOffsets)
{code}
{code:title=error.java}
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-6-3ed224d6a1eb> in <module>()
----> 1 directKafkaStream = KafkaUtils.createDirectStream(ssc, [TOPIC],
{"metadata.broker.list": BROKERS}, fromOffsets=fromOffsets)
/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/streaming/kafka.py
in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder,
valueDecoder, messageHandler)
128 func = funcWithoutMessageHandler
129 jstream = helper.createDirectStreamWithoutMessageHandler(
--> 130 ssc._jssc, kafkaParams, set(topics), jfromOffsets)
131 else:
132 ser = AutoBatchedSerializer(PickleSerializer())
/Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py
in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling
o65.createDirectStreamWithoutMessageHandler.
: java.lang.ClassCastException: java.lang.Integer cannot be cast to
java.lang.Long
at
org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper$$anonfun$17.apply(KafkaUtils.scala:717)
at
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at
scala.collection.TraversableOnce$class.copyToBuffer(TraversableOnce.scala:275)
at
scala.collection.AbstractTraversable.copyToBuffer(Traversable.scala:104)
at scala.collection.MapLike$class.toBuffer(MapLike.scala:326)
at scala.collection.AbstractMap.toBuffer(Map.scala:59)
at scala.collection.MapLike$class.toSeq(MapLike.scala:323)
at scala.collection.AbstractMap.toSeq(Map.scala:59)
at
org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:717)
at
org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
{code}
> fromOffsets parameter in Kafka's Direct Streams does not work in python3
> ------------------------------------------------------------------------
>
> Key: SPARK-16950
> URL: https://issues.apache.org/jira/browse/SPARK-16950
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.0.0, 2.0.1, 2.1.0
> Reporter: Mariusz Strzelecki
> Fix For: 2.0.1, 2.1.0
>
>
> KafkaUtils.createDirectStream does not work in python3 when you set the
> fromOffsets parameter (which specifies the starting offsets of the stream in
> Kafka). This is because the {{long}} type was removed in python3 and py4j maps
> numeric variables to {{java.lang.Integer}} or {{java.lang.Long}} depending on
> the number's size, which causes a ClassCastException for small offset values.
> This behaviour was noticed before and tests for this functionality are
> disabled in python3:
> https://github.com/apache/spark/blob/89e67d6667d5f8be9c6fb6c120fbcd350ae2950d/python/pyspark/streaming/tests.py#L1061
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]