[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113499#comment-16113499
 ] 

Guozhang Wang commented on KAFKA-5684:
--------------------------------------

I have a slightly different idea:

1. Remove the `defaultKeyValueMapper` from {{KStreamImpl}}, and let the 
overloaded `print` and `writeAsText` functions without the mapper parameter 
pass in null (note that in general we do NOT prefer to pass nulls, but rather 
replace them with the default mapper as early as possible in the call trace; 
this is an exception, since in {{KStreamImpl}} we cannot access the 
{{context}} object to get the default serdes).

2. Remove the `KeyValueMapper` from the constructor of {{PrintForeachAction}}, 
but add an API to set the {{KeyValueMapper}} after the object is constructed.

3. Still keep the {{KStreamPrint}} class, but let {{KStreamPrint}} extend 
{{KStreamPeek}} and let {{KStreamPrintProcessor}} extend 
{{KStreamPeekProcessor}}; {{KStreamPrint}} passes the user-specified serdes 
to the constructor of {{KStreamPrintProcessor}}.

4. Override {{KStreamPrintProcessor}}'s `init` function to construct the 
default mapper (which is potentially based on the default serdes from the 
context) if one is not passed in, and then call `setMapper` on the action 
field.
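
The four steps above could be sketched roughly as follows. This is only a 
minimal, self-contained illustration, not the actual Kafka Streams code: 
`Context`, `PrintAction`, `PeekProcessor` and `PrintProcessor` are 
hypothetical stand-ins for the processor context, {{PrintForeachAction}}, 
{{KStreamPeekProcessor}} and {{KStreamPrintProcessor}}, and plain 
`Function<byte[], String>` objects stand in for serdes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Simplified, hypothetical stand-ins; none of these types are Kafka's real classes.
public class PrintSketch {

    // Stand-in for the processor context: it only exposes default deserializers.
    static final class Context {
        final Function<byte[], String> keyDeserializer;
        final Function<byte[], String> valueDeserializer;

        Context(Function<byte[], String> keyDeserializer,
                Function<byte[], String> valueDeserializer) {
            this.keyDeserializer = keyDeserializer;
            this.valueDeserializer = valueDeserializer;
        }
    }

    // Step 2: the mapper is not a constructor argument; it is set afterwards.
    static final class PrintAction<K, V> {
        private final List<String> sink; // stands in for stdout or a file
        private BiFunction<K, V, String> mapper;

        PrintAction(List<String> sink) { this.sink = sink; }

        void setMapper(BiFunction<K, V, String> mapper) { this.mapper = mapper; }

        void apply(K key, V value) { sink.add(mapper.apply(key, value)); }
    }

    // Stand-in for the peek processor: runs the action on every record.
    static class PeekProcessor<K, V> {
        protected final PrintAction<K, V> action;

        PeekProcessor(PrintAction<K, V> action) { this.action = action; }

        void init(Context context) { }

        void process(K key, V value) { action.apply(key, value); }
    }

    // Steps 3 and 4: extend the peek processor and resolve the mapper in init().
    static final class PrintProcessor<K, V> extends PeekProcessor<K, V> {
        private final BiFunction<K, V, String> userMapper;          // may be null (step 1)
        private final Function<byte[], String> keyDeserializer;     // may be null
        private final Function<byte[], String> valueDeserializer;   // may be null

        PrintProcessor(PrintAction<K, V> action,
                       BiFunction<K, V, String> userMapper,
                       Function<byte[], String> keyDeserializer,
                       Function<byte[], String> valueDeserializer) {
            super(action);
            this.userMapper = userMapper;
            this.keyDeserializer = keyDeserializer;
            this.valueDeserializer = valueDeserializer;
        }

        @Override
        void init(Context context) {
            super.init(context);
            // Fall back to the context defaults only here, where the context is available.
            Function<byte[], String> kd =
                keyDeserializer != null ? keyDeserializer : context.keyDeserializer;
            Function<byte[], String> vd =
                valueDeserializer != null ? valueDeserializer : context.valueDeserializer;
            BiFunction<K, V, String> defaultMapper =
                (k, v) -> render(k, kd) + ", " + render(v, vd);
            action.setMapper(userMapper != null ? userMapper : defaultMapper);
        }

        // Deserialize raw bytes; print anything else via String.valueOf.
        private static String render(Object o, Function<byte[], String> deserializer) {
            return o instanceof byte[] ? deserializer.apply((byte[]) o) : String.valueOf(o);
        }
    }

    public static void main(String[] args) {
        Function<byte[], String> utf8 = bytes -> new String(bytes, StandardCharsets.UTF_8);
        Context context = new Context(utf8, utf8);
        List<String> lines = new ArrayList<>();
        PrintAction<byte[], Integer> action = new PrintAction<>(lines);
        // No mapper and no serdes given, so init() builds the default mapper.
        PrintProcessor<byte[], Integer> processor =
            new PrintProcessor<>(action, null, null, null);
        processor.init(context);
        processor.process("k".getBytes(StandardCharsets.UTF_8), 1);
        lines.forEach(System.out::println);
    }
}
```

The point of the sketch is that the null mapper only travels as far as 
`init`, which is the first place where the context (and hence the default 
serdes) can be consulted.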

> KStreamPrintProcessor as customized KStreamPeekProcessor
> --------------------------------------------------------
>
>                 Key: KAFKA-5684
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5684
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Paolo Patierno
>            Assignee: Paolo Patierno
>            Priority: Minor
>
> Hi,
> the {{KStreamPrintProcessor}} is implemented from scratch (from the 
> {{AbstractProcessor}}), and the same goes for the related supplier.
> It looks to me like it's just a special {{KStreamPeekProcessor}} with 
> forwardDownStream set to false that also allows specifying the Serdes 
> instances to use if keys/values are bytes.
> At the same time, when used by a {{print()}} method, it provides a fast way 
> to print data flowing through the pipeline (while using just {{peek()}} you 
> need to write the code).
> I think that it could be useful to refactor the {{KStreamPrintProcessor}} 
> as derived from the {{KStreamPeekProcessor}}, customizing its behavior.
> Thanks,
> Paolo.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
