I am using the Kafka command line producer, so I understand that I am sending a 
String.

        bin/kafka-console-producer.sh --broker-list localhost:9092 --topic syslog

What is actually the difference between a string and JSON? Is it just a 
matter of deserialization, or is there some kind of metadata included that 
specifies the content type?
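
To make the question concrete, here is a minimal plain-Java sketch (no Kafka or Samza dependency; the class and helper names are hypothetical). It assumes, as the ByteArraySerializer entries in the producer log quoted below suggest, that the message value travels as a plain byte array, so a string and a JSON document differ only in how the consumer chooses to decode those bytes:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PayloadBytesSketch {
    // Hypothetical helper: the bytes a producer would hand over for a line of text.
    public static byte[] toWire(String line) {
        return line.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical helper: what a consumer using a string deserializer would get back.
    public static String fromWire(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String plain = "Mar 23 17:55:01 host sshd: session opened";
        String json = "{\"host\":\"host\",\"msg\":\"session opened\"}";

        // Both payloads are just UTF-8 bytes; nothing marks one of them as JSON.
        System.out.println(Arrays.toString(toWire("{}"))); // prints [123, 125]

        // Decoding as a string works for either payload. Treating the second
        // one as JSON is a decision made by the reader, not by the bytes.
        System.out.println(fromWire(toWire(plain)).equals(plain)); // prints true
        System.out.println(fromWire(toWire(json)).equals(json));   // prints true
    }
}
```

Under that assumption, a JSON serde and a string serde read exactly the same bytes, and the JSON one simply fails if the text is not valid JSON.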

How do I enable the debug mode?

Thanks,

        Jordi

-----Original Message-----
From: Chinmay Soman [mailto:chinmay.cere...@gmail.com]
Sent: Monday, March 23, 2015 17:55
To: dev@samza.apache.org
Subject: Re: cannot be cast to java.lang.String

Hey Jordi,

This is because you're sending a String, not JSON, to your output topic.
Try setting the string serde on the output stream as well (if you haven't already).

If you have done that - then please enable debug mode and attach the log 
somewhere so that we can take a look.
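
For illustration, a minimal plain-Java sketch of the mechanism behind "java.lang.String cannot be cast to [B" (this is not Samza code; the class and method names are hypothetical). It assumes that, when no serde is applied to the output stream, the producer side expects the message to already be a byte[], which is what the ByteArraySerializer entries in the log quoted below suggest:

```java
import java.nio.charset.StandardCharsets;

public class CastVersusEncodeSketch {
    // Mimics a sender that assumes the payload is already a byte array.
    public static byte[] sendWithoutSerde(Object message) {
        return (byte[]) message; // throws ClassCastException when given a String
    }

    // Mimics what a string serde would do before the send: encode as UTF-8.
    public static byte[] sendWithStringSerde(Object message) {
        return ((String) message).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Object outmsg = "hello";
        try {
            sendWithoutSerde(outmsg);
        } catch (ClassCastException e) {
            // Same kind of failure as in the stack trace: String is not a byte[].
            System.out.println("cast failed: " + e.getMessage());
        }
        // With an explicit encoding step the payload becomes bytes and can be sent.
        System.out.println(sendWithStringSerde(outmsg).length);
    }
}
```

In this thread that would presumably mean adding a serde line for the output topic as well, following the pattern already used for syslog (for example systems.kafka.streams.beste.samza.msg.serde=string), but that exact line is an assumption and not something confirmed here.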

On Mon, Mar 23, 2015 at 9:52 AM, Jordi Blasi Uribarri <jbl...@nextel.es>
wrote:

> Looks like that was one error.
>
> I have set the property like this:
>         systems.kafka.streams.syslog.samza.msg.serde=string
>
> But I am still getting the same error. Now I am seeing something 
> different in the log just before the exception:
>
> 23 mar 2015 05:49:31  INFO KafkaSystemProducer - Creating a new producer for system kafka.
> 23 mar 2015 05:49:31  INFO ProducerConfig - ProducerConfig values:
>         value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
>         key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
>         block.on.buffer.full = true
>         retry.backoff.ms = 100
>         buffer.memory = 33554432
>         batch.size = 16384
>         metrics.sample.window.ms = 30000
>         metadata.max.age.ms = 300000
>         receive.buffer.bytes = 32768
>         timeout.ms = 30000
>         max.in.flight.requests.per.connection = 1
>         bootstrap.servers = [broker01:9092]
>         metric.reporters = []
>         client.id = samza_producer-samzafroga_job1-1-1427086163149-3
>         compression.type = none
>         retries = 2147483647
>         max.request.size = 1048576
>         send.buffer.bytes = 131072
>         acks = 1
>         reconnect.backoff.ms = 10
>         linger.ms = 0
>         metrics.num.samples = 2
>         metadata.fetch.timeout.ms = 60000
>
> 23 mar 2015 05:49:31 ERROR SamzaContainer - Caught exception in process loop.
> java.lang.ClassCastException: java.lang.String cannot be cast to [B
>         at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:80)
>         at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87)
>         at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61)
>         at samzafroga.job1.process(job1.java:21)
>         at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:129)
>         at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>         at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:128)
>         at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:114)
>         at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>         at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>         at org.apache.samza.container.RunLoop.process(RunLoop.scala:100)
>         at org.apache.samza.container.RunLoop.run(RunLoop.scala:83)
>         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:549)
>         at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
>
> Thanks.
>
>         Jordi
> -----Original Message-----
> From: Chinmay Soman [mailto:chinmay.cere...@gmail.com]
> Sent: Monday, March 23, 2015 17:36
> To: dev@samza.apache.org
> Subject: Re: cannot be cast to java.lang.String
>
> Have you tried setting this:
>
> systems.kafka.streams.syslog.samza.msg.serde=string   // And assuming you've defined a 'string' serializer in your config
>
> OR
>
> systems.kafka.streams.syslog.samza.msg.serde=json     // Depending on the corresponding format of your input data
>
> On Mon, Mar 23, 2015 at 9:24 AM, Jordi Blasi Uribarri 
> <jbl...@nextel.es>
> wrote:
>
> > Hi,
> >
> > As I understand it, I am setting "kafka" as the system name, "beste"
> > as the output topic in the system and "syslog" as the input topic.
> > Both topics syslog and beste are working correctly, as I am streaming
> > some syslogs to the "syslog" topic and I am testing "beste" with an
> > internal application designed specifically for that. I am not sure about
> > the kafka part.
> >
> > Thanks.
> >
> >         Jordi
> >
> > -----Original Message-----
> > From: Chinmay Soman [mailto:chinmay.cere...@gmail.com]
> > Sent: Monday, March 23, 2015 17:16
> > To: dev@samza.apache.org
> > Subject: Re: cannot be cast to java.lang.String
> >
> > Hey Jordi,
> >
> > I see 3 different stream names.
> >
> > 1. new SystemStream("kafka", "beste");
> >
> > 2. task.inputs=kafka.syslog
> >
> > 3. systems.kafka.streams.frogain.samza.msg.serde=json
> >
> > Just as a sanity check, can you double-check that you're setting the
> > config params for the correct stream?
> >
> >
> > On Mon, Mar 23, 2015 at 3:31 AM, Jordi Blasi Uribarri 
> > <jbl...@nextel.es>
> > wrote:
> >
> > > Hello,
> > >
> > > I have managed to get Samza up and running a simple test job that
> > > just forwards the received message. This is the code:
> > >
> > > public class job1 implements StreamTask {
> > >        private final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "beste");
> > >
> > >        public void process(IncomingMessageEnvelope envelope,
> > >            MessageCollector collector,
> > >            TaskCoordinator coordinator)
> > >        {
> > >              String msg = (String)envelope.getMessage();
> > >              String outmsg = msg;
> > >              collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outmsg));
> > >        }
> > > }
> > >
> > > The properties file that runs it is this one:
> > >
> > > task.class=samzafroga.job1
> > > job.factory.class=org.apache.samza.job.local.ThreadJobFactory
> > > job.name=samzafroga.job1
> > >
> > >
> > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > > systems.kafka.consumer.zookeeper.connect=broker01:2181
> > > systems.kafka.producer.bootstrap.servers=broker01:9092
> > >
> > > task.inputs=kafka.syslog
> > >
> > > serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> > > systems.kafka.streams.frogain.samza.msg.serde=json
> > >
> > > When a message arrives, the line that contains the following statement:
> > >                 String msg = (String)envelope.getMessage();
> > >
> > > throws an exception like this:
> > >
> > > 22 mar 2015 23:16:25 ERROR SamzaContainer - Caught exception in process loop.
> > > java.lang.ClassCastException: [B cannot be cast to java.lang.String
> > >         at samzafroga.job1.process(job1.java:19)
> > >         at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:129)
> > >         at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> > >         at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:128)
> > >         at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:114)
> > >         at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> > >         at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> > >         at org.apache.samza.container.RunLoop.process(RunLoop.scala:100)
> > >         at org.apache.samza.container.RunLoop.run(RunLoop.scala:83)
> > >         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:549)
> > >         at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> > >
> > > Is this a configuration issue? I tried changing the serializer to
> > > String, but it has the same effect.
> > >
> > > Thanks,
> > >
> > >                Jordi
> > > ________________________________
> > > Jordi Blasi Uribarri
> > > R&D and Innovation Department
> > >
> > > jbl...@nextel.es
> > > Bilbao Office
> > >
> > >
> >
> >
> >
> > --
> > Thanks and regards
> >
> > Chinmay Soman
> >
>
>
>
> --
> Thanks and regards
>
> Chinmay Soman
>



--
Thanks and regards

Chinmay Soman
