OK, I think we've figured out what is happening. SamzaContainer.apply() builds "serdeStreams calling StreamConfig.getSerdeStreams which uses StreamConfig.streamIdToSystemStream. In StreamConfig.streamIdToSystemStream the "stream" property of SystemStream is being set using getPhysicalName(). This makes sense after we thought about it but the doc on SystemStream constructor says "@param stream The name of the stream as specified in the stream configuration file." which we read as being the logical name of the stream as specified in the configuration file :)
This is the first time our stream IDs and physical names have not aligned (due to the validation in class StreamDescriptor using STREAM_ID_PATTERN which doesn't allow period/dot.) Once we changed our SystemStream instances to use the physical name for the "stream" everything started working. Thanks! Thunder On Wed, Jun 5, 2019 at 11:20 AM Thunder Stumpges <thunder.stump...@gmail.com> wrote: > Thanks, I think this all makes sense. I was able to get my application up > and running with the exception of the Output SystemStream not finding its > Serde. > > For a little more background, here's how we are set up: > > We are moving from YARN to standalone (running in Kubernetes). I think we > have this part under control. > > Previously, we had a config rewriter that would iterate over all of the > topics in the config "*task.inputs*" and "*task.outputs.**" config > settings and create serializers for each stream. > One issue we ran into was that the descriptor setup in code doesn't like > our old StreamId(s) that have dots in the name, so we went through some > work to use logical names for our stream IDs, and set the topic name using > the "withPhysicalName" method. We figured that with the outputs all being > wired up in code, neither of these should matter, and we expected the Serde > we specified when registering the OutputDescriptor to be found and used > when sending messages using that logical StreamId. > > However what we see while debugging is that when sending a message, the > logical StreamId we registered with the OutputDescriptor cannot be found in > SystemProducer.send() which calls serdeManager.toBytes(...) and in there, > it cannot locate the serde using the logical StreamId. What is strange is > that there ARE Serde(s) in the "systemStreamMessageSerdes" map set up for > StreamId(s) that use the physical names of our topics (including the dots) > but NOT the logical names. > > We seem to be stuck a little right now, and unsure why this new > restriction on the use of dot in StreamIDs from the Descriptor side, but > not seemingly aligning in the rest of the system. > > Any ideas or support would be great. We'll continue to dig as well. > Thanks, > -Thunder > > On Mon, Jun 3, 2019 at 10:57 AM Prateek Maheshwari <prateek...@gmail.com> > wrote: > >> Hi Thunder, >> >> I'm assuming you're talking about the low level (StreamTask) API here, >> since the High Level API has stronger requirements for I/O >> systems/streams. >> >> > How much IS picked up from config. >> All of the system, stream and store properties can still be specified >> in configuration. Properties specified in config will override those >> specified using descriptors (with a couple of exceptions like >> task.inputs). >> >> > I don't see how to register [custom coordinator] system via the >> ApplicationDescriptor >> Re: dedicated coordinator system, you can continue to specify >> 'job.coordinator.system' and it's properties in configs. To keep the >> API simple, we only support specifying the job.default.system (which >> is the default system for intermediate, coordinator, changelog and >> checkpoint streams) using descriptors for now. >> >> > It seems that a KafkaSystem is only associated with the >> ApplicationDescriptor via its Input/Output/Table descriptors. >> Yeah, the ApplicationDescriptor is only aware of system descriptors >> transitively through the input / output streams or the default system. >> However see the response above for adding systems via configs. >> >> > we have dynamic output SystemStream(s) created based on other runtime >> state >> This will still work in Low Level API. It is recommended to, but >> there's no hard requirement to pre-specify your output systems and >> streams. >> >> In general, when migrating your Low Level TaskApplication to Samza >> 1.0, you should be able to do >> 'applicationDescriptor.withTaskFactory(() -> new MyTask)' in your >> TaskApplication#describe with no other code changes. Please give that >> a shot and let us know if you run into any issues. >> >> Apologies for the confusion, we'll update the upgrade docs. >> >> Thanks, >> Prateek >> >> On Sat, Jun 1, 2019 at 11:13 AM Thunder Stumpges >> <thunder.stump...@gmail.com> wrote: >> > >> > Hey guys, >> > >> > I'm following the guide here: >> > http://samza.apache.org/releases/1.0.0 >> > >> > In step 3 it says: >> > "In Samza 1.0, a Samza application’s input, output, and processing-task >> > should be specified in code, rather than in config. " >> > >> > How much IS picked up from config? Will all the configuration of the >> > systems (consumer and producer properties, buffering, etc) be picked up >> > from the config properties still? What about stream settings like offset >> > reset, offset default, etc? >> > >> > In some of my tasks, I have a dedicated coordinator system. I don't see >> how >> > to register that system via the ApplicationDescriptor, nor how to >> associate >> > it with the coordinator (config setting `*job.coordinator.system*`). It >> > seems that a KafkaSystem is only associated with the >> ApplicationDescriptor >> > via its Input/Output/Table descriptors. Is this correct? >> > >> > I would like to keep my config in config, not in code, but it feels like >> > this is forcing me to move some (or all?) of it into code. I had custom >> > config re-writers which made this very flexible, but I'm not seeing how >> to >> > adapt this to the "new way". The Application/ApplicationDescriptor >> seems to >> > have no connection to the Configuration / properties... >> > >> > One other thing, is that in a few of my jobs, we have dynamic output >> > SystemStream(s) created based on other runtime state. Is this not going >> to >> > be possible anymore? >> > >> > A little more guidance would be most helpful. >> > >> > Thanks! >> > Thunder Stumpges >> >