FYI, there is a stock timestamp interceptor, if you want to use that; see the snippet below.

Mike
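For reference, wiring in the stock one takes just the lines below (shown against source1 from the first machine's config; the built-in "timestamp" interceptor stamps each event's timestamp header with the current time in epoch millis, which is what the HDFS sink's %Y/%m/%d escapes parse, and preserveExisting keeps any header already set upstream):

tier1.sources.source1.interceptors = it1
tier1.sources.source1.interceptors.it1.type = timestamp
tier1.sources.source1.interceptors.it1.preserveExisting = true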
On May 22, 2013, at 3:20 AM, ZORAIDA HIDALGO SANCHEZ <zora...@tid.es> wrote:

> Dear all,
>
> I made a custom interceptor in order to insert the timestamp header that is
> used by the HDFS sink.
> First, I ran an example using a spooling directory source, a file channel,
> and an HDFS sink. It worked fine.
> Then I split the configuration across two machines, each configured as
> follows:
>
> FIRST MACHINE:
> spooling directory source (with my custom interceptor)
> file channel
> Avro sink
>
> SECOND MACHINE:
> Avro source
> file channel
> HDFS sink
>
> Now the error I am getting (on the second machine) is:
>
> ERROR hdfs.HDFSEventSink: process failed
> java.lang.RuntimeException: Flume wasn't able to parse timestamp header in
> the event to resolve time based bucketing. Please check that you're correctly
> populating timestamp header (for example using TimestampInterceptor source
> interceptor).
>     at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
>     at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>     at java.lang.Thread.run(Thread.java:662)
> Caused by: java.lang.NumberFormatException: null
>     at java.lang.Long.parseLong(Long.java:375)
>     at java.lang.Long.valueOf(Long.java:525)
>     at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
>     ... 5 more
>
> It looks like the header is missing.
>
> FIRST MACHINE CONF:
> tier1.sources.source1.type = spooldir
> tier1.sources.source1.spoolDir = /home/user/flume/data
> tier1.sources.source1.batchSize = 1000
> tier1.sources.source1.bufferMaxLines = 3000
> tier1.sources.source1.fileHeader = true
> tier1.sources.source1.fileSuffix = .COMPLETED
> tier1.sources.source1.channels = channel1
> tier1.sources.source1.interceptors = it1
> tier1.sources.source1.interceptors.it1.type = com.pdi.koios.flume.interceptors.DatetimeInterceptor$Builder
> tier1.sources.source1.interceptors.it1.preserveExisting = true
> tier1.sources.source1.interceptors.it1.dateRegex = \\d{4}-\\d{2}-\\d{2}
> tier1.sources.source1.interceptors.it1.dateFormat = yyyy-MM-dd
>
> tier1.channels.channel1.type = file
> tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
> tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
> tier1.channels.channel1.capacity = 10000
> tier1.channels.channel1.transactionCapacity = 10000
>
> tier1.sinks.sink1.type = avro
> tier1.sinks.sink1.hostname = 10.95.108.245
> tier1.sinks.sink1.port = 4141
> tier1.sinks.sink1.channel = channel1
>
> SECOND MACHINE CONF:
> tier1.sources.source1.type = avro
> tier1.sources.source1.bind = 0.0.0.0
> tier1.sources.source1.port = 4141
> tier1.sources.source1.channels = channel1
>
> tier1.channels.channel1.type = file
> tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
> tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
> tier1.channels.channel1.capacity = 10000
> tier1.channels.channel1.transactionCapacity = 10000
>
> tier1.sinks.sink1.type = hdfs
> tier1.sinks.sink1.channel = channel1
> tier1.sinks.sink1.hdfs.batchSize = 1000
> tier1.sinks.sink1.hdfs.rollInterval = 5
> tier1.sinks.sink1.hdfs.rollTimeout = 10
> tier1.sinks.sink1.hdfs.rollCount = 0
> tier1.sinks.sink1.hdfs.rollSize = 0
> tier1.sinks.sink1.hdfs.path = /user/user/flume/%Y/%m/%d
> tier1.sinks.sink1.hdfs.fileType = DataStream
> tier1.sinks.sink1.writeFormat = Text
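In case it helps, below is a minimal sketch of what a custom interceptor like this has to do. To be clear, this is an assumption-laden illustration, not your actual com.pdi.koios DatetimeInterceptor (the configure() handling of dateRegex/dateFormat is omitted and the class name is made up). The one hard requirement it demonstrates is that the HDFS sink's %Y/%m/%d escapes read a "timestamp" header holding epoch milliseconds as a string; that header being absent is exactly what produces the NumberFormatException: null above.

// Hypothetical sketch only -- not the actual com.pdi.koios implementation.
// It assumes the event body contains a yyyy-MM-dd date and stamps the
// "timestamp" header (epoch millis as a string) that the HDFS sink's
// time-based bucketing parses in BucketPath.replaceShorthand().
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class DatetimeInterceptorSketch implements Interceptor {

  private static final Pattern DATE = Pattern.compile("\\d{4}-\\d{2}-\\d{2}");

  @Override public void initialize() { }
  @Override public void close() { }

  @Override
  public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    if (headers.containsKey("timestamp")) {
      return event; // preserveExisting: keep a header set upstream
    }
    Matcher m = DATE.matcher(new String(event.getBody(), StandardCharsets.UTF_8));
    if (m.find()) {
      try {
        long millis = new SimpleDateFormat("yyyy-MM-dd").parse(m.group()).getTime();
        headers.put("timestamp", Long.toString(millis));
      } catch (ParseException ignored) {
        // Unparseable date: the event stays unstamped, and the HDFS sink
        // will fail on it exactly as in the stack trace above.
      }
    }
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  public static class Builder implements Interceptor.Builder {
    @Override public void configure(Context context) { } // dateRegex/dateFormat omitted
    @Override public Interceptor build() { return new DatetimeInterceptorSketch(); }
  }
}

Note that the Avro sink/source hop preserves event headers, so if the header is populated on the first machine it should still be present when the HDFS sink on the second machine reads it.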