I'm using samza-hdfs to write Kafka streams to HDFS, but I can't make it
work.
Here is my Samza job's properties file:
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=kafka2hdfs
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
task.class=net.tangrui.kafka2hdfs.Kafka2HDFSStreamTask
task.inputs=kafka.events
# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
systems.hdfs.producer.hdfs.base.output.dir=/events
# Job Coordinator
job.coordinator.system=kafka
# Normally, this would be 3, but we have only one broker.
job.coordinator.replication.factor=1
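For reference, I have not overridden any of the HDFS writer's batching options, so the defaults apply. My (possibly wrong) reading of the Samza HDFS producer docs is that data is only flushed when a batch fills up or a file is rolled, which settings like these should control:

```
# Untried tuning, names taken from my reading of the Samza HDFS producer docs:
# cut over to a new output file after this many records / bytes
systems.hdfs.producer.hdfs.write.batch.size.records=10
systems.hdfs.producer.hdfs.write.batch.size.bytes=1024
```
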
Here is my simple task:
package net.tangrui.kafka2hdfs;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class Kafka2HDFSStreamTask implements StreamTask {
    // The second argument ("default") is the part I am unsure about.
    private final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");

    @Override
    public void process(IncomingMessageEnvelope incomingMessageEnvelope,
                        MessageCollector messageCollector,
                        TaskCoordinator taskCoordinator) throws Exception {
        // Pass each Kafka message through to the HDFS system unchanged.
        String message = (String) incomingMessageEnvelope.getMessage();
        OutgoingMessageEnvelope envelope =
                new OutgoingMessageEnvelope(OUTPUT_STREAM, message);
        messageCollector.send(envelope);
    }
}
When I run this job, a sequence file is created in HDFS, but it contains only some header info and no content. I cannot figure out what is wrong. Also, what should I provide as the "stream" parameter when building the SystemStream instance?