Hi,

The implementation looks good. I'd probably cache the
*ObjectValidator.of().getValidator()* in a field to be sure that it's not a
pricey construction.
Did you evaluate what happens when you skip the validation entirely in
terms of records/s?

On Thu, May 27, 2021 at 11:18 AM B.B. <bijela.vr...@gmail.com> wrote:

> I am having a problem with sending code. So here it is. Hope this now
> looks ok
>
> This is my main job (some parts of codes are abbreviated and this is the
> main part):
>
> *public class MyJob {*
>
> *  private StreamExecutionEnvironment env;*
>
> *  private static final Integer NUM_OF_PARALLEL_OPERATORS = 1;*
>
>
> *  public static void main(String[] args) throws Exception {*
>
>
> *   var myJob = new MyJob();*
>
> *   myJob.withExecutionEnvironment();*
>
> *   myJob.init();*
>
> *  }*
>
> *  public void init() throws Exception {*
>
> *    var settings = getApplicationProperties();*
>
>
> *    SourceFunction<ProcessingResult<MyEvent>> customerAccountSource =*
>
> *        getSource(settings, settings.getProperty("my.topic"));*
>
>
> *    DataStream<ProcessingResult<MyEvent>> mySource =*
>
> *
> env.addSource(customerAccountSource).setParallelism(NUM_OF_PARALLEL_OPERATORS);*
>
>
> *    mySource.print();*
>
> *    env.execute();*
>
> *  }*
>
>
>
>
> *  private static SourceFunction<ProcessingResult<MyEvent>> getSource(
>   Properties settings, String topic) {*
>
> *    var kafkaProps = getKafkaSourceProperties(settings);*
>
> *    return new FlinkKafkaConsumer<>(topic, new MyKafkaDeserializer(),
> kafkaProps)*
> *        .setStartFromEarliest()*
> *        .assignTimestampsAndWatermarks(*
> *
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(100)))*
> *        .setCommitOffsetsOnCheckpoints(true);*
> *  }*
>
> *}*
>
> And this is myKafkaDeserializer:
>
>
> *public class MyKafkaDeserializer implements
> KafkaDeserializationSchema<ProcessingResult<MyEvent>> {*
>
> *  private final ObjectMapper objectMapper;*
>
> *  public MyKafkaDeserializer() {*
>
> *    this.objectMapper = JsonUtils.objectMapper();*
>
> *  }*
>
> *  @Override*
> *  public boolean isEndOfStream(ProcessingResult<MyEvent>
> processingResult) {*
>
> *    return false;*
>
> *  }*
>
> *  @Override*
> *  public ProcessingResult<MyEvent> deserialize(ConsumerRecord<byte[],
> byte[]> consumerRecord)*
> *      throws Exception {*
>
> *    try {*
>
> *      var event = objectMapper.readValue(consumerRecord.value(),
> MyEvent.class);*
>
> *      var violation =
> ObjectValidator.of().getValidator().validate(event);*
>
>
> *      if (!violation.isEmpty()) {*
>
> *        return ProcessingResult.error(*
>
> *            new ProcessingError(*
>
> *                asTransformationError(*
>
> *                    "constraint.violation",
> toCustomerAccountValidationErrorString(violation))));*
>
> *      }*
>
>
> *      return ProcessingResult.success(event);*
>
>
> *    } catch (JsonProcessingException e) {*
>
> *      return ProcessingResult.error(*
>
> *          new ProcessingError(*
>
> *              asTransformationError("constraint.violation",
> toStacktraceString(e))));*
>
> *    }*
>
> *  }*
>
> *  @Override*
> *  public TypeInformation<ProcessingResult<MyEvent>> getProducedType() {*
>
> *    return TypeInformation.of(new TypeHint<ProcessingResult<MyEvent>>()
> {});*
>
> *  }*
>
> *  private static String toCustomerAccountValidationErrorString(*
> *      Set<ConstraintViolation<MyEvent>> errors) {*
>
> *    return
> errors.stream().map(ConstraintViolation::getMessage).collect(Collectors.joining(";"));*
>
> *  }*
>
>
>
>
>
> On Tue, May 25, 2021 at 5:51 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Could you share your KafkaDeserializationSchema, we might be able to spot
>> some optimization potential. You could also try out enableObjectReuse [1],
>> which avoids copying data between tasks (not sure if you have any
>> non-chained tasks).
>>
>> If you are on 1.13, you could check out the flamegraph to see where the
>> bottleneck occurs. [2]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/execution/execution_configuration/
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>>
>> On Tue, May 25, 2021 at 5:12 PM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> That's a throughput of 700 records/second, which should be well below
>>> theoretical limits of any deserializer (from hundreds thousands up to tens
>>> of millions records/second/per single operator), unless your records are
>>> huge or very complex.
>>>
>>> Long story short, I don't know of a magic bullet to help you solve your
>>> problem. As always you have two options, either optimise/speed up your
>>> code/job, or scale up.
>>>
>>> If you choose the former, think about Flink as just another Java
>>> application. Check metrics and resource usage, and understand what resource
>>> is the problem (cpu? memory? machine is swapping? io?). You might be able
>>> to guess what's your bottleneck (reading from kafka? deserialisation?
>>> something else? Flink itself?) by looking at some of the metrics
>>> (busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help with that), or
>>> you can also simplify your job to bare minimum and test performance of
>>> independent components. Also you can always attach a code profiler and
>>> simply look at what's happening. First identify what's the source of the
>>> bottleneck and then try to understand what's causing it.
>>>
>>> Best,
>>> Piotrek
>>>
>>> [1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also
>>> comes with nice tools to analyse bottlenecks in the WebUI (coloring nodes
>>> in the job graph based on busy/back pressured status and Flamegraph
>>> support)
>>>
>>> wt., 25 maj 2021 o 15:44 B.B. <bijela.vr...@gmail.com> napisaƂ(a):
>>>
>>>> Hi,
>>>>
>>>> I am in the process of optimizing my job which at the moment by our
>>>> thinking is too slow.
>>>>
>>>> We are deploying job in kubernetes with 1 job manager with 1gb ram and
>>>> 1 cpu and 1 task manager with 4gb ram and 2 cpu-s (eg. 2 task slots and
>>>> parallelism of two).
>>>>
>>>> The main problem is one kafka source that has 3,8 million events that
>>>> we have to process.
>>>> As a test we made a simple job that connects to kafka using a custom
>>>> implementation of KafkaDeserializationSchema. There we are using
>>>> ObjectMapper that mapps input values eg.
>>>>
>>>> *var event = objectMapper.readValue(consumerRecord.value(),
>>>> MyClass.class);*
>>>>
>>>> This is then validated with hibernate validator and output of this
>>>> source is printed on the console.
>>>>
>>>> The time needed for the job to consume all the events was one and a
>>>> half hours, which seems a bit long.
>>>> Is there a way we can speed up this process?
>>>>
>>>> Is more cpu cores or memory solution?
>>>> Should we switch to avro deserialization schema?
>>>>
>>>>
>>>>
>>>>
>
> --
> Everybody wants to be a winner
> Nobody wants to lose their game
> Its insane for me
> Its insane for you
> Its insane
>

Reply via email to