Re: Problems with multiple sinks using postgres-cdc connector

2024-06-17 Thread Hongshun Wang
Hi David,
> When I add this second sink, the postgres-cdc connector appears to add a
second reader from the replication log, but with the same slot name.

I don't fully understand what you mean by adding a second sink. Do the two
sinks share the same source, or does each have a separate pipeline? If the
former, both sinks can share the same source, in which case one replication
slot is sufficient. If the latter, and you want each sink to have its own
source, you can set a different slot name for each source (the option name
is slot.name [1]).

[1]
https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/postgres-cdc/#connector-options
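
For illustration, a rough PyFlink sketch of the second option, with hypothetical
connection details and columns; the only point is that each postgres-cdc source
table declares its own 'slot.name':

# Hypothetical DDL: every value below is a placeholder except the option names.
# 'slot.name' must be unique per source so pipelines never compete for one slot.
table_env.execute_sql("""
    CREATE TABLE cleaned_source (
        id BIGINT,
        payload STRING,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'postgres-cdc',
        'hostname' = 'db-host',
        'port' = '5432',
        'username' = 'flink',
        'password' = '***',
        'database-name' = 'mydb',
        'schema-name' = 'public',
        'table-name' = 'cleaned',
        'slot.name' = 'cleaned_slot'
    )
""")
# A second source over flink.cleaned_migrations would declare a different value,
# e.g. 'slot.name' = 'cleaned_migrations_slot'.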

On Sat, Jun 15, 2024 at 12:40 AM David Bryson  wrote:

> Hi,
>
> I have a stream reading from postgres-cdc connector version 3.1.0. I read
> from two tables:
>
> flink.cleaned_migrations
> public.cleaned
>
> I convert the tables into a datastream, do some processing, then write it
> to a sink at the end of my stream:
>
> joined_table_result =
> joined_with_metadata.execute_insert(daily_sink_property_map['flink_table_name'])
>
> This works well, however I recently tried to add a second table which
> contains state reached in the middle of my stream:
>
> continuous_metrics_table = table_env.execute_sql("SELECT f1, f2, f3
> from joined_processed_table")
>
>  
> continuous_metrics_table.execute_insert(continuous_sink_property_map['flink_table_name'])
>
> When I add this second sink, the postgres-cdc connector appears to add a
> second reader from the replication log, but with the same slot name. It
> seems to behave this way regardless of the sink connector I use, and seems
> to happen in addition to the existing slot that is already allocated to the
> stream.  This second reader of course cannot use the same replication slot,
> and so the connector eventually times out.  Is this expected behavior from
> the connector? It seems strange the connector would attempt to use a slot
> twice.
>
> I am using incremental snapshots, and I am passing a unique slot per table
> connector.
>
> Logs below:
>
> 2024-06-14 09:23:59,600 INFO  
> org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
> [] - Postgres captured tables : flink.cleaned_migrations .
>
> 2024-06-14 09:23:59,603 INFO  io.debezium.jdbc.JdbcConnection
>   [] - Connection gracefully closed
>
> 2024-06-14 09:24:00,198 INFO  
> org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
> [] - Postgres captured tables : public.cleaned .
>
> 2024-06-14 09:24:00,199 INFO  io.debezium.jdbc.JdbcConnection
>   [] - Connection gracefully closed
>
> 2024-06-14 09:24:00,224 INFO  
> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
> [] - Creating initial offset context
>
> 2024-06-14 09:24:00,417 INFO  
> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
> [] - Read xlogStart at 'LSN{6/C9806378}' from transaction '73559679'
>
> 2024-06-14 09:24:00,712 INFO  io.debezium.jdbc.JdbcConnection
>   [] - Connection gracefully closed
>
> 2024-06-14 09:24:00,712 INFO  
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
> [] - Source reader 0 discovers table schema for stream split stream-split
> success
>
> 2024-06-14 09:24:00,712 INFO  
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
> [] - Source reader 0 received the stream split :
> StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8},
> txId=73559674, lastCommitTs=-9223372036854775808],
> endOffset=Offset{lsn=LSN{/}, txId=null,
> lastCommitTs=-9223372036853775810], isSuspended=false}.
>
> 2024-06-14 09:24:00,714 INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase
> [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split',
> offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674,
> lastCommitTs=-9223372036854775808],
> endOffset=Offset{lsn=LSN{/}, txId=null,
> lastCommitTs=-9223372036853775810], isSuspended=false}]
>
> 2024-06-14 09:24:00,714 INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher
> [] - Starting split fetcher 0
>
> 2024-06-14 09:24:00,716 INFO  
> org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator
> [] - The enumerator receives notice from subtask 0 for the stream split
> assignment.
>
> 2024-06-14 09:24:00,721 INFO  
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext
> [] - PostgresConnectorConfig is
>
> 2024-06-14 09:24:00,847 INFO  
> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
> [] - Creating initial offset context
>
> 2024-06-14 09:24:01,000 INFO  
> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
> [] - Read xlogStart at 'LSN{6/C9806430}' from transaction '73559682'
>
> 2024-06-14 09:24:01,270 INFO  io.debezium.jdbc.JdbcConnection
>   [] - Connecti

RE: Problem reading a CSV file with pyflink datastream in k8s with Flink operator

2024-06-17 Thread gwenael . lebarzic
Hello everyone.

Does someone know how to solve this, please?

Regards,

Gwenael Le Barzic
Technical Engineer, BigData technologies
Orange/OF/DTSI/SI/DATA-IA/SOLID/CXP

Mobile : +33 6 48 70 85 75 

gwenael.lebar...@orange.com


From: LE BARZIC Gwenael DTSI/SI
Sent: Friday, June 14, 2024 22:02
To: user@flink.apache.org
Subject: Problem reading a CSV file with pyflink datastream in k8s with Flink
operator

Hello everyone.

I get the following error when trying to read a CSV file with pyflink 
datastream in a k8s environment using the flink operator.
###
  File "/opt/myworkdir/myscript.py", line 30, in <module>
run_flink_job(myfile)
  File "/opt/myworkdir/myscript.py", line 21, in run_flink_job
csvshem = CsvReaderFormat.for_schema(file_csv_schema)
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/formats/csv.py", 
line 322, in for_schema
items = list(charFrequency[char].items())
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 
1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 
146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 
326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat.
: java.lang.IllegalAccessError: class 
org.apache.flink.formats.csv.PythonCsvUtils tried to access method 'void 
org.apache.flink.formats.csv.CsvReaderFormat.<init>(org.apache.flink.util.function.SerializableSupplier,
 org.apache.flink.util.function.SerializableFunction, java.lang.Class, 
org.apache.flink.formats.common.Converter, 
org.apache.flink.api.common.typeinfo.TypeInformation, boolean)' 
(org.apache.flink.formats.csv.PythonCsvUtils is in unnamed module of loader 
org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a; 
org.apache.flink.formats.csv.CsvReaderFormat is in unnamed module of loader 
'app')
at 
org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat(PythonCsvUtils.java:48)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
###

Here is my dockerfile :
###
FROM flink:1.18.1

RUN apt-get update -y && \
apt-get install -y python3 python3-pip python3-dev && rm -rf 
/var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

RUN mkdir -p /opt/myworkdir
WORKDIR /opt/myworkdir

RUN alias python=python3

COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

COPY src .
RUN chown -R flink:flink /opt/myworkdir
RUN chmod -R 755 /opt/myworkdir

###

Here is my flinkdeployment custom resource :
###
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
 finalizers:
  - flinkdeployments.flink.apache.org/finalizer
 name: myscript
 namespace: flink
spec:
 image: myscript:0.1
 flinkVersion: v1_18
 flinkConfiguration:
   taskmanager.numberOfTaskSlots: "2"
   state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
   state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
   state.savepoints.dir: file:///checkpoints/flink/savepoints
   job.autoscaler.enabled: "true"
   job.autoscaler.stabilization.interval: 1m
   job.autoscaler.metrics.window: 5m
   job.autoscaler.target.utilization: "0.6"
   job.autoscaler.target.utilization.boundary: "0.2"
   job.autoscaler.restart.time: 2m
   job.autoscaler.catch-up.duration: 5m
   pipeline.max-parallelism: "720"
 serviceAccount: flink
 jobManager:
   resource:
     memory: "2048m"
     cpu: 1
 taskManager:
   resource:
     memory: "2048m"
     cpu: 1
 job:
   jarURI: local:///opt/myworkdir/myscript.py
   entryClass

A way to meter number of deserialization errors

2024-06-17 Thread Ilya Karpov
Hi all,
we are planning to use Flink as a connector between Kafka and
external systems. We use protobuf as the message format in Kafka. If
non-backward-compatible changes occur, we want to skip those messages
('protobuf.ignore-parse-errors' = 'true') but record an error and raise an
alert. I didn't find any way to record deserialization errors (
https://github.com/apache/flink/blob/3a15d1ce69ac21d619f60033ec45cae303489c8f/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java#L73);
does it exist?
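
For context, a rough sketch of the kind of source table in question; the topic,
class and field names below are made up, only the option names matter:

-- Hypothetical table definition. With 'protobuf.ignore-parse-errors' = 'true',
-- messages that fail to parse are silently skipped, hence the wish for a
-- counter/alert on top of it.
CREATE TABLE events (
    id BIGINT,
    payload STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-consumer',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'protobuf',
    'protobuf.message-class-name' = 'com.example.Event',
    'protobuf.ignore-parse-errors' = 'true'
)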


Re: Problems with multiple sinks using postgres-cdc connector

2024-06-17 Thread David Bryson
These sinks share the same source.  The pipeline that works looks something
like this:

table1 -> process1 -> process2 -> sink2

When I change it to this:

table1 -> process1 -> process2 -> sink2
 `--> sink1

I get the errors described, where it appears that a second process is
created that attempts to use the current slot twice.

On Mon, Jun 17, 2024 at 1:58 AM Hongshun Wang 
wrote:

> Hi David,
> > When I add this second sink, the postgres-cdc connector appears to add a
> second reader from the replication log, but with the same slot name.
>
> I don't fully understand what you mean by adding a second sink. Do the two
> sinks share the same source, or does each have a separate pipeline? If the
> former, both sinks can share the same source, in which case one replication
> slot is sufficient. If the latter, and you want each sink to have its own
> source, you can set a different slot name for each source (the option name
> is slot.name [1]).
>
> [1]
> https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/postgres-cdc/#connector-options
>
> On Sat, Jun 15, 2024 at 12:40 AM David Bryson  wrote:
>
>> Hi,
>>
>> I have a stream reading from postgres-cdc connector version 3.1.0. I read
>> from two tables:
>>
>> flink.cleaned_migrations
>> public.cleaned
>>
>> I convert the tables into a datastream, do some processing, then write it
>> to a sink at the end of my stream:
>>
>> joined_table_result =
>> joined_with_metadata.execute_insert(daily_sink_property_map['flink_table_name'])
>>
>> This works well, however I recently tried to add a second table which
>> contains state reached in the middle of my stream:
>>
>> continuous_metrics_table = table_env.execute_sql("SELECT f1, f2, f3
>> from joined_processed_table")
>>
>>  
>> continuous_metrics_table.execute_insert(continuous_sink_property_map['flink_table_name'])
>>
>> When I add this second sink, the postgres-cdc connector appears to add a
>> second reader from the replication log, but with the same slot name. It
>> seems to behave this way regardless of the sink connector I use, and seems
>> to happen in addition to the existing slot that is already allocated to the
>> stream.  This second reader of course cannot use the same replication slot,
>> and so the connector eventually times out.  Is this expected behavior from
>> the connector? It seems strange the connector would attempt to use a slot
>> twice.
>>
>> I am using incremental snapshots, and I am passing a unique slot per
>> table connector.
>>
>> Logs below:
>>
>> 2024-06-14 09:23:59,600 INFO  
>> org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
>> [] - Postgres captured tables : flink.cleaned_migrations .
>>
>> 2024-06-14 09:23:59,603 INFO  io.debezium.jdbc.JdbcConnection
>>   [] - Connection gracefully closed
>>
>> 2024-06-14 09:24:00,198 INFO  
>> org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
>> [] - Postgres captured tables : public.cleaned .
>>
>> 2024-06-14 09:24:00,199 INFO  io.debezium.jdbc.JdbcConnection
>>   [] - Connection gracefully closed
>>
>> 2024-06-14 09:24:00,224 INFO  
>> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
>> [] - Creating initial offset context
>>
>> 2024-06-14 09:24:00,417 INFO  
>> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
>> [] - Read xlogStart at 'LSN{6/C9806378}' from transaction '73559679'
>>
>> 2024-06-14 09:24:00,712 INFO  io.debezium.jdbc.JdbcConnection
>>   [] - Connection gracefully closed
>>
>> 2024-06-14 09:24:00,712 INFO  
>> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
>> [] - Source reader 0 discovers table schema for stream split stream-split
>> success
>>
>> 2024-06-14 09:24:00,712 INFO  
>> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
>> [] - Source reader 0 received the stream split :
>> StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8},
>> txId=73559674, lastCommitTs=-9223372036854775808],
>> endOffset=Offset{lsn=LSN{/}, txId=null,
>> lastCommitTs=-9223372036853775810], isSuspended=false}.
>>
>> 2024-06-14 09:24:00,714 INFO  
>> org.apache.flink.connector.base.source.reader.SourceReaderBase
>> [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split',
>> offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674,
>> lastCommitTs=-9223372036854775808],
>> endOffset=Offset{lsn=LSN{/}, txId=null,
>> lastCommitTs=-9223372036853775810], isSuspended=false}]
>>
>> 2024-06-14 09:24:00,714 INFO  
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher
>> [] - Starting split fetcher 0
>>
>> 2024-06-14 09:24:00,716 INFO  
>> org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator
>> [] - The enumerator receives notice from subtask 0 for the stream split
>> assignment.
>>
>> 2024-06-14 09:24:00,721 INFO  
>>

Flink Stateful Functions 3.4

2024-06-17 Thread L. Jiang
Hi there,
Does anyone know which Flink version Flink Stateful Functions 3.4 is
compatible with?
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/state-bootstrap/

I know Stateful Functions 3.3 is compatible with Flink 1.16.2, and Stateful
Functions 3.2 works with Flink 1.14.3 and earlier. But I couldn't find
information about Flink Stateful Functions 3.4. I hope it works with Flink
1.8 at minimum.

Thanks,

-- 
Best,
Liangjun




Re: Problems with multiple sinks using postgres-cdc connector

2024-06-17 Thread Hongshun Wang
Hi David,
In your modified pipeline, just one source from table1 is sufficient, with
both sink1 and process2 sharing a single source from process1. However,
based on your log, it appears that two sources have been generated. Do you
have the execution graph available in the Flink UI?
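
Also, one thing worth checking on the PyFlink side: each call to
execute_insert() submits its own job, so two separate calls may end up as two
independent pipelines, each scanning the CDC source (and claiming its slot) on
its own. Below is a rough sketch, reusing the names from your snippets, of
submitting both inserts as a single job through a StatementSet so that the
source is really shared; note it uses sql_query() rather than execute_sql() so
the SELECT becomes a Table that can be added to the set:

# Sketch only: group both inserts into one job so the postgres-cdc source
# is scanned, and its replication slot used, exactly once.
stmt_set = table_env.create_statement_set()
# first sink: the joined result, as before
stmt_set.add_insert(daily_sink_property_map['flink_table_name'], joined_with_metadata)
# second sink: the intermediate metrics, taken from the same pipeline
continuous_metrics_table = table_env.sql_query(
    "SELECT f1, f2, f3 FROM joined_processed_table")
stmt_set.add_insert(continuous_sink_property_map['flink_table_name'], continuous_metrics_table)
# a single execute() submits one job containing both sinks
stmt_set.execute().wait()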

Best,
Hongshun

On Mon, Jun 17, 2024 at 11:40 PM David Bryson  wrote:

> These sinks share the same source.  The pipeline that works looks something
> like this:
>
> table1 -> process1 -> process2 -> sink2
>
> When I change it to this:
>
> table1 -> process1 -> process2 -> sink2
>  `--> sink1
>
> I get the errors described, where it appears that a second process is
> created that attempts to use the current slot twice.
>
> On Mon, Jun 17, 2024 at 1:58 AM Hongshun Wang 
> wrote:
>
>> Hi David,
>> > When I add this second sink, the postgres-cdc connector appears to add
>> a second reader from the replication log, but with the same slot name.
>>
>> I don't fully understand what you mean by adding a second sink. Do the two
>> sinks share the same source, or does each have a separate pipeline? If the
>> former, both sinks can share the same source, in which case one replication
>> slot is sufficient. If the latter, and you want each sink to have its own
>> source, you can set a different slot name for each source (the option name
>> is slot.name [1]).
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/postgres-cdc/#connector-options
>>
>> On Sat, Jun 15, 2024 at 12:40 AM David Bryson  wrote:
>>
>>> Hi,
>>>
>>> I have a stream reading from postgres-cdc connector version 3.1.0. I
>>> read from two tables:
>>>
>>> flink.cleaned_migrations
>>> public.cleaned
>>>
>>> I convert the tables into a datastream, do some processing, then write
>>> it to a sink at the end of my stream:
>>>
>>> joined_table_result =
>>> joined_with_metadata.execute_insert(daily_sink_property_map['flink_table_name'])
>>>
>>> This works well, however I recently tried to add a second table which
>>> contains state reached in the middle of my stream:
>>>
>>> continuous_metrics_table = table_env.execute_sql("SELECT f1, f2, f3
>>> from joined_processed_table")
>>>
>>>  
>>> continuous_metrics_table.execute_insert(continuous_sink_property_map['flink_table_name'])
>>>
>>> When I add this second sink, the postgres-cdc connector appears to add a
>>> second reader from the replication log, but with the same slot name. It
>>> seems to behave this way regardless of the sink connector I use, and seems
>>> to happen in addition to the existing slot that is already allocated to the
>>> stream.  This second reader of course cannot use the same replication slot,
>>> and so the connector eventually times out.  Is this expected behavior from
>>> the connector? It seems strange the connector would attempt to use a slot
>>> twice.
>>>
>>> I am using incremental snapshots, and I am passing a unique slot per
>>> table connector.
>>>
>>> Logs below:
>>>
>>> 2024-06-14 09:23:59,600 INFO  
>>> org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
>>> [] - Postgres captured tables : flink.cleaned_migrations .
>>>
>>> 2024-06-14 09:23:59,603 INFO  io.debezium.jdbc.JdbcConnection
>>> [] - Connection gracefully closed
>>>
>>> 2024-06-14 09:24:00,198 INFO  
>>> org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
>>> [] - Postgres captured tables : public.cleaned .
>>>
>>> 2024-06-14 09:24:00,199 INFO  io.debezium.jdbc.JdbcConnection
>>> [] - Connection gracefully closed
>>>
>>> 2024-06-14 09:24:00,224 INFO  
>>> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
>>> [] - Creating initial offset context
>>>
>>> 2024-06-14 09:24:00,417 INFO  
>>> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
>>> [] - Read xlogStart at 'LSN{6/C9806378}' from transaction '73559679'
>>>
>>> 2024-06-14 09:24:00,712 INFO  io.debezium.jdbc.JdbcConnection
>>> [] - Connection gracefully closed
>>>
>>> 2024-06-14 09:24:00,712 INFO  
>>> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
>>> [] - Source reader 0 discovers table schema for stream split stream-split
>>> success
>>>
>>> 2024-06-14 09:24:00,712 INFO  
>>> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
>>> [] - Source reader 0 received the stream split :
>>> StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8},
>>> txId=73559674, lastCommitTs=-9223372036854775808],
>>> endOffset=Offset{lsn=LSN{/}, txId=null,
>>> lastCommitTs=-9223372036853775810], isSuspended=false}.
>>>
>>> 2024-06-14 09:24:00,714 INFO  
>>> org.apache.flink.connector.base.source.reader.SourceReaderBase
>>> [] - Adding split(s) to reader: [StreamSplit{splitId='stream-split',
>>> offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674,
>>> lastCommitTs=-9223372036854775808],
>>> endOffset=Offset{lsn=L

Re: Problem reading a CSV file with pyflink datastream in k8s with Flink operator

2024-06-17 Thread Robert Young
Hi Gwenael,

From the logs I thought it was a JVM module opens/exports issue, but I
found it has a similar issue with a Java 8 base image too. I think the
issue is that PythonCsvUtils is not permitted to call the package-private
constructor of CsvReaderFormat across class loaders.

One workaround I found is to add `RUN cp /opt/flink/opt/flink-python*
/opt/flink/lib/` to the Dockerfile, so that the flink-python-1.18.1.jar is
present in both /opt/flink/opt and /opt/flink/lib. Then, when Flink tries to
class-load org.apache.flink.formats.csv.PythonCsvUtils, it will be available
to the app classloader.

Thanks
Rob Young

On Mon, Jun 17, 2024 at 11:53 PM  wrote:

> Hello everyone.
>
>
>
> Does someone know how to solve this, please?
>
>
>
> Regards,
>
>
>
>
> Gwenael Le Barzic
> Technical Engineer, BigData technologies
> Orange/OF/DTSI/SI/DATA-IA/SOLID/CXP
>
>
>
> Mobile : +33 6 48 70 85 75
> 
> gwenael.lebar...@orange.com
>
>
>
> From: LE BARZIC Gwenael DTSI/SI
> Sent: Friday, June 14, 2024 22:02
> To: user@flink.apache.org
> Subject: Problem reading a CSV file with pyflink datastream in k8s with
> Flink operator
>
>
>
> Hello everyone.
>
>
>
> I get the following error when trying to read a CSV file with pyflink
> datastream in a k8s environment using the flink operator.
>
> ###
>
>   File "/opt/myworkdir/myscript.py", line 30, in <module>
>
> run_flink_job(myfile)
>
>   File "/opt/myworkdir/myscript.py", line 21, in run_flink_job
>
> csvshem = CsvReaderFormat.for_schema(file_csv_schema)
>
>   File
> "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/formats/csv.py", line
> 322, in for_schema
>
> items = list(charFrequency[char].items())
>
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py",
> line 1322, in __call__
>
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 146, in deco
>
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py",
> line 326, in get_return_value
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat.
>
> : java.lang.IllegalAccessError: class
> org.apache.flink.formats.csv.PythonCsvUtils tried to access method 'void
> org.apache.flink.formats.csv.CsvReaderFormat.<init>(org.apache.flink.util.function.SerializableSupplier,
> org.apache.flink.util.function.SerializableFunction, java.lang.Class,
> org.apache.flink.formats.common.Converter,
> org.apache.flink.api.common.typeinfo.TypeInformation, boolean)'
> (org.apache.flink.formats.csv.PythonCsvUtils is in unnamed module of loader
> org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a;
> org.apache.flink.formats.csv.CsvReaderFormat is in unnamed module of loader
> 'app')
>
> at
> org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat(PythonCsvUtils.java:48)
>
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source)
>
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source)
>
> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>
> at java.base/java.lang.Thread.run(Unknown Source)
>
> ###
>
>
>
> Here is my dockerfile :
>
> ###
>
> FROM flink:1.18.1
>
>
>
> RUN apt-get update -y && \
>
> apt-get install -y python3 python3-pip python3-dev && rm -rf
> /var/lib/apt/lists/*
>
> RUN ln -s /usr/bin/python3 /usr/bin/python
>
>
>
> RUN mkdir -p /opt/myworkdir
>
> WORKDIR /opt/myworkdir
>
>
>
> RUN alias python=python3
>
>
>
> COPY requirements.txt .
>
> RUN pip3 install --no-cache-dir -r requirements.txt
>
>
>
> COPY src .
>
> RUN chown -R flink:flink /opt/myworkdir
>
> RUN chmod -R 755 /opt/myworkdir
>
>
>
> ###
>
>
>
> Here is my flinkdeployment custom resource :
>
> ###
>
> apiVersion: flink.apache.org/v1beta1
>
> kind: FlinkDeployment
>
> metadata:

Re: Flink Stateful Functions 3.4

2024-06-17 Thread Zhanghao Chen
Judging by the GitHub commit history [1], there has been no active maintenance
of the StateFun project since the release of v3.3 last September. So currently,
there is no estimate of when v3.4 could be released or which Flink version it
would support.

[1] https://github.com/apache/flink-statefun/commits/master/

Best,
Zhanghao Chen

From: L. Jiang 
Sent: Tuesday, June 18, 2024 4:57
To: user@flink.apache.org 
Subject: Flink Stateful Functions 3.4

Hi there,
Does anyone know which Flink version Flink Stateful Functions 3.4 is
compatible with?
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/state-bootstrap/

I know Stateful Functions 3.3 is compatible with Flink 1.16.2, and Stateful
Functions 3.2 works with Flink 1.14.3 and earlier. But I couldn't find
information about Flink Stateful Functions 3.4. I hope it works with Flink 1.8
at minimum.

Thanks,

--
Best,
Liangjun

