"close" as in the file is closed: either the producer moves on to the next file or the job gets shut down. What I meant is that this is like writing to disk: the data doesn't actually reach the disk until a flush happens. So while you are still producing into HDFS, the content doesn't get committed/flushed until you are finished with that file. That's why, if you make the size per file smaller, you will probably see the earlier results sooner.
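To illustrate the buffering point, here is a minimal sketch using a plain local file. It is only an analogy: it does not touch HDFS or Samza, the class and method names (BufferedWriteDemo, sizesBeforeAndAfterClose) are made up for this example, and it assumes the payload is smaller than the writer's in-memory buffer.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Local-file analogy only: no HDFS or Samza APIs are used here.
// BufferedWriteDemo is a hypothetical name chosen for this sketch.
final class BufferedWriteDemo {

    // Returns {bytes visible before close, bytes visible after close}.
    // Assumes the payload is smaller than the writer's in-memory buffer.
    static long[] sizesBeforeAndAfterClose(String payload) {
        try {
            Path file = Files.createTempFile("buffered-demo", ".txt");
            try {
                long before;
                try (BufferedWriter writer =
                         Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
                    writer.write(payload);      // stays in the in-memory buffer
                    before = Files.size(file);  // a reader sees nothing yet
                }                               // close() flushes the buffer
                long after = Files.size(file);  // now the bytes are visible
                return new long[] {before, after};
            } finally {
                Files.deleteIfExists(file);     // clean up the temp file
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] sizes = sizesBeforeAndAfterClose("hello\n");
        System.out.println("before close: " + sizes[0] + " bytes");
        System.out.println("after close:  " + sizes[1] + " bytes");
    }
}
```

If the same thing is happening in your job, lowering the roll-over threshold should make completed files show up sooner. Going by the other systems.hdfs.producer.hdfs.* keys in your properties file, the full key is presumably systems.hdfs.producer.hdfs.write.batch.size.bytes. I haven't verified that prefix, and any value you pick (say 1048576 for 1 MB) is only for experimenting.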
It's all just my theory, though. But it's worth a shot :) And yeah, the stream name doesn't really make a difference as far as I know.

Thanks,
Hai

On Tue, Dec 20, 2016 at 4:40 PM, Rui Tang <tangrui...@gmail.com> wrote:

> By the way, what do you mean by "close"? Close what?
>
> And what should the stream parameter be, like the following "default"
> one? It seems to have nothing to do with the result.
>
> private final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");
>
> On Wed, Dec 21, 2016 at 8:23 AM Rui Tang <tangrui...@gmail.com> wrote:
>
>> Thank you, I'll try it out!
>>
>> On Wed, Dec 21, 2016 at 1:45 AM Hai Lu <h...@linkedin.com> wrote:
>>
>> Hi Rui,
>>
>> I've tried out the HDFS producer, too. In my experience, you won't be
>> able to see changes written into HDFS in real time. The content of the
>> files becomes visible only after they get closed.
>>
>> You can probably play with the "producer.hdfs.write.batch.size.bytes"
>> config to force rolling over to new files so you can see the results of
>> the previous one.
>>
>> Thanks,
>> Hai
>>
>> On Mon, Dec 19, 2016 at 11:29 PM, Rui Tang <tangrui...@gmail.com> wrote:
>>
>> I'm using samza-hdfs to write Kafka streams to HDFS, but I can't make it
>> work.
>>
>> Here is my Samza job's properties file:
>>
>> # Job
>> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> job.name=kafka2hdfs
>>
>> # YARN
>> yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
>>
>> # Task
>> task.class=net.tangrui.kafka2hdfs.Kafka2HDFSStreamTask
>> task.inputs=kafka.events
>>
>> # Serializers
>> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>>
>> # Systems
>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> systems.kafka.samza.msg.serde=string
>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>> systems.kafka.producer.bootstrap.servers=localhost:9092
>>
>> systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
>> systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
>> systems.hdfs.producer.hdfs.base.output.dir=/events
>>
>> # Job Coordinator
>> job.coordinator.system=kafka
>> # Normally, this would be 3, but we have only one broker.
>> job.coordinator.replication.factor=1
>>
>> Here is my simple task:
>>
>> public class Kafka2HDFSStreamTask implements StreamTask {
>>
>>   private final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");
>>
>>   @Override
>>   public void process(IncomingMessageEnvelope incomingMessageEnvelope,
>>       MessageCollector messageCollector, TaskCoordinator taskCoordinator) throws Exception {
>>     String message = (String) incomingMessageEnvelope.getMessage();
>>     OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(OUTPUT_STREAM, message);
>>     messageCollector.send(envelope);
>>   }
>> }
>>
>> When running this job, a sequence file is created in HDFS, but it only
>> has some header info and no content. I cannot figure out what is wrong.
>> And what should I provide for the "stream" parameter when building the
>> SystemStream instance?