Re: Working with bounded Datastreams - Flink 1.11.1

2020-10-28 Thread Danny Chan
In SQL, you can use the over window to deduplicate the messages by the id
[1], but I'm not sure whether there are operators with the same semantics
in the DataStream API.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
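
A minimal sketch of the deduplication query from [1] via the Table API
(Flink 1.11; the table name VehicleStats and its columns, including the
ordering attribute ts, are illustrative assumptions, not part of the
original exchange):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class DeduplicationSketch {
      public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inBatchMode().build());
        // "VehicleStats" must already be registered (e.g. via a connector DDL).
        // ROW_NUMBER() keeps only the latest row per vehicleId; note that in
        // streaming mode the ORDER BY column must be a time attribute.
        Table latestPerVehicle = tEnv.sqlQuery(
            "SELECT vehicleId, cnt, fuel, avgFuel "
                + "FROM ("
                + "  SELECT *, ROW_NUMBER() OVER ("
                + "    PARTITION BY vehicleId ORDER BY ts DESC) AS row_num "
                + "  FROM VehicleStats) "
                + "WHERE row_num = 1");
        latestPerVehicle.execute().print();
      }
    }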

s_penakalap...@yahoo.com wrote on Wednesday, 2020-10-28 at 12:34 PM:

> Hi All,
>
> Request your inputs please.
>
> Regards,
> Sunitha
>
> On Tuesday, October 27, 2020, 01:01:41 PM GMT+5:30,
> s_penakalap...@yahoo.com  wrote:
>
>
> Hi Team,
>
> I want to use the Flink DataStream API for batch operations involving huge
> data. I tried to calculate count and average on the whole DataStream
> without using a window function.
>
>  Approach I tried to calculate count on the DataStream:
> 1> Read data from a table (say the past 2 days of data) as a DataStream
> 2> Apply a keyBy operation on the DataStream
> 3> Then use a reduce function to find count, sum and average.
>
> I have written the output to a file and also inserted it into a table;
> sample data from the file is:
>
> vehicleId=aa, count=1, fuel=10, avgFuel=0.0
> vehicleId=dd, count=1, fuel=7, avgFuel=0.0
> vehicleId=dd, count=2, fuel=22, avgFuel=11.0
> vehicleId=dd, count=3, fuel=42, avgFuel=14.0
> vehicleId=ee, count=1, fuel=0, avgFuel=0.0
>
> What I am looking for is: when there are multiple records with the same
> vehicleId, I see that only the final record has the correct values (like
> vehicleId=dd).
> Is there any way to get only one final record for each vehicle as shown
> below:
> vehicleId=aa, count=1, fuel=10, avgFuel=0.0
> vehicleId=dd, count=3, fuel=42, avgFuel=14.0
> vehicleId=ee, count=1, fuel=0, avgFuel=0.0
>
> Also, I request some help on how to sort the whole DataStream based on one
> attribute. Say we have X records in one batch job; I would like to sort and
> fetch the record at position X-2 per vehicle.
>
> Regards,
> Sunitha.
>
>
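
For the "only one final record per vehicle" question above, one option in
Flink 1.11 is the DataSet API, where a grouped reduce emits exactly one
record per key, unlike a keyed DataStream reduce, which emits one updated
record per input element. A minimal sketch (the Tuple3 layout
(vehicleId, count, fuel) and the sample values are illustrative):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class FinalRecordPerVehicle {
      public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<String, Integer, Integer>> readings = env.fromElements(
            Tuple3.of("aa", 1, 10),
            Tuple3.of("dd", 1, 7), Tuple3.of("dd", 1, 15), Tuple3.of("dd", 1, 20),
            Tuple3.of("ee", 1, 0));
        readings
            .groupBy(0)  // group by vehicleId
            .reduce(new ReduceFunction<Tuple3<String, Integer, Integer>>() {
              @Override
              public Tuple3<String, Integer, Integer> reduce(
                  Tuple3<String, Integer, Integer> a,
                  Tuple3<String, Integer, Integer> b) {
                // sum counts and fuel; one final record per vehicleId is emitted
                return Tuple3.of(a.f0, a.f1 + b.f1, a.f2 + b.f2);
              }
            })
            .print();
      }
    }

The average can then be derived in a follow-up map (fuel / count). For the
sorting question, the usual DataSet workaround for a global sort is
sortPartition(field, Order.ASCENDING) with parallelism 1.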


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Kostas Kloudas
Hi all,

I am bringing this up again to see if there are any users actively
using the BucketingSink.
So far, if I am not mistaken (and really sorry if I forgot anything),
it is only a discussion between devs about the potential problems of
removing it. I totally understand Chesnay's concern about not
providing compatibility with the StreamingFileSink (SFS) and if there
are any users, then we should not remove it without trying to find a
solution for them.

But if there are no users then I would still propose to remove the
module, given that I am not aware of any efforts to provide
compatibility with the SFS any time soon.
The reasons for removing it also include the fact that we do not
actively maintain it and do not add new features. As for potential
missing features in the SFS compared to the BucketingSink that was
mentioned before, I am not aware of any fundamental limitations and
even if there are, I would assume that the solution is not to direct
the users to a deprecated sink but rather try to increase the
functionality of the actively maintained one.

Please keep in mind that the BucketingSink is deprecated since FLINK
1.9 and there is a new File Sink that is coming as part of FLIP-143
[1].
Again, if there are any active users who cannot migrate easily, then
we cannot remove it before trying to provide a smooth migration path.

Thanks,
Kostas

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
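
For readers assessing the migration effort, a minimal StreamingFileSink
usage sketch (row-encoded output; the path, element type and checkpoint
interval are placeholders):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public class StreamingFileSinkSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // StreamingFileSink finalizes part files on checkpoints.
        env.enableCheckpointing(60_000);
        DataStream<String> lines = env.fromElements("a", "b", "c");
        StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path("file:///tmp/output"),
                new SimpleStringEncoder<String>("UTF-8"))
            .build();
        lines.addSink(sink);
        env.execute("StreamingFileSink sketch");
      }
    }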

On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:
>
> @Seth: Earlier in this discussion it was said that the BucketingSink
> would not be usable in 1.12 .
>
> On 10/16/2020 4:25 PM, Seth Wiesman wrote:
> > +1 It has been deprecated for some time and the StreamingFileSink has
> > stabilized with a large number of formats and features.
> >
> > Plus, the bucketing sink only implements a small number of stable
> > interfaces[1]. I would expect users to continue to use the bucketing sink
> > from the 1.11 release with future versions for some time.
> >
> > Seth
> >
> > https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172
> >
> > On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:
> >
> >> @Arvid Heise I also do not remember exactly what were all the
> >> problems. The fact that we added some more bulk formats to the
> >> streaming file sink definitely reduced the non-supported features. In
> >> addition, the latest discussion I found on the topic was [1] and the
> >> conclusion of that discussion seems to be to remove it.
> >>
> >> Currently, I cannot find any obvious reason for keeping the
> >> BucketingSink, apart from the fact that we do not have a migration
> >> plan unfortunately. This is why I posted this to dev@ and user@.
> >>
> >> Cheers,
> >> Kostas
> >>
> >> [1]
> >> https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E
> >>
> >> On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:
> >>> I remember this conversation popping up a few times already and I'm in
> >>> general a big fan of removing BucketingSink.
> >>>
> >>> However, until now there were a few features lacking in StreamingFileSink
> >>> that are present in BucketingSink and that are being actively used (I
> >> can't
> >>> exactly remember them now, but I can look it up if everyone else is also
> >>> suffering from bad memory). Did we manage to add them in the meantime? If
> >>> not, then it feels rushed to remove it at this point.
> >>>
> >>> On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 
> >> wrote:
>  @Chesnay Schepler  Off the top of my head, I cannot find an easy way
>  to migrate from the BucketingSink to the StreamingFileSink. It may be
>  possible but it will require some effort because the logic would be
>  "read the old state, commit it, and start fresh with the
>  StreamingFileSink."
> 
>  On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
>  wrote:
> > On 13.10.20 14:01, David Anderson wrote:
> >> I thought this was waiting on FLIP-46 -- Graceful Shutdown
> >> Handling --
>  and
> >> in fact, the StreamingFileSink is mentioned in that FLIP as a
>  motivating
> >> use case.
> > Ah yes, I see FLIP-147 as a more general replacement for FLIP-46.
> >> Thanks
> > for the reminder, we should close FLIP-46 now with an explanatory
> > message to avoid confusion.
> >>>
> >>> --
> >>>
> >>> Arvid Heise | Senior Java Developer
> >>>
> >>> 
> >>>
> >>> Follow us @VervericaData
> >>>
> >>> --
> >>>
> >>> Join Flink Forward  - The Apache Flink
> >>> Conference
> >>>
> >>> Stream Processing | Event Driven | Real Time
> >>>
> >>> --
> >>>
> >>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >>>
> >>> --
> >>> Ververica GmbH
> >>> Register

Re: [BULK]Re: [SURVEY] Remove Mesos support

2020-10-28 Thread Till Rohrmann
Hi Oleksandr,

yes, you are right. The biggest problem at the moment is the lack of test
coverage and thereby the confidence to make changes. We have some e2e tests
which you can find here [1]. These tests are, however, quite coarse grained
and are missing a lot of cases. One idea would be to add a Mesos e2e test
based on Flink's end-to-end test framework [2]. I think what needs to be
done there is to add a Mesos resource and a way to submit jobs to a Mesos
cluster to write e2e tests.

[1] https://github.com/apache/flink/tree/master/flink-jepsen
[2]
https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-end-to-end-tests-common

Cheers,
Till

On Tue, Oct 27, 2020 at 12:29 PM Oleksandr Nitavskyi 
wrote:

> Hello Xintong,
>
> Thanks for the insights and support.
>
> Browsing the Mesos backlog, we didn't identify anything critical that is
> left there.
>
> I see that there were quite a lot of contributions to the Flink Mesos
> component in the recent versions:
> https://github.com/apache/flink/commits/master/flink-mesos.
> We plan to validate the current Flink master (or the release 1.12 branch)
> against our Mesos setup. In case of any issues, we will try to propose
> changes.
> My feeling is that our test results shouldn't affect the Flink 1.12
> release cycle, and if any resulting commits land in 1.12.1, that
> should be totally fine.
>
> In the future, we would be glad to help you guys with any
> maintenance-related questions. One of the highest priorities around this
> component seems to be the development of the full e2e test.
>
> Kind Regards
> Oleksandr Nitavskyi
> 
> From: Xintong Song 
> Sent: Tuesday, October 27, 2020 7:14 AM
> To: dev ; user 
> Cc: Piyush Narang 
> Subject: [BULK]Re: [SURVEY] Remove Mesos support
>
> Hi Piyush,
>
> Thanks a lot for sharing the information. It is a great relief that
> you are good with Flink on Mesos as is.
>
> As for the jira issues, I believe the most essential ones should have
> already been resolved. You may find some remaining open issues here [1],
> but not all of them are necessary if we decide to keep Flink on Mesos as is.
>
> At the moment and in the short future, I think help is mostly needed on
> testing the upcoming release 1.12 with Mesos use cases. The community is
> currently actively preparing the new release, and hopefully we could come
> up with a release candidate early next month. It would be greatly
> appreciated if you folks, as experienced Flink on Mesos users, can help
> with verifying the release candidates.
>
>
> Thank you~
>
> Xintong Song
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-17402?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Deployment%20%2F%20Mesos%22%20AND%20status%20%3D%20Open
>
> On Tue, Oct 27, 2020 at 2:58 AM Piyush Narang <p.nar...@criteo.com> wrote:
>
> Hi Xintong,
>
>
>
> Do you have any jiras that cover any of the items on 1 or 2? I can reach
> out to folks internally and see if I can get some folks to commit to
> helping out.
>
>
>
> To cover the other qs:
>
>   *   Yes, we’ve not got a plan at the moment to get off Mesos. We use
> Yarn for some of our Flink workloads when we can. Mesos is only used when we
> need streaming capabilities in our WW dcs (as our Yarn is centralized in
> one DC)
>   *   We’re currently on Flink 1.9 (old planner). We have a plan to bump
> to 1.11 / 1.12 this quarter.
>   *   We typically upgrade once every 6 months to a year (not every
> release). We’d like to speed up the cadence but we’re not there yet.
>   *   We’d largely be good with keeping Flink on Mesos as-is and
> functional while missing out on some of the newer features. We understand
> the pain on the community's side, and if we see some fancy improvement in
> Flink on Yarn / K8s that we want in Mesos, we can take on the work to put in
> the request to port it over.
>
>
>
> Thanks,
>
>
>
> -- Piyush
>
>
>
>
>
> From: Xintong Song <tonysong...@gmail.com>
> Date: Sunday, October 25, 2020 at 10:57 PM
> To: dev <d...@flink.apache.org>, user <user@flink.apache.org>
> Cc: Lasse Nedergaard <lassenedergaardfl...@gmail.com>, Piyush Narang
> <p.nar...@criteo.com>
> Subject: Re: [SURVEY] Remove Mesos support
>
>
>
> Thanks for sharing the information with us, Piyush and Lasse.
>
>
>
> @Piyush
>
>
>
> Thanks for offering the help. IMO, there are currently several problems
> that make supportin

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Chesnay Schepler
If the conclusion is that we shouldn't remove it if _anyone_ is using 
it, then we cannot remove it because the user ML obviously does not 
reach all users.


On 10/28/2020 9:28 AM, Kostas Kloudas wrote:

Hi all,

I am bringing this up again to see if there are any users actively
using the BucketingSink.
So far, if I am not mistaken (and really sorry if I forgot anything),
it is only a discussion between devs about the potential problems of
removing it. I totally understand Chesnay's concern about not
providing compatibility with the StreamingFileSink (SFS) and if there
are any users, then we should not remove it without trying to find a
solution for them.

But if there are no users then I would still propose to remove the
module, given that I am not aware of any efforts to provide
compatibility with the SFS any time soon.
The reasons for removing it also include the fact that we do not
actively maintain it and do not add new features. As for potential
missing features in the SFS compared to the BucketingSink that was
mentioned before, I am not aware of any fundamental limitations and
even if there are, I would assume that the solution is not to direct
the users to a deprecated sink but rather try to increase the
functionality of the actively maintained one.

Please keep in mind that the BucketingSink is deprecated since FLINK
1.9 and there is a new File Sink that is coming as part of FLIP-143
[1].
Again, if there are any active users who cannot migrate easily, then
we cannot remove it before trying to provide a smooth migration path.

Thanks,
Kostas

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:

@Seth: Earlier in this discussion it was said that the BucketingSink
would not be usable in 1.12 .

On 10/16/2020 4:25 PM, Seth Wiesman wrote:

+1 It has been deprecated for some time and the StreamingFileSink has
stabilized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:


@Arvid Heise I also do not remember exactly what were all the
problems. The fact that we added some more bulk formats to the
streaming file sink definitely reduced the non-supported features. In
addition, the latest discussion I found on the topic was [1] and the
conclusion of that discussion seems to be to remove it.

Currently, I cannot find any obvious reason for keeping the
BucketingSink, apart from the fact that we do not have a migration
plan unfortunately. This is why I posted this to dev@ and user@.

Cheers,
Kostas

[1]
https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E

On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:

I remember this conversation popping up a few times already and I'm in
general a big fan of removing BucketingSink.

However, until now there were a few features lacking in StreamingFileSink
that are present in BucketingSink and that are being actively used (I

can't

exactly remember them now, but I can look it up if everyone else is also
suffering from bad memory). Did we manage to add them in the meantime? If
not, then it feels rushed to remove it at this point.

On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 

wrote:

@Chesnay Schepler  Off the top of my head, I cannot find an easy way
to migrate from the BucketingSink to the StreamingFileSink. It may be
possible but it will require some effort because the logic would be
"read the old state, commit it, and start fresh with the
StreamingFileSink."

On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
wrote:

On 13.10.20 14:01, David Anderson wrote:

I thought this was waiting on FLIP-46 -- Graceful Shutdown

Handling --

and

in fact, the StreamingFileSink is mentioned in that FLIP as a

motivating

use case.

Ah yes, I see FLIP-147 as a more general replacement for FLIP-46.

Thanks

for the reminder, we should close FLIP-46 now with an explanatory
message to avoid confusion.

--

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng






Re: RestClusterClient and classpath

2020-10-28 Thread Flavio Pompermaier
Any help here?  How can I understand why the classes inside the jar are not
found when creating the PackagedProgram?
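
For context, a minimal sketch of how a client typically builds a job graph
from a fat jar (the entry-point class name and parallelism are placeholders,
not the code actually discussed here). The entry point's main method is
invoked during this call, so its classes must resolve through the program's
user-code classloader:

    import java.io.File;

    import org.apache.flink.client.program.PackagedProgram;
    import org.apache.flink.client.program.PackagedProgramUtils;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    public class JobGraphSketch {
      public static void main(String[] args) throws Exception {
        PackagedProgram program = PackagedProgram.newBuilder()
            .setJarFile(new File("/tmp/job-bundle.jar"))
            .setEntryPointClassName("org.example.MyJob")  // placeholder
            .build();
        // Runs the entry point's main method to extract the pipeline:
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
            program, new Configuration(), 1, false);
        System.out.println("Job graph: " + jobGraph.getJobID());
      }
    }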

On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier 
wrote:

> In the logs I see that the jar is in the classpath (I'm trying to debug the
> program from the IDE)... isn't it?
>
> Classpath: [file:/tmp/job-bundle.jar]
> ...
>
> Best,
> Flavio
>
> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler 
> wrote:
>
>> * your JobExecutor is _not_ putting it on the classpath.
>>
>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>
>> Well it happens on the client before you even hit the RestClusterClient,
>> so I assume that either your jar is not packaged correctly or your
>> JobExecutor is putting it on the classpath.
>>
>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>
>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main
>> class I'm trying to use as a client towards the Flink cluster - session
>> mode).
>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>
>> The code of getBatchEnv is:
>>
>> @Deprecated
>>   public static BatchEnv getBatchEnv() {
>> // TODO use the following when ready to convert from/to datastream
>> // return
>> getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>> ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>> BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>> customizeEnv(ret);
>> return new BatchEnv(env, ret);
>>   }
>>
>>   private static void customizeEnv(TableEnvironment ret) {
>> final Configuration conf = ret.getConfig().getConfiguration();
>> //
>> conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
>> 2);
>> conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>> conf.setString(BlobServerOptions.STORAGE_DIRECTORY,
>> FLINK_TEST_TMP_DIR);
>> // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>> //NOSONAR
>> // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
>> 0.4f);//NOSONAR
>> // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768
>> * 2);//NOSONAR
>> // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 *
>> 2);// NOSONAR
>> conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);//
>> NOSONAR
>> conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>> conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>> conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>> conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));//
>> NOSONAR
>> final List<String> kryoSerializers = new ArrayList<>();
>> kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class,
>> JodaDateTimeSerializer.class));
>> kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class,
>> TBaseSerializer.class));
>> kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class,
>> TBaseSerializer.class));
>> conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>
>>   }
>>
>> Classpath: [file:/tmp/job-bundle.jar]
>>
>> System.out: (none)
>>
>> System.err: (none)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>> at
>> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>> at
>> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>> at
>> it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>> ... 3 more
>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>> at
>> org.apac

Re: RestClusterClient and classpath

2020-10-28 Thread Chesnay Schepler
hmm... it appears as if PackagedProgramUtils#createJobGraph does some
things outside the user-code classloader (getPipelineFromProgram()),
specifically the call to the main method.


@klou This seems like wrong behavior?

@Flavio What you could try in the meantime is wrap the call to 
createJobGraph like this:


final ClassLoader contextClassLoader =
    Thread.currentThread().getContextClassLoader();
try {
    Thread.currentThread().setContextClassLoader(
        packagedProgram.getUserCodeClassLoader());
    // do stuff
} finally {
    Thread.currentThread().setContextClassLoader(contextClassLoader);
}
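
(Restoring the original loader in the finally block keeps the classloader
swap scoped to the createJobGraph call.)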


On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
Any help here?  How can I understand why the classes inside the jar 
are not found when creating the PackagedProgram?


On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier
<pomperma...@okkam.it> wrote:


In the logs I see that the jar is in the classpath (I'm trying to
debug the program from the IDE)... isn't it?

Classpath: [file:/tmp/job-bundle.jar]
...

Best,
Flavio

On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler
<ches...@apache.org> wrote:

* your JobExecutor is _not_ putting it on the classpath.

On 10/27/2020 10:36 AM, Chesnay Schepler wrote:

Well it happens on the client before you even hit the
RestClusterClient, so I assume that either your jar is not
packaged correctly or your JobExecutor is putting it on
the classpath.

On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:

Sure. Here it is (org.apache.flink.client.cli.JobExecutor is
my main class I'm trying to use as a client towards the
Flink cluster - session mode).
it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

The code of getBatchEnv is:

@Deprecated
  public static BatchEnv getBatchEnv() {
    // TODO use the following when ready to convert from/to datastream
    // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment ret = BatchTableEnvironment.create(env);
    customizeEnv(ret);
    return new BatchEnv(env, ret);
  }

  private static void customizeEnv(TableEnvironment ret) {
    final Configuration conf = ret.getConfig().getConfiguration();
    // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
    conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
    conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
    // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
    // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
    // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
    // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
    conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
    conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
    conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
    final List<String> kryoSerializers = new ArrayList<>();
    kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
    conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
  }

Classpath: [file:/tmp/job-bundle.jar]

System.out: (none)

System.err: (none)
at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
at it.okkam.datalinks.flink.Datalin

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-28 Thread Xintong Song
Hi Ori,

The error message suggests that there's not enough physical memory on the
machine to satisfy the allocation. This does not necessarily mean a managed
memory leak. Managed memory leak is only one of the possibilities. There
are other potential reasons, e.g., another process/container on the machine
used more memory than expected, Yarn NM is not configured with enough
memory reserved for the system processes, etc.

I would suggest first looking into the machine's memory usage, to see whether
the Flink process indeed uses more memory than expected. This could be
achieved via:
- Run the `top` command
- Look into the `/proc/meminfo` file
- Any container memory usage metrics that are available to your Yarn cluster
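
As a rough cross-check against the breakdown quoted below: framework.heap
0.2g + task.heap 50g + managed 54g + framework.off-heap 0.5g + task.off-heap
1g + network 2g + metaspace 1g comes to about 108.7g, which together with
JVM overhead roughly accounts for the configured 112g process size. Memory
used beyond that budget would have to come from outside the configured
components (e.g. native allocations by user code or libraries).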

Thank you~

Xintong Song



On Tue, Oct 27, 2020 at 6:21 PM Ori Popowski  wrote:

> After the job has been running for 10 days in production, TaskManagers start
> failing with:
>
> Connection unexpectedly closed by remote task manager
>
> Looking in the machine logs, I can see the following error:
>
> = Java processes for user hadoop =
> OpenJDK 64-Bit Server VM warning: INFO:
> os::commit_memory(0x7fb4f401, 1006567424, 0) failed; error='Cannot
> allocate memory' (err
> #
> # There is insufficient memory for the Java Runtime Environment to
> continue.
> # Native memory allocation (mmap) failed to map 1006567424 bytes for
> committing reserved memory.
> # An error report file with more information is saved as:
> # /mnt/tmp/hsperfdata_hadoop/hs_err_pid6585.log
> === End java processes for user hadoop ===
>
> In addition, the metrics for the TaskManager show very low Heap memory
> consumption (20% of Xmx).
>
> Hence, I suspect there is a memory leak in the TaskManager's Managed
> Memory.
>
> This is my TaskManager's memory detail:
> flink process 112g
> framework.heap.size 0.2g
> task.heap.size 50g
> managed.size 54g
> framework.off-heap.size 0.5g
> task.off-heap.size 1g
> network 2g
> XX:MaxMetaspaceSize 1g
>
> As you can see, the managed memory is 54g, so it's already high (my
> managed.fraction is set to 0.5).
>
> I'm running Flink 1.10. Full job details attached.
>
> Can someone advise what would cause a managed memory leak?
>
>
>


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Kostas Kloudas
I think that the mailing lists are the best we can do and I would say
that they seem to be working pretty well (e.g. the recent Mesos
discussion).
Of course they are not perfect but the alternative would be to never
remove anything user facing until the next major release, which I find
pretty strict.

On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler  wrote:
>
> If the conclusion is that we shouldn't remove it if _anyone_ is using
> it, then we cannot remove it because the user ML obviously does not
> reach all users.
>
> On 10/28/2020 9:28 AM, Kostas Kloudas wrote:
> > Hi all,
> >
> > I am bringing this up again to see if there are any users actively
> > using the BucketingSink.
> > So far, if I am not mistaken (and really sorry if I forgot anything),
> > it is only a discussion between devs about the potential problems of
> > removing it. I totally understand Chesnay's concern about not
> > providing compatibility with the StreamingFileSink (SFS) and if there
> > are any users, then we should not remove it without trying to find a
> > solution for them.
> >
> > But if there are no users then I would still propose to remove the
> > module, given that I am not aware of any efforts to provide
> > compatibility with the SFS any time soon.
> > The reasons for removing it also include the fact that we do not
> > actively maintain it and do not add new features. As for potential
> > missing features in the SFS compared to the BucketingSink that was
> > mentioned before, I am not aware of any fundamental limitations and
> > even if there are, I would assume that the solution is not to direct
> > the users to a deprecated sink but rather try to increase the
> > functionality of the actively maintained one.
> >
> > Please keep in mind that the BucketingSink is deprecated since FLINK
> > 1.9 and there is a new File Sink that is coming as part of FLIP-143
> > [1].
> > Again, if there are any active users who cannot migrate easily, then
> > we cannot remove it before trying to provide a smooth migration path.
> >
> > Thanks,
> > Kostas
> >
> > [1] 
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> >
> > On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:
> >> @Seth: Earlier in this discussion it was said that the BucketingSink
> >> would not be usable in 1.12 .
> >>
> >> On 10/16/2020 4:25 PM, Seth Wiesman wrote:
> >>> +1 It has been deprecated for some time and the StreamingFileSink has
> >>> stabilized with a large number of formats and features.
> >>>
> >>> Plus, the bucketing sink only implements a small number of stable
> >>> interfaces[1]. I would expect users to continue to use the bucketing sink
> >>> from the 1.11 release with future versions for some time.
> >>>
> >>> Seth
> >>>
> >>> https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172
> >>>
> >>> On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:
> >>>
>  @Arvid Heise I also do not remember exactly what were all the
>  problems. The fact that we added some more bulk formats to the
>  streaming file sink definitely reduced the non-supported features. In
>  addition, the latest discussion I found on the topic was [1] and the
>  conclusion of that discussion seems to be to remove it.
> 
>  Currently, I cannot find any obvious reason for keeping the
>  BucketingSink, apart from the fact that we do not have a migration
>  plan unfortunately. This is why I posted this to dev@ and user@.
> 
>  Cheers,
>  Kostas
> 
>  [1]
>  https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E
> 
>  On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:
> > I remember this conversation popping up a few times already and I'm in
> > general a big fan of removing BucketingSink.
> >
> > However, until now there were a few features lacking in 
> > StreamingFileSink
> > that are present in BucketingSink and that are being actively used (I
>  can't
> > exactly remember them now, but I can look it up if everyone else is also
> > suffering from bad memory). Did we manage to add them in the meantime? 
> > If
> > not, then it feels rushed to remove it at this point.
> >
> > On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 
>  wrote:
> >> @Chesnay Schepler  Off the top of my head, I cannot find an easy way
> >> to migrate from the BucketingSink to the StreamingFileSink. It may be
> >> possible but it will require some effort because the logic would be
> >> "read the old state, commit it, and start fresh with the
> >> StreamingFileSink."
> >>
> >> On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
> >> wrote:
> >>> On 13.10.20 14:01, David Anderson wrote:
>  

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-28 Thread Ori Popowski
Hi Xintong,

See here:

# Top memory users
ps auxwww --sort -rss | head -10
USER   PID %CPU %MEMVSZ   RSS TTY  STAT START   TIME COMMAND
yarn 20339 35.8 97.0 128600192 126672256 ? Sl   Oct15 5975:47
/etc/alternatives/jre/bin/java -Xmx54760833024 -Xms54760833024 -XX:Max
root  5245  0.1  0.4 5580484 627436 ?  Sl   Jul30 144:39
/etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
hadoop5252  0.1  0.4 7376768 604772 ?  Sl   Jul30 153:22
/etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
yarn 26857  0.3  0.2 4214784 341464 ?  Sl   Sep17 198:43
/etc/alternatives/jre/bin/java -Dproc_nodemanager -Xmx2048m -XX:OnOutOf
root  5519  0.0  0.2 5658624 269344 ?  Sl   Jul30  45:21
/usr/bin/java -Xmx1500m -Xms300m -XX:+ExitOnOutOfMemoryError -XX:MinHea
root  1781  0.0  0.0 172644  8096 ?Ss   Jul30   2:06
/usr/lib/systemd/systemd-journald
root  4801  0.0  0.0 2690260 4776 ?Ssl  Jul30   4:42
/usr/bin/amazon-ssm-agent
root  6566  0.0  0.0 164672  4116 ?R00:30   0:00 ps auxwww
--sort -rss
root  6532  0.0  0.0 183124  3592 ?S00:30   0:00
/usr/sbin/CROND -n
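
(For scale: ps reports RSS in KiB, so the 126672256 figure for the Flink JVM
is roughly 120.8 GiB, i.e. noticeably above the ~112g configured total
process size.)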

On Wed, Oct 28, 2020 at 11:34 AM Xintong Song  wrote:

> Hi Ori,
>
> The error message suggests that there's not enough physical memory on the
> machine to satisfy the allocation. This does not necessarily mean a managed
> memory leak. Managed memory leak is only one of the possibilities. There
> are other potential reasons, e.g., another process/container on the machine
> used more memory than expected, Yarn NM is not configured with enough
> memory reserved for the system processes, etc.
>
> I would suggest first looking into the machine's memory usage, to see whether
> the Flink process indeed uses more memory than expected. This could be
> achieved via:
> - Run the `top` command
> - Look into the `/proc/meminfo` file
> - Any container memory usage metrics that are available to your Yarn
> cluster
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Oct 27, 2020 at 6:21 PM Ori Popowski  wrote:
>
>> After the job has been running for 10 days in production, TaskManagers start
>> failing with:
>>
>> Connection unexpectedly closed by remote task manager
>>
>> Looking in the machine logs, I can see the following error:
>>
>> = Java processes for user hadoop =
>> OpenJDK 64-Bit Server VM warning: INFO:
>> os::commit_memory(0x7fb4f401, 1006567424, 0) failed; error='Cannot
>> allocate memory' (err
>> #
>> # There is insufficient memory for the Java Runtime Environment to
>> continue.
>> # Native memory allocation (mmap) failed to map 1006567424 bytes for
>> committing reserved memory.
>> # An error report file with more information is saved as:
>> # /mnt/tmp/hsperfdata_hadoop/hs_err_pid6585.log
>> === End java processes for user hadoop ===
>>
>> In addition, the metrics for the TaskManager show very low Heap memory
>> consumption (20% of Xmx).
>>
>> Hence, I suspect there is a memory leak in the TaskManager's Managed
>> Memory.
>>
>> This is my TaskManager's memory detail:
>> flink process 112g
>> framework.heap.size 0.2g
>> task.heap.size 50g
>> managed.size 54g
>> framework.off-heap.size 0.5g
>> task.off-heap.size 1g
>> network 2g
>> XX:MaxMetaspaceSize 1g
>>
>> As you can see, the managed memory is 54g, so it's already high (my
>> managed.fraction is set to 0.5).
>>
>> I'm running Flink 1.10. Full job details attached.
>>
>> Can someone advise what would cause a managed memory leak?
>>
>>
>>


Re: RestClusterClient and classpath

2020-10-28 Thread Flavio Pompermaier
Always the same problem.

Caused by: java.lang.ClassNotFoundException: it.test.XXX
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 10 more

I've also tried with

flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");

but nothing changes.

On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler 
wrote:

> hmm... it appears as if PackagedProgramUtils#createJobGraph does some things
> outside the user-code classloader (getPipelineFromProgram()), specifically
> the call to the main method.
>
> @klou This seems like wrong behavior?
>
> @Flavio What you could try in the meantime is wrap the call to
> createJobGraph like this:
>
> final ClassLoader contextClassLoader =
>     Thread.currentThread().getContextClassLoader();
> try {
>     Thread.currentThread().setContextClassLoader(
>         packagedProgram.getUserCodeClassLoader());
>     // do stuff
> } finally {
>     Thread.currentThread().setContextClassLoader(contextClassLoader);
> }
>
>
> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>
> Any help here?  How can I understand why the classes inside the jar are
> not found when creating the PackagedProgram?
>
> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier 
> wrote:
>
>> In the logs I see that the jar is in the classpath (I'm trying to debug the
>> program from the IDE)... isn't it?
>>
>> Classpath: [file:/tmp/job-bundle.jar]
>> ...
>>
>> Best,
>> Flavio
>>
>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler 
>> wrote:
>>
>>> * your JobExecutor is _not_ putting it on the classpath.
>>>
>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>
>>> Well it happens on the client before you even hit the RestClusterClient,
>>> so I assume that either your jar is not packaged correctly or your
>>> JobExecutor is putting it on the classpath.
>>>
>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>
>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main
>>> class I'm trying to use as a client towards the Flink cluster - session
>>> mode).
>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>>
>>> The code of getBatchEnv is:
>>>
>>> @Deprecated
>>>   public static BatchEnv getBatchEnv() {
>>> // TODO use the following when ready to convert from/to datastream
>>> // return
>>> getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>> ExecutionEnvironment env =
>>> ExecutionEnvironment.getExecutionEnvironment();
>>> BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>> customizeEnv(ret);
>>> return new BatchEnv(env, ret);
>>>   }
>>>
>>>   private static void customizeEnv(TableEnvironment ret) {
>>> final Configuration conf = ret.getConfig().getConfiguration();
>>> //
>>> conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
>>> 2);
>>> conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>> conf.setString(BlobServerOptions.STORAGE_DIRECTORY,
>>> FLINK_TEST_TMP_DIR);
>>> // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>>> //NOSONAR
>>> // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
>>> 0.4f);//NOSONAR
>>> // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768
>>> * 2);//NOSONAR
>>> // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768
>>> * 2);// NOSONAR
>>> conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);//
>>> NOSONAR
>>> conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>>> conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>>> conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>>> conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));//
>>> NOSONAR
>>> final List<String> kryoSerializers = new ArrayList<>();
>>> kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class,
>>> JodaDateTimeSerializer.class));
>>> kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class,
>>> TBaseSerializer.class));
>>> kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class,
>>> TBaseSerializer.class));
>>> conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>
>>>   }
>>>
>>> Classpath: [file:/tmp/job-bundle.jar]
>>>
>>> System.out: (none)
>>>
>>> System.err: (none)
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>> at
>>> org.apache.flink.clien

Fwd: Kubernetes Job Cluster, does it autoterminate?

2020-10-28 Thread Ruben Laguna
Hi,

First-time user here; I'm just evaluating Flink at the moment, and I was
reading 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#deploy-job-cluster
and I don't fully understand whether a Job Cluster will auto-terminate
after the job is completed (for a batch job).

The examples look to me like the task manager pods will continue
running, as they are configured as a Deployment.

So is there any way to achieve "autotermination" or am I supposed to
monitor the job status externally (like from airflow) and delete the
JobManager and TaskManager kubernetes resources from there?

--
/Rubén Laguna


Re: adding core-site xml to flink1.11

2020-10-28 Thread Shachar Carmeli
10x

On 2020/10/27 10:42:40, Robert Metzger  wrote: 
> Hi,
> 
> it seems that this is what you have to do for now. However, I see that it
> would be nice if Flink would allow reading from multiple configuration
> files, so that you can have a "common configuration" and a "per cluster"
> configuration.
> 
> I filed a JIRA ticket for a feature request:
> https://issues.apache.org/jira/browse/FLINK-19828
> 
> 
> On Tue, Oct 27, 2020 at 10:54 AM Shachar Carmeli 
> wrote:
> 
> > Hi,
> > Thank you for your reply,
> > We are deploying on Kubernetes and the xml is part of the common config
> > map for all Flink jobs we have (or at least it was in previous versions).
> >
> > This means that we need to duplicate the configuration in the
> > flink-conf.yaml for each job
> > instead of having a common configmap
> >
> > Thanks,
> > Shachar
> >
> > On 2020/10/27 08:48:17, Robert Metzger  wrote:
> > > Hi Shachar,
> > >
> > > Why do you want to use the core-site.xml to configure the file system?
> > >
> > > Since we are adding the file systems as plugins, their initialization is
> > > customized. It might be the case that we are intentionally ignoring xml
> > > configurations from the classpath.
> > > You can configure the filesystem in the flink-conf.yaml file.
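
For example, a minimal flink-conf.yaml sketch for an s3a-style endpoint
(option keys as documented for the Flink S3 filesystem plugins; the values
are placeholders):

    s3.endpoint: http://localhost:9000
    s3.path.style.access: true
    s3.access-key: <your-access-key>
    s3.secret-key: <your-secret-key>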
> > >
> > >
> > > On Sun, Oct 25, 2020 at 7:56 AM Shachar Carmeli 
> > > wrote:
> > >
> > > > Hi,
> > > > I'm trying to define filesystem to flink 1.11 using core-site.xml
> > > > I tried adding env.hadoop.conf.dir in the flink-conf.yaml and I see it
> > > > is added to the classpath;
> > > > also adding the environment variable HADOOP_CONF_DIR didn't help.
> > > >
> > > > The flink 1.11.2 is running on docker using kubernetes
> > > >
> > > > I added hadoop using plugin as mentioned in
> > > >
> > > > https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#hadooppresto-s3-file-systems-plugins
> > > >
> > > > When I configure the parameters manually I can connect to the local s3a
> > > > server.
> > > > So it looks like Flink is not reading the core-site.xml file.
> > > >
> > > > please advise
> > > >
> > > > Thanks,
> > > > Shachar
> > > >
> > >
> >
> 


Re: RestClusterClient and classpath

2020-10-28 Thread Kostas Kloudas
Hi all,

I will have a look at the whole stack trace in a bit.

@Chesnay Schepler I think that we are setting the correct classloader
during jobgraph creation [1]. Is that what you mean?

Cheers,
Kostas

[1] 
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122

On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
 wrote:
>
> Always the same problem.
>
> Caused by: java.lang.ClassNotFoundException: it.test.XXX
> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
> at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
> at 
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> ... 10 more
>
> I've also tried with
>
> flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>
> but nothing changes.
>
> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler  wrote:
>>
>> hmm... it appears as if PackagedProgramUtils#createJobGraph does some things
>> outside the user-code classloader (getPipelineFromProgram()), specifically
>> the call to the main method.
>>
>> @klou This seems like wrong behavior?
>>
>> @Flavio What you could try in the meantime is wrap the call to 
>> createJobGraph like this:
>>
>> final ClassLoader contextClassLoader =
>>     Thread.currentThread().getContextClassLoader();
>> try {
>>     Thread.currentThread().setContextClassLoader(
>>         packagedProgram.getUserCodeClassLoader());
>>     // do stuff
>> } finally {
>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
>> }
>>
>>
>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>
>> Any help here?  How can I understand why the classes inside the jar are not 
>> found when creating the PackagedProgram?
>>
>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier  
>> wrote:
>>>
>>> In the logs I see that the jar is in the classpath (I'm trying to debug the
>>> program from the IDE)... isn't it?
>>>
>>> Classpath: [file:/tmp/job-bundle.jar]
>>> ...
>>>
>>> Best,
>>> Flavio
>>>
>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler  
>>> wrote:

 * your JobExecutor is _not_ putting it on the classpath.

 On 10/27/2020 10:36 AM, Chesnay Schepler wrote:

 Well it happens on the client before you even hit the RestClusterClient, 
 so I assume that either your jar is not packaged correctly or your
 JobExecutor is putting it on the classpath.

 On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:

 Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class 
 I'm trying to use as a client towards the Flink cluster - session mode).
 it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

 The code of getBatchEnv is:

@Deprecated
  public static BatchEnv getBatchEnv() {
    // TODO use the following when ready to convert from/to datastream
    // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment ret = BatchTableEnvironment.create(env);
    customizeEnv(ret);
    return new BatchEnv(env, ret);
  }

  private static void customizeEnv(TableEnvironment ret) {
    final Configuration conf = ret.getConfig().getConfiguration();
    // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
    conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
    conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
    // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
    // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
    // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
    // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
    conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
    conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
    conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
    final List<String> kryoSerializers = new ArrayList<>();
    kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSe

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Chesnay Schepler
The alternative could also be to use a different argument than "no one 
uses it", e.g., we are fine with removing it at the cost of friction for 
some users because there are better alternatives.


On 10/28/2020 10:46 AM, Kostas Kloudas wrote:

I think that the mailing lists are the best we can do and I would say
that they seem to be working pretty well (e.g. the recent Mesos
discussion).
Of course they are not perfect but the alternative would be to never
remove anything user facing until the next major release, which I find
pretty strict.

On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler  wrote:

If the conclusion is that we shouldn't remove it if _anyone_ is using
it, then we cannot remove it because the user ML obviously does not
reach all users.

On 10/28/2020 9:28 AM, Kostas Kloudas wrote:

Hi all,

I am bringing this up again to see if there are any users actively
using the BucketingSink.
So far, if I am not mistaken (and really sorry if I forgot anything),
it is only a discussion between devs about the potential problems of
removing it. I totally understand Chesnay's concern about not
providing compatibility with the StreamingFileSink (SFS) and if there
are any users, then we should not remove it without trying to find a
solution for them.

But if there are no users then I would still propose to remove the
module, given that I am not aware of any efforts to provide
compatibility with the SFS any time soon.
The reasons for removing it also include the fact that we do not
actively maintain it and do not add new features. As for potential
missing features in the SFS compared to the BucketingSink that was
mentioned before, I am not aware of any fundamental limitations and
even if there are, I would assume that the solution is not to direct
the users to a deprecated sink but rather try to increase the
functionality of the actively maintained one.

Please keep in mind that the BucketingSink is deprecated since FLINK
1.9 and there is a new File Sink that is coming as part of FLIP-143
[1].
Again, if there are any active users who cannot migrate easily, then
we cannot remove it before trying to provide a smooth migration path.

Thanks,
Kostas

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:

@Seth: Earlier in this discussion it was said that the BucketingSink
would not be usable in 1.12 .

On 10/16/2020 4:25 PM, Seth Wiesman wrote:

+1 It has been deprecated for some time and the StreamingFileSink has
stabilized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:


@Arvid Heise I also do not remember exactly what were all the
problems. The fact that we added some more bulk formats to the
streaming file sink definitely reduced the non-supported features. In
addition, the latest discussion I found on the topic was [1] and the
conclusion of that discussion seems to be to remove it.

Currently, I cannot find any obvious reason for keeping the
BucketingSink, apart from the fact that we do not have a migration
plan unfortunately. This is why I posted this to dev@ and user@.

Cheers,
Kostas

[1]
https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E

On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:

I remember this conversation popping up a few times already and I'm in
general a big fan of removing BucketingSink.

However, until now there were a few features lacking in StreamingFileSink
that are present in BucketingSink and that are being actively used (I

can't

exactly remember them now, but I can look it up if everyone else is also
suffering from bad memory). Did we manage to add them in the meantime? If
not, then it feels rushed to remove it at this point.

On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 

wrote:

@Chesnay Schepler  Off the top of my head, I cannot find an easy way
to migrate from the BucketingSink to the StreamingFileSink. It may be
possible but it will require some effort because the logic would be
"read the old state, commit it, and start fresh with the
StreamingFileSink."

On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
wrote:

On 13.10.20 14:01, David Anderson wrote:

I thought this was waiting on FLIP-46 -- Graceful Shutdown

Handling --

and

in fact, the StreamingFileSink is mentioned in that FLIP as a

motivating

use case.

Ah yes, I see FLIP-147 as a more general replacement for FLIP-46.

Thanks

for the reminder, we should close FLIP-46 now with an expl

Re: RestClusterClient and classpath

2020-10-28 Thread Chesnay Schepler

@Kostas: Ah, I missed that.

@Flavio: the only alternative I can think of is that your jar does not contain
the classes, or does not exist at all on the machine your application runs on.


On 10/28/2020 12:08 PM, Kostas Kloudas wrote:

Hi all,

I will have a look at the whole stack trace in a bit.

@Chesnay Schepler I think that we are setting the correct classloader
during jobgraph creation [1]. Is that what you mean?

Cheers,
Kostas

[1] 
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122

On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
 wrote:

Always the same problem.

Caused by: java.lang.ClassNotFoundException: it.test.XXX
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 10 more

I've also tried with

 flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");

but nothing changes.

On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler  wrote:

hmm... it appears as if PackagedProgramUtils#createJobGraph does some things
outside the user-code classloader (getPipelineFromProgram()), specifically the
call to the main method.

@klou This seems like wrong behavior?

@Flavio What you could try in the meantime is wrap the call to createJobGraph 
like this:

final ClassLoader contextClassLoader =
    Thread.currentThread().getContextClassLoader();
try {
    Thread.currentThread().setContextClassLoader(
        packagedProgram.getUserCodeClassLoader());
    // do stuff
} finally {
    Thread.currentThread().setContextClassLoader(contextClassLoader);
}


On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:

Any help here?  How can I understand why the classes inside the jar are not 
found when creating the PackagedProgram?

On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier  
wrote:

In the logs I see that the jar is in the classpath (I'm trying to debug the 
program from the IDE)... isn't it?

Classpath: [file:/tmp/job-bundle.jar]
...

Best,
Flavio

On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler  wrote:

* your JobExecutor is _not_ putting it on the classpath.

On 10/27/2020 10:36 AM, Chesnay Schepler wrote:

Well it happens on the client before you even hit the RestClusterClient, so I 
assume that either your jar is not packaged correctly or your JobExecutor 
is putting it on the classpath.

On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:

Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class I'm 
trying to use as a client towards the Flink cluster - session mode).
it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

The code of getBatchEnv is:

@Deprecated
public static BatchEnv getBatchEnv() {
    // TODO use the following when ready to convert from/to datastream
    // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment ret = BatchTableEnvironment.create(env);
    customizeEnv(ret);
    return new BatchEnv(env, ret);
}

private static void customizeEnv(TableEnvironment ret) {
    final Configuration conf = ret.getConfig().getConfiguration();
    // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
    conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
    conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
    // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
    // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
    // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
    // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
    conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
    conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
    conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
    final List<String> kryoSerializers = new ArrayList<>();
    kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
    conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSeriali

Re: RestClusterClient and classpath

2020-10-28 Thread Flavio Pompermaier
I'm running the code from Eclipse; the jar exists and it contains the
classes Flink is not finding... maybe I can try IntelliJ in the
afternoon

On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler 
wrote:

> @Kostas: Ah, I missed that.
>
> @Flavio: the only alternative I can think of is that your jar does not contain
> the classes, or does not exist at all on the machine your application is run
> on.
>
> On 10/28/2020 12:08 PM, Kostas Kloudas wrote:
> > Hi all,
> >
> > I will have a look in the whole stack trace in a bit.
> >
> > @Chesnay Schepler I think that we are setting the correct classloader
> > during jobgraph creation [1]. Is that what you mean?
> >
> > Cheers,
> > Kostas
> >
> > [1]
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
> >
> > On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
> >  wrote:
> >> Always the same problem.
> >>
> >> Caused by: java.lang.ClassNotFoundException: it.test.XXX
> >> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
> >> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
> >> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
> >> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> >> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> >> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> >> ... 10 more
> >>
> >> I've also tried with
> >>
> >>  flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER,
> "parent-first");
> >>
> >> but nothing changes.
> >>
> >> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler 
> wrote:
> >>> Hmm, it appears as if PackagedProgramUtils#createJobGraph does some
> things outside the usercode classloader (getPipelineFromProgram()),
> specifically the call to the main method.
> >>>
> >>> @klou This seems like wrong behavior?
> >>>
> >>> @Flavio What you could try in the meantime is wrap the call to
> createJobGraph like this:
> >>>
> >>> final ClassLoader contextClassLoader =
> Thread.currentThread().getContextClassLoader();
> >>> try {
> >>>
>  
> Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
> >>> // do stuff
> >>> } finally {
> >>> Thread.currentThread().setContextClassLoader(contextClassLoader);
> >>> }
> >>>
> >>>
> >>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
> >>>
> >>> Any help here?  How can I understand why the classes inside the jar
> are not found when creating the PackagedProgram?
> >>>
> >>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>  In the logs I see that the jar is in the classpath (I'm trying to debug
> the program from the IDE)... isn't it?
> 
>  Classpath: [file:/tmp/job-bundle.jar]
>  ...
> 
>  Best,
>  Flavio
> 
>  On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler 
> wrote:
> > * your JobExecutor is _not_ putting it on the classpath.
> >
> > On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
> >
> > Well it happens on the client before you even hit the
> RestClusterClient, so I assume that either your jar is not packaged
> correctly or your JobExecutor is putting it on the classpath.
> >
> > On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
> >
> > Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main
> class I'm trying to use as a client towards the Flink cluster - session
> mode).
> > it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
> >
> > The code of getBatchEnv is:
> >
> > @Deprecated
> >public static BatchEnv getBatchEnv() {
> >  // TODO use the following when ready to convert from/to
> datastream
> >  // return
> getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
> >  ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> >  BatchTableEnvironment ret = BatchTableEnvironment.create(env);
> >  customizeEnv(ret);
> >  return new BatchEnv(env, ret);
> >}
> >
> >private static void customizeEnv(TableEnvironment ret) {
> >  final Configuration conf = ret.getConfig().getConfiguration();
> >  //
> conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
> 2);
> >  conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
> >  conf.setString(BlobServerOptions.STORAGE_DIRECTORY,
> FLINK_TEST_TMP_DIR);
> >  // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
> 4); //NOSONAR
> >  //
> conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
> 0.4f);//NOSONAR
> >  // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX,
> 32768 * 2);//NOSONAR
> >  // conf.setLong(TaskManagerOpti

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Kostas Kloudas
No, I do not think that "we are fine with removing it at the cost of
friction for some users".

I believe that this can be another discussion that we should have as
soon as we establish that someone is actually using it. The point I am
trying to make is that if no user is using it, we should remove it and
not leave unmaintained code around.

On Wed, Oct 28, 2020 at 12:11 PM Chesnay Schepler  wrote:
>
> The alternative could also be to use a different argument than "no one
> uses it", e.g., we are fine with removing it at the cost of friction for
> some users because there are better alternatives.
>
> On 10/28/2020 10:46 AM, Kostas Kloudas wrote:
> > I think that the mailing lists is the best we can do and I would say
> > that they seem to be working pretty well (e.g. the recent Mesos
> > discussion).
> > Of course they are not perfect but the alternative would be to never
> > remove anything user facing until the next major release, which I find
> > pretty strict.
> >
> > On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler  
> > wrote:
> >> If the conclusion is that we shouldn't remove it if _anyone_ is using
> >> it, then we cannot remove it because the user ML obviously does not
> >> reach all users.
> >>
> >> On 10/28/2020 9:28 AM, Kostas Kloudas wrote:
> >>> Hi all,
> >>>
> >>> I am bringing this up again to see if there are any users actively
> >>> using the BucketingSink.
> >>> So far, if I am not mistaken (and really sorry if I forgot anything),
> >>> it is only a discussion between devs about the potential problems of
> >>> removing it. I totally understand Chesnay's concern about not
> >>> providing compatibility with the StreamingFileSink (SFS) and if there
> >>> are any users, then we should not remove it without trying to find a
> >>> solution for them.
> >>>
> >>> But if there are no users then I would still propose to remove the
> >>> module, given that I am not aware of any efforts to provide
> >>> compatibility with the SFS any time soon.
> >>> The reasons for removing it also include the facts that we do not
> >>> actively maintain it and we do not add new features. As for potential
> >>> missing features in the SFS compared to the BucketingSink that was
> >>> mentioned before, I am not aware of any fundamental limitations and
> >>> even if there are, I would assume that the solution is not to direct
> >>> the users to a deprecated sink but rather try to increase the
> >>> functionality of the actively maintained one.
> >>>
> >>> Please keep in mind that the BucketingSink is deprecated since FLINK
> >>> 1.9 and there is a new File Sink that is coming as part of FLIP-143
> >>> [1].
> >>> Again, if there are any active users who cannot migrate easily, then
> >>> we cannot remove it before trying to provide a smooth migration path.
> >>>
> >>> Thanks,
> >>> Kostas
> >>>
> >>> [1] 
> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> >>>
> >>> On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  
> >>> wrote:
>  @Seth: Earlier in this discussion it was said that the BucketingSink
>  would not be usable in 1.12 .
> 
>  On 10/16/2020 4:25 PM, Seth Wiesman wrote:
> > +1 It has been deprecated for some time and the StreamingFileSink has
> > stabilized with a large number of formats and features.
> >
> > Plus, the bucketing sink only implements a small number of stable
> > interfaces[1]. I would expect users to continue to use the bucketing 
> > sink
> > from the 1.11 release with future versions for some time.
> >
> > Seth
> >
> > https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172
> >
> > On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  
> > wrote:
> >
> >> @Arvid Heise I also do not remember exactly what were all the
> >> problems. The fact that we added some more bulk formats to the
> >> streaming file sink definitely reduced the non-supported features. In
> >> addition, the latest discussion I found on the topic was [1] and the
> >> conclusion of that discussion seems to be to remove it.
> >>
> >> Currently, I cannot find any obvious reason why keeping the
> >> BucketingSink, apart from the fact that we do not have a migration
> >> plan unfortunately. This is why I posted this to dev@ and user@.
> >>
> >> Cheers,
> >> Kostas
> >>
> >> [1]
> >> https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E
> >>
> >> On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  
> >> wrote:
> >>> I remember this conversation popping up a few times already and I'm in
> >>> general a big fan of removing BucketingSink.
> >>>
> >>> However, until now there were a few features lacking in 
> >>> StreamingF

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Chesnay Schepler
Then we can't remove it, because there is no way for us to ascertain 
whether anyone is still using it.


Sure, the user ML is the best we got, but you can't argue that we don't 
want any users to be affected and then use an imperfect means to find users.
If you are fine with relying on the user ML, then you _are_ fine with 
removing it at the cost of friction for some users.


To be clear, I, personally, don't have a problem with removing it (we 
have removed other connectors in the past that did not have a migration 
plan), I just reject the argumentation.


On 10/28/2020 12:21 PM, Kostas Kloudas wrote:

No, I do not think that "we are fine with removing it at the cost of
friction for some users".

I believe that this can be another discussion that we should have as
soon as we establish that someone is actually using it. The point I am
trying to make is that if no user is using it, we should remove it and
not leave unmaintained code around.

On Wed, Oct 28, 2020 at 12:11 PM Chesnay Schepler  wrote:

The alternative could also be to use a different argument than "no one
uses it", e.g., we are fine with removing it at the cost of friction for
some users because there are better alternatives.

On 10/28/2020 10:46 AM, Kostas Kloudas wrote:

I think that the mailing lists is the best we can do and I would say
that they seem to be working pretty well (e.g. the recent Mesos
discussion).
Of course they are not perfect but the alternative would be to never
remove anything user facing until the next major release, which I find
pretty strict.

On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler  wrote:

If the conclusion is that we shouldn't remove it if _anyone_ is using
it, then we cannot remove it because the user ML obviously does not
reach all users.

On 10/28/2020 9:28 AM, Kostas Kloudas wrote:

Hi all,

I am bringing this up again to see if there are any users actively
using the BucketingSink.
So far, if I am not mistaken (and really sorry if I forgot anything),
it is only a discussion between devs about the potential problems of
removing it. I totally understand Chesnay's concern about not
providing compatibility with the StreamingFileSink (SFS) and if there
are any users, then we should not remove it without trying to find a
solution for them.

But if there are no users then I would still propose to remove the
module, given that I am not aware of any efforts to provide
compatibility with the SFS any time soon.
The reasons for removing it also include the facts that we do not
actively maintain it and we do not add new features. As for potential
missing features in the SFS compared to the BucketingSink that was
mentioned before, I am not aware of any fundamental limitations and
even if there are, I would assume that the solution is not to direct
the users to a deprecated sink but rather try to increase the
functionality of the actively maintained one.

Please keep in mind that the BucketingSink is deprecated since FLINK
1.9 and there is a new File Sink that is coming as part of FLIP-143
[1].
Again, if there are any active users who cannot migrate easily, then
we cannot remove it before trying to provide a smooth migration path.

Thanks,
Kostas

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:

@Seth: Earlier in this discussion it was said that the BucketingSink
would not be usable in 1.12 .

On 10/16/2020 4:25 PM, Seth Wiesman wrote:

+1 It has been deprecated for some time and the StreamingFileSink has
stabilized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:


@Arvid Heise I also do not remember exactly what were all the
problems. The fact that we added some more bulk formats to the
streaming file sink definitely reduced the non-supported features. In
addition, the latest discussion I found on the topic was [1] and the
conclusion of that discussion seems to be to remove it.

Currently, I cannot find any obvious reason why keeping the
BucketingSink, apart from the fact that we do not have a migration
plan unfortunately. This is why I posted this to dev@ and user@.

Cheers,
Kostas

[1]
https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E

On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:

I remember this conversation popping up a few times already and I'm in
general a big fan of removing BucketingSink.

However, until now there were a few features lacking in StreamingFileSink
that ar

Re: Building Flink on VirtualBox VM failing

2020-10-28 Thread Khachatryan Roman
The values printed by the OOM killer indeed seem strange. But from the line
above, the memory usage seems fine: rss=2440960.
Running the given command I see only one forked process.
Probably this is an issue of the OOM killer running in a VM on a Windows host.
Can you try with the OOM killer disabled?

Regards,
Roman


On Fri, Oct 23, 2020 at 3:02 PM Juha Mynttinen 
wrote:

> I'm trying again running the tests, now I have four cores
> (previously five) and 12 GB RAM (previously 8 GB). I'm still hit by the OOM
> killer.
>
> The command I'm running is:
>
> mvn -Dflink.forkCount=1 -Dflink.forkCountTestPackage=1 clean verify
>
> [INFO] BUILD FAILURE
> [INFO]
> 
> [INFO] Total time: 01:17 h
> [INFO] Finished at: 2020-10-23T15:36:50+03:00
> [INFO] Final Memory: 180M/614M
> [INFO]
> 
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-surefire-plugin:2.22.1:test
> (integration-tests) on project flink-tests: There are test failures.
> [ERROR]
> [ERROR] Please refer to
> /home/juha/git/flink/flink-tests/target/surefire-reports for the individual
> test results.
> [ERROR] Please refer to dump files (if any exist) [date].dump,
> [date]-jvmRun[N].dump and [date].dumpstream.
> [ERROR] ExecutionException The forked VM terminated without properly
> saying goodbye. VM crash or System.exit called?
> [ERROR] Command was /bin/sh -c cd /home/juha/git/flink/flink-tests/target
> && /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Xms2048m -Xmx2048m
> -Dmvn.forkNumber=1 -XX:+UseG1GC -jar
> /home/juha/git/flink/flink-tests/target/surefire/surefirebooter15842756015305201470.jar
> /home/juha/git/flink/flink-tests/target/surefire
> 2020-10-23T14-19-18_685-jvmRun1 surefire394592676817174474tmp
> surefire_117413817767116882164827tmp
> [ERROR] Error occurred in starting fork, check output in log
> [ERROR] Process Exit Code: 137
> [ERROR] Crashed tests:
> [ERROR]
> org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase
> [ERROR] org.apache.maven.surefire.booter.SurefireBooterForkException:
> ExecutionException The forked VM terminated without properly saying
> goodbye. VM crash or System.exit called?
> [ERROR] Command was /bin/sh -c cd /home/juha/git/flink/flink-tests/target
> && /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Xms2048m -Xmx2048m
> -Dmvn.forkNumber=1 -XX:+UseG1GC -jar
> /home/juha/git/flink/flink-tests/target/surefire/surefirebooter15842756015305201470.jar
> /home/juha/git/flink/flink-tests/target/surefire
> 2020-10-23T14-19-18_685-jvmRun1 surefire394592676817174474tmp
> surefire_117413817767116882164827tmp
> [ERROR] Error occurred in starting fork, check output in log
> [ERROR] Process Exit Code: 137
> [ERROR] Crashed tests:
> [ERROR]
> org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase
> [ERROR] at
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:510)
> [ERROR] at
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkPerTestSet(ForkStarter.java:457)
> [ERROR] at
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:298)
> [ERROR] at
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:246)
> [ERROR] at
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183)
> [ERROR] at
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011)
> [ERROR] at
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857)
> [ERROR] at
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
> [ERROR] at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> [ERROR] at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> [ERROR] at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> [ERROR] at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> [ERROR] at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
> [ERROR] at
> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
> [ERROR] at
> org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120)
> [ERROR] at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:355)
> [ERROR] at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:155)
> [ERROR] at org.apache.maven.cli.MavenCli.execute(MavenCli.java:584)
> [ERROR] at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:216)
> [ERROR] at org.apache.maven.cli.MavenCli.main(MavenCli.java:160)
> [ERROR] at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Nativ

Re: NoResourceAvailableException

2020-10-28 Thread Khachatryan Roman
Hi Alexander,

Thanks for sharing,

I see a lot of exceptions in the logs, particularly:

Caused by: java.net.BindException: Could not start actor system on any
port in port range 6123

which means that there is probably more than one instance running, which is
likely the root cause.
So it makes sense to make sure that the previous attempts cleaned up.

Regards,
Roman


On Tue, Oct 20, 2020 at 12:08 AM Alexander Semeshchenko 
wrote:

> Hi Roman,
> I made the cluster: 1 master, 2 worker. All - 8 cpu, 32 g RAM . Red Hat
> Enterprise Linux Server release 7.9 (Maipo)
> vsmart-f01 - master
> vsmart-f02 - worker
> vsmart-f03 - worker
> taskmanager.numberOfTaskSlots for each node is: 8
>
> Then:
> [flink@vsmart-f01 flink-1.11.1]$ ./bin/start-cluster.sh
> Starting cluster.
> [INFO] 1 instance(s) of standalonesession are already running on vsmart-f01.
> Starting standalonesession daemon on host vsmart-f01.
> flink@10.92.194.19's password:
> [INFO] 1 instance(s) of taskexecutor are already running on vsmart-f02.
> Starting taskexecutor daemon on host vsmart-f02.
> flink@10.92.194.20's password:
> Starting taskexecutor daemon on host vsmart-f03.
>
> The cluster starts up; running WordCount from master:
> ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount
> ./examples/batch/WordCount.jar --output file:/tmp/wordcount_out
>
> After 5 min. the job was canceled.
> The screenshot shows that a taskmanager was never assigned to the job
> operator.
> I've put the 3 logs (one from each node) here.
>
> Thanks and Best Regards.
> Alex
>
>
> On Mon, Oct 19, 2020 at 5:47 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Alex,
>>
>> This message isn't actually a problem - netty can't find the native
>> transports and falls back to nio-based one.
>> Does increasing taskmanager.numberOfTaskSlots in flink-conf.yaml help?
>> Can you share the full logs in DEBUG mode?
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Oct 19, 2020 at 6:14 PM Alexander Semeshchenko 
>> wrote:
>>
>>> thank you for your response.
>>>
>>> The taskmanager has 1 slot, 1 slot free, but the WordCount job never changes
>>> its status from "Created".
>>> After more or less 5 min the job is canceled.
>>> I attached screenshot of taskmanager.
>>>
>>> Best Regards
>>> Alexander
>>>
>>> On Wed, Oct 14, 2020 at 6:13 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hi,
 Thanks for sharing the details and sorry for the late reply.
 You can check the number of free slots in the task manager in the web
 UI (http://localhost:8081/#/task-manager by default).
 Before running the program, there should be 1 TM with 1 slot available
 which should be free (with default settings).

 If there are other jobs, you can increase slots per TM by increasing
 taskmanager.numberOfTaskSlots in flink-conf.yaml [1].

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-numberoftaskslots

 Regards,
 Roman


 On Wed, Oct 14, 2020 at 6:56 PM Alexander Semeshchenko <
 as77...@gmail.com> wrote:

> Hi, is there any news about my issue "Flink - NoResourceAvailableException"
> after installing the WordCount job?
> Best
>
> On Fri, Oct 9, 2020 at 10:19 AM Alexander Semeshchenko <
> as77...@gmail.com> wrote:
>
>> Yes, I took the following actions:
>> -   download Flink
>> -   ./bin/start-cluster.sh.
>> -   ./bin/flink run ./examples/streaming/WordCount.jar
>> 
>> Then I tried to increase the ulimit and VM memory values...
>> Below I put the log messages.
>>
>> It's strange, as I could run the same job on my MacBook (8 cpu, 16g RAM)
>> and on a k8s cluster (4 cpu, 8g RAM).
>>
>> Thanks
>>
>>
>>
>> On Fri, Oct 9, 2020 at 3:32 AM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> I assume that before submitting a job you started a cluster with
>>> default settings with ./bin/start-cluster.sh.
>>>
>>> Did you submit any other jobs?
>>> Can you share the logs from log folder?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Wed, Oct 7, 2020 at 11:03 PM Alexander Semeshchenko <
>>> as77...@gmail.com> wrote:
>>>

 

 Installing (download & tar zxf) Apache Flink 1.11.1 and running ./bin/flink
 run examples/streaming/WordCount.jar shows the nice message after
 more or less 5 min of trying to submit:  Caused by:
 org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
 Could not allocate the required slot within slot request timeout. Please
 make sure that the cluster has enough resources. at
 org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWr
 make sure that the cluster has enough resources. at
 org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWr

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Stephan Ewen
+1 to remove the Bucketing Sink.

It has been very common in the past to remove code that was deprecated for
multiple releases in favor of reducing baggage.
Also in cases that had no perfect drop-in replacement, but needed users to
forward fit the code.
I am not sure I understand why this case is so different.

Why the Bucketing Sink should be thrown out, in my opinion:

The Bucketing sink makes it easier for users to add general Hadoop writers.
But the price is that it easily leads to data loss, because it assumes
flush()/sync() work reliably on Hadoop, which they don't (HDFS
works somewhat, S3 not at all).
I think the Bucketing sink is a trap for users, that's why it was
deprecated long ago.

The StreamingFileSink covers the majority of cases from the Bucketing Sink.
It does have some friction when adding/wrapping some general Hadoop
writers. Parts will be solved with the transactional sink work.
If something is missing and blocking users, we can prioritize adding it to
the Streaming File Sink. Also, that is something we did before, and it helped
us be pragmatic about moving forward rather than being held back by "maybe
there is something we don't know".




On Wed, Oct 28, 2020 at 12:36 PM Chesnay Schepler 
wrote:

> Then we can't remove it, because there is no way for us to ascertain
> whether anyone is still using it.
>
> Sure, the user ML is the best we got, but you can't argue that we don't
> want any users to be affected and then use an imperfect means to find users.
> If you are fine with relying on the user ML, then you _are_ fine with
> removing it at the cost of friction for some users.
>
> To be clear, I, personally, don't have a problem with removing it (we
> have removed other connectors in the past that did not have a migration
> plan), I just reject the argumentation.
>
> On 10/28/2020 12:21 PM, Kostas Kloudas wrote:
> > No, I do not think that "we are fine with removing it at the cost of
> > friction for some users".
> >
> > I believe that this can be another discussion that we should have as
> > soon as we establish that someone is actually using it. The point I am
> > trying to make is that if no user is using it, we should remove it and
> > not leave unmaintained code around.
> >
> > On Wed, Oct 28, 2020 at 12:11 PM Chesnay Schepler 
> wrote:
> >> The alternative could also be to use a different argument than "no one
> >> uses it", e.g., we are fine with removing it at the cost of friction for
> >> some users because there are better alternatives.
> >>
> >> On 10/28/2020 10:46 AM, Kostas Kloudas wrote:
> >>> I think that the mailing lists is the best we can do and I would say
> >>> that they seem to be working pretty well (e.g. the recent Mesos
> >>> discussion).
> >>> Of course they are not perfect but the alternative would be to never
> >>> remove anything user facing until the next major release, which I find
> >>> pretty strict.
> >>>
> >>> On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler 
> wrote:
>  If the conclusion is that we shouldn't remove it if _anyone_ is using
>  it, then we cannot remove it because the user ML obviously does not
>  reach all users.
> 
>  On 10/28/2020 9:28 AM, Kostas Kloudas wrote:
> > Hi all,
> >
> > I am bringing this up again to see if there are any users actively
> > using the BucketingSink.
> > So far, if I am not mistaken (and really sorry if I forgot anything),
> > it is only a discussion between devs about the potential problems of
> > removing it. I totally understand Chesnay's concern about not
> > providing compatibility with the StreamingFileSink (SFS) and if there
> > are any users, then we should not remove it without trying to find a
> > solution for them.
> >
> > But if there are no users then I would still propose to remove the
> > module, given that I am not aware of any efforts to provide
> > compatibility with the SFS any time soon.
> > The reasons for removing it also include the facts that we do not
> > actively maintain it and we do not add new features. As for potential
> > missing features in the SFS compared to the BucketingSink that was
> > mentioned before, I am not aware of any fundamental limitations and
> > even if there are, I would assume that the solution is not to direct
> > the users to a deprecated sink but rather try to increase the
> > functionality of the actively maintained one.
> >
> > Please keep in mind that the BucketingSink is deprecated since FLINK
> > 1.9 and there is a new File Sink that is coming as part of FLIP-143
> > [1].
> > Again, if there are any active users who cannot migrate easily, then
> > we cannot remove it before trying to provide a smooth migration path.
> >
> > Thanks,
> > Kostas
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> >
> > On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler

Re: [EXTERNAL] Re: Native K8S Jobmanager restarts and job never recovers

2020-10-28 Thread Bohinski, Kevin
Hi Yang,

Thanks again for all the help!

We are still seeing this with 1.11.2 and ZK.
Looks like others are seeing this as well and they found a solution 
https://translate.google.com/translate?hl=en&sl=zh-CN&u=https://cloud.tencent.com/developer/article/1731416&prev=search

Should this solution be added to 1.12?

Best
kevin

On 2020/08/14 02:48:50, Yang Wang <d...@gmail.com> wrote:
> Hi kevin,
>
> Thanks for sharing more information. You are right. Actually, "too old
> resource version" is caused by a bug
> of fabric8 kubernetes-client [1]. It has been fixed in v4.6.1. And we have
> bumped the kubernetes-client version
> to v4.9.2 in Flink release-1.11. Also it has been backported to release
> 1.10 and will be included in the next
> minor release version (1.10.2).
>
> BTW, if you really want all your jobs recovered when the jobmanager crashes,
> you still need to configure Zookeeper high availability.
>
> [1]. https://github.com/fabric8io/kubernetes-client/pull/1800
>
> Best,
> Yang
>
> Bohinski, Kevin <ke...@comcast.com> wrote on Fri, Aug 14, 2020 at 6:32 AM:
>
> > Might be useful
> >
> > https://stackoverflow.com/a/61437982
> >
> > Best,
> >
> > kevin
> >
> > From: "Bohinski, Kevin" <ke...@comcast.com>
> > Date: Thursday, August 13, 2020 at 6:13 PM
> > To: Yang Wang <da...@gmail.com>
> > Cc: "user@flink.apache.org" <us...@flink.apache.org>
> > Subject: Re: [EXTERNAL] Re: Native K8S Jobmanager restarts and job
> > never recovers
> >
> > Hi
> >
> > Got the logs on crash, hopefully they help.
> >
> > 2020-08-13 22:00:40,336 ERROR
> > org.apache.flink.kubernetes.KubernetesResourceManager[] - Fatal
> > error occurred in ResourceManager.
> >
> > io.fabric8.kubernetes.client.KubernetesClientException: too old resource
> > version: 8617182 (8633230)
> > at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.11-1.11.1.jar:1.11.1]
> > at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.11-1.11.1.jar:1.11.1]
> > at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.11-1.11.1.jar:1.11.1]
> > at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.11-1.11.1.jar:1.11.1]
> > at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.11-1.11.1.jar:1.11.1]
> > at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [flink-dist_2.11-1.11.1.jar:1.11.1]
> > at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.11-1.11.1.jar:1.11.1]
> > at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.11-1.11.1.jar:1.11.1]
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_262]
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_262]
> > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
> >
> > 2020-08-13 22:00:40,337 ERROR
> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
> > error occurred in the cluster entrypoint.
> >
> > io.fabric8.kubernetes.client.KubernetesClientException: too old resource
> > version: 8617182 (8633230)
> > at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.11-1.11.1.jar:1.11.1]
> > at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.11-1.11.1.jar:1.11.1]
> > at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.11-1.11.1.jar:1.11.1]
> > at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.11-1.11.1.jar:1.11.1]
> > at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.11-1.11.1.jar:1.11.1]
> > at org.apache.flink.kubernetes.sh

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Kostas Kloudas
Thanks for the discussion!

From this thread I do not see any objection to moving forward with
removing the sink.
Given this I will open a voting thread tomorrow.

Cheers,
Kostas

On Wed, Oct 28, 2020 at 6:50 PM Stephan Ewen  wrote:
>
> +1 to remove the Bucketing Sink.
>
> It has been very common in the past to remove code that was deprecated for 
> multiple releases in favor of reducing baggage.
> Also in cases that had no perfect drop-in replacement, but needed users to 
> forward fit the code.
> I am not sure I understand why this case is so different.
>
> Why the Bucketing Sink should be thrown out, in my opinion:
>
> The Bucketing sink makes it easier for users to add general Hadoop writes.
> > But the price is that it easily leads to data loss, because it assumes 
> > flush()/sync() work reliably on Hadoop, which they don't (HDFS 
> works somewhat, S3 works not at all).
> I think the Bucketing sink is a trap for users, that's why it was deprecated 
> long ago.
>
> The StreamingFileSink covers the majority of cases from the Bucketing Sink.
> It does have some friction when adding/wrapping some general Hadoop writers. 
> Parts will be solved with the transactional sink work.
> If something is missing and blocking users, we can prioritize adding it to 
> the Streaming File Sink. Also that is something we did before and it helped 
> being pragmatic with moving forward, rather than being held back by "maybe 
> there is something we don't know".
>
>
>
>
> On Wed, Oct 28, 2020 at 12:36 PM Chesnay Schepler  wrote:
>>
>> Then we can't remove it, because there is no way for us to ascertain
>> whether anyone is still using it.
>>
>> Sure, the user ML is the best we got, but you can't argue that we don't
>> want any users to be affected and then use an imperfect means to find users.
>> If you are fine with relying on the user ML, then you _are_ fine with
>> removing it at the cost of friction for some users.
>>
>> To be clear, I, personally, don't have a problem with removing it (we
>> have removed other connectors in the past that did not have a migration
>> plan), I just reject the argumentation.
>>
>> On 10/28/2020 12:21 PM, Kostas Kloudas wrote:
>> > No, I do not think that "we are fine with removing it at the cost of
>> > friction for some users".
>> >
>> > I believe that this can be another discussion that we should have as
>> > soon as we establish that someone is actually using it. The point I am
>> > trying to make is that if no user is using it, we should remove it and
>> > not leave unmaintained code around.
>> >
>> > On Wed, Oct 28, 2020 at 12:11 PM Chesnay Schepler  
>> > wrote:
>> >> The alternative could also be to use a different argument than "no one
>> >> uses it", e.g., we are fine with removing it at the cost of friction for
>> >> some users because there are better alternatives.
>> >>
>> >> On 10/28/2020 10:46 AM, Kostas Kloudas wrote:
>> >>> I think that the mailing lists is the best we can do and I would say
>> >>> that they seem to be working pretty well (e.g. the recent Mesos
>> >>> discussion).
>> >>> Of course they are not perfect but the alternative would be to never
>> >>> remove anything user facing until the next major release, which I find
>> >>> pretty strict.
>> >>>
>> >>> On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler  
>> >>> wrote:
>>  If the conclusion is that we shouldn't remove it if _anyone_ is using
>>  it, then we cannot remove it because the user ML obviously does not
>>  reach all users.
>> 
>>  On 10/28/2020 9:28 AM, Kostas Kloudas wrote:
>> > Hi all,
>> >
>> > I am bringing this up again to see if there are any users actively
>> > using the BucketingSink.
>> > So far, if I am not mistaken (and really sorry if I forgot anything),
>> > it is only a discussion between devs about the potential problems of
>> > removing it. I totally understand Chesnay's concern about not
>> > providing compatibility with the StreamingFileSink (SFS) and if there
>> > are any users, then we should not remove it without trying to find a
>> > solution for them.
>> >
>> > But if there are no users then I would still propose to remove the
>> > module, given that I am not aware of any efforts to provide
>> > compatibility with the SFS any time soon.
>> > The reasons for removing it also include the facts that we do not
>> > actively maintain it and we do not add new features. As for potential
>> > missing features in the SFS compared to the BucketingSink that was
>> > mentioned before, I am not aware of any fundamental limitations and
>> > even if there are, I would assume that the solution is not to direct
>> > the users to a deprecated sink but rather try to increase the
>> > functionality of the actively maintained one.
>> >
>> > Please keep in mind that the BucketingSink is deprecated since FLINK
>> > 1.9 and there is a new File Sink that is coming as part of FLIP-143

Re: Could you add some example about this document? Thanks`1

2020-10-28 Thread Robert Metzger
Hi,
from the messages you've sent on the user@ mailing list in the recent
weeks, I see that you are in the process of learning Flink. The Flink
community won't be able to provide you with full, runnable examples for
every method Flink provides.
Rather, we have a few running examples, and conceptual explanations and
small API snippets in our documentation.

I haven't checked all your questions in detail, but I would generally
recommend that you put more effort into trying to figure out problems yourself
before posting to this mailing list.
One starting point for learning Flink is the Flink Training:
https://flink.apache.org/training.html
Also, Flink provides a number of runnable TableAPI examples in
"flink-examples-table".

I'm sorry that not all your questions have been answered here on the list.
The developers sometimes need to prioritize where they spend their time
answering user questions, I hope you understand.

Best regards,
Robert


On Sun, Oct 25, 2020 at 5:01 AM 大森林  wrote:

> Dear Mr Timo Walther:
>
> I'm learning the document
> 
> and have sent 3 mails about
> executeInsert / flatAggregate / GroupBy Window Aggregation from the document
> to user@flink.apache.org,
> each of which was provided with full code, but got no replies.
>
> I found that
> the authors of the above 3 parts are NOT in the mailing list.
> Of course I understand they are very busy now.
>
> Could you add some complete examples (Java style) of the above 3 parts to
> the Flink GitHub?
>
> Much Thanks for your help.
>
>
>
>
>


Re: Kubernetes Job Cluster, does it autoterminate?

2020-10-28 Thread Matthias Pohl
Hi Ruben,
thanks for reaching out to us. Flink's native Kubernetes Application mode
[1] might be what you're looking for.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
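
In application mode the cluster is started for that one application and
should be torn down once the application reaches a terminal state, which
gives you the autotermination you are asking about. A sketch of the
submission (cluster-id, image, and jar path are placeholders; the options
are the ones documented for 1.11):

./bin/flink run-application \
    -t kubernetes-application \
    -Dkubernetes.cluster-id=my-application-cluster \
    -Dkubernetes.container.image=my-image-with-the-job-jar \
    local:///opt/flink/usrlib/my-job.jar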

On Wed, Oct 28, 2020 at 11:50 AM Ruben Laguna 
wrote:

> Hi,
>
> First-time user here; I'm just evaluating Flink at the moment, and I was
> reading
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#deploy-job-cluster
> and I don't fully understand if a Job Cluster will autoterminate after
> the job is completed (for a batch job)?
>
> The examples look to me like the task manager pods will continue
> running, as they're configured as a Deployment.
>
> So is there any way to achieve "autotermination" or am I supposed to
> monitor the job status externally (like from airflow) and delete the
> JobManager and TaskManager kubernetes resources from there?
>
> --
> /Rubén Laguna
>


Re: How to understand NOW() in SQL when using Table & SQL API to develop a streaming app?

2020-10-28 Thread Jark Wu
issue created: https://issues.apache.org/jira/browse/FLINK-19861
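
For anyone finding this thread later, a minimal illustration of the
per-record semantics (a sketch against the 1.11 Table API; the "clicks"
table and its columns are hypothetical):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
// in streaming mode NOW() is re-evaluated for every incoming row, so rows
// processed at different times carry different timestamps
tEnv.executeSql("SELECT user_id, NOW() AS eval_time FROM clicks").print();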



On Wed, 28 Oct 2020 at 11:00, Danny Chan  wrote:

> Our behavior also conflicts with the SQL standard; we should also mention
> this in the document.
>
> Till Rohrmann  于2020年10月27日周二 下午10:37写道:
>
>> Thanks for the clarification. This improvement would be helpful, I
>> believe.
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 27, 2020 at 1:19 PM Jark Wu  wrote:
>>
>>> Hi Till,
>>>
>>> The documentation mentions that "this function is not deterministic"
>>> where the "not deterministic" means the value of this function is not
>>> deterministic for every record.
>>> However, this is not very clear for users. I think we can improve the
>>> documentation.
>>>
>>> Best,
>>> Jark
>>>
>>> On Tue, 27 Oct 2020 at 15:59, Till Rohrmann 
>>> wrote:
>>>
 Quick question Jark: Is this difference in behaviour documented? I
 couldn't find it in the docs.

 Cheers,
 Till

 On Tue, Oct 27, 2020 at 7:30 AM Jark Wu  wrote:

> Hi Longdexin,
>
> In traditional batch sql, NOW() is executed and determined before the
> job is submitted and will not change for every processed record.
> However, this doesn't make much sense in streaming sql, therefore,
> NOW() function in Flink is executed for every record.
>
> Best,
> Jark
>
> On Fri, 23 Oct 2020 at 16:30, Till Rohrmann 
> wrote:
>
>> Hi Longdexin,
>>
>> thanks for reaching out to the Flink community. I am pulling in Jark
>> who might be able to help you with this question.
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 22, 2020 at 2:56 PM Longdexin <274522...@qq.com> wrote:
>>
>>> From my point of view, the value of the NOW() function in SQL is fixed by
>>> the time the streaming app is launched and will not change with the
>>> processing time. However, as a new Flink user, I'm not so sure of that. By
>>> the way, if my aim is to keep the time logic updating all the time, what
>>> should I do?
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Jingsong Li
+1 to remove the Bucketing Sink.

Thanks for the effort on ORC and `HadoopPathBasedBulkFormatBuilder`, I
think it's safe to get rid of the old Bucketing API with them.

Best,
Jingsong

On Thu, Oct 29, 2020 at 3:06 AM Kostas Kloudas  wrote:

> Thanks for the discussion!
>
> From this thread I do not see any objection to moving forward with
> removing the sink.
> Given this I will open a voting thread tomorrow.
>
> Cheers,
> Kostas
>
> On Wed, Oct 28, 2020 at 6:50 PM Stephan Ewen  wrote:
> >
> > +1 to remove the Bucketing Sink.
> >
> > It has been very common in the past to remove code that was deprecated
> for multiple releases in favor of reducing baggage.
> > Also in cases that had no perfect drop-in replacement, but needed users
> to forward fit the code.
> > I am not sure I understand why this case is so different.
> >
> > Why the Bucketing Sink should be thrown out, in my opinion:
> >
> > The Bucketing sink makes it easier for users to add general Hadoop
> writes.
> > But the price is that it easily leads to data loss, because it assumes
> flush()/sync() work reliably on Hadoop, which they don't (HDFS
> works somewhat, S3 not at all).
> > I think the Bucketing sink is a trap for users, that's why it was
> deprecated long ago.
> >
> > The StreamingFileSink covers the majority of cases from the Bucketing
> Sink.
> > It does have some friction when adding/wrapping some general Hadoop
> writers. Parts will be solved with the transactional sink work.
> > If something is missing and blocking users, we can prioritize adding it
> to the Streaming File Sink. Also that is something we did before and it
> helped being pragmatic with moving forward, rather than being held back by
> "maybe there is something we don't know".
> >
> >
> >
> >
> > On Wed, Oct 28, 2020 at 12:36 PM Chesnay Schepler 
> wrote:
> >>
> >> Then we can't remove it, because there is no way for us to ascertain
> >> whether anyone is still using it.
> >>
> >> Sure, the user ML is the best we got, but you can't argue that we don't
> >> want any users to be affected and then use an imperfect means to find
> users.
> >> If you are fine with relying on the user ML, then you _are_ fine with
> >> removing it at the cost of friction for some users.
> >>
> >> To be clear, I, personally, don't have a problem with removing it (we
> >> have removed other connectors in the past that did not have a migration
> >> plan), I just reject the argumentation.
> >>
> >> On 10/28/2020 12:21 PM, Kostas Kloudas wrote:
> >> > No, I do not think that "we are fine with removing it at the cost of
> >> > friction for some users".
> >> >
> >> > I believe that this can be another discussion that we should have as
> >> > soon as we establish that someone is actually using it. The point I am
> >> > trying to make is that if no user is using it, we should remove it and
> >> > not leave unmaintained code around.
> >> >
> >> > On Wed, Oct 28, 2020 at 12:11 PM Chesnay Schepler 
> wrote:
> >> >> The alternative could also be to use a different argument than "no
> one
> >> >> uses it", e.g., we are fine with removing it at the cost of friction
> for
> >> >> some users because there are better alternatives.
> >> >>
> >> >> On 10/28/2020 10:46 AM, Kostas Kloudas wrote:
> >> >>> I think that the mailing lists is the best we can do and I would say
> >> >>> that they seem to be working pretty well (e.g. the recent Mesos
> >> >>> discussion).
> >> >>> Of course they are not perfect but the alternative would be to never
> >> >>> remove anything user facing until the next major release, which I
> find
> >> >>> pretty strict.
> >> >>>
> >> >>> On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler <
> ches...@apache.org> wrote:
> >>  If the conclusion is that we shouldn't remove it if _anyone_ is
> using
> >>  it, then we cannot remove it because the user ML obviously does not
> >>  reach all users.
> >> 
> >>  On 10/28/2020 9:28 AM, Kostas Kloudas wrote:
> >> > Hi all,
> >> >
> >> > I am bringing this up again to see if there are any users actively
> >> > using the BucketingSink.
> >> > So far, if I am not mistaken (and really sorry if I forgot
> anything),
> >> > it is only a discussion between devs about the potential problems
> of
> >> > removing it. I totally understand Chesnay's concern about not
> >> > providing compatibility with the StreamingFileSink (SFS) and if
> there
> >> > are any users, then we should not remove it without trying to
> find a
> >> > solution for them.
> >> >
> >> > But if there are no users then I would still propose to remove the
> >> > module, given that I am not aware of any efforts to provide
> >> > compatibility with the SFS any time soon.
> >> > The reasons for removing it also include the facts that we do not
> >> > actively maintain it and we do not add new features. As for
> potential
> >> > missing features in the SFS compared to the BucketingSink that was
>

How to deploy dynamically generated flink jobs?

2020-10-28 Thread Alexander Bagerman
Hi,

I am trying to build functionality to dynamically configure a Flink job
(Java) in my code based on some additional metadata and submit it to
Flink running in a session cluster.

Flink version is 1.11.2

The problem I have is how to provide a packaged job to the cluster. When I am
trying the following code

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment(hostName,
hostPort);
... configuring job workflow here...
env.execute(jobName);

I am getting ClassNotFoundException stating that code for my mapping
functions did not make it to the cluster. Which makes sense.

What would be the right way to deploy dynamically configured flink jobs
which are not packaged as a jar file but rather generated ad-hoc?

Thanks


Re: How to deploy dynamically generated flink jobs?

2020-10-28 Thread Yun Gao
Hi Alexander, 

The signature of the createRemoteEnvironment is 
public static StreamExecutionEnvironment createRemoteEnvironment(
  String host, int port, String... jarFiles);
which can also ship the given jars to the remote cluster. Could you try to 
also pass the jar files to the remote environment?
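
For example, something like the following sketch (host, port, and jar path
are placeholders; the jar should contain the classes of your mapping
functions):

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 8081, "/path/to/job-bundle.jar");
// ... configure the job workflow here ...
env.execute(jobName);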

Best,
 Yun
--
Sender:Alexander Bagerman
Date:2020/10/29 10:43:16
Recipient:
Theme:How to deploy dynamically generated flink jobs?

Hi,
I am trying to build a functionality to dynamically configure a flink job 
(Java) in my code based on some additional metadata and submit it to a flink 
running in a session cluster.
Flink version is 1.11.2
The problem I have is how to provide a packed job to the cluster. When I am 
trying the following code
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(hostName, hostPort);
... configuring job workflow here...
env.execute(jobName);

I am getting ClassNotFoundException stating that code for my mapping functions 
did not make it to the cluster. Which makes sense.
What would be the right way to deploy dynamically configured flink jobs which 
are not packaged as a jar file but rather generated ad-hoc?
Thanks



Re: How to deploy dynamically generated flink jobs?

2020-10-28 Thread Alexander Bagerman
I did try it but this option seems to be for a third-party jar. In my case
I would need to specify/ship a jar that contains the code where the job is
being constructed. I'm not clear on
1. how to point to the containing jar
2. how to test such a submission from my project running in Eclipse
Alex

On Wed, Oct 28, 2020 at 8:21 PM Yun Gao  wrote:

> Hi Alexander,
>
> The signature of the createRemoteEnvironment is
>
> public static StreamExecutionEnvironment createRemoteEnvironment(
>   String host, int port, String... jarFiles);
>
> Which could also ship the jars to execute to remote cluster. Could you have a 
> try to also pass the jar files to the remote environment ?
>
>
> Best,
>
>  Yun
>
> --
> Sender:Alexander Bagerman
> Date:2020/10/29 10:43:16
> Recipient:
> Theme:How to deploy dynamically generated flink jobs?
>
>
>
> Hi,
>
> I am trying to build a functionality to dynamically configure a flink job
> (Java) in my code based on some additional metadata and submit it to a
> flink running in a session cluster.
>
> Flink version is 1.11.2
>
> The problem I have is how to provide a packed job to the cluster. When I
> am trying the following code
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createRemoteEnvironment(hostName, hostPort);
> ... configuring job workflow here...
> env.execute(jobName);
>
> I am getting ClassNotFoundException stating that code for my mapping
> functions did not make it to the cluster. Which makes sense.
>
> What would be the right way to deploy dynamically configured flink jobs
> which are not packaged as a jar file but rather generated ad-hoc?
>
> Thanks
>
>


Re: Re: How to deploy dynamically generated flink jobs?

2020-10-28 Thread Yun Gao
Hi Alexander,

 From my side I still think it should be reasonable to have a jar that contains 
the code that runs in the client and is also shipped to the cluster. Then 
this jar could also be included in the shipped jar list.

 For the second issue, similarly, I think you may first build the project to get 
the jar containing the code, then fill in the path of the generated jar to test 
the submission.

Best,
 Yun



 --Original Mail --
Sender:Alexander Bagerman 
Send Date:Thu Oct 29 11:38:45 2020
Recipients:Yun Gao 
CC:Flink ML 
Subject:Re: How to deploy dynamically generated flink jobs?

I did try it but this option seems to be for a third-party jar. In my case I 
would need to specify/ship a jar that contains the code where the job is being 
constructed. I'm not clear on
1. how to point to the containing jar 
2. how to test such a submission from my project running in Eclipse 
Alex 

On Wed, Oct 28, 2020 at 8:21 PM Yun Gao  wrote:

Hi Alexander, 

The signature of the createRemoteEnvironment is 
public static StreamExecutionEnvironment createRemoteEnvironment(
  String host, int port, String... jarFiles);
Which could also ship the jars to execute to remote cluster. Could you have a 
try to also pass the jar files to the remote environment ?

Best,
 Yun
--
Sender:Alexander Bagerman
Date:2020/10/29 10:43:16
Recipient:
Theme:How to deploy dynamically generated flink jobs?

Hi,
I am trying to build a functionality to dynamically configure a flink job 
(Java) in my code based on some additional metadata and submit it to a flink 
running in a session cluster.
Flink version is 1.11.2
The problem I have is how to provide a packed job to the cluster. When I am 
trying the following code
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(hostName, hostPort);
... configuring job workflow here...
env.execute(jobName);
I am getting ClassNotFoundException stating that code for my mapping functions 
did not make it to the cluster. Which makes sense.
What would be the right way to deploy dynamically configured flink jobs which 
are not packaged as a jar file but rather generated ad-hoc?
Thanks