The problem is resolved. I changed systems.hdfs.producer.hdfs.write.batch.size.bytes to a very small value, and now the sequence file is filled with my data.

Thank you!
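For reference, the change was roughly of this form; the exact number below is only an illustrative small value, not a recommendation:

# Split output files after a small number of bytes so the previous file
# gets closed and its contents become visible (1 MB here is just an example).
systems.hdfs.producer.hdfs.write.batch.size.bytes=1048576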
On Wed, Dec 21, 2016 at 9:37 AM Hai Lu <h...@linkedin.com> wrote:

> "Close" as in the file is closed - either the producer moves on to the next
> file or the job gets shut down. I meant that this is like writing to disk:
> the data doesn't actually reach the disk until a flush happens. So while you
> are still producing into HDFS, the content doesn't get committed/flushed
> until you are finished. That's why if you change the size per file and make
> the files smaller, you will probably see the previous results earlier.
>
> It's all just my theory, though. But it's worth a shot :)
>
> And yeah, the stream name doesn't really make a difference as far as I know.
>
> Thanks,
> Hai
>
> On Tue, Dec 20, 2016 at 4:40 PM, Rui Tang <tangrui...@gmail.com> wrote:
>
> By the way, what do you mean by "close"? Close what?
>
> And what should the stream parameter be, like the "default" in the following
> line? It seems to have nothing to do with the result.
>
> private final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");
>
> On Wed, Dec 21, 2016 at 8:23 AM Rui Tang <tangrui...@gmail.com> wrote:
>
> Thank you, I'll try it out!
>
> On Wed, Dec 21, 2016 at 1:45 AM Hai Lu <h...@linkedin.com> wrote:
>
> Hi Rui,
>
> I've tried out the HDFS producer, too. In my experience, you won't be able
> to see changes written into HDFS in real time. The content of the files
> becomes visible only after they get closed.
>
> You can probably play with the "producer.hdfs.write.batch.size.bytes" config
> to force rolling over to new files so you can see the results of the
> previous one.
>
> Thanks,
> Hai
>
> On Mon, Dec 19, 2016 at 11:29 PM, Rui Tang <tangrui...@gmail.com> wrote:
>
> I'm using samza-hdfs to write Kafka streams to HDFS, but I can't make it
> work.
>
> Here is my samza job's properties file:
>
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=kafka2hdfs
>
> # YARN
> yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
>
> # Task
> task.class=net.tangrui.kafka2hdfs.Kafka2HDFSStreamTask
> task.inputs=kafka.events
>
> # Serializers
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> # Systems
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.msg.serde=string
> systems.kafka.consumer.zookeeper.connect=localhost:2181
> systems.kafka.producer.bootstrap.servers=localhost:9092
>
> systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
> systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
> systems.hdfs.producer.hdfs.base.output.dir=/events
>
> # Job Coordinator
> job.coordinator.system=kafka
> # Normally, this would be 3, but we have only one broker.
> job.coordinator.replication.factor=1
>
> Here is my simple task:
>
> public class Kafka2HDFSStreamTask implements StreamTask {
>
>   private final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");
>
>   @Override
>   public void process(IncomingMessageEnvelope incomingMessageEnvelope,
>       MessageCollector messageCollector, TaskCoordinator taskCoordinator) throws Exception {
>     String message = (String) incomingMessageEnvelope.getMessage();
>     OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(OUTPUT_STREAM, message);
>     messageCollector.send(envelope);
>   }
> }
>
> When running this job, a sequence file is created in HDFS, but it only
> contains some header info, no content. I cannot figure out what is wrong.
> And what should I provide for the "stream" parameter when building the
> SystemStream instance?
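For completeness, here is a minimal sketch of the task wiring under discussion, with imports added and the conclusions from this thread noted as comments. It assumes the "hdfs" system is configured as in the job properties above; the "default" stream name is arbitrary, since per Hai's note the stream name does not appear to matter for the HDFS producer:

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class Kafka2HDFSStreamTask implements StreamTask {

  // "hdfs" must match the system name used in the job config. The stream name
  // ("default") is reportedly not significant for the HDFS producer; the output
  // location is governed by systems.hdfs.producer.hdfs.base.output.dir.
  private static final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) throws Exception {
    String message = (String) envelope.getMessage();
    // Sent messages are buffered by the sequence-file writer; they only become
    // visible in HDFS after the current file is closed, i.e. when the writer
    // rolls over to a new file (see write.batch.size.bytes) or the job shuts down.
    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
  }
}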