Re: why do we need keyed state and operator state when we already have checkpoints?

2020-10-13 Thread 大森林
So:
State:
stores the intermediate results of operators (such as keyBy, map).


Checkpoint:
stores the last consistent snapshot of that state, taken while the program was running OK.


Am I right?
Thanks for your help~!
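
A minimal Java sketch of that distinction (the class and names below are made up for illustration): the keyed state is the operator's own per-key working data, while the checkpoint is the periodic snapshot of all such state that a job is restored from after a failure.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StateVsCheckpointSketch {

    // Keyed state: the operator's own per-key working data, read and updated per record.
    public static class CountPerKey extends RichFlatMapFunction<String, Long> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void flatMap(String value, Collector<Long> out) throws Exception {
            Long current = count.value();
            long updated = (current == null ? 0L : current) + 1;
            count.update(updated);
            out.collect(updated);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint: every 10 seconds Flink snapshots all registered state, so the job
        // can be restored to the last successful snapshot after a failure.
        env.enableCheckpointing(10_000);

        env.fromElements("a", "b", "a")
                .keyBy(s -> s)
                .flatMap(new CountPerKey())
                .print();

        env.execute("state-vs-checkpoint-sketch");
    }
}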




-- Original Message --
From: "Congxian Qiu"
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html
Best,

Congxian









大森林 https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#definitions


On Wed, Oct 7, 2020 at 6:51 AM 大森林 

Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

2020-10-13 Thread Xingbo Huang
Hi,

From my point of view, pyflink-shell only provides an interactive tool.
Underneath it, you can choose whether to run the job in a minicluster (similar to
python xxx.py) or submit it to the cluster through flink run. For python
xxx.py, it is reasonable not to load the config from flink-conf.yaml. What do
you think?


Best,
Xingbo

Sharipov, Rinat wrote on Tue, Oct 13, 2020 at 2:16 PM:

> Hi Xingbo, thx a lot, it works !
>
> But I'm still sure that it's not obvious from a user's point of view that
> *pyflink-shell.sh* doesn't use the provided flink-conf.yaml. Don't you think
> that looks like an issue?
>
> Thx !
>
> Tue, Oct 13, 2020 at 05:35, Xingbo Huang :
>
>> Hi,
>>
>> You can use api to set configuration:
>> table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>> '80m')
>>
>> The flink-conf.yaml way will only take effect when submitted through
>> flink run, and the minicluster way(python xxx.py) will not take effect.
>>
>> Best,
>> Xingbo
>>
>> Sharipov, Rinat wrote on Tue, Oct 13, 2020 at 1:56 AM:
>>
>>> Hi mates !
>>>
>>> I'm very new at pyflink and trying to register a custom UDF function
>>> using python API.
>>> Currently I faced an issue in both server env and my local IDE
>>> environment.
>>>
>>> When I try to execute the example below I get an error message: *The
>>> configured Task Off-Heap Memory 0 bytes is less than the least required
>>> Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
>>> using the configuration key 'taskmanager.memory.task.off-heap.size'.*
>>>
>>> Of course I've added the required property to *flink-conf.yaml* and
>>> checked that *pyflink-shell.sh* initializes the environment using the specified
>>> configuration, but it makes no difference and I still get the error.
>>>
>>> I've also attached my flink-conf.yaml file
>>>
>>> Thx for your help !
>>>
>>> *Here is an example:*
>>>
>>> from pyflink.dataset import ExecutionEnvironment
>>> from pyflink.table import BatchTableEnvironment, DataTypes
>>> from pyflink.table.udf import udf
>>>
>>>
>>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>>> def test_udf(i):
>>>     return i
>>>
>>>
>>> if __name__ == "__main__":
>>>     env = ExecutionEnvironment.get_execution_environment()
>>>     env.set_parallelism(1)
>>>
>>>     bt_env = BatchTableEnvironment.create(env)
>>>     bt_env.register_function("test_udf", test_udf)
>>>
>>>     my_table = bt_env.from_elements(
>>>         [
>>>             ("user-1", "http://url/1"),
>>>             ("user-2", "http://url/2"),
>>>             ("user-1", "http://url/3"),
>>>             ("user-3", "http://url/4"),
>>>             ("user-1", "http://url/3")
>>>         ],
>>>         [
>>>             "uid", "url"
>>>         ]
>>>     )
>>>
>>>     my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
>>>     bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)
>>>
>>>     bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()
>>>
>>>
>>>
>>>


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-13 Thread Yun Gao
Hi Arvid,
Very thanks for the comments!
>>> 4) Yes, the interaction is not trivial and also I have not completely 
>>> thought it through. But in general, I'm currently at the point where I 
>>> think that we also need non-checkpoint related events in unaligned 
>>> checkpoints. So just keep that in mind, that we might converge anyhow at 
>>> this point.
I also agree that it would be better to keep the unaligned checkpoint
behavior on EndOfPartition; I will then double-check this issue again.

>>> In general, what is helping in this case is to remember that there is no
>>> unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we
>>> can completely ignore the problem of how to store and restore the output
>>> buffers of a completed task (also important for the next point).
Exactly, we should not need to persist the output buffers for the completed
tasks, and that would simplify the implementation a lot.

>>> 5) I think we are on the same page and I completely agree that for the
>>> MVP/first version, it's completely fine to start and immediately stop. A
>>> tad better would be to not even start the processing loop.
I also agree with this part. We would keep optimizing the implementation after 
the first version. 

Best,
Yun   




--
From:Arvid Heise 
Send Time:2020 Oct. 13 (Tue.) 03:39
To:Yun Gao 
Cc:Flink Dev ; User-Flink 
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

4) Yes, the interaction is not trivial and also I have not completely thought 
it through. But in general, I'm currently at the point where I think that we 
also need non-checkpoint related events in unaligned checkpoints. So just keep 
that in mind, that we might converge anyhow at this point.

In general, what is helping in this case is to remember that there is no unaligned
checkpoint barrier ever going to overtake EndOfPartition. So, we can completely
ignore the problem of how to store and restore the output buffers of a completed
task (also important for the next point).

5) I think we are on the same page and I completely agree that for the
MVP/first version, it's completely fine to start and immediately stop. A tad
better would be to not even start the processing loop.

On Mon, Oct 12, 2020 at 6:18 PM Yun Gao  wrote:

Hi Arvid,
Very thanks for the insightful comments! I added the responses for this issue
under the quotes:
>> 1) You call the tasks that get the barriers injected leaf nodes, which would 
>> make the > sinks the root nodes. That is very similar to how graphs in 
>> relational algebra are labeled. However, I got the feeling that in Flink, we 
>> rather iterate from sources to sink, making the sources root nodes and the 
>> sinks the leaf nodes. However, I have no clue how it's done in similar 
>> cases, so please take that hint cautiously.
>> 2) I'd make the algorithm to find the subtasks iterative and react in 
>> CheckpointCoordinator. Let's assume that we inject the barrier at all root 
>> subtasks (initially all sources). So in the iterative algorithm, whenever 
>> root A finishes, it looks at all connected subtasks B if they have any 
>> upstream task left. If not B becomes a new root. That would require to only 
>> touch a part of the job graph, but would require some callback from 
>> JobManager to CheckpointCoordinator.

I think "leaf nodes" was a bad choice of name; in fact I think we have
the same thought, namely that we start from the source nodes to find all
the nodes whose preceding nodes have all finished. It would be much better to
call these nodes (which we would trigger) "root nodes". I'll modify the FLIP
to change the name to "root nodes".
>> 2b) We also need to be careful for out-of-sync updates: if the root is about 
>> to finish, we could send the barrier to it from CheckpointCoordinator, but 
>> at the time it arrives, the subtask is finished already.
Exactly. When the checkpoint tries to trigger a task but finds the task is not there,
it may then further check whether the task has finished; if so, it should then
re-check its descendants to see if there are new "root nodes" to trigger.
>> 3) An implied change is that checkpoints are not aborted anymore at 
>> EndOfPartition, which is good, but might be explicitly added.
Yes, currently barrier alignment would fail the current checkpoint on 
EndOfPartition, and we would modify the behavior.
>> 4) The interaction between unaligned checkpoint and EndOfPartition is a bit 
>> ambiguous: What happens when an unaligned checkpoint is started and then one 
>> input channel contains the EndOfPartition event? From the written 
>> description, it sounds to me like, we move back to an aligned checkpoint for 
>> the whole receiving task. However, that is neither easily possible nor 
>> necessary. Imho it would be enough to also store the EndOfPartition in the 
>> channel state.

Very thanks for the suggestions on this iss

Re: Dynamic file name prefix - StreamingFileSink

2020-10-13 Thread Ravi Bhushan Ratnakar
Hi Vijayendra,

OutputFileConfig provides a builder method to create immutable objects
with a given 'prefix' and 'suffix'. The parameter which you are passing
to '*withPartPrefix*' is only evaluated once, at the time of calling
the method '*withPartPrefix*'. So if you want to achieve a dynamic
'prefix' or 'suffix', you may try to have your own custom
implementation of 'OutputFileConfig' which provides a way to set a
function for the 'prefix' or 'suffix'. I am
attaching a sample implementation. Kindly make sure that the
function which you are passing is serializable.


Use like this

val outputFileConfig:OutputFileConfig = new DynamicOutputFileConfig()
  
.withPartPrefixFunction(()=>ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("-MM-dd-HH-mm-ss-SSS")))
  .withPartSuffixFunction(()=> ".ext")
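
A possible alternative, sketched in Java under the assumption that bucketing by time in the path (instead of by key, and instead of a time-stamped file-name prefix) is acceptable for your layout: the StreamingFileSink's built-in DateTimeBucketAssigner recomputes the bucket id per record from the current processing time, so the date stays current without a custom OutputFileConfig.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class TimeBucketedSinkSketch {
    public static StreamingFileSink<String> build(String outputPath) {
        return StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                // The bucket id is recomputed per record from the current processing time,
                // so new buckets such as .../2020-10-13--14/ appear as time passes.
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();
    }
}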


Regards,
Ravi

On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav 
wrote:

> Hi Team,
>
> I have tried to assign a dynamic prefix for file name, which contains
> datetime components.
> *The Problem is Job always takes initial datetime when job first starts
> and never refreshes later. *
> *How can I get dynamic current datetime in filename at sink time ?*
>
> *.withPartPrefix
> (ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("-MM-dd-HH-mm-ss-SSS")))*
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>
> val config = OutputFileConfig
>  .builder() .withPartPrefix("prefix")
>  .withPartSuffix(".ext")
>  .build()
> val sink = StreamingFileSink
>  .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
>  .withBucketAssigner(new KeyBucketAssigner())
>  .withRollingPolicy(OnCheckpointRollingPolicy.build()) 
> .withOutputFileConfig(config)
>  .build()
>
>
package com.example

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig

class DynamicOutputFileConfig(partPrefix: String, partSuffix: String) extends OutputFileConfig(partPrefix, partSuffix) with Serializable {

  private var partPrefixFunction: () => String = _
  private var partSuffixFunction: () => String = _

  def this() {
    this("", "")
  }

  override def getPartPrefix: String = if (partPrefixFunction == null) partPrefix else partPrefixFunction.apply()

  /**
   * The suffix for the part name.
   */
  override def getPartSuffix: String = if (partSuffixFunction == null) partSuffix else partSuffixFunction.apply()

  def withPartPrefixFunction(fun: () => String): DynamicOutputFileConfig = {
    this.partPrefixFunction = fun
    this
  }

  def withPartSuffixFunction(fun: () => String): DynamicOutputFileConfig = {
    this.partSuffixFunction = fun
    this
  }
}


Re: Additional options to S3 Filesystem: Interest?

2020-10-13 Thread Arvid Heise
Hi Padarn,

I assigned the ticket to you, so you can start working on it. Here are some
contribution guidelines [1] in case it's your first contribution.

Basically, you will need to open a PR which contains the ticket and
component. So the prefix should be "[FLINK-19589][s3]" (also for your
commits).

Feel free to reach out to me if you have any questions about the process.
All discussions about the feature should be on the ticket, so everyone can
see it.

[1] https://flink.apache.org/contributing/contribute-code.html

On Tue, Oct 13, 2020 at 3:37 AM Padarn Wilson  wrote:

> Thanks for the feedback. I've created a JIRA here
> https://issues.apache.org/jira/browse/FLINK-19589.
>
> @Dan: This indeed would make it easier to set a lifetime property on
> objects created by Flink, but actually if you want to apply it to all your
> objects for a given bucket you can set bucket wide policies instead. The
> reason I want this is that we have a shared bucket and wish to tag
> different objects based on which pipeline is producing them.
>
> On Tue, Oct 13, 2020 at 4:13 AM Dan Diephouse  wrote:
>
>> We use the StreamingFileSink. An option to expire files after some time
>> period would certainly be welcome. (I could probably figure out a way to do
>> this from the S3 admin UI too though)
>>
>> On Sat, Oct 10, 2020 at 10:45 PM Padarn Wilson  wrote:
>>
>>> Hi Flink Users,
>>>
>>> We need to expose some additional options for the s3 hadoop filesystem:
>>> Specifically, we want to set object tagging and lifecycle. This would be a
>>> fairly easy change and we initially thought to create a new Filesystem with
>>> very minor changes to allow this.
>>>
>>> However then I wondered, would others use this? Is it something that is
>>> worth raising as a Flink issue and then contributing back upstream?
>>>
>>> Any others who would like to be able to set object tags for the s3
>>> filesystem?
>>>
>>> Cheers,
>>> Padarn
>>>
>>
>>
>> --
>> Dan Diephouse
>> @dandiep
>>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Flink is failing for all Jobs if one job gets failed

2020-10-13 Thread saksham sapra
I am working with Flink locally; I have created one task manager which pushes
requests to Flink. So if one job fails because of a function error, then other
jobs which were running correctly before the error occurred for one file also
fail; the new jobs fail automatically even if the configuration for that file is
different.

It seems like a Flink cache issue, but I am not sure. If anyone has any idea, let me
know.
Please find the log attached.

2020-10-13 13:57:13
java.lang.Exception: The user defined 'open(Configuration)' method in class
com.opus.optimus.toolset.batch.workflow.runtime.flink.adapter.
AdapterMultiTypeRichFlatMapFunction caused an exception: Failed to compile
class name(com.opus.optimus.toolset.batch.workflow.di2.
NG100FileReaderd8d76151019a4e9985627bd9b73409d4InboundAdapter256253037297300)

code:

public class
NG100FileReaderd8d76151019a4e9985627bd9b73409d4InboundAdapter256253037297300
implements IInboundContextAdapter {
String convert(IndexedMapContext context) {
return nG9(context);
}

public String adapt(IContext context) {
return convert((IndexedMapContext)context);
}

String nG9(IndexedMapContext context) {
return (String)context.get(2);}
}

at org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask
.java:1351)
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver
.openTask(ChainedFlatMapDriver.java:47)
at org.apache.flink.runtime.operators.BatchTask.openChainedTasks(
BatchTask.java:1391)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(
DataSourceTask.java:157)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: Failed to compile class
name(com.opus.optimus.toolset.batch.workflow.di2.
NG100FileReaderd8d76151019a4e9985627bd9b73409d4InboundAdapter256253037297300)

code:
package com.opus.optimus.toolset.batch.workflow.di2;

import com.opus.optimus.toolset.batch.workflow.dynamic.IndexedMapContext;
import com.opus.optimus.toolset.batch.workflow.runtime.common.IContext;
import com.opus.optimus.toolset.batch.workflow.runtime.common.
IInboundContextAdapter;
import java.lang.String;

public class
NG100FileReaderd8d76151019a4e9985627bd9b73409d4InboundAdapter256253037297300
implements IInboundContextAdapter {
String convert(IndexedMapContext context) {
return nG9(context);
}

public String adapt(IContext context) {
return convert((IndexedMapContext)context);
}

String nG9(IndexedMapContext context) {
return (String)context.get(2);}
}

at com.opus.optimus.toolset.code.generator.compiler.ScriptCompilerUtil
.newInstance(ScriptCompilerUtil.java:11)
at com.opus.optimus.toolset.batch.workflow.dynamic.analyzer.
JavaCodeInboundContextAdapter.compile(JavaCodeInboundContextAdapter.java:31)
at com.opus.optimus.toolset.batch.workflow.runtime.flink.util.
AdapterUtil.compileAdapters(AdapterUtil.java:31)
at com.opus.optimus.toolset.batch.workflow.runtime.flink.util.
AdapterUtil.compileAdapters(AdapterUtil.java:47)
at com.opus.optimus.toolset.batch.workflow.runtime.flink.adapter.
AdapterMultiTypeRichFlatMapFunction.compileScripts(
AdapterMultiTypeRichFlatMapFunction.java:65)
at com.opus.optimus.toolset.batch.workflow.runtime.flink.adapter.
AdapterMultiTypeRichFlatMapFunction.open(AdapterMultiTypeRichFlatMapFunction
.java:60)
at org.apache.flink.api.common.functions.util.FunctionUtils
.openFunction(FunctionUtils.java:36)
at org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask
.java:1349)
... 6 more
Caused by: java.lang.ClassNotFoundException: File
'com/opus/optimus/generated/excel/NG1256203349277500.java', Line 11, Column
35: No applicable constructor/method found for actual parameters
"java.lang.String"; candidates are: "public static java.lang.Integer
com.opus.optimus.toolset.batch.workflow.function.standard.common.excel.ExcelFunctions.indexOf(java.lang.String,
java.lang.String)"
at org.codehaus.janino.JavaSourceClassLoader.generateBytecodes(
JavaSourceClassLoader.java:228)
at org.codehaus.janino.JavaSourceClassLoader.findClass(
JavaSourceClassLoader.java:182)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
at com.opus.optimus.toolset.code.generator.compiler.JaninoCompiler
.getClass(JaninoCompiler.java:18)
at com.opus.optimus.toolset.code.generator.compiler.ScriptCompilerUtil
.newInstance(ScriptCompilerUtil.java:7)
... 13 more
Caused by: org.codehaus.commons.compiler.CompileException: File
'com/opus/optimus/generated/excel/NG1256203349277500.java', Line 11, Column
35: No applicable constructor/method found for actual parameters
"java.lang.String"; candidates are: "public static java.lang.Integer
com.opus.optimus.toolset.batch.workflow.function.standard.common.excel.ExcelFunctions.indexOf(java.lang.String,
java.lang.String)"
at org.codehaus.ja

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-13 Thread Konstantin Knauf
Given that it has been deprecated for three releases now, I am +1 to
dropping it.

On Mon, Oct 12, 2020 at 9:38 PM Chesnay Schepler  wrote:

> Is there a way for us to change the module (in a reasonable way) that
> would allow users to continue using it?
> Is it an API problem, or one of semantics?
>
> On 10/12/2020 4:57 PM, Kostas Kloudas wrote:
> > Hi Chesnay,
> >
> > Unfortunately not from what I can see in the code.
> > This is the reason why I am opening a discussion. I think that if we
> > supported backwards compatibility, this would have been an easier
> > process.
> >
> > Kostas
> >
> > On Mon, Oct 12, 2020 at 4:32 PM Chesnay Schepler 
> wrote:
> >> Are older versions of the module compatible with 1.12+?
> >>
> >> On 10/12/2020 4:30 PM, Kostas Kloudas wrote:
> >>> Hi all,
> >>>
> >>> As the title suggests, this thread is to discuss the removal of the
> >>> flink-connector-filesystem module which contains (only) the deprecated
> >>> BucketingSink. The BucketingSink has been deprecated since Flink 1.9 [1] in
> >>> favor of the relatively recently introduced StreamingFileSink.
> >>>
> >>> For the sake of a clean and more manageable codebase, I propose to
> >>> remove this module for release-1.12, but of course we should see first
> >>> if there are any use cases that depend on it.
> >>>
> >>> Let's have a fruitful discussion.
> >>>
> >>> Cheers,
> >>> Kostas
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-13396
> >>>
>
>

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Flink Kafka offsets

2020-10-13 Thread Dawid Wysakowicz
Hey Rex,

I agree the documentation might be slightly misleading. To get the full
picture of that configuration I'd suggest having a look at the
DataStream Kafka connector page[1]. The Table connector is just a
wrapper around the DataStream one.

Let me also try to clarify it a bit more. In case of Flink there are two
places where the offsets are committed:

1) Flink's checkpoint/savepoint. Those always take the highest priority.
Therefore e.g. when the job is restarted because of a failure, it will
use offsets that were stored in the last successful checkpoint.

2) Upon a checkpoint Flink can also write the offsets back to Kafka.
This is enabled by default in DataStream API and is enabled in Table API
if you provide properties.group.id[2]. This works only if you have
checkpointing enabled. If you disable checkpoints, you can still auto
commit offsets from the underlying Kafka consumer via
properties.enable.auto.commit / properties.auto.commit.interval.ms (btw,
you can pass any Kafka options with a properties.* prefix).

Having explained that, if you set scan.startup.mode and you do not
restore from a checkpoint/savepoint:

* group-offsets -> it will start consuming from the committed offset in
Kafka for the configured group.id, if there are none it should use
properties.auto.offset.reset option

* earliest-offset -> it will ignore committed offsets in Kafka and start
from earliest-offsets.
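
For illustration, a minimal Java sketch of a Table DDL combining those options (the table name, topic, servers and group id below are placeholders): committed group offsets are respected, and the consumer falls back to the earliest offset when the group has none.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaStartupModeSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Offsets are written back to Kafka on each successful checkpoint.
        env.enableCheckpointing(60_000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql(
                "CREATE TABLE my_events (" +
                "  uid STRING," +
                "  url STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'my-topic'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'my-group'," +          // needed so offsets are committed on checkpoints
                "  'scan.startup.mode' = 'group-offsets'," +        // respect committed offsets when they exist
                "  'properties.auto.offset.reset' = 'earliest'," +  // fall back to the beginning when none exist
                "  'format' = 'json'" +
                ")");
    }
}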

Hope it helps.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#properties-group-id

On 13/10/2020 07:43, Rex Fenley wrote:
> Hello,
>
> I've been trying to configure the offset start position for a flink
> kafka consumer. when there is no committed offset, to always start at
> the beginning. It seems like the typical way to do this would be
> setting |auto.offset.reset=earliest| however, I don't see that
> configuration property in the documentation.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>
> However, I do see |scan.startup.mode = earliest-offset|, but from the
> docs it sounds like this would mean it would never commit an offset
> and flink would always start consuming from the beginning of the kafka
> stream, which is not what I want.
>
> Is this the case or am I misunderstanding? How can I get the behavior
> that I wish to see, where committed offsets are respected, but no
> offset means start at the beginning of the kafka log stream?
>
> Thanks!
> -- 
>
> Rex Fenley | Software Engineer - Mobile and Backend
>
>
> Remind.com |  BLOG
>  |  FOLLOW US
>  |  LIKE US
> 
>


signature.asc
Description: OpenPGP digital signature


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-13 Thread David Anderson
I think the pertinent question is whether there are interesting cases where
the BucketingSink is still a better choice. One case I'm not sure about is
the situation described in docs for the StreamingFileSink under Important
Note 2 [1]:

... upon normal termination of a job, the last in-progress files will
not be transitioned to the “finished” state.

I know this confuses and frustrates users, but I don't know if the
BucketingSink has any advantages in this regard.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#important-considerations

On Tue, Oct 13, 2020 at 11:06 AM Konstantin Knauf  wrote:

> Given that it has been deprecated for three releases now, I am +1 to
> dropping it.
>
> On Mon, Oct 12, 2020 at 9:38 PM Chesnay Schepler 
> wrote:
>
>> Is there a way for us to change the module (in a reasonable way) that
>> would allow users to continue using it?
>> Is it an API problem, or one of semantics?
>>
>> On 10/12/2020 4:57 PM, Kostas Kloudas wrote:
>> > Hi Chesnay,
>> >
>> > Unfortunately not from what I can see in the code.
>> > This is the reason why I am opening a discussion. I think that if we
>> > supported backwards compatibility, this would have been an easier
>> > process.
>> >
>> > Kostas
>> >
>> > On Mon, Oct 12, 2020 at 4:32 PM Chesnay Schepler 
>> wrote:
>> >> Are older versions of the module compatible with 1.12+?
>> >>
>> >> On 10/12/2020 4:30 PM, Kostas Kloudas wrote:
>> >>> Hi all,
>> >>>
>> >>> As the title suggests, this thread is to discuss the removal of the
>> >>> flink-connector-filesystem module which contains (only) the deprecated
>> >>> BucketingSink. The BucketingSink has been deprecated since Flink 1.9 [1] in
>> >>> favor of the relatively recently introduced StreamingFileSink.
>> >>>
>> >>> For the sake of a clean and more manageable codebase, I propose to
>> >>> remove this module for release-1.12, but of course we should see first
>> >>> if there are any use cases that depend on it.
>> >>>
>> >>> Let's have a fruitful discussion.
>> >>>
>> >>> Cheers,
>> >>> Kostas
>> >>>
>> >>> [1] https://issues.apache.org/jira/browse/FLINK-13396
>> >>>
>>
>>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-13 Thread Till Rohrmann
Thanks for starting this discussion Yun Gao,

I have three comments/questions:

1) When restarting all tasks independent of the status at checkpoint time
(finished, running, scheduled), we might allocate more resources than we
actually need to run the remaining job. From a scheduling perspective it
would be easier if we already know that certain subtasks don't need to be
rescheduled. I believe this can be an optimization, though.

2) In the section Compatibility, Deprecation and Migration Plan you
mentioned that you want to record operators in the CompletedCheckpoint
which are fully finished. How will this information be used for
constructing a recovered ExecutionGraph? Why wouldn't the same principle
work for the task level?

3) How will checkpointing work together with fully bounded jobs and FLIP-1
(fine grained recovery)?

Cheers,
Till

On Tue, Oct 13, 2020 at 9:30 AM Yun Gao  wrote:

> Hi Arvid,
>
> Very thanks for the comments!
>
> >>> 4) Yes, the interaction is not trivial and also I have not completely
> thought it through. But in general, I'm currently at the point where I
> think that we also need non-checkpoint related events in unaligned
> checkpoints. So just keep that in mind, that we might converge anyhow at
> this point.
>
> I also agree that it would be better to keep the unaligned
> checkpoint behavior on EndOfPartition; I will then double-check this
> issue again.
>
> >>> In general, what is helping in this case is to remember that there is no
> unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we
> can completely ignore the problem of how to store and restore the output
> buffers of a completed task (also important for the next point).
>
> Exactly, we should not need to persist the output buffers for the
> completed tasks, and that would simplify the implementation a lot.
>
>
> >>> 5) I think we are on the same page and I completely agree that for
> the MVP/first version, it's completely fine to start and immediately stop.
> A tad better would be to not even start the processing loop.
>
> I also agree with this part. We would keep optimizing the implementation
> after the first version.
>
>
> Best,
>
> Yun
>
>
>
> --
> From:Arvid Heise 
> Send Time:2020 Oct. 13 (Tue.) 03:39
> To:Yun Gao 
> Cc:Flink Dev ; User-Flink 
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
> Hi Yun,
>
> 4) Yes, the interaction is not trivial and also I have not completely
> thought it through. But in general, I'm currently at the point where I
> think that we also need non-checkpoint related events in unaligned
> checkpoints. So just keep that in mind, that we might converge anyhow at
> this point.
>
> In general, what is helping in this case is to remember that there is no
> unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we
> can completely ignore the problem of how to store and restore the output
> buffers of a completed task (also important for the next point).
>
> 5) I think we are on the same page and I completely agree that for the
> MVP/first version, it's completely fine to start and immediately stop. A
> tad better would be to not even start the processing loop.
>
> On Mon, Oct 12, 2020 at 6:18 PM Yun Gao  wrote:
>
> Hi Arvid,
>
> Very thanks for the insightful comments! I added the responses for this
> issue under the quotes:
>
> >> 1) You call the tasks that get the barriers injected leaf nodes, which
> would make the > sinks the root nodes. That is very similar to how graphs
> in relational algebra are labeled. However, I got the feeling that in
> Flink, we rather iterate from sources to sink, making the sources root
> nodes and the sinks the leaf nodes. However, I have no clue how it's done
> in similar cases, so please take that hint cautiously.
>
> >> 2) I'd make the algorithm to find the subtasks iterative and react in
> CheckpointCoordinator. Let's assume that we inject the barrier at all root
> subtasks (initially all sources). So in the iterative algorithm, whenever
> root A finishes, it looks at all connected subtasks B if they have any
> upstream task left. If not B becomes a new root. That would require to only
> touch a part of the job graph, but would require some callback from
> JobManager to CheckpointCoordinator.
>
>
> I think "leaf nodes" was a bad choice of name; in fact I think we have
> the same thought, namely that we start from the source nodes to find
> all the nodes whose preceding nodes have all finished. It would be much
> better to call these nodes (which we would trigger) "root nodes". I'll
> modify the FLIP to change the name to "root nodes".
>
> >> 2b) We also need to be careful for out-of-sync updates: if the root is
> about to finish, we could send the barrier to it from
> CheckpointCoordinator, but at the time it arrives, the subtask is finished
> already.
>
> Exactly. When the checkpoint triggers a task but found the ta

Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-13 Thread Till Rohrmann
Hi Weike,

thanks for posting the logs. I will take a look at them. My suspicion would
be that there is some operation blocking the JobMaster's main thread which
causes the registrations from the TMs to time out. Maybe the logs allow me
to validate/falsify this suspicion.

Cheers,
Till

On Mon, Oct 12, 2020 at 10:43 AM DONG, Weike 
wrote:

> Hi community,
>
> I have uploaded the log files of JobManager and TaskManager-1-1 (one of
> the 50 TaskManagers) with DEBUG log level and default Flink configuration,
> and it clearly shows that TaskManager failed to register with JobManager
> after 10 attempts.
>
> Here is the link:
>
> JobManager:
> https://gist.github.com/kylemeow/740c470d9b5a1ab3552376193920adce
>
> TaskManager-1-1:
> https://gist.github.com/kylemeow/41b9a8fe91975875c40afaf58276c2fe
>
> Thanks : )
>
> Best regards,
> Weike
>
>
> On Mon, Oct 12, 2020 at 4:14 PM DONG, Weike 
> wrote:
>
>> Hi community,
>>
>> Recently we have noticed a strange behavior for Flink jobs on Kubernetes
>> per-job mode: when the parallelism increases, the time it takes for the
>> TaskManagers to register with *JobManager *becomes abnormally long (for
>> a task with parallelism of 50, it could take 60 ~ 120 seconds or even
>> longer for the registration attempt), and usually more than 10 attempts are
>> needed to finish this registration.
>>
>> Because of this, we could not submit a job requiring more than 20 slots
>> with the default configuration, as the TaskManager would say:
>>
>>
>>> Registration at JobManager 
>>> (akka.tcp://flink@myjob-201076.default:6123/user/rpc/jobmanager_2)
>>> attempt 9 timed out after 25600 ms
>>
>> Free slot with allocation id 60d5277e138a94fb73fc6691557001e0 because:
>>> The slot 60d5277e138a94fb73fc6691557001e0 has timed out.
>>
>> Free slot TaskSlot(index:0, state:ALLOCATED, resource profile:
>>> ResourceProfile{cpuCores=1., taskHeapMemory=1.425gb
>>> (1530082070 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.340gb
>>> (1438814063 bytes), networkMemory=343.040mb (359703515 bytes)},
>>> allocationId: 60d5277e138a94fb73fc6691557001e0, jobId:
>>> 493cd86e389ccc8f2887e1222903b5ce).
>>> java.lang.Exception: The slot 60d5277e138a94fb73fc6691557001e0 has timed
>>> out.
>>
>>
>> In order to cope with this issue, we have to change the below
>> configuration parameters:
>>
>>>
>>> # Prevent "Could not allocate the required slot within slot request
>>> timeout. Please make sure that the cluster has enough resources. Stopping
>>> the JobMaster for job"
>>> slot.request.timeout: 50
>>
>> # Increase max timeout in a single attempt
>>> cluster.registration.max-timeout: 30
>>> # Prevent "free slot (TaskSlot)"
>>> akka.ask.timeout: 10 min
>>> # Prevent "Heartbeat of TaskManager timed out."
>>> heartbeat.timeout: 50
>>
>>
>> However, we acknowledge that this is only a temporary dirty fix, which is
>> not what we want. It could be seen that during TaskManager registration to
>> JobManager, lots of warning messages come out in logs:
>>
>> No hostname could be resolved for the IP address 9.166.0.118, using IP
>>> address as host name. Local input split assignment (such as for HDFS files)
>>> may be impacted.
>>
>>
>> Initially we thought this was probably the cause (reverse lookup of DNS
>> might take up a long time), however we later found that the reverse lookup
>> only took less than 1ms, so maybe not because of this.
>>
>> Also, we have checked the GC log of both TaskManagers and JobManager, and
>> they seem to be perfectly normal, without any signs of pauses. And the
>> heartbeats are processed as normal according to the logs.
>>
>> Moreover, TaskManagers register quickly with the ResourceManager, but are then
>> extra slow to register with the JobManager, so this is not because of a slow network
>> connection.
>>
>> Here we wonder what could be the cause for the slow registration between
>> JobManager and TaskManager(s)? No other warning or error messages in the
>> log (DEBUG level) other than the "No hostname could be resolved" messages,
>> which is quite weird.
>>
>> Thanks for reading, and I hope to get some insights into this issue : )
>>
>> Sincerely,
>> Weike
>>
>>
>>
>


Re: Flink is failing for all Jobs if one job gets failed

2020-10-13 Thread Dawid Wysakowicz
Hi,

As far as I understand it, it is not a Flink problem. It's your code
that is failing to compile the code it gets. It's also quite hard to
actually figure out how it is used from within Flink.

Best,

Dawid

On 13/10/2020 10:42, saksham sapra wrote:
>
> I am working with Flink locally; I have created one task manager which
> pushes requests to Flink. So if one job fails because of a function
> error, then other jobs which were running correctly before the error
> occurred for one file also fail; the new jobs fail automatically even if the
> configuration for that file is different.
>
> It seems like a Flink cache issue, but I am not sure. If anyone has any idea, let
> me know.
> Please find the log attached.
>
> 2020-10-13 13:57:13
> java.lang.Exception: The user defined 'open(Configuration)' method in
> class
> com.opus.optimus.toolset.batch.workflow.runtime.flink.adapter.AdapterMultiTypeRichFlatMapFunction
> caused an exception: Failed to compile class
> name(com.opus.optimus.toolset.batch.workflow.di2.NG100FileReaderd8d76151019a4e9985627bd9b73409d4InboundAdapter256253037297300)
>
> code:
> public class
> NG100FileReaderd8d76151019a4e9985627bd9b73409d4InboundAdapter256253037297300
> implements IInboundContextAdapter {
> String convert(IndexedMapContext context) {
> return nG9(context);
> }
> public String adapt(IContext context) {
> return convert((IndexedMapContext)context);
> }
> String nG9(IndexedMapContext context) {
> return (String)context.get(2);}
> }
> at
> org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1351)
> at
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
> at
> org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1391)
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:157)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.lang.RuntimeException: Failed to compile class
> name(com.opus.optimus.toolset.batch.workflow.di2.NG100FileReaderd8d76151019a4e9985627bd9b73409d4InboundAdapter256253037297300)
>
> code:
> package com.opus.optimus.toolset.batch.workflow.di2;
> import com.opus.optimus.toolset.batch.workflow.dynamic.IndexedMapContext;
> import com.opus.optimus.toolset.batch.workflow.runtime.common.IContext;
> import
> com.opus.optimus.toolset.batch.workflow.runtime.common.IInboundContextAdapter;
> import java.lang.String;
> public class
> NG100FileReaderd8d76151019a4e9985627bd9b73409d4InboundAdapter256253037297300
> implements IInboundContextAdapter {
> String convert(IndexedMapContext context) {
> return nG9(context);
> }
> public String adapt(IContext context) {
> return convert((IndexedMapContext)context);
> }
> String nG9(IndexedMapContext context) {
> return (String)context.get(2);}
> }
> at
> com.opus.optimus.toolset.code.generator.compiler.ScriptCompilerUtil.newInstance(ScriptCompilerUtil.java:11)
> at
> com.opus.optimus.toolset.batch.workflow.dynamic.analyzer.JavaCodeInboundContextAdapter.compile(JavaCodeInboundContextAdapter.java:31)
> at
> com.opus.optimus.toolset.batch.workflow.runtime.flink.util.AdapterUtil.compileAdapters(AdapterUtil.java:31)
> at
> com.opus.optimus.toolset.batch.workflow.runtime.flink.util.AdapterUtil.compileAdapters(AdapterUtil.java:47)
> at
> com.opus.optimus.toolset.batch.workflow.runtime.flink.adapter.AdapterMultiTypeRichFlatMapFunction.compileScripts(AdapterMultiTypeRichFlatMapFunction.java:65)
> at
> com.opus.optimus.toolset.batch.workflow.runtime.flink.adapter.AdapterMultiTypeRichFlatMapFunction.open(AdapterMultiTypeRichFlatMapFunction.java:60)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1349)
> ... 6 more
> Caused by: java.lang.ClassNotFoundException: File
> 'com/opus/optimus/generated/excel/NG1256203349277500.java', Line 11,
> Column 35: No applicable constructor/method found for actual
> parameters "java.lang.String"; candidates are: "public static
> java.lang.Integer
> com.opus.optimus.toolset.batch.workflow.function.standard.common.excel.ExcelFunctions.indexOf(java.lang.String,
> java.lang.String)"
> at
> org.codehaus.janino.JavaSourceClassLoader.generateBytecodes(JavaSourceClassLoader.java:228)
> at
> org.codehaus.janino.JavaSourceClassLoader.findClass(JavaSourceClassLoader.java:182)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
> at
> com.opus.optimus.toolset.code.generator.compiler.JaninoCompiler.getClass(JaninoCompiler.java:18)
> at
> com.opus.optimus.toolset.code.generator.compiler.ScriptCompilerUtil.newInstance(ScriptCompilerUtil.java:7)
> ... 13 more
> Caused 

Re: Additional options to S3 Filesystem: Interest?

2020-10-13 Thread Padarn Wilson
Great. Thanks.

On Tue, Oct 13, 2020 at 4:29 PM Arvid Heise  wrote:

> Hi Padarn,
>
> I assigned the ticket to you, so you can start working on it. Here are
> some contribution guidelines [1] in case it's your first contribution.
>
> Basically, you will need to open a PR which contains the ticket and
> component. So the prefix should be "[FLINK-19589][s3]" (also for your
> commits).
>
> Feel free to reach out to me if you have any questions about the process.
> All discussions about the feature should be on the ticket, so everyone can
> see it.
>
> [1] https://flink.apache.org/contributing/contribute-code.html
>
> On Tue, Oct 13, 2020 at 3:37 AM Padarn Wilson  wrote:
>
>> Thanks for the feedback. I've created a JIRA here
>> https://issues.apache.org/jira/browse/FLINK-19589.
>>
>> @Dan: This indeed would make it easier to set a lifetime property on
>> objects created by Flink, but actually if you want to apply it to all your
>> objects for a given bucket you can set bucket wide policies instead. The
>> reason I want this is that we have a shared bucket and wish to tag
>> different objects based on which pipeline is producing them.
>>
>> On Tue, Oct 13, 2020 at 4:13 AM Dan Diephouse  wrote:
>>
>>> We use the StreamingFileSink. An option to expire files after some time
>>> period would certainly be welcome. (I could probably figure out a way to do
>>> this from the S3 admin UI too though)
>>>
>>> On Sat, Oct 10, 2020 at 10:45 PM Padarn Wilson  wrote:
>>>
 Hi Flink Users,

 We need to expose some additional options for the s3 hadoop filesystem:
 Specifically, we want to set object tagging and lifecycle. This would be a
 fairly easy change and we initially thought to create a new Filesystem with
 very minor changes to allow this.

 However then I wondered, would others use this? Is it something that is
 worth raising as a Flink issue and then contributing back upstream?

 Any others who would like to be able to set object tags for the s3
 filesystem?

 Cheers,
 Padarn

>>>
>>>
>>> --
>>> Dan Diephouse
>>> @dandiep
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-13 Thread Till Rohrmann
Hi Weike,

could you try setting kubernetes.jobmanager.cpu: 4 in your flink-conf.yaml?
I fear that a single CPU is too low for the JobManager component.

Cheers,
Till

On Tue, Oct 13, 2020 at 11:33 AM Till Rohrmann  wrote:

> Hi Weike,
>
> thanks for posting the logs. I will take a look at them. My suspicion
> would be that there is some operation blocking the JobMaster's main thread
> which causes the registrations from the TMs to time out. Maybe the logs
> allow me to validate/falsify this suspicion.
>
> Cheers,
> Till
>
> On Mon, Oct 12, 2020 at 10:43 AM DONG, Weike 
> wrote:
>
>> Hi community,
>>
>> I have uploaded the log files of JobManager and TaskManager-1-1 (one of
>> the 50 TaskManagers) with DEBUG log level and default Flink configuration,
>> and it clearly shows that TaskManager failed to register with JobManager
>> after 10 attempts.
>>
>> Here is the link:
>>
>> JobManager:
>> https://gist.github.com/kylemeow/740c470d9b5a1ab3552376193920adce
>>
>> TaskManager-1-1:
>> https://gist.github.com/kylemeow/41b9a8fe91975875c40afaf58276c2fe
>>
>> Thanks : )
>>
>> Best regards,
>> Weike
>>
>>
>> On Mon, Oct 12, 2020 at 4:14 PM DONG, Weike 
>> wrote:
>>
>>> Hi community,
>>>
>>> Recently we have noticed a strange behavior for Flink jobs on Kubernetes
>>> per-job mode: when the parallelism increases, the time it takes for the
>>> TaskManagers to register with *JobManager *becomes abnormally long (for
>>> a task with parallelism of 50, it could take 60 ~ 120 seconds or even
>>> longer for the registration attempt), and usually more than 10 attempts are
>>> needed to finish this registration.
>>>
>>> Because of this, we could not submit a job requiring more than 20 slots
>>> with the default configuration, as the TaskManager would say:
>>>
>>>
 Registration at JobManager 
 (akka.tcp://flink@myjob-201076.default:6123/user/rpc/jobmanager_2)
 attempt 9 timed out after 25600 ms
>>>
>>> Free slot with allocation id 60d5277e138a94fb73fc6691557001e0 because:
 The slot 60d5277e138a94fb73fc6691557001e0 has timed out.
>>>
>>> Free slot TaskSlot(index:0, state:ALLOCATED, resource profile:
 ResourceProfile{cpuCores=1., taskHeapMemory=1.425gb
 (1530082070 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.340gb
 (1438814063 bytes), networkMemory=343.040mb (359703515 bytes)},
 allocationId: 60d5277e138a94fb73fc6691557001e0, jobId:
 493cd86e389ccc8f2887e1222903b5ce).
 java.lang.Exception: The slot 60d5277e138a94fb73fc6691557001e0 has
 timed out.
>>>
>>>
>>> In order to cope with this issue, we have to change the below
>>> configuration parameters:
>>>

 # Prevent "Could not allocate the required slot within slot request
 timeout. Please make sure that the cluster has enough resources. Stopping
 the JobMaster for job"
 slot.request.timeout: 50
>>>
>>> # Increase max timeout in a single attempt
 cluster.registration.max-timeout: 30
 # Prevent "free slot (TaskSlot)"
 akka.ask.timeout: 10 min
 # Prevent "Heartbeat of TaskManager timed out."
 heartbeat.timeout: 50
>>>
>>>
>>> However, we acknowledge that this is only a temporary dirty fix, which
>>> is not what we want. It could be seen that during TaskManager registration
>>> to JobManager, lots of warning messages come out in logs:
>>>
>>> No hostname could be resolved for the IP address 9.166.0.118, using IP
 address as host name. Local input split assignment (such as for HDFS files)
 may be impacted.
>>>
>>>
>>> Initially we thought this was probably the cause (reverse lookup of DNS
>>> might take up a long time), however we later found that the reverse lookup
>>> only took less than 1ms, so maybe not because of this.
>>>
>>> Also, we have checked the GC log of both TaskManagers and JobManager,
>>> and they seem to be perfectly normal, without any signs of pauses. And the
>>> heartbeats are processed as normal according to the logs.
>>>
>>> Moreover, TaskManagers register quickly with the ResourceManager, but are then
>>> extra slow to register with the JobManager, so this is not because of a slow network
>>> connection.
>>>
>>> Here we wonder what could be the cause for the slow registration between
>>> JobManager and TaskManager(s)? No other warning or error messages in the
>>> log (DEBUG level) other than the "No hostname could be resolved" messages,
>>> which is quite weird.
>>>
>>> Thanks for reading, and I hope to get some insights into this issue : )
>>>
>>> Sincerely,
>>> Weike
>>>
>>>
>>>
>>


Is MapState tied to Operator Input & Output type?

2020-10-13 Thread Arpith P
Hi,


I have a *ProcessFunction* which initially was receiving input & output types
of String (1), and inside *processElement* I was updating a MapState. Now I have
changed the input & output types to be Map, String (2), but if I restore
from the last checkpoint folder the MapState comes back empty. I've
checked that the checkpoint folder actually saves data (i.e. file size > 1 GB).
Is the MapState tied to the ProcessFunction input & output types? If not, why
doesn't the MapState get restored?



(1)

   public class TestProcess extends ProcessFunction implements
CheckpointedFunction

(2)

public class TestProcess extends ProcessFunction, String> implements CheckpointedFunction


Re: Is MapState tied to Operator Input & Output type?

2020-10-13 Thread Yun Tang
Hi

The type of the map state is not directly related to the input & output types; it is
only related to how you define the state descriptor.

  *   Have you changed the state descriptor after changing the
input/output types?
  *   Have you assigned an id [1] to the operator which uses the
'TestProcess'? The state might not be restored if you change your code without
an id assigned.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids
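
A minimal Java sketch of both points (the state name, uid and Map types below are assumptions, not taken from the original job): the MapState is identified by the descriptor's name and types together with the operator's uid, so it can be restored even after the function's input/output types change, as long as the descriptor and uid stay the same.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;

// Hypothetical version of the function after the type change: the input type is now
// Map<String, String>, but the state descriptor (name and key/value types) is unchanged,
// so state written by the old String-typed version can still be restored.
public class TestProcess extends ProcessFunction<Map<String, String>, String> {

    private transient MapState<String, String> mapState;

    @Override
    public void open(Configuration parameters) {
        // Must run on a keyed stream (after keyBy) for keyed MapState to be available.
        mapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("my-map-state", String.class, String.class));
    }

    @Override
    public void processElement(Map<String, String> value, Context ctx, Collector<String> out) throws Exception {
        for (Map.Entry<String, String> e : value.entrySet()) {
            mapState.put(e.getKey(), e.getValue());
        }
        out.collect("stored " + value.size() + " entries");
    }
}

// When wiring the job, keep the same uid across versions, e.g.:
//   stream.keyBy(...).process(new TestProcess()).uid("test-process");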

Best
Yun Tang


From: Arpith P 
Sent: Tuesday, October 13, 2020 19:26
To: user 
Subject: Is MapState tied to Operator Input & Output type?


Hi,


I have a ProcessFunction which initially was receiving input & output types of
String (1), and inside processElement I was updating a MapState. Now I have changed
the input & output types to be Map, String (2), but if I restore from the last
checkpoint folder the MapState comes back empty. I've checked that the checkpoint
folder actually saves data (i.e. file size > 1 GB). Is the MapState tied to the
ProcessFunction input & output types? If not, why doesn't the MapState get restored?



(1)

   public class TestProcess extends ProcessFunction 
implements CheckpointedFunction

(2)

public class TestProcess extends ProcessFunction, 
String> implements CheckpointedFunction


why this pyflink code has no output?

2020-10-13 Thread ??????
My code is:
https://paste.ubuntu.com/p/KqpKwTw5zH/




My steps are:
$FLINK_HOME/bin/flink run -py /wordcount.py
The statement "data.output()" produces no output.




Where am I wrong in the above code?


Thanks for your help

Re: Is MapState tied to Operator Input & Output type?

2020-10-13 Thread Arpith P
Hi Yun,

Neither the state descriptor type nor its name changed. I did assign an ID as well,
but it didn't help me. What I'm trying to do is: I have two streams A & B
which I want to connect and process in C; I eventually want values from stream
A to be saved in C's MapState. What I've tried is using ConnectedStream to
connect A (a keyed stream) and B and processing them in CoProcessFunction C, but
it looks like the CoProcessFunction doesn't have the MapState, as I'm getting
functionInitializationContext.getKeyedStateStore() as null. Is it possible to
access MapState inside a CoProcessFunction?

Arpith

On Tue, Oct 13, 2020 at 5:18 PM Yun Tang  wrote:

> Hi
>
> The type of map state is not directly related with input & output type,
> this is only related with how you define the state descriptor.
>
>- Have you changed the state descriptor after changing the
>input/output types?
>- Have you assigned an id [1] to the operator which uses the
>'TestProcess'? The state might not be restored if you change your code
>without an id assigned.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids
>
> Best
> Yun Tang
>
> --
> *From:* Arpith P 
> *Sent:* Tuesday, October 13, 2020 19:26
> *To:* user 
> *Subject:* Is MapState tied to Operator Input & Output type?
>
>
> Hi,
>
>
> I have a *ProcessFunction* which initially was receiving input & output
> types of String (1), and inside *processElement* I was updating a MapState. Now
> I have changed the input & output types to be Map, String (2), but if I
> restore from the last checkpoint folder the MapState comes back empty.
> I've checked that the checkpoint folder actually saves data (i.e. file size >
> 1 GB). Is the MapState tied to the ProcessFunction input & output types? If not,
> why doesn't the MapState get restored?
>
>
>
> (1)
>
>public class TestProcess extends ProcessFunction 
> implements
> CheckpointedFunction
>
> (2)
>
> public class TestProcess extends ProcessFunction, 
> String> implements CheckpointedFunction
>
>


Externally load data into RocksDB backend

2020-10-13 Thread Arpith P
Hi,

Is it possible to load data into the RocksDB backend externally (outside Flink)
using a common dbPath, so that it will be available to the MapState inside a
ProcessFunction? I have external data available in JSON format which I want
to load into RocksDB. One option is to create a stream from the JSON path and
process it inside the ProcessFunction, but since I can't modify the existing
ProcessFunction, I'm thinking of alternative ways of doing it.

Thanks,
Arpith


Re: Dynamic file name prefix - StreamingFileSink

2020-10-13 Thread Vijayendra Yadav
*Thanks Ravi. I got the following error:*
[ERROR] DynamicOutputFileConfig.scala:21: error: method getPartPrefix
overrides nothing
[ERROR]   override def getPartPrefix: String = if(partPrefixFunction ==
null) partPrefix else partPrefixFunction.apply()
[ERROR]^
[ERROR] DynamicOutputFileConfig.scala:26: error: method getPartSuffix
overrides nothing
[ERROR]   override def getPartSuffix: String = if(partSuffixFunction ==
null) partSuffix else partSuffixFunction.apply()



On Tue, Oct 13, 2020 at 7:29 AM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> Hi Vijayendra,
>
> OutputFileConfig provides a builder method to create immutable objects with a given 'prefix' and 'suffix'. The parameter which you are passing to
> '*withPartPrefix*' is only evaluated once, at the time of calling the method
> '*withPartPrefix*'. So if you want to achieve a dynamic 'prefix' or 'suffix',
> you may try to have your own custom implementation of 'OutputFileConfig'
> which provides a way to set a function for the 'prefix' or
> 'suffix'. I am attaching a sample implementation. Kindly
> make sure that the function which you are passing is serializable.
> make sure that the function definition which you are passing is serializable.
>
>
> Use like this
>
> val outputFileConfig:OutputFileConfig = new DynamicOutputFileConfig()
>   
> .withPartPrefixFunction(()=>ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("-MM-dd-HH-mm-ss-SSS")))
>   .withPartSuffixFunction(()=> ".ext")
>
>
> Regards,
> Ravi
>
> On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav 
> wrote:
>
>> Hi Team,
>>
>> I have tried to assign a dynamic prefix for file name, which contains
>> datetime components.
>> *The Problem is Job always takes initial datetime when job first starts
>> and never refreshes later. *
>> *How can I get dynamic current datetime in filename at sink time ?*
>>
>> *.withPartPrefix
>> (ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("-MM-dd-HH-mm-ss-SSS")))*
>>
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>
>> val config = OutputFileConfig
>>  .builder() .withPartPrefix("prefix")
>>  .withPartSuffix(".ext")
>>  .build()
>> val sink = StreamingFileSink
>>  .forRowFormat(new Path(outputPath), new 
>> SimpleStringEncoder[String]("UTF-8"))
>>  .withBucketAssigner(new KeyBucketAssigner())
>>  .withRollingPolicy(OnCheckpointRollingPolicy.build()) 
>> .withOutputFileConfig(config)
>>  .build()
>>
>>


Re: Flink Kafka offsets

2020-10-13 Thread Rex Fenley
Thanks for the explanation, this was all super helpful.

On Tue, Oct 13, 2020 at 2:16 AM Dawid Wysakowicz 
wrote:

> Hey Rex,
>
> I agree the documentation might be slightly misleading. To get the full
> picture of that configuration I'd suggest having a look at the DataStream
> Kafka connector page[1]. The Table connector is just a wrapper around the
> DataStream one.
>
> Let me also try to clarify it a bit more. In case of Flink there are two
> places where the offsets are committed:
>
> 1) Flink's checkpoint/savepoint. Those always take the highest priority.
> Therefore e.g. when the job is restarted because of a failure, it will use
> offsets that were stored in the last successful checkpoint.
>
> 2) Upon a checkpoint Flink can also write the offsets back to Kafka. This
> is enabled by default in DataStream API and is enabled in Table API if you
> provide properties.group.id[2]. This works only if you have checkpointing
> enabled. If you disable checkpoints, you can still auto commit offsets from
> the underlying Kafka consumer via properties.enable.auto.commit /
> properties.auto.commit.interval.ms (btw, you can pass any Kafka options
> with a properties.* prefix).
>
> Having explained that, if you set scan.startup.mode and you do not restore
> from a checkpoint/savepoint:
>
> * group-offsets -> it will start consuming from the committed offset in
> Kafka for the configured group.id, if there are none it should use
> properties.auto.offset.reset option
>
> * earliest-offset -> it will ignore committed offsets in Kafka and start
> from earliest-offsets.
>
> Hope it helps.
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#properties-group-id
> On 13/10/2020 07:43, Rex Fenley wrote:
>
> Hello,
>
> I've been trying to configure the offset start position for a Flink Kafka
> consumer so that, when there is no committed offset, it always starts at the
> beginning. It seems like the typical way to do this would be setting
> auto.offset.reset=earliest; however, I don't see that configuration
> property in the documentation.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>
> However, I do see scan.startup.mode = earliest-offset, but from the docs
> it sounds like this would mean it would never commit an offset and flink
> would always start consuming from the beginning of the kafka stream, which
> is not what I want.
>
> Is this the case or am I misunderstanding? How can I get the behavior that
> I wish to see, where committed offsets are respected, but no offset means
> start at the beginning of the kafka log stream?
>
> Thanks!
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG  |  FOLLOW US  |  LIKE US
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG  |  FOLLOW US  |  LIKE US


Re: Externally load data into RocksDB backend

2020-10-13 Thread Akshay Aggarwal
Hi Arpith,

You should look into the State Processor API [1], it can be used to
bootstrap state, and also for reading and modifying existing state.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#writing-new-savepoints


Thanks,
Akshay Aggarwal
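
As a rough illustration of [1], a separate batch job can parse the JSON externally, write it into the same MapState, and produce a savepoint that the streaming job is then started from. Everything below is a sketch: the record shape, state name, paths, uid and max parallelism are placeholders, the flink-state-processor-api and RocksDB state backend dependencies are assumed, and the uid/state descriptor must match the ones used by the online operator.

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.state.api.{OperatorTransformation, Savepoint}
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction

// Hypothetical shape of a record parsed from the external JSON.
case class Entry(key: String, field: String, value: String)

// Writes each record into the MapState that the online ProcessFunction reads.
class MapStateBootstrapper extends KeyedStateBootstrapFunction[String, Entry] {
  @transient private var mapState: MapState[String, String] = _

  override def open(parameters: Configuration): Unit = {
    mapState = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, String]("my-map-state", classOf[String], classOf[String]))
  }

  override def processElement(e: Entry,
      ctx: KeyedStateBootstrapFunction[String, Entry]#Context): Unit =
    mapState.put(e.field, e.value)
}

object BootstrapMapState {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // In practice: read the external JSON files and map them to Entry records here.
    val entries = env.fromElements(Entry("k1", "f1", "v1"), Entry("k2", "f2", "v2"))

    val transformation = OperatorTransformation
      .bootstrapWith(entries)
      .keyBy(new KeySelector[Entry, String] { override def getKey(e: Entry): String = e.key })
      .transform(new MapStateBootstrapper)

    Savepoint
      .create(new RocksDBStateBackend("hdfs:///flink/checkpoints"), 128) // assumed max parallelism
      .withOperator("my-operator-uid", transformation)                   // must match the online operator's uid
      .write("hdfs:///flink/savepoints/bootstrap")

    env.execute("bootstrap-mapstate")
  }
}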

On Tue, Oct 13, 2020 at 10:31 PM Arpith P  wrote:

> Hi,
>
> Is it possible to load data into the RocksDB backend externally (outside Flink),
> using a common dbPath, so that it will be available to the MapState inside a
> ProcessFunction? I have external data available in JSON format which I want
> to load into RocksDB. One option is to make a stream from the JSON path and
> process it inside the ProcessFunction, but since I can't modify the existing
> ProcessFunction, I'm thinking of alternate ways of doing it.
>
> Thanks,
> Arpith
>



Re: Is MapState tied to Operator Input & Output type?

2020-10-13 Thread Yun Tang
Hi Arpith

I'm afraid you were talking about the wrong thing in the previous thread. The
root cause is not restoring state from the checkpoint, but that the state is not
being accessed legally.
Have you added a keyBy before applying your process function, as the docs [1]
note: "If you want to access keyed state and timers you have to apply the
ProcessFunction on a keyed stream"?


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-processfunction

Best
Yun Tang
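
For the connected-stream case raised in the quoted message below, the same rule applies: keyed state such as MapState only becomes available once both inputs of the ConnectedStreams are keyed. A minimal Scala sketch with made-up types, key fields and state names:

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical record types standing in for streams A and B.
case class A(key: String, value: String)
case class B(key: String, value: String)

// MapState is usable here only because both inputs are keyed below.
class EnrichBWithA extends KeyedCoProcessFunction[String, A, B, String] {
  @transient private var aValues: MapState[String, String] = _

  override def open(parameters: Configuration): Unit = {
    aValues = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, String]("a-values", classOf[String], classOf[String]))
  }

  override def processElement1(a: A,
      ctx: KeyedCoProcessFunction[String, A, B, String]#Context,
      out: Collector[String]): Unit =
    aValues.put(a.key, a.value) // remember values from stream A per key

  override def processElement2(b: B,
      ctx: KeyedCoProcessFunction[String, A, B, String]#Context,
      out: Collector[String]): Unit =
    out.collect(s"${b.key} -> ${aValues.get(b.key)}") // look them up for stream B
}

object ConnectedStreamsKeyedState {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamA = env.fromElements(A("k1", "v1"))
    val streamB = env.fromElements(B("k1", "lookup"))

    streamA
      .connect(streamB)
      .keyBy(_.key, _.key) // key *both* inputs; without this, keyed state is not available
      .process(new EnrichBWithA)
      .print()

    env.execute("connected-streams-keyed-state")
  }
}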

From: Arpith P 
Sent: Tuesday, October 13, 2020 22:20
To: Yun Tang 
Cc: user 
Subject: Re: Is MapState tied to Operator Input & Output type?

Hi Yun,

Neither the state descriptor type nor its name changed. I did assign an ID as
well, but it didn't help. What I'm trying to do: I have two streams A & B which I
want to connect/process in C; I eventually want values from stream A to be
saved in C's MapState. What I've tried is using ConnectedStreams to connect
A (a keyed stream) with B and process them in a CoProcessFunction C, but the
CoProcessFunction doesn't seem to have MapState, as I'm getting
functionInitializationContext.getKeyedStore as null. Is it possible to access
MapState inside a CoProcessFunction?

Arpith

On Tue, Oct 13, 2020 at 5:18 PM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi

The type of the map state is not directly related to the input & output types; it
is only related to how you define the state descriptor.

  *   Have you changed the state descriptor after changing the input/output
types?
  *   Have you assigned an id [1] to the operator using the 'TestProcess'? The
state might not be restored if you change your code without an id assigned.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids

Best
Yun Tang


From: Arpith P mailto:arpitht...@gmail.com>>
Sent: Tuesday, October 13, 2020 19:26
To: user mailto:user@flink.apache.org>>
Subject: Is MapState tied to Operator Input & Output type?


Hi,


I have a ProcessFunction which initially had input & output types of
String (1), and inside processElement I was updating a MapState. Now I have
changed the input & output types to Map, String (2), but if I restore from the
last checkpoint folder the MapState comes back empty. I've checked that the
checkpoint folder actually saves data (i.e. file size > 1 GB). Is MapState tied
to the ProcessFunction input & output types? If not, why doesn't the MapState
get restored?



(1)

   public class TestProcess extends ProcessFunction 
implements CheckpointedFunction

(2)

public class TestProcess extends ProcessFunction, 
String> implements CheckpointedFunction


Re: NPE when checkpointing

2020-10-13 Thread Binh Nguyen Van
Hi,

Sorry for the late reply. It took me quite a while to change the JDK
version to reproduce the issue. I confirmed that if I upgrade to a newer
JDK version (I tried with JDK 1.8.0_265) the issue doesn’t happen.

Thank you for helping
-Binh

On Fri, Oct 9, 2020 at 11:36 AM Piotr Nowojski  wrote:

> Hi Binh,
>
> Could you try upgrading Flink's Java runtime? It was previously reported
> that upgrading to jdk1.8.0_251 was solving the problem.
>
> Piotrek
>
> pt., 9 paź 2020 o 19:41 Binh Nguyen Van  napisał(a):
>
>> Hi,
>>
>> Thank you for helping me!
>> The code is compiled on
>>
>> java version "1.8.0_161"
>> Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
>> Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)
>>
>> But I just checked our Hadoop and its Java version is
>>
>> java version "1.8.0_77"
>> Java(TM) SE Runtime Environment (build 1.8.0_77-b03)
>> Java HotSpot(TM) 64-Bit Server VM (build 25.77-b03, mixed mode)
>>
>> Thanks
>> -Binh
>>
>> On Fri, Oct 9, 2020 at 10:23 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> One more thing. It looks like it's not a Flink issue, but some JDK bug.
>>> Others reported that upgrading JDK version (for example to  jdk1.8.0_251)
>>> seemed to be solving this problem. What JDK version are you using?
>>>
>>> Piotrek
>>>
>>> pt., 9 paź 2020 o 17:59 Piotr Nowojski 
>>> napisał(a):
>>>
 Hi,

 Thanks for reporting the problem. I think this is a known issue [1] on
 which we are working to fix.

 Piotrek

 [1] https://issues.apache.org/jira/browse/FLINK-18196

 pon., 5 paź 2020 o 08:54 Binh Nguyen Van 
 napisał(a):

> Hi,
>
> I have a streaming job that is written in Apache Beam and uses Flink
> as its runner. The job is working as expected for about 15 hours and then
> it started to have checkpointing error. The error message looks like this
>
> java.lang.Exception: Could not perform checkpoint 910 for operator 
> Source:  (8/60).
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:785)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:760)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1394)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:776)
> ... 11 more
>
> When this happened, I have to stop the job and then start it again,
> and then 15 hours later the issue happens again.
>
> Here are some additional information
>
>- Flink version is 1.10.1
>- Job reads data from Kafka, transform, and then writes to Kafka
>- There are 6 tasks with the parallelism of 60 each (each task
>reads from 1 Kafka topic)
>- The job is deployed to run on YARN with 60 task managers and
>each task manager has 1 slot
>- The State backend is filesystem and HDFS is the storage (Doesn’t
>seem to related to the type of state backend since the issue also 
> happened
>when I use memory as the state backend)
>- The checkpointing interval is 60 seconds (The longest duration
>of the normal checkpoint as shown in Flink UI is 14 seconds)
>- The minimum pause between checkpoints is 30 seconds
>- Hado

Re: Externally load data into RocksDB backend

2020-10-13 Thread Daksh Talwar
I believe you're trying to bootstrap state for an operator. If yes, then it
might be worthwhile to check out the State Processor API.

Cheers,
Daksh


On Tue, Oct 13, 2020 at 10:31 PM Arpith P  wrote:

> Hi,
>
> Is it possible to load data into the RocksDB backend externally (outside Flink),
> using a common dbPath, so that it will be available to the MapState inside a
> ProcessFunction? I have external data available in JSON format which I want
> to load into RocksDB. One option is to make a stream from the JSON path and
> process it inside the ProcessFunction, but since I can't modify the existing
> ProcessFunction, I'm thinking of alternate ways of doing it.
>
> Thanks,
> Arpith
>



Required context properties mismatch in connecting the flink with mysql database

2020-10-13 Thread xi sizhe
I am using the latest Flink (1.11.2) to work with a sample MySQL database,
and the database itself is working fine.

Additionally, I have added flink-connector-jdbc_2.11-1.11.2.jar,
mysql-connector-java-8.0.21.jar and postgresql-42.2.17.jar to {FLINK}/lib.

Here is my code

T_CONFIG = TableConfig()
B_EXEC_ENV = ExecutionEnvironment.get_execution_environment()
B_EXEC_ENV.set_parallelism(1)
BT_ENV = BatchTableEnvironment.create(B_EXEC_ENV, T_CONFIG)

ddl = """
CREATE TABLE nba_player4 (
 first_name STRING ,
 last_name STRING,
 email STRING,
 id INT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/inventory',
'username' = 'root',
'password' = 'debezium',
'table-name' = 'customers'
)
  """;
BT_ENV.sql_update(ddl)

sinkddl = """
CREATE TABLE print_table (
 f0 INT,
 f1 INT,
 f2 STRING,
 f3 DOUBLE
) WITH (
 'connector' = 'print'
)
  """;
BT_ENV.sql_update(sinkddl)


BT_ENV.sql_query("SELECT first_name, last_name FROM nba_player4")
BT_ENV.execute("table_job")

However, when running the code, it comes up with an error saying:

py4j.protocol.Py4JJavaError: An error occurred while calling o23.sqlQuery.
: org.apache.flink.table.api.ValidationException: SQL validation
failed. findAndCreateTableSource failed.

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector=jdbc
password=debezium
schema.0.data-type=VARCHAR(2147483647)schema.0.name=first_name
schema.1.data-type=VARCHAR(2147483647)schema.1.name=last_name
schema.2.data-type=VARCHAR(2147483647)schema.2.name=email
schema.3.data-type=INTschema.3.name=id
table-name=customers
url=jdbc:mysql://localhost:3306/inventory
username=root

The following factories have been considered:
org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.filesystem.FileSystemTableFactory

Latest update:

This is my docker yml file.

version: '2.1'
services:
  jobmanager:
build: .
image: flink:latest
hostname: "jobmanager"
expose:
  - "6123"
ports:
  - "8081:8081"
command: jobmanager
environment:
  - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
image: flink:latest
expose:
  - "6121"
  - "6122"
depends_on:
  - jobmanager
command: taskmanager
links:
  - jobmanager:jobmanager
environment:
  - JOB_MANAGER_RPC_ADDRESS=jobmanager
  mysql:
image: debezium/example-mysql
ports:
 - "3306:3306"
environment:
 - MYSQL_ROOT_PASSWORD=debezium
 - MYSQL_USER=mysqluser
 - MYSQL_PASSWORD=mysqlpw

The docker ps command shows:

CONTAINER IDIMAGE   COMMAND
  CREATED STATUS  PORTS
NAMES
cf84c84f7821flink  "/docker-entrypoint.…"   2 minutes ago
 Up 2 minutes6121-6123/tcp, 8081/tcp
   _taskmanager_1
09b19142d70aflink  "/docker-entrypoint.…"   9 minutes ago
 Up 9 minutes6123/tcp, 0.0.0.0:8081->8081/tcp
   _jobmanager_1
4ac01eb11bf7debezium/example-mysql  "docker-entrypoint.s…"
  3 days ago  Up 9 minutes0.0.0.0:3306->3306/tcp,
33060/tcpkeras-flask-dep

more info:

*my current flink environment* in docker is flink:scala_2.12-java8

docker pull flink:scala_2.12-java8

The *pyflink jdbc connector* is flink-connector-jdbc_2.11-1.11.2.jar from the
Flink 1.11 release.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

In order to use the jdbc library, I tried two ways:

   1. save the flink-connector-jdbc_2.11-1.11.2.jar into
      /usr/local/lib/python3.7/site-packages/pyflink/lib

   2. configure the classpath in the python app:

base_dir = "/Users/huhu/Documents/projects/webapp/libs/"
flink_jdbc_jar = f"file://{base_dir}flink-connector-jdbc_2.11-1.11.2.jar"
BT_ENV.get_config().get_configuration().set_string("pipeline.jars", flink_jdbc_jar)


but still getting the same error


Rocksdb - Incremental vs full checkpoints

2020-10-13 Thread sudranga
Hi,
I have an event-window pipeline which handles a fixed number of messages per
second for a fixed number of keys. When I have RocksDB as the state backend
with incremental checkpoints, I see the delta checkpoint size constantly
increase. [Chart of checkpoint sizes omitted.]

I turned off incremental checkpoints and all the checkpoints are 64kb (There
appears to be no state leak in user code or otherwise). It is not clear why
the incremental checkpoints keep increasing in size. Perhaps, the
incremental checkpoints are not incremental(for this small state size) and
are simply full state appended to full state and so on...

From some posts on this forum, I understand the use case for incremental
checkpoints is designed when the state size is fairly large (Gbs-Tbs) and
where the changes in state are minimal across checkpoints. However, does
this mean that we should not enable incremental checkpointing for use cases
where the state size is much smaller? Would the 'constantly' increasing
snapshot delta size reduce at some point?  I don't see any compaction runs
happening
(taskmanager_job_task_operator_column_family_rocksdb.num-running-compactions).
Not sure if that is what I am missing...

Thanks
Sudharsan



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Stateful function and large state applications

2020-10-13 Thread Lian Jiang
Hi,

I am learning Stateful function and saw below:

"In addition to the Apache Flink processes, a full deployment requires
ZooKeeper (for master failover) and bulk storage (S3, HDFS, NAS, GCS, Azure
Blob Store, etc.) to store Flink’s checkpoints. In turn, the deployment
requires no database, and Flink processes do not require persistent volumes."

Does this mean Stateful Functions does not support RocksDB (and incremental
checkpoints, local task recovery)? Will it be an issue for large-state (e.g.
200GB) applications? Thanks for clarifying.


Thanks
Lian


Re: Stateful function and large state applications

2020-10-13 Thread Tzu-Li (Gordon) Tai
Hi,

The StateFun runtime is built directly on top of Apache Flink, so RocksDB
as the state backend is supported as well as all the features for large
state such as checkpointing and local task recovery.

Cheers,
Gordon


On Wed, Oct 14, 2020 at 11:49 AM Lian Jiang  wrote:

> Hi,
>
> I am learning Stateful function and saw below:
>
> "In addition to the Apache Flink processes, a full deployment requires
> ZooKeeper  (for master failover
> )
> and bulk storage (S3, HDFS, NAS, GCS, Azure Blob Store, etc.) to store
> Flink’s checkpoints
> .
> In turn, the deployment requires no database, and Flink processes do not
> require persistent volumes."
>
> Does this mean stateful function does not support rocksdb (and incremental
> checkpoint, local task recovery)? Will it be an issue for large state (e.g.
> 200GB) applications? Thanks for clarifying.
>
>
> Thanks
> Lian
>


Upgrade to Flink 1.11 in EMR 5.31 Command line interface

2020-10-13 Thread Vijayendra Yadav
Hi Team,

I have upgraded to Flink 1.11 (EMR 5.31) from Flink 1.10 (5.30.1).

I am facing the following error while running a *Flink streaming* job from the
command line.
The run command looks like: */usr/lib/flink/bin/flink run*

*What dependency I might be missing or conflicting ?*

04:46:51.669 [main] ERROR org.apache.flink.client.cli.CliFrontend - Fatal error while running command line interface.
java.lang.NoSuchMethodError: org.apache.hadoop.ipc.RPC.getProtocolProxy(Ljava/lang/Class;JLjava/net/InetSocketAddress;Lorg/apache/hadoop/security/UserGroupInformation;Lorg/apache/hadoop/conf/Configuration;Ljavax/net/SocketFactory;ILorg/apache/hadoop/io/retry/RetryPolicy;Ljava/util/concurrent/atomic/AtomicBoolean;Lorg/apache/hadoop/ipc/AlignmentContext;)Lorg/apache/hadoop/ipc/ProtocolProxy;
    at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithAlignmentContext(NameNodeProxiesClient.java:400) ~[hadoop-hdfs-client-2.10.0-amzn-0.jar:?]
    at org.apache.hadoop.hdfs.NameNodeProxiesClient.createNonHAProxyWithClientProtocol(NameNodeProxiesClient.java:351) ~[hadoop-hdfs-client-2.10.0-amzn-0.jar:?]
    at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:143) ~[hadoop-hdfs-client-2.10.0-amzn-0.jar:?]
    at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:353) ~[hadoop-hdfs-client-2.10.0-amzn-0.jar:?]
    at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:287) ~[hadoop-hdfs-client-2.10.0-amzn-0.jar:?]
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159) ~[hadoop-hdfs-client-2.10.0-amzn-0.jar:?]
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3354) ~[flink-s3-fs-hadoop-1.11.0.jar:1.11.0]
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124) ~[flink-s3-fs-hadoop-1.11.0.jar:1.11.0]
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403) ~[flink-s3-fs-hadoop-1.11.0.jar:1.11.0]
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371) ~[flink-s3-fs-hadoop-1.11.0.jar:1.11.0]
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477) ~[flink-s3-fs-hadoop-1.11.0.jar:1.11.0]
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:226) ~[flink-s3-fs-hadoop-1.11.0.jar:1.11.0]
    at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:661) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:524) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:424) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1785) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.scala:752) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at com.att.vdcs.StreamingJobKafkaFlink$.main(StreamingJobKafkaFlink.scala:196) ~[?:?]
    at com.att.vdcs.StreamingJobKafkaFlink.main(StreamingJobKafkaFlink.scala) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_265]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_265]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_265]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.client.cli.CliFrontend.lambda$main$1

Re: why this pyflink code has no output?

2020-10-13 Thread Xingbo Huang
Hi,
Which version of PyFlink are you using? I think the API you are using is not
the PyFlink API available since Flink 1.9. For detailed usage of PyFlink, you
can refer to the doc [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table_api_tutorial.html

Best,
Xingbo

大森林 wrote on Tue, Oct 13, 2020 at 8:23 PM:

>
> My code is:
> https://paste.ubuntu.com/p/KqpKwTw5zH/
>
>
> My steps are:
> $FLINK_HOME/bin/flink run -py /wordcount.py
> The statement "data.output()" produces no output.
>
>
> Where am I wrong in the above code?
>
> Thanks for your help
>


Re: Rocksdb - Incremental vs full checkpoints

2020-10-13 Thread Yun Tang
Hi

The difference in data size between incremental and full checkpoints is due to
the different implementations.
The incremental checkpoint strategy uploads binary SST files, while the full
checkpoint strategy scans the DB and writes all KV entries to the external DFS.

As your state size is really small (only 200 KB), I think your RocksDB has never
triggered a compaction to reduce SST files; that's why the size constantly
increases.

Best
Yun Tang
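
For reference, a minimal sketch of how the incremental/full toggle the original poster mentions is usually switched from code (the flink-statebackend-rocksdb dependency, checkpoint URI and interval below are assumptions/placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object IncrementalCheckpointSetup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The second constructor argument toggles incremental checkpoints:
    //   true  -> upload only the newly created SST files; the reported delta keeps
    //            growing until RocksDB compaction removes obsolete SST files
    //   false -> scan the full RocksDB instance and write all KV entries on every checkpoint
    val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true)
    env.setStateBackend(backend)
    env.enableCheckpointing(60000) // 60 s, as in the original setup

    // ... build the pipeline here and call env.execute(...)
  }
}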

From: sudranga 
Sent: Wednesday, October 14, 2020 10:40
To: user@flink.apache.org 
Subject: Rocksdb - Incremental vs full checkpoints

Hi,
I have an event-window pipeline which handles a fixed number of messages per
second for a fixed number of keys. When i have rocksdb as the state backend
with incremental checkpoints, i see the delta checkpoint size constantly
increase. Please see


I turned off incremental checkpoints and all the checkpoints are 64kb (There
appears to be no state leak in user code or otherwise). It is not clear why
the incremental checkpoints keep increasing in size. Perhaps, the
incremental checkpoints are not incremental(for this small state size) and
are simply full state appended to full state and so on...

From some posts on this forum, I understand the use case for incremental
checkpoints is designed when the state size is fairly large (Gbs-Tbs) and
where the changes in state are minimal across checkpoints. However, does
this mean that we should not enable incremental checkpointing for use cases
where the state size is much smaller? Would the 'constantly' increasing
snapshot delta size reduce at some point?  I don't see any compaction runs
happening
(taskmanager_job_task_operator_column_family_rocksdb.num-running-compactions).
Not sure if that is what I am missing...

Thanks
Sudharsan



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-13 Thread Yun Gao
Hi Till,
Many thanks for the feedback!
> 1) When restarting all tasks independent of the status at checkpoint time 
> (finished, running, scheduled), we might allocate more resources than we 
> actually need to run the remaining job. From a scheduling perspective it 
> would be easier if we already know that certain subtasks don't need to be 
> rescheduled. I believe this can be an optimization, though.
> 2) In the section Compatibility, Deprecation and Migration Plan you mentioned 
> that you want to record operators in the CompletedCheckpoint which are fully 
> finished. How will this information be used for constructing a recovered 
> ExecutionGraph? Why wouldn't the same principle work for the task level?

I think the first two issues are related. The main reason is that with
external checkpoints a checkpoint might be taken from one job and used in
another job, but we do not have a unique ID to match tasks across jobs.
Furthermore, users may also change the parallelism of a JobVertex, or even
modify the graph structure by adding/removing operators or changing the
chaining relationship between operators.
On the other side, Flink already provides custom UIDs for operators, which
makes operators a stable unit for recovery. The current checkpoints are also
organized per operator to support rescaling and job upgrading.
When restarting from a checkpoint with finished operators, we could start only
the tasks with operators that are not fully finished (namely, some subtasks
were still running when the checkpoint was taken). Then, during the execution
of a single task, we only initialize/open/run/close the operators that are not
fully finished. The scheduler should be able to compute whether a task contains
not-fully-finished operators from the current JobGraph and the operator finish
states restored from the checkpoints.

> 3) How will checkpointing work together with fully bounded jobs and FLIP-1 
> (fine grained recovery)?
Currently I think it should be compatible with fully bounded jobs and FLIP-1,
since it can be viewed as a completion of the current checkpoint mechanism.
Concretely:
1. Batch jobs (with blocking execution mode) should not be affected, since
checkpoints are not enabled in this case.
2. Bounded jobs running in pipelined mode would also be supported with
checkpoints while they are finishing, with this modification. As discussed in
the FLIP, it should not affect the current behavior after restore for almost
all jobs.
3. Region failover and more fine-grained recovery should also not be affected:
similar to the previous behavior, after a failover, the failover policy
(full/region/fine-grained) decides which tasks to restart, and the checkpoint
only decides what state is restored for these tasks. The only difference with
this modification is that these tasks might now be restored from a checkpoint
taken after some tasks have finished. Since the previously finished tasks would
always be skipped (either not started or run as an empty execution), and the
behavior of the previously running tasks should remain unchanged, the overall
behavior should not be affected.


Best,
Yun


--
From:Till Rohrmann 
Send Time:2020 Oct. 13 (Tue.) 17:25
To:Yun Gao 
Cc:Arvid Heise ; Flink Dev ; 
User-Flink 
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for starting this discussion Yun Gao,

I have three comments/questions:

1) When restarting all tasks independent of the status at checkpoint time 
(finished, running, scheduled), we might allocate more resources than we 
actually need to run the remaining job. From a scheduling perspective it would 
be easier if we already know that certain subtasks don't need to be 
rescheduled. I believe this can be an optimization, though.

2) In the section Compatibility, Deprecation and Migration Plan you mentioned 
that you want to record operators in the CompletedCheckpoint which are fully 
finished. How will this information be used for constructing a recovered 
ExecutionGraph? Why wouldn't the same principle work for the task level?

3) How will checkpointing work together with fully bounded jobs and FLIP-1 
(fine grained recovery)?

Cheers,
Till
On Tue, Oct 13, 2020 at 9:30 AM Yun Gao  wrote:

Hi Arvid,
Many thanks for the comments!
>>> 4) Yes, the interaction is not trivial and also I have not completely 
>>> thought it through. But in general, I'm currently at the point where I 
>>> think that we also need non-checkpoint related events in unaligned 
>>> checkpoints. So just keep that in mind, that we might converge anyhow at 
>>> this point.
I also agree that it would be better to keep the unaligned checkpoint behavior
on EndOfPartition; I will double-check this issue again.

>>> In general, what is helping in this case is to remember that there no 
>>> unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we 
>>> c