Thanks Ewen. We decided to update the producer side of our application to use the schema registry and publish Avro messages, and I am now able to store Avro messages in HDFS using Connect.
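For context, the producer change looks roughly like the sketch below. This is only an illustration: the record name, field, and schema registry URL are placeholders rather than our real ones; the only real name in it is the raw-message-avro topic from my connector config further down.

    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class AvroProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // KafkaAvroSerializer registers/looks up the schema in the schema registry
            props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put("schema.registry.url", "http://localhost:8081"); // placeholder registry URL

            // Placeholder schema, just for illustration
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"RawMessage\","
                + "\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}");

            try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(props)) {
                GenericRecord value = new GenericData.Record(schema);
                value.put("body", "example payload");
                producer.send(new ProducerRecord<Object, Object>("raw-message-avro", value));
            }
        }
    }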
I have a couple more questions:

1) I am using the TimeBasedPartitioner and trying to store data in hourly buckets, but the last file for a given hour XX only gets rotated during hour XX+1, which is a problem when I have batch jobs reading data off the /XX bucket. For example, with rotate.interval.ms=600000 (10 minutes):

- 3:58 -> one file gets rotated under /YYYY/MM/dd/03 in HDFS
- 4:03 -> one file gets rotated under /YYYY/MM/dd/04 in HDFS for data from 4:00 to 4:03, and one file gets rotated under /YYYY/MM/dd/03 in HDFS for data from 3:58 to 4:00

In this case, if I have an hourly batch job starting at 4:00 to process /YYYY/MM/dd/03, it would miss one file.

Below is my connector config (and after question 2 I have included a rough sketch of how I am reading these partitioner settings):

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=raw-message-avro
hdfs.url=hdfs://localhost:8020
topics.dir=/raw/avro/hourly/
flush.size=10000
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
partition.duration.ms=120000
rotate.interval.ms=600000
timezone=UTC
path.format=YYYY/MM/dd/HH/
locale=US

2) Can I control the file commit based on size, like Flume does? Right now I only see flush.size and rotate.interval.ms related to file commit/flush. Is there any other config I am missing?
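And here is the rough sketch I mentioned under question 1. It is my own illustration, not the connector's code; I am only assuming that a timestamp is truncated to a partition.duration.ms window and then formatted with path.format in the configured timezone:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    public class HourlyBucketSketch {
        public static void main(String[] args) {
            long partitionDurationMs = 120000L;            // partition.duration.ms from my config
            long timestampMs = System.currentTimeMillis(); // stand-in for the timestamp being partitioned

            // Truncate the timestamp to the start of its partition window...
            long windowStartMs = timestampMs - (timestampMs % partitionDurationMs);

            // ...then format it at hour granularity (Java's SimpleDateFormat wants
            // lowercase yyyy where path.format uses YYYY).
            SimpleDateFormat fmt = new SimpleDateFormat("yyyy/MM/dd/HH/");
            fmt.setTimeZone(TimeZone.getTimeZone("UTC"));

            System.out.println("/raw/avro/hourly/raw-message-avro/" + fmt.format(new Date(windowStartMs)));
        }
    }

If that is not how partition.duration.ms interacts with the hourly path.format, please correct me.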
Thanks,
Venkatesh

On Tue, Feb 23, 2016 at 9:09 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

> Consuming plain JSON is a bit tricky for something like HDFS because all the output formats expect the data to have a schema. You can read the JSON data with the provided JsonConverter, but it'll be returned without a schema. The HDFS connector will currently fail on this because it expects a fixed structure.
>
> Note however that it *does not* depend on the data already being in Avro format. Kafka Connect is specifically designed to abstract away the serialization format of data in Kafka so that connectors don't need to be written a half-dozen times to support different formats.
>
> There are a couple of possibilities to allow the HDFS connector to handle schemaless (i.e. JSON-like) data. One possibility is to infer the schema automatically based on the incoming data. If you can make guarantees about the compatibility of the data, this could work with the existing connector code. Alternatively, an option could be added to handle this type of data and force file rotation if a new schema is encountered. The risk with this is that if you have data interleaved with different schemas (as might happen as you transition an app to a new format) and no easy way to project between them, you'll have a lot of small HDFS files for a while.
>
> Dealing with schemaless data will be tricky for connectors like HDFS, but is definitely possible. But it's worth thinking through the right way to handle that data with a minimum of additional configuration options required.
>
> -Ewen
>
> On Wed, Feb 17, 2016 at 11:14 AM, Venkatesh Rudraraju <venkatengineer...@gmail.com> wrote:
>
>> Hi,
>>
>> I tried using the HDFS connector sink with kafka-connect and it works as described ->
>> http://docs.confluent.io/2.0.0/connect/connect-hdfs/docs/index.html
>>
>> My scenario:
>>
>> I have plain JSON data in a Kafka topic. Can I still use the HDFS connector sink to read data from the Kafka topic and write to HDFS in Avro format?
>>
>> As I read from the documentation, the HDFS connector expects the data in Kafka to already be in Avro format. Is there a workaround where I can consume plain JSON and write to HDFS in Avro? Say I have a schema for the plain JSON data.
>>
>> Thanks,
>> Venkatesh
>>
>
>
>
> --
> Thanks,
> Ewen