Thank you, I'll try it out!

On Wed, Dec 21, 2016 at 1:45 AM Hai Lu <h...@linkedin.com> wrote:

> Hi Rui,
>
> I've tried out the HDFS producer, too. In my experience, you won't be able
> to see changes written to HDFS in real time. The contents of the files
> become visible only after the files are closed.
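>
> This is standard HDFS behavior rather than anything Samza-specific: bytes
> written through an open stream generally aren't visible to other readers
> until the file is closed or explicitly flushed. A rough sketch of this with
> the plain Hadoop API (untested; the path and class name are made up for
> illustration):
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FSDataOutputStream;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> public class VisibilityDemo {
>   public static void main(String[] args) throws Exception {
>     FileSystem fs = FileSystem.get(new Configuration());
>     FSDataOutputStream out = fs.create(new Path("/tmp/visibility-test"));
>     out.writeBytes("hello\n");
>     // Readers that open the file now typically see it as empty.
>     out.hflush(); // push bytes so a reader that re-opens can see them
>     out.close();  // the reported file length updates once the file closes
>   }
> }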
>
> You can probably play with the "producer.hdfs.write.batch.size.bytes"
> config to force rolling over to new files so you can see the results of the
> previous one.
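>
> For example, with your system named "hdfs", something like this should
> force small files (an untested sketch; I'm assuming the key is scoped
> under the system name like your other producer settings, and 1048576 is
> just an arbitrary small value for testing):
>
> # Roll over to a new output file after roughly 1 MB (small test value)
> systems.hdfs.producer.hdfs.write.batch.size.bytes=1048576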
>
> Thanks,
> Hai
>
> On Mon, Dec 19, 2016 at 11:29 PM, Rui Tang <tangrui...@gmail.com> wrote:
>
> I'm using samza-hdfs to write Kafka streams to HDFS, but I can't make it
> work.
>
> Here is my samza job's properties file:
>
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=kafka2hdfs
>
> # YARN
> yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
>
> # Task
> task.class=net.tangrui.kafka2hdfs.Kafka2HDFSStreamTask
> task.inputs=kafka.events
>
> # Serializers
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> # Systems
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.msg.serde=string
> systems.kafka.consumer.zookeeper.connect=localhost:2181
> systems.kafka.producer.bootstrap.servers=localhost:9092
>
> systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
> systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
> systems.hdfs.producer.hdfs.base.output.dir=/events
>
> # Job Coordinator
> job.coordinator.system=kafka
> # Normally, this would be 3, but we have only one broker.
> job.coordinator.replication.factor=1
>
>
> Here is my simple task:
>
> package net.tangrui.kafka2hdfs;
>
> import org.apache.samza.system.IncomingMessageEnvelope;
> import org.apache.samza.system.OutgoingMessageEnvelope;
> import org.apache.samza.system.SystemStream;
> import org.apache.samza.task.MessageCollector;
> import org.apache.samza.task.StreamTask;
> import org.apache.samza.task.TaskCoordinator;
>
> public class Kafka2HDFSStreamTask implements StreamTask {
>
>   // Send every message to the "hdfs" system, on a stream named "default".
>   private final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");
>
>   @Override
>   public void process(IncomingMessageEnvelope incomingMessageEnvelope,
>                       MessageCollector messageCollector,
>                       TaskCoordinator taskCoordinator) throws Exception {
>     // Pass the string payload through to HDFS unchanged.
>     String message = (String) incomingMessageEnvelope.getMessage();
>     OutgoingMessageEnvelope envelope =
>         new OutgoingMessageEnvelope(OUTPUT_STREAM, message);
>     messageCollector.send(envelope);
>   }
> }
>
> When running this job, a sequence file is created in HDFS, but it contains
> only some header info and no content. I can't figure out what's wrong.
> Also, what should I provide as the "stream" parameter when building the
> SystemStream instance?
>
>
