Thanks.
Yes, *env.execute* is called and checkpointing is enabled.
I think the problem is where to place *writer.close* so that the cache is flushed.
If I place it in the sink right after the write, e.g.
addSink{
writer.write
writer.close
}
then only the first record ends up in the file, not the rest of the
stream.
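
The alternative would be to close the writer only once, after the last record. Below is a minimal sketch of that lifecycle in plain Scala, assuming a line-oriented BufferedWriter as a stand-in for the Parquet writer. BufferedLineSink and SinkLifecycle are hypothetical names for illustration, not Flink or Parquet API. In Flink the matching hooks would be open() and close() on a RichSinkFunction, though on an unbounded stream close() only runs when the job stops.

```scala
import java.io.{BufferedWriter, FileWriter}
import java.nio.file.Path
import scala.io.Source

// Hypothetical stand-in for a sink with lifecycle hooks (open / invoke / close).
class BufferedLineSink(path: Path) {
  private var writer: BufferedWriter = _

  // Called once before the first record: create the writer here.
  def open(): Unit = {
    writer = new BufferedWriter(new FileWriter(path.toFile))
  }

  // Called once per record: only write, never close.
  def invoke(record: String): Unit = {
    writer.write(record)
    writer.newLine()
  }

  // Called once after the last record: flushes the buffer and finalises the file.
  def close(): Unit = {
    if (writer != null) writer.close()
  }
}

object SinkLifecycle {
  // Drives the sink the way a runtime would: open, invoke per record, close once.
  def writeAll(records: Seq[String], path: Path): Unit = {
    val sink = new BufferedLineSink(path)
    sink.open()
    try records.foreach(sink.invoke)
    finally sink.close()
  }

  // Reads the file back so the result can be checked.
  def readAll(path: Path): List[String] = {
    val source = Source.fromFile(path.toFile)
    try source.getLines().toList
    finally source.close()
  }
}
```

With this ordering, SinkLifecycle.writeAll(Seq("a", "b", "c"), path) leaves all three lines in the file, because the buffer is flushed by the single close() at the end rather than after the first write.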


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> Hi again Avi,
>
> In the first example that you posted (the one with the Kafka source), do
> you call env.execute()?
>
> Cheers,
> Kostas
>
> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Avi,
>>
>> In the last snippet that you posted, you have not activated checkpoints.
>>
>> Checkpoints are needed for the StreamingFileSink to produce results,
>> especially in the case of BulkWriters (like Parquet) where
>> the part file is rolled upon reception of a checkpoint and the part is
>> finalised (i.e. "committed") when the checkpoint gets completed
>> successfully.
>>
>> Could you please enable checkpointing and make sure that the job runs
>> long enough for at least some checkpoints to be completed?
>>
>> Thanks a lot,
>> Kostas
>>
>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>
>>> Check out this little app. You can see that the file is created but no
>>> data is written, even for a single record.
>>>
>>> import io.eels.component.parquet.ParquetWriterConfig
>>> import org.apache.avro.Schema
>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>> import org.apache.hadoop.fs.Path
>>> import org.apache.parquet.avro.AvroParquetWriter
>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>> import scala.io.Source
>>> import org.apache.flink.streaming.api.scala._
>>>
>>> object Tester extends App {
>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>   def now = System.currentTimeMillis()
>>>   val path = new Path(s"test-$now.parquet")
>>>   val schemaString = 
>>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>   val config = ParquetWriterConfig()
>>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>   genericReocrd.put("name", "test_b")
>>>   genericReocrd.put("code", "NoError")
>>>   genericReocrd.put("ts", 100L)
>>>   val stream = env.fromElements(genericReocrd)
>>>   val writer: ParquetWriter[GenericRecord] = 
>>> AvroParquetWriter.builder[GenericRecord](path)
>>>     .withSchema(schema)
>>>     .withCompressionCodec(compressionCodecName)
>>>     .withPageSize(config.pageSize)
>>>     .withRowGroupSize(config.blockSize)
>>>     .withDictionaryEncoding(config.enableDictionary)
>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>     .withValidation(config.validating)
>>>     .build()
>>>
>>>   writer.write(genericReocrd)
>>>   stream.addSink { r =>
>>>     println(s"In Sink $r")
>>>     writer.write(r)
>>>   }
>>>   env.execute()
>>>   //  writer.close()
>>> }
>>>
>>>
>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com> wrote:
>>>
>>>> Can you try closing the writer?
>>>>
>>>> AvroParquetWriter has an internal buffer. Try calling .close() in
>>>> snapshot() (since you are checkpointing, this method will be called).
>>>>
>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.l...@bluevoyant.com>
>>>> wrote:
>>>>
>>>>> Thanks Rafi,
>>>>> I am actually not using assignTimestampsAndWatermarks; I will try to
>>>>> add it as you suggested. However, it seems that the messages are repeating
>>>>> in the stream over and over. Even if I push a single message manually to
>>>>> the queue, that message repeats indefinitely.
>>>>>
>>>>> Cheers
>>>>> Avi
>>>>>
>>>>>
>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.ar...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Avi,
>>>>>>
>>>>>> I can't see the part where you use assignTimestampsAndWatermarks.
>>>>>> If this part is not set properly, it's possible that watermarks are
>>>>>> not sent and nothing will be written to your sink.
>>>>>>
>>>>>> See here for more details:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>
>>>>>> Hope this helps,
>>>>>> Rafi
>>>>>>
>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to implement a Parquet writer as a SinkFunction. The
>>>>>>> pipeline consists of Kafka as the source and a Parquet file as the
>>>>>>> sink. However, the stream seems to repeat itself in an endless loop
>>>>>>> and the Parquet file is not written. Can someone please help me with
>>>>>>> this?
>>>>>>>
>>>>>>> object ParquetSinkWriter{
>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>   private val schemaString =
>>>>>>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>   private val avroSchema: Schema = new
>>>>>>> Schema.Parser().parse(schemaString)
>>>>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>   private   val config = ParquetWriterConfig()
>>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>     .withSchema(avroSchema)
>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>     .withValidation(config.validating)
>>>>>>>     .build()
>>>>>>> }
>>>>>>>
>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends
>>>>>>> SinkFunction[GenericRecord] {
>>>>>>>   import ParquetSinkWriter._
>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>>     writer.write(value) //the output is not written to the file
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> //main app
>>>>>>> object StreamingJob extends App  {
>>>>>>>  implicit val env: StreamExecutionEnvironment =
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>   env.enableCheckpointing(500)
>>>>>>>
>>>>>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>
>>>>>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
>>>>>>> Time.seconds(3), Time.seconds(3)))
>>>>>>>   val backend: StateBackend = new
>>>>>>> RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>   env.setStateBackend(backend)
>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
>>>>>>>   stream2.map { r =>
>>>>>>>     println(s"MAPPING $r") //this output keeps repeating in a loop
>>>>>>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>     genericReocrd.put("qname", r.qname)
>>>>>>>     genericReocrd.put("rcode", r.rcode)
>>>>>>>     genericReocrd.put("ts", r.ts)
>>>>>>>     genericReocrd
>>>>>>>   }.addSink(writer)
>>>>>>>
>>>>>>> Thanks for your help
>>>>>>> Avi
>>>>>>>
>>>>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Vipul
>>>>
>>>
