Thanks Damian for taking the time to point out my stupidity - that was it :-) Case of Occams Razor!! Thanks again
On Wed, 2 May 2018 at 16:56 Conrad Crampton <conrad.cramp...@gmail.com> wrote: > Looks like I may be victim of that copy and paste gremlin!! > Without checking I think you might be onto something - I'll check it out > later and repost back. > On one hand I really hope it's this, on the other I can't believe I've > been so stupid! > Thanks > > > On Wed, 2 May 2018 at 16:30 Damian Guy <damian....@gmail.com> wrote: > >> Hi, >> >> I think it **might** be related to this: >> final Serializer<HttpSession> httpSessionSerializer = new >> JsonPOJOSerializer<>(); >> serdeProps.put("JsonPOJOClass", Http.class); >> httpSessionSerializer.configure(serdeProps, false); >> >> final Deserializer<HttpSession> httpSessionDeserializer = new >> JsonPOJODeserializer<>(); >> serdeProps.put("JsonPOJOClass", Http.class); >> httpSessionDeserializer.configure(serdeProps, false); >> >> Shouldn't the class be HttpSession.class ? >> >> On Wed, 2 May 2018 at 16:12 Conrad Crampton <conrad.cramp...@gmail.com> >> wrote: >> >>> I'm trying to window over http logs and create an HttpSession i.e. a list >>> of http requests (and some other properties). However when in my >>> aggregate >>> Merger part (I think) I'm getting a classcastexception I think in when my >>> sessions are being merged and cannot for the life of me work out why. >>> The exception is at the bottom and I think the relevant code is here. >>> Can anyone give a suggestion as to why Http is trying to be cast to >>> HttpSession? >>> Thanks >>> >>> >>> final Serializer<Http> httpSerializer = new JsonPOJOSerializer<>(); >>> serdeProps.put("JsonPOJOClass", Http.class); >>> httpSerializer.configure(serdeProps, false); >>> >>> final Deserializer<Http> httpDeserializer = new >>> JsonPOJODeserializer<>(); >>> serdeProps.put("JsonPOJOClass", Http.class); >>> httpDeserializer.configure(serdeProps, false); >>> >>> final Serde<Http> httpSerde = Serdes.serdeFrom(httpSerializer, >>> httpDeserializer); >>> >>> final Serializer<HttpSession> httpSessionSerializer = new >>> JsonPOJOSerializer<>(); >>> serdeProps.put("JsonPOJOClass", Http.class); >>> httpSessionSerializer.configure(serdeProps, false); >>> >>> final Deserializer<HttpSession> httpSessionDeserializer = new >>> JsonPOJODeserializer<>(); >>> serdeProps.put("JsonPOJOClass", Http.class); >>> httpSessionDeserializer.configure(serdeProps, false); >>> >>> final Serde<HttpSession> httpSessionSerde = >>> Serdes.serdeFrom(httpSessionSerializer, httpSessionDeserializer); >>> >>> StreamsBuilder builder = new StreamsBuilder(); >>> >>> KStream<String, HttpSession> httpStream = null; >>> try { >>> httpStream = builder.stream( >>> config.getString(ConfigConstants.HTTP_TOPIC_KEY), >>> Consumed.with(Serdes.String(), httpSerde)) >>> .selectKey((s, http) -> http.getClient() + >>> http.getSourceIp() + http.getUseragent()) >>> .groupByKey(Serialized.with(Serdes.String(), >>> httpSerde)) >>> // window by session >>> >>> .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(10))) >>> .aggregate( >>> new Initializer<HttpSession>() { >>> @Override >>> public HttpSession apply() { >>> return new HttpSession(); >>> } >>> }, >>> new Aggregator<String, Http, HttpSession>() { >>> @Override >>> public HttpSession apply(String s, Http >>> http, HttpSession session) { >>> return session.addRequest(http); >>> } >>> }, >>> new Merger<String, HttpSession>() { >>> @Override >>> public HttpSession apply(String s, >>> HttpSession session, HttpSession v1) >>> log.debug("merging key {}, session >>> {} >>> with other {}", s, session, v1); >>> return session.merge(v1);} >>> }, >>> Materialized.<String, HttpSession, >>> SessionStore<Bytes, >>> byte[]>>as(config.getString(StreamsConfig.APPLICATION_ID_CONFIG) + >>> >>> "-session-store").withKeySerde(Serdes.String()).withValueSerde(httpSessionSerde) >>> ).toStream((stringWindowed, session) -> >>> (stringWindowed.key())); >>> } catch (Exception e) { >>> e.printStackTrace(); >>> } >>> >>> httpStream >>> .filter((key, message) -> message != null) >>> .filter((key, message) -> message.getClient() != null) >>> .filter((key, message) -> >>> httpClients.stream().anyMatch(message.getClient()::equals)) >>> .foreach((key, message) -> { >>> log.info("message {}", message); >>> }); >>> >>> final KafkaStreams streams = new KafkaStreams(builder.build(), >>> props); >>> streams.start(); >>> >>> java.lang.ClassCastException: com.secdata.gi.graph.model.Http cannot be >>> cast to com.secdata.gi.graph.model.HttpSession >>> at com.secdata.gi.graph.Process$$Lambda$45/1474607212.apply(Unknown >>> Source) >>> at >>> >>> org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:157) >>> at >>> >>> org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:154) >>> at >>> >>> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41) >>> at >>> >>> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) >>> at >>> >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) >>> at >>> >>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) >>> at >>> >>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) >>> at >>> >>> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) >>> at >>> >>> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) >>> at >>> >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) >>> at >>> >>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) >>> at >>> >>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) >>> at >>> >>> org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42) >>> at >>> >>> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:176) >>> at >>> >>> org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38) >>> at >>> >>> org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88) >>> at >>> >>> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) >>> at >>> >>> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) >>> at >>> >>> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) >>> at >>> >>> org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196) >>> at >>> >>> org.apache.kafka.streams.state.internals.CachingSessionStore.close(CachingSessionStore.java:201) >>> at >>> >>> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89) >>> at >>> >>> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:275) >>> at >>> >>> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:238) >>> at >>> >>> org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:450) >>> at >>> >>> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:532) >>> at >>> >>> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:500) >>> at >>> >>> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:493) >>> at >>> >>> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:212) >>> at >>> >>> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1213) >>> at >>> >>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:755) >>> >>