Thanks for doing this benchmark, @Yangze Guo <karma...@gmail.com>. The results look promising, so I'm +1 to refactoring ExecutionAttemptID and IntermediateResultPartitionID.
Regarding why 'the size of TDD after serialization becomes smaller than before', I guess it's because the new IntermediateResultPartitionIDs can share the same IntermediateDataSetID. In this way, each IntermediateResultPartitionID costs only a reference (to the IntermediateDataSetID) and an int (the index), which is smaller than the 2 longs of an AbstractID.

Thanks,
Zhu Zhu

On Tue, Apr 14, 2020 at 3:09 PM Yangze Guo <karma...@gmail.com> wrote:

> Hi everyone,
>
> I've investigated the impact on jobs with higher parallelism.
>
> The results show:
> - The size of the TDD after serialization becomes smaller than before,
>   and I did not hit any issue with the Akka framework when the
>   parallelism was set to 6000.
> - There was no significant difference in the end-to-end
>   scheduling time, job runtime, young GC count, or total full GC time.
>
> For details, please take a look at
> https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
>
> From my perspective, it might be OK to refactor
> ExecutionAttemptID and IntermediateResultPartitionID. If you have
> further concerns or think we should investigate further, please
> let me know.
>
> Best,
> Yangze Guo
>
> On Mon, Apr 13, 2020 at 1:36 PM Yangze Guo <karma...@gmail.com> wrote:
> >
> > Hi everyone,
> > After an offline discussion with Zhu Zhu, I have some comments on this
> > investigation.
> >
> > Regarding the maximum parallelism dropping from 760 to 685, it may be
> > because the tasks are not scheduled evenly. The result is inconsistent
> > across multiple experiments, so this phenomenon should be unrelated to
> > our changes.
> >
> > I think what we really care about is the frame size for Akka (should
> > users enlarge it after our change for the same job?). The size of the TDD
> > after serialization seems to be smaller after the change. I don't know the
> > root cause of this phenomenon at the moment.
> > The way I measure it is:
> > ```
> > // deployment: the TaskDeploymentDescriptor (TDD) being measured
> > ByteArrayOutputStream bos = new ByteArrayOutputStream();
> > ObjectOutputStream oos = new ObjectOutputStream(bos);
> > oos.writeObject(deployment);
> > oos.flush();
> > LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.toByteArray().length);
> > ```
> > Please correct me if I'm wrong.
> >
> > I'll experiment with higher parallelism to see if there is any
> > regression regarding the Akka frame size.
> >
> > Regarding the TDD building time, the parallelism in my investigation
> > might be too small. Meanwhile, this time might be influenced by many
> > factors. Thus, I'll
> > - experiment with higher parallelism.
> > - measure the time spent from "Starting scheduling" until the last task
> >   changes state to running.
> >
> > Best,
> > Yangze Guo
> >
> > On Fri, Apr 10, 2020 at 12:53 PM Yangze Guo <karma...@gmail.com> wrote:
> > >
> > > Hi there,
> > >
> > > Sorry for the belated reply. I just made a preliminary investigation
> > > of the impact of refactoring IntermediateResultPartitionID.
> > >
> > > The key change is composing it of an IntermediateDataSetID
> > > and a partitionNum:
> > > public class IntermediateResultPartitionID {
> > >     private final IntermediateDataSetID intermediateDataSetID;
> > >     private final int partitionNum;
> > > }
> > >
> > > In this benchmark, I use examples/streaming/WordCount.jar as the test
> > > job and run Flink on Yarn. All configurations are kept at their defaults
> > > except for "taskmanager.numberOfTaskSlots", which is set to 2.
> > >
> > > The result shows it does have an impact on performance.
> > > - After this change, the maximum parallelism went from 760 to 685,
> > >   which is limited by the total number of network buffers. For the same
> > >   parallelism, users need more network buffers than before.
> > > - The netty messages "PartitionRequest" and "TaskEventRequest" increase
> > >   by 4 bytes. For "PartitionRequest", that means a 7% increase.
> > > - The TDD takes longer to construct.
> > >   With a parallelism of 600, it takes
> > >   twice as long to construct the TDD as before.
> > >
> > > Details are recorded in
> > > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
> > >
> > > The same issue could happen with ExecutionAttemptID, which would increase
> > > "PartitionRequest" and "TaskEventRequest" by 8 bytes (subtaskIndex
> > > and attemptNumber). But it may not affect the TDD as much as
> > > IntermediateResultPartitionID, since there is only one
> > > ExecutionAttemptID per TDD.
> > >
> > > After this preliminary investigation, I tend not to refactor
> > > ExecutionAttemptID and IntermediateResultPartitionID at the moment, or
> > > to treat it as future work.
> > >
> > > WDYT? @ZhuZhu @Till
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Wed, Apr 1, 2020 at 11:53 AM Zhu Zhu <reed...@gmail.com> wrote:
> > > >
> > > > >> However, it seems the JobVertexID is derived from hashcode ...
> > > > You are right. JobVertexID is widely used and reworking it may affect the
> > > > public interfaces, e.g. the REST API. We can take it as a long-term goal and
> > > > exclude it from this FLIP.
> > > > The same applies to IntermediateDataSetID, which could also be composed of a
> > > > JobID and an index, as Till proposed.
> > > >
> > > > Thanks,
> > > > Zhu Zhu
> > > >
> > > > On Tue, Mar 31, 2020 at 8:36 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > > >
> > > > > For the IntermediateDataSetID I was just thinking that it might actually be
> > > > > interesting to know which job produced the result which, by using cluster
> > > > > partitions, could be used across different jobs. Not saying that we have to
> > > > > do it, though.
> > > > >
> > > > > A small addition to Zhu Zhu's comment about TDD sizes: For the problem with
> > > > > too large TDDs there is already an issue [1].
> > > > > The current suspicion is that
> > > > > the size of TDDs for jobs with a large parallelism can indeed become
> > > > > problematic for Flink. Hence, it would be great to investigate the impacts
> > > > > of the proposed changes.
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-16069
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Tue, Mar 31, 2020 at 11:50 AM Yangze Guo <karma...@gmail.com> wrote:
> > > > > >
> > > > > > Hi, Zhu,
> > > > > >
> > > > > > Thanks for the feedback.
> > > > > >
> > > > > > > make JobVertexID a composition of JobID and a topology index
> > > > > > I think it is a good idea. However, it seems the JobVertexID is
> > > > > > derived from a hashcode which is used to identify them across submissions.
> > > > > > I'm not familiar with that component, though. I prefer to keep this
> > > > > > idea out of the scope of this FLIP if no one can help us figure it
> > > > > > out.
> > > > > >
> > > > > > > How about we still keep IntermediateDataSetID independent from JobVertexID,
> > > > > > > but just print the producing relationships in logs? I think keeping
> > > > > > > IntermediateDataSetID independent may be better considering the cross job
> > > > > > > result usages in interactive query cases.
> > > > > > I think you are right. I'll keep IntermediateDataSetID independent
> > > > > > from JobVertexID.
> > > > > >
> > > > > > > The new IDs will become larger with this rework.
> > > > > > Yes, I have the same concern. A benchmark is necessary; I'll try to
> > > > > > provide one during the implementation phase.
> > > > > >
> > > > > > Best,
> > > > > > Yangze Guo
> > > > > >
> > > > > > On Tue, Mar 31, 2020 at 4:55 PM Zhu Zhu <reed...@gmail.com> wrote:
> > > > > > >
> > > > > > > Thanks for proposing this improvement, Yangze. Big +1 for the overall
> > > > > > > proposal. It can help a lot in troubleshooting.
> > > > > > >
> > > > > > > Here are a few questions about it:
> > > > > > > 1. Shall we make JobVertexID a composition of JobID and a topology index?
> > > > > > > This would help in the session cluster case, so that we can identify which
> > > > > > > tasks are from which jobs, along with the rework of ExecutionAttemptID.
> > > > > > >
> > > > > > > 2. You mentioned "Add the producer info to the string literal of
> > > > > > > IntermediateDataSetID". Do you mean to make IntermediateDataSetID a
> > > > > > > composition of JobVertexID and a consumer index?
> > > > > > > How about we still keep IntermediateDataSetID independent from JobVertexID,
> > > > > > > but just print the producing relationships in logs? I think keeping
> > > > > > > IntermediateDataSetID independent may be better considering the cross job
> > > > > > > result usages in interactive query cases.
> > > > > > >
> > > > > > > 3. The new IDs will become larger with this rework. The
> > > > > > > TaskDeploymentDescriptor can become much larger since it is mainly composed
> > > > > > > of a variety of IDs. I'm not sure how much larger it would be, but there can
> > > > > > > be more memory and CPU cost for it, resulting in more frequent GCs, message
> > > > > > > sizes exceeding Akka frame limits, and a longer blocked time of the main thread.
> > > > > > > This should not be a problem in most cases but might be a problem for large
> > > > > > > scale jobs. Shall we have a benchmark for it?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Zhu Zhu
> > > > > > >
> > > > > > > On Tue, Mar 31, 2020 at 2:19 PM Yangze Guo <karma...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Thank you all for the feedback! Sorry for the belated reply.
> > > > > > > >
> > > > > > > > @Till
> > > > > > > > I'm +1 for your two ideas, and I'd like to move them out of the
> > > > > > > > scope of this FLIP since the pipelined region scheduling is
> > > > > > > > ongoing work now.
> > > > > > > > I also agree that we should not make the InstanceID in
> > > > > > > > TaskExecutorConnection a composition of the ResourceID plus a
> > > > > > > > monotonically increasing value. Thanks a lot for your explanation.
> > > > > > > >
> > > > > > > > @Konstantin @Yang
> > > > > > > > Regarding the PodName of the TaskExecutor on K8s, I second Yang's
> > > > > > > > suggestion. It makes sense to me to let users export RESOURCE_ID and
> > > > > > > > make the TM respect it. Users need to guarantee there is no collision
> > > > > > > > between different TMs.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Yangze Guo
> > > > > > > >
> > > > > > > > On Tue, Mar 31, 2020 at 12:25 AM Steven Wu <stevenz...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > +1 on allowing user defined resourceId for taskmanager
> > > > > > > > >
> > > > > > > > > On Sun, Mar 29, 2020 at 7:24 PM Yang Wang <danrtsey...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Konstantin,
> > > > > > > > > >
> > > > > > > > > > I think it is a good idea. Currently, our users also report a similar issue
> > > > > > > > > > with the resourceId of standalone clusters. When we start a standalone cluster
> > > > > > > > > > now, the `TaskManagerRunner` always generates a UUID for the resourceId. It
> > > > > > > > > > is used to register with the jobmanager and is not convenient to match with
> > > > > > > > > > the real taskmanager, especially in container environments.
> > > > > > > > > >
> > > > > > > > > > I think a possible solution is to support a user-defined
> > > > > > > > > > resourceId. We could get it from the environment. For standalone on K8s, we
> > > > > > > > > > could set the "RESOURCE_ID" env to the pod name so that it is easier to
> > > > > > > > > > match the taskmanager with the K8s pod.
> > > > > > > > > >
> > > > > > > > > > Moreover, I am afraid we could not set the pod name to the resourceId. I
> > > > > > > > > > think you could set the "deployment.meta.name", since the pod name is
> > > > > > > > > > generated by K8s in the pattern {deployment.meta.name}-{rc.uuid}-{uuid}. On
> > > > > > > > > > the contrary, we will set the resourceId to the pod name.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Yang
> > > > > > > > > >
> > > > > > > > > > On Sun, Mar 29, 2020 at 8:06 PM Konstantin Knauf <konstan...@ververica.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi Yangze, Hi Till,
> > > > > > > > > > >
> > > > > > > > > > > thank you for working on this topic. I believe it will make debugging
> > > > > > > > > > > large Apache Flink deployments much more feasible.
> > > > > > > > > > >
> > > > > > > > > > > I was wondering whether it would make sense to allow the user to specify
> > > > > > > > > > > the Resource ID in standalone setups? For example, many users still
> > > > > > > > > > > implicitly use standalone clusters on Kubernetes (the native support is
> > > > > > > > > > > still experimental) and in these cases it would be interesting to also set
> > > > > > > > > > > the PodName as the ResourceID. What do you think?
> > > > > > > > > > >
> > > > > > > > > > > Cheers,
> > > > > > > > > > >
> > > > > > > > > > > Konstantin
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Mar 26, 2020 at 6:49 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi Yangze,
> > > > > > > > > > > >
> > > > > > > > > > > > thanks for creating this FLIP. I think it is a very good improvement
> > > > > > > > > > > > helping our users and ourselves better understand what's going on in
> > > > > > > > > > > > Flink.
> > > > > > > > > > > >
> > > > > > > > > > > > Creating the ResourceIDs with host information/pod name is a good idea.
> > > > > > > > > > > >
> > > > > > > > > > > > Also deriving ExecutionGraph IDs from their superset ID is a good idea.
> > > > > > > > > > > >
> > > > > > > > > > > > The InstanceID is used for fencing purposes. I would not make it a
> > > > > > > > > > > > composition of the ResourceID + a monotonically increasing number. The
> > > > > > > > > > > > problem is that in case of an RM failure the InstanceIDs would start from 0
> > > > > > > > > > > > again and this could lead to collisions.
> > > > > > > > > > > >
> > > > > > > > > > > > Logging more information on how the different runtime IDs are correlated is
> > > > > > > > > > > > also a good idea.
> > > > > > > > > > > >
> > > > > > > > > > > > Two other ideas for simplifying the IDs are the following:
> > > > > > > > > > > >
> > > > > > > > > > > > * The SlotRequestID was introduced because the SlotPool was a separate
> > > > > > > > > > > > RpcEndpoint a while ago.
> > > > > > > > > > > > With this no longer being the case I think we
> > > > > > > > > > > > could remove the SlotRequestID and replace it with the AllocationID.
> > > > > > > > > > > > * Instead of creating new SlotRequestIDs for multi task slots one could
> > > > > > > > > > > > derive them from the SlotRequestID used for requesting the underlying
> > > > > > > > > > > > AllocatedSlot.
> > > > > > > > > > > >
> > > > > > > > > > > > Given that the slot sharing logic will most likely be reworked with the
> > > > > > > > > > > > pipelined region scheduling, we might be able to resolve these two points
> > > > > > > > > > > > as part of the pipelined region scheduling effort.
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > > Till
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Mar 26, 2020 at 10:51 AM Yangze Guo <karma...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > >
> > > > > > > > > > > > > We would like to start a discussion thread on "FLIP-118: Improve
> > > > > > > > > > > > > Flink’s ID system"[1].
> > > > > > > > > > > > >
> > > > > > > > > > > > > This FLIP mainly discusses the following issues, aiming to enhance the
> > > > > > > > > > > > > readability of IDs in logs and help users debug in case of failures:
> > > > > > > > > > > > >
> > > > > > > > > > > > > - Enhance the readability of the string literals of IDs. Most of them
> > > > > > > > > > > > > are hashcodes, e.g. ExecutionAttemptID, which do not provide much
> > > > > > > > > > > > > meaningful information and are hard for users to recognize and compare.
> > > > > > > > > > > > >
> > > > > > > > > > > > > - Log the ID’s lineage information to make debugging more convenient.
> > > > > > > > > > > > > Currently, the log does not always show the lineage information
> > > > > > > > > > > > > between IDs. Finding out relationships between entities identified by
> > > > > > > > > > > > > given IDs is a common demand, e.g., the slot with which AllocationID is
> > > > > > > > > > > > > assigned to satisfy the slot request with which SlotRequestID. Without
> > > > > > > > > > > > > such lineage information, it’s currently impossible to track the end-to-end
> > > > > > > > > > > > > lifecycle of an Execution or a Task, which makes debugging
> > > > > > > > > > > > > difficult.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Key changes proposed in the FLIP are as follows:
> > > > > > > > > > > > >
> > > > > > > > > > > > > - Add location information to distributed components
> > > > > > > > > > > > > - Add topology information to graph components
> > > > > > > > > > > > > - Log the ID’s lineage information
> > > > > > > > > > > > > - Expose the identifier of distributing component to user
> > > > > > > > > > > > >
> > > > > > > > > > > > > Please find more details in the FLIP wiki document [1]. Looking forward to
> > > > > > > > > > > > > your feedback.
> > > > > > > > > > > > >
> > > > > > > > > > > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best,
> > > > > > > > > > > > > Yangze Guo
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > Konstantin Knauf | Head of Product
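
To make the guess at the top of this thread (why the serialized TDD gets smaller) easier to check, here is a minimal, self-contained Java sketch. It is not Flink code: `TwoLongId` and `ComposedPartitionId` are hypothetical stand-ins for an AbstractID-style ID and the refactored IntermediateResultPartitionID, and the measurement reuses the ByteArrayOutputStream/ObjectOutputStream approach quoted above. It only illustrates that plain Java serialization writes a shared object once and a back-reference for every later occurrence.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.UUID;

// Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
class SerializedIdSizeSketch {

    /** Old-style ID: two random longs, similar in spirit to an AbstractID. */
    static final class TwoLongId implements Serializable {
        private static final long serialVersionUID = 1L;
        private final long lowerPart;
        private final long upperPart;

        TwoLongId() {
            UUID random = UUID.randomUUID();
            this.lowerPart = random.getLeastSignificantBits();
            this.upperPart = random.getMostSignificantBits();
        }
    }

    /** New-style ID: a reference to a (shared) data set ID plus an int index. */
    static final class ComposedPartitionId implements Serializable {
        private static final long serialVersionUID = 1L;
        private final TwoLongId dataSetId;
        private final int partitionNum;

        ComposedPartitionId(TwoLongId dataSetId, int partitionNum) {
            this.dataSetId = dataSetId;
            this.partitionNum = partitionNum;
        }
    }

    /** Serializes the object with plain Java serialization and returns the byte count. */
    static int serializedSize(Serializable object) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(object);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }

    /** Serialized size of a list of n independent two-long IDs. */
    static int legacySize(int n) {
        ArrayList<TwoLongId> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ids.add(new TwoLongId());
        }
        return serializedSize(ids);
    }

    /** Serialized size of a list of n composed IDs that all share one data set ID. */
    static int composedSize(int n) {
        TwoLongId shared = new TwoLongId();
        ArrayList<ComposedPartitionId> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ids.add(new ComposedPartitionId(shared, i));
        }
        return serializedSize(ids);
    }

    public static void main(String[] args) {
        int n = 1000;
        System.out.println("independent two-long IDs: " + legacySize(n) + " bytes");
        System.out.println("composed IDs, shared data set ID: " + composedSize(n) + " bytes");
    }
}
```

The saving in this sketch comes entirely from the shared reference, so in a real TDD it would presumably depend on how many partition IDs point at the same IntermediateDataSetID.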