Thank you, I'll try it out!

On Wed, Dec 21, 2016 at 1:45 AM Hai Lu <h...@linkedin.com> wrote:
> Hi Rui,
>
> I've tried out the HDFS producer, too. In my experience, you won't be able
> to see changes written into HDFS in real time. The contents of the files
> become visible only after they get closed.
>
> You can probably play with the "producer.hdfs.write.batch.size.bytes"
> config to force rolling over to new files, so you can see the results of
> the previous one.
>
> Thanks,
> Hai
>
> On Mon, Dec 19, 2016 at 11:29 PM, Rui Tang <tangrui...@gmail.com> wrote:
>
> > I'm using samza-hdfs to write Kafka streams to HDFS, but I can't make it
> > work.
> >
> > Here is my Samza job's properties file:
> >
> > # Job
> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > job.name=kafka2hdfs
> >
> > # YARN
> > yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
> >
> > # Task
> > task.class=net.tangrui.kafka2hdfs.Kafka2HDFSStreamTask
> > task.inputs=kafka.events
> >
> > # Serializers
> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >
> > # Systems
> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > systems.kafka.samza.msg.serde=string
> > systems.kafka.consumer.zookeeper.connect=localhost:2181
> > systems.kafka.producer.bootstrap.servers=localhost:9092
> >
> > systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
> > systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
> > systems.hdfs.producer.hdfs.base.output.dir=/events
> >
> > # Job Coordinator
> > job.coordinator.system=kafka
> > # Normally, this would be 3, but we have only one broker.
> > job.coordinator.replication.factor=1
> >
> > Here is my simple task:
> >
> > import org.apache.samza.system.IncomingMessageEnvelope;
> > import org.apache.samza.system.OutgoingMessageEnvelope;
> > import org.apache.samza.system.SystemStream;
> > import org.apache.samza.task.MessageCollector;
> > import org.apache.samza.task.StreamTask;
> > import org.apache.samza.task.TaskCoordinator;
> >
> > public class Kafka2HDFSStreamTask implements StreamTask {
> >
> >     // Write to the "default" stream of the "hdfs" system defined above.
> >     private final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");
> >
> >     @Override
> >     public void process(IncomingMessageEnvelope incomingMessageEnvelope,
> >                         MessageCollector messageCollector,
> >                         TaskCoordinator taskCoordinator) throws Exception {
> >         // Forward each Kafka message to HDFS unchanged.
> >         String message = (String) incomingMessageEnvelope.getMessage();
> >         OutgoingMessageEnvelope envelope =
> >                 new OutgoingMessageEnvelope(OUTPUT_STREAM, message);
> >         messageCollector.send(envelope);
> >     }
> > }
> >
> > When I run this job, a sequence file is created in HDFS, but it contains
> > only some header info and no content. I can't figure out what's wrong.
> > Also, what should I provide as the "stream" parameter when building the
> > SystemStream instance?
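
A minimal sketch of the rollover tuning Hai mentions, using the "hdfs" system
name from the properties above (the 1 KB threshold is an illustrative value
for testing, not a recommendation):

    # Roll to a new output file after roughly 1 KB of writes, so that
    # closed files (and their contents) become visible in HDFS quickly.
    # Illustrative threshold for testing; the default is much larger.
    systems.hdfs.producer.hdfs.write.batch.size.bytes=1024

Once a file has rolled over and closed, its records should be readable with
"hdfs dfs -text" against the files under /events.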