Hi @Gordon, `InitContext#createInputSerializer()` is a great option and will solve more than one problem, but I cant find a way to get the TypeInformation<T> on InitContextImpl (I can be missing something)
On current (legacy) implementations we rely on interface ´ InputTypeConfigurable´ to get the TypeInformation but this will not work for Sink2 as is not implemented (DataStream.sinkTo vs DataStream.addSink) As a side note, the ExecutionConfig provided by this interface could not be used as can be changed after the call is made, for Table Planning for example on DefaultExecutor.configureBatchSpecificProperties() At the end what we need to do is something like: if (isObjectReuseEnabled()) serializer.copy(record) else record; So responding to your question, yes last option is ok for this but I dont see how to implementing it as Im missing the TypeInformation on InitContextImpl. Best regards, On 2023/04/21 15:04:24 "Tzu-Li (Gordon) Tai" wrote: > Do we have to introduce `InitContext#createSerializer(TypeInformation<T>)` > which returns TypeSerializer<T>, or is it sufficient to only provide > `InitContext#createInputSerializer()` which returns TypeSerializer<IN>? > > I had the impression that buffering sinks like JDBC only need the > latter. @Joao, could you confirm? > > If that's the case, +1 to adding the following method signatures to > InitContext: > * TypeSerializer<IN> createInputSerializer() > * boolean isObjectReuseEnabled() > > Thanks, > Gordon > > On Fri, Apr 21, 2023 at 3:04 AM Zhu Zhu <reed...@gmail.com> wrote: > > > Good point! @Gordon > > Introducing an `InitContext#createSerializer(TypeInformation)` looks a > > better option to me, so we do not need to introduce an unmodifiable > > `ExecutionConfig` at this moment. > > > > Hope that we can make `ExecutionConfig` a read-only interface in > > Flink 2.0. It is exposed in `RuntimeContext` to user functions already, > > while mutating the values at runtime is actually an undefined behavior. > > > > Thanks, > > Zhu > > > > Tzu-Li (Gordon) Tai <tzuli...@apache.org> 于2023年4月18日周二 01:02写道: > > > > > > Hi, > > > > > > Sorry for chiming in late. > > > > > > I'm not so sure that exposing ExecutionConfig / ReadExecutionConfig > > > directly through Sink#InitContext is the right thing to do. > > > > > > 1. A lot of the read-only getter methods on ExecutionConfig are > > irrelevant > > > for sinks. Expanding the scope of the InitContext interface with so many > > > irrelevant methods is probably going to make writing unit tests a pain. > > > > > > 2. There's actually a few getter methods on `InitContext` that have > > > duplicate/redundant info for what ExecutionConfig exposes. For example, > > > InitContext#getNumberOfParallelSubtasks and InitContext#getAttemptNumber > > > currently exist and it can be confusing if users find 2 sources of that > > > information (either via the `InitContext` and via the wrapped > > > `ExecutionConfig`). > > > > > > All in all, it feels like `Sink#InitContext` was introduced initially as > > a > > > means to selectively only expose certain information to sinks. > > > > > > It looks like right now, the only requirement is that some sinks require > > 1) > > > isObjectReuseEnabled, and 2) TypeSerializer for the input type. Would it > > > make sense to follow the original intent and only selectively expose > > these? > > > For 1), we can just add a new method to `InitContext` and forward the > > > information from `ExecutionConfig` accessible at the operator level. > > > For 2), would it make sense to create the serializer at the operator > > level > > > and then provide it through `InitContext`? > > > > > > Thanks, > > > Gordon > > > > > > On Mon, Apr 17, 2023 at 8:23 AM Zhu Zhu <reed...@gmail.com> wrote: > > > > > > > We can let the `InitContext` return `ExecutionConfig` in the interface. > > > > However, a `ReadableExecutionConfig` implementation should be returned > > > > so that exceptions will be thrown if users tries to modify the > > > > `ExecutionConfig`. > > > > > > > > We can rework all the setters of `ExecutionConfig` to internally > > invoke a > > > > `setConfiguration(...)` method. Then the `ReadableExecutionConfig` can > > > > just override that method. But pay attention to a few exceptional > > > > setters, i.e. those for globalJobParameters and serializers. > > > > > > > > We should also explicitly state in the documentation of > > > > `InitContext #getExecutionConfig()`, that the returned > > `ExecutionConfig` > > > > is unmodifiable. > > > > > > > > Thanks, > > > > Zhu > > > > > > > > João Boto <eskabe...@apache.org> 于2023年4月17日周一 16:51写道: > > > > > > > > > > Hi Zhu, > > > > > > > > > > Thanks for you time for reviewing this. > > > > > > > > > > Extending ´ExecutionConfig´ will allow to modify the values in the > > > > config (this is what we want to prevent with Option2) > > > > > > > > > > To extend the ExecutionConfig is not simpler to do Option1 (expose > > > > ExecutionConfig directly). > > > > > > > > > > Regards > > > > > > > > > > > > > > > > > > > > On 2023/04/03 09:42:28 Zhu Zhu wrote: > > > > > > Hi João, > > > > > > > > > > > > Thanks for creating this FLIP! > > > > > > I'm overall +1 for it to unblock the migration of sinks to SinkV2. > > > > > > > > > > > > Yet I think it's better to let the `ReadableExecutionConfig` extend > > > > > > `ExecutionConfig`, because otherwise we have to introduce a new > > method > > > > > > `TypeInformation#createSerializer(ReadableExecutionConfig)`. The > > new > > > > > > method may require every `TypeInformation` to implement it, > > including > > > > > > Flink built-in ones and custom ones, otherwise exceptions will > > happen. > > > > > > That goal, however, is pretty hard to achieve. > > > > > > > > > > > > Thanks, > > > > > > Zhu > > > > > > > > > > > > João Boto <eskabe...@apache.org> 于2023年2月28日周二 23:34写道: > > > > > > > > > > > > > > I have update the FLIP with the 2 options that we have > > discussed.. > > > > > > > > > > > > > > Option 1: Expose ExecutionConfig directly on InitContext > > > > > > > this have a minimal impact as we only have to expose the new > > methods > > > > > > > > > > > > > > Option 2: Expose ReadableExecutionConfig on InitContext > > > > > > > with this option we have more impact as we need to add a new > > method > > > > to TypeInformation and change all implementations (current exists 72 > > > > implementations) > > > > > > > > > > > > > > Waiting for feedback or concerns about the two options > > > > > > > > > > > > >