By the way, what do you mean by "close"? Close what, exactly?

And what should the stream parameter be, like the "default" one in the
following line? It seems to have nothing to do with the result.

private final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");

On Wed, Dec 21, 2016 at 8:23 AM Rui Tang <tangrui...@gmail.com> wrote:

> Thank you, I'll try it out!
>
> On Wed, Dec 21, 2016 at 1:45 AM Hai Lu <h...@linkedin.com> wrote:
>
> Hi Rui,
>
> I've tried out the HDFS producer, too. In my experience, you won't be able
> to see changes written to HDFS in real time. The content of the files
> becomes visible only after they get closed.
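>
> A quick way to confirm this is to dump a file once it has been closed (a
> sketch; the exact layout under the base output dir depends on your
> bucketer, so the paths below are placeholders):
>
> hdfs dfs -ls -R /events
> hdfs dfs -text /events/<closed-file>.seq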
>
> You can probably play with the "producer.hdfs.write.batch.size.bytes"
> config to force rolling over to new files so you can see the contents of
> the previous one.
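>
> For example (an illustrative value, not a recommendation; the real default
> is much larger), in the job's properties file:
>
> systems.hdfs.producer.hdfs.write.batch.size.bytes=1048576
>
> With a small batch size the writer cuts new files frequently, so earlier
> files get closed, and thus readable, sooner.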
>
> Thanks,
> Hai
>
> On Mon, Dec 19, 2016 at 11:29 PM, Rui Tang <tangrui...@gmail.com> wrote:
>
> I'm using samza-hdfs to write Kafka streams to HDFS, but I can't make it
> work.
>
> Here is my Samza job's properties file:
>
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=kafka2hdfs
>
> # YARN
>
> yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
>
> # Task
> task.class=net.tangrui.kafka2hdfs.Kafka2HDFSStreamTask
> task.inputs=kafka.events
>
> # Serializers
>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> # Systems
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.msg.serde=string
> systems.kafka.consumer.zookeeper.connect=localhost:2181
> systems.kafka.producer.bootstrap.servers=localhost:9092
>
> systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
>
> systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
> systems.hdfs.producer.hdfs.base.output.dir=/events
>
> # Job Coordinator
> job.coordinator.system=kafka
> # Normally, this would be 3, but we have only one broker.
> job.coordinator.replication.factor=1
>
>
> Here is my simple task:
>
> import org.apache.samza.system.IncomingMessageEnvelope;
> import org.apache.samza.system.OutgoingMessageEnvelope;
> import org.apache.samza.system.SystemStream;
> import org.apache.samza.task.MessageCollector;
> import org.apache.samza.task.StreamTask;
> import org.apache.samza.task.TaskCoordinator;
>
> public class Kafka2HDFSStreamTask implements StreamTask {
>
>   // "hdfs" must match the system name in the properties file; I'm not sure
>   // what the stream name ("default" here) is supposed to be.
>   private final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");
>
>   @Override
>   public void process(IncomingMessageEnvelope incomingMessageEnvelope,
>                       MessageCollector messageCollector,
>                       TaskCoordinator taskCoordinator) throws Exception {
>     // Forward each incoming Kafka message to the HDFS system unchanged.
>     String message = (String) incomingMessageEnvelope.getMessage();
>     OutgoingMessageEnvelope envelope =
>         new OutgoingMessageEnvelope(OUTPUT_STREAM, message);
>     messageCollector.send(envelope);
>   }
> }
>
> When running this job, a sequence file is created in HDFS, but it contains
> only some header info, no content. I cannot figure out what is wrong. Also,
> what should I provide as the "stream" parameter when building the
> SystemStream instance?
>
>
