The problem is resolved.

I changed systems.hdfs.producer.hdfs.write.batch.size.bytes to a very small
value, and now the sequence file is filled with my data.
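
For example, a small value like the one below (the exact number is just an
illustration) makes the producer roll over to new files quickly, so the
contents of the previous file become visible:

systems.hdfs.producer.hdfs.write.batch.size.bytes=1024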

Thank you!

On Wed, Dec 21, 2016 at 9:37 AM Hai Lu <h...@linkedin.com> wrote:

> "close" as in the file is closed - either the producer moves on to the
> next file or the job gets shutdown. I meant to say that this is liking
> writing into disk, the data don't actually reach the disk until a flush
> happens. So while you are still producing into HDFS, the content don't get
> committed/flush until you are finished. That's why if you change the size
> per file and make them smaller, you will probably see the previous results
> earlier.
>
> It's all just my theory, though, but it's worth a shot :)
>
> And yeah, the stream name doesn't really make a difference as far as I
> know.
>
> Thanks,
> Hai
>
> On Tue, Dec 20, 2016 at 4:40 PM, Rui Tang <tangrui...@gmail.com> wrote:
>
> By the way, what do you mean by "close"? Close what?
>
> And what should the stream parameter be, like the "default" one below? It
> seems to have nothing to do with the result.
>
> private final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");
>
> On Wed, Dec 21, 2016 at 8:23 AM Rui Tang <tangrui...@gmail.com> wrote:
>
> Thank you, I'll try it out!
>
> On Wed, Dec 21, 2016 at 1:45 AM Hai Lu <h...@linkedin.com> wrote:
>
> Hi Rui,
>
> I've tried out the HDFS producer, too. In my experience, you won't be able
> to see changes written into HDFS in real time. The content of the files
> becomes visible only after they get closed.
>
> You can probably play with the "producer.hdfs.write.batch.size.bytes"
> config to force rolling over to new files so you can see the results of the
> previous one.
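>
> For instance (the value below is just an illustration; any small value
> should do):
>
> systems.hdfs.producer.hdfs.write.batch.size.bytes=1024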
>
> Thanks,
> Hai
>
> On Mon, Dec 19, 2016 at 11:29 PM, Rui Tang <tangrui...@gmail.com> wrote:
>
> I'm using samza-hdfs to write Kafka streams to HDFS, but I can't make it
> work.
>
> Here is my samza job's properties file:
>
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=kafka2hdfs
>
> # YARN
>
> yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
>
> # Task
> task.class=net.tangrui.kafka2hdfs.Kafka2HDFSStreamTask
> task.inputs=kafka.events
>
> # Serializers
>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> # Systems
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.msg.serde=string
> systems.kafka.consumer.zookeeper.connect=localhost:2181
> systems.kafka.producer.bootstrap.servers=localhost:9092
>
> systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
>
> systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
> systems.hdfs.producer.hdfs.base.output.dir=/events
>
> # Job Coordinator
> job.coordinator.system=kafka
> # Normally, this would be 3, but we have only one broker.
> job.coordinator.replication.factor=1
>
>
> Here is my simple task:
>
> public class Kafka2HDFSStreamTask implements StreamTask {
>
>   private final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");
>
>
>
>   @Override
>   public void process(IncomingMessageEnvelope incomingMessageEnvelope,
>                       MessageCollector messageCollector, TaskCoordinator
> taskCoordinator) throws Exception {
>     String message = (String) incomingMessageEnvelope.getMessage();
>     OutgoingMessageEnvelope envelope = new
> OutgoingMessageEnvelope(OUTPUT_STREAM, message);
>     messageCollector.send(envelope);
>   }
> }
>
> When running this job, a sequence file is created in HDFS, but it contains
> only some header info and no content. I cannot figure out what is wrong.
> Also, what should I provide as the "stream" parameter when building the
> SystemStream instance?
>
>
>
