Recommended way to split datastream in PyFlink

2021-03-21 Thread Sumeet Malhotra
Hi, I have a use case where I need to process incoming records on a Kafka topic based on a certain record field that defines the record type. What I'm thinking is to split the incoming datastream into record-type specific streams and then apply record-type specific stream processing on each. What
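
For reference, the filter-based approach suggested in the replies looks roughly like this; a minimal sketch, assuming a source table registered as "kafka_source" with a string column `record_type` (both names are illustrative):

    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col

    t_env = TableEnvironment.create(
        EnvironmentSettings.new_instance().in_streaming_mode().build())

    # "kafka_source" is a hypothetical table registered earlier via DDL
    records = t_env.from_path("kafka_source")

    # One filtered view per record type; each can then get its own processing
    type_a = records.filter(col("record_type") == "A")
    type_b = records.filter(col("record_type") == "B")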

Re: Recommended way to split datastream in PyFlink

2021-03-22 Thread Sumeet Malhotra
put datastream and then apply record specific filters? I'm afraid that yes. Regards, Dian. On Sun, Mar 21, 2021 at 5:20 PM Sumeet Malhotra wrote: Hi, I have a use case where I need to process incoming records on a Ka

Re: Recommended way to split datastream in PyFlink

2021-03-22 Thread Sumeet Malhotra
Apologies. I meant `StreamTableEnvironment.to_append_stream` in my last message. On Mon, Mar 22, 2021 at 2:03 PM Sumeet Malhotra wrote: Thanks Dian. Another question I have is, since the PyFlink DataStream API still doesn't have native Window support, what's the recomm
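
For context, the conversion mentioned above looks roughly like this; a sketch, where the row type must match the table's schema:

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    table = t_env.from_path("kafka_source")  # hypothetical registered table
    # Convert to a DataStream of Rows for processing not yet covered by the Table API
    ds = t_env.to_append_stream(table, Types.ROW([Types.STRING(), Types.INT()]))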

PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-29 Thread Sumeet Malhotra
Hi, Might be a simple, stupid question, but I'm not able to find how to convert/interpret a UTC datetime string like *2021-03-23T07:37:00.613910Z* as event-time using a DDL/Table API. I'm ingesting data from Kafka and can read this field as a string, but would like to mark it as event-time by defi
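
One way to do this in DDL is a computed column plus a WATERMARK clause; a minimal sketch, assuming the field arrives as a plain string and a `t_env` TableEnvironment exists (the exact TO_TIMESTAMP pattern, in particular the microsecond part, may need adjusting):

    source_ddl = """
        CREATE TABLE kafka_source (
            ts_str STRING,
            ts AS TO_TIMESTAMP(ts_str, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS''Z'''),
            WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'my_topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """
    t_env.execute_sql(source_ddl)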

Re: PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-29 Thread Sumeet Malhotra
the DataStream API. Piotrek. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion On Mon, Mar 29, 2021 at 18:07 Sumeet Malhotra wrote: Hi,

Re: PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-31 Thread Sumeet Malhotra
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/json.html#json-timestamp-format-standard On 30/03/2021 06:45, Sumeet Malhotra wrote: Thanks. Yes, that's a possibility. I'd still prefer something that can be done within the Table API. If it
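
Following the JSON format option linked above, declaring the column directly as a timestamp and switching the format standard may avoid string parsing altogether; a sketch:

    source_ddl = """
        CREATE TABLE kafka_source (
            ts TIMESTAMP(3),
            WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'my_topic',
            'format' = 'json',
            'json.timestamp-format.standard' = 'ISO-8601'
        )
    """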

PyFlink: Extract nested fields from JSON array

2021-03-31 Thread Sumeet Malhotra
Cross posting from StackOverflow here: https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array Any pointers are appreciated! Thanks, Sumeet
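
For nested arrays, one common Flink SQL approach is CROSS JOIN UNNEST; a sketch, assuming a column `items ARRAY<ROW<name STRING, `value` DOUBLE>>` (this schema is illustrative, not the one from the StackOverflow post):

    query = """
        SELECT s.id, t.name, t.`value`
        FROM kafka_source AS s
        CROSS JOIN UNNEST(s.items) AS t (name, `value`)
    """
    result = t_env.sql_query(query)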

Avro schema

2021-04-01 Thread Sumeet Malhotra
Hi, Is it possible to directly import Avro schema while ingesting data into Flink? Or do we always have to specify the entire schema in either SQL DDL for Table API or using DataStream data types? From a code maintenance standpoint, it would be really helpful to keep one source of truth for the sc

Re: Avro schema

2021-04-01 Thread Sumeet Malhotra
Sumeet. On Fri, Apr 2, 2021 at 8:26 AM Sumeet Malhotra wrote: Hi, Is it possible to directly import Avro schema while ingesting data into Flink? Or do we always have to specify the entire schema in either SQL DDL for Table API or using DataStream data types? From a code ma

Re: Avro schema

2021-04-13 Thread Sumeet Malhotra
I'm not a Table/SQL API expert, but from my knowledge, it's not viable to derive SQL table schemas from Avro schemas, because table schemas would be the ground truth by design. Moreover, one Avro type can be mapped to multiple Flink types, so in prac
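
In line with that answer, the practical pattern runs in the opposite direction: declare the columns in DDL and let the Avro format derive its Avro schema from the table schema; a sketch:

    source_ddl = """
        CREATE TABLE avro_source (
            Name STRING,
            Value BIGINT
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'my_topic',
            'format' = 'avro'   -- Avro schema is derived from the columns above
        )
    """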

Extract/Interpret embedded byte data from a record

2021-04-13 Thread Sumeet Malhotra
Hi, I'm reading data from Kafka, which is Avro encoded and has the following general schema:

    {
      "name": "SomeName",
      "doc": "Avro schema with variable embedded encodings",
      "type": "record",
      "fields": [
        { "name": "Name", "doc": "My name", "type": "string" },
        {

Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Sumeet Malhotra
ubsequent mapping function, you could change the record type and deserialize the result. In the DataStream API: source.map(new MapFunction<..., record_with_deserialized_result>() { ... }) Best, Piotrek. On Wed, Apr 14, 2021 at 03:17 Sumeet Malhotra wrote:
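
In the Table API, a scalar UDF can play the role of that map function; a minimal sketch, assuming the embedded bytes are UTF-8 text (column and function names are hypothetical):

    from pyflink.table import DataTypes
    from pyflink.table.expressions import col
    from pyflink.table.udf import udf

    @udf(result_type=DataTypes.STRING())
    def decode_payload(payload: bytes) -> str:
        # a guess at the embedded encoding; adjust to the actual one
        return payload.decode("utf-8")

    # usage: decoded = table.select(decode_payload(col("Payload")).alias("decoded"))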

Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Sumeet Malhotra
re supported in Python. Thanks, Sumeet. On Thu, Apr 15, 2021 at 6:31 AM Sumeet Malhotra wrote: Thanks Piotrek! I forgot to mention that I'm using PyFlink and mostly Table APIs. The documentation ( https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/table

Accessing columns from input stream table during Window operations

2021-04-18 Thread Sumeet Malhotra
Hi, I have a use case where I'm creating a Tumbling window as follows (the "input" table has columns [Timestamp, a, b, c]):

    input \
        .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
        .group_by(col('w'), input.a) \
        .select(
            col('w').start.alias('window_start'),

Re: Accessing columns from input stream table during Window operations

2021-04-19 Thread Sumeet Malhotra
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows Best, Guowei. On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra wrote: Hi, I have a use case where I'm creating a Tumbling window

Re: Accessing columns from input stream table during Window operations

2021-04-19 Thread Sumeet Malhotra
Windows [1], which could keep the "non-group-key" column. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows Best, Guowei. On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra wrote:
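
A sketch of that Over Window suggestion, which keeps non-group-key columns selectable; it assumes input.Timestamp is a rowtime attribute, and the aggregate shown is illustrative:

    from pyflink.table.expressions import col, UNBOUNDED_RANGE
    from pyflink.table.window import Over

    result = input \
        .over_window(
            Over.partition_by(col("a"))
                .order_by(col("Timestamp"))
                .preceding(UNBOUNDED_RANGE)
                .alias("w")) \
        .select(col("a"), col("b"), col("c").max.over(col("w")))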

Re: Accessing columns from input stream table during Window operations

2021-04-19 Thread Sumeet Malhotra
] https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink On Apr 19, 2021 at 5:16 PM, Sumeet Malhotra wrote: Thanks Guowei. I'm trying out Over Windows, as follows: input \ .over_window(

Re: Accessing columns from input stream table during Window operations

2021-04-20 Thread Sumeet Malhotra
alue for input.a is always the same, it's equal to group_by(col('w'), input.b) logically. The benefit is that you could access input.a directly in the select clause. Regards, Dian. On Apr 19, 2021 at 6:29 PM, Sumeet Malhotra wrote:
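
A sketch of that group_by alternative: since input.a is constant within the group, adding it to group_by makes it directly selectable (following the thread's column names):

    from pyflink.table.expressions import col, lit
    from pyflink.table.window import Tumble

    result = input \
        .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
        .group_by(col("w"), input.a, input.b) \
        .select(input.a, input.b, col("w").start.alias("window_start"))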

Best practice for packaging and deploying Flink jobs on K8S

2021-04-28 Thread Sumeet Malhotra
Hi, I have a PyFlink job that consists of:
- Multiple Python files.
- Multiple 3rd-party Python dependencies, specified in a `requirements.txt` file.
- A few Java dependencies, mainly for external connectors.
- An overall job config YAML file.
Here's a simplified structure of the c

Re: Best practice for packaging and deploying Flink jobs on K8S

2021-04-29 Thread Sumeet Malhotra
tell you more about the Python deployment options. If you are not running on a session cluster, then you can also create a K8s image which contains your user code. That way you ship your job when deploying the cluster. Cheers, Till. On Wed, Apr
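
A session-cluster submission along those lines might look like this (a sketch only; the cluster id, paths and jar name are placeholders):

    ./bin/flink run \
        --target kubernetes-session \
        -Dkubernetes.cluster-id=my-session-cluster \
        --python my_job/main.py \
        --pyFiles my_job/ \
        --pyRequirements requirements.txt \
        --jarfile lib/flink-sql-connector-kafka.jar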

Re: Best practice for packaging and deploying Flink jobs on K8S

2021-05-02 Thread Sumeet Malhotra
now. I'll try to add it back ASAP. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html Regards, Dian. On Apr 29, 2021 at 3:24 PM, Sumeet Malhotra wrote: Hi Till, There's no problem wit

Is keyed state supported in PyFlink?

2021-05-03 Thread Sumeet Malhotra
Hi, Is keyed state [1] supported by PyFlink yet? I can see some code for it in the Flink master branch, but there's no mention of it in the 1.12 Python documentation. Thanks, Sumeet [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html

Define rowtime on intermediate table field

2021-05-04 Thread Sumeet Malhotra
Hi, My use case involves reading raw data records from Kafka and processing them. The records are coming from a database, where a periodic job reads new rows, packages them into a single JSON object (as described below) and writes the entire record to Kafka.

    {
      'id': 'some_id',
      'key_a': 'v

Re: Is keyed state supported in PyFlink?

2021-05-05 Thread Sumeet Malhotra
Thanks Dian. Yes, I hadn't looked at the 1.13.0 documentation earlier. On Wed, May 5, 2021 at 1:46 PM Dian Fu wrote: Hi Sumeet, This feature is supported in 1.13.0 which was just released and so there is no documentation about it in 1.12. Regards, Dian
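
For reference, keyed state in PyFlink 1.13 looks roughly like this; a minimal sketch of a per-key counter (names are illustrative):

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
    from pyflink.datastream.state import ValueStateDescriptor

    class CountPerKey(KeyedProcessFunction):

        def open(self, runtime_context: RuntimeContext):
            # one ValueState slot per key, holding the running count
            self.count = runtime_context.get_state(
                ValueStateDescriptor("count", Types.LONG()))

        def process_element(self, value, ctx):
            current = self.count.value() or 0
            self.count.update(current + 1)
            yield value[0], current + 1

    env = StreamExecutionEnvironment.get_execution_environment()
    ds = env.from_collection([("a", 1), ("b", 2), ("a", 3)])
    ds.key_by(lambda v: v[0]).process(CountPerKey()).print()
    env.execute("keyed-state-sketch")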

PyFlink: Split input table stream using filter()

2021-05-05 Thread Sumeet Malhotra
Hi, I would like to split streamed data from Kafka into 2 streams based on some filter criteria, using PyFlink Table API. As described here [1], a way to do this is to use .filter() which should split the stream for parallel processing. Does this approach work in Table API as well? I'm doing the
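
If the two filtered streams should run as a single job writing to separate sinks, a statement set can do it; a sketch, assuming `t_env` is a TableEnvironment, `records` the source table, and "sink_a"/"sink_b" hypothetical sink tables registered via DDL:

    from pyflink.table.expressions import col

    stmt_set = t_env.create_statement_set()
    stmt_set.add_insert("sink_a", records.filter(col("record_type") == "A"))
    stmt_set.add_insert("sink_b", records.filter(col("record_type") == "B"))
    stmt_set.execute()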

Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Sumeet Malhotra
Hi, According to the documentation for PyFlink Table row based operations [1], typical usage is as follows:

    @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
    def split(x: Row) -> Row:
        for s in x[1].split(","):
            yield x[0], s

    table.flat_map(split)

Is there any way that row fie

Re: Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Sumeet Malhotra
[1] https://issues.apache.org/jira/browse/FLINK-22612 [2] https://issues.apache.org/jira/browse/FLINK-22712 Best, Xingbo. On Wed, May 19, 2021 at 4:45 PM, Sumeet Malhotra wrote: Hi, According to the documentation for PyFlink Table row based operations [1], typical usage is as fo
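
Once the linked issues land, access by name becomes possible; a sketch under the assumption that input field names are preserved on the Row (the field names `id` and `tags` are hypothetical):

    from pyflink.common import Row
    from pyflink.table import DataTypes
    from pyflink.table.udf import udtf

    @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
    def split(x: Row):
        # attribute access instead of positional x[0] / x[1]
        for s in x.tags.split(","):
            yield x.id, s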

PyFlink: Upload resource files to Flink cluster

2021-06-09 Thread Sumeet Malhotra
Hi, I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON schema files actually). The path of this file can be passed into the UDTF, but essentially this path needs to exist on the Task Manager node where the task executes. What's the best way to upload these resource files? As o

Re: PyFlink: Upload resource files to Flink cluster

2021-06-11 Thread Sumeet Malhotra
option is what you need. The documentation says only zip format is supported. Alternatively, you could upload the files to S3 or another DFS, access them from TMs, and re-upload when needed. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/depe
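
Concretely, the archive option looks like this at submission time (a sketch; paths are placeholders). The part after `#` names the directory under which the archive is extracted on the workers:

    ./bin/flink run \
        --python my_job/main.py \
        --pyArchives file:///path/to/schemas.zip#schemas

    # inside the UDTF, files are then reachable via the relative path, e.g.
    # open("schemas/my_schema.json")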

Re: PyFlink: Upload resource files to Flink cluster

2021-06-11 Thread Sumeet Malhotra
yan wrote: Hi Sumeet, Probably there is an issue with uploading the archive while submitting the job. The commands and API usage look good to me. Dian, could you please confirm that? Regards, Roman. On Fri, Jun 11, 2021 at 9:04 AM Sumeet Malhotra

Application mode - Custom Flink docker image with Python user code

2021-10-26 Thread Sumeet Malhotra
Hi, I'm currently submitting my Python user code from my local machine to a Flink cluster running in Session mode on Kubernetes. For this, I have a custom Flink image with Python as per this reference [1]. Now, I'd like to move to using the Application mode with Native Kubernetes, where the user
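
With the user code baked into the custom image, submission then becomes something like the following (a sketch only; the cluster id, image name, and the path inside the image are placeholders):

    ./bin/flink run-application \
        --target kubernetes-application \
        -Dkubernetes.cluster-id=my-app-cluster \
        -Dkubernetes.container.image=my-registry/my-pyflink-image:latest \
        --python /opt/flink/usrlib/main.py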