Thanks. You can see the errors in debug mode. Consumer implementations
should not fail with runtime errors without logging them. Are you not
logging exceptions in your consumer code?
---------- Forwarded message ----------
From: "Marcelo Romaniuc" <mroma...@yahoo.com>
Date: 29 Dec 2015 20:23
Subject: Re: Failed to create a consumer
To: "Jagadish Venkatraman" <jagadish1...@gmail.com>
Cc:

Jagadish

    Thanks for the feedback. I found an issue in getConsumer: a typo in one
config key prevented the consumer from being created. (I used
config.getInt(...). Strangely, I did not see anything in the logs, even
though the method does raise an exception.)
    It is fixed now.

Rgds,

Marcelo

------------------------------
*From:* Jagadish Venkatraman <jagadish1...@gmail.com>
*To:* dev@samza.apache.org; Marcelo Romaniuc <mroma...@yahoo.com>
*Sent:* Tuesday, December 29, 2015 5:26 AM
*Subject:* Re: Failed to create a consumer

Hi Marcelo,


*task.inputs=product.product.txt*
The format of this property task.inputs is system_name.stream_name, for
example - kafka.PageViewEvent.
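
To make the format concrete, here is a small sketch that splits a task.inputs entry at the first dot. That matches the container log further down in this thread, where product.product.txt is reported as system=product, stream=product.txt. The class and method names are made up for illustration; this is not Samza's own parsing code.

```java
public class TaskInputSketch {
    /** Splits "system.stream" at the first dot; the stream name may itself contain dots. */
    static String[] splitInput(String input) {
        int dot = input.indexOf('.');
        // Reject entries with no dot, an empty system name, or an empty stream name.
        if (dot <= 0 || dot == input.length() - 1) {
            throw new IllegalArgumentException(
                "Expected system_name.stream_name but got: " + input);
        }
        return new String[] { input.substring(0, dot), input.substring(dot + 1) };
    }

    public static void main(String[] args) {
        for (String input : new String[] { "product.product.txt", "kafka.PageViewEvent" }) {
            String[] parts = splitInput(input);
            System.out.println("system=" + parts[0] + ", stream=" + parts[1]);
        }
    }
}
```

Under this reading, the system name in task.inputs has to match the name used in the systems.<name>.samza.factory key.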

You've defined your own system called *product* with a custom
*FileFeedSystemFactory*. It seems like there was an exception in your
FileFeedSystemFactory.getConsumer method. Run your example in debug mode or
catch/log exceptions in your FileFeedSystemFactory implementation.
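
A minimal sketch of the catch/log suggestion, so that a failure during consumer creation shows up in the logs with its stack trace. Everything here is a hypothetical stand-in rather than Samza's API: the Consumer interface, the getInt helper, createConsumer, and the batch.size key are assumptions for illustration. In a real job the same try/catch would wrap the body of FileFeedSystemFactory.getConsumer.

```java
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LoggingFactorySketch {
    private static final Logger LOG =
        Logger.getLogger(LoggingFactorySketch.class.getName());

    /** Hypothetical stand-in for a system consumer. */
    interface Consumer {
        int batchSize();
    }

    /** Reads a required int setting; throws if the key is missing or not a number. */
    static int getInt(Map<String, String> config, String key) {
        String value = config.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing config key: " + key);
        }
        return Integer.parseInt(value);
    }

    /** Builds a consumer, logging any failure with its stack trace before rethrowing. */
    static Consumer createConsumer(String systemName, Map<String, String> config) {
        try {
            int batchSize = getInt(config, "systems." + systemName + ".batch.size");
            return () -> batchSize;
        } catch (RuntimeException e) {
            LOG.log(Level.SEVERE, "Could not create consumer for system " + systemName, e);
            throw e;
        }
    }

    public static void main(String[] args) {
        Consumer ok = createConsumer("product", Map.of("systems.product.batch.size", "10"));
        System.out.println("batch size = " + ok.batchSize());
        try {
            // Typo in the key ("batch.sise"), so the lookup fails and the error is logged.
            createConsumer("product", Map.of("systems.product.batch.sise", "10"));
        } catch (RuntimeException e) {
            System.out.println("creation failed: " + e.getMessage());
        }
    }
}
```

The sketch rethrows after logging, so the caller still sees the failure; the only change is that the cause is recorded first.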

You can read from Kafka by defining task.inputs=kafka.topic_name.

Thanks,
Jagadish


On Mon, Dec 28, 2015 at 5:28 AM, Marcelo Romaniuc <
mroma...@yahoo.com.invalid> wrote:

Hi,

    I'm trying a first app on Samza, using hello-samza as a model. When
I run the app, I get the following:

2015-12-28 14:17:14 SamzaContainer$ [INFO] Got system factories:
Set(product, kafka)
2015-12-28 14:17:14 SamzaContainer$ [INFO] Got input stream metadata:
Map(SystemStream [system=product, stream=product.txt] ->
SystemStreamMetadata [streamName=product.txt, partitionMetadata={Partition
[partition=0]=SystemStreamPartitionMetadata [oldestOffset=null,
newestOffset=null, upcomingOffset=null]}])
2015-12-28 14:17:14 SamzaContainer$ [INFO] Failed to create a consumer for
product, so skipping.
2015-12-28 14:17:14 SamzaContainer$ [INFO] Got system consumers: Set()
2015-12-28 14:17:14 SamzaContainer$ [INFO] Failed to create a producer for
product, so skipping.

    What am I missing?

    My config:
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=file-feed

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=com.test.samza.task.FileFeedStreamTask
task.inputs=product.product.txt

# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# FileFeed System - product
systems.product.samza.factory=com.test.samza.system.FileFeedSystemFactory


# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092

# Job Coordinator
job.coordinator.system=kafka
# Add configuration to disable checkpointing for this job once it is
# available in the Coordinator Stream model
# See
# https://issues.apache.org/jira/browse/SAMZA-465?focusedCommentId=14533346&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14533346
# for more details
job.coordinator.replication.factor=1



Thanks and Regards,
marcelo




-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
