KSQL UDFs with structs as parameters, structs as output

2020-03-02 Thread Federico D'Ambrosio
Hello everyone,

is there any example of a UDF that has Structs as input parameters or
outputs?

I'm currently implementing a UDF which should be something like:

Struct merge(Struct before, Struct after)

(these structs are from nested JSON objects, like {"field1": "value",
"before": {}, "after": {}})
Now, I was using KsqlStruct to enforce types and set values, but then I
found this issue (https://github.com/confluentinc/ksql/issues/3413).
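
For reference, here is roughly what I'm attempting, rewritten against the
Connect Struct API instead of KsqlStruct (a minimal sketch; the field names
and schemas are placeholders for my real ones, and the schema annotations
reflect my reading of the docs, untested):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

@UdfDescription(name = "merge", description = "merges CDC 'before'/'after' structs")
public class MergeUdf {

  // Placeholder output schema; the real one would mirror my topic's fields.
  private static final Schema MERGED_SCHEMA = SchemaBuilder.struct()
      .optional()
      .field("FIELD1", Schema.OPTIONAL_INT32_SCHEMA)
      .build();

  @Udf(schema = "STRUCT<FIELD1 INT>")
  public Struct merge(
      @UdfParameter(schema = "STRUCT<FIELD1 INT, FIELD1_ISMISSING BOOLEAN>")
      final Struct before,
      @UdfParameter(schema = "STRUCT<FIELD1 INT, FIELD1_ISMISSING BOOLEAN>")
      final Struct after) {
    // Prefer the 'after' value and fall back to 'before'; the
    // '_isMissing' flags are simply dropped from the output.
    final Object field1 =
        after != null && after.get("FIELD1") != null ? after.get("FIELD1")
            : before != null ? before.get("FIELD1") : null;
    return new Struct(MERGED_SCHEMA).put("FIELD1", field1);
  }
}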

So, I'm wondering if there's any example of how to use Structs in UDFs,
since I cannot seem to find any in the official documentation.

Thank you very much for your help,
Federico D'Ambrosio


Re: KSQL UDFs with structs as parameters, structs as output

2020-03-02 Thread Federico D'Ambrosio
As an additional question, I would like to ask whether it's compulsory to
specify the schema of the input struct statically within the UDF
implementation.

To add a bit more detail, my use case is in fact the following:
I have a handful of topics to which JSON messages with the following
structure are published (they are CDC events):

{"rootField": "value", "timestamp": 1234,
"before": {"field1": 1, "field1_isMissing": false},
"after": {"field1": 2, "field1_isMissing": false}
}

I want to convert these JSONs to Avro and then, for each JSON, output a
sort of "merged" version of it: filtering out some fields, keeping only one
field between before.field1 and after.field1, and removing the fields ending
in '_isMissing', as a flat structure like:

{"field1": 2, "timestamp": 1234}

There are a bunch of topics, each with a different schema, and the "before"
and "after" fields have different struct schemas per topic (again, these are
CDC events and each topic corresponds to a different table), so the UDF
handling this should be generic and dynamic enough.
Now, apart from the conversion from JSON to Avro (which I could handle by
creating streams with VALUE_FORMAT='avro' starting from the Avro schemas I
will be given; then why are the messages in JSON and not Avro? Unfortunately
that doesn't depend on me :/ ), the real challenge is how to implement the
merge operation.

As already explained, ideally I would like to implement a UDF which could
be used as

CREATE STREAM output_stream AS SELECT unnest(merge(before, after)),
timestamp FROM avro_stream;

to output a flat structure.

To sum up my questions:

   1. First of all: is it even possible to do this in KSQL?

If so:

   2. I see there's the possibility to use the @UdfSchemaProvider annotation
   to enable dynamic typing, but I don't find the documentation particularly
   clear: "To use this functionality, you need to specify a method with
   signature public SqlType <your-method-name>(final List<SqlType> params)
   and annotate it with @UdfSchemaProvider. Also, you need to link it to the
   corresponding UDF by using the schemaProvider=<your-method-name>
   parameter of the @Udf annotation."
      a. What is represented by the input parameter final List<SqlType>
      params? Is it the SqlTypes of the UDF's runtime inputs, so that in
      params.get(0) I would find (in my use case) the Struct type of the
      'before' field? I guess the returned SqlType is the type against
      which the main UDF method is validated.
      b. How should this method even be implemented? I cannot find any
      example of this (see the sketch after this list).
   3. Is it possible to unnest a Struct field? I cannot find any reference
   to this, and since there would be quite a few different topics, I really
   don't want to manually define an additional stream with CREATE STREAM ...
   AS SELECT strct->field1 AS field1, timestamp FROM output_stream.
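
For completeness, here is the rough shape I have in mind for the dynamic
variant, assuming my reading of the docs is right (the schema-provider
method name and the merge logic are my own placeholders, untested; I am
also not sure the input structs can be left without a static schema, which
is exactly my question above, and the '_isMissing' filtering is omitted):

import java.util.List;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.schema.ksql.types.SqlType;

@UdfDescription(name = "merge", description = "merges CDC 'before'/'after' structs")
public class DynamicMergeUdf {

  @Udf(schemaProvider = "mergeSchema")
  public Struct merge(final Struct before, final Struct after) {
    // Assumes 'before' and 'after' share a schema; prefers 'after' values.
    final Struct source = after != null ? after : before;
    if (source == null) {
      return null;
    }
    final Struct merged = new Struct(source.schema());
    for (final Field field : source.schema().fields()) {
      final Object afterValue = after == null ? null : after.get(field);
      merged.put(field, afterValue != null
          ? afterValue
          : before == null ? null : before.get(field));
    }
    return merged;
  }

  @UdfSchemaProvider
  public SqlType mergeSchema(final List<SqlType> params) {
    // My understanding: params holds the SqlTypes of the call-site
    // arguments, so params.get(0) is the type of 'before' and
    // params.get(1) the type of 'after'; the returned SqlType is the
    // declared output type, here simply the 'after' struct's type.
    return params.get(1);
  }
}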

I hope this wasn't too much to read.

Thank you for your help,
Federico


On Mon, 2 Mar 2020 at 09:47, Federico D'Ambrosio <fedex...@gmail.com>
wrote:

> Hello everyone,
>
> is there any example of a UDF that has Structs as input parameters or
> outputs?
>
> I'm currently implementing a UDF which should be something like:
>
> Struct merge(Struct before, Struct after)
>
> (these structs are from nested JSON objects, like {"field1": "value",
> "before": {}, "after": {}})
> Now, I was using KsqlStruct to enforce types and set values, but then I
> found this issue (https://github.com/confluentinc/ksql/issues/3413).
>
> So, I'm wondering if there's any example of how to use Structs in UDFs,
> since I cannot seem to find any in the official documentation.
>
> Thank you very much for your help,
> Federico D'Ambrosio
>


-- 
Federico D'Ambrosio


Re: MM2 for DR

2020-03-02 Thread Péter Sinóros-Szabó
Hi Ryanne,

> I frequently demo this stuff, where I pull the plug on entire DCs and
apps keep running like nothing happened.
Is there any public recording or documentation of these demos?
It would be very useful to see how it works.

Thanks,
Peter
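
P.S. Regarding the custom ReplicationPolicy idea quoted below: if I
understand it correctly, a minimal sketch would be something like the
following (untested; the squashing logic is my own guess, it would break
for topic names that contain dots, and discarding the hop history may
interfere with MM2's cycle detection):

import org.apache.kafka.connect.mirror.ReplicationPolicy;

// Hypothetical policy that keeps at most one "cluster." prefix, so a topic
// replicated back and forth stays "primary.topic1" instead of growing into
// "secondary.primary.topic1".
public class SquashingReplicationPolicy implements ReplicationPolicy {

    private static final String SEPARATOR = ".";

    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        // Strip any existing prefixes before adding the new source alias.
        int i = topic.lastIndexOf(SEPARATOR);
        String original = i < 0 ? topic : topic.substring(i + 1);
        return sourceClusterAlias + SEPARATOR + original;
    }

    @Override
    public String topicSource(String topic) {
        // "primary.topic1" -> "primary"; a plain "topic1" has no source.
        int i = topic.indexOf(SEPARATOR);
        return i < 0 ? null : topic.substring(0, i);
    }

    @Override
    public String upstreamTopic(String topic) {
        // "primary.topic1" -> "topic1".
        int i = topic.indexOf(SEPARATOR);
        return i < 0 ? null : topic.substring(i + 1);
    }
}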

On Thu, 13 Feb 2020 at 00:42, Ryanne Dolan  wrote:

> > elaborate a bit more about the active-active
>
> Active/active in this context just means that both (or multiple)
> clusters are used under normal operation, not just during an outage.
> For this to work, you basically have isolated instances of your application
> stack running in each DC, with MM2 keeping each DC in sync. If one DC is
> unavailable, traffic is shifted to another DC. It's possible to set this up
> s.t. failover/failback between DCs happens automatically and seamlessly,
> e.g. with load balancers and health checks. It's more complicated to set up
> than the active/standby approach, but DR sorta takes care of itself from
> then on. I frequently demo this stuff, where I pull the plug on entire DCs
> and apps keep running like nothing happened.
>
> On Wed, Feb 12, 2020 at 12:05 AM benitocm  wrote:
>
> > Hi Ryanne,
> >
> > Please could you elaborate a bit more about the active-active
> > recommendation?
> >
> > Thanks in advance
> >
> > On Mon, Feb 10, 2020 at 10:21 PM benitocm  wrote:
> >
> > > Thanks very much for the response.
> > >
> > > Please could you elaborate a bit more about "I'd arc in that
> > > direction. Instead of migrating A->B->C->D..., active/active is more
> > > like having one big cluster".
> > >
> > > Another thing that I would like to share is that currently my
> > > consumers only consume from one topic, so introducing MM2 will impact
> > > them.
> > > Any suggestion in this regard would be greatly appreciated
> > >
> > > Thanks in advance again!
> > >
> > >
> > > On Mon, Feb 10, 2020 at 9:40 PM Ryanne Dolan 
> > > wrote:
> > >
> > >> Hello, sounds like you have this all figured out actually. A couple
> > >> notes:
> > >>
> > >> > For now, we just need to handle DR requirements, i.e., we would
> > >> > not need active-active
> > >>
> > >> If your infrastructure is sufficiently advanced, active/active can
> > >> be a lot easier to manage than active/standby. If you are starting
> > >> from scratch I'd arc in that direction. Instead of migrating
> > >> A->B->C->D..., active/active is more like having one big cluster.
> > >>
> > >> > secondary.primary.topic1
> > >>
> > >> I'd recommend using regex subscriptions where possible, so that
> > >> apps don't need to worry about these potentially complex topic names.
> > >>
> > >> > An additional question: if the topic is compacted, i.e., the
> > >> > topic keeps data forever, would switchover operations imply adding
> > >> > an additional prefix to the topic name?
> > >>
> > >> I think that's right. You could always clean things up manually, but
> > >> migrating between clusters a bunch of times would leave a trail of
> > >> replication hops.
> > >>
> > >> Also, you might look into implementing a custom ReplicationPolicy.
> > >> For example, you could squash "secondary.primary.topic1" into
> > >> something shorter if you like.
> > >>
> > >> Ryanne
> > >>
> > >> On Mon, Feb 10, 2020 at 1:24 PM benitocm  wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> > After having a look at the talk
> > >> > https://www.confluent.io/kafka-summit-lon19/disaster-recovery-with-mirrormaker-2-0
> > >> > and
> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382
> > >> > I am trying to understand how I would use it in the setup that I
> > >> > have. For now, we just need to handle DR requirements, i.e., we
> > >> > would not need active-active
> > >> >
> > >> > My requirements, more or less, are the following:
> > >> >
> > >> > 1) Currently, we have just one Kafka cluster, "primary", where all
> > >> > the producers are producing to and all the consumers are consuming
> > >> > from.
> > >> > 2) In case "primary" crashes, we would need to have another Kafka
> > >> > cluster, "secondary", to which we would move all the producers and
> > >> > consumers and keep working.
> > >> > 3) Once "primary" is recovered, we would need to move back to it
> > >> > (as we were in #1)
> > >> >
> > >> > To fulfill #2, I have thought of setting up a new Kafka cluster,
> > >> > "secondary", with a replication procedure using MM2. However, it is
> > >> > not clear to me how to proceed.
> > >> >
> > >> > I will describe the high-level details so you can point out my
> > >> > misconceptions:
> > >> >
> > >> > A) Initial situation. As in the example in KIP-382, in the primary
> > >> > cluster we will have a local topic "topic1" that the producers will
> > >> > produce to and the consumers will consume from. MM2 will create in
> > >> > the primary the remote topic "primary.topic1" where the

Re: KafkaStreams GroupBy with new key. Can I skip repartition?

2020-03-02 Thread John Roesler
Hi, all,

Sorry for the confusion. I didn't look too closely at it; I was just going by
the fact that it was listed under the scope of KIP-221.

I agree that the final design of the KIP doesn’t have too much to do with the 
description of KAFKA-4835. Maybe we should remove that ticket from the KIP, and 
also give it a more specific name. 

I’ll ask in the ticket if Levani is also actively working on it, or if he was 
just planning on KIP-221. 

Thanks,
John
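
P.S. For anyone skimming the thread, the pattern Murilo described is roughly
the following (a sketch; the topic name, types, and key selection are made
up):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;

public class GroupByRepartitionExample {
  public static void main(String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();

    // Placeholder topic; assume <String, String> records.
    final KTable<String, String> table = builder.table("input-topic");

    // Selecting a new key in groupBy() makes Streams write the records to
    // an internal repartition topic before aggregating, even when the new
    // key is known to land on the same partition as the old one.
    final KTable<String, Long> counts = table
        .groupBy(
            (key, value) -> KeyValue.pair(value, value),
            Grouped.with(Serdes.String(), Serdes.String()))
        .count();

    // The "-repartition" topic shows up in the topology description.
    System.out.println(builder.build().describe());
  }
}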

On Sun, Mar 1, 2020, at 20:13, Murilo Tavares wrote:
> I agree with Matthias. I can't see how this KIP/PR helps with the problem
> described in KAFKA-4835...
> 
> On Sun, Mar 1, 2020 at 2:16 PM Matthias J. Sax  wrote:
> 
> >
> > I don't think that KIP-221 addressed the discussed use case.
> >
> > KIP-221 allows forcing a repartition manually, while the use case
> > described in the original email was to suppress/skip a repartitioning
> > step.
> >
> > The issue of avoiding unnecessary repartitioning has come up a few
> > times already, and I personally believe it's worth closing this gap. But
> > we would need a KIP to introduce some API that lets users tell Kafka
> > Streams that repartitioning is not necessary.
> >
> > In Apache Flink, there is an operator called
> > `reinterpretAsKeyedStream`. We could introduce something similar.
> >
> > -Matthias
> >
> >
> > On 3/1/20 4:43 AM, John Roesler wrote:
> > > Hi all,
> > >
> > > The KIP is accepted and implemented already, but is blocked on
> > > code review: https://github.com/apache/kafka/pull/7170
> > >
> > > A quick note on the lack of recent progress... It's completely our
> > > fault, the reviews fell by the wayside during the 2.5.0 release
> > > cycle, and we haven't gotten back to it. The contributor, Levani,
> > > has been exceptionally patient with us and continually kept the PR
> > > up-to-date and mergeable since then.
> > >
> > > If you'd like to help get it across the line, Murilo, maybe you can
> > > give it a review?
> > >
> > > Thanks, John
> > >
> > > On Sat, Feb 29, 2020, at 20:52, Guozhang Wang wrote:
> > >> It is in progress, but I was not the main reviewer of that ticket
> > >> so I cannot say for sure. I saw the last update was in Jan 2019, so
> > >> maybe it's a bit stale now. If you want to pick it up and revive the
> > >> KIP, feel free to do so :)
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Fri, Feb 28, 2020 at 5:54 PM Murilo Tavares
> > >>  wrote:
> > >>
> > >>> Guozhang,
> > >>> The ticket definitely describes what I'm trying to achieve. And
> > >>> should I be hopeful given that it's in progress? :)
> > >>> Thanks for pointing that out.
> > >>> Murilo
> > >>>
> > >>> On Fri, Feb 28, 2020 at 2:57 PM Guozhang Wang
> > >>>  wrote:
> > >>>
> >  Hi Murilo,
> > 
> >  Would this help in your case?
> >  https://issues.apache.org/jira/browse/KAFKA-4835
> > 
> > 
> >  Guozhang
> > 
> >  On Fri, Feb 28, 2020 at 7:01 AM Murilo Tavares
> >   wrote:
> > 
> > > Hi, I am currently doing a simple KTable groupby().aggregate() in
> > > KafkaStreams. In the groupBy I do need to select a new key, but I know
> > > for sure that the new key would still fall in the same partition.
> > > Because of this, I believe the repartition would not be necessary, but
> > > my question is: is it possible to do a groupBy, changing the key, and
> > > tell KafkaStreams to not create the repartition topic?
> > > Thanks
> > > Murilo
> > >
> > 
> > 
> >  --
> >  Guozhang
> > 
> > >>>
> > >>
> > >>
> > >> --
> > >> Guozhang
> > >>
> >
>


[VOTE] 2.4.1 RC0

2020-03-02 Thread Bill Bejeck
Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 2.4.1.

This is a bug fix release and it includes fixes and improvements from 38
JIRAs, including fixes for a few critical bugs.

Release notes for the 2.4.1 release:
https://home.apache.org/~bbejeck/kafka-2.4.1-rc0/RELEASE_NOTES.html

*Please download, test and vote by Thursday, March 5, 9 am PT*

Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~bbejeck/kafka-2.4.1-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~bbejeck/kafka-2.4.1-rc0/javadoc/

* Tag to be voted upon (off 2.4 branch) is the 2.4.1 tag:
https://github.com/apache/kafka/releases/tag/2.4.1-rc0

* Documentation:
https://kafka.apache.org/24/documentation.html

* Protocol:
https://kafka.apache.org/24/protocol.html

* Successful Jenkins builds for the 2.4 branch:
Unit/integration tests: Links to successful unit/integration test build to
follow
System tests:
https://jenkins.confluent.io/job/system-test-kafka/job/2.4/152/


Thanks,
Bill Bejeck


what happened in case of single disk failure

2020-03-02 Thread 张祥
Hi community,

I ran into a disk failure when using Kafka, and fortunately it did not
crash the entire cluster. So I am wondering how Kafka handles multiple
disks and how it manages to keep working in case of a single disk failure.
The more detailed, the better. Thanks!


Re: what happened in case of single disk failure

2020-03-02 Thread Peter Bukowinski
Whether your brokers have a single data directory or multiple data directories 
on separate disks, when a disk fails, the topic partitions located on that disk 
become unavailable. What happens next depends on how your cluster and topics 
are configured.

If the topics on the affected broker have replicas and the minimum ISR (in-sync 
replicas) count is met, then all topic partitions will remain online and 
leaders will move to another broker. Producers and consumers will continue to 
operate as usual.

If the topics don’t have replicas or the minimum ISR count is not met, then the 
topic partitions on the failed disk will be offline. Producers can still send 
data to the affected topics — it will just go to the online partitions. 
Consumers can still consume data from the online partitions.
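
If you want to see the effect from a client, a quick AdminClient check (a
sketch; the topic name and bootstrap address are placeholders) shows which
partitions currently have a leader:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class PartitionHealthCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin
                .describeTopics(Collections.singletonList("topic1"))
                .all().get().get("topic1");

            // A partition reporting no leader is offline: it cannot be
            // produced to or consumed from until a replica comes back.
            desc.partitions().forEach(p -> System.out.printf(
                "partition %d leader=%s isr=%s%n",
                p.partition(),
                p.leader() == null ? "NONE (offline)" : p.leader().idString(),
                p.isr()));
        }
    }
}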

-- Peter

> On Mar 2, 2020, at 7:00 PM, 张祥  wrote:
> 
> Hi community,
> 
> I ran into a disk failure when using Kafka, and fortunately it did not
> crash the entire cluster. So I am wondering how Kafka handles multiple
> disks and how it manages to keep working in case of a single disk failure.
> The more detailed, the better. Thanks!