Hi,

From my point of view, pyflink-shell only provides an interactive tool.
Underneath it, you can choose whether to run the job in a minicluster (similar
to python xxx.py) or submit it to the cluster through flink run. For python
xxx.py, it is reasonable not to load the config from flink-conf.yaml. What do
you think?
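For reference, a minimal flink-conf.yaml fragment for the setting discussed in this thread; the 80m value is just an illustration mirroring the ~79 mb requirement from the error message, and, as noted above, it is only picked up when the job is submitted via flink run, not when running in minicluster mode:

```yaml
taskmanager.memory.task.off-heap.size: 80m
```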


Best,
Xingbo

Sharipov, Rinat <r.shari...@cleverdata.ru> wrote on Tue, Oct 13, 2020 at 2:16 PM:

> Hi Xingbo, thx a lot, it works !
>
> But I'm still sure that it's not obvious from a user's point of view that
> *pyflink-shell.sh* doesn't use the provided flink-conf.yaml. Don't you think
> that looks like an issue?
>
> Thx !
>
> On Tue, 13 Oct 2020 at 05:35, Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi,
>>
>> You can use the API to set the configuration:
>> table_env.get_config().get_configuration().set_string(
>>     "taskmanager.memory.task.off-heap.size", "80m")
>>
>> The flink-conf.yaml way only takes effect when the job is submitted through
>> flink run; in minicluster mode (python xxx.py) it has no effect.
>>
>> Best,
>> Xingbo
>>
>> Sharipov, Rinat <r.shari...@cleverdata.ru> wrote on Tue, Oct 13, 2020 at 1:56 AM:
>>
>>> Hi mates !
>>>
>>> I'm very new to pyflink and I'm trying to register a custom UDF
>>> using the Python API.
>>> Currently I'm facing an issue both in the server env and in my local IDE
>>> environment.
>>>
>>> When I try to execute the example below, I get an error message: *The
>>> configured Task Off-Heap Memory 0 bytes is less than the least required
>>> Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
>>> using the configuration key 'taskmanager.memory.task.off-heap.size'.*
>>>
>>> Of course I've added the required property to *flink-conf.yaml* and
>>> checked that *pyflink-shell.sh* initializes the env using the specified
>>> configuration, but it has no effect and I still get the error.
>>>
>>> I've also attached my flink-conf.yaml file
>>>
>>> Thx for your help !
>>>
>>> *Here is an example:*
>>>
>>> from pyflink.dataset import ExecutionEnvironment
>>> from pyflink.table import BatchTableEnvironment, DataTypes
>>> from pyflink.table.udf import udf
>>>
>>>
>>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>>> def test_udf(i):
>>>     return i
>>>
>>>
>>> if __name__ == "__main__":
>>>     env = ExecutionEnvironment.get_execution_environment()
>>>     env.set_parallelism(1)
>>>
>>>     bt_env = BatchTableEnvironment.create(env)
>>>     bt_env.register_function("test_udf", test_udf)
>>>
>>>     my_table = bt_env.from_elements(
>>>         [
>>>             ("user-1", "http://url/1"),
>>>             ("user-2", "http://url/2"),
>>>             ("user-1", "http://url/3"),
>>>             ("user-3", "http://url/4"),
>>>             ("user-1", "http://url/3")
>>>         ],
>>>         [
>>>             "uid", "url"
>>>         ]
>>>     )
>>>
>>>     my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
>>>     bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)
>>>
>>>     bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()
>>>
