I think this is a bit specific to Samza. The KafkaSystemProducer class does something like this:

    envelope.getMessage.asInstanceOf[Array[Byte]]

That is, it casts the outgoing message to a byte array rather than serializing it itself, so the message must already be a byte[] by the time it reaches the producer. This is why we need to be explicit about the serialization format.

On Wed, Mar 25, 2015 at 3:14 AM, Jordi Blasi Uribarri <jbl...@nextel.es> wrote:

> I am using the Kafka command line producer, so I understand that I am
> sending a String.
>
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic syslog
>
> What is actually the difference between a string and a json? Is it just a
> matter of deserialization or is there any kind of metadata included that
> specifies the content type?
>
> How do I enable the debug mode?
>
> Thanks,
>
> Jordi
>
> -----Original Message-----
> From: Chinmay Soman [mailto:chinmay.cere...@gmail.com]
> Sent: Monday, March 23, 2015 17:55
> To: dev@samza.apache.org
> Subject: Re: cannot be cast to java.lang.String
>
> Hey Jordi,
>
> This is because you're sending String and not json in your output topic.
> Try setting string on the output stream as well (if you haven't already).
>
> If you have done that - then please enable debug mode and attach the log
> somewhere so that we can take a look.
>
> On Mon, Mar 23, 2015 at 9:52 AM, Jordi Blasi Uribarri <jbl...@nextel.es>
> wrote:
>
> > Looks like that was one error.
> >
> > I have set the property like this:
> > systems.kafka.streams.syslog.samza.msg.serde=string
> >
> > But I am still getting the same error. Now I am seeing a different
> > thing in the log previous to the exception:
> >
> > 23 mar 2015 05:49:31 INFO KafkaSystemProducer - Creating a new producer for system kafka.
> > 23 mar 2015 05:49:31 INFO ProducerConfig - ProducerConfig values:
> >     value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> >     key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> >     block.on.buffer.full = true
> >     retry.backoff.ms = 100
> >     buffer.memory = 33554432
> >     batch.size = 16384
> >     metrics.sample.window.ms = 30000
> >     metadata.max.age.ms = 300000
> >     receive.buffer.bytes = 32768
> >     timeout.ms = 30000
> >     max.in.flight.requests.per.connection = 1
> >     bootstrap.servers = [broker01:9092]
> >     metric.reporters = []
> >     client.id = samza_producer-samzafroga_job1-1-1427086163149-3
> >     compression.type = none
> >     retries = 2147483647
> >     max.request.size = 1048576
> >     send.buffer.bytes = 131072
> >     acks = 1
> >     reconnect.backoff.ms = 10
> >     linger.ms = 0
> >     metrics.num.samples = 2
> >     metadata.fetch.timeout.ms = 60000
> >
> > 23 mar 2015 05:49:31 ERROR SamzaContainer - Caught exception in process loop.
> > java.lang.ClassCastException: java.lang.String cannot be cast to [B
> >     at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:80)
> >     at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87)
> >     at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61)
> >     at samzafroga.job1.process(job1.java:21)
> >     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:129)
> >     at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> >     at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:128)
> >     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:114)
> >     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> >     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> >     at org.apache.samza.container.RunLoop.process(RunLoop.scala:100)
> >     at org.apache.samza.container.RunLoop.run(RunLoop.scala:83)
> >     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:549)
> >     at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> >
> > Thanks.
> >
> > Jordi
> >
> > -----Original Message-----
> > From: Chinmay Soman [mailto:chinmay.cere...@gmail.com]
> > Sent: Monday, March 23, 2015 17:36
> > To: dev@samza.apache.org
> > Subject: Re: cannot be cast to java.lang.String
> >
> > Have you tried setting this:
> >
> > systems.kafka.streams.syslog.samza.msg.serde=string // And assuming you've defined a 'string' serializer in your config
> >
> > OR
> >
> > systems.kafka.streams.syslog.samza.msg.serde=json // Depending on the corresponding format of your input data
> >
> > On Mon, Mar 23, 2015 at 9:24 AM, Jordi Blasi Uribarri <jbl...@nextel.es>
> > wrote:
> >
> > > Hi,
> > >
> > > As I understand it, I am setting "kafka" as the system name, "beste"
> > > as the output topic in the system and "syslog" as the input topic.
> > > Both topics syslog and beste are working correctly, as I am streaming
> > > some syslogs to the "syslog" topic and I am testing "beste" with an
> > > internal application designed specifically for that. I am not sure
> > > about the kafka part.
> > >
> > > Thanks.
> > >
> > > Jordi
> > >
> > > -----Original Message-----
> > > From: Chinmay Soman [mailto:chinmay.cere...@gmail.com]
> > > Sent: Monday, March 23, 2015 17:16
> > > To: dev@samza.apache.org
> > > Subject: Re: cannot be cast to java.lang.String
> > >
> > > Hey Jordi,
> > >
> > > I see 3 different stream names.
> > >
> > > 1. new SystemStream("kafka", "beste");
> > >
> > > 2. task.inputs=kafka.syslog
> > >
> > > 3. systems.kafka.streams.frogain.samza.msg.serde=json
> > >
> > > Just for a sanity check, can you double check you're setting the
> > > config params for the correct stream?
> > >
> > > On Mon, Mar 23, 2015 at 3:31 AM, Jordi Blasi Uribarri <jbl...@nextel.es>
> > > wrote:
> > >
> > > > Hello,
> > > >
> > > > I have managed to get samza up and running a simple test job that
> > > > just sends the received message. This is the code:
> > > >
> > > > public class job1 implements StreamTask {
> > > >     private final SystemStream OUTPUT_STREAM =
> > > >         new SystemStream("kafka", "beste");
> > > >
> > > >     public void process(IncomingMessageEnvelope envelope,
> > > >                         MessageCollector collector,
> > > >                         TaskCoordinator coordinator)
> > > >     {
> > > >         String msg = (String)envelope.getMessage();
> > > >         String outmsg = msg;
> > > >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outmsg));
> > > >     }
> > > > }
> > > >
> > > > The properties file that runs it is this one:
> > > >
> > > > task.class=samzafroga.job1
> > > > job.factory.class=org.apache.samza.job.local.ThreadJobFactory
> > > > job.name=samzafroga.job1
> > > >
> > > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > > > systems.kafka.consumer.zookeeper.connect=broker01:2181
> > > > systems.kafka.producer.bootstrap.servers=broker01:9092
> > > >
> > > > task.inputs=kafka.syslog
> > > >
> > > > serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> > > > systems.kafka.streams.frogain.samza.msg.serde=json
> > > >
> > > > When a message arrives, the following line:
> > > > String msg = (String)envelope.getMessage();
> > > >
> > > > throws an exception like this:
> > > >
> > > > 22 mar 2015 23:16:25 ERROR SamzaContainer - Caught exception in process loop.
> > > > java.lang.ClassCastException: [B cannot be cast to java.lang.String
> > > >     at samzafroga.job1.process(job1.java:19)
> > > >     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:129)
> > > >     at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> > > >     at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:128)
> > > >     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:114)
> > > >     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> > > >     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> > > >     at org.apache.samza.container.RunLoop.process(RunLoop.scala:100)
> > > >     at org.apache.samza.container.RunLoop.run(RunLoop.scala:83)
> > > >     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:549)
> > > >     at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> > > >
> > > > Is this a configuration issue? I tried changing the serializer to
> > > > String but it has the same effect.
> > > >
> > > > Thanks,
> > > >
> > > > Jordi
> > > > ________________________________
> > > > Jordi Blasi Uribarri
> > > > R&D&I Area
> > > >
> > > > jbl...@nextel.es
> > > > Bilbao Office
> > > >
> > > > [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2014.png]
> > >
> > > --
> > > Thanks and regards
> > >
> > > Chinmay Soman
> >
> > --
> > Thanks and regards
> >
> > Chinmay Soman
>
> --
> Thanks and regards
>
> Chinmay Soman

--
Thanks and regards

Chinmay Soman
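
To illustrate the two ClassCastExceptions discussed in this thread, here is a minimal plain-Java sketch. It does not use Samza at all: the class SerdeCastDemo and the method sendAsBytes are hypothetical stand-ins that only mimic the cast to a byte array described for KafkaSystemProducer, under the assumption that the producer does nothing more than that cast.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of Samza or Kafka.
class SerdeCastDemo {

    // Stand-in for a byte-oriented producer: it assumes the message has
    // already been serialized and simply casts it to byte[].
    static byte[] sendAsBytes(Object message) {
        return (byte[]) message; // ClassCastException if message is a String
    }

    public static void main(String[] args) {
        // Case 1: no serialization step, so a String reaches the cast.
        Object unserialized = "hello";
        try {
            sendAsBytes(unserialized);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }

        // Case 2: the message is serialized explicitly before sending,
        // which is the job a configured serde would normally do.
        byte[] serialized = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] sent = sendAsBytes(serialized);
        System.out.println("sent " + sent.length + " bytes: "
                + new String(sent, StandardCharsets.UTF_8));
    }
}
```

In the setup from this thread, the equivalent of the explicit serialization step would presumably be a serde configured for the output stream as well, for example systems.kafka.streams.beste.samza.msg.serde=string, following the key pattern already used above for the syslog stream. That key is an assumption based on that pattern, so check it against your Samza version.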