Hi Jan, StateFun enables object reuse automatically, and it can't be disabled with a configuration. There is a technical reason for that that has to do with how we translate StateFun concepts to Flink concepts. I've created an issue to remove this limitation [1].
I might come up with a workaround in the upcoming few days, and let you know, if you are ok with building StateFun from source? Otherwise, we will try to address this in the next StateFun release. While we are on the topic of the JdbcSink, as far as I know, it doesn't support exactly once. If this is important to you, then I will suggest simply emitting the inserts to Kafka and periodically bulk insert them to the database. All the best, Igal. [1] https://issues.apache.org/jira/browse/FLINK-21280 On Thu, Feb 4, 2021 at 3:13 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote: > Hello, > > we are currently trying to implement a JDBC Sink in Stateful Functions > as documented here: > > https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/io-module/flink-connectors.html > > However, when starting the application we are running into this error: > > -------------------------------------------------------------------- > > java.lang.IllegalStateException: objects can not be reused with JDBC > sink function at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) > ~[flink-dist_2.12-1.11.3.jar:1.11.3] > at > org.apache.flink.connector.jdbc.JdbcSink.lambda$sink$97f3ed45$1(JdbcSink.java:67) > > ~[?:?] > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:131) > > ~[?:?] > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:113) > > ~[?:?] at > org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:50) > > ~[?:?] > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > > ~[flink-dist_2.12-1.11.3.jar:1.11.3] > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > > ~[flink-dist_2.12-1.11.3.jar:1.11.3] > at > org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48) > > ~[flink-dist_2.12-1.11.3.jar:1.11.3] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) > > ~[flink-dist_2.12-1.11.3.ja > r:1.11.3] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506) > > ~[flink-dist_2.12-1.11.3.jar:1.11.3] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) > > ~[flink-dist_2.12-1.11.3 > .jar:1.11.3] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) > > ~[flink-dist_2.12-1.11.3.jar:1.11.3] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526) > > ~[flink-dist_2.12-1.11.3.jar:1.11.3] > at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > ~[flink-dist_2.12-1.11.3.jar:1.11.3] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > ~[flink-dist_2.12-1.11.3.jar:1.11.3] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275] > 2021-02-04 13:59:49,121 INFO > org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy > > [] - Calculating tasks to > restart to recover the failed task 31284d56d1e2112b0f20099ee448a6a9_0. > 2021-02-04 13:59:49,122 INFO > org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy > > [] - 8 tasks should be re > started to recover the failed task 31284d56d1e2112b0f20099ee448a6a9_0. > > ------------------------------------------------------------------- > > We tested the same sink in a regular flink application under similar > circumstances (Protobuf Objects etc.) and it works just fine. As a > solution we have tried to set the parameter "pipeline.object-reuse" in > the flink-conf.yaml of the stateful functions application to true, but > that had no effect on the above error message. Stateful Functions > version is 2.2.2 > > > Did anyone else see this problem before? > > > Relevant Application Code: > > -------- MyMessageSink.java ----- > > public class MyMessageSink { > public static final EgressIdentifier<MyMessage> SINK_ID = > new EgressIdentifier<>("mynamespace", "MyMessageSink", > MyMessage.class); > > public EgressSpec<MyMessage> getEgressSpec() { > JdbcConnectionOptions jdbcConnectionOptions = new > JdbcConnectionOptions.JdbcConnectionOptionsBuilder() > .withDriverName("org.postgresql.Driver") > .withUrl("jdbc:postgresql://localhost:5432/mydb?user=foo&password=bar") > .build(); > JdbcStatementBuilder<MyMessage> jdbcStatementBuilder = > (statementTemplate, myMessage) -> { > statementTemplate.setString(1, myMessage.getFirstField()); > statementTemplate.setString(2, > accountSessionMessage.getSecondField()); > }; > SinkFunction<MyMessage> sinkFunction = JdbcSink.sink( > "INSERT INTO my_table (first_field, second_field) > VALUES( ?, ? ) ON CONFLICT (first_field, second_field) DO NOTHING;", > jdbcStatementBuilder, > jdbcConnectionOptions > ); > return new SinkFunctionSpec<>( > SINK_ID, > sinkFunction > ); > } > > } > > --------------------------------------- > > > ----------- Module.java --------------- > > ... > > MyMessageSink myMessageSink = new MyMessageSink(); > binder.bindEgress(myMessageSink.getEgressSpec()); > > ... > > ---------------------------------------------- > > > Best regards, > > Jan > > -- > neuland – Büro für Informatik GmbH > Konsul-Smidt-Str. 8g, 28217 Bremen > > Telefon (0421) 380107 57 > Fax (0421) 380107 99 > https://www.neuland-bfi.de > > https://twitter.com/neuland > https://facebook.com/neulandbfi > https://xing.com/company/neulandbfi > > > Geschäftsführer: Thomas Gebauer, Jan Zander > Registergericht: Amtsgericht Bremen, HRB 23395 HB > USt-ID. DE 246585501 > >