Hi all, In current implementation, the RecordSerializer is created separately for each subpartition in RecordWriter, that means the number of serializers equals to the number of subpartitions. For broadcast partitioner, every record will be serialized many times in all the subpartitions, and this may bring bad performance to some extent. In theory every record can be serialized only once and referenced for all the subpartitions in broadcast mode.
To do so, I propose the following changes: 1. Create and maintain only one serializer in RecordWriter, and it will serialize the record for all the subpartitions. It makes sense for any partitioners, and the memory overhead can be also decreased, because every serializer will maintain some separate byte buffers internally. 2. Maybe we can abstract the RecordWriter as a base class used for other partitioner mode and implement a BroadcastRecordWriter for BroadcastPartitioner. And this new implementation will add buffer references based on the number of subpartitions before adding into subpartition queue. 3. Maybe we can remove StreamRecordWriter by migrating flusher from it to RecordWriter, then the uniform RecordWriter can be used for both stream and batch. The above BroadcastRecordWriter can aslo uniform for both stream and batch. I am not sure whether this improvement is proposed before and what do you think of it? If necessary I can create JIRAs to contirbute it, and may need one commiter cooperate with me. Best, Zhijiang