Re: reading file from s3

2021-03-07 Thread Avi Levi
Thanks Tamir,
I was having some issues connecting from my IDE (solved) but this is really
helpful.


On Sat, Mar 6, 2021, 23:04 Tamir Sagi  wrote:

> I had a typo in my previous answer, the env name was missing an 'S'
>
> ENABLE_BUILT_IN_PLUGIN --> ENABLE_BUILT_IN_PLUGIN*S*
> once again, the value is *the plugin jar name*:
> flink-s3-fs-hadoop-<version>.jar
> The complete list can be found here
> 
>
> You can build your own Flink image and set the environment variable in it,
> or set it when you run the container.
> If you execute it locally (not in a container) in a standalone cluster,
> make sure this environment variable is defined at the system level.
>
> Tamir.
> --
> *From:* Tamir Sagi 
> *Sent:* Saturday, March 6, 2021 7:33 PM
> *To:* Avi Levi ; Chesnay Schepler 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: reading file from s3
>
> Hey Avi,
>
> Do you use 'Hadoop S3 plugin' to read from S3?
>
> If yes, what is its version?
>
> If not, try to read from S3 as follows:
>
>    1. set an environment variable to use the Hadoop plugin (it's part of
>    the Flink image):
>    key = ENABLE_BUILT_IN_PLUGIN
>    value = flink-s3-fs-hadoop-<version>.jar
>    (e.g. flink-s3-fs-hadoop-1.11.1.jar for Flink 1.11.1)
>    2. read the file from S3:
>    DataSource<String> lines = env.readTextFile("s3://<bucket>/<path>");
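>
> A minimal sketch of such a job (the class name, bucket, and path are
> placeholders, and it assumes the plugin environment variable from step 1 is
> set on the cluster):
>
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.operators.DataSource;
>
> public class S3ReadExample {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         // Requires the flink-s3-fs-hadoop plugin to be enabled (step 1 above).
>         DataSource<String> lines = env.readTextFile("s3://<bucket>/<path>/input.txt");
>         lines.print();
>     }
> }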
>
> Tamir
> --
> *From:* Avi Levi 
> *Sent:* Saturday, March 6, 2021 6:59 AM
> *To:* Chesnay Schepler 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: reading file from s3
>
>
>
>
> Does anyone by any chance have a working example (of course without the
> credentials etc.) that can be shared on GitHub? Simply reading/writing a
> file from/to S3.
> I keep struggling with this one and getting weird exceptions.
> Thanks
>
> On Thu, Mar 4, 2021 at 7:30 PM Avi Levi  wrote:
>
> Sure, This is the full exception stacktrace:
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
> at akka.dispatch.OnComplete.internal(Future.scala:264)
> at akka.dispatch.OnComplete.internal(Future.scala:261)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:531)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(Batching

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-07 Thread Jark Wu
Hi Yuval,

That's correct: you will always get a LogicalWatermarkAssigner if you
assigned a watermark.
If you implement SupportsWatermarkPushDown, the LogicalWatermarkAssigner
will be pushed
into the TableSource, and then you can push the Filter into the source if
the source implements SupportsFilterPushDown.
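
For illustration, a minimal sketch of a source that declares both abilities
(interface names are from Flink 1.12; the class itself is hypothetical, and the
changelog mode, runtime provider, and actual filter evaluation are stubbed out):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;

public class MyTableSource implements ScanTableSource,
        SupportsWatermarkPushDown, SupportsFilterPushDown {

    private WatermarkStrategy<RowData> watermarkStrategy;
    private List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        // Called when the planner pushes the LogicalWatermarkAssigner into the source.
        this.watermarkStrategy = watermarkStrategy;
    }

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        // Remember the pushed filters; report them all as "remaining" so the
        // planner keeps a safety net if the source cannot fully apply them.
        this.pushedFilters = filters;
        return Result.of(filters, filters);
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // The actual reader, which would use watermarkStrategy and pushedFilters,
        // is omitted in this sketch.
        throw new UnsupportedOperationException("runtime provider omitted in this sketch");
    }

    @Override
    public DynamicTableSource copy() {
        return new MyTableSource();
    }

    @Override
    public String asSummaryString() {
        return "MyTableSource";
    }
}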

Best,
Jark

On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov  wrote:

> Hi Timo,
> After investigating this further, this is actually not related to
> implementing SupportsWatermarkPushDown.
>
> Once I create a TableSchema for my custom source's RowData, and assign it
> a watermark (see my example in the original mail), the plan will always
> include a LogicalWatermarkAssigner. This assigner that is between the
> LogicalTableScan and the LogicalFilter will then go on and fail the
> HepPlanner from invoking the optimization since it requires
> LogicalTableScan to be a direct child of LogicalFilter. Since I have
> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
> work.
>
> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther  wrote:
>
>> Hi Yuval,
>>
>> sorry that nobody replied earlier. Somehow your email fell through the
>> cracks.
>>
>> If I understand you correctly, you would like to implement a table
>> source that implements both `SupportsWatermarkPushDown` and
>> `SupportsFilterPushDown`?
>>
>> The current behavior might be on purpose. Filters and Watermarks are not
>> very compatible. Filtering would also mean that records (from which
>> watermarks could be generated) are skipped. If the filter is very
>> strict, we would not generate any new watermarks and the pipeline would
>> stop making progress in time.
>>
>> Watermark push down is only necessary, if per-partition watermarks are
>> required. Otherwise the watermarks are generated in a subsequent
>> operator after the source. So you can still use rowtime without
>> implementing `SupportsWatermarkPushDown` in your custom source.
>>
>> I will loop in Shengkai, who worked on this topic recently.
>>
>> Regards,
>> Timo
>>
>>
>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>> > Bumping this up again, would appreciate any help if anyone is familiar
>> > with the blink planner.
>> >
>> > Thanks,
>> > Yuval.
>> >
>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov > > > wrote:
>> >
>> > Hi Jark,
>> > Would appreciate your help with this.
>> >
>> > On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <
>> ro...@apache.org
>> > > wrote:
>> >
>> > Hi Yuval,
>> >
>> > I'm not familiar with the Blink planner but probably Jark can
>> help.
>> >
>> > Regards,
>> > Roman
>> >
>> >
>> > On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
>> > mailto:yuva...@gmail.com>> wrote:
>> >
>> > Update: When I don't set the watermark explicitly on the
>> > TableSchema, `applyWatermarkStrategy` never gets called on
>> > my ScanTableSource, which does make sense. But now the
>> > question is what should be done? This feels a bit
>> unintuitive.
>> >
>> > On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
>> > mailto:yuva...@gmail.com>> wrote:
>> >
>> > Hi,
>> > Flink 1.12.1, Blink Planner, Scala 2.12
>> >
>> > I have the following logical plan:
>> >
>> >
>>  LogicalSink(table=[default_catalog.default_database.table], fields=[bar,
>> baz, hello_world, a, b])
>> > +- LogicalProject(value=[$2],
>> > bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
>> > baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
>> > hello_world=[null:VARCHAR(2147483647) CHARACTER SET
>> > "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET
>> > "UTF-16LE"], b=[EMPTY_MAP()])
>> > +- LogicalFilter(condition=[AND(=($4,
>> > _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>> >+- LogicalWatermarkAssigner(rowtime=[bar],
>> > watermark=[$0])
>> >   +- LogicalTableScan(table=[[default_catalog,
>> > default_database, foo]])
>> >
>> > I have a custom source which creates a TableSchema based
>> > on an external table. When I create the schema, I push
>> > the watermark definition to the schema:
>> >
>> > [image attachment: the watermark definition in the schema]
>> >
>> > When the HepPlanner starts the optimization phase and
>> > reaches the "PushFilterIntoTableSourceScanRule", it
>> > matches on the LogicalFilter in the definition. But
>> > then, since the RelOptRuleOperandChildPolicy is set to
>> > "SOME", it attempts to do a full match on the child
>> > nodes. Since the rule is defined like so:
>> >
>> > [image attachment: the rule's operand definition]
>> >
>> > The child filter fails since

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-07 Thread Yuval Itzchakov
Hi Jark,

Even after implementing both, I don't see the watermark being pushed to the
table source in the logical plan, which prevents predicate pushdown from running.

On Sun, Mar 7, 2021, 15:43 Jark Wu  wrote:

> Hi Yuval,
>
> That's correct: you will always get a LogicalWatermarkAssigner if you
> assigned a watermark.
> If you implement SupportsWatermarkPushDown, the LogicalWatermarkAssigner
> will be pushed
> into the TableSource, and then you can push the Filter into the source if
> the source implements SupportsFilterPushDown.
>
> Best,
> Jark
>
> On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov  wrote:
>
>> Hi Timo,
>> After investigating this further, this is actually not related to
>> implementing SupportsWatermarkPushDown.
>>
>> Once I create a TableSchema for my custom source's RowData, and assign it
>> a watermark (see my example in the original mail), the plan will always
>> include a LogicalWatermarkAssigner. This assigner that is between the
>> LogicalTableScan and the LogicalFilter will then go on and fail the
>> HepPlanner from invoking the optimization since it requires
>> LogicalTableScan to be a direct child of LogicalFilter. Since I have
>> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
>> work.
>>
>> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther  wrote:
>>
>>> Hi Yuval,
>>>
>>> sorry that nobody replied earlier. Somehow your email fell through the
>>> cracks.
>>>
>>> If I understand you correctly, you would like to implement a table
>>> source that implements both `SupportsWatermarkPushDown` and
>>> `SupportsFilterPushDown`?
>>>
>>> The current behavior might be on purpose. Filters and Watermarks are not
>>> very compatible. Filtering would also mean that records (from which
>>> watermarks could be generated) are skipped. If the filter is very
>>> strict, we would not generate any new watermarks and the pipeline would
>>> stop making progress in time.
>>>
>>> Watermark push down is only necessary, if per-partition watermarks are
>>> required. Otherwise the watermarks are generated in a subsequent
>>> operator after the source. So you can still use rowtime without
>>> implementing `SupportsWatermarkPushDown` in your custom source.
>>>
>>> I will loop in Shengkai, who worked on this topic recently.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>>> > Bumping this up again, would appreciate any help if anyone is familiar
>>> > with the blink planner.
>>> >
>>> > Thanks,
>>> > Yuval.
>>> >
>>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov >> > > wrote:
>>> >
>>> > Hi Jark,
>>> > Would appreciate your help with this.
>>> >
>>> > On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <
>>> ro...@apache.org
>>> > > wrote:
>>> >
>>> > Hi Yuval,
>>> >
>>> > I'm not familiar with the Blink planner but probably Jark can
>>> help.
>>> >
>>> > Regards,
>>> > Roman
>>> >
>>> >
>>> > On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
>>> > mailto:yuva...@gmail.com>> wrote:
>>> >
>>> > Update: When I don't set the watermark explicitly on the
>>> > TableSchema, `applyWatermarkStrategy` never gets called on
>>> > my ScanTableSource, which does make sense. But now the
>>> > question is what should be done? This feels a bit
>>> unintuitive.
>>> >
>>> > On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
>>> > mailto:yuva...@gmail.com>> wrote:
>>> >
>>> > Hi,
>>> > Flink 1.12.1, Blink Planner, Scala 2.12
>>> >
>>> > I have the following logical plan:
>>> >
>>> >
>>>  LogicalSink(table=[default_catalog.default_database.table], fields=[bar,
>>> baz, hello_world, a, b])
>>> > +- LogicalProject(value=[$2],
>>> > bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
>>> > baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
>>> > hello_world=[null:VARCHAR(2147483647) CHARACTER SET
>>> > "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET
>>> > "UTF-16LE"], b=[EMPTY_MAP()])
>>> > +- LogicalFilter(condition=[AND(=($4,
>>> > _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>>> >+- LogicalWatermarkAssigner(rowtime=[bar],
>>> > watermark=[$0])
>>> >   +- LogicalTableScan(table=[[default_catalog,
>>> > default_database, foo]])
>>> >
>>> > I have a custom source which creates a TableSchema
>>> based
>>> > on an external table. When I create the schema, I push
>>> > the watermark definition to the schema:
>>> >
>>> > [image attachment: the watermark definition in the schema]
>>> >
>>> > When the HepPlanner starts the optimization phase and
>>> > reaches the "PushFilterIntoTableSourceScanRule", it
>>> > matches 

Request for Flink JIRA Access

2021-03-07 Thread Rion Williams
Hey folks,

The community here has been awesome with my recent questions about Flink, so 
I’d like to give back. I’m already a member of the ASF JIRA but I was wondering 
if I could get access to the Flink Project. 

I’ve contributed a good bit to Apache Beam in the past, but I figured that since 
I’ll be using Flink for the foreseeable future, I’d like to give back where I 
can.

Additionally, if there are any low hanging fruit / tags for new contributors 
just to familiarize myself with the process, that’d be great / appreciated!

My username for JIRA is rionmonster and if there’s anything else I could 
provide, please let me know!

Thanks,

Rion

Re: Request for Flink JIRA Access

2021-03-07 Thread Robert Metzger
Hey Rion,

you don't need special access to Flink's Jira: any Jira user is assignable
to tickets, but only committers can assign people.

For low-hanging fruit, we have a "starter" label to tag those tickets. I
also recommend keeping an eye on Jira tickets about topics you are
experienced with / interested in.

You'll find additional information here:
https://flink.apache.org/contributing/contribute-code.html

Looking forward to your contributions!


On Sun, Mar 7, 2021 at 5:21 PM Rion Williams  wrote:

> Hey folks,
>
> The community here has been awesome with my recent questions about Flink,
> so I’d like to give back. I’m already a member of the ASF JIRA but I was
> wondering if I could get access to the Flink Project.
>
> I’ve contributed a good bit to Apache Beam in the past, but I figured that
> since I’ll be using Flink for the foreseeable future, I’d like to give back
> where I can.
>
> Additionally, if there are any low hanging fruit / tags for new
> contributors just to familiarize myself with the process, that’d be great /
> appreciated!
>
> My username for JIRA is rionmonster and if there’s anything else I could
> provide, please let me know!
>
> Thanks,
>
> Rion


Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2021-03-07 Thread Xintong Song
Hi Hemant,
I don't see any problem in your settings. Any exceptions suggesting why TM
containers are not coming up?

Thank you~

Xintong Song



On Sat, Mar 6, 2021 at 3:53 PM bat man  wrote:

> Hi Xintong Song,
> I tried using the Java options to generate a heap dump, referring to the docs [1],
> in flink-conf.yaml; however, after adding this the task manager containers
> are not coming up. Note that I am using EMR. Am I doing anything wrong here?
>
> env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError
> -XX:HeapDumpPath=/tmp/dump.hprof"
>
> Thanks,
> Hemant
>
>
>
>
>
> On Fri, Mar 5, 2021 at 3:05 PM Xintong Song  wrote:
>
>> Hi Hemant,
>>
>> This exception generally suggests that the JVM is running out of heap memory.
>> Per the official documentation [1], it means the amount of live data barely fits
>> into the Java heap, leaving little free space for new allocations.
>>
>> You can try to increase the heap size following these guides [2].
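>>
>> For illustration only (the right value depends on your workload and
>> container sizes), the overall TaskManager memory can be raised in
>> flink-conf.yaml, e.g.:
>>
>> taskmanager.memory.process.size: 4096m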
>>
>> If a memory leak is suspected, then to further understand where the memory is
>> consumed, you may need to dump the heap on OOMs and look for unexpected
>> memory usage using profiling tools.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1]
>> https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/memleaks002.html
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html
>>
>>
>>
>> On Fri, Mar 5, 2021 at 4:24 PM bat man  wrote:
>>
>>> Hi,
>>>
>>> Getting the below OOM but the job failed 4-5 times and recovered from
>>> there.
>>>
>>> java.lang.Exception: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>
>>> Is there any way I can debug this? Since the job started running fine after a
>>> few restarts, what could be the reason behind this?
>>>
>>> Thanks,
>>> Hemant
>>>
>>


Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-07 Thread Shengkai Fang
Hi, Yuval, Jark, Timo.

Currently the watermark push down happens in the logical rewrite phase but
the filter push down happens in the local phase, which means the planner
will first check the Filter push down and then check the watermark push
down.

I think we need a rule to transpose the filter and the watermark
assigner, or to extend the filter push down rule to capture the structure where
the watermark assigner is the parent of the table scan.

Best,
Shengkai

Yuval Itzchakov  wrote on Mon, Mar 8, 2021 at 12:13 AM:

> Hi Jark,
>
> Even after implementing both, I don't see the watermark being pushed to
> the table source in the logical plan, which prevents predicate pushdown from
> running.
>
> On Sun, Mar 7, 2021, 15:43 Jark Wu  wrote:
>
>> Hi Yuval,
>>
>> That's correct: you will always get a LogicalWatermarkAssigner if you
>> assigned a watermark.
>> If you implement SupportsWatermarkPushDown, the LogicalWatermarkAssigner
>> will be pushed
>> into the TableSource, and then you can push the Filter into the source if
>> the source implements SupportsFilterPushDown.
>>
>> Best,
>> Jark
>>
>> On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov  wrote:
>>
>>> Hi Timo,
>>> After investigating this further, this is actually not related to
>>> implementing SupportsWatermarkPushDown.
>>>
>>> Once I create a TableSchema for my custom source's RowData, and assign
>>> it a watermark (see my example in the original mail), the plan will always
>>> include a LogicalWatermarkAssigner. This assigner that is between the
>>> LogicalTableScan and the LogicalFilter will then go on and fail the
>>> HepPlanner from invoking the optimization since it requires
>>> LogicalTableScan to be a direct child of LogicalFilter. Since I have
>>> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
>>> work.
>>>
>>> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther  wrote:
>>>
 Hi Yuval,

 sorry that nobody replied earlier. Somehow your email fell through the
 cracks.

 If I understand you correctly, you would like to implement a table
 source that implements both `SupportsWatermarkPushDown` and
 `SupportsFilterPushDown`?

 The current behavior might be on purpose. Filters and Watermarks are
 not
 very compatible. Filtering would also mean that records (from which
 watermarks could be generated) are skipped. If the filter is very
 strict, we would not generate any new watermarks and the pipeline would
 stop making progress in time.

 Watermark push down is only necessary, if per-partition watermarks are
 required. Otherwise the watermarks are generated in a subsequent
 operator after the source. So you can still use rowtime without
 implementing `SupportsWatermarkPushDown` in your custom source.

 I will loop in Shengkai, who worked on this topic recently.

 Regards,
 Timo


 On 04.03.21 18:52, Yuval Itzchakov wrote:
 > Bumping this up again, would appreciate any help if anyone is
 familiar
 > with the blink planner.
 >
 > Thanks,
 > Yuval.
 >
 > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov >>> > > wrote:
 >
 > Hi Jark,
 > Would appreciate your help with this.
 >
 > On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <
 ro...@apache.org
 > > wrote:
 >
 > Hi Yuval,
 >
 > I'm not familiar with the Blink planner but probably Jark can
 help.
 >
 > Regards,
 > Roman
 >
 >
 > On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
 > mailto:yuva...@gmail.com>> wrote:
 >
 > Update: When I don't set the watermark explicitly on the
 > TableSchema, `applyWatermarkStrategy` never gets called on
 > my ScanTableSource, which does make sense. But now the
 > question is what should be done? This feels a bit
 unintuitive.
 >
 > On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
 > mailto:yuva...@gmail.com>> wrote:
 >
 > Hi,
 > Flink 1.12.1, Blink Planner, Scala 2.12
 >
 > I have the following logical plan:
 >
 >
  LogicalSink(table=[default_catalog.default_database.table], fields=[bar,
 baz, hello_world, a, b])
 > +- LogicalProject(value=[$2],
 > bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
 > baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
 > hello_world=[null:VARCHAR(2147483647) CHARACTER SET
 > "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET
 > "UTF-16LE"], b=[EMPTY_MAP()])
 > +- LogicalFilter(condition=[AND(=($4,
 > _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
 >+- LogicalWate

Re: Error Starting PyFlink in Kubernetes Session Cluster "Could Not Get Rest Endpoint"

2021-03-07 Thread Yang Wang
I think you want to submit a Flink Python job to the existing session
cluster.
Please ensure the session cluster is created with the proper service exposure
type [1]:
* LoadBalancer for a cloud environment
* NodePort for a self-managed K8s cluster
* ClusterIP for K8s-internal submission, which means you should submit
the Flink application from inside the K8s cluster. This could be done by starting a
pod as the Flink client.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui
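
For example (cluster id and namespace are reused from the quoted command below;
the NodePort choice is just an illustration), a session cluster with an
explicitly chosen exposure type could be started like this:

./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=flink-python \
  -Dkubernetes.namespace=cmdaa \
  -Dkubernetes.rest-service.exposed.type=NodePort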

Best,
Yang

Robert Cullen  wrote on Sat, Mar 6, 2021 at 12:43 AM:

> Trying to spin up a Python Flink instance in my Kubernetes cluster with
> this configuration ...
>
> sudo ./bin/flink run \
> --target kubernetes-session \
> -Dkubernetes.cluster-id=flink-python \
> -Dkubernetes.namespace=cmdaa \
> -Dkubernetes.container.image=cmdaa/pyflink:0.0.1 \
> --pyModule word_count \
> --pyFiles /opt/flink-1.12.2/examples/python/table/batch/word_count.py
>
>  ... But getting this error:
>
> Traceback (most recent call last):
>   File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py", line 
> 193, in _run_module_as_main
> "__main__", mod_spec)
>   File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py", line 85, 
> in _run_code
> exec(code, run_globals)
>   File 
> "/var/folders/zz/zyxvpxvq6csfxvn_n0/T/pyflink/074c581c-5b39-4b07-ac84-55a86c46f9eb/affe3559-5364-43f1-93ef-b7f6cd9a2f77/word_count.py",
>  line 80, in 
> word_count()
>   File 
> "/var/folders/zz/zyxvpxvq6csfxvn_n0/T/pyflink/074c581c-5b39-4b07-ac84-55a86c46f9eb/affe3559-5364-43f1-93ef-b7f6cd9a2f77/word_count.py",
>  line 74, in word_count
> t_env.execute("word_count")
>   File 
> "/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
>  line 1276, in execute
>   File 
> "/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", 
> line 1286, in __call__
>   File "/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", 
> line 147, in deco
>   File "/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", 
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : java.lang.RuntimeException: 
> org.apache.flink.client.deployment.ClusterRetrieveException: Could not get 
> the rest endpoint of flink-python
> at 
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:102)
> at 
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:145)
> at 
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:66)
> at 
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:75)
> at 
> org.apache.flink.table.api.internal.BatchTableEnvImpl.executePipeline(BatchTableEnvImpl.scala:347)
> at 
> org.apache.flink.table.api.internal.BatchTableEnvImpl.execute(BatchTableEnvImpl.scala:317)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: Could 
> not get the rest endpoint of flink-python
> ... 17 more
>
> org.apache.flink.client.program.ProgramAbortException
> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(Cl

Re: Re: Checkpoint Error

2021-03-07 Thread Yun Gao
Hi Navneeth,

It seems from the stack trace that the exception is caused by underlying EFS
problems. Have you checked
whether there are errors reported for EFS, or whether there might be duplicate
mounting of the same EFS and whether someone else
has deleted the directory?

Best,
Yun



 --Original Mail --
Sender:Navneeth Krishnan 
Send Date:Sun Mar 7 15:44:59 2021
Recipients:user 
Subject:Re: Checkpoint Error

Hi All,

Any suggestions?

Thanks
On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan  
wrote:

Hi All,

We are running our streaming job on Flink 1.7.2 and we are noticing the below
error. Not sure what's causing it; any pointers would help. We have 10 TMs
checkpointing to AWS EFS.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more
Caused by: java.io.IOException: Stale file handle
    at java.io.FileOutputStream.close0(Native Method)
    at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
    at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
    at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
    at java.io.FileOutputStream.close(FileOutputStream.java:354)
    at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
    ... 12 more

Thanks

Re: Re: Checkpoint Error

2021-03-07 Thread Navneeth Krishnan
Hi Yun,

Thanks for the response. I checked the mounts, and only the JMs and TMs
have this EFS mounted. Not sure how to debug this.

Thanks

On Sun, Mar 7, 2021 at 8:29 PM Yun Gao  wrote:

> Hi Navneeth,
>
> It seems from the stack trace that the exception is caused by underlying EFS
> problems. Have you checked
> whether there are errors reported for EFS, or whether there might be duplicate
> mounting of the same EFS and whether someone else
> has deleted the directory?
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*Navneeth Krishnan 
> *Send Date:*Sun Mar 7 15:44:59 2021
> *Recipients:*user 
> *Subject:*Re: Checkpoint Error
>
>> Hi All,
>>
>> Any suggestions?
>>
>> Thanks
>>
>> On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> We are running our streaming job on flink 1.7.2 and we are noticing the
>>> below error. Not sure what's causing it, any pointers would help. We have
>>> 10 TM's checkpointing to AWS EFS.
>>>
>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>>     ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>>     ... 5 more
>>> Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>>>     ... 7 more
>>> Caused by: java.io.IOException: Stale file handle
>>>     at java.io.FileOutputStream.close0(Native Method)
>>>     at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
>>>     at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
>>>     at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
>>>     at java.io.FileOutputStream.close(FileOutputStream.java:354)
>>>     at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
>>>     at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
>>>     ... 12 more
>>>
>>>
>>> Thanks
>>>
>>>


How do I call an algorithm written in C++ in Flink?

2021-03-07 Thread 苏喜 张
The company has provided an algorithm written in C++, which has been packaged
into a .so file. I have built a Spring Boot project, which uses JNI to call
the algorithm written in C++. Could you please tell me how to call it in Flink?
Do I need to define operators, or chains of operators?
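
One common pattern (a hedged sketch only: NativeAlgorithm, its score() method,
and the library name "myalgorithm" are hypothetical stand-ins for your JNI
wrapper) is to wrap the JNI call in a regular user function and load the native
library once per task in open(). The .so must be available on every
TaskManager, e.g. baked into the image or installed on each node. No special
operator or operator-chain definition is needed; the function is used like any
other map function, e.g. stream.map(new NativeScoringFunction()).

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class NativeScoringFunction extends RichMapFunction<String, Double> {

    // Hypothetical JNI wrapper: in a real project this class declares the
    // native methods implemented by the .so file.
    public static class NativeAlgorithm {
        public native double score(String input);
    }

    private transient NativeAlgorithm algorithm;

    @Override
    public void open(Configuration parameters) {
        // Loads libmyalgorithm.so once per task instance; the library must be
        // resolvable via java.library.path on every TaskManager.
        System.loadLibrary("myalgorithm");
        algorithm = new NativeAlgorithm();
    }

    @Override
    public Double map(String value) {
        // Delegates to the native implementation via JNI.
        return algorithm.score(value);
    }
}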



Re: java options to generate heap dump in EMR not working

2021-03-07 Thread Yun Gao
Hi,

I tried with a standalone session (sorry, I do not have a YARN cluster at
hand) and it seems that
the Flink cluster could start up normally. Could you check the NodeManager log
to see the detailed
reason why the container does not get launched? Also, have you checked whether
there are some spelling errors
or some unexpected special whitespace characters in the configuration?

In the case of configuring `env.java.opts`, it seems the JobManager also could
not be launched with
this configuration.

Best,
Yun
 --Original Mail --
Sender:bat man 
Send Date:Sat Mar 6 16:03:06 2021
Recipients:user 
Subject:java options to generate heap dump in EMR not working

Hi,

I am trying to generate a heap dump to debug a GC overhead OOM. For that I
added the below Java options in flink-conf.yaml; however, after adding this,
YARN is not able to launch the containers. The job logs show it keeps
requesting containers from YARN, gets them, and then releases them again; the
same cycle continues. If I remove the option from flink-conf.yaml,
then the containers are launched and the job starts processing.

env.java.opts.taskmanager: "-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp/dump.hprof"

If I try this, then the YARN client does not come up:

env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp/dump.hprof"

Am I doing anything wrong here?

PS: I am using EMR. 

Thanks,
Hemant

Gradually increasing checkpoint size

2021-03-07 Thread Dan Hill
Hi!

I'm running a backfill Flink stream job over older data.  It has multiple
interval joins.  I noticed my checkpoint is regularly gaining in size.  I'd
expect my checkpoints to stabilize and not grow.

Is there a setting to prune useless data from the checkpoint?  My top guess
is that my checkpoint has a bunch of useless state in it.

- Dan
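
For the user-defined keyed state part of this question, one commonly used
mechanism is state TTL. A hedged sketch (the descriptor name and the one-day
TTL are purely illustrative; note that the state an interval join keeps
internally is cleaned up based on watermarks rather than TTL, so stalled
watermarks during a backfill are also worth checking):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlExample {
    public static ValueStateDescriptor<Long> descriptorWithTtl() {
        // Expire entries one day after they were created or last written.
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .cleanupFullSnapshot()
                .build();

        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("lastSeen", Long.class);
        // The TTL applies to any state created from this descriptor.
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}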


Re: Job downgrade

2021-03-07 Thread Tzu-Li (Gordon) Tai
Hi Alexey,

Thanks for confirming.

Can you send me a copy of the exception stack trace? That could help me
pinpoint the exact issue.

Cheers,
Gordon

On Fri, Mar 5, 2021 at 2:02 PM Alexey Trenikhun  wrote:

> Hi Gordon,
> I was using RocksDB backend
> Alexey
>
> --
> *From:* Tzu-Li (Gordon) Tai 
> *Sent:* Thursday, March 4, 2021 12:58:01 AM
> *To:* Alexey Trenikhun 
> *Cc:* Piotr Nowojski ; Flink User Mail List <
> user@flink.apache.org>
> *Subject:* Re: Job downgrade
>
> Hi Alexey,
>
> Are you using the heap backend? If that's the case, then for whatever
> state was registered at the time of a savepoint, Flink will attempt to
> restore it to the heap backend.
> This essentially means that state "b" will be read as well, which would
> explain why Flink is trying to locate class B on the classpath.
>
> For this scenario, class B needs to be in the classpath if you downgrade
> back to version 1, with a savepoint taken with version 2 of the job.
>
> - Gordon
>
> On Thu, Mar 4, 2021 at 4:04 AM Alexey Trenikhun  wrote:
>
> If I copy class A into version 1+ it works. But it is a problem from a CD
> perspective - I want to introduce a feature which requires new state: first I
> need to make version 1+ with class B, but no other changes, then version 2 with
> class B and the logic changes, upgrade the job, and if the job doesn’t do what’s
> expected, “roll back” to version 1+.
>
> --
> *From:* Piotr Nowojski 
> *Sent:* Wednesday, March 3, 2021 11:47:45 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: Job downgrade
>
> Hi,
>
> I'm not sure what the reason behind this is. Probably classes are somehow
> attached to the state, which would explain why you are experiencing this
> issue. I've asked someone else from the community to chip in, but in the
> meantime, can't you just prepare a new "version 1" of the job, with just
> some empty `class B` on the classpath? Or if this doesn't work, just copy
> the whole `class B` from version 2?
>
> Best,
> Piotrek
>
> On Sat, Feb 27, 2021 at 19:10, Alexey Trenikhun  wrote:
>
> Hello,
> Let's say version 1 of my job uses keyed state with name "a" and type A,
> which is some Avro-generated class. Then I upgrade to version 2, which in
> addition uses keyed state "b" of type B (another concrete Avro-generated
> class). I take a savepoint with version 2 and decide to downgrade to version
> 1 and start with the taken savepoint - can I do it? On one hand, version 1
> doesn't have state "b", but it seems Flink still tries to call
> restoreSerializer and it tries to read the runtimeType (`class B`), which is
> not available in version 1.
>
> Thanks,
> Alexey
>
>