Hey Jordi, I see 3 different stream names.
1. new SystemStream("kafka", "beste");
2. task.inputs=kafka.syslog
3. systems.kafka.streams.frogain.samza.msg.serde=json

Just as a sanity check, can you double-check that you're setting the config
params for the correct stream?

On Mon, Mar 23, 2015 at 3:31 AM, Jordi Blasi Uribarri <jbl...@nextel.es> wrote:

> Hello,
>
> I have managed to get Samza up and running with a simple test job that
> just sends the received message. This is the code:
>
> public class job1 implements StreamTask {
>     private final SystemStream OUTPUT_STREAM =
>         new SystemStream("kafka", "beste");
>
>     public void process(IncomingMessageEnvelope envelope,
>                         MessageCollector collector,
>                         TaskCoordinator coordinator)
>     {
>         String msg = (String)envelope.getMessage();
>         String outmsg = msg;
>         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outmsg));
>     }
> }
>
> The properties file that runs it is this one:
>
> task.class=samzafroga.job1
> job.factory.class=org.apache.samza.job.local.ThreadJobFactory
> job.name=samzafroga.job1
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.consumer.zookeeper.connect=broker01:2181
> systems.kafka.producer.bootstrap.servers=broker01:9092
>
> task.inputs=kafka.syslog
>
> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> systems.kafka.streams.frogain.samza.msg.serde=json
>
> When I get a message, the line that contains the following statement:
>
> String msg = (String)envelope.getMessage();
>
> throws an exception like this:
>
> 22 mar 2015 23:16:25 ERROR SamzaContainer - Caught exception in process loop.
> java.lang.ClassCastException: [B cannot be cast to java.lang.String
>     at samzafroga.job1.process(job1.java:19)
>     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:129)
>     at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>     at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:128)
>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:114)
>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:100)
>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:83)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:549)
>     at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
>
> Is this a configuration issue? I tried changing the serializer to String,
> but it has the same effect.
>
> Thanks,
>
> Jordi
> ________________________________
> Jordi Blasi Uribarri
> R&D&I Department
>
> jbl...@nextel.es
> Bilbao Office

--
Thanks and regards

Chinmay Soman
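
P.S. For reference, here is a rough sketch of how the serde settings could be lined up with the input stream. It assumes the job really is meant to read from kafka.syslog and that the messages are plain text rather than JSON; the thread confirms neither. StringSerdeFactory ships with Samza, and the registry name "string" is just a label chosen here. Without a serde bound to the input stream the task receives raw byte[], which would explain the "[B cannot be cast to java.lang.String" error.

```properties
# Sketch only: assumes the input stream is "syslog" and carries plain text.
task.inputs=kafka.syslog

# Register a string serde under the (arbitrary) name "string".
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Bind the serde to the stream named in task.inputs, not to "frogain".
systems.kafka.streams.syslog.samza.msg.serde=string

# Alternative (assumption: every stream on this system is plain text):
# a system-wide default, which would also cover the output stream "beste".
# systems.kafka.samza.msg.serde=string
```

If the messages are in fact JSON, keeping the json serde and pointing it at the right stream should also work, but the task would then most likely receive a deserialized object (for example a Map) instead of a String, so the cast in process() would need to change too.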