Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Chesnay Schepler

I have a few questions; I'd appreciate it if you could answer them.

1. How does the Provider know whether it is required or not?
2. How does the configuration of Providers work (how do they get access
   to a configuration)?
3. How does a user select providers? (Is it purely based on the
   provider being on the classpath?)
4. How can a user override an existing provider?
5. What is DelegationTokenProvider#name() used for?
6. What happens if the names of 2 providers are identical?
7. Will we directly load the provider, or first load a factory (usually
   preferable)?
8. What is the Credentials class (it would necessarily have to be a
   public api as well)?
9. What does the TaskManager do with the received token?
10. Is there any functionality in the TaskManager that could require a
   token on startup (i.e., before registering with the RM)?

On 11/01/2022 14:58, Gabor Somogyi wrote:


Hi All,

Hope all of you have enjoyed the holiday season.

I would like to start the discussion on FLIP-211, which aims to provide a
Kerberos delegation token framework that obtains/renews/distributes tokens
out-of-the-box.

Please be aware that the FLIP wiki area is not fully done since the
discussion may change the feature in major ways. The proposal can be found
in a Google doc here.
Once the community agrees on the approach, the content will be moved to the
wiki page.

Feel free to add your thoughts to make this feature better!

BR,
G



Re: Off for a week starting Friday

2022-02-03 Thread Etienne Chauchot

Hi Till,

Thanks.

Please tell me if you want us to do the review of 
https://github.com/apache/flink/pull/18610 before Friday 3 pm, so that I 
can book some time, or if we should do it when I get back.


Best

Etienne

On 02/02/2022 at 16:48, Till Rohrmann wrote:

Thanks for letting us know Etienne. Have a nice time off :-)

Cheers,
Till

On Wed, Feb 2, 2022 at 3:56 PM Etienne Chauchot 
wrote:


Hi all,

I'll be off for a week starting Friday afternoon so I might be
unresponsive on ongoing PRs/tickets.

Best

Etienne.




[jira] [Created] (FLINK-25937) SQL Client end-to-end test e2e fails on AZP

2022-02-03 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25937:
-

 Summary: SQL Client end-to-end test e2e fails on AZP
 Key: FLINK-25937
 URL: https://issues.apache.org/jira/browse/FLINK-25937
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination, Table SQL / API
Affects Versions: 1.15.0
Reporter: Till Rohrmann


The {{SQL Client end-to-end test}} e2e test fails on AZP when using the 
{{AdaptiveScheduler}} because the scheduler expects the parallelism to be set 
for all vertices:

{code}
Feb 03 03:45:13 org.apache.flink.runtime.client.JobInitializationException: 
Could not start the JobMaster.
Feb 03 03:45:13 at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
Feb 03 03:45:13 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
Feb 03 03:45:13 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
Feb 03 03:45:13 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
Feb 03 03:45:13 at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
Feb 03 03:45:13 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Feb 03 03:45:13 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Feb 03 03:45:13 at java.lang.Thread.run(Thread.java:748)
Feb 03 03:45:13 Caused by: java.util.concurrent.CompletionException: 
java.lang.IllegalStateException: The adaptive scheduler expects the parallelism 
being set for each JobVertex (violated JobVertex: 
f74b775b58627a33e46b8c155b320255).
Feb 03 03:45:13 at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
Feb 03 03:45:13 at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
Feb 03 03:45:13 at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
Feb 03 03:45:13 ... 3 more
Feb 03 03:45:13 Caused by: java.lang.IllegalStateException: The adaptive 
scheduler expects the parallelism being set for each JobVertex (violated 
JobVertex: f74b775b58627a33e46b8c155b320255).
Feb 03 03:45:13 at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
Feb 03 03:45:13 at 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.assertPreconditions(AdaptiveScheduler.java:296)
Feb 03 03:45:13 at 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.(AdaptiveScheduler.java:230)
Feb 03 03:45:13 at 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerFactory.createInstance(AdaptiveSchedulerFactory.java:122)
Feb 03 03:45:13 at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:115)
Feb 03 03:45:13 at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
Feb 03 03:45:13 at 
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:322)
Feb 03 03:45:13 at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106)
Feb 03 03:45:13 at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94)
Feb 03 03:45:13 at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
Feb 03 03:45:13 at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
Feb 03 03:45:13 ... 3 more
{code}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30662&view=logs&j=fb37c667-81b7-5c22-dd91-846535e99a97&t=39a035c3-c65e-573c-fb66-104c66c28912&l=5782



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Gabor Somogyi
Please see my answers inline. I hope I've provided satisfying answers to
all the questions.

G

On Thu, Feb 3, 2022 at 9:17 AM Chesnay Schepler  wrote:

> I have a few questions; I'd appreciate it if you could answer them.
>
>1. How does the Provider know whether it is required or not?
>
All properly registered providers are going to be loaded and asked to
obtain tokens. It's worth mentioning that every provider
has the right to decide whether it wants to obtain tokens or not (boolean
delegationTokensRequired()). For instance, if a provider detects that
HBase is not on the classpath or not configured properly, then no tokens are
obtained from that specific provider.

You may ask how a provider is registered. Here it is:
The provider is on the classpath and there is a META-INF service file which
contains the name of the provider, for example:
META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
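
For illustration, a minimal provider plus its registration could look like
the sketch below (assuming an interface with just the two methods mentioned
here, name() and delegationTokensRequired(); the final shape may differ in
the FLIP):

// com/example/MyServiceDelegationTokenProvider.java
public class MyServiceDelegationTokenProvider implements DelegationTokenProvider {

  @Override
  public String name() {
    // Also used in the "security.kerberos.tokens.${name}.enabled" config key.
    return "myservice";
  }

  @Override
  public boolean delegationTokensRequired() {
    // Each provider decides locally whether tokens can be obtained,
    // e.g. by probing for the service client on the classpath.
    try {
      Class.forName("com.example.myservice.Client");
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }
}

// META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
com.example.MyServiceDelegationTokenProvider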



>
>1. How does the configuration of Providers work (how do they get
>access to a configuration)?
>
The Flink configuration is going to be passed to all providers. Please see
the POC here:
https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1
Service-specific configurations are loaded on the fly. For example, in the
HBase case the provider looks for the HBase configuration class, which will
be instantiated within the provider.

>
>1. How does a user select providers? (Is it purely based on the
>provider being on the classpath?)
>
Providers can be explicitly turned off with the following config:
"security.kerberos.tokens.${name}.enabled". I've never seen two different
implementations exist for a specific
external service, but if this edge case came up, the mentioned config would
need to be added and a new provider with a different name would need to be
implemented and registered.
All in all, we've seen that provider handling is not a user-specific task but
a cluster-admin one. If a specific provider is needed, it's implemented
once per company, registered once
on the clusters, and then all users may or may not use the obtained tokens.

It's worth mentioning that the system will know which token needs to be used
when HDFS is accessed; this part is automatic.
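
As a sketch, disabling one provider by its name in flink-conf.yaml could
look like this ("hbase" is just an assumed provider name for illustration):

# flink-conf.yaml
# All properly registered providers are enabled by default;
# turn a specific one off via its name():
security.kerberos.tokens.hbase.enabled: false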

>
>1. How can a user override an existing provider?
>
Please see the previous bullet point.

>
>1. What is DelegationTokenProvider#name() used for?
>
All providers which are registered properly (on the classpath + META-INF
entry) are enabled by default. With
"security.kerberos.tokens.${name}.enabled" a specific provider can be
turned off.
Additionally, I intend to use this name in log entries later on for debugging
purposes, for example "hadoopfs provider obtained 2 tokens with ID...".
This would help track what is happening with tokens, and when. The same
applies to the TaskManager side: "2 hadoopfs provider
tokens arrived with ID...". It's important to note that the secret part will
be hidden in the mentioned log entries to keep the
attack surface low.

>
>1. What happens if the names of 2 providers are identical?
>
I presume you mean 2 different classes which are both registered and have the
same logic inside. In this case both will be loaded and both are going to
obtain token(s) for the same service.
Both obtained token(s) are going to be added to the UGI. As a result the
second will overwrite the first, but the order is not defined. Since both
tokens are valid, access to the external system will work no matter which
one is used.

When the class names are the same, the service loader only loads a single
entry because services are singletons. That's the reason why state inside
providers is not advised.
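
To make the loading semantics concrete, a manager-side loading loop could be
sketched like this (an assumed shape; see the PoC link above for the actual
code):

import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;

public class ProviderLoadingSketch {
  public static void main(String[] args) {
    Map<String, DelegationTokenProvider> providers = new HashMap<>();
    for (DelegationTokenProvider provider :
        ServiceLoader.load(DelegationTokenProvider.class)) {
      // Two providers reporting the same name(): the one iterated later
      // silently overwrites the earlier one; iteration order is undefined.
      providers.put(provider.name(), provider);
    }
    System.out.println("Loaded providers: " + providers.keySet());
  }
}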

>
>1. Will we directly load the provider, or first load a factory
>(usually preferable)?
>
The intention is to load providers directly from the DTM. We could add an
extra layer to have a factory, but after consideration I came to the
conclusion that it would be overkill in this case.
Please have a look at how it's planned to load providers now:
https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1#diff-d56a0bc77335ff23c0318f6dec1872e7b19b1a9ef6d10fff8fbaab9aecac94faR54-R81


>
>1. What is the Credentials class (it would necessarily have to be a
>public api as well)?
>
The Credentials class comes from Hadoop. My main intention was not to bind
the implementation to Hadoop completely. That is not fully possible because
of the following reasons:
* Several functionalities are a must because there are no alternatives,
including but not limited to login from keytab, proper TGT cache handling,
and passing tokens to Hadoop services like HDFS, HBase, Hive, etc.
* The partial win is that the whole delegation token framework is only going
to be initiated if hadoop-common is on the classpath (Hadoop is optional in
the core libraries).
A possibility to eliminate Credentials from the API could be:
* to convert Credentials to byte a
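
For illustration, such a conversion could be sketched with the standard
Hadoop Credentials API like this (whether the final Flink API would expose
bytes this way is open):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.security.Credentials;

public final class CredentialsSerde {

  // Writes all tokens/secrets held by the Credentials into a byte array.
  public static byte[] toBytes(Credentials credentials) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bos)) {
      credentials.writeTokenStorageToStream(out);
    }
    return bos.toByteArray();
  }
}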

[jira] [Created] (FLINK-25938) SynchronousCheckpointITCase.taskDispatcherThreadPoolAllowsForSynchronousCheckpoints fails on AZP

2022-02-03 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25938:
-

 Summary: 
SynchronousCheckpointITCase.taskDispatcherThreadPoolAllowsForSynchronousCheckpoints
 fails on AZP
 Key: FLINK-25938
 URL: https://issues.apache.org/jira/browse/FLINK-25938
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.3
Reporter: Till Rohrmann


The test 
{{SynchronousCheckpointITCase.taskDispatcherThreadPoolAllowsForSynchronousCheckpoints}}
 fails on AZP with

{code}
2022-02-03T02:56:19.1950622Z Feb 03 02:56:19 [ERROR] 
taskDispatcherThreadPoolAllowsForSynchronousCheckpoints  Time elapsed: 10.234 s 
 <<< ERROR!
2022-02-03T02:56:19.1953312Z Feb 03 02:56:19 
org.junit.runners.model.TestTimedOutException: test timed out after 10 seconds
2022-02-03T02:56:19.1954442Z Feb 03 02:56:19at 
java.util.zip.ZipFile.read(Native Method)
2022-02-03T02:56:19.1955186Z Feb 03 02:56:19at 
java.util.zip.ZipFile.access$1400(ZipFile.java:60)
2022-02-03T02:56:19.1956156Z Feb 03 02:56:19at 
java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:734)
2022-02-03T02:56:19.1957004Z Feb 03 02:56:19at 
java.util.zip.ZipFile$ZipFileInflaterInputStream.fill(ZipFile.java:434)
2022-02-03T02:56:19.1957886Z Feb 03 02:56:19at 
java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158)
2022-02-03T02:56:19.1958732Z Feb 03 02:56:19at 
sun.misc.Resource.getBytes(Resource.java:124)
2022-02-03T02:56:19.1959541Z Feb 03 02:56:19at 
java.net.URLClassLoader.defineClass(URLClassLoader.java:463)
2022-02-03T02:56:19.1960342Z Feb 03 02:56:19at 
java.net.URLClassLoader.access$100(URLClassLoader.java:74)
2022-02-03T02:56:19.1961173Z Feb 03 02:56:19at 
java.net.URLClassLoader$1.run(URLClassLoader.java:369)
2022-02-03T02:56:19.1962349Z Feb 03 02:56:19at 
java.net.URLClassLoader$1.run(URLClassLoader.java:363)
2022-02-03T02:56:19.1963187Z Feb 03 02:56:19at 
java.security.AccessController.doPrivileged(Native Method)
2022-02-03T02:56:19.1964157Z Feb 03 02:56:19at 
java.net.URLClassLoader.findClass(URLClassLoader.java:362)
2022-02-03T02:56:19.1964917Z Feb 03 02:56:19at 
java.lang.ClassLoader.loadClass(ClassLoader.java:418)
2022-02-03T02:56:19.1965815Z Feb 03 02:56:19at 
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
2022-02-03T02:56:19.1966620Z Feb 03 02:56:19at 
java.lang.ClassLoader.loadClass(ClassLoader.java:351)
2022-02-03T02:56:19.1967373Z Feb 03 02:56:19at 
scala.collection.AbstractSeq.(Seq.scala:41)
2022-02-03T02:56:19.1968142Z Feb 03 02:56:19at 
scala.collection.mutable.AbstractSeq.(Seq.scala:48)
2022-02-03T02:56:19.1969039Z Feb 03 02:56:19at 
scala.collection.mutable.StringBuilder.(StringBuilder.scala:32)
2022-02-03T02:56:19.1980285Z Feb 03 02:56:19at 
scala.collection.mutable.StringBuilder.(StringBuilder.scala:49)
2022-02-03T02:56:19.1981000Z Feb 03 02:56:19at 
scala.collection.mutable.StringBuilder.(StringBuilder.scala:54)
2022-02-03T02:56:19.1981572Z Feb 03 02:56:19at 
scala.util.PropertiesTrait$class.$init$(Properties.scala:31)
2022-02-03T02:56:19.1982087Z Feb 03 02:56:19at 
scala.util.Properties$.(Properties.scala:17)
2022-02-03T02:56:19.1982561Z Feb 03 02:56:19at 
scala.util.Properties$.(Properties.scala)
2022-02-03T02:56:19.1983060Z Feb 03 02:56:19at 
scala.tools.nsc.interpreter.IMain$Factory.(IMain.scala:1300)
2022-02-03T02:56:19.1983833Z Feb 03 02:56:19at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
2022-02-03T02:56:19.1984904Z Feb 03 02:56:19at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
2022-02-03T02:56:19.1986146Z Feb 03 02:56:19at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
2022-02-03T02:56:19.1987162Z Feb 03 02:56:19at 
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
2022-02-03T02:56:19.1988035Z Feb 03 02:56:19at 
java.lang.Class.newInstance(Class.java:442)
2022-02-03T02:56:19.194Z Feb 03 02:56:19at 
java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
2022-02-03T02:56:19.1989794Z Feb 03 02:56:19at 
java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
2022-02-03T02:56:19.1990590Z Feb 03 02:56:19at 
java.util.ServiceLoader$1.next(ServiceLoader.java:480)
2022-02-03T02:56:19.1991363Z Feb 03 02:56:19at 
javax.script.ScriptEngineManager.initEngines(ScriptEngineManager.java:122)
2022-02-03T02:56:19.1992201Z Feb 03 02:56:19at 
javax.script.ScriptEngineManager.init(ScriptEngineManager.java:84)
2022-02-03T02:56:19.1993066Z Feb 03 02:56:19at 
javax.script.ScriptEngineManager.(ScriptEngineManager.java:61)
2022-02-03T02:56:19.1993870Z Feb 03 02:56:19at 
org.apache.logging.log4j.core.script.ScriptManager.(ScriptManager.java:70)
2022-02-03T02:56:19.1994633Z Feb 03 02:56:19at 
org.a

[jira] [Created] (FLINK-25939) PyFlink YARN per-job on Docker test fails on AZP because it could not acquire all required slots

2022-02-03 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25939:
-

 Summary: PyFlink YARN per-job on Docker test fails on AZP because 
it could not acquire all required slots
 Key: FLINK-25939
 URL: https://issues.apache.org/jira/browse/FLINK-25939
 Project: Flink
  Issue Type: Bug
  Components: API / Python, Deployment / YARN
Affects Versions: 1.14.3
Reporter: Till Rohrmann


The test {{PyFlink YARN per-job on Docker test}} fails on AZP with:

{code}
2022-02-03T03:32:41.0643749Z Feb 03 03:32:40 2022-02-03 03:31:12,281 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: 
Values(tuples=[[{ 1 }, { 2 }, { 3 }]]) -> Calc(select=[f0]) -> 
PythonCalc(select=[add_one(f0) AS a]) -> Sink: Collect table sink (1/1) 
(1c3df1a51de35e44664305a57f4047a7) switched from SCHEDULED to FAILED on 
[unassigned resource].
2022-02-03T03:32:41.0645152Z Feb 03 03:32:40 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Slot request bulk is not fulfillable! Could not allocate the required slot 
within slot request timeout
2022-02-03T03:32:41.0647529Z Feb 03 03:32:40at 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0648830Z Feb 03 03:32:40at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_312]
2022-02-03T03:32:41.0649605Z Feb 03 03:32:40at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_312]
2022-02-03T03:32:41.0651215Z Feb 03 03:32:40at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
 ~[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0652935Z Feb 03 03:32:40at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0654715Z Feb 03 03:32:40at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
 ~[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0656399Z Feb 03 03:32:40at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
 ~[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0657987Z Feb 03 03:32:40at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
 ~[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0659528Z Feb 03 03:32:40at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
 ~[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0661186Z Feb 03 03:32:40at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0662745Z Feb 03 03:32:40at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0664203Z Feb 03 03:32:40at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0665591Z Feb 03 03:32:40at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0667179Z Feb 03 03:32:40at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0668708Z Feb 03 03:32:40at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0670130Z Feb 03 03:32:40at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0671634Z Feb 03 03:32:40at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0672985Z Feb 03 03:32:40at 
akka.actor.Actor.aroundReceive(Actor.scala:537) 
[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0674418Z Feb 03 03:32:40at 
akka.actor.Actor.aroundReceive$(Actor.scala:535) 
[flink-rpc-akka_92d95797-5383-46fa-abb4-628d6de63216.jar:1.14-SNAPSHOT]
2022-02-03T03:32:41.0675750Z Feb 03 03:32:40at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala

[jira] [Created] (FLINK-25940) pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_keyed_process_function_with_state failed on AZP

2022-02-03 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25940:
-

 Summary: 
pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_keyed_process_function_with_state
 failed on AZP
 Key: FLINK-25940
 URL: https://issues.apache.org/jira/browse/FLINK-25940
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.0
Reporter: Till Rohrmann


The test 
{{pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_keyed_process_function_with_state}}
 fails on AZP:

{code}
2022-02-02T17:44:12.1898582Z Feb 02 17:44:12 
=== FAILURES ===
2022-02-02T17:44:12.1899860Z Feb 02 17:44:12 _ 
StreamingModeDataStreamTests.test_keyed_process_function_with_state __
2022-02-02T17:44:12.1900493Z Feb 02 17:44:12 
2022-02-02T17:44:12.1901218Z Feb 02 17:44:12 self = 

2022-02-02T17:44:12.1901948Z Feb 02 17:44:12 
2022-02-02T17:44:12.1902745Z Feb 02 17:44:12 def 
test_keyed_process_function_with_state(self):
2022-02-02T17:44:12.1903722Z Feb 02 17:44:12 
self.env.get_config().set_auto_watermark_interval(2000)
2022-02-02T17:44:12.1904473Z Feb 02 17:44:12 
self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
2022-02-02T17:44:12.1906780Z Feb 02 17:44:12 data_stream = 
self.env.from_collection([(1, 'hi', '1603708211000'),
2022-02-02T17:44:12.1908034Z Feb 02 17:44:12
 (2, 'hello', '1603708224000'),
2022-02-02T17:44:12.1909166Z Feb 02 17:44:12
 (3, 'hi', '1603708226000'),
2022-02-02T17:44:12.1910122Z Feb 02 17:44:12
 (4, 'hello', '1603708289000'),
2022-02-02T17:44:12.1911099Z Feb 02 17:44:12
 (5, 'hi', '1603708291000'),
2022-02-02T17:44:12.1912451Z Feb 02 17:44:12
 (6, 'hello', '1603708293000')],
2022-02-02T17:44:12.1913456Z Feb 02 17:44:12
type_info=Types.ROW([Types.INT(), Types.STRING(),
2022-02-02T17:44:12.1914338Z Feb 02 17:44:12
 Types.STRING()]))
2022-02-02T17:44:12.1914811Z Feb 02 17:44:12 
2022-02-02T17:44:12.1915317Z Feb 02 17:44:12 class 
MyTimestampAssigner(TimestampAssigner):
2022-02-02T17:44:12.1915724Z Feb 02 17:44:12 
2022-02-02T17:44:12.1916782Z Feb 02 17:44:12 def 
extract_timestamp(self, value, record_timestamp) -> int:
2022-02-02T17:44:12.1917621Z Feb 02 17:44:12 return 
int(value[2])
2022-02-02T17:44:12.1918262Z Feb 02 17:44:12 
2022-02-02T17:44:12.1918855Z Feb 02 17:44:12 class 
MyProcessFunction(KeyedProcessFunction):
2022-02-02T17:44:12.1919363Z Feb 02 17:44:12 
2022-02-02T17:44:12.1919744Z Feb 02 17:44:12 def __init__(self):
2022-02-02T17:44:12.1920143Z Feb 02 17:44:12 self.value_state = 
None
2022-02-02T17:44:12.1920648Z Feb 02 17:44:12 self.list_state = 
None
2022-02-02T17:44:12.1921298Z Feb 02 17:44:12 self.map_state = 
None
2022-02-02T17:44:12.1921864Z Feb 02 17:44:12 
2022-02-02T17:44:12.1922479Z Feb 02 17:44:12 def open(self, 
runtime_context: RuntimeContext):
2022-02-02T17:44:12.1923907Z Feb 02 17:44:12 
value_state_descriptor = ValueStateDescriptor('value_state', Types.INT())
2022-02-02T17:44:12.1924922Z Feb 02 17:44:12 self.value_state = 
runtime_context.get_state(value_state_descriptor)
2022-02-02T17:44:12.1925741Z Feb 02 17:44:12 
list_state_descriptor = ListStateDescriptor('list_state', Types.INT())
2022-02-02T17:44:12.1926482Z Feb 02 17:44:12 self.list_state = 
runtime_context.get_list_state(list_state_descriptor)
2022-02-02T17:44:12.1927465Z Feb 02 17:44:12 
map_state_descriptor = MapStateDescriptor('map_state', Types.INT(), 
Types.STRING())
2022-02-02T17:44:12.1927998Z Feb 02 17:44:12 state_ttl_config = 
StateTtlConfig \
2022-02-02T17:44:12.1928444Z Feb 02 17:44:12 
.new_builder(Time.seconds(1)) \
2022-02-02T17:44:12.1928943Z Feb 02 17:44:12 
.set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
2022-02-02T17:44:12.1929462Z Feb 02 17:44:12 
.set_state_visibility(
2022-02-02T17:44:12.1929939Z Feb 02 17:44:12 
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) \
2022-02-02T17:44:12.1930601Z Feb 02 17:44:12 
.disable_cleanup_in_background() \
2022-02-02T17:44:12.1931032Z Feb 02 17:44:12 .build()
2022-02-02T17:44:12.1931480Z Feb 02 17:44:12 
map_state_descriptor.enable_time_to_live(state_ttl_co

[jira] [Created] (FLINK-25941) KafkaSinkITCase.testAbortTransactionsAfterScaleInBeforeFirstCheckpoint fails on AZP

2022-02-03 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25941:
-

 Summary: 
KafkaSinkITCase.testAbortTransactionsAfterScaleInBeforeFirstCheckpoint fails on 
AZP
 Key: FLINK-25941
 URL: https://issues.apache.org/jira/browse/FLINK-25941
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.15.0
Reporter: Till Rohrmann


The test 
{{KafkaSinkITCase.testAbortTransactionsAfterScaleInBeforeFirstCheckpoint}} 
fails on AZP with

{code}
2022-02-02T17:22:29.5131631Z Feb 02 17:22:29 [ERROR] 
org.apache.flink.connector.kafka.sink.KafkaSinkITCase.testAbortTransactionsAfterScaleInBeforeFirstCheckpoint
  Time elapsed: 2.186 s  <<< FAILURE!
2022-02-02T17:22:29.5146972Z Feb 02 17:22:29 java.lang.AssertionError
2022-02-02T17:22:29.5148918Z Feb 02 17:22:29at 
org.junit.Assert.fail(Assert.java:87)
2022-02-02T17:22:29.5149843Z Feb 02 17:22:29at 
org.junit.Assert.assertTrue(Assert.java:42)
2022-02-02T17:22:29.5150644Z Feb 02 17:22:29at 
org.junit.Assert.assertTrue(Assert.java:53)
2022-02-02T17:22:29.5151730Z Feb 02 17:22:29at 
org.apache.flink.connector.kafka.sink.KafkaSinkITCase.testAbortTransactionsAfterScaleInBeforeFirstCheckpoint(KafkaSinkITCase.java:267)
2022-02-02T17:22:29.5152858Z Feb 02 17:22:29at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-02-02T17:22:29.5153757Z Feb 02 17:22:29at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-02-02T17:22:29.5155002Z Feb 02 17:22:29at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-02-02T17:22:29.5156464Z Feb 02 17:22:29at 
java.lang.reflect.Method.invoke(Method.java:498)
2022-02-02T17:22:29.5157384Z Feb 02 17:22:29at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
2022-02-02T17:22:29.5158445Z Feb 02 17:22:29at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2022-02-02T17:22:29.5159478Z Feb 02 17:22:29at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
2022-02-02T17:22:29.5160524Z Feb 02 17:22:29at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2022-02-02T17:22:29.5161758Z Feb 02 17:22:29at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
2022-02-02T17:22:29.5162775Z Feb 02 17:22:29at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
2022-02-02T17:22:29.5163744Z Feb 02 17:22:29at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
2022-02-02T17:22:29.5164913Z Feb 02 17:22:29at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
2022-02-02T17:22:29.5166101Z Feb 02 17:22:29at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
2022-02-02T17:22:29.5167030Z Feb 02 17:22:29at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
2022-02-02T17:22:29.5167953Z Feb 02 17:22:29at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
2022-02-02T17:22:29.5168956Z Feb 02 17:22:29at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
2022-02-02T17:22:29.5169936Z Feb 02 17:22:29at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
2022-02-02T17:22:29.5170903Z Feb 02 17:22:29at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
2022-02-02T17:22:29.5171953Z Feb 02 17:22:29at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
2022-02-02T17:22:29.5172919Z Feb 02 17:22:29at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
2022-02-02T17:22:29.5173811Z Feb 02 17:22:29at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
2022-02-02T17:22:29.5174874Z Feb 02 17:22:29at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
2022-02-02T17:22:29.5175917Z Feb 02 17:22:29at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
2022-02-02T17:22:29.5176851Z Feb 02 17:22:29at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
2022-02-02T17:22:29.5177816Z Feb 02 17:22:29at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
2022-02-02T17:22:29.5178816Z Feb 02 17:22:29at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
2022-02-02T17:22:29.5179929Z Feb 02 17:22:29at 
org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30)
2022-02-02T17:22:29.5180960Z Feb 02 17:22:29at 
org.junit.rules.RunRules.evaluate(RunRules.java:20)
2022-02-02T17:22:29.5181827Z Feb 02 17:22:29at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
2022-02-02T17:22:29.5182733Z Feb 02 17:22:29at 
org.junit.runners

[jira] [Created] (FLINK-25942) Use jackson jdk8/time modules for Duration ser/de

2022-02-03 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25942:
---

 Summary: Use jackson jdk8/time modules for Duration ser/de
 Key: FLINK-25942
 URL: https://issues.apache.org/jira/browse/FLINK-25942
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


https://issues.apache.org/jira/browse/FLINK-25588 introduced the jackson jdk8 
and datetime modules to flink-shaded jackson, so now we no longer need our own 
Duration ser/de (which was introduced because we lacked these jackson modules).
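
As an illustrative sketch (vanilla Jackson package names, not the
flink-shaded relocated ones), Duration ser/de works out of the box once the
modules are registered:

{code:java}
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.time.Duration;

public class DurationJsonSketch {
    public static void main(String[] args) throws Exception {
        // The jdk8 and jsr310 (java.time) modules give Jackson built-in
        // (de)serializers for Optional, Duration, Instant, etc.
        ObjectMapper mapper = new ObjectMapper()
                .registerModule(new Jdk8Module())
                .registerModule(new JavaTimeModule());

        String json = mapper.writeValueAsString(Duration.ofSeconds(30));
        Duration back = mapper.readValue(json, Duration.class);
        System.out.println(json + " -> " + back); // e.g. 30.000000000 -> PT30S
    }
}
{code}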



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25943) New Kinesis, Firehose to provide a state serializer

2022-02-03 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25943:
--

 Summary: New Kinesis, Firehose to provide a state serializer
 Key: FLINK-25943
 URL: https://issues.apache.org/jira/browse/FLINK-25943
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Motivation

*User stories:*
As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams 
sink.

*Scope:*
 * Introduce {{AsyncDynamicTableSink}}, which enables sinking Tables into async 
implementations.
 * Implement a new {{KinesisDynamicTableSink}} that uses the 
{{KinesisDataStreamSink}} async implementation and implements 
{{AsyncDynamicTableSink}}.
 * The implementation introduces Async Sink configurations as optional options 
in the table definition, with default values derived from the 
{{KinesisDataStream}} default values.
 * Unit/Integration testing: modify the KinesisTableAPI tests for the new 
implementation; add unit tests for {{AsyncDynamicTableSink}}, 
{{KinesisDynamicTableSink}} and {{KinesisDynamicTableSinkFactory}}.
 * Java / code-level docs.

h2. References

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

 

*Update:*

^^ Status Update ^^
__List of all work outstanding for 1.15 release__

[Merged] https://github.com/apache/flink/pull/18165 - KDS DataStream Docs
[Merged] https://github.com/apache/flink/pull/18396 - [hotfix] for infinite loop 
when not flushing during commit
[Merged] https://github.com/apache/flink/pull/18421 - Mark Kinesis Producer as 
deprecated (Prod: FLINK-24227)
[Merged] https://github.com/apache/flink/pull/18348 - KDS Table API Sink & Docs
[Merged] https://github.com/apache/flink/pull/18488 - base sink retry entries 
in order not in reverse
[Merged] https://github.com/apache/flink/pull/18512 - changing failed requests 
handler to accept List in AsyncSinkWriter
[Merged] https://github.com/apache/flink/pull/18483 - Do not expose the element 
converter
[Merged] https://github.com/apache/flink/pull/18468 - Adding Kinesis data 
streams sql uber-jar

Ready for review:
[SUCCESS ] https://github.com/apache/flink/pull/18314 - KDF DataStream Sink & 
Docs
[BLOCKED on ^^ ] https://github.com/apache/flink/pull/18426 - rename 
flink-connector-aws-kinesis-data-* to flink-connector-aws-kinesis-* (module 
names) and KinesisData*Sink to Kinesis*Sink (class names)

Pending PR:
* Firehose Table API Sink & Docs
* KDF Table API SQL jar

TBD:
* FLINK-25846: Not shutting down
* FLINK-25848: Validation during start up
* FLINK-25792: flush() bug
* FLINK-25793: throughput exceeded
* Update the defaults of KDS sink and update the docs + do the same for KDF
* add a `AsyncSinkCommonConfig` class (to wrap the 6 params) to the 
`flink-connector-base` and propagate it to the two connectors
- feature freeze
* KDS performance testing
* KDF performance testing
* Clone the new docs to .../contents.zh/... and add the location to the 
corresponding Chinese translation jira - KDS -
* Rename [Amazon AWS Kinesis Streams] to [Amazon Kinesis Data Streams] in docs 
(legacy issue)
- Flink 1.15 release
* KDS end to end sanity test - hits aws apis rather than local docker images
* KDS Python wrappers
* FLINK-25733 - Create A migration guide for Kinesis Table API connector - can 
happen after 1.15
* If `endpoint` is provided, `region` should not be required like it currently 
is
* Test if Localstack container requires the 1ms timeout
* Adaptive level of logging (in discussion)

FYI:
* FLINK-25661 - Add Custom Fatal Exception handler in AsyncSinkWriter - 
https://github.com/apache/flink/pull/18449
* https://issues.apache.org/jira/browse/FLINK-24229 DDB Sink

Chinese translation:
https://issues.apache.org/jira/browse/FLINK-25735 - KDS DataStream Sink
https://issues.apache.org/jira/browse/FLINK-25736 - KDS Table API Sink
https://issues.apache.org/jira/browse/FLINK-25737 - KDF DataStream Sink



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Chesnay Schepler

Thanks for answering the questions!

1) Does the HBase provider require HBase to be on the classpath?
    If so, then could it even be loaded if HBase is not on the classpath?
    If not, then you're assuming the classpath of the JM/TM to be the 
same, which isn't necessarily true (in general; and also if HBase is 
loaded from the user-jar).
2) None of the /Providers/ in your PoC get access to the configuration. 
Only the /Manager/ does. Note that I do not know whether there is a need 
for the providers to have access to the config, as that's very 
implementation specific, I suppose.
10) I'm not sure myself. It could be something as trivial as creating 
some temporary directory in HDFS, I suppose.


On 03/02/2022 10:23, Gabor Somogyi wrote:

Please see my answers inline. I hope I've provided satisfying answers to
all the questions.

G

On Thu, Feb 3, 2022 at 9:17 AM Chesnay Schepler  wrote:


I have a few questions; I'd appreciate it if you could answer them.

1. How does the Provider know whether it is required or not?

All properly registered providers are going to be loaded and asked to
obtain tokens. It's worth mentioning that every provider
has the right to decide whether it wants to obtain tokens or not (boolean
delegationTokensRequired()). For instance, if a provider detects that
HBase is not on the classpath or not configured properly, then no tokens
are obtained from that specific provider.

You may ask how a provider is registered. Here it is:
The provider is on the classpath and there is a META-INF service file which
contains the name of the provider, for example:
META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider




1. How does the configuration of Providers work (how do they get
access to a configuration)?

The Flink configuration is going to be passed to all providers. Please see
the POC here:
https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1
Service-specific configurations are loaded on the fly. For example, in the
HBase case the provider looks for the HBase configuration class, which will
be instantiated within the provider.


1. How does a user select providers? (Is it purely based on the
provider being on the classpath?)

Providers can be explicitly turned off with the following config:
"security.kerberos.tokens.${name}.enabled". I've never seen two different
implementations exist for a specific
external service, but if this edge case came up, the mentioned config would
need to be added and a new provider with a different name would need to be
implemented and registered.
All in all, we've seen that provider handling is not a user-specific task but
a cluster-admin one. If a specific provider is needed, it's implemented
once per company, registered once
on the clusters, and then all users may or may not use the obtained tokens.

It's worth mentioning that the system will know which token needs to be used
when HDFS is accessed; this part is automatic.


1. How can a user override an existing provider?

Please see the previous bullet point.
1. What is DelegationTokenProvider#name() used for?

All providers which are registered properly (on the classpath +
META-INF entry) are enabled by default. With
"security.kerberos.tokens.${name}.enabled" a specific provider can be
turned off.
Additionally, I intend to use this name in log entries later on for debugging
purposes, for example "hadoopfs provider obtained 2 tokens with ID...".
This would help track what is happening with tokens, and when. The same
applies to the TaskManager side: "2 hadoopfs provider
tokens arrived with ID...". It's important to note that the secret part will
be hidden in the mentioned log entries to keep the
attack surface low.


1. What happens if the names of 2 providers are identical?

I presume you mean 2 different classes which are both registered and have the
same logic inside. In this case both will be loaded and both are going to
obtain token(s) for the same service.
Both obtained token(s) are going to be added to the UGI. As a result the
second will overwrite the first, but the order is not defined. Since both
tokens are valid, access to the external system will work no matter which
one is used.

When the class names are the same, the service loader only loads a single
entry because services are singletons. That's the reason why state inside
providers is not advised.


1. Will we directly load the provider, or first load a factory
(usually preferable)?

The intention is to load providers directly from the DTM. We could add an
extra layer to have a factory, but after consideration I came to the
conclusion that it would be overkill in this case.
Please have a look at how it's planned to load providers now:
https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1#diff-d56a0bc77335ff23c0318f6dec1872e7b19b1a9ef6d10fff8fbaab9aecac94faR54-R81



1. What is the Creden

[jira] [Created] (FLINK-25944) Intermittent Failures on KDF AZP

2022-02-03 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25944:
--

 Summary: Intermittent Failures on KDF AZP
 Key: FLINK-25944
 URL: https://issues.apache.org/jira/browse/FLINK-25944
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


AsyncSinkWriter implements snapshotState to write the pending requests into 
state, but none of the implementations (Kinesis, Firehose) provide a state 
serializer or interact with the recovered state.

 

* Implement {{getWriterStateSerializer}} for the Kinesis/Firehose sinks.
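
A hypothetical sketch of such a serializer, using Flink's standard 
{{SimpleVersionedSerializer}} interface (the concrete writer-state type is 
defined by the AsyncSinkWriter implementations; a plain String stands in for 
it here):

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Toy writer state: a single pending request represented as a String.
public class PendingRequestSerializer implements SimpleVersionedSerializer<String> {

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(String pendingRequest) throws IOException {
        return pendingRequest.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(int version, byte[] serialized) throws IOException {
        return new String(serialized, StandardCharsets.UTF_8);
    }
}
{code}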



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25945) Intermittent Failures on KDF AZP 2

2022-02-03 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25945:
--

 Summary: Intermittent Failures on KDF AZP 2
 Key: FLINK-25945
 URL: https://issues.apache.org/jira/browse/FLINK-25945
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


Problem: CI fails on [#18553|https://github.com/apache/flink/pull/18553] and 
the issue is not reproducible locally. Furthermore, the failure is intermittent, 
with a frequency of c. 1 in 5 (?). Resolution: unknown - the current theory is 
to use HTTP1_1 as the protocol.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25946) table-planner-loader jar NOTICE should list Scala

2022-02-03 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-25946:


 Summary: table-planner-loader jar NOTICE should list Scala
 Key: FLINK-25946
 URL: https://issues.apache.org/jira/browse/FLINK-25946
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System, Table SQL / Planner
Affects Versions: 1.15.0
Reporter: Chesnay Schepler
 Fix For: 1.15.0






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25947) License checker doesn't check flink-table-planner

2022-02-03 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-25947:


 Summary: License checker doesn't check flink-table-planner
 Key: FLINK-25947
 URL: https://issues.apache.org/jira/browse/FLINK-25947
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System
Affects Versions: 1.15.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.15.0






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Gabor Somogyi
Thanks for the quick response!
Appreciate your invested time...

G

On Thu, Feb 3, 2022 at 11:12 AM Chesnay Schepler  wrote:

> Thanks for answering the questions!
>
> 1) Does the HBase provider require HBase to be on the classpath?
>

To be instantiated, no; to obtain a token, yes.


> If so, then could it even be loaded if HBase is not on the classpath?
>

The provider can be loaded, but inside the provider it would detect whether
HBase is on the classpath.
Just to be crystal clear, this is the actual implementation that I would
like to take over into the Provider.
Please see:
https://github.com/apache/flink/blob/e6210d40491ff28c779b8604e425f01983f8a3d7/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java#L243-L254

I've considered loading only the necessary Providers, but that would mean a
generic Manager needs to know that if the newly loaded Provider is an
instanceof HBaseDelegationTokenProvider, then it needs to be skipped.
I think it would add unnecessary complexity to the Manager and it would
contain ugly code parts (at least in my view), like this:
if (provider instanceof HBaseDelegationTokenProvider &&
hbaseIsNotOnClasspath()) {
  // Skip intentionally
} else if (provider instanceof SomethingElseDelegationTokenProvider &&
somethingElseIsNotOnClasspath()) {
  // Skip intentionally
} else {
  providers.put(provider.serviceName(), provider);
}
I think the approach with the least code, and the clearest, is to load the
providers and let each decide inside whether everything is given to obtain
a token.

If not, then you're assuming the classpath of the JM/TM to be the same,
> which isn't necessarily true (in general; and also if HBase is loaded from
> the user-jar).
>

I'm not assuming that the classpath of the JM/TM must be the same. If the
HBase jar is coming from the user-jar, then the HBase code is going to use
the UGI within the JVM when authentication is required.
Of course I've not yet tested this within Flink, but in Spark it works fine.
All in all, the JM/TM classpaths may be different, but the HBase jar must
exist somehow on both sides.
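
For context, a sketch of how obtained tokens typically reach UGI-based
client code in the same JVM (standard Hadoop API; the exact wiring in Flink
is still part of the PoC):

import java.io.IOException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

public final class UgiTokenSketch {

  // Once tokens are added to the current UGI, HBase/HDFS client code in the
  // same JVM picks them up, regardless of which classloader loaded it.
  public static void addTokensToUgi(Credentials obtainedTokens) throws IOException {
    UserGroupInformation.getCurrentUser().addCredentials(obtainedTokens);
  }
}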


> 2) None of the *Providers* in your PoC get access to the configuration.
> Only the *Manager* does. Note that I do not know whether there is a need
> for the providers to have access to the config, as that's very
> implementation specific I suppose.
>

You're right. Since this is just a POC and I don't have a green light, I've
not put too much effort into a proper
self-review. The DelegationTokenProvider#init method must get the Flink
configuration.
The reason behind this is that several further configurations can be found
using it. A good example is getting the Hadoop conf.
The rationale is the same as before: it would be good to create as generic a
Manager as possible.
To be more specific, some code must load the Hadoop conf, which could be
done by the Manager or the Provider.
If the Manager does that, then the generic Manager must be modified all the
time when something special is needed for a new provider.
This could be super problematic when a custom provider is written.


> 10) I'm not sure myself. It could be something as trivial as creating some
> temporary directory in HDFS I suppose.
>

I've not found such a task. YARN and K8S are not expecting such things from
executors, and workloads are not running until the initial token set has
been propagated.


>
> On 03/02/2022 10:23, Gabor Somogyi wrote:
>
> Please see my answers inline. I hope I've provided satisfying answers to
> all the questions.
>
> G
>
> On Thu, Feb 3, 2022 at 9:17 AM Chesnay Schepler  
>  wrote:
>
>
> I have a few questions; I'd appreciate it if you could answer them.
>
>1. How does the Provider know whether it is required or not?
>
> All properly registered providers are going to be loaded and asked to
> obtain tokens. It's worth mentioning that every provider
> has the right to decide whether it wants to obtain tokens or not (boolean
> delegationTokensRequired()). For instance, if a provider detects that
> HBase is not on the classpath or not configured properly, then no tokens
> are obtained from that specific provider.
>
> You may ask how a provider is registered. Here it is:
> The provider is on the classpath and there is a META-INF service file which
> contains the name of the provider, for example:
> META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
>  
> 
>
> 1. How does the configuration of Providers work (how do they get
>access to a configuration)?
>
> The Flink configuration is going to be passed to all providers. Please see
> the POC here:
> https://github.com/apache/flink/compare/master...gaborgsomogyi:dt?expand=1
> Service-specific configurations are loaded on the fly. For example, in the
> HBase case it looks for HBase confi

[jira] [Created] (FLINK-25948) KDS / KDF Sink should call .close() to clean up resources

2022-02-03 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25948:
--

 Summary: KDS / KDF Sink should call .close() to clean up resources
 Key: FLINK-25948
 URL: https://issues.apache.org/jira/browse/FLINK-25948
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


Intermittent failures introduced as part of merge (PR#18314: 
[FLINK-24228[connectors/firehose] - Unified Async Sink for Kinesis 
Firehose|https://github.com/apache/flink/pull/18314]):
 # Failures are intermittent, affecting c. 1 in 7 builds on 
{{flink-ci.flink}} and {{flink-ci.flink-master-mirror}}.
 # The issue looks identical to the KinesaliteContainer startup issue (Appendix 
1).
 # I have managed to reproduce the issue locally: if I start some parallel 
containers, keep them running, and then run {{KinesisFirehoseSinkITCase}}, 
then c. 1 in 6 runs gives the error.
 # The errors have a slightly different appearance on 
{{flink-ci.flink-master-mirror}} vs {{flink-ci.flink}}, which has the same 
appearance as locally. I only hope it is a difference in logging/killing 
environment variables (and that there aren’t 2 distinct issues).

Appendix 1:
{code:java}
org.testcontainers.containers.ContainerLaunchException: Container startup failed

at 
org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:336)
at 
org.testcontainers.containers.GenericContainer.start(GenericContainer.java:317)
at 
org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1066)
at 
... 11 more
Caused by: org.testcontainers.containers.ContainerLaunchException: Could not 
create/start container
at 
org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:525)
at 
org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:331)
at 
org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81)
... 12 more
Caused by: org.rnorth.ducttape.TimeoutException: Timeout waiting for result 
with exception
at 
org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:54)
at
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Chesnay Schepler

1)
The manager certainly shouldn't check for specific implementations.
The problem with classpath-based checks is it can easily happen that the 
provider can't be loaded in the first place (e.g., if you don't use 
reflection, which you currently kinda force), and in that case Flink 
can't tell whether the token is not required or the cluster isn't set up 
correctly.
As I see it we shouldn't try to be clever; if the user wants Kerberos, 
then have them enable the providers. Any error in loading the provider 
(be it by accident or explicit checks) then is a setup error and we can 
fail the cluster.
If we still want to auto-detect whether the provider should be used, 
note that using factories would make this easier; the factory can check 
the classpath (not having any direct dependencies on HBase avoids the 
case above), and the provider no longer needs reflection because it will 
only be used iff HBase is on the CP.


Requiring HBase (and Hadoop for that matter) to be on the JM system 
classpath would be a bit unfortunate. Have you considered loading the 
providers as plugins?


2) > DelegationTokenProvider#init method

This is missing from the FLIP. From my experience with the metric 
reporters, having the implementation rely on the configuration is really 
annoying for testing purposes. That's why I suggested factories; they 
can take care of extracting all parameters that the implementation 
needs, and then pass them nicely via the constructor. This also implies 
that any fields of the provider wouldn't inherently have to be mutable.
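
A sketch of that factory indirection (a hypothetical interface, not part of
the current FLIP or PoC):

import org.apache.flink.configuration.Configuration;

// Hypothetical factory interface; the names are illustrative only.
public interface DelegationTokenProviderFactory {

  // Cheap availability probe (e.g. a classpath check). Because the factory
  // has no direct dependency on the external service's classes, it can
  // always be loaded, even when the service is absent.
  boolean isAvailable();

  // Extracts the parameters the provider needs from the Flink configuration
  // and passes them via the constructor, so the provider's fields can stay
  // final. Only called iff isAvailable() returned true, so the provider
  // itself needs no reflection.
  DelegationTokenProvider create(Configuration configuration);
}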


> workloads are not running until the initial token set has been 
propagated.


This isn't necessarily true. It can happen that tasks are being deployed 
to the TM without it having registered with the RM; there is currently 
no requirement that a TM must be registered before it may offer slots / 
accept task submissions.
One example is a TM restart + local recovery, where the TM eagerly 
offers the previous set of slots to the leading JM.


On 03/02/2022 12:39, Gabor Somogyi wrote:

Thanks for the quick response!
Appreciate your invested time...

G

On Thu, Feb 3, 2022 at 11:12 AM Chesnay Schepler  
wrote:


Thanks for answering the questions!

1) Does the HBase provider require HBase to be on the classpath?


To be instantiated, no; to obtain a token, yes.

    If so, then could it even be loaded if HBase is not on the classpath?


The provider can be loaded, but inside the provider it would detect 
whether HBase is on the classpath.
Just to be crystal clear, this is the actual implementation that I 
would like to take over into the Provider.
Please see: 
https://github.com/apache/flink/blob/e6210d40491ff28c779b8604e425f01983f8a3d7/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java#L243-L254


I've considered loading only the necessary Providers, but that would 
mean a generic Manager needs to know that if the newly loaded Provider 
is an instanceof HBaseDelegationTokenProvider, then it needs to be skipped.
I think it would add unnecessary complexity to the Manager and it 
would contain ugly code parts (at least in my view), like this:
if (provider instanceof HBaseDelegationTokenProvider && 
hbaseIsNotOnClasspath()) {
  // Skip intentionally
} else if (provider instanceof SomethingElseDelegationTokenProvider && 
somethingElseIsNotOnClasspath()) {
  // Skip intentionally
} else {
  providers.put(provider.serviceName(), provider);
}
I think the approach with the least code, and the clearest, is to load the 
providers and let each decide inside whether everything is given to obtain a token.


    If not, then you're assuming the classpath of the JM/TM to be
the same, which isn't necessarily true (in general; and also if
HBase is loaded from the user-jar).


I'm not assuming that the classpath of the JM/TM must be the same. If the 
HBase jar is coming from the user-jar, then the HBase code is going to 
use the UGI within the JVM when authentication is required.
Of course I've not yet tested this within Flink, but in Spark it works 
fine.
All in all, the JM/TM classpaths may be different, but the HBase jar 
must exist somehow on both sides.


2) None of the /Providers/ in your PoC get access to the
configuration. Only the /Manager/ does. Note that I do not know
whether there is a need for the providers to have access to the
config, as that's very implementation specific, I suppose.


You're right. Since this is just a POC and I don't have a green light, 
I've not put too much effort into a proper 
self-review. The DelegationTokenProvider#init method must get the Flink 
configuration.
The reason behind this is that several further configurations can be found 
using it. A good example is getting the Hadoop conf.
The rationale is the same as before: it would be good to create as 
generic a Manager as possible.
To be more specific, some code must load the Hadoop conf, which could be 
done by the Manager or the Provider.
If the Manager does that, then the generic Manager must be modified all 
the time when s

[jira] [Created] (FLINK-25949) [FLIP-171] Kinesis Firehose sink builder falls back to wrong http protocol.

2022-02-03 Thread Ahmed Hamdy (Jira)
Ahmed Hamdy created FLINK-25949:
---

 Summary: [FLIP-171] Kinesis Firehose sink builder falls back to 
wrong http protocol.
 Key: FLINK-25949
 URL: https://issues.apache.org/jira/browse/FLINK-25949
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Affects Versions: 1.15.0
Reporter: Ahmed Hamdy






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Gabor Somogyi
> Any error in loading the provider (be it by accident or explicit checks)
then is a setup error and we can fail the cluster.

Fail-fast is a good direction in my view. In Spark I wanted to go in this
direction, but there were other opinions, so there, if a provider is not
loaded, the workload proceeds anyway.
Of course the processing will fail if the token is missing...

> Requiring HBase (and Hadoop for that matter) to be on the JM system
classpath would be a bit unfortunate. Have you considered loading the
providers as plugins?

Even if it's unfortunate, the actual implementation depends on that
already. Moving HBase and/or all token providers into plugins is a
possibility.
That way, if one wants to use a specific provider, a plugin needs to be
added. If we'd like to go in this direction, I would do that in a
separate
FLIP to avoid feature creep here. The actual FLIP already covers several
thousand lines of code changes.

> This is missing from the FLIP. From my experience with the metric
reporters, having the implementation rely on the configuration is really
annoying for testing purposes. That's why I suggested factories; they can
take care of extracting all parameters that the implementation needs, and
then pass them nicely via the constructor.

ServiceLoader-provided services must have a no-arg constructor, so no
parameters can be passed.
As a side note, testing delegation token providers is extremely painful and
not possible with automated tests without creating a fully featured
Kerberos cluster with KDC, HDFS, HBase, Kafka, etc.
We had several tries in Spark but gave it up because of the complexity and
the flakiness, so I wouldn't worry much about unit testing.
The sad truth is that most of the token providers can only be tested
manually on a cluster.

Of course this doesn't mean that the whole code is not intended to be
covered with tests; a couple of parts can be tested automatically, but the
providers are not among them.
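
For reference, a minimal sketch of how the ServiceLoader constraint plays
out, reusing the hypothetical DelegationTokenProvider interface sketched
earlier in this thread (the loader class itself is illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.ServiceLoader;
    import org.apache.flink.configuration.Configuration;

    public final class DelegationTokenProviderLoader {

        /** ServiceLoader uses the no-arg constructor, so config goes via init(). */
        public static List<DelegationTokenProvider> load(Configuration configuration)
                throws Exception {
            List<DelegationTokenProvider> providers = new ArrayList<>();
            for (DelegationTokenProvider provider
                    : ServiceLoader.load(DelegationTokenProvider.class)) {
                provider.init(configuration); // no constructor parameters possible
                providers.add(provider);
            }
            return providers;
        }
    }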

> This also implies that any fields of the provider wouldn't inherently
have to be mutable.

I think this is not an issue. A provider connects to a service, obtains
token(s) and then closes the connection; I've never seen the need for an
intermediate state.
I've only mentioned the singleton behavior to be explicit.

> One examples is a TM restart + local recovery, where the TM eagerly
offers the previous set of slots to the leading JM.

Just to learn something new: I think local recovery is clear to me, i.e.
it does not touch external systems like Kafka (correct me if I'm wrong).
Is it possible in such a case that the user code just starts to run
blindly, without JM coordination, and connects to external systems to do
data processing?


On Thu, Feb 3, 2022 at 1:09 PM Chesnay Schepler  wrote:

> 1)
> The manager certainly shouldn't check for specific implementations.
> The problem with classpath-based checks is it can easily happen that the
> provider can't be loaded in the first place (e.g., if you don't use
> reflection, which you currently kinda force), and in that case Flink can't
> tell whether the token is not required or the cluster isn't set up
> correctly.
> As I see it we shouldn't try to be clever; if the users wants kerberos,
> then have him enable the providers. Any error in loading the provider (be
> it by accident or explicit checks) then is a setup error and we can fail
> the cluster.
> If we still want to auto-detect whether the provider should be used, note
> that using factories would make this easier; the factory can check the
> classpath (not having any direct dependencies on HBase avoids the case
> above), and the provider no longer needs reflection because it will only be
> used iff HBase is on the CP.
>
> Requiring HBase (and Hadoop for that matter) to be on the JM system
> classpath would be a bit unfortunate. Have you considered loading the
> providers as plugins?
>
> 2) > DelegationTokenProvider#init method
>
> This is missing from the FLIP. From my experience with the metric
> reporters, having the implementation rely on the configuration is really
> annoying for testing purposes. That's why I suggested factories; they can
> take care of extracting all parameters that the implementation needs, and
> then pass them nicely via the constructor. This also implies that any
> fields of the provider wouldn't inherently have to be mutable.
>
> > workloads are not yet running until the initial token set is not
> propagated.
>
> This isn't necessarily true. It can happen that tasks are being deployed
> to the TM without it having registered with the RM; there is currently no
> requirement that a TM must be registered before it may offer slots / accept
> task submissions.
> One examples is a TM restart + local recovery, where the TM eagerly offers
> the previous set of slots to the leading JM.
>
> On 03/02/2022 12:39, Gabor Somogyi wrote:
>
> Thanks for the quick response!
> Appreciate your invested time...
>
> G
>
> On Thu, Feb 3, 2022 at 11:12 AM Chesnay Schepler 

Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Chesnay Schepler
> Just to learn something new. I think local recovery is clear to me 
which is not touching external systems like Kafka or so (correct me if 
I'm wrong). Is it possible that such case the user code just starts to 
run blindly w/o JM coordination and connects to external systems to do 
data processing?


Local recovery itself shouldn't touch external systems; the TM cannot 
just run user-code without the JobMaster being involved, but it can 
happen that the JobMaster+TM collaborate to run stuff without the TM 
being registered at the RM.


On 03/02/2022 13:48, Gabor Somogyi wrote:
> Any error in loading the provider (be it by accident or explicit 
checks) then is a setup error and we can fail the cluster.


Fail fast is a good direction in my view. In Spark I wanted to go to 
this direction but there were other opinions so there if a provider is 
not loaded then the workload goes further.

Of course the processing will fail if the token is missing...

> Requiring HBase (and Hadoop for that matter) to be on the JM system 
classpath would be a bit unfortunate. Have you considered loading the 
providers as plugins?


Even if it's unfortunate the actual implementation is depending on 
that already. Moving HBase and/or all token providers into plugins is 
a possibility.
That way if one wants to use a specific provider then a plugin need to 
be added. If we would like to go to this direction I would do that in 
a separate
FLIP not to have feature creep here. The actual FLIP already covers 
several thousand lines of code changes.


> This is missing from the FLIP. From my experience with the metric 
reporters, having the implementation rely on the configuration is 
really annoying for testing purposes. That's why I suggested 
factories; they can take care of extracting all parameters that the 
implementation needs, and then pass them nicely via the constructor.


ServiceLoader provided services must have a norarg constructor where 
no parameters can be passed.
As a side note testing delegation token providers is pain in the ass 
and not possible with automated tests without creating a fully 
featured kerberos cluster with KDC, HDFS, HBase, Kafka, etc..
We've had several tries in Spark but then gave it up because of the 
complexity and the flakyness of it so I wouldn't care much about unit 
testing.
The sad truth is that most of the token providers can be tested 
manually on cluster.


Of course this doesn't mean that the whole code is not intended to be 
covered with tests. I mean couple of parts can be automatically tested 
but providers are not such.


> This also implies that any fields of the provider wouldn't 
inherently have to be mutable.


I think this is not an issue. A provider connects to a service, 
obtains token(s) and then close the connection and never seen the need 
of an intermediate state.

I've just mentioned the singleton behavior to be clear.

> One examples is a TM restart + local recovery, where the TM eagerly 
offers the previous set of slots to the leading JM.


Just to learn something new. I think local recovery is clear to me 
which is not touching external systems like Kafka or so (correct me if 
I'm wrong).
Is it possible that such case the user code just starts to run blindly 
w/o JM coordination and connects to external systems to do data 
processing?



On Thu, Feb 3, 2022 at 1:09 PM Chesnay Schepler  
wrote:


1)
The manager certainly shouldn't check for specific implementations.
The problem with classpath-based checks is it can easily happen
that the provider can't be loaded in the first place (e.g., if you
don't use reflection, which you currently kinda force), and in
that case Flink can't tell whether the token is not required or
the cluster isn't set up correctly.
As I see it we shouldn't try to be clever; if the users wants
kerberos, then have him enable the providers. Any error in loading
the provider (be it by accident or explicit checks) then is a
setup error and we can fail the cluster.
If we still want to auto-detect whether the provider should be
used, note that using factories would make this easier; the
factory can check the classpath (not having any direct
dependencies on HBase avoids the case above), and the provider no
longer needs reflection because it will only be used iff HBase is
on the CP.

Requiring HBase (and Hadoop for that matter) to be on the JM
system classpath would be a bit unfortunate. Have you considered
loading the providers as plugins?

2) > DelegationTokenProvider#init method

This is missing from the FLIP. From my experience with the metric
reporters, having the implementation rely on the configuration is
really annoying for testing purposes. That's why I suggested
factories; they can take care of extracting all parameters that
the implementation needs, and then pass them nicely via the
constructor. This also implies that any fields of the p

Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Gabor Somogyi
>  but it can happen that the JobMaster+TM collaborate to run stuff without
the TM being registered at the RM

Honestly, I'm not educated enough within Flink to give an example of such
a scenario.
Until now I thought the JM defines the tasks to be done and the TM just
blindly connects to external systems and does the processing.
All in all, if external systems can be touched during JM + TM
collaboration, then we need to consider that in the design.
Since I don't have an example scenario, I don't know what exactly needs to
be solved.
I think we need an example case to decide whether we face a real issue or
whether the design holds up.


On Thu, Feb 3, 2022 at 2:12 PM Chesnay Schepler  wrote:

> > Just to learn something new. I think local recovery is clear to me which
> is not touching external systems like Kafka or so (correct me if I'm
> wrong). Is it possible that such case the user code just starts to run
> blindly w/o JM coordination and connects to external systems to do data
> processing?
>
> Local recovery itself shouldn't touch external systems; the TM cannot just
> run user-code without the JobMaster being involved, but it can happen that
> the JobMaster+TM collaborate to run stuff without the TM being registered
> at the RM.
>
> On 03/02/2022 13:48, Gabor Somogyi wrote:
>
> > Any error in loading the provider (be it by accident or explicit checks)
> then is a setup error and we can fail the cluster.
>
> Fail fast is a good direction in my view. In Spark I wanted to go to this
> direction but there were other opinions so there if a provider is not
> loaded then the workload goes further.
> Of course the processing will fail if the token is missing...
>
> > Requiring HBase (and Hadoop for that matter) to be on the JM system
> classpath would be a bit unfortunate. Have you considered loading the
> providers as plugins?
>
> Even if it's unfortunate the actual implementation is depending on that
> already. Moving HBase and/or all token providers into plugins is a
> possibility.
> That way if one wants to use a specific provider then a plugin need to be
> added. If we would like to go to this direction I would do that in a
> separate
> FLIP not to have feature creep here. The actual FLIP already covers
> several thousand lines of code changes.
>
> > This is missing from the FLIP. From my experience with the metric
> reporters, having the implementation rely on the configuration is really
> annoying for testing purposes. That's why I suggested factories; they can
> take care of extracting all parameters that the implementation needs, and
> then pass them nicely via the constructor.
>
> ServiceLoader provided services must have a norarg constructor where no
> parameters can be passed.
> As a side note testing delegation token providers is pain in the ass and
> not possible with automated tests without creating a fully featured
> kerberos cluster with KDC, HDFS, HBase, Kafka, etc..
> We've had several tries in Spark but then gave it up because of the
> complexity and the flakyness of it so I wouldn't care much about unit
> testing.
> The sad truth is that most of the token providers can be tested manually
> on cluster.
>
> Of course this doesn't mean that the whole code is not intended to be
> covered with tests. I mean couple of parts can be automatically tested but
> providers are not such.
>
> > This also implies that any fields of the provider wouldn't inherently
> have to be mutable.
>
> I think this is not an issue. A provider connects to a service, obtains
> token(s) and then close the connection and never seen the need of an
> intermediate state.
> I've just mentioned the singleton behavior to be clear.
>
> > One examples is a TM restart + local recovery, where the TM eagerly
> offers the previous set of slots to the leading JM.
>
> Just to learn something new. I think local recovery is clear to me which
> is not touching external systems like Kafka or so (correct me if I'm wrong).
> Is it possible that such case the user code just starts to run blindly w/o
> JM coordination and connects to external systems to do data processing?
>
>
> On Thu, Feb 3, 2022 at 1:09 PM Chesnay Schepler 
> wrote:
>
>> 1)
>> The manager certainly shouldn't check for specific implementations.
>> The problem with classpath-based checks is it can easily happen that the
>> provider can't be loaded in the first place (e.g., if you don't use
>> reflection, which you currently kinda force), and in that case Flink can't
>> tell whether the token is not required or the cluster isn't set up
>> correctly.
>> As I see it we shouldn't try to be clever; if the users wants kerberos,
>> then have him enable the providers. Any error in loading the provider (be
>> it by accident or explicit checks) then is a setup error and we can fail
>> the cluster.
>> If we still want to auto-detect whether the provider should be used, note
>> that using factories would make this easier; the factory can check the
>> classpath (not having any direct dependencies on

Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Chesnay Schepler
Here's an example of the TM running workloads without being connected to
the RM, while potentially having a valid token:


1. TM registers at RM
2. JobMaster requests slot from RM -> TM gets notified
3. JM fails over
4. TM re-offers the slot to the failed over JobMaster
5. TM reconnects to RM at some point

Here's an example of the TM running workloads without being connected to
the RM, without ever having had a valid token:


1. TM1 has a valid token and is running some tasks.
2. TM1 crashes
3. TM2 is started to take over, and re-uses the working directory of
   TM1 (new feature in 1.15!)
4. TM2 recovers the previous slot allocations
5. TM2 is informed about leading JM
6. TM2 starts registration with RM
7. TM2 offers slots to JobMaster
8. TM2 accepts task submission from JobMaster
9. ...some time later the registration completes...


On 03/02/2022 14:24, Gabor Somogyi wrote:
> but it can happen that the JobMaster+TM collaborate to run stuff 
without the TM being registered at the RM


Honestly I'm not educated enough within Flink to give an example to 
such scenario.
Until now I thought JM defines tasks to be done and TM just blindly 
connects to external systems and does the processing.
All in all if external systems can be touched when JM + TM 
collaboration happens then we need to consider that in the design.
Since I don't have an example scenario I don't know what exactly needs 
to be solved.
I think we need an example case to decide whether we face a real issue 
or the design is not leaking.



On Thu, Feb 3, 2022 at 2:12 PM Chesnay Schepler  
wrote:


> Just to learn something new. I think local recovery is clear to
me which is not touching external systems like Kafka or so
(correct me if I'm wrong). Is it possible that such case the user
code just starts to run blindly w/o JM coordination and connects
to external systems to do data processing?

Local recovery itself shouldn't touch external systems; the TM
cannot just run user-code without the JobMaster being involved,
but it can happen that the JobMaster+TM collaborate to run stuff
without the TM being registered at the RM.

On 03/02/2022 13:48, Gabor Somogyi wrote:

> Any error in loading the provider (be it by accident or
explicit checks) then is a setup error and we can fail the cluster.

Fail fast is a good direction in my view. In Spark I wanted to go
to this direction but there were other opinions so there if a
provider is not loaded then the workload goes further.
Of course the processing will fail if the token is missing...

> Requiring HBase (and Hadoop for that matter) to be on the JM
system classpath would be a bit unfortunate. Have you considered
loading the providers as plugins?

Even if it's unfortunate the actual implementation is depending
on that already. Moving HBase and/or all token providers into
plugins is a possibility.
That way if one wants to use a specific provider then a plugin
need to be added. If we would like to go to this direction I
would do that in a separate
FLIP not to have feature creep here. The actual FLIP already
covers several thousand lines of code changes.

> This is missing from the FLIP. From my experience with the
metric reporters, having the implementation rely on the
configuration is really annoying for testing purposes. That's why
I suggested factories; they can take care of extracting all
parameters that the implementation needs, and then pass them
nicely via the constructor.

ServiceLoader provided services must have a norarg constructor
where no parameters can be passed.
As a side note testing delegation token providers is pain in the
ass and not possible with automated tests without creating a
fully featured kerberos cluster with KDC, HDFS, HBase, Kafka, etc..
We've had several tries in Spark but then gave it up because of
the complexity and the flakyness of it so I wouldn't care much
about unit testing.
The sad truth is that most of the token providers can be tested
manually on cluster.

Of course this doesn't mean that the whole code is not intended
to be covered with tests. I mean couple of parts can be
automatically tested but providers are not such.

> This also implies that any fields of the provider wouldn't
inherently have to be mutable.

I think this is not an issue. A provider connects to a service,
obtains token(s) and then close the connection and never seen the
need of an intermediate state.
I've just mentioned the singleton behavior to be clear.

> One examples is a TM restart + local recovery, where the TM
eagerly offers the previous set of slots to the leading JM.

Just to learn something new. I think local recovery is clear to
me which is not touching external systems like Kafka or so
(correct me if I'm wrong).
Is it possible that such case the user code 

[jira] [Created] (FLINK-25950) Delete retry mechanism from ZooKeeperUtils.deleteZNode

2022-02-03 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-25950:
-

 Summary: Delete retry mechanism from ZooKeeperUtils.deleteZNode
 Key: FLINK-25950
 URL: https://issues.apache.org/jira/browse/FLINK-25950
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Matthias Pohl


{{ZooKeeperUtils.deleteZNode}} implements a retry loop that is not necessary 
for Curator version 4.0.1+. This code can be cleaned up.
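
For context, a hedged sketch of what retry-free deletion could look like
once Curator's built-in mechanisms are relied upon (illustrative code, not
the actual ZooKeeperUtils implementation):

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.zookeeper.KeeperException;

    public final class ZNodeDeletion {

        /** Deletes the given path; an already-missing node counts as success. */
        public static void deleteZNode(CuratorFramework client, String path)
                throws Exception {
            try {
                client.delete()
                        .guaranteed()               // Curator keeps retrying in the background
                        .deletingChildrenIfNeeded()
                        .forPath(path);
            } catch (KeeperException.NoNodeException ignored) {
                // already gone - nothing to do
            }
        }
    }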



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Till Rohrmann
Hi everyone,

Sorry for joining this discussion late. I also did not read all responses
in this thread so my question might already be answered: Why does Flink
need to be involved in the propagation of the tokens? Why do we need
explicit RPC calls in the Flink domain? Isn't this something the underlying
resource management system could do, or that every process could do on its
own? I am a bit worried that we are making Flink responsible for something
that it is not really designed to do.

Cheers,
Till

On Thu, Feb 3, 2022 at 2:54 PM Chesnay Schepler  wrote:

> Here's an example for the TM to run workloads without being connected to
> the RM, while potentially having a valid token:
>
>  1. TM registers at RM
>  2. JobMaster requests slot from RM -> TM gets notified
>  3. JM fails over
>  4. TM re-offers the slot to the failed over JobMaster
>  5. TM reconnects to RM at some point
>
> Here's an example for the TM to run workloads without being connected to
> the RM, without ever having a valid token:
>
>  1. TM1 has a valid token and is running some tasks.
>  2. TM1 crashes
>  3. TM2 is started to take over, and re-uses the working directory of
> TM1 (new feature in 1.15!)
>  4. TM2 recovers the previous slot allocations
>  5. TM2 is informed about leading JM
>  6. TM2 starts registration with RM
>  7. TM2 offers slots to JobMaster
>  8. TM2 accepts task submission from JobMaster
>  9. ...some time later the registration completes...
>
>
> On 03/02/2022 14:24, Gabor Somogyi wrote:
> > > but it can happen that the JobMaster+TM collaborate to run stuff
> > without the TM being registered at the RM
> >
> > Honestly I'm not educated enough within Flink to give an example to
> > such scenario.
> > Until now I thought JM defines tasks to be done and TM just blindly
> > connects to external systems and does the processing.
> > All in all if external systems can be touched when JM + TM
> > collaboration happens then we need to consider that in the design.
> > Since I don't have an example scenario I don't know what exactly needs
> > to be solved.
> > I think we need an example case to decide whether we face a real issue
> > or the design is not leaking.
> >
> >
> > On Thu, Feb 3, 2022 at 2:12 PM Chesnay Schepler 
> > wrote:
> >
> > > Just to learn something new. I think local recovery is clear to
> > me which is not touching external systems like Kafka or so
> > (correct me if I'm wrong). Is it possible that such case the user
> > code just starts to run blindly w/o JM coordination and connects
> > to external systems to do data processing?
> >
> > Local recovery itself shouldn't touch external systems; the TM
> > cannot just run user-code without the JobMaster being involved,
> > but it can happen that the JobMaster+TM collaborate to run stuff
> > without the TM being registered at the RM.
> >
> > On 03/02/2022 13:48, Gabor Somogyi wrote:
> >> > Any error in loading the provider (be it by accident or
> >> explicit checks) then is a setup error and we can fail the cluster.
> >>
> >> Fail fast is a good direction in my view. In Spark I wanted to go
> >> to this direction but there were other opinions so there if a
> >> provider is not loaded then the workload goes further.
> >> Of course the processing will fail if the token is missing...
> >>
> >> > Requiring HBase (and Hadoop for that matter) to be on the JM
> >> system classpath would be a bit unfortunate. Have you considered
> >> loading the providers as plugins?
> >>
> >> Even if it's unfortunate the actual implementation is depending
> >> on that already. Moving HBase and/or all token providers into
> >> plugins is a possibility.
> >> That way if one wants to use a specific provider then a plugin
> >> need to be added. If we would like to go to this direction I
> >> would do that in a separate
> >> FLIP not to have feature creep here. The actual FLIP already
> >> covers several thousand lines of code changes.
> >>
> >> > This is missing from the FLIP. From my experience with the
> >> metric reporters, having the implementation rely on the
> >> configuration is really annoying for testing purposes. That's why
> >> I suggested factories; they can take care of extracting all
> >> parameters that the implementation needs, and then pass them
> >> nicely via the constructor.
> >>
> >> ServiceLoader provided services must have a norarg constructor
> >> where no parameters can be passed.
> >> As a side note testing delegation token providers is pain in the
> >> ass and not possible with automated tests without creating a
> >> fully featured kerberos cluster with KDC, HDFS, HBase, Kafka, etc..
> >> We've had several tries in Spark but then gave it up because of
> >> the complexity and the flakyness of it so I wouldn't care much
> >> about unit testing.
> >> The sad truth is that most of the token p

Re: [DISCUSS] Move Flink website to privacy friendly Analytics solution

2022-02-03 Thread Martijn Visser
Hi everyone,

A short update from my end: with the help of Chesnay and Konstantin, the
project website, all Flink documentation (back to 1.0) and the Statefun
website no longer send data to Google Analytics, only to Matomo. The
privacy policy has also been adjusted and now includes an opt-out.

Anyone can view the results [1]. For example, you could now write a blog
and see how well it's being read/visited by others.

I'll drive some more implementations in the near future, for which you can
follow the umbrella ticket. [2]

Best regards,

Martijn

[1] https://matomo.privacy.apache.org/
[2] https://issues.apache.org/jira/browse/FLINK-25863

On Fri, 14 Jan 2022 at 18:55, David Morávek  wrote:

> +1, thanks for driving this Martijn
>
> On Fri 14. 1. 2022 at 15:01, Chesnay Schepler  wrote:
>
> > +1
> >
> > On 14/01/2022 14:47, Till Rohrmann wrote:
> > > Hi Martijn,
> > >
> > > big +1 for this effort. Thanks a lot for pushing this initiative
> forward!
> > >
> > > Cheers,
> > > Till
> > >
> > > On Fri, Jan 14, 2022 at 11:49 AM Konstantin Knauf 
> > wrote:
> > >
> > >> Hi Martijn,
> > >>
> > >> I think this is a great initiative. Thank you for pursuing this. It
> > allows
> > >> us to
> > >>
> > >> a) generate better insights into the usage of Apache Flink and its
> > >> documentation as shown in the video
> > >> a) do this in a privacy preserving way and
> > >> c) act as a role model for other Apache projects on this matter
> > >>
> > >> Big +1. I am happy to help, if I can.
> > >>
> > >> Cheers,
> > >>
> > >> Konstantin
> > >>
> > >>
> > >>
> > >> On Fri, Jan 14, 2022 at 11:21 AM Martijn Visser <
> mart...@ververica.com>
> > >> wrote:
> > >>
> > >>> Hi everyone,
> > >>>
> > >>> The Flink website currently uses Google Analytics to track how
> visitors
> > >> of
> > >>> the website are interacting with it. It provides insights into which
> > >>> documentation pages are visited, how users are using the website
> > (what's
> > >>> the cycle of pages they visit before exiting the page), if they are
> > >>> downloading Flink etc. However, the Apache Software Foundation
> > >> discourages
> > >>> using Google Analytics [1] unless meeting certain requirements. The
> > Flink
> > >>> website currently does not meet those requirements.
> > >>>
> > >>> I do believe that it's useful to understand what parts of a website
> are
> > >>> important to users, what features are most frequently read up on,
> where
> > >>> they get lost in the docs, etc. so we can better understand how users
> > use
> > >>> the system, the website, and the docs and where to focus improvements
> > >> next.
> > >>> I would like to move the Flink website from Google Analytics to an
> > >>> alternative as soon as possible for Flink. I would be in favour of
> > >> opening
> > >>> up insights to this data for everyone too, it's public data anyway.
> > >>>
> > >>> For the past couple of months, I've been engaging in a conversation
> > with
> > >>> ASF Legal and ASF Infra about setting up a privacy-friendly
> alternative
> > >> for
> > >>> Google Analytics for all ASF projects via the priv...@apache.org
> > mailing
> > >>> list (I can't find a public web archive link for this unfortunately).
> > As
> > >>> part of that discussion, I've done a test with the open source and
> > >>> self-hosted version of Matomo [2], taking a look at the privacy
> > >>> implications and the functionality that this tool offers. You can
> > watch a
> > >>> recording of that experiment [3] and view the test setup I've used
> [4].
> > >>>
> > >>> The current status is that ASF Legal, ASF Infra and I have agreed to
> > take
> > >>> the next step on this project. This step means that:
> > >>>
> > >>> * I set up Matomo on a VM provided by ASF Infra
> > >>> * A new DNS name is created (either https://analytics.apache.org/ or
> > >>> https://matomo.analytics.apache.org/) by ASF Infra
> > >>> * The Flink website is adjusted to remove the tracking from Google
> > >>> Analytics and include the necessary Javascript to allow tracking of
> the
> > >>> Flink website and documentation in Matomo
> > >>>
> > >>> If this test would be successful, ASF Infra would take over the
> hosting
> > >> of
> > >>> this solution and provide it to all ASF projects.
> > >>>
> > >>> I would like to understand from the Flink community:
> > >>>
> > >>> 1. Do you think this is a good idea?
> > >>>
> > >>> 2. If yes, I need a couple of PMCs for requesting a VM from Apache
> > Infra
> > >>> [5]
> > >>>
> > >>> Best regards,
> > >>>
> > >>> Martijn
> > >>> https://twitter.com/MartijnVisser82
> > >>>
> > >>> [1] https://privacy.apache.org/faq/committers.html
> > >>> [2] https://matomo.org/
> > >>> [3]
> > >>>
> > >>>
> > >>
> >
> https://drive.google.com/file/d/1yomYhLoyrzBW620bpn_dROiwyvSCzuvt/view?usp=sharing
> > >>> [4] https://github.com/MartijnVisser/matomo-analytics
> > >>> [5] https://infra.apache.org/vm-for-project.html
> > >>>
> > >>
> > >> --
> > >>
> > >> Konstantin Knauf
> > >>
> > >> https://twitter.

[jira] [Created] (FLINK-25951) Implement Matomo on JavaDocs

2022-02-03 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-25951:
--

 Summary: Implement Matomo on JavaDocs
 Key: FLINK-25951
 URL: https://issues.apache.org/jira/browse/FLINK-25951
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Martijn Visser
Assignee: Martijn Visser






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Gabor Somogyi
>  Isn't this something the underlying resource management system could do
or which every process could do on its own?

I was looking for such a feature but haven't found one.
Maybe we can solve the propagation more easily, but then I'm waiting for a
better suggestion.
If anybody has a better or simpler idea, please point to a specific
feature that works on all resource management systems.

> Here's an example for the TM to run workloads without being connected to the
RM, without ever having a valid token

All in all, I see the main problem. I'm not sure what the reason is for a
TM accepting tasks without registration, but it clearly doesn't help here.
I basically see 2 possible solutions (see the sketch below):
* Make the TM accept tasks only after registration (not sure if it's
possible or makes sense at all)
* Send tokens right after container creation with
"updateDelegationTokens"
I'm not sure which one is more realistic since I wasn't involved in the
new feature.
WDYT?
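
To illustrate the second option, a hypothetical sketch of the propagation
RPC; the gateway name and payload type are assumptions based on this
thread, not the final FLIP-211 API:

    import java.util.concurrent.CompletableFuture;

    // Hypothetical TaskManager-side gateway; not the final FLIP-211 API.
    public interface TaskExecutorDelegationTokenGateway {

        /**
         * Pushes a freshly obtained token set to the TM. Calling this right
         * after container creation would also cover TMs that accept tasks
         * before their RM registration completes.
         */
        CompletableFuture<Void> updateDelegationTokens(byte[] serializedCredentials);
    }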


On Thu, Feb 3, 2022 at 3:09 PM Till Rohrmann  wrote:

> Hi everyone,
>
> Sorry for joining this discussion late. I also did not read all responses
> in this thread so my question might already be answered: Why does Flink
> need to be involved in the propagation of the tokens? Why do we need
> explicit RPC calls in the Flink domain? Isn't this something the underlying
> resource management system could do or which every process could do on its
> own? I am a bit worried that we are making Flink responsible for something
> that it is not really designed to do so.
>
> Cheers,
> Till
>
> On Thu, Feb 3, 2022 at 2:54 PM Chesnay Schepler 
> wrote:
>
>> Here's an example for the TM to run workloads without being connected to
>> the RM, while potentially having a valid token:
>>
>>  1. TM registers at RM
>>  2. JobMaster requests slot from RM -> TM gets notified
>>  3. JM fails over
>>  4. TM re-offers the slot to the failed over JobMaster
>>  5. TM reconnects to RM at some point
>>
>> Here's an example for the TM to run workloads without being connected to
>> the RM, without ever having a valid token:
>>
>>  1. TM1 has a valid token and is running some tasks.
>>  2. TM1 crashes
>>  3. TM2 is started to take over, and re-uses the working directory of
>> TM1 (new feature in 1.15!)
>>  4. TM2 recovers the previous slot allocations
>>  5. TM2 is informed about leading JM
>>  6. TM2 starts registration with RM
>>  7. TM2 offers slots to JobMaster
>>  8. TM2 accepts task submission from JobMaster
>>  9. ...some time later the registration completes...
>>
>>
>> On 03/02/2022 14:24, Gabor Somogyi wrote:
>> > > but it can happen that the JobMaster+TM collaborate to run stuff
>> > without the TM being registered at the RM
>> >
>> > Honestly I'm not educated enough within Flink to give an example to
>> > such scenario.
>> > Until now I thought JM defines tasks to be done and TM just blindly
>> > connects to external systems and does the processing.
>> > All in all if external systems can be touched when JM + TM
>> > collaboration happens then we need to consider that in the design.
>> > Since I don't have an example scenario I don't know what exactly needs
>> > to be solved.
>> > I think we need an example case to decide whether we face a real issue
>> > or the design is not leaking.
>> >
>> >
>> > On Thu, Feb 3, 2022 at 2:12 PM Chesnay Schepler 
>> > wrote:
>> >
>> > > Just to learn something new. I think local recovery is clear to
>> > me which is not touching external systems like Kafka or so
>> > (correct me if I'm wrong). Is it possible that such case the user
>> > code just starts to run blindly w/o JM coordination and connects
>> > to external systems to do data processing?
>> >
>> > Local recovery itself shouldn't touch external systems; the TM
>> > cannot just run user-code without the JobMaster being involved,
>> > but it can happen that the JobMaster+TM collaborate to run stuff
>> > without the TM being registered at the RM.
>> >
>> > On 03/02/2022 13:48, Gabor Somogyi wrote:
>> >> > Any error in loading the provider (be it by accident or
>> >> explicit checks) then is a setup error and we can fail the cluster.
>> >>
>> >> Fail fast is a good direction in my view. In Spark I wanted to go
>> >> to this direction but there were other opinions so there if a
>> >> provider is not loaded then the workload goes further.
>> >> Of course the processing will fail if the token is missing...
>> >>
>> >> > Requiring HBase (and Hadoop for that matter) to be on the JM
>> >> system classpath would be a bit unfortunate. Have you considered
>> >> loading the providers as plugins?
>> >>
>> >> Even if it's unfortunate the actual implementation is depending
>> >> on that already. Moving HBase and/or all token providers into
>> >> plugins is a possibility.
>> >> That way if one wants to use a specific provider then a plugin
>> >> need to be added. If we would like to go to this direction I
>> >> would do th

Re: [VOTE] Deprecate NiFi connector

2022-02-03 Thread Fabian Paul
Thanks for driving the deprecation efforts.

+1 (binding)

Best,
Fabian

On Mon, Jan 31, 2022 at 11:47 AM Martijn Visser  wrote:
>
> Hi everyone,
>
> I would like to open up a vote to deprecate NiFi in Flink 1.15 and remove
> it in the next version. I've previously mentioned that we were looking for
> maintainers for NiFi, but so far none have come forward [1].
>
> The vote will last for at least 72 hours, and will be accepted by a
> consensus of active committers.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
> [1] https://lists.apache.org/thread/1vs3wmk66vsq6l4npjsfzltft4tz5tkq


Re: [VOTE] Remove Twitter connector

2022-02-03 Thread Ingo Bürk

+1 (binding)


Best
Ingo

On 31.01.22 11:46, Martijn Visser wrote:

Hi everyone,

I would like to open up a vote to remove the Twitter connector in Flink
1.15. This was brought up previously for a discussion [1].

The vote will last for at least 72 hours, and will be accepted by
a consensus of active committers.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82

[1] https://lists.apache.org/thread/7sdvp4hj93rh0cz8r50800stzrpgkdm2



Re: [VOTE] Remove Twitter connector

2022-02-03 Thread Fabian Paul
This connector is really a relic of the past.

+1 (binding)

Best,
Fabian

On Mon, Jan 31, 2022 at 11:47 AM Martijn Visser  wrote:
>
> Hi everyone,
>
> I would like to open up a vote to remove the Twitter connector in Flink
> 1.15. This was brought up previously for a discussion [1].
>
> The vote will last for at least 72 hours, and will be accepted by
> a consensus of active committers.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
> [1] https://lists.apache.org/thread/7sdvp4hj93rh0cz8r50800stzrpgkdm2


Re: [VOTE] Deprecate NiFi connector

2022-02-03 Thread Konstantin Knauf
Hi everyone,

+1. If or when someone starts an effort to migrate it to the new APIs in
the future, the code will still be accessible even if it has already been
deleted in the latest versions.

Thanks,

Konstantin

On Mon, Jan 31, 2022 at 11:46 AM Martijn Visser 
wrote:

> Hi everyone,
>
> I would like to open up a vote to deprecate NiFi in Flink 1.15 and remove
> it in the next version. I've previously mentioned that we were looking for
> maintainers for NiFi, but so far none have come forward [1].
>
> The vote will last for at least 72 hours, and will be accepted by a
> consensus of active committers.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
> [1] https://lists.apache.org/thread/1vs3wmk66vsq6l4npjsfzltft4tz5tkq
>


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Till Rohrmann
I would prefer not choosing the first option

> Make the TM accept tasks only after registration(not sure if it's
possible or makes sense at all)

because it effectively means changing how Flink's component lifecycle
works in order to distribute Kerberos tokens. It also effectively means
that a TM cannot make progress until it is connected to an RM.

I am not a Kerberos expert but is it really so that every application that
wants to use Kerberos needs to implement the token propagation itself? This
somehow feels as if there is something missing.

Cheers,
Till

On Thu, Feb 3, 2022 at 3:29 PM Gabor Somogyi 
wrote:

> >  Isn't this something the underlying resource management system could do
> or which every process could do on its own?
>
> I was looking for such feature but not found.
> Maybe we can solve the propagation easier but then I'm waiting on better
> suggestion.
> If anybody has better/more simple idea then please point to a specific
> feature which works on all resource management systems.
>
> > Here's an example for the TM to run workloads without being connected
> to the RM, without ever having a valid token
>
> All in all I see the main problem. Not sure what is the reason behind that
> a TM accepts tasks w/o registration but clearly not helping here.
> I basically see 2 possible solutions:
> * Make the TM accept tasks only after registration(not sure if it's
> possible or makes sense at all)
> * We send tokens right after container creation with
> "updateDelegationTokens"
> Not sure which one is more realistic to do since I'm not involved the new
> feature.
> WDYT?
>
>
> On Thu, Feb 3, 2022 at 3:09 PM Till Rohrmann  wrote:
>
>> Hi everyone,
>>
>> Sorry for joining this discussion late. I also did not read all responses
>> in this thread so my question might already be answered: Why does Flink
>> need to be involved in the propagation of the tokens? Why do we need
>> explicit RPC calls in the Flink domain? Isn't this something the underlying
>> resource management system could do or which every process could do on its
>> own? I am a bit worried that we are making Flink responsible for something
>> that it is not really designed to do so.
>>
>> Cheers,
>> Till
>>
>> On Thu, Feb 3, 2022 at 2:54 PM Chesnay Schepler 
>> wrote:
>>
>>> Here's an example for the TM to run workloads without being connected to
>>> the RM, while potentially having a valid token:
>>>
>>>  1. TM registers at RM
>>>  2. JobMaster requests slot from RM -> TM gets notified
>>>  3. JM fails over
>>>  4. TM re-offers the slot to the failed over JobMaster
>>>  5. TM reconnects to RM at some point
>>>
>>> Here's an example for the TM to run workloads without being connected to
>>> the RM, without ever having a valid token:
>>>
>>>  1. TM1 has a valid token and is running some tasks.
>>>  2. TM1 crashes
>>>  3. TM2 is started to take over, and re-uses the working directory of
>>> TM1 (new feature in 1.15!)
>>>  4. TM2 recovers the previous slot allocations
>>>  5. TM2 is informed about leading JM
>>>  6. TM2 starts registration with RM
>>>  7. TM2 offers slots to JobMaster
>>>  8. TM2 accepts task submission from JobMaster
>>>  9. ...some time later the registration completes...
>>>
>>>
>>> On 03/02/2022 14:24, Gabor Somogyi wrote:
>>> > > but it can happen that the JobMaster+TM collaborate to run stuff
>>> > without the TM being registered at the RM
>>> >
>>> > Honestly I'm not educated enough within Flink to give an example to
>>> > such scenario.
>>> > Until now I thought JM defines tasks to be done and TM just blindly
>>> > connects to external systems and does the processing.
>>> > All in all if external systems can be touched when JM + TM
>>> > collaboration happens then we need to consider that in the design.
>>> > Since I don't have an example scenario I don't know what exactly needs
>>> > to be solved.
>>> > I think we need an example case to decide whether we face a real issue
>>> > or the design is not leaking.
>>> >
>>> >
>>> > On Thu, Feb 3, 2022 at 2:12 PM Chesnay Schepler 
>>> > wrote:
>>> >
>>> > > Just to learn something new. I think local recovery is clear to
>>> > me which is not touching external systems like Kafka or so
>>> > (correct me if I'm wrong). Is it possible that such case the user
>>> > code just starts to run blindly w/o JM coordination and connects
>>> > to external systems to do data processing?
>>> >
>>> > Local recovery itself shouldn't touch external systems; the TM
>>> > cannot just run user-code without the JobMaster being involved,
>>> > but it can happen that the JobMaster+TM collaborate to run stuff
>>> > without the TM being registered at the RM.
>>> >
>>> > On 03/02/2022 13:48, Gabor Somogyi wrote:
>>> >> > Any error in loading the provider (be it by accident or
>>> >> explicit checks) then is a setup error and we can fail the
>>> cluster.
>>> >>
>>> >> Fail fast is a good direction in my view. In Spark I wanted to go
>>> >> to this di

Re: [DISCUSS] Move Flink website to privacy friendly Analytics solution

2022-02-03 Thread Till Rohrmann
Great news. Thanks for driving this effort Martijn and helping with it
Chesnay and Konstantin :-)

Cheers,
Till

On Thu, Feb 3, 2022 at 3:13 PM Martijn Visser  wrote:

> Hi everyone,
>
> A short update from my end: with the help of Chesnay and Konstantin both
> the Project Website, all Flink documentation (back to 1.0) and the Statefun
> websites no longer send data to Google Analytics, only to Matomo. The
> privacy policy has also been adjusted and it includes an opt-out.
>
> Anyone can view the results [1]. For example, you could now write a blog
> and see how well it's being read/visited by others.
>
> I'll drive some more implementations in the near future, for which you can
> follow the umbrella ticket. [2]
>
> Best regards,
>
> Martijn
>
> [1] https://matomo.privacy.apache.org/
> [2] https://issues.apache.org/jira/browse/FLINK-25863
>
> On Fri, 14 Jan 2022 at 18:55, David Morávek 
> wrote:
>
> > +1, thanks for driving this Martijn
> >
> > On Fri 14. 1. 2022 at 15:01, Chesnay Schepler 
> wrote:
> >
> > > +1
> > >
> > > On 14/01/2022 14:47, Till Rohrmann wrote:
> > > > Hi Martijn,
> > > >
> > > > big +1 for this effort. Thanks a lot for pushing this initiative
> > forward!
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Fri, Jan 14, 2022 at 11:49 AM Konstantin Knauf  >
> > > wrote:
> > > >
> > > >> Hi Martijn,
> > > >>
> > > >> I think this is a great initiative. Thank you for pursuing this. It
> > > allows
> > > >> us to
> > > >>
> > > >> a) generate better insights into the usage of Apache Flink and its
> > > >> documentation as shown in the video
> > > >> a) do this in a privacy preserving way and
> > > >> c) act as a role model for other Apache projects on this matter
> > > >>
> > > >> Big +1. I am happy to help, if I can.
> > > >>
> > > >> Cheers,
> > > >>
> > > >> Konstantin
> > > >>
> > > >>
> > > >>
> > > >> On Fri, Jan 14, 2022 at 11:21 AM Martijn Visser <
> > mart...@ververica.com>
> > > >> wrote:
> > > >>
> > > >>> Hi everyone,
> > > >>>
> > > >>> The Flink website currently uses Google Analytics to track how
> > visitors
> > > >> of
> > > >>> the website are interacting with it. It provides insights into
> which
> > > >>> documentation pages are visited, how users are using the website
> > > (what's
> > > >>> the cycle of pages they visit before exiting the page), if they are
> > > >>> downloading Flink etc. However, the Apache Software Foundation
> > > >> discourages
> > > >>> using Google Analytics [1] unless meeting certain requirements. The
> > > Flink
> > > >>> website currently does not meet those requirements.
> > > >>>
> > > >>> I do believe that it's useful to understand what parts of a website
> > are
> > > >>> important to users, what features are most frequently read up on,
> > where
> > > >>> they get lost in the docs, etc. so we can better understand how
> users
> > > use
> > > >>> the system, the website, and the docs and where to focus
> improvements
> > > >> next.
> > > >>> I would like to move the Flink website from Google Analytics to an
> > > >>> alternative as soon as possible for Flink. I would be in favour of
> > > >> opening
> > > >>> up insights to this data for everyone too, it's public data anyway.
> > > >>>
> > > >>> For the past couple of months, I've been engaging in a conversation
> > > with
> > > >>> ASF Legal and ASF Infra about setting up a privacy-friendly
> > alternative
> > > >> for
> > > >>> Google Analytics for all ASF projects via the priv...@apache.org
> > > mailing
> > > >>> list (I can't find a public web archive link for this
> unfortunately).
> > > As
> > > >>> part of that discussion, I've done a test with the open source and
> > > >>> self-hosted version of Matomo [2], taking a look at the privacy
> > > >>> implications and the functionality that this tool offers. You can
> > > watch a
> > > >>> recording of that experiment [3] and view the test setup I've used
> > [4].
> > > >>>
> > > >>> The current status is that ASF Legal, ASF Infra and I have agreed
> to
> > > take
> > > >>> the next step on this project. This step means that:
> > > >>>
> > > >>> * I set up Matomo on a VM provided by ASF Infra
> > > >>> * A new DNS name is created (either https://analytics.apache.org/
> or
> > > >>> https://matomo.analytics.apache.org/) by ASF Infra
> > > >>> * The Flink website is adjusted to remove the tracking from Google
> > > >>> Analytics and include the necessary Javascript to allow tracking of
> > the
> > > >>> Flink website and documentation in Matomo
> > > >>>
> > > >>> If this test would be successful, ASF Infra would take over the
> > hosting
> > > >> of
> > > >>> this solution and provide it to all ASF projects.
> > > >>>
> > > >>> I would like to understand from the Flink community:
> > > >>>
> > > >>> 1. Do you think this is a good idea?
> > > >>>
> > > >>> 2. If yes, I need a couple of PMCs for requesting a VM from Apache
> > > Infra
> > > >>> [5]
> > > >>>
> > > >>> Best regards,
> > > >>>
> > > >>> Martijn
> > > >>> https://twitte

Re: [VOTE] Deprecate NiFi connector

2022-02-03 Thread Seth Wiesman
+1 (binding)

On Thu, Feb 3, 2022 at 8:44 AM Fabian Paul  wrote:

> Thanks for driving the deprecation efforts.
>
> +1 (binding)
>
> Best,
> Fabian
>
> On Mon, Jan 31, 2022 at 11:47 AM Martijn Visser 
> wrote:
> >
> > Hi everyone,
> >
> > I would like to open up a vote to deprecate NiFi in Flink 1.15 and remove
> > it in the next version. I've previously mentioned that we were looking
> for
> > maintainers for NiFi, but so far none have come forward [1].
> >
> > The vote will last for at least 72 hours, and will be accepted by a
> > consensus of active committers.
> >
> > Best regards,
> >
> > Martijn Visser
> > https://twitter.com/MartijnVisser82
> >
> > [1] https://lists.apache.org/thread/1vs3wmk66vsq6l4npjsfzltft4tz5tkq
>


Re: [VOTE] Remove Twitter connector

2022-02-03 Thread Seth Wiesman
+1 (binding)

On Thu, Feb 3, 2022 at 8:39 AM Fabian Paul  wrote:

> This connector is really a relict of the past.
>
> +1 (binding)
>
> Best,
> Fabian
>
> On Mon, Jan 31, 2022 at 11:47 AM Martijn Visser 
> wrote:
> >
> > Hi everyone,
> >
> > I would like to open up a vote to remove the Twitter connector in Flink
> > 1.15. This was brought up previously for a discussion [1].
> >
> > The vote will last for at least 72 hours, and will be accepted by
> > a consensus of active committers.
> >
> > Best regards,
> >
> > Martijn Visser
> > https://twitter.com/MartijnVisser82
> >
> > [1] https://lists.apache.org/thread/7sdvp4hj93rh0cz8r50800stzrpgkdm2
>


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Gabor Somogyi
> I would prefer not choosing the first option

Then only the second option remains in play.

> I am not a Kerberos expert but is it really so that every application that
wants to use Kerberos needs to implement the token propagation itself? This
somehow feels as if there is something missing.

OK, so first a short Kerberos + token intro.

Some basics:
* A TGT can be created from a keytab
* A TGT is needed to obtain a TGS (called a token)
* Authentication only works with a TGS -> wherever an external system is
accessed, either a TGT or a TGS is needed

There are basically 2 ways to authenticate to a Kerberos-secured external
system:
1. A Kerberos TGT MUST be propagated to all JVMs. Each and every JVM then
obtains a TGS by itself, which bombards the KDC and may make it collapse.
2. A Kerberos TGT exists in only a single place (in this case the JM). The
JM obtains a TGS, which MUST be propagated to all TMs because otherwise
authentication fails.

Today the whole system works in such a way that the keytab file (think of
it as a plaintext password) is reachable on all nodes.
This is a relatively huge attack surface. The main intention is:
* Instead of propagating the keytab file to all nodes, propagate a TGS,
which has a limited lifetime (more secure)
* Do the TGS generation in a single place so the KDC doesn't collapse;
additionally, a keytab that exists on only a single node can be better
protected

As a final conclusion: wherever Kerberos authentication is expected, it is
a MUST to have either a TGT or a TGS.
Right now this is done in a pretty insecure way. The questions are the
following:
* Do we want to leave this insecure keytab propagation as it is and
bombard the KDC?
* If not, how do we propagate the more secure token to the TMs?

If the answer to the first question is yes, the FLIP can be abandoned and
isn't worth further effort.
If the answer is no, then we can talk about the how part (sketched below).
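
As a rough illustration of approach 2, a minimal sketch using the standard
Hadoop security APIs (principal, keytab path and renewer are placeholders):
the keytab stays in one place, and only the obtained tokens are shipped
around.

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    public final class TokenObtainSketch {

        /** Logs in from the keytab (TGT) and collects HDFS delegation tokens (TGS-backed). */
        public static Credentials obtainHdfsTokens() throws Exception {
            Configuration hadoopConf = new Configuration();
            // TGT is created from the keytab - only this process needs the keytab file.
            UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                    "flink/host@EXAMPLE.COM", "/path/to/flink.keytab");
            return ugi.doAs((PrivilegedExceptionAction<Credentials>) () -> {
                Credentials credentials = new Credentials();
                FileSystem fs = FileSystem.get(hadoopConf);
                // Obtains a delegation token from the NameNode and adds it to credentials.
                fs.addDelegationTokens("flink", credentials);
                return credentials;
            });
        }
    }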

G


On Thu, Feb 3, 2022 at 3:42 PM Till Rohrmann  wrote:

> I would prefer not choosing the first option
>
> > Make the TM accept tasks only after registration(not sure if it's
> possible or makes sense at all)
>
> because it effectively means that we change how Flink's component lifecycle
> works for distributing Kerberos tokens. It also effectively means that a TM
> cannot make progress until connected to a RM.
>
> I am not a Kerberos expert but is it really so that every application that
> wants to use Kerberos needs to implement the token propagation itself? This
> somehow feels as if there is something missing.
>
> Cheers,
> Till
>
> On Thu, Feb 3, 2022 at 3:29 PM Gabor Somogyi 
> wrote:
>
> > >  Isn't this something the underlying resource management system could
> do
> > or which every process could do on its own?
> >
> > I was looking for such feature but not found.
> > Maybe we can solve the propagation easier but then I'm waiting on better
> > suggestion.
> > If anybody has better/more simple idea then please point to a specific
> > feature which works on all resource management systems.
> >
> > > Here's an example for the TM to run workloads without being connected
> > to the RM, without ever having a valid token
> >
> > All in all I see the main problem. Not sure what is the reason behind
> that
> > a TM accepts tasks w/o registration but clearly not helping here.
> > I basically see 2 possible solutions:
> > * Make the TM accept tasks only after registration(not sure if it's
> > possible or makes sense at all)
> > * We send tokens right after container creation with
> > "updateDelegationTokens"
> > Not sure which one is more realistic to do since I'm not involved the new
> > feature.
> > WDYT?
> >
> >
> > On Thu, Feb 3, 2022 at 3:09 PM Till Rohrmann 
> wrote:
> >
> >> Hi everyone,
> >>
> >> Sorry for joining this discussion late. I also did not read all
> responses
> >> in this thread so my question might already be answered: Why does Flink
> >> need to be involved in the propagation of the tokens? Why do we need
> >> explicit RPC calls in the Flink domain? Isn't this something the
> underlying
> >> resource management system could do or which every process could do on
> its
> >> own? I am a bit worried that we are making Flink responsible for
> something
> >> that it is not really designed to do so.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Thu, Feb 3, 2022 at 2:54 PM Chesnay Schepler 
> >> wrote:
> >>
> >>> Here's an example for the TM to run workloads without being connected
> to
> >>> the RM, while potentially having a valid token:
> >>>
> >>>  1. TM registers at RM
> >>>  2. JobMaster requests slot from RM -> TM gets notified
> >>>  3. JM fails over
> >>>  4. TM re-offers the slot to the failed over JobMaster
> >>>  5. TM reconnects to RM at some point
> >>>
> >>> Here's an example for the TM to run workloads without being connected
> to
> >>> the RM, without ever having a valid token:
> >>>
> >>>  1. TM1 has a valid token and is running some tasks.
> >>>  2. TM1 crashes
> >>>  3. TM2 is started to take over, and re-uses the working direc

Re: [VOTE] Remove Twitter connector

2022-02-03 Thread David Anderson
+1

On Mon, Jan 31, 2022 at 11:47 AM Martijn Visser 
wrote:

> Hi everyone,
>
> I would like to open up a vote to remove the Twitter connector in Flink
> 1.15. This was brought up previously for a discussion [1].
>
> The vote will last for at least 72 hours, and will be accepted by
> a consensus of active committers.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
> [1] https://lists.apache.org/thread/7sdvp4hj93rh0cz8r50800stzrpgkdm2
>


[jira] [Created] (FLINK-25952) Savepoint on S3 are not relocatable even if entropy injection is not enabled

2022-02-03 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-25952:


 Summary: Savepoint on S3 are not relocatable even if entropy 
injection is not enabled
 Key: FLINK-25952
 URL: https://issues.apache.org/jira/browse/FLINK-25952
 Project: Flink
  Issue Type: Bug
  Components: FileSystems, Runtime / Checkpointing
Affects Versions: 1.14.3, 1.13.5, 1.15.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.15.0, 1.14.4, 1.13.7


We have a limitation that if we create savepoints with an injected entropy, 
they are not relocatable 
(https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints).

However, the check for whether entropy is actually used is flawed. In 
{{FsCheckpointStreamFactory}} we only check whether the file system in use 
implements {{EntropyInjectingFileSystem}}. {{FlinkS3FileSystem}} does, but it 
may still have entropy disabled: {{FlinkS3FileSystem#getEntropyInjectionKey}} 
may return {{null}}. We should check for that in 
{{org.apache.flink.core.fs.EntropyInjector#isEntropyInjecting}}.
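
A sketch of the corrected check described above (simplified; it ignores any
wrapping file systems the real EntropyInjector may have to unwrap first):

    import org.apache.flink.core.fs.EntropyInjectingFileSystem;
    import org.apache.flink.core.fs.FileSystem;

    public final class EntropyCheckSketch {

        /** True only if the FS supports entropy injection AND it is configured. */
        public static boolean isEntropyInjecting(FileSystem fs) {
            return fs instanceof EntropyInjectingFileSystem
                    && ((EntropyInjectingFileSystem) fs).getEntropyInjectionKey() != null;
        }
    }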



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Chesnay Schepler
What I don't understand is how this could overload the KDC. Aren't 
tokens valid for a relatively long time period?


For new deployments where many TMs are started at once I could imagine 
it temporarily, but shouldn't the accesses to the KDC eventually 
naturally spread out?


The FLIP mentions scaling issues with 200 nodes; it's really surprising 
to me that such a small number of requests can already cause issues.


On 03/02/2022 16:14, Gabor Somogyi wrote:

I would prefer not choosing the first option

Then the second option may play only.


I am not a Kerberos expert but is it really so that every application that

wants to use Kerberos needs to implement the token propagation itself? This
somehow feels as if there is something missing.

OK, so first some kerberos + token intro.

Some basics:
* TGT can be created from keytab
* TGT is needed to obtain TGS (called token)
* Authentication only works with TGS -> all places where external system is
needed either a TGT or TGS needed

There are basically 2 ways to authenticate to a kerberos secured external
system:
1. One needs a kerberos TGT which MUST be propagated to all JVMs. Here each
and every JVM obtains a TGS by itself which bombs the KDC that may collapse.
2. One needs a kerberos TGT which exists only on a single place (in this
case JM). JM gets a TGS which MUST be propagated to all TMs because
otherwise authentication fails.

Now the whole system works in a way that keytab file (we can imagine that
as plaintext password) is reachable on all nodes.
This is a relatively huge attack surface. Now the main intention is:
* Instead of propagating keytab file to all nodes propagate a TGS which has
limited lifetime (more secure)
* Do the TGS generation in a single place so KDC may not collapse + having
keytab only on a single node can be better protected

As a final conclusion if there is a place which expects to do kerberos
authentication then it's a MUST to have either TGT or TGS.
Now it's done in a pretty unsecure way. The questions are the following:
* Do we want to leave this unsecure keytab propagation like this and bomb
KDC?
* If no then how do we propagate the more secure token to TMs.

If the answer to the first question is no then the FLIP can be abandoned
and doesn't worth the further effort.
If the answer is yes then we can talk about the how part.

G


On Thu, Feb 3, 2022 at 3:42 PM Till Rohrmann  wrote:


I would prefer not choosing the first option


Make the TM accept tasks only after registration(not sure if it's

possible or makes sense at all)

because it effectively means that we change how Flink's component lifecycle
works for distributing Kerberos tokens. It also effectively means that a TM
cannot make progress until connected to a RM.

I am not a Kerberos expert but is it really so that every application that
wants to use Kerberos needs to implement the token propagation itself? This
somehow feels as if there is something missing.

Cheers,
Till

On Thu, Feb 3, 2022 at 3:29 PM Gabor Somogyi 
wrote:


  Isn't this something the underlying resource management system could

do

or which every process could do on its own?

I was looking for such feature but not found.
Maybe we can solve the propagation easier but then I'm waiting on better
suggestion.
If anybody has better/more simple idea then please point to a specific
feature which works on all resource management systems.


Here's an example for the TM to run workloads without being connected

to the RM, without ever having a valid token

All in all I see the main problem. Not sure what is the reason behind

that

a TM accepts tasks w/o registration but clearly not helping here.
I basically see 2 possible solutions:
* Make the TM accept tasks only after registration(not sure if it's
possible or makes sense at all)
* We send tokens right after container creation with
"updateDelegationTokens"
Not sure which one is more realistic to do since I'm not involved the new
feature.
WDYT?


On Thu, Feb 3, 2022 at 3:09 PM Till Rohrmann 

wrote:

Hi everyone,

Sorry for joining this discussion late. I also did not read all

responses

in this thread so my question might already be answered: Why does Flink
need to be involved in the propagation of the tokens? Why do we need
explicit RPC calls in the Flink domain? Isn't this something the

underlying

resource management system could do or which every process could do on

its

own? I am a bit worried that we are making Flink responsible for

something

that it is not really designed to do so.

Cheers,
Till

On Thu, Feb 3, 2022 at 2:54 PM Chesnay Schepler 
wrote:


Here's an example for the TM to run workloads without being connected

to

the RM, while potentially having a valid token:

  1. TM registers at RM
  2. JobMaster requests slot from RM -> TM gets notified
  3. JM fails over
  4. TM re-offers the slot to the failed over JobMaster
  5. TM reconnects to RM at some point

Here's an example for the TM to run workloads without being connected

to the RM, without ever having a valid token:

 1. TM1 has a valid token and is running some tasks.
 2. TM1 crashes
 3. TM2 is started to take over, and re-uses the working directory

Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Gabor Somogyi
That's not the sole purpose of the feature, but in some environments it
has caused problems.
The main intention is to avoid deploying the keytab to all nodes, because that
enlarges the attack surface, and to reduce the KDC load.
I've already described the situation previously in this thread, so I'm copying
it here.

COPY
"KDC *may* collapse under some circumstances" is the proper wording.

We have several customers who are executing workloads on Spark/Flink. Most
of the time I'm facing their
daily issues, which are heavily environment- and use-case-dependent. I've seen
various cases:
* where the mentioned ~1k nodes were working fine
* where KDC thought the number of requests was coming from a DDOS attack and
so discontinued authentication
* where KDC was simply not responding because of the load
* where KDC intermittently had outages (this was the nastiest
thing)

Since you're managing a relatively big cluster, you know that KDC is not
only used by Spark/Flink workloads:
the whole company IT infrastructure is bombing it, so whether KDC reaches
its limit depends on other factors too. Not sure what kind of evidence
you are looking for, but I'm not authorized to share any information about
our clients' data.

One thing is for sure: the more external system types a workload
authenticates to through KDC (for ex. HDFS, HBase, Hive, Kafka),
the more likely it is to reach this threshold
when the cluster is big enough.
COPY

> The FLIP mentions scaling issues with 200 nodes; it's really surprising
to me that such a small number of requests can already cause issues.

One node/task doesn't mean one request. I've seen the following Kerberos auth
types running at the same time:
HDFS, HBase, Hive, Kafka, all DBs (Oracle, MariaDB, etc.). Additionally,
one task doesn't necessarily open just one connection.

All in all I don't have steps to reproduce but we've faced this already...

G


On Thu, Feb 3, 2022 at 5:15 PM Chesnay Schepler  wrote:

> What I don't understand is how this could overload the KDC. Aren't
> tokens valid for a relatively long time period?
>
> For new deployments where many TMs are started at once I could imagine
> it temporarily, but shouldn't the accesses to the KDC eventually
> naturally spread out?
>
> The FLIP mentions scaling issues with 200 nodes; it's really surprising
> to me that such a small number of requests can already cause issues.
>
> On 03/02/2022 16:14, Gabor Somogyi wrote:
> >> I would prefer not choosing the first option
> > Then the second option may play only.
> >
> >> I am not a Kerberos expert but is it really so that every application
> that
> > wants to use Kerberos needs to implement the token propagation itself?
> This
> > somehow feels as if there is something missing.
> >
> > OK, so first some kerberos + token intro.
> >
> > Some basics:
> > * TGT can be created from keytab
> > * TGT is needed to obtain TGS (called token)
> > * Authentication only works with TGS -> all places where external system
> is
> > needed either a TGT or TGS needed
> >
> > There are basically 2 ways to authenticate to a kerberos secured external
> > system:
> > 1. One needs a kerberos TGT which MUST be propagated to all JVMs. Here
> each
> > and every JVM obtains a TGS by itself which bombs the KDC that may
> collapse.
> > 2. One needs a kerberos TGT which exists only on a single place (in this
> > case JM). JM gets a TGS which MUST be propagated to all TMs because
> > otherwise authentication fails.
> >
> > Now the whole system works in a way that keytab file (we can imagine that
> > as plaintext password) is reachable on all nodes.
> > This is a relatively huge attack surface. Now the main intention is:
> > * Instead of propagating keytab file to all nodes propagate a TGS which
> has
> > limited lifetime (more secure)
> > * Do the TGS generation in a single place so KDC may not collapse +
> having
> > keytab only on a single node can be better protected
> >
> > As a final conclusion if there is a place which expects to do kerberos
> > authentication then it's a MUST to have either TGT or TGS.
> > Now it's done in a pretty unsecure way. The questions are the following:
> > * Do we want to leave this unsecure keytab propagation like this and bomb
> > KDC?
> > * If no then how do we propagate the more secure token to TMs.
> >
> > If the answer to the first question is no then the FLIP can be abandoned
> > and doesn't worth the further effort.
> > If the answer is yes then we can talk about the how part.
> >
> > G
> >
> >
> > On Thu, Feb 3, 2022 at 3:42 PM Till Rohrmann 
> wrote:
> >
> >> I would prefer not choosing the first option
> >>
> >>> Make the TM accept tasks only after registration(not sure if it's
> >> possible or makes sense at all)
> >>
> >> because it effectively means that we change how Flink's component
> lifecycle
> >> works for distributing Kerberos tokens. It also effectively means that
> a TM
> >> cannot make progress until connected to a RM.

Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Gabor Somogyi
Oh and the most important reason I've forgotten.
Without the feature in the FLIP, all secure workloads with delegation tokens
are going to stop when tokens reach their max lifetime 🙂
This is around 7 days with the default config...
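
For reference, the ~7 days comes from Hadoop's default maximum lifetime for
HDFS delegation tokens. A rough illustration (the property name and default
are from hdfs-default.xml; the class around them is only for demonstration):

import org.apache.hadoop.conf.Configuration;

public class TokenLifetimeSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // HDFS delegation tokens can be renewed only up to this max lifetime;
        // once it is reached, renewal fails and a long-running job loses
        // access unless fresh tokens are obtained from a keytab/TGT
        long maxLifetimeMs = conf.getLong(
                "dfs.namenode.delegation.token.max-lifetime",
                7L * 24 * 60 * 60 * 1000); // 604800000 ms = 7 days
        System.out.println("Token max lifetime (ms): " + maxLifetimeMs);
    }
}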

On Thu, Feb 3, 2022 at 5:30 PM Gabor Somogyi 
wrote:

> That's not the single purpose of the feature but in some environments it
> caused problems.
> The main intention is not to deploy keytab to all the nodes because the
> attack surface is bigger + reduce the KDC load.
> I've already described the situation previously in this thread so copying
> it here.
>
> COPY
> "KDC *may* collapse under some circumstances" is the proper wording.
>
> We have several customers who are executing workloads on Spark/Flink. Most
> of the time I'm facing their
> daily issues which is heavily environment and use-case dependent. I've
> seen various cases:
> * where the mentioned ~1k nodes were working fine
> * where KDC thought the number of requests are coming from DDOS attack so
> discontinued authentication
> * where KDC was simply not responding because of the load
> * where KDC was intermittently had some outage (this was the most nasty
> thing)
>
> Since you're managing relatively big cluster then you know that KDC is not
> only used by Spark/Flink workloads
> but the whole company IT infrastructure is bombing it so it really depends
> on other factors too whether KDC is reaching
> it's limit or not. Not sure what kind of evidence are you looking for but
> I'm not authorized to share any information about
> our clients data.
>
> One thing is for sure. The more external system types are used in
> workloads (for ex. HDFS, HBase, Hive, Kafka) which
> are authenticating through KDC the more possibility to reach this
> threshold when the cluster is big enough.
> COPY
>
> > The FLIP mentions scaling issues with 200 nodes; it's really surprising
> to me that such a small number of requests can already cause issues.
>
> One node/task doesn't mean 1 request. The following type of kerberos auth
> types has been seen by me which can run at the same time:
> HDFS, Hbase, Hive, Kafka, all DBs (oracle, mariaDB, etc...) Additionally
> one task is not necessarily opens 1 connection.
>
> All in all I don't have steps to reproduce but we've faced this already...
>
> G
>
>
> On Thu, Feb 3, 2022 at 5:15 PM Chesnay Schepler 
> wrote:
>
>> What I don't understand is how this could overload the KDC. Aren't
>> tokens valid for a relatively long time period?
>>
>> For new deployments where many TMs are started at once I could imagine
>> it temporarily, but shouldn't the accesses to the KDC eventually
>> naturally spread out?
>>
>> The FLIP mentions scaling issues with 200 nodes; it's really surprising
>> to me that such a small number of requests can already cause issues.
>>
>> On 03/02/2022 16:14, Gabor Somogyi wrote:
>> >> I would prefer not choosing the first option
>> > Then the second option may play only.
>> >
>> >> I am not a Kerberos expert but is it really so that every application
>> that
>> > wants to use Kerberos needs to implement the token propagation itself?
>> This
>> > somehow feels as if there is something missing.
>> >
>> > OK, so first some kerberos + token intro.
>> >
>> > Some basics:
>> > * TGT can be created from keytab
>> > * TGT is needed to obtain TGS (called token)
>> > * Authentication only works with TGS -> all places where external
>> system is
>> > needed either a TGT or TGS needed
>> >
>> > There are basically 2 ways to authenticate to a kerberos secured
>> external
>> > system:
>> > 1. One needs a kerberos TGT which MUST be propagated to all JVMs. Here
>> each
>> > and every JVM obtains a TGS by itself which bombs the KDC that may
>> collapse.
>> > 2. One needs a kerberos TGT which exists only on a single place (in this
>> > case JM). JM gets a TGS which MUST be propagated to all TMs because
>> > otherwise authentication fails.
>> >
>> > Now the whole system works in a way that keytab file (we can imagine
>> that
>> > as plaintext password) is reachable on all nodes.
>> > This is a relatively huge attack surface. Now the main intention is:
>> > * Instead of propagating keytab file to all nodes propagate a TGS which
>> has
>> > limited lifetime (more secure)
>> > * Do the TGS generation in a single place so KDC may not collapse +
>> having
>> > keytab only on a single node can be better protected
>> >
>> > As a final conclusion if there is a place which expects to do kerberos
>> > authentication then it's a MUST to have either TGT or TGS.
>> > Now it's done in a pretty unsecure way. The questions are the following:
>> > * Do we want to leave this unsecure keytab propagation like this and
>> bomb
>> > KDC?
>> > * If no then how do we propagate the more secure token to TMs.
>> >
>> > If the answer to the first question is no then the FLIP can be abandoned
>> > and doesn't worth the further effort.
>> > If the answer is yes then we can talk about the how part.

Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Till Rohrmann
I don't have a good alternative solution but it sounds to me a bit as if we
are trying to solve Kerberos' scalability problems within Flink. And even
if we do it like this, there is no guarantee that it works because there
can be other applications bombing the KDC with requests. From a
maintainability and separation of concerns perspective I'd rather have this
as some kind of external tool/service that makes KDC scale better and that
Flink processes can talk to to obtain the tokens.

Cheers,
Till

On Thu, Feb 3, 2022 at 6:01 PM Gabor Somogyi 
wrote:

> Oh and the most important reason I've forgotten.
> Without the feature in the FLIP all secure workloads with delegation tokens
> are going to stop when tokens are reaching it's max lifetime 🙂
> This is around 7 days with default config...
>
> On Thu, Feb 3, 2022 at 5:30 PM Gabor Somogyi 
> wrote:
>
> > That's not the single purpose of the feature but in some environments it
> > caused problems.
> > The main intention is not to deploy keytab to all the nodes because the
> > attack surface is bigger + reduce the KDC load.
> > I've already described the situation previously in this thread so copying
> > it here.
> >
> > COPY
> > "KDC *may* collapse under some circumstances" is the proper wording.
> >
> > We have several customers who are executing workloads on Spark/Flink.
> Most
> > of the time I'm facing their
> > daily issues which is heavily environment and use-case dependent. I've
> > seen various cases:
> > * where the mentioned ~1k nodes were working fine
> > * where KDC thought the number of requests are coming from DDOS attack so
> > discontinued authentication
> > * where KDC was simply not responding because of the load
> > * where KDC was intermittently had some outage (this was the most nasty
> > thing)
> >
> > Since you're managing relatively big cluster then you know that KDC is
> not
> > only used by Spark/Flink workloads
> > but the whole company IT infrastructure is bombing it so it really
> depends
> > on other factors too whether KDC is reaching
> > it's limit or not. Not sure what kind of evidence are you looking for but
> > I'm not authorized to share any information about
> > our clients data.
> >
> > One thing is for sure. The more external system types are used in
> > workloads (for ex. HDFS, HBase, Hive, Kafka) which
> > are authenticating through KDC the more possibility to reach this
> > threshold when the cluster is big enough.
> > COPY
> >
> > > The FLIP mentions scaling issues with 200 nodes; it's really surprising
> > to me that such a small number of requests can already cause issues.
> >
> > One node/task doesn't mean 1 request. The following type of kerberos auth
> > types has been seen by me which can run at the same time:
> > HDFS, Hbase, Hive, Kafka, all DBs (oracle, mariaDB, etc...) Additionally
> > one task is not necessarily opens 1 connection.
> >
> > All in all I don't have steps to reproduce but we've faced this
> already...
> >
> > G
> >
> >
> > On Thu, Feb 3, 2022 at 5:15 PM Chesnay Schepler 
> > wrote:
> >
> >> What I don't understand is how this could overload the KDC. Aren't
> >> tokens valid for a relatively long time period?
> >>
> >> For new deployments where many TMs are started at once I could imagine
> >> it temporarily, but shouldn't the accesses to the KDC eventually
> >> naturally spread out?
> >>
> >> The FLIP mentions scaling issues with 200 nodes; it's really surprising
> >> to me that such a small number of requests can already cause issues.
> >>
> >> On 03/02/2022 16:14, Gabor Somogyi wrote:
> >> >> I would prefer not choosing the first option
> >> > Then the second option may play only.
> >> >
> >> >> I am not a Kerberos expert but is it really so that every application
> >> that
> >> > wants to use Kerberos needs to implement the token propagation itself?
> >> This
> >> > somehow feels as if there is something missing.
> >> >
> >> > OK, so first some kerberos + token intro.
> >> >
> >> > Some basics:
> >> > * TGT can be created from keytab
> >> > * TGT is needed to obtain TGS (called token)
> >> > * Authentication only works with TGS -> all places where external
> >> system is
> >> > needed either a TGT or TGS needed
> >> >
> >> > There are basically 2 ways to authenticate to a kerberos secured
> >> external
> >> > system:
> >> > 1. One needs a kerberos TGT which MUST be propagated to all JVMs. Here
> >> each
> >> > and every JVM obtains a TGS by itself which bombs the KDC that may
> >> collapse.
> >> > 2. One needs a kerberos TGT which exists only on a single place (in
> this
> >> > case JM). JM gets a TGS which MUST be propagated to all TMs because
> >> > otherwise authentication fails.
> >> >
> >> > Now the whole system works in a way that keytab file (we can imagine
> >> that
> >> > as plaintext password) is reachable on all nodes.
> >> > This is a relatively huge attack surface. Now the main intention is:
> >> > * Instead of propagating keytab file to all nodes propagate a TGS which
> >> > has limited lifetime (more secure)

Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Gyula Fóra
Hi Till!

The delegation token framework solves a few production problems; KDC
scalability is just one, and probably not the most important.

As Gabor has explained some of which are:
 - Solves token renewal for long-running jobs, which would currently
time out and die
 - Improves security by not exposing keytabs on each node
 - Reduces KDC load

I do not think we should reject the design just because one of the things
it solves is not primarily Flink's responsibility.
Even if that is the case, the other issues like security and general
token renewal seem very important to me.

Cheers,
Gyula

On Thu, Feb 3, 2022 at 6:34 PM Till Rohrmann  wrote:

> I don't have a good alternative solution but it sounds to me a bit as if we
> are trying to solve Kerberos' scalability problems within Flink. And even
> if we do it like this, there is no guarantee that it works because there
> can be other applications bombing the KDC with requests. From a
> maintainability and separation of concerns perspective I'd rather have this
> as some kind of external tool/service that makes KDC scale better and that
> Flink processes can talk to to obtain the tokens.
>
> Cheers,
> Till
>
> On Thu, Feb 3, 2022 at 6:01 PM Gabor Somogyi 
> wrote:
>
> > Oh and the most important reason I've forgotten.
> > Without the feature in the FLIP all secure workloads with delegation
> tokens
> > are going to stop when tokens are reaching it's max lifetime 🙂
> > This is around 7 days with default config...
> >
> > On Thu, Feb 3, 2022 at 5:30 PM Gabor Somogyi 
> > wrote:
> >
> > > That's not the single purpose of the feature but in some environments
> it
> > > caused problems.
> > > The main intention is not to deploy keytab to all the nodes because the
> > > attack surface is bigger + reduce the KDC load.
> > > I've already described the situation previously in this thread so
> copying
> > > it here.
> > >
> > > COPY
> > > "KDC *may* collapse under some circumstances" is the proper wording.
> > >
> > > We have several customers who are executing workloads on Spark/Flink.
> > Most
> > > of the time I'm facing their
> > > daily issues which is heavily environment and use-case dependent. I've
> > > seen various cases:
> > > * where the mentioned ~1k nodes were working fine
> > > * where KDC thought the number of requests are coming from DDOS attack
> so
> > > discontinued authentication
> > > * where KDC was simply not responding because of the load
> > > * where KDC was intermittently had some outage (this was the most nasty
> > > thing)
> > >
> > > Since you're managing relatively big cluster then you know that KDC is
> > not
> > > only used by Spark/Flink workloads
> > > but the whole company IT infrastructure is bombing it so it really
> > depends
> > > on other factors too whether KDC is reaching
> > > it's limit or not. Not sure what kind of evidence are you looking for
> but
> > > I'm not authorized to share any information about
> > > our clients data.
> > >
> > > One thing is for sure. The more external system types are used in
> > > workloads (for ex. HDFS, HBase, Hive, Kafka) which
> > > are authenticating through KDC the more possibility to reach this
> > > threshold when the cluster is big enough.
> > > COPY
> > >
> > > > The FLIP mentions scaling issues with 200 nodes; it's really
> surprising
> > > to me that such a small number of requests can already cause issues.
> > >
> > > One node/task doesn't mean 1 request. The following type of kerberos
> auth
> > > types has been seen by me which can run at the same time:
> > > HDFS, Hbase, Hive, Kafka, all DBs (oracle, mariaDB, etc...)
> Additionally
> > > one task is not necessarily opens 1 connection.
> > >
> > > All in all I don't have steps to reproduce but we've faced this
> > already...
> > >
> > > G
> > >
> > >
> > > On Thu, Feb 3, 2022 at 5:15 PM Chesnay Schepler 
> > > wrote:
> > >
> > >> What I don't understand is how this could overload the KDC. Aren't
> > >> tokens valid for a relatively long time period?
> > >>
> > >> For new deployments where many TMs are started at once I could imagine
> > >> it temporarily, but shouldn't the accesses to the KDC eventually
> > >> naturally spread out?
> > >>
> > >> The FLIP mentions scaling issues with 200 nodes; it's really
> surprising
> > >> to me that such a small number of requests can already cause issues.
> > >>
> > >> On 03/02/2022 16:14, Gabor Somogyi wrote:
> > >> >> I would prefer not choosing the first option
> > >> > Then the second option may play only.
> > >> >
> > >> >> I am not a Kerberos expert but is it really so that every
> application
> > >> that
> > >> > wants to use Kerberos needs to implement the token propagation
> itself?
> > >> This
> > >> > somehow feels as if there is something missing.
> > >> >
> > >> > OK, so first some kerberos + token intro.
> > >> >
> > >> > Some basics:
> > >> > * TGT can be created from keytab
> > >> > * TGT is needed to obtain TGS (called token)

Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Gabor Somogyi
> And even
if we do it like this, there is no guarantee that it works because there
can be other applications bombing the KDC with requests.

1. The main issue to solve here is that workloads using delegation tokens
stop after 7 days with the default configuration.
2. This is not a new design; it has been rock solid and performing well in
Spark for years.

> From a
maintainability and separation of concerns perspective I'd rather have this
as some kind of external tool/service that makes KDC scale better and that
Flink processes can talk to to obtain the tokens.

OK, so we declare that using delegation tokens in Flink is
dead-end code and not supported, right? Then it must be explicitly
stated in the security documentation that users of that feature
are left behind.

As I see it, the discussion has turned away from facts and toward
feelings. If you have strategic problems with the feature, please put your
-1 on the vote and we can spare quite some time.

G


On Thu, 3 Feb 2022, 18:34 Till Rohrmann,  wrote:

> I don't have a good alternative solution but it sounds to me a bit as if we
> are trying to solve Kerberos' scalability problems within Flink. And even
> if we do it like this, there is no guarantee that it works because there
> can be other applications bombing the KDC with requests. From a
> maintainability and separation of concerns perspective I'd rather have this
> as some kind of external tool/service that makes KDC scale better and that
> Flink processes can talk to to obtain the tokens.
>
> Cheers,
> Till
>
> On Thu, Feb 3, 2022 at 6:01 PM Gabor Somogyi 
> wrote:
>
> > Oh and the most important reason I've forgotten.
> > Without the feature in the FLIP all secure workloads with delegation
> tokens
> > are going to stop when tokens are reaching it's max lifetime 🙂
> > This is around 7 days with default config...
> >
> > On Thu, Feb 3, 2022 at 5:30 PM Gabor Somogyi 
> > wrote:
> >
> > > That's not the single purpose of the feature but in some environments
> it
> > > caused problems.
> > > The main intention is not to deploy keytab to all the nodes because the
> > > attack surface is bigger + reduce the KDC load.
> > > I've already described the situation previously in this thread so
> copying
> > > it here.
> > >
> > > COPY
> > > "KDC *may* collapse under some circumstances" is the proper wording.
> > >
> > > We have several customers who are executing workloads on Spark/Flink.
> > Most
> > > of the time I'm facing their
> > > daily issues which is heavily environment and use-case dependent. I've
> > > seen various cases:
> > > * where the mentioned ~1k nodes were working fine
> > > * where KDC thought the number of requests are coming from DDOS attack
> so
> > > discontinued authentication
> > > * where KDC was simply not responding because of the load
> > > * where KDC was intermittently had some outage (this was the most nasty
> > > thing)
> > >
> > > Since you're managing relatively big cluster then you know that KDC is
> > not
> > > only used by Spark/Flink workloads
> > > but the whole company IT infrastructure is bombing it so it really
> > depends
> > > on other factors too whether KDC is reaching
> > > it's limit or not. Not sure what kind of evidence are you looking for
> but
> > > I'm not authorized to share any information about
> > > our clients data.
> > >
> > > One thing is for sure. The more external system types are used in
> > > workloads (for ex. HDFS, HBase, Hive, Kafka) which
> > > are authenticating through KDC the more possibility to reach this
> > > threshold when the cluster is big enough.
> > > COPY
> > >
> > > > The FLIP mentions scaling issues with 200 nodes; it's really
> surprising
> > > to me that such a small number of requests can already cause issues.
> > >
> > > One node/task doesn't mean 1 request. The following type of kerberos
> auth
> > > types has been seen by me which can run at the same time:
> > > HDFS, Hbase, Hive, Kafka, all DBs (oracle, mariaDB, etc...)
> Additionally
> > > one task is not necessarily opens 1 connection.
> > >
> > > All in all I don't have steps to reproduce but we've faced this
> > already...
> > >
> > > G
> > >
> > >
> > > On Thu, Feb 3, 2022 at 5:15 PM Chesnay Schepler 
> > > wrote:
> > >
> > >> What I don't understand is how this could overload the KDC. Aren't
> > >> tokens valid for a relatively long time period?
> > >>
> > >> For new deployments where many TMs are started at once I could imagine
> > >> it temporarily, but shouldn't the accesses to the KDC eventually
> > >> naturally spread out?
> > >>
> > >> The FLIP mentions scaling issues with 200 nodes; it's really
> surprising
> > >> to me that such a small number of requests can already cause issues.
> > >>
> > >> On 03/02/2022 16:14, Gabor Somogyi wrote:
> > >> >> I would prefer not choosing the first option
> > >> > Then the second option may play only.
> > >> >
> >> >> I am not a Kerberos expert but is it really so that every application
> >> >> that wants to use Kerberos needs to implement the token propagation itself?

Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Chesnay Schepler
First off, at no point have we questioned the use-case and importance of 
this feature, and the fact that David, Till and I spent time looking at 
the FLIP, asking questions, and discussing different aspects of it 
should make this obvious.


I'd appreciate it if you didn't dismiss our replies that quickly.

> Ok, so we declare that users who try to use delegation tokens in 
Flink is dead end code and not supported, right?


No one has said that. Are you claiming that your design is the /only 
possible implementation/ that is capable of achieving the stated goals, 
that there are 0 alternatives? One of the *main points* of these 
discussion threads is to discover alternative implementations that maybe 
weren't thought of. Yes, that may imply that we amend your design, or 
reject it completely and come up with a new one.



Let's clarify what (I think) Till proposed to get the imagination juice 
flowing.


At the end of the day, all we need is a way to provide Flink processes 
with a token that can be periodically updated. _Who_ issues that token 
is irrelevant for the functionality to work. You are proposing a new 
component in the Flink RM to do that; Till is proposing to have some 
external process do it. *That's it*.


How this could look in practice is fairly straightforward: add a 
pluggable interface (aka, your TokenProvider thing) that is loaded in 
each process, which can _somehow_ provide tokens that are then set in 
the UserGroupInformation.
_How_ the provider receives tokens is up to the provider. It _may_ just 
talk directly to Kerberos, or it could use some communication channel to 
accept tokens from the outside.
This would for example make it a lot easier to properly integrate this 
into the lifecycle of the process, as we'd sidestep the whole "TM is 
running but still needs a Token" issue; it could become a proper setup 
step of the process that is independent from other Flink processes.
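
A rough sketch of that interface, just to make the idea concrete (all names 
below are illustrative, nothing here is from the FLIP):

import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

public interface ProcessTokenProvider {

    // Called once during process bootstrap, before any connector starts.
    // May talk to the KDC directly, or block until tokens arrive from an
    // external service; that is entirely up to the implementation.
    Credentials obtainTokens() throws Exception;

    // Applies freshly obtained or pushed tokens to the process-wide UGI.
    default void applyTokens(Credentials credentials) throws Exception {
        UserGroupInformation.getCurrentUser().addCredentials(credentials);
    }
}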


/Discuss/.

On 03/02/2022 18:57, Gabor Somogyi wrote:

And even

if we do it like this, there is no guarantee that it works because there
can be other applications bombing the KDC with requests.

1. The main issue to solve here is that workloads using delegation tokens
are stopping after 7 days with default configuration.
2. This is not new design, it's rock stable and performing well in Spark
for years.


 From a

maintainability and separation of concerns perspective I'd rather have this
as some kind of external tool/service that makes KDC scale better and that
Flink processes can talk to to obtain the tokens.

Ok, so we declare that users who try to use delegation tokens in Flink is
dead end code and not supported, right? Then this must be explicitely
written in the security documentation that such users who use that feature
are left behind.

As I see the discussion turned away from facts and started to speak about
feelings. If you have strategic problems with the feature please put your
-1 on the vote and we can spare quite some time.

G


On Thu, 3 Feb 2022, 18:34 Till Rohrmann,  wrote:


I don't have a good alternative solution but it sounds to me a bit as if we
are trying to solve Kerberos' scalability problems within Flink. And even
if we do it like this, there is no guarantee that it works because there
can be other applications bombing the KDC with requests. From a
maintainability and separation of concerns perspective I'd rather have this
as some kind of external tool/service that makes KDC scale better and that
Flink processes can talk to to obtain the tokens.

Cheers,
Till

On Thu, Feb 3, 2022 at 6:01 PM Gabor Somogyi
wrote:


Oh and the most important reason I've forgotten.
Without the feature in the FLIP all secure workloads with delegation

tokens

are going to stop when tokens are reaching it's max lifetime 🙂
This is around 7 days with default config...

On Thu, Feb 3, 2022 at 5:30 PM Gabor Somogyi
wrote:


That's not the single purpose of the feature but in some environments

it

caused problems.
The main intention is not to deploy keytab to all the nodes because the
attack surface is bigger + reduce the KDC load.
I've already described the situation previously in this thread so

copying

it here.

COPY
"KDC *may* collapse under some circumstances" is the proper wording.

We have several customers who are executing workloads on Spark/Flink.

Most

of the time I'm facing their
daily issues which is heavily environment and use-case dependent. I've
seen various cases:
* where the mentioned ~1k nodes were working fine
* where KDC thought the number of requests are coming from DDOS attack

so

discontinued authentication
* where KDC was simply not responding because of the load
* where KDC was intermittently had some outage (this was the most nasty
thing)

Since you're managing relatively big cluster then you know that KDC is

not

only used by Spark/Flink workloads
but the whole company IT infrastructure is bombing it so it really

depends

on other factors too whether KDC is reaching its limit or not.

Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Gyula Fóra
Hi Team!

Let's all calm down a little and not let our emotions affect the discussion
too much.
There has been a lot of effort spent from all involved parties so this is
quite understandable :)

Even though not everyone said this explicitly, it seems that everyone more
or less agrees that a feature implementing token renewal is necessary and
valuable.

The main point of contention is: where should the token renewal
logic run and how to get the tokens to wherever needed.

From my perspective the current design is very reasonable at first sight
because:
 1. It runs the token renewal in a single place, avoiding extra KDC workload
 2. It does not introduce new processes or extra communication channels, but
piggybacks on existing robust mechanisms.

I understand the concerns about adding new things in the resource manager
but I think that really depends on how we look at it.
We cannot reasonably expect a custom token renewal process to have its own
secure distribution logic like Flink already has; that would be complete
overkill. In practice it means we would not get a slim, efficient
implementation but something unnecessarily complex. And the only
thing we get in return is a bit less code in the resource manager.

From a logical standpoint the delegation framework needs to run in a
centralized place and needs to be able to reach new task manager processes
to achieve all of its design goals.
We could drop the single renewer as a design goal, but that is a decision
that can affect large-scale production runs.
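
For illustration, the centralized renewer is essentially just a scheduled loop
like the sketch below (the 0.75 renewal ratio mirrors what Spark does; all
names are illustrative):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CentralRenewerSketch {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Obtain tokens immediately, then re-obtain at a fraction of their
    // lifetime so they never expire while the job is running.
    public void start(long tokenLifetimeMs) {
        long intervalMs = (long) (tokenLifetimeMs * 0.75);
        scheduler.scheduleAtFixedRate(
                this::obtainAndDistribute, 0L, intervalMs, TimeUnit.MILLISECONDS);
    }

    private void obtainAndDistribute() {
        // 1. log in from the keytab that exists only on this node
        // 2. obtain fresh tokens from each provider (HDFS, HBase, Kafka, ...)
        // 3. push the serialized tokens to every registered TM
    }
}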

Cheers,
Gyula




On Thu, Feb 3, 2022 at 7:32 PM Chesnay Schepler  wrote:

> First of, at no point have we questioned the use-case and importance of
> this feature, and the fact that David, Till and me spent time looking at
> the FLIP, asking questions, and discussing different aspects of it
> should make this obvious.
>
> I'd appreciate it if you didn't dismiss our replies that quickly.
>
>  > Ok, so we declare that users who try to use delegation tokens in
> Flink is dead end code and not supported, right?
>
> No one has said that. Are you claiming that your design is the /only
> possible implementation/ that is capable of achieving the stated goals,
> that there are 0 alternatives? On of the *main**points* of these
> discussion threads is to discover alternative implementations that maybe
> weren't thought of. Yes, that may imply that we amend your design, or
> reject it completely and come up with a new one.
>
>
> Let's clarify what (I think) Till proposed to get the imagination juice
> flowing.
>
> At the end of the day, all we need is a way to provide Flink processes
> with a token that can be periodically updated. _Who_ issues that token
> is irrelevant for the functionality to work. You are proposing for a new
> component in the Flink RM to do that; Till is proposing to have some
> external process do it. *That's it*.
>
> How this could look like in practice is fairly straight forwad; add a
> pluggable interface (aka, your TokenProvider thing) that is loaded in
> each process, which can _somehow_ provide tokens that are then set in
> the UserGroupInformation.
> _How_ the provider receives token is up to the provider. It _may_ just
> talk directly to Kerberos, or it could use some communication channel to
> accept tokens from the outside.
> This would for example make it a lot easier to properly integrate this
> into the lifecycle of the process, as we'd sidestep the whole "TM is
> running but still needs a Token" issue; it could become a proper setup
> step of the process that is independent from other Flink processes.
>
> /Discuss/.
>
> On 03/02/2022 18:57, Gabor Somogyi wrote:
> >> And even
> > if we do it like this, there is no guarantee that it works because there
> > can be other applications bombing the KDC with requests.
> >
> > 1. The main issue to solve here is that workloads using delegation tokens
> > are stopping after 7 days with default configuration.
> > 2. This is not new design, it's rock stable and performing well in Spark
> > for years.
> >
> >>  From a
> > maintainability and separation of concerns perspective I'd rather have
> this
> > as some kind of external tool/service that makes KDC scale better and
> that
> > Flink processes can talk to to obtain the tokens.
> >
> > Ok, so we declare that users who try to use delegation tokens in Flink is
> > dead end code and not supported, right? Then this must be explicitely
> > written in the security documentation that such users who use that
> feature
> > are left behind.
> >
> > As I see the discussion turned away from facts and started to speak about
> > feelings. If you have strategic problems with the feature please put your
> > -1 on the vote and we can spare quite some time.
> >
> > G
> >
> >
> > On Thu, 3 Feb 2022, 18:34 Till Rohrmann,  wrote:
> >
> >> I don't have a good alternative solution but it sounds to me a bit as
> if we
> >> are trying to solve Kerberos' scalability problems within Flink. And
> even
>> if we do it like this, there is no guarantee that it works because there
>> can be other applications bombing the KDC with requests.

Re: [DISCUSS] FLIP-211: Kerberos delegation token framework

2022-02-03 Thread Till Rohrmann
Sorry, I didn't want to offend anybody if it was perceived like this. I can
see that me joining very late into the discussion w/o constructive ideas
was not nice. My motivation for asking for the reasoning behind the current
design proposal is primarily my lack of Kerberos knowledge. Moreover, it
has happened before that we moved responsibilities into Flink and regretted
it later.

As I've said, I don't have a better idea right now. If we believe that it
is the right thing to make Flink responsible for distributing the tokens
and we don't find a better solution then we'll go for it. I just wanted to
make sure that we don't overlook an alternative solution that might be
easier to maintain in the long run.

Cheers,
Till

On Thu, Feb 3, 2022 at 7:52 PM Gyula Fóra  wrote:

> Hi Team!
>
> Let's all calm down a little and not let our emotions affect the discussion
> too much.
> There has been a lot of effort spent from all involved parties so this is
> quite understandable :)
>
> Even though not everyone said this explicitly, it seems that everyone more
> or less agrees that a feature implementing token renewal is necessary and
> valuable.
>
> The main point of contention is: where should the token renewal
> logic run and how to get the tokens to wherever needed.
>
> From my perspective the current design is very reasonable at first sight
> because:
>  1. It runs the token renewal in a single place avoiding extra CDC workload
>  2. Does not introduce new processes, extra communication channels etc but
> piggybacks on existing robust mechanisms.
>
> I understand the concerns about adding new things in the resource manager
> but I think that really depends on how we look at it.
> We cannot reasonably expect a custom token renewal process to have it's own
> secure distribution logic like Flink has now, that is a complete overkill.
> This practically means that we will not have a slim efficient
> implementation for this but something unnecessarily complex. And the only
> thing we get in return is a bit less code in the resource manager.
>
> From a logical standpoint the delegation framework needs to run at a
> centralized place and need to be able to access new task manager processes
> to achieve all it's design goals.
> We can drop a single renewer as a design goal but that might be a decision
> that can affect large scale production runs.
>
> Cheers,
> Gyula
>
>
>
>
> On Thu, Feb 3, 2022 at 7:32 PM Chesnay Schepler 
> wrote:
>
> > First of, at no point have we questioned the use-case and importance of
> > this feature, and the fact that David, Till and me spent time looking at
> > the FLIP, asking questions, and discussing different aspects of it
> > should make this obvious.
> >
> > I'd appreciate it if you didn't dismiss our replies that quickly.
> >
> >  > Ok, so we declare that users who try to use delegation tokens in
> > Flink is dead end code and not supported, right?
> >
> > No one has said that. Are you claiming that your design is the /only
> > possible implementation/ that is capable of achieving the stated goals,
> > that there are 0 alternatives? On of the *main**points* of these
> > discussion threads is to discover alternative implementations that maybe
> > weren't thought of. Yes, that may imply that we amend your design, or
> > reject it completely and come up with a new one.
> >
> >
> > Let's clarify what (I think) Till proposed to get the imagination juice
> > flowing.
> >
> > At the end of the day, all we need is a way to provide Flink processes
> > with a token that can be periodically updated. _Who_ issues that token
> > is irrelevant for the functionality to work. You are proposing for a new
> > component in the Flink RM to do that; Till is proposing to have some
> > external process do it. *That's it*.
> >
> > How this could look like in practice is fairly straight forwad; add a
> > pluggable interface (aka, your TokenProvider thing) that is loaded in
> > each process, which can _somehow_ provide tokens that are then set in
> > the UserGroupInformation.
> > _How_ the provider receives token is up to the provider. It _may_ just
> > talk directly to Kerberos, or it could use some communication channel to
> > accept tokens from the outside.
> > This would for example make it a lot easier to properly integrate this
> > into the lifecycle of the process, as we'd sidestep the whole "TM is
> > running but still needs a Token" issue; it could become a proper setup
> > step of the process that is independent from other Flink processes.
> >
> > /Discuss/.
> >
> > On 03/02/2022 18:57, Gabor Somogyi wrote:
> > >> And even
> > > if we do it like this, there is no guarantee that it works because
> there
> > > can be other applications bombing the KDC with requests.
> > >
> > > 1. The main issue to solve here is that workloads using delegation
> tokens
> > > are stopping after 7 days with default configuration.
> > > 2. This is not new design, it's rock stable and performing well in
> Spark
> > > for years.