Hi,
我刚刚在本地完全模拟了你的数据和核心的代码,是可以在sink里拿到结果的。
我把我的测试代码放到附件里面了,
你可以参考一下,如果还是不行的话,可以提供下你的代码再帮你看一下
Best,
Xingbo
秦寒 <[email protected]> 于2020年4月15日周三 下午3:16写道:
> 你好
>
> 我在使用kafka produce数据后,在python中使用UDF做一个add function,但是最后的sink
> 文件里面没有任何数据,
>
> 如果不用UDF的话,直接获取一个数据,在最后的sink文件里面是有数据的,如下所示。DEBUG了很久也不清楚是什么原因,是否能帮忙分析下?
>
>
>
> *Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}*
>
>
>
>
>
>
>
>
>
> *测试结果*
>
> *Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}*
>
>
>
>
>
>
>
> st_env.from_path("source")\
> .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
> *.select("add(b1,c1)") \ **无任何输出*
> .insert_into("result_tab")
>
> *无任何输出*
>
>
>
>
>
> st_env.from_path("source")\
> .select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
> *.select("c1")\* #正常输出
>
>
> .insert_into("result_tab")
>
> *正确输出*
>
>
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
import os
from pyflink.table import StreamTableEnvironment, DataTypes, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, FileSystem, OldCsv
from pyflink.table.udf import udf
def test_udf():
    """Run a streaming Table API job that adds two Kafka JSON fields with a Python UDF.

    The job reads the Kafka topic ``user`` (connector version ``0.11``,
    starting from the earliest offset) as a table named ``source`` whose JSON
    fields ``a``, ``b`` and ``c`` are all declared as strings. Fields ``b`` and
    ``c`` are cast to LONG, summed by the registered ``add`` UDF, and the
    single resulting column is inserted into the CSV sink ``Results`` backed
    by ``/tmp/scalar_func_basic.csv``. Any file already present at that path
    is removed before the sink is registered.
    """
    # Streaming environment: event-time characteristic, parallelism of 1.
    stream_env = StreamExecutionEnvironment.get_execution_environment()
    stream_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    stream_env.set_parallelism(1)
    table_env = StreamTableEnvironment.create(stream_env)

    # Drop the output of an earlier run, if there is one.
    output_path = "/tmp/scalar_func_basic.csv"
    if os.path.exists(output_path):
        os.remove(output_path)

    # Sink with one BIGINT column named 'a'.
    csv_sink = CsvTableSink(['a'], [DataTypes.BIGINT()], output_path)
    table_env.register_table_sink("Results", csv_sink)

    # Scalar UDF "add": takes two BIGINT arguments and returns their sum.
    add_udf = udf(
        lambda i, j: i + j,
        input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
        result_type=DataTypes.BIGINT(),
    )
    table_env.register_function("add", add_udf)

    # External system to connect to.
    kafka_connector = (
        Kafka()
        .version("0.11")
        .topic("user")
        .start_from_earliest()
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")
    )

    # Format of the records; the schema text is assembled from adjacent
    # literals and passed to the JSON format unchanged.
    record_json_schema = (
        "{"
        " type: 'object',"
        " properties: {"
        " a: {"
        " type: 'string'"
        " },"
        " b: {"
        " type: 'string'"
        " },"
        " c: {"
        " type: 'string'"
        " }"
        " }"
        "}"
    )
    json_format = Json().fail_on_missing_field(True).json_schema(record_json_schema)

    # Schema of the table exposed to queries.
    source_schema = (
        Schema()
        .field("a", DataTypes.STRING())
        .field("b", DataTypes.STRING())
        .field("c", DataTypes.STRING())
    )

    source_descriptor = table_env.connect(kafka_connector)
    source_descriptor = source_descriptor.with_format(json_format)
    source_descriptor = source_descriptor.with_schema(source_schema)
    source_descriptor.in_append_mode().register_table_source("source")

    # Cast the string fields to LONG, add them via the UDF, write to the sink.
    casted = table_env.from_path("source").select(
        "b.cast(LONG) as b1, c.cast(LONG) as c1")
    summed = casted.select("add(b1, c1)")
    summed.insert_into("Results")

    table_env.execute("test")
# Script entry point: run the job when this file is executed directly.
if __name__ == "__main__":
    test_udf()