Sorry, previously I got confused and I assumed you were using Flink's
StreamingFileSink.

Could you try to use Flink's Avro - Parquet writer?

StreamingFileSink.forBulkFormat(
      Path...(MY_PATH),
      ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
.build()


Cheers,
Kostas

On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com> wrote:

> Thanks.
> yes, the *env.execute* is called and enabled checkpoints
> I think the problem is where to place the *writer.close *to flush the
> cache
> If I'll place on the sink after the write event e.g
> addSink{
> writer.write
> writer.close
> }
> in this case only the first record will be included in the file but not
> the rest of the stream.
>
>
> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi again Avi,
>>
>> In the first example that you posted (the one with the Kafka source), do
>> you call env.execute()?
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi Avi,
>>>
>>> In the last snippet that you posted, you have not activated checkpoints.
>>>
>>> Checkpoints are needed for the StreamingFileSink to produce results,
>>> especially in the case of BulkWriters (like Parquet) where
>>> the part file is rolled upon reception of a checkpoint and the part is
>>> finalised (i.e. "committed") when the checkpoint gets completed
>>> successfully.
>>>
>>> Could you please enable checkpointing and make sure that the job runs
>>> long enough for at least some checkpoints to be completed?
>>>
>>> Thanks a lot,
>>> Kostas
>>>
>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com>
>>> wrote:
>>>
>>>> Checkout this little App. you can see that the file is created but no
>>>> data is written. even for a single record
>>>>
>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>> import org.apache.avro.Schema
>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>> import org.apache.hadoop.fs.Path
>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>> import scala.io.Source
>>>> import org.apache.flink.streaming.api.scala._
>>>>
>>>> object Tester extends App {
>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>   def now = System.currentTimeMillis()
>>>>   val path = new Path(s"test-$now.parquet")
>>>>   val schemaString = 
>>>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>   val config = ParquetWriterConfig()
>>>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>   genericReocrd.put("name", "test_b")
>>>>   genericReocrd.put("code", "NoError")
>>>>   genericReocrd.put("ts", 100L)
>>>>   val stream = env.fromElements(genericReocrd)
>>>>   val writer: ParquetWriter[GenericRecord] = 
>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>     .withSchema(schema)
>>>>     .withCompressionCodec(compressionCodecName)
>>>>     .withPageSize(config.pageSize)
>>>>     .withRowGroupSize(config.blockSize)
>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>     .withValidation(config.validating)
>>>>     .build()
>>>>
>>>>   writer.write(genericReocrd)
>>>>   stream.addSink { r =>
>>>>     println(s"In Sink $r")
>>>>     writer.write(r)
>>>>   }
>>>>   env.execute()
>>>>   //  writer.close()
>>>> }
>>>>
>>>>
>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com> wrote:
>>>>
>>>>> Can you try closing the writer?
>>>>>
>>>>> AvroParquetWriter has an internal buffer. Try doing a .close() in
>>>>> snapshot()( since you are checkpointing hence this method will be called)
>>>>>
>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Rafi,
>>>>>> I am actually not using assignTimestampsAndWatermarks , I will try
>>>>>> to add it as you suggested. however it seems that the messages I 
>>>>>> repeating
>>>>>> in the stream over and over even if I am pushing single message manually 
>>>>>> to
>>>>>> the queue, that message will repeat infinity
>>>>>>
>>>>>> Cheers
>>>>>> Avi
>>>>>>
>>>>>>
>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Avi,
>>>>>>>
>>>>>>> I can't see the part where you use  assignTimestampsAndWatermarks.
>>>>>>> If this part in not set properly, it's possible that watermarks are
>>>>>>> not sent and nothing will be written to your Sink.
>>>>>>>
>>>>>>> See here for more details:
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>>
>>>>>>> Hope this helps,
>>>>>>> Rafi
>>>>>>>
>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am trying to implement Parquet Writer as SinkFunction. The
>>>>>>>> pipeline consists of kafka as source and parquet file as a sink 
>>>>>>>> however it
>>>>>>>> seems like the stream is repeating itself like endless loop and the 
>>>>>>>> parquet
>>>>>>>> file is not written . can someone please help me with this?
>>>>>>>>
>>>>>>>> object ParquetSinkWriter{
>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>   private val schemaString =
>>>>>>>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>   private val avroSchema: Schema = new
>>>>>>>> Schema.Parser().parse(schemaString)
>>>>>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>   private   val config = ParquetWriterConfig()
>>>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>     .withSchema(avroSchema)
>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>     .withValidation(config.validating)
>>>>>>>>     .build()
>>>>>>>> }
>>>>>>>>
>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends
>>>>>>>> SinkFunction[GenericRecord] {
>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>>>     writer.write(value) //the output is not written to the file
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> //main app
>>>>>>>> object StreamingJob extends App  {
>>>>>>>>  implicit val env: StreamExecutionEnvironment =
>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>
>>>>>>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>
>>>>>>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
>>>>>>>> Time.seconds(3), Time.seconds(3)))
>>>>>>>>   val backend: StateBackend = new
>>>>>>>> RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>   *val stream2: DataStream[DnsRequest] = env.addSource(//consume
>>>>>>>> from kafka)*
>>>>>>>> *stream2.map { r =>*
>>>>>>>> *    println(s"MAPPING $r") //this output keeps repeating in a loop*
>>>>>>>> *    val genericReocrd: GenericRecord = new
>>>>>>>> GenericData.Record(schema)*
>>>>>>>> *    genericReocrd.put("qname", r.qname)*
>>>>>>>> *    genericReocrd.put("rcode", r.rcode)*
>>>>>>>> *    genericReocrd.put("ts", r.ts)*
>>>>>>>> *    genericReocrd*
>>>>>>>> *  }.addSink(writer) *
>>>>>>>>
>>>>>>>> Thanks for your help
>>>>>>>> Avi
>>>>>>>>
>>>>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Vipul
>>>>>
>>>>

Reply via email to