I had trouble sending the code earlier, so here it is again. I hope it looks
OK this time.

This is my main job (some parts of the code are abbreviated; this is the
main part):

public class MyJob {

  private StreamExecutionEnvironment env;

  private static final Integer NUM_OF_PARALLEL_OPERATORS = 1;

  public static void main(String[] args) throws Exception {
    var myJob = new MyJob();
    myJob.withExecutionEnvironment();
    myJob.init();
  }

  public void init() throws Exception {
    var settings = getApplicationProperties();

    SourceFunction<ProcessingResult<MyEvent>> customerAccountSource =
        getSource(settings, settings.getProperty("my.topic"));

    DataStream<ProcessingResult<MyEvent>> mySource =
        env.addSource(customerAccountSource).setParallelism(NUM_OF_PARALLEL_OPERATORS);

    mySource.print();
    env.execute();
  }

  private static SourceFunction<ProcessingResult<MyEvent>> getSource(
      Properties settings, String topic) {
    var kafkaProps = getKafkaSourceProperties(settings);
    return new FlinkKafkaConsumer<>(topic, new MyKafkaDeserializer(), kafkaProps)
        .setStartFromEarliest()
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(100)))
        .setCommitOffsetsOnCheckpoints(true);
  }

}

And this is MyKafkaDeserializer:


public class MyKafkaDeserializer
    implements KafkaDeserializationSchema<ProcessingResult<MyEvent>> {

  private final ObjectMapper objectMapper;

  public MyKafkaDeserializer() {
    this.objectMapper = JsonUtils.objectMapper();
  }

  @Override
  public boolean isEndOfStream(ProcessingResult<MyEvent> processingResult) {
    return false;
  }

  @Override
  public ProcessingResult<MyEvent> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord)
      throws Exception {
    try {
      var event = objectMapper.readValue(consumerRecord.value(), MyEvent.class);
      var violation = ObjectValidator.of().getValidator().validate(event);

      if (!violation.isEmpty()) {
        return ProcessingResult.error(
            new ProcessingError(
                asTransformationError(
                    "constraint.violation",
                    toCustomerAccountValidationErrorString(violation))));
      }

      return ProcessingResult.success(event);

    } catch (JsonProcessingException e) {
      return ProcessingResult.error(
          new ProcessingError(
              asTransformationError("constraint.violation", toStacktraceString(e))));
    }
  }

  @Override
  public TypeInformation<ProcessingResult<MyEvent>> getProducedType() {
    return TypeInformation.of(new TypeHint<ProcessingResult<MyEvent>>() {});
  }

  private static String toCustomerAccountValidationErrorString(
      Set<ConstraintViolation<MyEvent>> errors) {
    return errors.stream().map(ConstraintViolation::getMessage).collect(Collectors.joining(";"));
  }

}
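
To check whether deserialization alone explains the rate seen in the job, the
deserializer could be timed on its own, outside Flink. The following is only a
rough sketch that uses nothing but the JDK: the class and method names
(DeserializerThroughput, recordsPerSecond, observedRate) are made up for
illustration, and the lambda in main is a stand-in for the real
deserialize-and-validate call, which would need Jackson and the validator on
the classpath. A simple loop like this gives a ballpark figure, not a proper
benchmark.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class DeserializerThroughput {

    /** Applies the function to the same payload repeatedly; returns calls per second. */
    static <T> double recordsPerSecond(
            Function<byte[], T> deserializer, byte[] payload, int iterations) {
        long nonNullResults = 0; // results are consumed so the calls are not optimized away
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            T result = deserializer.apply(payload);
            if (result != null) {
                nonNullResults++;
            }
        }
        long elapsedNanos = Math.max(1L, System.nanoTime() - start);
        if (nonNullResults < 0) {
            throw new IllegalStateException("unreachable");
        }
        return iterations / (elapsedNanos / 1_000_000_000.0);
    }

    /** Rate observed in the job: total events divided by wall-clock seconds. */
    static double observedRate(long events, long seconds) {
        return (double) events / seconds;
    }

    public static void main(String[] args) {
        // Hypothetical payload; a real test would use records taken from the topic.
        byte[] payload = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);

        // Stand-in for the real deserialize-and-validate step (assumption).
        Function<byte[], String> standIn =
                bytes -> new String(bytes, StandardCharsets.UTF_8);

        double isolated = recordsPerSecond(standIn, payload, 1_000_000);
        // 3.8 million events in one and a half hours, as measured in the job.
        double inJob = observedRate(3_800_000L, 90L * 60L);

        System.out.printf("isolated: %.0f records/s%n", isolated);
        System.out.printf("in job:   %.0f records/s%n", inJob);
    }
}
```

If the isolated rate comes out far above the rate seen in the job, the
bottleneck is more likely elsewhere (Kafka reads, printing to the console, or
resource limits) than in the deserialization itself.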





On Tue, May 25, 2021 at 5:51 PM Arvid Heise <ar...@apache.org> wrote:

> Could you share your KafkaDeserializationSchema? We might be able to spot
> some optimization potential. You could also try out enableObjectReuse [1],
> which avoids copying data between tasks (not sure if you have any
> non-chained tasks).
>
> If you are on 1.13, you could check out the flamegraph to see where the
> bottleneck occurs. [2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/execution/execution_configuration/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>
> On Tue, May 25, 2021 at 5:12 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi,
>>
>> That's a throughput of about 700 records/second, which should be well below
>> the theoretical limits of any deserializer (from hundreds of thousands up to
>> tens of millions of records per second per operator), unless your records
>> are huge or very complex.
>>
>> Long story short, I don't know of a magic bullet to help you solve your
>> problem. As always you have two options, either optimise/speed up your
>> code/job, or scale up.
>>
>> If you choose the former, think about Flink as just another Java
>> application. Check metrics and resource usage, and understand what resource
>> is the problem (cpu? memory? machine is swapping? io?). You might be able
>> to guess what's your bottleneck (reading from kafka? deserialisation?
>> something else? Flink itself?) by looking at some of the metrics
>> (busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help with that), or
>> you can also simplify your job to bare minimum and test performance of
>> independent components. Also you can always attach a code profiler and
>> simply look at what's happening. First identify what's the source of the
>> bottleneck and then try to understand what's causing it.
>>
>> Best,
>> Piotrek
>>
>> [1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also
>> comes with nice tools to analyse bottlenecks in the WebUI (coloring nodes
>> in the job graph based on busy/back pressured status and Flamegraph
>> support)
>>
>> On Tue, May 25, 2021 at 3:44 PM B.B. <bijela.vr...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am in the process of optimizing my job, which we currently consider
>>> too slow.
>>>
>>> We are deploying the job in Kubernetes with 1 job manager (1 GB RAM,
>>> 1 CPU) and 1 task manager (4 GB RAM, 2 CPUs), i.e. 2 task slots and a
>>> parallelism of 2.
>>>
>>> The main problem is one Kafka source that has 3.8 million events that we
>>> have to process.
>>> As a test we made a simple job that connects to Kafka using a custom
>>> implementation of KafkaDeserializationSchema. There we use an
>>> ObjectMapper that maps the input values, e.g.
>>>
>>> var event = objectMapper.readValue(consumerRecord.value(), MyClass.class);
>>>
>>> The result is then validated with Hibernate Validator, and the output of
>>> this source is printed to the console.
>>>
>>> The time needed for the job to consume all the events was one and a half
>>> hours, which seems a bit long.
>>> Is there a way we can speed up this process?
>>>
>>> Would more CPU cores or more memory be the solution?
>>> Should we switch to an Avro deserialization schema?
>>>
>>>
>>>
>>>

-- 
Everybody wants to be a winner
Nobody wants to lose their game
Its insane for me
Its insane for you
Its insane
