Hi,
I have a use case where I need to process incoming records on a Kafka topic
based on a certain record field that defines the record type.
What I'm thinking is to split the incoming datastream into record-type
specific streams and then apply record-type specific stream processing on
each. Would that require applying multiple filters on the input datastream
and then applying record-type specific filters?
> I'm afraid that yes.
>
> Regards,
> Dian
>
>
> On Sun, Mar 21, 2021 at 5:20 PM Sumeet Malhotra
> wrote:
>
>> Hi,
>>
>> I have a use case where I need to process incoming records on a Kafka topic
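For reference, a minimal PyFlink sketch of the filter-based split discussed above; the record shape and the type values are made up:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Stand-in for the Kafka source: (record_type, payload) pairs.
records = env.from_collection(
    [('A', 'payload-1'), ('B', 'payload-2'), ('A', 'payload-3')],
    type_info=Types.TUPLE([Types.STRING(), Types.STRING()]))

# One filter per record type produces the record-type specific streams.
type_a = records.filter(lambda r: r[0] == 'A')
type_b = records.filter(lambda r: r[0] == 'B')

type_a.print()
type_b.print()
env.execute("split-by-record-type")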
Apologies. I meant `StreamTableEnvironment.to_append_stream` in my last
message.
On Mon, Mar 22, 2021 at 2:03 PM Sumeet Malhotra
wrote:
> Thanks Dian.
>
> Another question I have is, since the PyFlink DataStream API still doesn't
> have native window support, what's the recommended way to perform windowed
> operations in the meantime?
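For completeness, a small sketch of the to_append_stream call mentioned above, converting a Table back into a DataStream; the column names and types are made up:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

table = t_env.from_elements([(1, 'a'), (2, 'b')], ['id', 'name'])

# Convert the Table into an append-only DataStream of Rows.
ds = t_env.to_append_stream(table, Types.ROW([Types.INT(), Types.STRING()]))
ds.print()
env.execute("to-append-stream-example")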
Hi,
Might be a simple, stupid question, but I'm not able to find how to
convert/interpret a UTC datetime string like *2021-03-23T07:37:00.613910Z*
as event-time using a DDL/Table API. I'm ingesting data from Kafka and can
read this field as a string, but would like to mark it as event-time by
defining it as a time attribute in the table definition.
> ...or you can define the time attribute during the conversion from
> the DataStream API.
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion
>
> On Mon, Mar 29, 2021 at 18:07 Sumeet Malhotra
> wrote:
>
>> Hi,
>>
>
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/json.html#json-timestamp-format-standard
> On 30/03/2021 06:45, Sumeet Malhotra wrote:
>
> Thanks. Yes, that's a possibility. I'd still prefer something that can be
> done within the Table API. If it
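For reference, a minimal sketch of declaring the event-time attribute in DDL from within the Table API, assuming the string can be parsed with the JSON format's ISO-8601 option; the topic, field names, and watermark delay are made up:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Read the field directly as TIMESTAMP(3) and mark it as event time via WATERMARK.
t_env.execute_sql("""
    CREATE TABLE input_events (
        `name` STRING,
        `event_time` TIMESTAMP(3),
        WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'input-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    )
""")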
Cross-posting from StackOverflow here:
https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array
Any pointers are appreciated!
Thanks,
Sumeet
Hi,
Is it possible to directly import Avro schema while ingesting data into
Flink? Or do we always have to specify the entire schema in either SQL DDL
for Table API or using DataStream data types? From a code maintenance
standpoint, it would be really helpful to keep one source of truth for the
schema.
Sumeet
On Fri, Apr 2, 2021 at 8:26 AM Sumeet Malhotra
wrote:
> Hi,
>
> Is it possible to directly import Avro schema while ingesting data into
> Flink? Or do we always have to specify the entire schema in either SQL DDL
> for Table API or using DataStream data types? From a code maintenance
> standpoint, it would be really helpful to keep one source of truth for the
> schema.
>>
>> I'm not a Table/SQL API expert, but from my knowledge, it's not viable to
>> derive SQL table schemas from Avro schemas, because table schemas would be
>> the ground truth by design.
>> Moreover, one Avro type can be mapped to multiple Flink types, so in
>> practice a unique table schema cannot be derived automatically.
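To illustrate the point that the table schema stays the ground truth, a rough sketch of an Avro-backed table whose columns are declared by hand; the connector properties and column names are made up:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.new_instance().in_streaming_mode().build())

# The column list is maintained in the DDL; Flink derives the Avro reader schema from it.
t_env.execute_sql("""
    CREATE TABLE avro_source (
        `Name` STRING,
        `Id`   BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'avro-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro'
    )
""")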
Hi,
I'm reading data from Kafka, which is Avro encoded and has the following
general schema:
{
    "name": "SomeName",
    "doc": "Avro schema with variable embedded encodings",
    "type": "record",
    "fields": [
        {
            "name": "Name",
            "doc": "My name",
            "type": "string"
        },
        {
> ...in a subsequent mapping function, you could change
> the record type and deserialize the result. In the DataStream API:
>
> source.map(new MapFunction<record_type, record_with_deserialized_result>() { ... })
>
> Best,
> Piotrek
>
> On Wed, Apr 14, 2021 at 03:17 Sumeet Malhotra wrote:
>
...are supported in Python.
Thanks,
Sumeet
On Thu, Apr 15, 2021 at 6:31 AM Sumeet Malhotra
wrote:
> Thanks Piotrek! I forgot to mention that I'm using PyFlink and mostly
> Table APIs. The documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/table
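Translating the mapping-function suggestion above into PyFlink terms, a rough sketch of a second deserialization step done in a map; the record shape and the JSON-encoded payload are made up:

import json
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Stand-in for the Avro-decoded source: (Name, payload) pairs where the payload
# field carries the variable embedded encoding, JSON in this example.
source = env.from_collection([('sensor-1', '{"temp": 21.5}')])

# The map changes the record type and deserializes the embedded payload.
decoded = source.map(lambda r: (r[0], json.loads(r[1])))
decoded.print()
env.execute("deserialize-embedded-payload")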
Hi,
I have a use case where I'm creating a Tumbling window as follows:
"input" table has columns [Timestamp, a, b, c]
input \
    .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
    .group_by(col('w'), input.a) \
    .select(
        col('w').start.alias('window_start'),
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>
> Best,
> Guowei
>
>
> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra
> wrote:
>
>> Hi,
>>
>> I have a use case where I'm creating a Tumbling window
> ...Over Windows [1], which could keep the
> "non-group-key" column.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>
> Best,
> Guowei
>
>
> On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra
> wrote:
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink
>
>
> On Apr 19, 2021, at 5:16 PM, Sumeet Malhotra wrote:
>
> Thanks Guowei. I'm trying out Over Windows, as follows:
>
> input \
>     .over_window(
>
>> ...if the value for input.a is always the same, it's equal to group_by(
>> col('w'), input.b) logically. The benefit is that you could access
>> input.a directly in the select clause.
>>
>> Regards,
>> Dian
>>
>> On Apr 19, 2021, at 6:29 PM, Sumeet Malhotra wrote:
>>
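For reference, a rough sketch of the Over window variant being discussed, assuming `input` is the table from the earlier message ([Timestamp, a, b, c]) with Timestamp as an event-time attribute; the aggregate is made up:

from pyflink.table.expressions import col, UNBOUNDED_RANGE
from pyflink.table.window import Over

result = input \
    .over_window(
        Over.partition_by(col('a'))
            .order_by(col('Timestamp'))
            .preceding(UNBOUNDED_RANGE)
            .alias('w')) \
    .select(
        col('Timestamp'),
        col('a'),
        col('b'),                                  # non-group-key column stays accessible
        col('c').max.over(col('w')).alias('max_c'))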
Hi,
I have a PyFlink job that consists of:
- Multiple Python files.
- Multiple 3rd-party Python dependencies, specified in a
`requirements.txt` file.
- A few Java dependencies, mainly for external connectors.
- An overall job config YAML file.
Here's a simplified structure of the code...
> ...tell you more about the
> Python deployment options.
>
> If you are not running on a session cluster, then you can also create a
> K8s image which contains your user code. That way you ship your job when
> deploying the cluster.
>
> Cheers,
> Till
>
> On Wed, Apr
>> ...now. I'll try to add it back ASAP.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html
>>
>> Regards,
>> Dian
>>
>> On Apr 29, 2021, at 3:24 PM, Sumeet Malhotra wrote:
>>
>> Hi Till,
>>
>> There's no problem with ...
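For what it's worth, the Python-API side of shipping such dependencies could look roughly like this; all paths and file names are made up:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.new_instance().in_streaming_mode().build())

t_env.add_python_file("job_utils.py")                  # additional Python modules
t_env.set_python_requirements("requirements.txt")      # 3rd-party Python dependencies
t_env.add_python_archive("job_config.zip", "config")   # e.g. the job config YAML, zipped
t_env.get_config().get_configuration().set_string(     # Java connector jars
    "pipeline.jars", "file:///path/to/flink-sql-connector-kafka.jar")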
Hi,
Is keyed state [1] supported by PyFlink yet? I can see some code for it in
the Flink master branch, but there's no mention of it in the 1.12 Python
documentation.
Thanks,
Sumeet
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html
Hi,
My use case involves reading raw data records from Kafka and processing
them. The records are coming from a database, where a periodic job reads
new rows, packages them into a single JSON object (as described below) and
writes the entire record to Kafka.
{
    'id': 'some_id',
    'key_a': 'v
Thanks Dian. Yes, I hadn't looked at the 1.13.0 documentation earlier.
On Wed, May 5, 2021 at 1:46 PM Dian Fu wrote:
> Hi Sumeet,
>
> This feature is supported in 1.13.0 which was just released and so there
> is no documentation about it in 1.12.
>
> Regards,
> Dian
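For reference, a minimal sketch of keyed state in PyFlink 1.13, using a ValueState inside a KeyedProcessFunction; the dedup logic and the element shape are made up:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor

class DedupPerKey(KeyedProcessFunction):
    def open(self, runtime_context):
        self.seen = runtime_context.get_state(
            ValueStateDescriptor("seen", Types.BOOLEAN()))

    def process_element(self, value, ctx):
        # Emit only the first element observed for each key.
        if self.seen.value() is None:
            self.seen.update(True)
            yield value

env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection([(1, 'a'), (1, 'b'), (2, 'c')]) \
    .key_by(lambda v: v[0]) \
    .process(DedupPerKey()) \
    .print()
env.execute("keyed-state-example")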
Hi,
I would like to split streamed data from Kafka into 2 streams based on some
filter criteria, using PyFlink Table API. As described here [1], a way to
do this is to use .filter() which should split the stream for parallel
processing.
Does this approach work in Table API as well? I'm doing the
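A minimal sketch of doing the same split with the Table API: the same source table is simply filtered twice; the column name and values are made up:

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col

t_env = TableEnvironment.create(EnvironmentSettings.new_instance().in_streaming_mode().build())
input_table = t_env.from_elements(
    [('A', 1), ('B', 2), ('A', 3)], ['record_type', 'value'])

# Each filtered table can then be processed and written out independently.
stream_a = input_table.filter(col('record_type') == 'A')
stream_b = input_table.filter(col('record_type') == 'B')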
Hi,
According to the documentation for PyFlink Table row based operations [1],
typical usage is as follows:
from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udtf

@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def split(x: Row) -> Row:
    for s in x[1].split(","):
        yield x[0], s

table.flat_map(split)
Is there any way that row fields can be accessed by name, rather than by index,
inside the UDTF?
> [1] https://issues.apache.org/jira/browse/FLINK-22612
> [2] https://issues.apache.org/jira/browse/FLINK-22712
>
> Best,
> Xingbo
>
> On Wed, May 19, 2021 at 4:45 PM, Sumeet Malhotra wrote:
>
>> Hi,
>>
>> According to the documentation for PyFlink Table row based operations
>> [1], typical usage is as follows:
Hi,
I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON
schema files actually). The path of this file can be passed into the UDTF,
but essentially this path needs to exist on the Task Manager node where the
task executes. What's the best way to upload these resource files? As of now, ...
> ...the archives option is what you need. The documentation says
> only zip format is supported.
> Alternatively, you could upload the files to S3 or other DFS and
> access from TMs and re-upload when needed.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/depe
...Roman Khachatryan wrote:
> Hi Sumeet,
>
> Probably there is an issue with uploading the archive while submitting the
> job.
> The commands and API usage look good to me.
> Dian could you please confirm that?
>
> Regards,
> Roman
>
> On Fri, Jun 11, 2021 at 9:04 AM Sumeet Malhotra
>
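A rough sketch of the archives-based approach mentioned above, assuming the JSON schema files are zipped beforehand (e.g. zip -r schemas.zip schemas/); all paths and field names are made up:

import json
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udtf

t_env = TableEnvironment.create(EnvironmentSettings.new_instance().in_streaming_mode().build())

# Ship the zipped resources with the job; they are extracted on the TaskManager
# under the target directory name, relative to the working directory.
t_env.add_python_archive("schemas.zip", "schema_dir")

@udtf(result_types=[DataTypes.STRING()])
def field_names(raw: str):
    with open("schema_dir/schemas/record_schema.json") as f:
        schema = json.load(f)
    for field in schema.get("fields", []):
        yield field["name"]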
Hi,
I'm currently submitting my Python user code from my local machine to a
Flink cluster running in Session mode on Kubernetes. For this, I have a
custom Flink image with Python as per this reference [1].
Now, I'd like to move to using the Application mode with Native Kubernetes,
where the user code is shipped as part of the application image.