Thanks. You can see the errors in debug mode. Consumer implementations should not fail with runtime errors without logging them. Are you not logging exceptions in your consumer code?

---------- Forwarded message ----------
From: "Marcelo Romaniuc" <mroma...@yahoo.com>
Date: 29 Dec 2015 20:23
Subject: Re: Failed to create a consumer
To: "Jagadish Venkatraman" <jagadish1...@gmail.com>
Cc:

Jagadish,

Thanks for the feedback.

Found an issue in getConsumer: a typo in one config prevented the consumer from being created (I used config.getInt(...)). Strangely, I did not see anything in the logs, even though the method does raise an exception.

It is fixed now.

Rgds,
Marcelo

------------------------------
*From:* Jagadish Venkatraman <jagadish1...@gmail.com>
*To:* dev@samza.apache.org; Marcelo Romaniuc <mroma...@yahoo.com>
*Sent:* Tuesday, December 29, 2015 5:26 AM
*Subject:* Re: Failed to create a consumer

Hi Marcelo,

*task.inputs=product.product.txt*

The format of the task.inputs property is system_name.stream_name, for example kafka.PageViewEvent. You've defined your own system called *product* with a custom *FileFeedSystemFactory*.

It seems like there was an exception in your FileFeedSystemFactory.getConsumer method. Run your example in debug mode, or catch and log exceptions in your FileFeedSystemFactory implementation.

You can read from Kafka by defining task.inputs=kafka.topic_name

Thanks,
Jagadish

On Mon, Dec 28, 2015 at 5:28 AM, Marcelo Romaniuc <mroma...@yahoo.com.invalid> wrote:

Hi,

I am trying a first app on Samza and used hello-samza as a model. When I run the app, I get the following:

2015-12-28 14:17:14 SamzaContainer$ [INFO] Got system factories: Set(product, kafka)
2015-12-28 14:17:14 SamzaContainer$ [INFO] Got input stream metadata: Map(SystemStream [system=product, stream=product.txt] -> SystemStreamMetadata [streamName=product.txt, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=null]}])
2015-12-28 14:17:14 SamzaContainer$ [INFO] Failed to create a consumer for product, so skipping.
2015-12-28 14:17:14 SamzaContainer$ [INFO] Got system consumers: Set()
2015-12-28 14:17:14 SamzaContainer$ [INFO] Failed to create a producer for product, so skipping.

What am I missing?

My config...

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=file-feed

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=com.test.samza.task.FileFeedStreamTask
task.inputs=product.product.txt

# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# FileFeed System - product
systems.product.samza.factory=com.test.samza.system.FileFeedSystemFactory

# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092

# Job Coordinator
job.coordinator.system=kafka
# Add configuration to disable checkpointing for this job once it is available in the Coordinator Stream model
# See https://issues.apache.org/jira/browse/SAMZA-465?focusedCommentId=14533346&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14533346 for more details
job.coordinator.replication.factor=1

Thanks and Regards,
marcelo

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
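
For anyone hitting the same "Failed to create a consumer ... so skipping" message: the advice in this thread (catch and log exceptions inside getConsumer) might look roughly like the sketch below. It is deliberately self-contained and does not use the Samza API. The getInt helper, the FileConsumer class, and the key systems.product.batch.size are all hypothetical stand-ins, since the thread does not say which config entry had the typo. In a real FileFeedSystemFactory the same try/catch would wrap the body of getConsumer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ConsumerFactorySketch {
    private static final Logger LOG =
            Logger.getLogger(ConsumerFactorySketch.class.getName());

    /** Hypothetical stand-in for a config lookup that throws when the key is absent. */
    static int getInt(Map<String, String> config, String key) {
        String value = config.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing config key: " + key);
        }
        return Integer.parseInt(value);
    }

    /** Hypothetical stand-in for the consumer the factory is supposed to build. */
    static final class FileConsumer {
        final int batchSize;

        FileConsumer(int batchSize) {
            this.batchSize = batchSize;
        }
    }

    /**
     * Builds the consumer, logging any runtime failure before rethrowing it,
     * so the cause is visible even if the caller only reports that creation failed.
     */
    static FileConsumer getConsumer(String systemName, Map<String, String> config) {
        try {
            // Hypothetical key name, used only for illustration.
            int batchSize = getInt(config, "systems." + systemName + ".batch.size");
            return new FileConsumer(batchSize);
        } catch (RuntimeException e) {
            LOG.log(Level.SEVERE, "Could not create consumer for system " + systemName, e);
            throw e;
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        // Misspelled key ("sizee"), mimicking the kind of typo described in the thread.
        config.put("systems.product.batch.sizee", "100");
        try {
            getConsumer("product", config);
        } catch (RuntimeException e) {
            System.out.println("Consumer creation failed: " + e.getMessage());
        }
    }
}
```

Logging at the point of failure means the stack trace shows up at the default log level, instead of only when the container is run in debug mode.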