Thanks a lot, Kostas, but the file is not created. What am I doing wrong?
BTW, how can you set the encoding etc. in Flink's Avro-Parquet writer?

object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  def now = System.currentTimeMillis()
  val path = new Path(s"test-$now.parquet")
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val streamingSink = StreamingFileSink.forBulkFormat(
    path,
    ParquetAvroWriters.forGenericRecord(schema))
    .build()
  env.enableCheckpointing(100)
  val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("name", r.name)
    genericReocrd.put("code", r.code.asString)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }
  stream.addSink { r =>
    println(s"In Sink $r") // getting this line
    streamingSink
  }
  env.execute()
}
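
A likely cause in the snippet above: addSink { r => ... } registers the
lambda itself as the sink, so streamingSink is only evaluated and discarded
for each record and never becomes part of the job. A minimal sketch of the
wiring, assuming Flink 1.7 with the flink-parquet dependency on the
classpath. ParquetSinkWiring, attach and outputDir are hypothetical names,
and Path here has to be Flink's org.apache.flink.core.fs.Path rather than
Hadoop's:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

// Hypothetical helper object, not part of Flink.
object ParquetSinkWiring {

  // Builds a bulk-format StreamingFileSink and attaches it to the stream.
  // outputDir is used as a base directory; part files are created below it.
  def attach(stream: DataStream[GenericRecord], schema: Schema, outputDir: String): Unit = {
    val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
      .forBulkFormat(new Path(outputDir), ParquetAvroWriters.forGenericRecord(schema))
      .build()

    // Pass the sink object itself, not a function that merely returns it.
    stream.addSink(sink)
  }
}
```

With checkpointing enabled as in the snippet above, part files are only
finalised once a checkpoint completes, so the job has to keep running long
enough for at least one checkpoint.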

Cheers
Avi
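
On setting compression and encoding: ParquetAvroWriters.forGenericRecord
takes only the schema, so those options would have to be set elsewhere. One
possible approach, assuming Flink 1.7's flink-parquet module and parquet-avro
1.10, is to build the ParquetWriterFactory from a custom ParquetBuilder.
CompressedParquetWriters and snappyGenericRecord are hypothetical names, and
the chosen options are only examples:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.formats.parquet.{ParquetBuilder, ParquetWriterFactory}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.OutputFile

// Hypothetical helper object, not part of Flink.
object CompressedParquetWriters {

  // The schema is passed as a String and parsed inside createWriter, because
  // the builder is serialized and shipped to the task managers.
  def snappyGenericRecord(schemaString: String): ParquetWriterFactory[GenericRecord] = {
    val builder = new ParquetBuilder[GenericRecord] {
      override def createWriter(out: OutputFile): ParquetWriter[GenericRecord] = {
        val schema = new Schema.Parser().parse(schemaString)
        AvroParquetWriter.builder[GenericRecord](out)
          .withSchema(schema)
          .withDataModel(GenericData.get())
          .withCompressionCodec(CompressionCodecName.SNAPPY) // example codec
          .withDictionaryEncoding(true)                      // example encoding option
          .build()
      }
    }
    new ParquetWriterFactory[GenericRecord](builder)
  }
}
```

The resulting factory could then be passed to StreamingFileSink.forBulkFormat
in place of ParquetAvroWriters.forGenericRecord(schema).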

On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

>
> Sorry, previously I got confused and I assumed you were using Flink's
> StreamingFileSink.
>
> Could you try to use Flink's Avro - Parquet writer?
>
> StreamingFileSink.forBulkFormat(
>       Path...(MY_PATH),
>       ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
> .build()
>
>
> Cheers,
> Kostas
>
> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Thanks.
>> Yes, *env.execute* is called and checkpoints are enabled.
>> I think the problem is where to place the *writer.close* to flush the
>> cache.
>> If I place it in the sink after the write event, e.g.
>> addSink{
>> writer.write
>> writer.close
>> }
>> then only the first record is included in the file, but not
>> the rest of the stream.
>>
>>
>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi again Avi,
>>>
>>> In the first example that you posted (the one with the Kafka source), do
>>> you call env.execute()?
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
>>> k.klou...@data-artisans.com> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> In the last snippet that you posted, you have not activated checkpoints.
>>>>
>>>> Checkpoints are needed for the StreamingFileSink to produce results,
>>>> especially in the case of BulkWriters (like Parquet) where
>>>> the part file is rolled upon reception of a checkpoint and the part is
>>>> finalised (i.e. "committed") when the checkpoint gets completed
>>>> successfully.
>>>>
>>>> Could you please enable checkpointing and make sure that the job runs
>>>> long enough for at least some checkpoints to be completed?
>>>>
>>>> Thanks a lot,
>>>> Kostas
>>>>
>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com>
>>>> wrote:
>>>>
>>>>> Check out this little app. You can see that the file is created but no
>>>>> data is written, even for a single record.
>>>>>
>>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>>> import org.apache.avro.Schema
>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>> import org.apache.hadoop.fs.Path
>>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>>> import scala.io.Source
>>>>> import org.apache.flink.streaming.api.scala._
>>>>>
>>>>> object Tester extends App {
>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>   def now = System.currentTimeMillis()
>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>   val schemaString = 
>>>>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>   val config = ParquetWriterConfig()
>>>>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>   genericReocrd.put("name", "test_b")
>>>>>   genericReocrd.put("code", "NoError")
>>>>>   genericReocrd.put("ts", 100L)
>>>>>   val stream = env.fromElements(genericReocrd)
>>>>>   val writer: ParquetWriter[GenericRecord] = 
>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>     .withSchema(schema)
>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>     .withPageSize(config.pageSize)
>>>>>     .withRowGroupSize(config.blockSize)
>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>     .withValidation(config.validating)
>>>>>     .build()
>>>>>
>>>>>   writer.write(genericReocrd)
>>>>>   stream.addSink { r =>
>>>>>     println(s"In Sink $r")
>>>>>     writer.write(r)
>>>>>   }
>>>>>   env.execute()
>>>>>   //  writer.close()
>>>>> }
>>>>>
>>>>>
>>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Can you try closing the writer?
>>>>>>
>>>>>> AvroParquetWriter has an internal buffer. Try calling .close() in
>>>>>> snapshot() (since you are checkpointing, this method will be called).
>>>>>>
>>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Rafi,
>>>>>>> I am actually not using assignTimestampsAndWatermarks; I will try
>>>>>>> to add it as you suggested. However, it seems that the messages are
>>>>>>> repeating in the stream over and over: even if I push a single
>>>>>>> message manually to the queue, that message repeats indefinitely.
>>>>>>>
>>>>>>> Cheers
>>>>>>> Avi
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Avi,
>>>>>>>>
>>>>>>>> I can't see the part where you use assignTimestampsAndWatermarks.
>>>>>>>> If this part is not set properly, it's possible that watermarks are
>>>>>>>> not sent and nothing will be written to your sink.
>>>>>>>>
>>>>>>>> See here for more details:
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>>>
>>>>>>>> Hope this helps,
>>>>>>>> Rafi
>>>>>>>>
>>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am trying to implement a Parquet writer as a SinkFunction. The
>>>>>>>>> pipeline consists of Kafka as the source and a Parquet file as the
>>>>>>>>> sink. However, it seems that the stream is repeating itself in an
>>>>>>>>> endless loop and the Parquet file is not written. Can someone
>>>>>>>>> please help me with this?
>>>>>>>>>
>>>>>>>>> object ParquetSinkWriter{
>>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>>   private val schemaString =
>>>>>>>>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>>   private val avroSchema: Schema = new
>>>>>>>>> Schema.Parser().parse(schemaString)
>>>>>>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>>   private   val config = ParquetWriterConfig()
>>>>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>     .withSchema(avroSchema)
>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>     .build()
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends
>>>>>>>>> SinkFunction[GenericRecord] {
>>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>>>>     writer.write(value) //the output is not written to the file
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> //main app
>>>>>>>>> object StreamingJob extends App  {
>>>>>>>>>  implicit val env: StreamExecutionEnvironment =
>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>>
>>>>>>>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>>
>>>>>>>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
>>>>>>>>> Time.seconds(3), Time.seconds(3)))
>>>>>>>>>   val backend: StateBackend = new
>>>>>>>>> RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
>>>>>>>>>   stream2.map { r =>
>>>>>>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>>>>>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>>>     genericReocrd.put("qname", r.qname)
>>>>>>>>>     genericReocrd.put("rcode", r.rcode)
>>>>>>>>>     genericReocrd.put("ts", r.ts)
>>>>>>>>>     genericReocrd
>>>>>>>>>   }.addSink(writer)
>>>>>>>>>
>>>>>>>>> Thanks for your help
>>>>>>>>> Avi
>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Vipul
>>>>>>
>>>>>
