[jira] [Created] (FLINK-27808) Allow "kubernetes" as setting for HA_MODE

2022-05-27 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27808:


 Summary: Allow "kubernetes" as setting for HA_MODE
 Key: FLINK-27808
 URL: https://issues.apache.org/jira/browse/FLINK-27808
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes, Runtime / Configuration
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0


Make it easier to enable kubernetes HA by allowing "kubernetes" as a setting 
for HA_MODE.
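
As a hedged sketch of what this would enable (HighAvailabilityOptions.HA_MODE is the existing option with key "high-availability"; whether the accepted shorthand is exactly the string "kubernetes" is what this ticket decides):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;

public class KubernetesHaModeExample {

    public static Configuration haConfig() {
        Configuration conf = new Configuration();
        // Today users have to spell out the full HA services factory class name
        // for Kubernetes HA; with this improvement the shorthand below would be
        // enough (equivalent to "high-availability: kubernetes" in flink-conf.yaml).
        conf.set(HighAvailabilityOptions.HA_MODE, "kubernetes");
        return conf;
    }
}
```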



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27809) Clarify that cluster-id is mandatory for Kubernetes HA in standalone mode

2022-05-27 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27809:


 Summary: Clarify that cluster-id is mandatory for Kubernetes HA in 
standalone mode
 Key: FLINK-27809
 URL: https://issues.apache.org/jira/browse/FLINK-27809
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes, Runtime / Configuration
Reporter: Chesnay Schepler
 Fix For: 1.16.0


The description for KubernetesConfigOptions#CLUSTER_ID states that the client 
generates this automatically if it isn't set. This is technically correct, 
because the client is not involved in the deployment for standalone clusters, 
but to users this sounds like it is optional in general, while it must be set 
(to an ideally unique value) in standalone mode.
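
A hedged illustration of the point above (the option exists today as kubernetes.cluster-id; the value shown is just a placeholder):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

public class StandaloneKubernetesHaConfigExample {

    public static Configuration standaloneHaConfig() {
        Configuration conf = new Configuration();
        // In standalone mode no client is involved in the deployment, so nothing
        // generates this value automatically; it must be set explicitly and
        // should be unique per cluster.
        conf.set(KubernetesConfigOptions.CLUSTER_ID, "my-standalone-ha-cluster");
        return conf;
    }
}
```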




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] FLIP-217 Support watermark alignment of source splits

2022-05-27 Thread Becket Qin
I had an offline discussion with Piotr and here is the summary. Please
correct me if I missed something, Piotr.

There are two things we would like to seek more opinions from the
community, so we can make progress on this FLIP.

1. The general pattern for adding obligatory features to existing interfaces.
***
Interfaces exposed to developers for implementation are intended to be
either *optional* or *obligatory*. While it is quite clear how to convey
that intention when creating an interface, there is less agreement on how
to do so when adding new features to an existing interface. In general,
Flink uses decorative interfaces when adding optional features to existing
interfaces. Both Piotr and I agree that this looks good.

The differing opinions are mainly about how to add obligatory features to
existing interfaces, probably due to different understandings of
"obligatory".

We have discussed four options:

*Option 1:*

   - Just add a new method to the existing interface.
   - For backwards compatibility, the method would have a default
   implementation throwing "UnsupportedOperationException".
   - In the next major version, remove the default implementation.
   - For the developers, any method with a default implementation throwing
   an "UnsupportedOperationException" should be taken as obligatory.

*Option 2:*

   - Always make the features optional by adding a decorative interface,
   just like ordinary optional features.
   - Inform the developers via documentation that this feature is
   obligatory, although it looks optional in the code.
   - In case the developers did not implement the decorative interface,
   throw an exception.
   - In the next major version, move the methods in the decorative
   interface to the base interface, and deprecate the decorative interface.

*Option 3:*

   - Always bump the major version when a new obligatory feature is added,
   even if we may have to do it frequently.

*Option 4:*

   - Add a V2, V3... of the interface affected by the new obligatory
   feature.
   - In the next major versions, deprecate old versions of the interfaces.

Both Piotr and I agreed that options 3 and 4 have big side effects and
should be avoided. We have different preferences between option 1 and
option 2.

Personally I prefer option 1, for the following reasons:
  a) Simple and intuitive. Java 8 introduced default implementations in
interfaces exactly for interface evolution, and this is a common pattern in
many projects.
  b) Prominent to the developers that the feature is expected to be
implemented, because the default impl explicitly throws an exception.
  c) Low maintenance overhead - the Flink framework can always assume the
method exists, so no special handling logic is needed.
  d) Communicates a clear semantic boundary between optional and obligatory
features in Flink to the developers.
  - Optional: Jobs still run without exception if these methods are not
implemented, e.g. all the SupportsXXXPushDown interfaces.
  - Obligatory: Jobs may fail if these methods are not implemented
properly, e.g. SourceReader#pauseOrResumeSplits() (sketched below). This is
a common pattern in Java, e.g. Iterator.remove() by default throws
"UnsupportedOperationException", informing implementers that things may go
wrong if this method is not implemented.
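
To make the option-1 pattern concrete, here is a minimal, self-contained sketch.
It is an illustration only, not the actual Flink SourceReader interface; the
real method name and signature are whatever FLIP-217 defines.

```
import java.util.Collection;

// Illustrative interface only; the real SourceReader lives in flink-core.
public interface SourceReader<T, SplitT> {

    // ... existing obligatory methods elided ...

    /**
     * A new obligatory feature added with a default implementation purely for
     * source compatibility. Connectors are expected to override it; jobs that
     * rely on the feature fail at runtime if they do not.
     */
    default void pauseOrResumeSplits(
            Collection<String> splitsToPause, Collection<String> splitsToResume) {
        throw new UnsupportedOperationException(
                getClass().getName() + " does not support pausing or resuming splits.");
    }
}
```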

As for option 2, although the API itself sounds clean, it misleads people
into thinking an obligatory feature is optional - in the code the feature
looks optional, but the documentation says it is obligatory. We should
probably avoid such code-doc inconsistency, as people will be confused. I
would actually be bewildered that sometimes not implementing an "optional"
feature is fine, but sometimes it causes the jobs to fail.

In response to the argument that a method with a default implementation is
always optional: if that were true, it would actually mean all interfaces
are immutable once they are created. If we want to add a method to an
existing interface, for backwards compatibility we have to provide a
default implementation. The fact that it has a default implementation would
then indicate the method is optional. If that method is optional, it should
reside in a separate decorative interface, otherwise it clutters the
existing interface. Therefore, people should never add a method to an
existing interface. I find this conclusion a bit extreme.

Piotr prefers option 2; his opinions are:
a) Obligatory methods are the methods that fail compilation if they are
not implemented.
b) All obligatory methods should reside in the base interface without a
default implementation, and all the optional methods should be in
decorative interfaces. This is a clean API.
c) Due to b), there isn't a viable way to add an obligatory method to an
existing interface in a backwards-compatible way. Unless we are OK
with breaking backwards compatibility, all the

[jira] [Created] (FLINK-27810) Elasticsearch e2e jars bundle way more than they should

2022-05-27 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27810:


 Summary: Elasticsearch e2e jars bundle way more than they should
 Key: FLINK-27810
 URL: https://issues.apache.org/jira/browse/FLINK-27810
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System, Connectors / ElasticSearch, Tests
Affects Versions: 1.16.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0


The jars bundle flink-end-to-end-tests-common-elasticsearch and all of its 
transitive dependencies, like junit or flink-rpc-core.

All of these are unnecessary for the test to work and really shouldn't be 
bundled.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27811) Remove netty dependency in flink-test-utils

2022-05-27 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27811:


 Summary: Remove netty dependency in flink-test-utils
 Key: FLINK-27811
 URL: https://issues.apache.org/jira/browse/FLINK-27811
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System, Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0


For some reason we bundle a relocated version of netty in flink-test-utils. 
AFAICT this should be unnecessary because nothing makes use of the relocated 
version.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric

2022-05-27 Thread Jingsong Li
Hi all,

I think the problems now are as follows:
1. The AllCache and PartialCache interfaces are not uniform: one needs to
provide a LookupProvider, the other needs to provide a CacheBuilder.
2. The AllCache definition is not flexible. For example, PartialCache can use
any custom storage while AllCache cannot; AllCache may also want to store to
memory or disk, so it needs a flexible strategy as well.
3. AllCache cannot customize the ReloadStrategy; currently there is only
ScheduledReloadStrategy.

In order to solve the above problems, the following are my ideas.

## Top level cache interfaces:

```

public interface CacheLookupProvider
        extends LookupTableSource.LookupRuntimeProvider {

    CacheBuilder createCacheBuilder();
}


public interface CacheBuilder {

    Cache create();
}


public interface Cache {

    /**
     * Returns the value associated with key in this cache, or null if there is
     * no cached value for key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
}

```

## Partial cache

```

public interface PartialCacheLookupFunction extends CacheLookupProvider {

    @Override
    PartialCacheBuilder createCacheBuilder();

    /** Creates a {@link LookupFunction} instance. */
    LookupFunction createLookupFunction();
}


public interface PartialCacheBuilder extends CacheBuilder {

    PartialCache create();
}


public interface PartialCache extends Cache {

    /**
     * Associates the specified value rows with the specified key row in the
     * cache. If the cache previously contained a value associated with the key,
     * the old value is replaced by the specified value.
     *
     * @param key key row with which the specified value is to be associated
     * @param value value rows to be associated with the specified key
     * @return the previous value rows associated with the key, or null if there
     *     was no mapping for the key
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);
}

```

## All cache
```

public interface AllCacheLookupProvider extends CacheLookupProvider {

    void registerReloadStrategy(
            ScheduledExecutorService executorService, Reloader reloader);

    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    @Override
    AllCacheBuilder createCacheBuilder();
}


public interface AllCacheBuilder extends CacheBuilder {

    AllCache create();
}


public interface AllCache extends Cache {

    void putAll(Iterator<Map.Entry<RowData, Collection<RowData>>> allEntries);

    void clearAll();
}


public interface Reloader {

    void reload();
}

```
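
For illustration only, a minimal in-memory PartialCache against the interfaces
sketched above (assuming the erased generics are Collection<RowData>; a real
implementation would add eviction and a binary/normalized key):

```
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.Nullable;

import org.apache.flink.table.data.RowData;

// Illustrative, unbounded, thread-safe in-memory partial cache.
public class MapPartialCache implements PartialCache {

    // Note: keys should have stable equals/hashCode, e.g. a binary row copy.
    private final Map<RowData, Collection<RowData>> entries = new ConcurrentHashMap<>();

    @Override
    @Nullable
    public Collection<RowData> getIfPresent(RowData key) {
        return entries.get(key);
    }

    @Override
    public Collection<RowData> put(RowData key, Collection<RowData> value) {
        return entries.put(key, value);
    }

    @Override
    public void invalidate(RowData key) {
        entries.remove(key);
    }

    @Override
    public long size() {
        return entries.size();
    }
}
```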

Best,
Jingsong

On Fri, May 27, 2022 at 11:10 AM Jingsong Li  wrote:

> Thanks Qingsheng and all for your discussion.
>
> Very sorry to jump in so late.
>
> Maybe I missed something?
> My first impression when I saw the cache interface was, why don't we
> provide an interface similar to guava cache [1], on top of guava cache,
> caffeine also makes extensions for asynchronous calls.[2]
> There is also the bulk load in caffeine too.
>
> I am also more confused why first from LookupCacheFactory.Builder and then
> to Factory to create Cache.
>
> [1] https://github.com/google/guava
> [2] https://github.com/ben-manes/caffeine/wiki/Population
>
> Best,
> Jingsong
>
> On Thu, May 26, 2022 at 11:17 PM Jark Wu  wrote:
>
>> After looking at the new introduced ReloadTime and Becket's comment,
>> I agree with Becket we should have a pluggable reloading strategy.
>> We can provide some common implementations, e.g., periodic reloading, and
>> daily reloading.
>> But there will definitely be some connector- or business-specific reloading
>> strategies, e.g.
>> notified by a ZooKeeper watcher, or reload once a new Hive partition is
>> complete.
>>
>> Best,
>> Jark
>>
>> On Thu, 26 May 2022 at 11:52, Becket Qin  wrote:
>>
>> > Hi Qingsheng,
>> >
>> > Thanks for updating the FLIP. A few comments / questions below:
>> >
>> > 1. Is there a reason that we have both "XXXFactory" and "XXXProvider".
>> > What is the difference between them? If they are the same, can we just
>> use
>> > XXXFactory everywhere?
>> >
>> > 2. Regarding the FullCachingLookupProvider, should the reloading policy
>> > also be pluggable? Periodic reloading can sometimes be tricky in
>> > practice. For example, if a user uses 24 hours as the cache refresh
>> > interval and some nightly batch job is delayed, the cache update may
>> > still see the stale data.
>> >
>> > 3. In DefaultLookupCacheFactory, it looks like InitialCapacity should be
>> > removed.
>> >
>> > 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a little
>> > confusing to me. If Optional<LookupCacheFactory> getCacheFactory() returns
>> > a non-empty factory, doesn't that already indicate that the framework
>> > should cache the missing keys? Also, why is this method returning an
>> > Optional<Boolean> instead of a boolean?
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> >
>> > On 

[jira] [Created] (FLINK-27812) Support Dynamic change of watched namespaces

2022-05-27 Thread Matyas Orhidi (Jira)
Matyas Orhidi created FLINK-27812:
-

 Summary: Support Dynamic change of watched namespaces
 Key: FLINK-27812
 URL: https://issues.apache.org/jira/browse/FLINK-27812
 Project: Flink
  Issue Type: Improvement
Reporter: Matyas Orhidi






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun 3.1.1 to 3.2.0

2022-05-27 Thread Oleksandr (Jira)
Oleksandr created FLINK-27813:
-

 Summary: java.lang.IllegalStateException: after migration from 
statefun 3.1.1 to 3.2.0
 Key: FLINK-27813
 URL: https://issues.apache.org/jira/browse/FLINK-27813
 Project: Flink
  Issue Type: Bug
  Components: API / State Processor
Affects Versions: statefun-3.2.0
Reporter: Oleksandr


The issue was encountered after migrating to 3.2.0:
{code:java}
ts.execution-paymentManualValidationRequestEgress-egress, Sink: 
ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: 
ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 
(98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with failure 
cause: java.lang.IllegalStateException: Unable to parse Netty transport 
spec.\n\tat 
org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat
 
org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat
 
org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat
 
org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat
 
org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat
 
org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat
 
org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat
 
org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat
 
org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat
 
org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat
 
org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat
 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat
 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat
 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat
 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat
 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat
 
org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat
 
org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat
 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)\n\tat
 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat
 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat
 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)\n\tat
 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)\n\tat
 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)\n\tat
 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\tat
 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)\n\tat
 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)\n\tat
 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)\n\tat 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)\n\tat 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)\n\tat 
java.base/java.lang.Thread.run(Unknown Source)\nCaused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
 Time interval unit label 'm' does not match any of the recognized units: DAYS: 
(d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), 
SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | 
millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | 
microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | 
nanoseconds) (through reference chain: 
org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplySpec[\"timeouts\"]->org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplySpec$Timeouts[\"call\"])\n\tat
 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonM

[jira] [Created] (FLINK-27814) Add an abstraction layer for connectors to read and write row data instead of key-values

2022-05-27 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-27814:
---

 Summary: Add an abstraction layer for connectors to read and write 
row data instead of key-values
 Key: FLINK-27814
 URL: https://issues.apache.org/jira/browse/FLINK-27814
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: table-store-0.2.0
Reporter: Caizhi Weng


Currently {{FileStore}} exposes an interface for reading and writing 
{{KeyValue}}. However, connectors may have different ways to convert a 
{{RowData}} into a {{KeyValue}} under different {{WriteMode}}s. This results in 
lots of {{if...else...}} branches and duplicated code.

We need to add an abstraction layer for connectors to read and write row data 
instead of key-values.
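
Purely as a hypothetical illustration of such a layer (the names below are invented here, not part of this issue; {{KeyValue}}, {{WriteMode}} and {{FileStore}} are the existing table store classes mentioned above):

```
import org.apache.flink.table.data.RowData;

// Hypothetical write-side abstraction: connectors only see RowData, and the
// RowData -> KeyValue mapping for each WriteMode is hidden behind this layer.
public interface RowDataStoreWrite {

    /** Accepts a row; the implementation turns it into a KeyValue internally. */
    void write(RowData row) throws Exception;

    /** Flushes buffered records to the underlying FileStore writer. */
    void flush() throws Exception;
}
```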



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27815) Improve the join reorder strategy for batch sql job

2022-05-27 Thread godfrey he (Jira)
godfrey he created FLINK-27815:
--

 Summary: Improve the join reorder strategy for batch sql job 
 Key: FLINK-27815
 URL: https://issues.apache.org/jira/browse/FLINK-27815
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: godfrey he


Join is a heavy operation in execution, and the join order in a query can have 
a significant impact on the query's performance. 
Currently, the planner has one join reorder strategy, which is provided by 
Apache Calcite, but it strongly depends on statistics. It would be better to 
provide different join reorder strategies for different situations, such as:
1. provide a join reorder strategy without statistics, e.g. eliminating cross 
joins
2. improve the current statistics-based join reorder strategy
3. provide hints to allow users to choose the join order strategy
4. ...



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27816) File source reader has some requirements

2022-05-27 Thread jinshuangxian (Jira)
jinshuangxian created FLINK-27816:
-

 Summary: File source reader has some requirements
 Key: FLINK-27816
 URL: https://issues.apache.org/jira/browse/FLINK-27816
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.15.0
Reporter: jinshuangxian


I use the Flink SQL file-system connector to consume data, write it to object 
storage, and then use the file-system connector again to process the data in 
the object storage. The whole process works fine. I have two new requirements:
1. Can I specify a timestamp to consume only files within a specified time range?
2. It would be helpful if the data written to object storage could be ordered 
within a partition (for example, partitioned by device id), and the file source 
reader could consume the files in order, similar to how Kafka is consumed.
Can some enhancements be made to the file source reader?



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27817) TaskManager metaspace OOM for session cluster

2022-05-27 Thread godfrey he (Jira)
godfrey he created FLINK-27817:
--

 Summary: TaskManager metaspace OOM for session cluster
 Key: FLINK-27817
 URL: https://issues.apache.org/jira/browse/FLINK-27817
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Task
Reporter: godfrey he


From user ML: 
https://www.mail-archive.com/user-zh@flink.apache.org/msg15224.html

For SQL jobs, most operators are code-generated with *unique class names*; 
this causes the TM metaspace to keep growing until an OOM occurs in a session 
cluster.




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27818) Model enums as references in OpenAPI spec

2022-05-27 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27818:


 Summary: Model enums as references in OpenAPI spec
 Key: FLINK-27818
 URL: https://issues.apache.org/jira/browse/FLINK-27818
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / REST
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27819) Generate better operationIds for OpenAPI spec

2022-05-27 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27819:


 Summary: Generate better operationIds for OpenAPI spec
 Key: FLINK-27819
 URL: https://issues.apache.org/jira/browse/FLINK-27819
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / REST
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0


There is an easy way to generate operation ids that are significantly better 
than the defaults.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] FLIP-234: Support Retryable Lookup Join To Solve Delayed Updates Issue In External Systems

2022-05-27 Thread Jark Wu
Hi Lincoln,

Delayed dim join is a frequently requested feature; it's exciting to see
this feature on the road.
The FLIP looks good to me in general. I only left some minor comments.

1) Support retry for sync lookup
I'm also fine with the idea proposed by Jingsong. But this doesn't conflict
with the FLIP and can be future work. It would be great if we could
determine the APIs first.

2) "allow-unordered" => "unordered"
I would prefer the "unordered" output mode rather than "allow-unordered",
because this fully aligns with the DataStream behavior and avoids
confusion about the differences.
I understand the purpose of adding an "allow" prefix here, but I think the
semantics are fine with just "unordered". We didn't see any users confused
about OutputMode#UNORDERED.
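
For reference, a small sketch of what unordered output already looks like in
the DataStream async I/O API (the lookup function here is only a placeholder;
AsyncDataStream.unorderedWait and the UNORDERED output mode are the existing
API):

```
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class UnorderedLookupExample {

    /** Placeholder async lookup; a real one would query the external system. */
    static class MyAsyncLookup extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            resultFuture.complete(Collections.singleton(key + "-enriched"));
        }
    }

    static DataStream<String> enrichUnordered(DataStream<String> input) {
        // Results may overtake each other: output order is not tied to input order.
        return AsyncDataStream.unorderedWait(
                input, new MyAsyncLookup(), 30, TimeUnit.SECONDS);
    }
}
```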

Best,
Jark



On Fri, 27 May 2022 at 12:58, Jingsong Li  wrote:

> Thanks Lincoln for your proposal.
>
> Take a look at `strategy: fixed-delay delay: duration, e.g., 10s
> max-attempts: integer, e.g., 3`.
>
> Are these options only for async? It looks like normal lookups work too?
>
> One thing is: most of the lookup functions seem to be synchronous now?
> There are not so many asynchronous ones?
>
> Best,
> Jingsong
>
> On Tue, May 24, 2022 at 11:48 AM Lincoln Lee 
> wrote:
>
> > Hi all,
> >
> > Considering the new common table option 'lookup.max-retries' proposed in
> > FLIP-221[1] which is commonly used for exception handling in connector
> > implementation, we should clearly distinguish ASYNC_LOOKUP_RETRY from it
> to
> > avoid confusing users.
> >
> > To do so, the name ASYNC_LOOKUP_RETRY can change to
> > ASYNC_LOOKUP_MISS_RETRY,  and as the name implies, restrict it to support
> > retries only for lookup misses and no longer include exceptions (for sql
> > connectors, let the connector implementer decide how to handle exceptions,
> > since there are various kinds of retryable and non-retryable exceptions).
> >
> > The FLIP[2] has been updated.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
> >
> >
> > Best,
> > Lincoln Lee
> >
> >
> > Lincoln Lee  于2022年5月19日周四 18:24写道:
> >
> > > Dear Flink developers,
> > >
> > > I would like to open a discussion on FLIP 234 [1] to support retryable
> > > lookup join to solve delayed updates issue, as a pre-work for this
> > > solution, we proposed FLIP-232[2] which adds a generic retry support
> for
> > > Async I/O.
> > > We prefer to offer this retry capability via query hints, similar to
> new
> > > join hints proposed in FLINK-27625[3] & FLIP-204[4].
> > >
> > > This feature is backwards compatible and transparently to connectors.
> For
> > > existing connectors which implements AsyncTableFunction, can easily
> > enable
> > > async retry via the new join hint.
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
> > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > > [3] https://lists.apache.org/thread/jm9kg33wk9z2bvo2b0g5bp3n5kfj6qv8
> > > [4]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204:+Introduce+Hash+Lookup+Join
> > >
> > > Best,
> > > Lincoln Lee
> > >
> >
>


[jira] [Created] (FLINK-27820) Handle Upgrade/Deployment errors gracefully

2022-05-27 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-27820:
--

 Summary: Handle Upgrade/Deployment errors gracefully
 Key: FLINK-27820
 URL: https://issues.apache.org/jira/browse/FLINK-27820
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.0.0
Reporter: Gyula Fora
Assignee: Gyula Fora
 Fix For: kubernetes-operator-1.1.0


The operator currently cannot gracefully handle failures during job submission 
(or directly after it, before the status is updated).

This applies to initial cluster submissions when a Flink CR is created, but 
more importantly to upgrades.

This is slightly related to https://issues.apache.org/jira/browse/FLINK-27804, 
where the mid-upgrade observe was disabled to work around some issues; this 
logic should also be improved to only skip observing last-state info for 
already finished jobs (that were observed before).

During upgrades, the observer should be able to recognize when the job/cluster 
was actually already submitted, even if the status update subsequently failed, 
and move the status into a healthy DEPLOYED state.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27821) Cannot delete flinkdeployment when the pod and deployment deleted manually

2022-05-27 Thread Hector Miuler Malpica Gallegos (Jira)
Hector Miuler Malpica Gallegos created FLINK-27821:
--

 Summary: Cannot delete flinkdeployment when the pod and deployment 
deleted manually
 Key: FLINK-27821
 URL: https://issues.apache.org/jira/browse/FLINK-27821
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.1.0
Reporter: Hector Miuler Malpica Gallegos


My operator was installed with the following commands:

```

git clone g...@github.com:apache/flink-kubernetes-operator.git
git checkout 207b17b

cd flink-kubernetes-operator  

helm --debug upgrade -i \
           flink-kubernetes-operator helm/flink-kubernetes-operator \
           --set image.repository=ghcr.io/apache/flink-kubernetes-operator \
           --set image.tag=207b17b

```

Then I created a FlinkDeployment and a FlinkSessionJob, then I deleted the 
deployment belonging to the FlinkDeployment, and finally I wanted to delete 
the FlinkDeployment:

 

kubectl logs -f pod/flink-kubernetes-operator-5cf66cbbcb-bpl9p

 

```

2022-05-27 13:40:22,027 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][flink-system/migration] Deleting FlinkDeployment
2022-05-27 13:40:34,047 o.a.f.s.n.i.n.c.AbstractChannel [WARN ] Force-closing a 
channel whose registration task was not accepted by an event loop: [id: 
0xb2062900]
java.util.concurrent.RejectedExecutionException: event executor terminated
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350)
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343)
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825)
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81)
        at 
org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86)
        at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323)
        at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155)
        at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139)
        at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123)
        at 
org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:467)
        at 
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:390)
        at 
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:304)
        at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$null$32(RestClusterClient.java:863)
        at 
java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown 
Source)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postFire(Unknown 
Source)
        at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown 
Source)
        at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
        at java.base/java.lang.Thread.run(Unknown Source)
2022-05-27 13:40:34,047 o.a.f.s.n.i.n.u.c.D.rejectedExecution [ERROR] Failed to 
submit a listener notification task. Event loop shut down?
java.util.concurrent.RejectedExecutionException: event executor terminated
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350)
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343)
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825)
        at 
org.apache.flink.shaded.netty4.i

Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric

2022-05-27 Thread Jing Ge
Thanks all for the valuable discussion. The new feature looks very
interesting.

According to the FLIP description: "*Currently we have JDBC, Hive and HBase
connector implemented lookup table source. All existing implementations
will be migrated to the current design and the migration will be
transparent to end users*." I was only wondering whether we should pay
attention to HBase and similar DBs. Since the lookup data is commonly huge
when using HBase, partial caching will be used in this case, if I am not
mistaken, which might have an impact on the block cache used by HBase, e.g.
the LruBlockCache.
Another question: since HBase provides a sophisticated cache solution, does
it make sense to have a no-cache option as one of the default solutions, so
that customers need no migration effort if they want to stick with the
HBase cache?

Best regards,
Jing

On Fri, May 27, 2022 at 11:19 AM Jingsong Li  wrote:

> Hi all,
>
> I think the problems now are as follows:
> 1. The AllCache and PartialCache interfaces are not uniform: one needs to
> provide a LookupProvider, the other needs to provide a CacheBuilder.
> 2. The AllCache definition is not flexible. For example, PartialCache can use
> any custom storage while AllCache cannot; AllCache may also want to store to
> memory or disk, so it needs a flexible strategy as well.
> 3. AllCache cannot customize the ReloadStrategy; currently there is only
> ScheduledReloadStrategy.
>
> In order to solve the above problems, the following are my ideas.
>
> ## Top level cache interfaces:
>
> ```
>
> public interface CacheLookupProvider extends
> LookupTableSource.LookupRuntimeProvider {
>
> CacheBuilder createCacheBuilder();
> }
>
>
> public interface CacheBuilder {
> Cache create();
> }
>
>
> public interface Cache {
>
> /**
>  * Returns the value associated with key in this cache, or null if
> there is no cached value for
>  * key.
>  */
> @Nullable
> Collection<RowData> getIfPresent(RowData key);
>
> /** Returns the number of key-value mappings in the cache. */
> long size();
> }
>
> ```
>
> ## Partial cache
>
> ```
>
> public interface PartialCacheLookupFunction extends CacheLookupProvider {
>
> @Override
> PartialCacheBuilder createCacheBuilder();
>
> /** Creates an {@link LookupFunction} instance. */
> LookupFunction createLookupFunction();
> }
>
>
> public interface PartialCacheBuilder extends CacheBuilder {
>
> PartialCache create();
> }
>
>
> public interface PartialCache extends Cache {
>
> /**
>  * Associates the specified value rows with the specified key row
> in the cache. If the cache
>  * previously contained value associated with the key, the old
> value is replaced by the
>  * specified value.
>  *
>  * @return the previous value rows associated with key, or null if
> there was no mapping for key.
>  * @param key - key row with which the specified value is to be
> associated
>  * @param value – value rows to be associated with the specified key
>  */
> Collection<RowData> put(RowData key, Collection<RowData> value);
>
> /** Discards any cached value for the specified key. */
> void invalidate(RowData key);
> }
>
> ```
>
> ## All cache
> ```
>
> public interface AllCacheLookupProvider extends CacheLookupProvider {
>
> void registerReloadStrategy(ScheduledExecutorService
> executorService, Reloader reloader);
>
> ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();
>
> @Override
> AllCacheBuilder createCacheBuilder();
> }
>
>
> public interface AllCacheBuilder extends CacheBuilder {
>
> AllCache create();
> }
>
>
> public interface AllCache extends Cache {
>
> void putAll(Iterator<Map.Entry<RowData, Collection<RowData>>> allEntries);
>
> void clearAll();
> }
>
>
> public interface Reloader {
>
> void reload();
> }
>
> ```
>
> Best,
> Jingsong
>
> On Fri, May 27, 2022 at 11:10 AM Jingsong Li 
> wrote:
>
> > Thanks Qingsheng and all for your discussion.
> >
> > Very sorry to jump in so late.
> >
> > Maybe I missed something?
> > My first impression when I saw the cache interface was, why don't we
> > provide an interface similar to guava cache [1], on top of guava cache,
> > caffeine also makes extensions for asynchronous calls.[2]
> > There is also the bulk load in caffeine too.
> >
> > I am also more confused why first from LookupCacheFactory.Builder and
> then
> > to Factory to create Cache.
> >
> > [1] https://github.com/google/guava
> > [2] https://github.com/ben-manes/caffeine/wiki/Population
> >
> > Best,
> > Jingsong
> >
> > On Thu, May 26, 2022 at 11:17 PM Jark Wu  wrote:
> >
> >> After looking at the new introduced ReloadTime and Becket's comment,
> >> I agree with Becket we should have a pluggable reloading strategy.
> >> We can provide some common implementations, e.g., periodic reloading,
> and
> >> daily reloading.
>> But there will definitely be some connector- or business-specific reloading
> >> strategies, e.g.
> >> notify by a zookeeper watcher, reload once a new Hive partition is
>

Re: [DISCUSS] FLIP-231: Introduce SupportStatisticReport to support reporting statistics from source connectors

2022-05-27 Thread godfrey he
Hi, everyone.

Thanks for all the inputs.
If there is no more feedback, I think we can start the vote next monday.

Best,
Godfrey

Martijn Visser  于2022年5月25日周三 19:46写道:
>
> Hi Godfrey,
>
> Thanks for creating the FLIP. I have no comments.
>
> Best regards,
>
> Martijn
>
>
> On Tue, 17 May 2022 at 04:52, Jingsong Li  wrote:
>
> > Hi Godfrey,
> >
> > Thanks for your reply.
> >
> > Sounds good to me.
> >
> > > I think we should also introduce a config option
> >
> > We can add this option to the FLIP. I prefer an option for
> > FileSystemConnector, maybe an enum.
> >
> > Best,
> > Jingsong
> >
> > On Tue, May 17, 2022 at 10:31 AM godfrey he  wrote:
> >
> > > Hi Jingsong,
> > >
> > > Thanks for the feedback.
> > >
> > >
> > > >One concern I have is that we read the footer for each file, and this
> > may
> > > >be a bit costly in some cases. Is it possible for us to have some
> > > > hierarchical way
> > > yes, if there are thousands of orc/parquet files, it may take a long
> > time.
> > > So we can introduce a config option to let the user choose the
> > > granularity of the statistics.
> > > But the SIZE will not be introduced, because the planner does not use
> > > the file size statistics now.
> > > We can introduce it once file size statistics are introduced in the future.
> > > I think we should also introduce a config option to enable/disable
> > > SupportStatisticReport,
> > > because it's a heavy operation for some connectors in some cases.
> > >
> > > > is the filter pushdown already happening at
> > > > this time?
> > > That's a good point. Currently, the filter push down is after partition
> > > pruning
> > > to prevent the filter push down rule from consuming the partition
> > > predicates.
> > > The statistics will be set to unknown if filter is pushed down now.
> > > To combine them all, we can create an optimization program after filter
> > > push
> > > down program to collect the statistics. This could avoid collecting
> > > statistics multiple times.
> > >
> > >
> > > Best,
> > > Godfrey
> > >
> > > Jingsong Li  于2022年5月13日周五 22:44写道:
> > > >
> > > > Thank Godfrey for driving.
> > > >
> > > > Looks very good~ This will undoubtedly greatly enhance the various
> > batch
> > > > mode connectors.
> > > >
> > > > I left some comments:
> > > >
> > > > ## FileBasedStatisticsReportableDecodingFormat
> > > >
> > > > One concern I have is that we read the footer for each file, and this
> > may
> > > > be a bit costly in some cases. Is it possible for us to have some
> > > > hierarchical way, e.g.
> > > > - No statistics are collected for files by default.
> > > > - SIZE: Generate statistics based on file Size, get the size of the
> > file
> > > > only with access to the master of the FileSystem.
> > > > - DETAILED: Get the complete statistics by format, possibly by
> > accessing
> > > > the footer of the file.
> > > >
> > > > ## When use the statistics reported by connector
> > > >
> > > > > When partitions are pruned by PushPartitionIntoTableSourceScanRule,
> > the
> > > > statistics should also be updated.
> > > >
> > > > I understand that we definitely need to use reporter after the
> > partition
> > > > prune, but another question: is the filter pushdown already happening
> > at
> > > > this time?
> > > > Can we make sure that in the following three cases, both the filter
> > > > pushdown and the partition prune happen before the stats reporting.
> > > > - only partition prune happens
> > > > - only filter pushdown happens
> > > > - both filter pushdown and partition prune happen
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Fri, May 13, 2022 at 6:57 PM godfrey he 
> > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I would like to open a discussion on FLIP-231:  Introduce
> > > > > SupportStatisticReport
> > > > > to support reporting statistics from source connectors.
> > > > >
> > > > > Statistics are one of the most important inputs to the optimizer.
> > > > > Accurate and complete statistics allows the optimizer to be more
> > > powerful.
> > > > > Currently, the statistics of Flink SQL come from Catalog only,
> > > > > while many Connectors have the ability to provide statistics, e.g.
> > > > > FileSystem.
> > > > > In production, we find many tables in Catalog do not have any
> > > statistics.
> > > > > As a result, the optimizer can't generate better execution plans,
> > > > > especially for Batch jobs.
> > > > >
> > > > > There are two approaches to enhance statistics for the planner,
> > > > > one is to introduce the "ANALYZE TABLE" syntax which will write
> > > > > the analyzed result to the catalog, another is to introduce a new
> > > > > connector interface
> > > > > which allows the connector itself to report statistics directly to
> > the
> > > > > planner.
> > > > > The second one is a supplement to the catalog statistics.
> > > > >
> > > > > Here, we will discuss the second approach. Compared to the first one,
> > > > > the second one is to get statistics in real time, no

RE: [VOTE] Apache Flink Kubernetes Operator Release 1.0.0, release candidate #2

2022-05-27 Thread Jim Busche
Hi Yang,


Oh, I’ve been using podman on Red Hat for testing:

podman version

Client:   Podman Engine

Version:  4.0.2

API Version:  4.0.2


If I use Docker (version 20.10.13 for me right now) then it builds fine with 
that COPY git line.  Nice!



To use podman, I need to either comment out the COPY git line, or mkdir .git 
first.



Thanks, Jim







Re: [VOTE] Apache Flink Kubernetes Operator Release 1.0.0, release candidate #2

2022-05-27 Thread Gyula Fóra
Hi Devs!

We have been performing extensive manual testing on the release.

We have found some very infrequent savepoint upgrade issues with Flink 1.15
deployments that we could not reliably reproduce so far. I have added a fix
that hopefully eliminates the root cause (
https://issues.apache.org/jira/browse/FLINK-27804)
Since adding this we haven't hit the same problem again, so our confidence
in the fix is growing :)

I think it would make sense to create a new RC including this fix tomorrow,
or whenever you have time Yang.
We do not need to rush this release, I would prefer to take an extra 1-2
days to eliminate these corner cases as much as possible.

Cheers,
Gyula

On Fri, May 27, 2022 at 11:09 AM Jim Busche  wrote:

> Hi Yang,
>
>
> Oh, I’ve been using podman on Red Hat for testing:
>
> podman version
>
> Client:   Podman Engine
>
> Version:  4.0.2
>
> API Version:  4.0.2
>
>
> If I use Docker (version 20.10.13 for me right now) then it builds fine
> with that COPY git line.  Nice!
>
>
>
> To use podman, I need to either comment out the COPY git line, or mkdir
> .git first.
>
>
>
> Thanks, Jim
>
>
>
>
>
>


[jira] [Created] (FLINK-27822) Translate the doc of checkpoint/savepoint guarantees

2022-05-27 Thread fanrui (Jira)
fanrui created FLINK-27822:
--

 Summary: Translate the doc of checkpoint/savepoint guarantees
 Key: FLINK-27822
 URL: https://issues.apache.org/jira/browse/FLINK-27822
 Project: Flink
  Issue Type: Improvement
  Components: chinese-translation, Documentation
Affects Versions: 1.15.0, 1.16.0
Reporter: fanrui
 Fix For: 1.16.0, 1.15.1


Translate the change of FLINK-26134 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [VOTE] FLIP-224: Blocklist Mechanism

2022-05-27 Thread Zhu Zhu
Hi Chesnay,
Would you share your thoughts in the discussion thread if there are
still concerns?

Thanks,
Zhu

Chesnay Schepler  于2022年5月27日周五 14:54写道:

>
> -1 to put a lid on things for now, because I'm not quite done yet with
> the discussion.
>
> On 27/05/2022 05:25, Yangze Guo wrote:
> > +1 (binding)
> >
> > Best,
> > Yangze Guo
> >
> > On Thu, May 26, 2022 at 3:54 PM Yun Gao  
> > wrote:
> >> Thanks Lijie and Zhu for driving the FLIP!
> >>
> >> The blocked list functionality helps reduce the complexity in maintenance
> >> and the current design looks good to me, thus +1 from my side (binding).
> >>
> >>
> >> Best,
> >> Yun
> >>
> >>
> >>
> >>
> >> --
> >> From:Xintong Song 
> >> Send Time:2022 May 26 (Thu.) 12:51
> >> To:dev 
> >> Subject:Re: [VOTE] FLIP-224: Blocklist Mechanism
> >>
> >> Thanks for driving this effort, Lijie.
> >>
> >> I think a nice addition would be to make this feature accessible directly
> >> from webui. However, there's no reason to block this FLIP on it.
> >>
> >> So +1 (binding) from my side.
> >>
> >> Best,
> >>
> >> Xintong
> >>
> >>
> >>
> >> On Fri, May 20, 2022 at 12:57 PM Lijie Wang 
> >> wrote:
> >>
> >>> Hi everyone,
> >>>
> >>> Thanks for the feedback for FLIP-224: Blocklist Mechanism [1] on the
> >>> discussion thread [2]
> >>>
> >>> I'd like to start a vote for it. The vote will last for at least 72 hours
> >>> unless there is an objection or insufficient votes.
> >>>
> >>> [1]
> >>>
> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blocklist+Mechanism
> >>> [2] https://lists.apache.org/thread/fngkk52kjbc6b6v9nn0lkfq6hhsbgb1h
> >>>
> >>> Best,
> >>> Lijie
> >>>
>


Re: [DISCUSS] FLIP-224: Blacklist Mechanism

2022-05-27 Thread Zhu Zhu
Regarding the concern about the SlotManager, my two cents:
1. It is necessary for the SlotManager to host blocked slots, in 2 cases:
  a. In standalone mode, a taskmanager may be temporarily added to
the blocklist. We do not want the TM to get disconnected and shut down,
so we need to keep its connection to the RM and keep hosting its slots.
  b. When we want to avoid allocating slots to slow nodes but do not
want to kill currently running tasks on those nodes (MARK_BLOCKED mode).

There is possibly a way to keep the connection of a blocked task manager
while hiding its slots from the SlotManager, but I feel it may be even much
more complicated.

2. It will not complicate the SlotManager too much. The SlotManager will
be offered a BlockedTaskManagerChecker when created, and just needs
to use it to filter out blocked slots on slot requests (a minimal sketch
follows below). Therefore I think the complication is acceptable.
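
A minimal sketch of that filtering, with hypothetical names (only to illustrate
the idea; the actual BlockedTaskManagerChecker API is whatever the FLIP defines):

```
import java.util.Collection;
import java.util.stream.Collectors;

// Hypothetical sketch: how a SlotManager could skip blocked slots on request.
public final class BlockedSlotFiltering {

    /** Check injected into the SlotManager when it is created. */
    @FunctionalInterface
    public interface BlockedTaskManagerChecker {
        boolean isBlocked(String taskManagerResourceId);
    }

    /** Placeholder for a free slot tracked by the SlotManager. */
    public static final class FreeSlot {
        final String taskManagerResourceId;
        final String slotId;

        FreeSlot(String taskManagerResourceId, String slotId) {
            this.taskManagerResourceId = taskManagerResourceId;
            this.slotId = slotId;
        }
    }

    /** Keep only slots whose task manager is not currently blocked. */
    public static Collection<FreeSlot> selectUsableSlots(
            Collection<FreeSlot> freeSlots, BlockedTaskManagerChecker checker) {
        return freeSlots.stream()
                .filter(slot -> !checker.isBlocked(slot.taskManagerResourceId))
                .collect(Collectors.toList());
    }
}
```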

Thanks,
Zhu

Lijie Wang  于2022年5月25日周三 15:26写道:
>
> Hi everyone,
>
> I've updated the FLIP according to Chesnay's feedback, changes as follows:
> 1. Change the GET result to a map.
> 2. Only *endTimestamp* is left in the ADD operation, and the rest is changed
> from POST to PUT
> 3. Introduce a new slot pool implementation(BlocklistSlotPool) to
> encapsulate blocklist related functions.
> 4. Remove *mainThread* from BlocklistTracker, instead provide a
> *removeTimeoutItems* method to be called by outside components.
>
> Best,
> Lijie
>
> Lijie Wang  于2022年5月23日周一 22:51写道:
>
> > Hi Chesnay,
> >
> > Thanks for feedback.
> >
> > 1. Regarding the TM/Node id. Do you mean special characters may appear in
> > the rest URL?  Actually, I don't think so. The task manager id in REST API
> > should be the *ResourceID* of taskmanager in Flink, there should be no
> > special characters, and some existing REST APIs are already using it, e.g.
> > GET: http://{jm_rest_address:port}/taskmanagers/. The node
> > id should be an IP of a machine or node name in Yarn/Kubernetes, I think it
> > should also have no special characters.
> > 2. Regarding the GET query responses. I agree with you, it makes sense to
> > change the GET result to a map.
> >
> > 3. Regarding the endTimestamp.  I also agree with you, endTimestamp can
> > cover everything, and the endTimestamp is a unix timestamp, there should be
> > no timezone issues. But I think PUT and DELETE are enough, no PATCH.  The
> > add rest api is add or update, PUT can cover this semantics.
> >
> > 4. Regarding the slot pool/manager. I don't think the current slotpool
> > and slotmanager are able to support the MARK_BLOCKED(slots that are
> > already allocated will not be affected) action. The reasons are as
> > follows:
> >
> > a) for slot pool, with the MARK_BLOCKED action, when a slot state changes
> > from reserved (task assigned) to free (no task assigned), it is necessary to
> > check whether the slot should be released immediately (it should be released
> > immediately if the task manager is blocked, otherwise it may be allocated
> > to other tasks). I think it cannot be supported without being aware of
> > the blocklist information. Compared to the solution in FLIP, a more
> > appropriate/preferred way may be: Introduce a new slot pool
> > implementation for blocklist(may be named BlocklistSlotPool, it
> > extends/wrapps the original slot pool), and implement the parts that need
> > to be aware of the blocklist in this newly introduced slot pool, and the
> > original slot pool basically does not need to change.
> >
> > b) for slot manager, with the MARK_BLOCKED action, there may be free but
> > blocked slots in slot manager (the corresponding TMs cannot be
> > released/unregistered because there are still running tasks on them).
> > Therefore, we need to filter out the blocked slots when trying to fulfill
> > the slot requirements. Therefore it also needs to know the blocklist 
> > information.
> > A better way may be to abstract a resource allocation strategy, and make
> > the blocklist as a special implementation, then pass the resource
> > allocation strategy in when constructing the slot manager. Unfortunately,
> > the data structures in the two existing slot manager
> > implementations(*DeclarativeSlotManager* and *FineGrainedSlotManager*) are
> > quite different, it is not easy to abstract a common resource allocation
> > strategy, so we prefer to keep the current way(i.e. pass the blocklist
> > information directly into slot manager).
> >
> >
> > 5. Regarding the BlocklistTracker. I also agree with you, the 
> > BlocklistTracker
> > does not need to be aware of the executor, and the timeout actions can be
> > done outside.
> >
> > Chesnay Schepler  于2022年5月20日周五 17:34写道:
> >
> >> I have a number of concerns:
> >>
> >> Is the id used for deleting an item the same sent in the initial request
> >> (and not one returned by Flink)?
> >> I'm very concerned that the tm/node id can contain special characters.
> >>
> >> The GET query should return a map, not a list of items. This makes it
> >> easier to work with.
>