Sorry, I got confused earlier and assumed you were using Flink's
StreamingFileSink.
Could you try Flink's Avro-Parquet writer?
StreamingFileSink.forBulkFormat(
Path...(MY_PATH),
ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
.build()
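A slightly fuller sketch of the above, assuming Flink 1.6/1.7 with the `flink-parquet` and `flink-avro` modules on the classpath. The names `ParquetSinkSketch`, `parquetSink` and `attach`, and the 10-second checkpoint interval, are illustrative only. Note that `forBulkFormat` takes Flink's own `org.apache.flink.core.fs.Path`, not the Hadoop `Path` used in the snippets further down.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object ParquetSinkSketch {
  // Hypothetical helper: a bulk-format sink that writes GenericRecords as Parquet.
  // Uses Flink's Path (org.apache.flink.core.fs.Path), not org.apache.hadoop.fs.Path.
  def parquetSink(outputDir: String, schema: Schema): StreamingFileSink[GenericRecord] =
    StreamingFileSink
      .forBulkFormat(new Path(outputDir), ParquetAvroWriters.forGenericRecord(schema))
      .build()

  // Bulk part files are rolled on each checkpoint and committed once it completes,
  // so checkpointing has to be enabled for any finished file to appear.
  def attach(env: StreamExecutionEnvironment,
             records: DataStream[GenericRecord],
             outputDir: String,
             schema: Schema): Unit = {
    env.enableCheckpointing(10000) // checkpoint every 10 s
    records.addSink(parquetSink(outputDir, schema))
  }
}
```

With this sink there is no `close()` to call from user code: the sink finishes the in-progress part file when a checkpoint is taken and commits it when the checkpoint completes.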
Cheers,
Kostas
On Thu, Nov 29, 2018 at 12:25 PM Avi Levi wrote:
> Thanks.
> Yes, *env.execute* is called and checkpointing is enabled.
> I think the problem is where to place *writer.close* to flush the
> buffer.
> If I place it in the sink right after the write, e.g.
> stream.addSink { r =>
>   writer.write(r)
>   writer.close()
> }
> then only the first record is included in the file, not the rest of
> the stream.
>
>
> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas wrote:
>
>> Hi again Avi,
>>
>> In the first example that you posted (the one with the Kafka source), do
>> you call env.execute()?
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas wrote:
>>
>>> Hi Avi,
>>>
>>> In the last snippet that you posted, you have not activated checkpoints.
>>>
>>> Checkpoints are needed for the StreamingFileSink to produce results,
>>> especially in the case of BulkWriters (like Parquet) where
>>> the part file is rolled upon reception of a checkpoint and the part is
>>> finalised (i.e. "committed") when the checkpoint gets completed
>>> successfully.
>>>
>>> Could you please enable checkpointing and make sure that the job runs
>>> long enough for at least some checkpoints to be completed?
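The `Tester` app quoted below uses `env.fromElements(...)`, which is a bounded source, so the job can finish before a single checkpoint completes. For local experiments, one option is a source that keeps emitting until the job is cancelled. A minimal sketch, assuming the Flink Scala DataStream API; the class name `SlowCounterSource` and the one-record-per-second pace are made up:

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Hypothetical test source: emits 0, 1, 2, ... once per second until cancelled,
// so the job stays alive long enough for checkpoints to complete.
class SlowCounterSource extends SourceFunction[Long] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var i = 0L
    while (running) {
      // Emit under the checkpoint lock so emission and checkpoints do not interleave.
      ctx.getCheckpointLock.synchronized {
        ctx.collect(i)
      }
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}
```

It would be wired in as `env.addSource(new SlowCounterSource)`, followed by a `map` that builds the `GenericRecord`s.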
>>>
>>> Thanks a lot,
>>> Kostas
>>>
>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi wrote:
>>>
>>>> Check out this little app. You can see that the file is created but no
>>>> data is written, not even a single record.
>>>>
>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>> import org.apache.avro.Schema
>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>> import org.apache.hadoop.fs.Path
>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>> import scala.io.Source
>>>> import org.apache.flink.streaming.api.scala._
>>>>
>>>> object Tester extends App {
>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>   def now = System.currentTimeMillis()
>>>>   val path = new Path(s"test-$now.parquet")
>>>>   val schemaString =
>>>>     Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>   val config = ParquetWriterConfig()
>>>>   val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>>   genericRecord.put("name", "test_b")
>>>>   genericRecord.put("code", "NoError")
>>>>   genericRecord.put("ts", 100L)
>>>>   val stream = env.fromElements(genericRecord)
>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>     AvroParquetWriter.builder[GenericRecord](path)
>>>>       .withSchema(schema)
>>>>       .withCompressionCodec(compressionCodecName)
>>>>       .withPageSize(config.pageSize)
>>>>       .withRowGroupSize(config.blockSize)
>>>>       .withDictionaryEncoding(config.enableDictionary)
>>>>       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>       .withValidation(config.validating)
>>>>       .build()
>>>>
>>>>   writer.write(genericRecord)
>>>>   stream.addSink { r =>
>>>>     println(s"In Sink $r")
>>>>     writer.write(r)
>>>>   }
>>>>   env.execute()
>>>>   // writer.close()
>>>> }
>>>>
>>>>
>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh wrote:
>>>>
>>>>> Can you try closing the writer?
>>>>>
>>>>> AvroParquetWriter has an internal buffer. Try calling .close() in
>>>>> snapshotState() (since you are checkpointing, this method will be called).
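A minimal sketch of that idea, assuming Flink's `CheckpointedFunction` interface (whose hook is `snapshotState`): the sink closes its current writer on each checkpoint and opens a new file for the next record, because a Parquet file is only complete once its writer is closed, and a closed writer is not meant to be written to again. The class name, file-naming scheme and constructor parameters are hypothetical. Unlike StreamingFileSink it is not exactly-once: it commits nothing on checkpoint completion and does not clean up after failures.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

// Hypothetical sink: one Parquet file per subtask per checkpoint interval.
// The part counter is not checkpointed, so after a restart the writer may fail
// because a file with the same name already exists.
class RollingParquetSink(baseDir: String, schemaJson: String)
    extends RichSinkFunction[GenericRecord]
    with CheckpointedFunction {

  // Parsed in open(): Avro 1.8 Schema objects are not Serializable.
  @transient private var schema: Schema = _
  @transient private var writer: ParquetWriter[GenericRecord] = _
  private var part = 0

  override def open(parameters: Configuration): Unit =
    schema = new Schema.Parser().parse(schemaJson)

  private def newWriter(): ParquetWriter[GenericRecord] = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    val file = new Path(s"$baseDir/part-$subtask-$part.parquet")
    part += 1
    AvroParquetWriter.builder[GenericRecord](file).withSchema(schema).build()
  }

  override def invoke(value: GenericRecord): Unit = {
    if (writer == null) writer = newWriter()
    writer.write(value)
  }

  // close() flushes the buffered rows and writes the footer of the current file.
  override def snapshotState(context: FunctionSnapshotContext): Unit =
    if (writer != null) {
      writer.close()
      writer = null
    }

  override def initializeState(context: FunctionInitializationContext): Unit = ()

  override def close(): Unit =
    if (writer != null) writer.close()
}
```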
>>>>>
>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi wrote:
>>>>>
>>>>>> Thanks Rafi,
>>>>>> I am actually not using assignTimestampsAndWatermarks; I will try
>>>>>> adding it as you suggested. However, the messages seem to repeat
>>>>>> in the stream over and over: even if I push a single message manually
>>>>>> to the queue, that message repeats indefinitely.
>>>>>>
>>>>>> Cheers
>>>>>> Avi
>>>>>>
>>>>>>
>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch wrote:
>>>>>>
>>>>>>> Hi Avi,
>>>>>>>
>>>>>>> I can't see the part where you use assignTimestampsAndWatermarks.
>>>>>>> If this part is not set properly, it's possible that watermarks are
>>>>>>> not sent and nothing will be written to your Sink.
>>>>>>>
>>>>>>> See here for more details:
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
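A minimal sketch of such an assigner, assuming the `ts` field holds epoch milliseconds. `DnsRequest` is a stand-in case class (the real class is not shown in the thread, so its field types are guessed), and the 5-second out-of-orderness bound is arbitrary.

```scala
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event type standing in for the DnsRequest class used below.
case class DnsRequest(qname: String, rcode: String, ts: Long)

// Uses each record's ts as its event time and emits periodic watermarks that
// trail the highest timestamp seen so far by 5 seconds.
class DnsRequestTimestamps
    extends BoundedOutOfOrdernessTimestampExtractor[DnsRequest](Time.seconds(5)) {
  override def extractTimestamp(r: DnsRequest): Long = r.ts
}
```

It would be attached with `stream2.assignTimestampsAndWatermarks(new DnsRequestTimestamps)` (or on the Kafka consumer itself, as the linked page describes), together with `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`.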
>>>>>>>
>>>>>>> Hope this helps,
>>>>>>> Rafi
>>>>>>>
>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am trying to implement a Parquet writer as a SinkFunction. The
>>>>>>>> pipeline consists of Kafka as the source and a Parquet file as the
>>>>>>>> sink; however, the stream seems to repeat itself in an endless loop
>>>>>>>> and the Parquet file is not written. Can someone please help me with
>>>>>>>> this?
>>>>>>>>
>>>>>>>> object ParquetSinkWriter {
>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>   private val schemaString =
>>>>>>>>     Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>   private val config = ParquetWriterConfig()
>>>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>>>     AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>       .withSchema(avroSchema)
>>>>>>>>       .withCompressionCodec(compressionCodecName)
>>>>>>>>       .withPageSize(config.pageSize)
>>>>>>>>       .withRowGroupSize(config.blockSize)
>>>>>>>>       .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>       .withValidation(config.validating)
>>>>>>>>       .build()
>>>>>>>> }
>>>>>>>>
>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema)
>>>>>>>>     extends SinkFunction[GenericRecord] {
>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>>>     writer.write(value) // the output is not written to the file
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> // main app
>>>>>>>> object StreamingJob extends App {
>>>>>>>>   implicit val env: StreamExecutionEnvironment =
>>>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(
>>>>>>>>     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>   env.setRestartStrategy(
>>>>>>>>     RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
>>>>>>>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from Kafka */)
>>>>>>>>   stream2.map { r =>
>>>>>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>>>>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>>>>>>     genericRecord.put("qname", r.qname)
>>>>>>>>     genericRecord.put("rcode", r.rcode)
>>>>>>>>     genericRecord.put("ts", r.ts)
>>>>>>>>     genericRecord
>>>>>>>>   }.addSink(writer)
>>>>>>>>
>>>>>>>> Thanks for your help
>>>>>>>> Avi
>>>>>>>>
>>>>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Vipul
>>>>>
>>>>