[File partition Flink]

2022-03-17 Thread lan tran
Hi team, I have some questions about the naming format of the files I process. In-progress / Pending: part-&lt;subtaskIndex&gt;-&lt;partFileIndex&gt;.inprogress.uid; Finished: part-&lt;subtaskIndex&gt;-&lt;partFileIndex&gt;. Can you explain more about the partFileIndex, since the format of the files is quite weird? It produces two files (I wonder whether it is related to the parallelism which we have…
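A minimal PyFlink sketch of where the two files come from, assuming a local path and a Flink 1.14-era import location (the FileSink classes moved to pyflink.datastream.connectors.file_system in later releases): each parallel sink subtask writes its own part file, and partFileIndex is a per-subtask counter that advances whenever the in-progress file rolls over.

```python
from pyflink.common.serialization import Encoder
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)  # two sink subtasks -> part-...-0-<n> and part-...-1-<n>

sink = FileSink \
    .for_row_format("/tmp/out", Encoder.simple_string_encoder()) \
    .build()

# the trailing counter (partFileIndex) increases each time the
# in-progress file rolls over, e.g. at a checkpoint
env.from_collection(["a", "b", "c"]).sink_to(sink)
env.execute("file-sink-naming-demo")
```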

Asking about the partition files

2022-03-22 Thread lan tran
Hi team, So basically, when I use the Flink Table API to generate files and store them in S3, the file names look like this: part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. My question is: is there any way to configure these file names (e.g. by adding the last_modified_value to the name)?…
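The Table API filesystem sink does not expose the part-file name, but the DataStream FileSink lets you set a custom prefix and suffix. A hedged sketch (bucket path and prefix are made up; the middle "-0-0" piece, subtask index plus rolling counter, stays fixed by the sink, so a per-record value such as last_modified_value cannot be injected this way):

```python
from pyflink.common.serialization import Encoder
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, OutputFileConfig

env = StreamExecutionEnvironment.get_execution_environment()

# only the prefix and the suffix of the part files are configurable
file_config = OutputFileConfig.builder() \
    .with_part_prefix("parcel-20220322") \
    .with_part_suffix(".ext") \
    .build()

sink = FileSink \
    .for_row_format("s3://my-bucket/output", Encoder.simple_string_encoder()) \
    .with_output_file_config(file_config) \
    .build()

env.from_collection(["row1", "row2"]).sink_to(sink)
env.execute("custom-part-file-names")
```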

FileSystem format

2022-03-22 Thread lan tran
Hi team, So basically, when I use the Flink Table API to generate files and store them in S3, the file names look like this: part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. My question is: is there any way to configure these file names (e.g. by adding the last_modified_value to the name)?…

Date time convert

2022-03-27 Thread lan tran
Hi team, Currently I am facing this situation: I have a datetime string in the format "2018-03-29T07:39:49.722594Z". How can I convert this into a timestamp with local time zone? My current solution is to cast it as below: TO_TIMESTAMP(REPLACE(parcel.picked_up_date, 'T', ' '), 'yyyy-MM-dd HH:mm:ss%z'…
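A hedged sketch of one way to do this in PyFlink SQL, assuming a registered table parcel with a STRING column picked_up_date; the string is trimmed to millisecond precision first, because fractional-second format patterns vary across Flink versions:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# TIMESTAMP_LTZ values are interpreted in the session time zone
t_env.get_config().set_local_timezone("UTC")

t_env.execute_sql("""
    SELECT CAST(
             TO_TIMESTAMP(
               SUBSTR(REPLACE(picked_up_date, 'T', ' '), 1, 23),
               'yyyy-MM-dd HH:mm:ss.SSS')
           AS TIMESTAMP_LTZ(3)) AS picked_up_ltz
    FROM parcel
""").print()
```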

Datetime format

2022-03-27 Thread lan tran
Hi team, So basically, when I use the Flink Table API to generate files and store them in S3, the file names look like this: part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. My question is: is there any way to configure these file names (e.g. by adding the last_modified_value to the name)?…

Naming sql_statement job

2022-03-30 Thread lan tran
Hi team, When I was using the Table API to submit a SQL job using execute_query(), the job name was created by Flink. However, I wonder whether there is a way to configure that name. I see that the SQL client has the statement SET 'pipeline.name' = '{job_name}'. I wonder whether the same thing can be done using exec…
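In PyFlink the same option can be set on the table config before the statement is submitted; a minimal sketch (the job name and the INSERT statement are made up, and the submit method shown is execute_sql):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# equivalent of SET 'pipeline.name' = '{job_name}' in the SQL client
t_env.get_config().get_configuration().set_string("pipeline.name", "my-sql-job")

t_env.execute_sql("INSERT INTO sink_table SELECT * FROM source_table")
```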

AvroRowDeserializationSchema

2022-04-20 Thread lan tran
Hi team, I want to implement AvroRowDeserializationSchema when consuming data from Kafka; however, from the documentation I did not understand what avro_schema_string and record_class are. It would be great if you could give me an example of this (I only have the example in Java; however, I was doing…
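For reference, a hedged sketch: avro_schema_string is the Avro schema itself as a JSON string, while record_class refers to a compiled Avro Java class, which a pure-Python job normally does not have, so avro_schema_string is usually the one to pass. The topic, servers, and schema below are made up, and the flink-sql-avro and Kafka connector jars must be on the classpath:

```python
from pyflink.common.serialization import AvroRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
# e.g. env.add_jars("file:///path/to/flink-sql-avro-<version>.jar")

schema_str = """
{
  "type": "record",
  "name": "Parcel",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "picked_up_date", "type": "string"}
  ]
}
"""

deserializer = AvroRowDeserializationSchema(avro_schema_string=schema_str)
consumer = FlinkKafkaConsumer(
    topics="parcel-topic",
    deserialization_schema=deserializer,
    properties={"bootstrap.servers": "localhost:9092", "group.id": "demo"})
ds = env.add_source(consumer)
```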

DebeziumAvroDeserializationSchema

2022-04-21 Thread lan tran
Hi team, Currently I did not see this function in PyFlink, so is there any suggestion for using it from PyFlink? Best, Quynh.
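One possible workaround while the DataStream-level class is not exposed in PyFlink: declare a Kafka table with the debezium-avro-confluent format and consume it through the Table API instead. Everything below (topic, servers, registry URL, columns) is made up, and the option keys follow the Flink 1.14-era docs:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
    CREATE TABLE parcel_cdc (
        id BIGINT,
        picked_up_date STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'parcel-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'demo',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-avro-confluent',
        'debezium-avro-confluent.url' = 'http://localhost:8081'
    )
""")
```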

UUID on TableAPI

2022-04-21 Thread lan tran
Hi team, Currently I want to use savepoints in Flink. However, one thing that concerns me: is there any way to set the UID while using the Table API (SQL API)? If not, is there any mechanism by which, when we start Flink again, it knows which UUID it had? Best, Quynh

RE: AvroRowDeserializationSchema

2022-04-21 Thread lan tran
Wonder if this is a bug or not, but if I use AvroRowDeserializationSchema in PyFlink the error still occurs: py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace: org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor…

RE: DebeziumAvroDeserializationSchema

2022-04-21 Thread lan tran
…Constructor org.apache.flink.formats.avro.AvroRowDeserializationSchema([class org.apache.avro.Schema$RecordSchema]) does not exist. Therefore, please help check. Thanks. Best, Quynh. On 2022/04/22 03:14:43 lan tran wrote: > Hi team, > Currently, I did not see this function in PyFlink, ther…

RE: AvroRowDeserializationSchema

2022-04-24 Thread lan tran
…Regards, Dian. On Fri, Apr 22, 2022 at 1:42 PM lan tran <indigoblue7...@gmail.com> wrote: Wonder if this is a bug or not, but if I use AvroRowDeserializationSchema in PyFlink the error still occurs: py4j.protocol.Py4JError: An error occurred w…

RE: AvroRowDeserializationSchema

2022-04-24 Thread lan tran
…Regards, Dian. [1] https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308 On Mon, Apr 25, 2022 at 11:27 AM lan tran <indigoblue7...@gmail.com> wrote: Thanks Dian!! I really appreciate this. However, I have another question related to this. In c…

RE: AvroRowDeserializationSchema

2022-04-25 Thread lan tran
…send UPDATE messages to downstream operators. Regards, Dian. On Mon, Apr 25, 2022 at 12:31 PM lan tran <indigoblue7...@gmail.com> wrote: Yeah, I already tried that way. However, if we did not use DataStream at first, we cannot implement the savepoint, since according to the doc, if we use the Table API (SQL API)…

RE: UUID on TableAPI

2022-04-25 Thread lan tran
…solution. However, it might work for cases where you wish to pause/resume a job. On Fri, 22 Apr 2022 at 13:54, lan tran <indigoblue7...@gmail.com> wrote: Hi team, Currently I want to use savepoints in Flink. However, one thing that concerns me: is there any way to set the UUID while usin…

RE: UUID on TableAPI

2022-04-25 Thread lan tran
…if the Flink version and the query both remain the same, then you can restart a job from a savepoint; this means it might be workable for running a low-criticality job on, say, an AWS spot instance. That's about all I know. On Tue, 26 Apr 2022 at 10:17, lan tran <indigoblue7...@gmail.com> wrote: Hi Fra…

RE: AvroRowDeserializationSchema

2022-04-27 Thread lan tran
…source operator using the same code above, then perform operations with the DataStream API. Regards, Dian. On Mon, Apr 25, 2022 at 9:27 PM lan tran <indigoblue7...@gmail.com> wrote: Hi Dian, Thanks again for the fast response. Per your suggestion above, we can set the UID only for the DataStream sta…
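A compact sketch of the workaround discussed in this thread (names are made up, and consumer stands for the FlinkKafkaConsumer from the earlier sketch): build the source with the DataStream API, where uid() is available, then hand the stream to the Table API.

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# uid() pins the operator's state identity across savepoint restores;
# 'consumer' is the FlinkKafkaConsumer built in the earlier sketch
ds = env.add_source(consumer).uid("kafka-source").name("kafka-source")

# continue in SQL on top of the uid-tagged source
table = t_env.from_data_stream(ds)
t_env.create_temporary_view("events", table)
```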

RE: AvroRowDeserializationSchema

2022-04-28 Thread lan tran
…On Thu, Apr 28, 2022 at 11:00 AM lan tran <indigoblue7...@gmail.com> wrote: Hi Dian, Sorry for missing your mail. So, if I did as you suggested and Flink somehow crashed and we had to restart the service, does the Flink job know the offset from which it should read from Kafka?…
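The usual answer, sketched under assumptions (the interval is made up; consumer is the FlinkKafkaConsumer from the earlier sketch): once checkpointing is enabled, the consumer's offsets are stored in Flink's checkpoints/savepoints, and a job restored from one resumes from those offsets rather than from Kafka's committed offsets.

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60_000)  # checkpoint every 60 s

# optionally mirror the offsets back to Kafka for monitoring tools;
# restore itself uses the offsets inside the checkpoint
consumer.set_commit_offsets_on_checkpoints(True)
ds = env.add_source(consumer)
```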

Join two data stream using DataStream API using PyFlink

2022-05-13 Thread lan tran
Hi team, My use case is that I want to join two data streams that have the same id. Converted into SQL, it would be something like this: SELECT suppliers.supplier_id, suppliers.supplier_name, orders.order_date FROM suppliers INNER JOIN orders ON suppliers.supplier_id = orders.supplier_id; Howe…
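A hedged sketch of one way to express that join with the PyFlink DataStream API, assuming simple (id, value) tuples: key both streams by supplier_id, connect them, and keep the latest value of each side in keyed state (a 1:1 inner join; duplicate keys per side would need ListState instead).

```python
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedCoProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class SupplierOrderJoin(KeyedCoProcessFunction):
    """Emits a joined record once both sides have arrived for a key."""

    def open(self, runtime_context: RuntimeContext):
        self.supplier_name = runtime_context.get_state(
            ValueStateDescriptor("supplier_name", Types.STRING()))
        self.order_date = runtime_context.get_state(
            ValueStateDescriptor("order_date", Types.STRING()))

    def process_element1(self, value, ctx):
        # suppliers stream: (supplier_id, supplier_name)
        self.supplier_name.update(value[1])
        if self.order_date.value() is not None:
            yield (ctx.get_current_key(), value[1], self.order_date.value())

    def process_element2(self, value, ctx):
        # orders stream: (supplier_id, order_date)
        self.order_date.update(value[1])
        if self.supplier_name.value() is not None:
            yield (ctx.get_current_key(), self.supplier_name.value(), value[1])

env = StreamExecutionEnvironment.get_execution_environment()
suppliers = env.from_collection(
    [(1, "ACME")], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
orders = env.from_collection(
    [(1, "2022-05-13")], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))

suppliers.key_by(lambda s: s[0]) \
    .connect(orders.key_by(lambda o: o[0])) \
    .process(SupplierOrderJoin()) \
    .print()
env.execute("datastream-join-sketch")
```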

The methodology behind the join in Table API and DataStream

2022-06-29 Thread lan tran
Hi team, I have a question about the methodology behind joining with the SQL client versus the DataStream API. My scenario is like this: I have two tables, t1 and t2; I consume the WAL log from them and send it to Kafka. Next, I join the two tables above and convert the result into a changelog str…
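For the changelog part, a hedged sketch (t1/t2 and their columns are assumptions; to_changelog_stream is available in recent PyFlink releases): a regular, non-windowed join keeps both sides in state and emits retraction/update rows as the inputs change.

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# t1 and t2 are assumed to be CDC/Kafka-backed tables already created in t_env
joined = t_env.sql_query(
    "SELECT t1.id, t1.val AS t1_val, t2.val AS t2_val "
    "FROM t1 JOIN t2 ON t1.id = t2.id")

# rows arrive flagged +I / -U / +U / -D as the join result changes
t_env.to_changelog_stream(joined).print()
env.execute("changelog-join-sketch")
```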

Out of memory in heap memory when working with state

2022-09-05 Thread lan tran
Hi team, Currently I am facing an OutOfMemoryError: Java heap space. This is somehow due to the fact that I am storing state on the filesystem. With the FsStateBackend, the working state for each task manager is in memory (on the JVM heap), and state backups (checkpoints) go to a distributed f…
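A common mitigation, sketched under assumptions (the checkpoint location is made up): keep the working state in RocksDB on local disk instead of the JVM heap, while checkpoints still go to a durable filesystem.

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()

# working state lives in RocksDB on local disk rather than on the heap
env.set_state_backend(EmbeddedRocksDBStateBackend())

# state backups (checkpoints) go to a durable filesystem, e.g. with
# state.checkpoints.dir: s3://my-bucket/checkpoints in flink-conf.yaml
env.enable_checkpointing(60_000)
```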