Hi Leonard,

I am using flink 1.11.2 and using debezium-json to read CDC data generated
by debezium.

For each table, I convert the Kafka dynamic table to a retract stream and
finally that stream is converted to DataStream<RowData>. Here's the sample
function

private DataStream<RowData> getDataStream(String sql) {
    LOGGER.debug(sql);
    Table out = tEnv.sqlQuery(sql);
    DataStream<Tuple2<Boolean, Row>> dsRow = tEnv.toRetractStream(out,
Row.class);
    return dsRow.map((MapFunction<Tuple2<Boolean, Row>, RowData>) t2 -> {
        RowKind rowKind = t2.f1.getKind();
        GenericRowData genericRowData = new GenericRowData(rowKind,
t2.f1.getArity());
        for (int pos = 0; pos < t2.f1.getArity(); pos = pos + 1) {
            Object object = t2.f1.getField(pos);
            Object convertedType;
            if (object instanceof String) {
                convertedType =
RowDataUtil.convertConstant(Types.StringType.get(), object);
            } else if (object instanceof LocalDateTime) {
                convertedType =
TimestampData.fromLocalDateTime((LocalDateTime) object);
            } else {
                convertedType = object;
            }
            genericRowData.setField(pos, convertedType);
        }
        return genericRowData;
    });
}


I then pass this datastream to the Flink sink.

FlinkSink.forRowData(rowDataDataStream)
        .table(icebergTable)
        
.tableSchema(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergTable.schema())))
        .tableLoader(tableLoader)
        .equalityFieldColumns(tableConfig.getEqualityColumns())
        .build();


Please let me know if you need some other information too


On Mon, Aug 2, 2021 at 7:48 AM Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, Ayush
>
> Thanks for the detailed description.
>
> Before analyze the issue, I have two questions that which Flink and Flink
> CDC version are you using?  Is Flink CDC used in SQL or DataStream ?
> That’s helpful if you can post you Flink CDC connector parameters.
>
> Best,
> Leonard
>
> 在 2021年7月29日,18:57,Ayush Chauhan <ayush.chau...@zomato.com> 写道:
>
> Hi all,
>
> We are using Flink + iceberg to consume CDC data. We have combined all the
> tables of a single DB in one job. Our job is frequently running into GC
> issues. Earlier it was running default on parallel GC and I have changed it
> to G1GC. G1GC did bring some improvements but still, I am facing the same
> problem.
>
> Following are the params on my job - -ytm 5120m -yjm 1024m -yD
> env.java.opts="-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35"
>
> This job is running CDC ingestion for 17 tables with a parallelism of 1
> and throughput is around ~10k messages for the 10minutes checkpointing
> interval
>
> I am attaching a part of the thread dump in this email.
>
> During old GC, the job gets stuck and its checkpointing which is normally
> under 1 sec gets increased exponentially to the timeout threshold. Job
> either get failed due to checkpointing timeout or it failed to get the
> heartbeat of the task manager
>
> <Screenshot 2021-07-29 at 16.09.19.png>
> <Screenshot 2021-07-29 at 16.08.58.png>
>
>
> --
>  Ayush Chauhan
>
>
> This email is intended only for the person or the entity to whom it is
> addressed. If you are not the intended recipient, please delete this email
> and contact the sender.
> <thread_dump.txt>
>
>
>

-- 
 Ayush Chauhan
 Data Platform
 [image: mobile-icon]  +91 9990747111

-- 












This email is intended only for the person or the entity to 
whom it is addressed. If you are not the intended recipient, please delete 
this email and contact the sender.

Reply via email to