Hello,

I have managed to get samza up and running an simple test job that just sends 
the received message. This is the code:

public class job1 implements StreamTask {
         private final SystemStream OUTPUT_STREAM = new SystemStream("kafka", 
"beste");

       public void process(IncomingMessageEnvelope envelope,
           MessageCollector collector,
           TaskCoordinator coordinator)
       {
             String msg = (String)envelope.getMessage();
             String outmsg = msg;
             collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outmsg));
       }
}

The properties file that runs it is this one:

task.class=samzafroga.job1
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
job.name=samzafroga.job1

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect= broker01:2181
systems.kafka.producer.bootstrap.servers=broker01:9092

task.inputs=kafka.syslog
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
systems.kafka.streams.frogain.samza.msg.serde=json

When I get a message in the line that contains the following command:
                String msg = (String)envelope.getMessage();

I get an exception like this:

22 mar 2015 23:16:25 ERROR SamzaContainer - Caught exception in process loop.
java.lang.ClassCastException: [B cannot be cast to java.lang.String
        at samzafroga.job1.process(job1.java:19)
        at 
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:129)
        at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
        at 
org.apache.samza.container.TaskInstance.process(TaskInstance.scala:128)
        at 
org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:114)
        at 
org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
        at org.apache.samza.container.RunLoop.process(RunLoop.scala:100)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:83)
        at 
org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:549)
        at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)

Is this a configuration issue? I tried changing the serializer to String but I 
has the same effect.

Thanks,

               Jordi
________________________________
Jordi Blasi Uribarri
Área I+D+i

jbl...@nextel.es
Oficina Bilbao

[http://www.nextel.es/wp-content/uploads/Firma_Nextel_2014.png]

Reply via email to