Re: [DISCUSS] KIP-449: Add connector contexts to Connect worker logs
Chris is correct that the examples are mixed, but they're pretty easy to follow. From what I have gathered, it looks like manipulation of the context would be handled by the framework and not necessarily by the connector developer. I'm not sure what benefit a connector developer would gain from manipulating the connector context further. If we head down the path of allowing developers to extend the context, I would prefer the output format of %X{connector.context} to be key-value, something like "connector=myconnector task=0". The state of the corresponding pull request looks really good as is. I would be fine with merging it as is or expanding it to write the context as key-value. On Mon, Apr 15, 2019 at 12:55 PM Chris Egerton wrote: > > Hi Randall, > > Thanks for the KIP. Debugging Connect workers definitely becomes harder as > the number of connectors and tasks increases, and your proposal would > simplify the process of sifting through logs and finding relevant > information faster and more accurately. > > I have a couple small comments: > > First--I believe the example snippet in your KIP under the "Public > Interfaces" header is inaccurate: > `[my-connector|worker]` - used on log messages where the Connect worker is > validating the configuration for or starting/stopping the > "local-file-source" connector via the SourceConnector / SinkConnector > implementation methods. > `[my-connector|task-0]` - used on log messages where the Connect worker is > executing task 0 of the "local-file-source" connector, including calling > any of the SourceTask / SinkTask implementation methods, processing the > messages for/from the task, and calling the task's producer/consumer. > `[my-connector|task-0|offsets]` - used on log messages where the Connect > worker is committing source offsets for task 0 of the "local-file-source" > connector. > The sample contexts mention "my-connector" but their explanations say that > they correspond to "local-file-source"; shouldn't the two align? > > Second--I'm unclear on whether we expect (or want to encourage) developers > to manipulate the "connector.context" MDC key themselves, from within > connectors, transforms, etc. If we want to encourage this (in order to make > debugging even easier, which would align with the intent behind this KIP), > we may want to expose the LoggingContext class in the Connect API package > and expand on it so that users can set the context themselves. This would > be especially helpful in connectors with multithreaded logic. However, if > that would expand the scope of this KIP too much, I think we could afford to > address that later. > > Cheers, > > Chris
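For illustration, a minimal sketch of how a connector could populate the MDC key itself via SLF4J, assuming developers were encouraged to do so; the "connector.context" key comes from the KIP, while the key-value format is only the style suggested above:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class ConnectorContextMdcSketch {
    private static final Logger log = LoggerFactory.getLogger(ConnectorContextMdcSketch.class);

    public static void main(String[] args) {
        // Key-value style context, as suggested above; normally the framework would set this.
        MDC.put("connector.context", "connector=myconnector task=0 ");
        try {
            log.info("polling for records"); // a layout containing %X{connector.context} prefixes this line
        } finally {
            MDC.remove("connector.context");
        }
    }
}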
Re: [VOTE] KIP-449: Add connector contexts to Connect worker logs (vote thread)
+1 non-binding On Mon, Apr 29, 2019 at 5:34 PM Randall Hauch wrote: > > I would like to start the vote for KIP-449: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs > > The KIP uses the Mapped Diagnostic Contexts (MDC) feature of the SLF4J API to > add more context to log messages from within Connect workers and connector > implementations. This would not be enabled by default, though it would be > easy to enable within the Connect Log4J configuration. > > Thanks! > > Randall
Permission to create a KIP
Hello All, Could someone grant permissions to create a KIP to the user jcustenborder? Thanks!
Re: [ANNOUNCE] New committer: Konstantine Karantasis
Congrats Konstantine! On Wed, Feb 26, 2020 at 2:39 PM Bill Bejeck wrote: > > Congratulations Konstantine! Well deserved. > > -Bill > > On Wed, Feb 26, 2020 at 5:37 PM Jason Gustafson wrote: > > > The PMC for Apache Kafka has invited Konstantine Karantasis as a committer > > and we > > are pleased to announce that he has accepted! > > > > Konstantine has contributed 56 patches and helped to review even more. His > > recent work includes a major overhaul of the Connect task management system > > in order to support incremental rebalancing. In addition to code > > contributions, Konstantine helps the community in many other ways including > > talks at meetups and at Kafka Summit and answering questions on > > stackoverflow. He consistently shows good judgement in design and a careful > > attention to details when it comes to code. > > > > Thanks for all the contributions and looking forward to more! > > > > Jason, on behalf of the Apache Kafka PMC > >
Re: [ANNOUNCE] New Committer: Chris Egerton
Congrats Chris! On Mon, Jul 25, 2022 at 12:07 PM Sagar wrote: > > Congratulations Chris ! > > On Mon, 25 Jul 2022 at 10:32 PM, Viktor Somogyi-Vass > wrote: > > > Congrats Chris! > > > > On Mon, Jul 25, 2022, 18:33 Matthew Benedict de Detrich > > wrote: > > > > > Congratulations! > > > > > > -- > > > Matthew de Detrich > > > Aiven Deutschland GmbH > > > Immanuelkirchstraße 26, 10405 Berlin > > > Amtsgericht Charlottenburg, HRB 209739 B > > > > > > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen > > > m: +491603708037 > > > w: aiven.io e: matthew.dedetr...@aiven.io > > > On 25. Jul 2022, 18:26 +0200, Mickael Maison , > > wrote: > > > > Hi all, > > > > > > > > The PMC for Apache Kafka has invited Chris Egerton as a committer, and > > > > we are excited to announce that he accepted! > > > > > > > > Chris has been contributing to Kafka since 2017. He has made over 80 > > > > commits mostly around Kafka Connect. His most notable contributions > > > > include KIP-507: Securing Internal Connect REST Endpoints and KIP-618: > > > > Exactly-Once Support for Source Connectors. > > > > > > > > He has been an active participant in discussions and reviews on the > > > > mailing lists and on Github. > > > > > > > > Thanks for all of your contributions Chris. Congratulations! > > > > > > > > -- Mickael, on behalf of the Apache Kafka PMC > > > > >
Re: [ANNOUNCE] New Kafka PMC Member: A. Sophie Blee-Goldman
Congrats Sophie! On Wed, Aug 3, 2022 at 9:25 AM David Arthur wrote: > > Well done! Congrats! > > -David > > On Wed, Aug 3, 2022 at 9:48 AM Sagar wrote: > > > Congratulations Sophie ! > > > > On Wed, 3 Aug 2022 at 6:37 PM, José Armando García Sancio > > wrote: > > > > > Congratulations Sophie! > > > -- > > > -José > > > > > > > > -- > -David
Re: [ANNOUNCE] New Kafka PMC Member: Chris Egerton
Congrats buddy! On Fri, Mar 10, 2023 at 9:28 AM Randall Hauch wrote: > > Congratulations, Chris! > > Randall > > On Fri, Mar 10, 2023 at 9:07 AM David Arthur > wrote: > > > Congrats, Chris! > > > > On Fri, Mar 10, 2023 at 8:27 AM Matthew Benedict de Detrich > > wrote: > > > > > Congrats, well deserved! > > > > > > On Thu, 9 Mar 2023, 19:12 Jun Rao, wrote: > > > > > > > Hi, Everyone, > > > > > > > > Chris Egerton has been a Kafka committer since July 2022. He has been > > > very > > > > instrumental to the community since becoming a committer. It's my > > > pleasure > > > > to announce that Chris is now a member of Kafka PMC. > > > > > > > > Congratulations Chris! > > > > > > > > Jun > > > > on behalf of Apache Kafka PMC > > > > > > > > > > > > > -- > > -David > >
Proposing KIP-713: Validation of Enums in configuration
Hello All, I am proposing an API addition for handling enums in configuration. https://cwiki.apache.org/confluence/display/KAFKA/KIP-713%3A+Validation+of+Enums+in+configuration The Kafka ecosystem utilizes configuration values that map to Java enums. If there is a typo in the configuration, an IllegalArgumentException is thrown. This signals to the user that there is a configuration problem, but it doesn't actually help them fix it. This KIP proposes a few API changes that a developer can use to validate configuration values that map to enums and produce helpful error messages. For example: Before: java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.security.auth.SecurityProtocol.SASL_PLAINTEXTA After: org.apache.kafka.common.config.ConfigException: Invalid value SASL_PLAINTEXTA for security.protocol. Enum value not found. Valid values are: PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, SSL Please share feedback in this email thread.
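For reference, one way to get a similar error today with the existing ConfigDef API; this is only a sketch using an illustrative enum and config key, not the new enum-aware API the KIP proposes:

import java.util.Arrays;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;

public class EnumConfigExample {
    enum Compression { NONE, GZIP, SNAPPY } // illustrative enum, not a real Kafka config

    public static void main(String[] args) {
        String[] names = Arrays.stream(Compression.values()).map(Enum::name).toArray(String[]::new);
        ConfigDef def = new ConfigDef()
            .define("compression.type", ConfigDef.Type.STRING, "NONE",
                ConfigDef.ValidString.in(names), ConfigDef.Importance.MEDIUM, "Compression to apply.");
        // A typo such as "GZIPP" fails parse() with a ConfigException that lists the valid values.
        def.parse(Map.of("compression.type", "GZIPP"));
    }
}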
Re: [ANNOUNCE] New Kafka PMC Member: Randall Hauch
Congratulations Randall! On Sat, Apr 17, 2021 at 11:51 AM Kowshik Prakasam wrote: > > Congrats Randall! > > > Cheers, > Kowshik > > On Sat, Apr 17, 2021, 5:28 AM Rankesh Kumar > wrote: > > > Congratulations, Randall! > > Best regards, > > Rankesh Kumar > > Partner Solutions Engineer > > +91 (701)913-0147 > > Follow us: Blog • Slack • Twitter • YouTube > > > > > On 17-Apr-2021, at 1:41 PM, Tom Bentley wrote: > > > > > > Congratulations Randall! > > > > > > > > > > > > On Sat, Apr 17, 2021 at 7:36 AM feyman2009 > > > > > wrote: > > > > > >> Congratulations Randall! > > >> > > >> Haoran > > >> -- > > >> From: Luke Chen > > >> Sent: Saturday, April 17, 2021 12:05 > > >> To: Kafka Users > > >> Cc: dev > > >> Subject: Re: [ANNOUNCE] New Kafka PMC Member: Randall Hauch > > >> > > >> Congratulations Randall! > > >> > > >> Luke > > >> > > >> On Sat, Apr 17, 2021 at 11:33 AM, Bill Bejeck wrote: > > >> > > >>> Congratulations Randall! > > >>> > > >>> -Bill > > >>> > > >>> On Fri, Apr 16, 2021 at 11:10 PM lobo xu > > wrote: > > >>> > > Congrats Randall > > > > >>> > > >> > > >> > > > >
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hey Chris! Nice work on this KIP! What are the thoughts about letting the connector developer control the boundaries of the transaction? For example, kafka-connect-spooldir is used to parse and import files to Kafka. It would be amazing if I could begin and end the transaction as I open and close files. This would allow me to guard against the dreaded exception at line 732 and wrap an entire file in a transaction. It either makes it or it doesn't. J On Tue, May 18, 2021 at 10:04 AM Chris Egerton wrote: > > Hi Randall, > > Thanks for clarifying the issues with large transactions. I think we're > starting to converge on an understanding of the problem space, if not quite > on an approach to tackle it. > > I think my biggest issue with defining transaction boundaries on a > per-task-poll basis is these points that you make: > > > First, it reuses something that connector implementations > already employ, and which many connectors have configs to tune the behavior > to maximize throughput. Second, it requires no additional configuration > properties or connector-specific behavior. Third, high-throughput > connectors are likely returning large batches, so write amplification for > the extra offset record per source partition is likely to have a much > smaller impact (assuming that most high throughput connectors have high > records-per-source-partition ratios). > > I don't believe the first and third points are safe assumptions to make. > There's actually very little, if any, performance benefit to writing source > connectors whose tasks give Connect larger batches (at least, not as far as > the framework logic goes). Every record is sequentially transformed, > converted, and dispatched to the producer, regardless of whether it came > from a batch of one or one million. So Connect does have the capability > right now to support high-throughput, small-batch connectors. > > For a concrete example, Confluent's Datagen connector ( > https://github.com/confluentinc/kafka-connect-datagen), which is used to > produce sample data for quickstarts and demos, performs just fine even > though it does absolutely no batching whatsoever and returns only a single > record per call to "SourceTask::poll". In some not-super-rigorous > performance testing, I ran the connector three times against a local build > of the 2.8.0 release of Connect, then modified the 2.8.0 release of Connect > to perform a producer flush for every task-provided batch of records, then > ran the connector three times against that. Each test run produced exactly > one million records. The worst run out of the initial cases (against the > unmodified local 2.8.0 build) took 15 seconds to complete. The best run out > of the subsequent cases (against the modified local 2.8.0 build) took 2 > minutes and 44 seconds, or 164 seconds--over a 10x slowdown. And this was > against a single local broker with no replication factor. > > Of course, if it's a matter of accommodating this one (demo-oriented) > connector, then it might still be worth it to put the onus on the > developers of that connector to modify it appropriately to work with > exactly-once support. But my point with this connector is more > general--simply put, there don't appear to be safe grounds for the > assumption that source tasks must produce large batches in order to achieve > high throughput.
> > > Yes, per-connector offset commit intervals is one approach that would be > more explicit, though see my concerns earlier in this email about > controlling the number of records in a transaction by a time-based config, > even if set on a per-connector basis. > > I'll try to summarize the concerns presented; let me know if I'm missing > something: > > 1. Large transaction sizes can inflate the memory requirements for > consumers and possibly even overwhelm them with out-of-memory errors. > 2. High offset commit intervals can inflate the read latency of downstream > applications. > > Combining both of these concerns with the high-throughput-small-batch > scenario, it still seems worthwhile to provide users with a way to do some > multi-batch transactions for their source tasks. This is analogous to > consumer-side buffering; yes, by default you probably want events to be > available downstream as soon as possible, but in some cases it's still > necessary to introduce a small hit in latency in order to keep up with high > throughput. And if each batch is relatively small, then heightened memory > requirements for consumers shouldn't really be a problem. > > And for other scenarios, with per-connector offset commit intervals, users > can always just set the interval to zero to get exactly the behavior that > you're describing. > > > I'm not sure that setting up a separate Connect cluster is that practical > for many Connect users, especially in larger organizations where one group > manages the cluster and others manage connectors. > > I agree; I was mostly responding to what I perceived as the assumptio
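To make the spooldir use case above concrete, a sketch of what connector-defined transaction boundaries might look like from a source task; the TransactionBoundary interface, its begin()/commit() calls, and the "input.file" config key are hypothetical and not part of the KIP as written:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class FilePerTransactionTask extends SourceTask {
    // Hypothetical hook: a way for the task to ask the framework to open/close a producer transaction.
    interface TransactionBoundary {
        void begin();
        void commit();
    }

    private TransactionBoundary transactions; // would be supplied by the framework in this sketch
    private Path inputFile;                   // next file from the spool directory

    @Override public String version() { return "0.0.1"; }

    @Override public void start(Map<String, String> props) {
        inputFile = Path.of(props.get("input.file")); // illustrative config key
    }

    @Override public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        transactions.begin(); // ask the framework to start a transaction covering this poll's records
        try (BufferedReader reader = Files.newBufferedReader(inputFile)) {
            String line;
            while ((line = reader.readLine()) != null) {
                records.add(new SourceRecord(
                        Collections.singletonMap("file", inputFile.toString()),
                        Collections.singletonMap("line", records.size()),
                        "lines", Schema.STRING_SCHEMA, line));
            }
        } catch (IOException e) {
            throw new ConnectException(e);
        }
        transactions.commit(); // ask the framework to commit once these records are dispatched: whole file or nothing
        return records;
    }

    @Override public void stop() { }
}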
Re: [ANNOUNCE] New Kafka PMC Member: Konstantine Karantasis
NICE work buddy! On Tue, Jun 22, 2021 at 9:33 AM Rankesh Kumar wrote: > > Yay! Congratulations, KK! > > Best regards, > Rankesh Kumar > Partner Solutions Engineer > +91 (701)913-0147 > Follow us: Blog • Slack • Twitter • YouTube > > > On 21-Jun-2021, at 8:58 PM, Mickael Maison wrote: > > > > Hi, > > > > It's my pleasure to announce that Konstantine Karantasis is now a > > member of the Kafka PMC. > > > > Konstantine has been a Kafka committer since Feb 2020. He has remained > > active in the community since becoming a committer. > > > > Congratulations Konstantine! > > > > Mickael, on behalf of the Apache Kafka PMC >
[jira] [Created] (KAFKA-6651) SchemaBuilder should not allow Arrays or Maps to be created by type()
Jeremy Custenborder created KAFKA-6651: -- Summary: SchemaBuilder should not allow Arrays or Maps to be created by type() Key: KAFKA-6651 URL: https://issues.apache.org/jira/browse/KAFKA-6651 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Jeremy Custenborder The following code should throw an exception because we cannot set valueSchema() or keySchema() once the builder is returned. {code:java} SchemaBuilder.type(Schema.Type.ARRAY); SchemaBuilder.type(Schema.Type.MAP);{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6811) Tasks should have access to connector and task metadata
Jeremy Custenborder created KAFKA-6811: -- Summary: Tasks should have access to connector and task metadata Key: KAFKA-6811 URL: https://issues.apache.org/jira/browse/KAFKA-6811 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Jeremy Custenborder As a connector developer it would be nice to have access to more metadata from within a (Source|Sink)Task. For example, I could use this to log task-specific data. There are several connectors where I currently run only a single task but could use taskId() % totalTasks() for partitioning. At a high level I'm thinking of something like this. {code:java} String connectorName(); int taskId(); int totalTasks(); {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
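A sketch of how a task might use such metadata; the TaskMetadata accessors below simply mirror the methods proposed above and do not exist in the current API:

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskMetadataUsage {
    // Hypothetical accessors mirroring the proposal above.
    interface TaskMetadata {
        String connectorName();
        int taskId();
        int totalTasks();
    }

    private static final Logger log = LoggerFactory.getLogger(TaskMetadataUsage.class);

    // True if this task should own source partition partitionIndex (simple modulo split across tasks).
    static boolean ownsPartition(TaskMetadata metadata, int partitionIndex) {
        return partitionIndex % metadata.totalTasks() == metadata.taskId();
    }

    static void logWithContext(TaskMetadata metadata, String message) {
        log.info("{} task {}/{}: {}", metadata.connectorName(), metadata.taskId(), metadata.totalTasks(), message);
    }
}
{code}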
[jira] [Created] (KAFKA-7273) Converters should have access to headers.
Jeremy Custenborder created KAFKA-7273: -- Summary: Converters should have access to headers. Key: KAFKA-7273 URL: https://issues.apache.org/jira/browse/KAFKA-7273 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Jeremy Custenborder I found myself wanting to build a converter that stored additional type information within headers. The Converter interface does not give a developer access to the headers. I'm not suggesting that we change the method for serializing them, rather that *org.apache.kafka.connect.header.Headers* be passed in to *fromConnectData* and *toConnectData*. For example something like this. {code:java} import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.storage.Converter; public interface ExtendedConverter extends Converter { byte[] fromConnectData(String topic, Headers headers, Schema schema, Object object); SchemaAndValue toConnectData(String topic, Headers headers, byte[] payload); } {code} This would be a similar approach to what was already done with ExtendedDeserializer and ExtendedSerializer in the Kafka client. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
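A sketch of the kind of converter this would enable, assuming the ExtendedConverter interface proposed above; the "schema.type" header name and the delegation to StringConverter are illustrative choices only:

{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.storage.StringConverter;

public class TypeTaggingConverter extends StringConverter implements ExtendedConverter {

    @Override
    public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
        if (schema != null) {
            headers.addString("schema.type", schema.type().name()); // stash type info alongside the payload
        }
        return fromConnectData(topic, schema, value); // reuse the existing serialization path
    }

    @Override
    public SchemaAndValue toConnectData(String topic, Headers headers, byte[] payload) {
        // A real implementation would consult the header to rebuild the schema; this sketch just delegates.
        return toConnectData(topic, payload);
    }
}
{code}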
[jira] [Created] (KAFKA-7292) Converters should report their configuration options
Jeremy Custenborder created KAFKA-7292: -- Summary: Converters should report their configuration options Key: KAFKA-7292 URL: https://issues.apache.org/jira/browse/KAFKA-7292 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Jeremy Custenborder Converters do not support returning their configuration like Connectors and Transformations do. Given that a converter can be configured by an end user, its configuration options should also be reported via the API. {code:java} public interface Converter { void configure(Map<String, ?> configs, boolean isKey); byte[] fromConnectData(String topic, Schema schema, Object value); SchemaAndValue toConnectData(String topic, byte[] value); default ConfigDef config() { return new ConfigDef(); } } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7900) JsonConverter - Floats should not be written in scientific notation.
Jeremy Custenborder created KAFKA-7900: -- Summary: JsonConverter - Floats should not be written in scientific notation. Key: KAFKA-7900 URL: https://issues.apache.org/jira/browse/KAFKA-7900 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Jeremy Custenborder The JSON Converter should not format float32, float64, and Decimal as scientific notation. This should be configurable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
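A small illustration of the underlying Java behavior; how the converter exposes the choice would be up to the proposed config option:

{code:java}
import java.math.BigDecimal;

public class ScientificNotationExample {
    public static void main(String[] args) {
        double value = 0.00001;
        System.out.println(Double.toString(value));                    // 1.0E-5  (scientific notation)
        System.out.println(new BigDecimal("0.00001").toPlainString()); // 0.00001 (plain notation)
    }
}
{code}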
[jira] [Created] (KAFKA-7955) Provide a BOM for EmbeddedConnectCluster and EmbeddedCluster
Jeremy Custenborder created KAFKA-7955: -- Summary: Provide a BOM for EmbeddedConnectCluster and EmbeddedCluster Key: KAFKA-7955 URL: https://issues.apache.org/jira/browse/KAFKA-7955 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 2.1.1 Reporter: Jeremy Custenborder Using EmbeddedConnectCluster for testing connectors is a little difficult given the number of dependencies that are required. Providing a [BOM|https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html] will make it easier for connector developers. For example here are the dependencies that are required. {code:xml}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>connect-api</artifactId>
  <version>${kafka.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>connect-runtime</artifactId>
  <version>${kafka.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>connect-runtime</artifactId>
  <version>${kafka.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
</dependency>
<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.12</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>${kafka.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <type>test-jar</type>
  <scope>test</scope>
  <version>${kafka.version}</version>
</dependency>
{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-5548) SchemaBuilder does not validate input.
Jeremy Custenborder created KAFKA-5548: -- Summary: SchemaBuilder does not validate input. Key: KAFKA-5548 URL: https://issues.apache.org/jira/browse/KAFKA-5548 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Jeremy Custenborder Assignee: Jeremy Custenborder Priority: Minor SchemaBuilder.map(), SchemaBuilder.array(), and SchemaBuilder.field() do not validate input. This can cause weird NullPointerExceptions later. For example, I mistakenly called field("somefield", null), then later performed an operation against field.schema(), which yielded a null. It would be preferable to throw an exception stating the issue. We could still throw an NPE but state what is null; in this case, the schema. {code:java} @Test(expected = NullPointerException.class) public void fieldNameNull() { Schema schema = SchemaBuilder.struct() .field(null, Schema.STRING_SCHEMA) .build(); } @Test(expected = NullPointerException.class) public void fieldSchemaNull() { Schema schema = SchemaBuilder.struct() .field("fieldName", null) .build(); } @Test(expected = NullPointerException.class) public void arraySchemaNull() { Schema schema = SchemaBuilder.array(null) .build(); } @Test(expected = NullPointerException.class) public void mapKeySchemaNull() { Schema schema = SchemaBuilder.map(null, Schema.STRING_SCHEMA) .build(); } @Test(expected = NullPointerException.class) public void mapValueSchemaNull() { Schema schema = SchemaBuilder.map(Schema.STRING_SCHEMA, null) .build(); } {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5550) Struct.put() should include the field name if validation fails
Jeremy Custenborder created KAFKA-5550: -- Summary: Struct.put() should include the field name if validation fails Key: KAFKA-5550 URL: https://issues.apache.org/jira/browse/KAFKA-5550 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Jeremy Custenborder Assignee: Jeremy Custenborder Priority: Minor When calling struct.put() with an invalid value, the error message should include the field name. {code:java} @Test public void testPutIncludesFieldName() { final String fieldName = "fieldName"; Schema testSchema = SchemaBuilder.struct() .field(fieldName, Schema.STRING_SCHEMA); Struct struct = new Struct(testSchema); try { struct.put(fieldName, null); } catch (DataException ex) { assertEquals( "Invalid value: null used for required field: \"fieldName\", schema type: STRING", ex.getMessage() ); } } {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5572) ConfigDef.Type.List should support escaping comma character
Jeremy Custenborder created KAFKA-5572: -- Summary: ConfigDef.Type.List should support escaping comma character Key: KAFKA-5572 URL: https://issues.apache.org/jira/browse/KAFKA-5572 Project: Kafka Issue Type: Improvement Reporter: Jeremy Custenborder Assignee: Jeremy Custenborder Priority: Minor You should be able to include a comma in a list. Currently the split regex only looks for a comma. It should be possible to escape a literal comma with something like \,. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
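A minimal sketch of escape-aware splitting, assuming \, is chosen as the escape sequence; this is illustrative and not the parsing code ConfigDef uses today:

{code:java}
import java.util.ArrayList;
import java.util.List;

public class EscapedListSplit {
    static List<String> splitEscaped(String value) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c == '\\' && i + 1 < value.length() && value.charAt(i + 1) == ',') {
                current.append(',');
                i++; // consume the escaped comma
            } else if (c == ',') {
                parts.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        parts.add(current.toString());
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(splitEscaped("a\\,b,c")); // two elements: "a,b" and "c"
    }
}
{code}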
[jira] [Created] (KAFKA-5575) SchemaBuilder should have a method to clone an existing Schema.
Jeremy Custenborder created KAFKA-5575: -- Summary: SchemaBuilder should have a method to clone an existing Schema. Key: KAFKA-5575 URL: https://issues.apache.org/jira/browse/KAFKA-5575 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Jeremy Custenborder Assignee: Jeremy Custenborder Priority: Minor Now that Transformations have landed in Kafka Connect, we should have an easy way to make quick modifications to schemas. For example, changing the name of a schema shouldn't require much more than the following. I should be able to do things like this. {code:java} return SchemaBuilder.from(Schema.STRING_SCHEMA).name("MyNewName").build() {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5579) SchemaBuilder.type(Schema.Type) should not allow null.
Jeremy Custenborder created KAFKA-5579: -- Summary: SchemaBuilder.type(Schema.Type) should not allow null. Key: KAFKA-5579 URL: https://issues.apache.org/jira/browse/KAFKA-5579 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Jeremy Custenborder Assignee: Jeremy Custenborder Priority: Minor -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5620) SerializationException in doSend() masks class cast exception
Jeremy Custenborder created KAFKA-5620: -- Summary: SerializationException in doSend() masks class cast exception Key: KAFKA-5620 URL: https://issues.apache.org/jira/browse/KAFKA-5620 Project: Kafka Issue Type: Bug Affects Versions: 0.11.0.0 Reporter: Jeremy Custenborder Assignee: Jeremy Custenborder I misconfigured my Serializer and passed a byte array to BytesSerializer. This caused the following exception to be thrown. {code} org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.BytesSerializer specified in value.serializer {code} This doesn't provide much detail because it strips the ClassCastException. It made figuring this out much more difficult. The real value was the inner exception which was: {code} [B cannot be cast to org.apache.kafka.common.utils.Bytes {code} We should include the ClassCastException. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5807) NPE on Connector.validate
Jeremy Custenborder created KAFKA-5807: -- Summary: NPE on Connector.validate Key: KAFKA-5807 URL: https://issues.apache.org/jira/browse/KAFKA-5807 Project: Kafka Issue Type: Bug Reporter: Jeremy Custenborder Assignee: Jeremy Custenborder Priority: Minor An NPE is thrown when a developer returns null when overriding Connector.validate(). {code} [2017-08-23 13:36:30,086] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:99) java.lang.NullPointerException at org.apache.kafka.connect.connector.Connector.validate(Connector.java:134) at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:254) at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:158) at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:93) {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-3906) Connect logical types do not support nulls.
Jeremy Custenborder created KAFKA-3906: -- Summary: Connect logical types do not support nulls. Key: KAFKA-3906 URL: https://issues.apache.org/jira/browse/KAFKA-3906 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 0.10.0.0 Reporter: Jeremy Custenborder Assignee: Ewen Cheslack-Postava The logical types for Kafka Connect do not support null data values. Date, Decimal, Time, and Timestamp will all throw NullPointerExceptions if a null is passed in to their fromLogical and toLogical methods. Date, Time, and Timestamp require signature changes for these methods to support nullable types. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3943) ConfigDef should support a builder pattern.
Jeremy Custenborder created KAFKA-3943: -- Summary: ConfigDef should support a builder pattern. Key: KAFKA-3943 URL: https://issues.apache.org/jira/browse/KAFKA-3943 Project: Kafka Issue Type: Improvement Reporter: Jeremy Custenborder Assignee: Jeremy Custenborder Priority: Minor I catch myself always having to lookup the overloads for define. What about adding a builder pattern? {code} ConfigDef def = new ConfigDef() .define().name("a").type(Type.INT).defaultValue(5).validator(Range.between(0, 14)).importance(Importance.HIGH).documentation("docs").build() .define().name("b").type(Type.LONG).importance(Importance.HIGH).documentation("docs").build() .define().name("c").type(Type.STRING).defaultValue("hello").importance(Importance.HIGH).documentation("docs").build() .define().name("d").type(Type.LIST).importance(Importance.HIGH).documentation("docs").build() .define().name("e").type(Type.DOUBLE).importance(Importance.HIGH).documentation("docs").build() .define().name("f").type(Type.CLASS).importance(Importance.HIGH).documentation("docs").build() .define().name("g").type(Type.BOOLEAN).importance(Importance.HIGH).documentation("docs").build() .define().name("h").type(Type.BOOLEAN).importance(Importance.HIGH).documentation("docs").build() .define().name("i").type(Type.BOOLEAN).importance(Importance.HIGH).documentation("docs").build() .define().name("j").type(Type.PASSWORD).importance(Importance.HIGH).documentation("docs").build(); {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4855) Struct SchemaBuilder should not allow duplicate fields.
Jeremy Custenborder created KAFKA-4855: -- Summary: Struct SchemaBuilder should not allow duplicate fields. Key: KAFKA-4855 URL: https://issues.apache.org/jira/browse/KAFKA-4855 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 0.10.2.0 Reporter: Jeremy Custenborder I would expect this to fail at the build() on schema. It actually makes it all the way to Struct.validate() and throws a cryptic error message. .field() should throw an exception if a field is already used. Repro: {code} @Test public void duplicateFields() { final Schema schema = SchemaBuilder.struct() .name("testing") .field("id", SchemaBuilder.string().doc("").build()) .field("id", SchemaBuilder.string().doc("").build()) .build(); final Struct struct = new Struct(schema) .put("id", "testing"); struct.validate(); } {code} {code} org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:212) at org.apache.kafka.connect.data.Struct.validate(Struct.java:232) at io.confluent.kafka.connect.jms.RecordConverterTest.duplicateFieldRepro(RecordConverterTest.java:289) {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4709) Error message from Struct.validate() should include the name of the offending field.
Jeremy Custenborder created KAFKA-4709: -- Summary: Error message from Struct.validate() should include the name of the offending field. Key: KAFKA-4709 URL: https://issues.apache.org/jira/browse/KAFKA-4709 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Jeremy Custenborder Assignee: Jeremy Custenborder Priority: Minor Take a look at this repro. {code} @Test public void structValidate() { Schema schema = SchemaBuilder.struct() .field("one", Schema.STRING_SCHEMA) .field("two", Schema.STRING_SCHEMA) .field("three", Schema.STRING_SCHEMA) .build(); Struct struct = new Struct(schema); struct.validate(); } {code} Any one of the fields could be causing the issue. The following exception is thrown. This makes troubleshooting missing fields in connectors much more difficult. {code} org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field {code} The error message should include the field or fields in the error message. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3407) ErrorLoggingCallback trims helpful logging messages.
Jeremy Custenborder created KAFKA-3407: -- Summary: ErrorLoggingCallback trims helpful logging messages. Key: KAFKA-3407 URL: https://issues.apache.org/jira/browse/KAFKA-3407 Project: Kafka Issue Type: Improvement Reporter: Jeremy Custenborder Priority: Minor ErrorLoggingCallback currently logs only the message of the exception returned. Any inner exception or call stack is not included. This makes troubleshooting more difficult. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3407) ErrorLoggingCallback trims helpful diagnosting information.
[ https://issues.apache.org/jira/browse/KAFKA-3407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeremy Custenborder updated KAFKA-3407: --- Summary: ErrorLoggingCallback trims helpful diagnosting information. (was: ErrorLoggingCallback trims helpful logging messages.) > ErrorLoggingCallback trims helpful diagnosting information. > --- > > Key: KAFKA-3407 > URL: https://issues.apache.org/jira/browse/KAFKA-3407 > Project: Kafka > Issue Type: Improvement > Reporter: Jeremy Custenborder >Priority: Minor > > ErrorLoggingCallback currently only returns the message of the message > returned. Any inner exception or callstack is not included. This makes > troubleshooting more difficult. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3407) ErrorLoggingCallback trims helpful diagnostic information.
[ https://issues.apache.org/jira/browse/KAFKA-3407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeremy Custenborder updated KAFKA-3407: --- Summary: ErrorLoggingCallback trims helpful diagnostic information. (was: ErrorLoggingCallback trims helpful diagnosting information.) > ErrorLoggingCallback trims helpful diagnostic information. > -- > > Key: KAFKA-3407 > URL: https://issues.apache.org/jira/browse/KAFKA-3407 > Project: Kafka > Issue Type: Improvement > Reporter: Jeremy Custenborder >Priority: Minor > > ErrorLoggingCallback currently only returns the message of the message > returned. Any inner exception or callstack is not included. This makes > troubleshooting more difficult. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3237) ConfigDef validators require a default value
Jeremy Custenborder created KAFKA-3237: -- Summary: ConfigDef validators require a default value Key: KAFKA-3237 URL: https://issues.apache.org/jira/browse/KAFKA-3237 Project: Kafka Issue Type: Bug Components: config Affects Versions: 0.9.0.0 Reporter: Jeremy Custenborder Priority: Minor I should be able to add a ConfigDef that has a validator but has null as the default value. This would allow me to have a required property that is restricted to certain strings, as in this example. {code} ConfigDef def = new ConfigDef(); def.define(key, Type.STRING, null, ValidString.in("ONE", "TWO", "THREE"), Importance.HIGH, "docs"); {code} {code} Invalid value null for configuration test: String must be one of: ONE, TWO, THREE org.apache.kafka.common.config.ConfigException: Invalid value null for configuration enum_test: String must be one of: ONE, TWO, THREE at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:349) at org.apache.kafka.common.config.ConfigDef$ConfigKey.<init>(ConfigDef.java:375) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3237) ConfigDef validators require a default value
[ https://issues.apache.org/jira/browse/KAFKA-3237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeremy Custenborder updated KAFKA-3237: --- Description: I should be able to add a ConfigDef that has a validator but does has null as the default value. This would allow me to have a required property that is restricted to certain strings in this example. This exception should be thrown upon call to ConfigDef.parse instead. {code} ConfigDef def = new ConfigDef(); def.define(key, Type.STRING, null, ValidString.in("ONE", "TWO", "THREE"), Importance.HIGH, "docs"); {code} {code} Invalid value null for configuration test: String must be one of: ONE, TWO, THREE org.apache.kafka.common.config.ConfigException: Invalid value null for configuration enum_test: String must be one of: ONE, TWO, THREE at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:349) at org.apache.kafka.common.config.ConfigDef$ConfigKey.(ConfigDef.java:375) {code} was: I should be able to add a ConfigDef that has a validator but does has null as the default value. This would allow me to have a required property that is restricted to certain strings in this example. {code} ConfigDef def = new ConfigDef(); def.define(key, Type.STRING, null, ValidString.in("ONE", "TWO", "THREE"), Importance.HIGH, "docs"); {code} {code} Invalid value null for configuration test: String must be one of: ONE, TWO, THREE org.apache.kafka.common.config.ConfigException: Invalid value null for configuration enum_test: String must be one of: ONE, TWO, THREE at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:349) at org.apache.kafka.common.config.ConfigDef$ConfigKey.(ConfigDef.java:375) {code} > ConfigDef validators require a default value > > > Key: KAFKA-3237 > URL: https://issues.apache.org/jira/browse/KAFKA-3237 > Project: Kafka > Issue Type: Bug > Components: config >Affects Versions: 0.9.0.0 >Reporter: Jeremy Custenborder >Priority: Minor > > I should be able to add a ConfigDef that has a validator but does has null as > the default value. This would allow me to have a required property that is > restricted to certain strings in this example. This exception should be > thrown upon call to ConfigDef.parse instead. > {code} > ConfigDef def = new ConfigDef(); > def.define(key, Type.STRING, null, ValidString.in("ONE", "TWO", "THREE"), > Importance.HIGH, "docs"); > {code} > {code} > Invalid value null for configuration test: String must be one of: ONE, TWO, > THREE > org.apache.kafka.common.config.ConfigException: Invalid value null for > configuration enum_test: String must be one of: ONE, TWO, THREE > at > org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:349) > at > org.apache.kafka.common.config.ConfigDef$ConfigKey.(ConfigDef.java:375) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3237) ConfigDef validators require a default value
[ https://issues.apache.org/jira/browse/KAFKA-3237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15145692#comment-15145692 ] Jeremy Custenborder commented on KAFKA-3237: There are two test cases [testInvalidDefaultRange() and testInvalidDefaultString()|https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java#L118-L126] which test the defaults passed in with ConfigDef.define(). Does checking the default really matter? The exception text is going to be the same if checked during define or when parse() is called. Correcting the behavior in the description requires removal of these two test cases. Does that sound valid? > ConfigDef validators require a default value > > > Key: KAFKA-3237 > URL: https://issues.apache.org/jira/browse/KAFKA-3237 > Project: Kafka > Issue Type: Bug > Components: config >Affects Versions: 0.9.0.0 > Reporter: Jeremy Custenborder >Priority: Minor > > I should be able to add a ConfigDef that has a validator but does has null as > the default value. This would allow me to have a required property that is > restricted to certain strings in this example. This exception should be > thrown upon call to ConfigDef.parse instead. > {code} > ConfigDef def = new ConfigDef(); > def.define(key, Type.STRING, null, ValidString.in("ONE", "TWO", "THREE"), > Importance.HIGH, "docs"); > {code} > {code} > Invalid value null for configuration test: String must be one of: ONE, TWO, > THREE > org.apache.kafka.common.config.ConfigException: Invalid value null for > configuration enum_test: String must be one of: ONE, TWO, THREE > at > org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:349) > at > org.apache.kafka.common.config.ConfigDef$ConfigKey.(ConfigDef.java:375) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3237) ConfigDef validators require a default value
[ https://issues.apache.org/jira/browse/KAFKA-3237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15145816#comment-15145816 ] Jeremy Custenborder commented on KAFKA-3237: Correct me if I'm wrong, but there is only one [define|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L75] method that takes a validator. It also looks like the testing of default values is handled by the constructor of [ConfigKey|https://github.com/apache/kafka/blob/ab5ac264a71d7f895b21b4acfd93d9581dabd7c1/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L363]. If there is a validator present, it's run against the default. In my case I want the user to define a value that is present in an enum, which I will pass to Enum.valueOf() later. I don't want to define a default because it could be wrong for the user. Setting a validator with the constants from the enum will give me a nice error message to the user if they omit the setting. {code} public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) { {code} > ConfigDef validators require a default value > > > Key: KAFKA-3237 > URL: https://issues.apache.org/jira/browse/KAFKA-3237 > Project: Kafka > Issue Type: Bug > Components: config >Affects Versions: 0.9.0.0 >Reporter: Jeremy Custenborder >Priority: Minor > > I should be able to add a ConfigDef that has a validator but does has null as > the default value. This would allow me to have a required property that is > restricted to certain strings in this example. This exception should be > thrown upon call to ConfigDef.parse instead. > {code} > ConfigDef def = new ConfigDef(); > def.define(key, Type.STRING, null, ValidString.in("ONE", "TWO", "THREE"), > Importance.HIGH, "docs"); > {code} > {code} > Invalid value null for configuration test: String must be one of: ONE, TWO, > THREE > org.apache.kafka.common.config.ConfigException: Invalid value null for > configuration enum_test: String must be one of: ONE, TWO, THREE > at > org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:349) > at > org.apache.kafka.common.config.ConfigDef$ConfigKey.(ConfigDef.java:375) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3260) Increase the granularity of commit for SourceTask
Jeremy Custenborder created KAFKA-3260: -- Summary: Increase the granularity of commit for SourceTask Key: KAFKA-3260 URL: https://issues.apache.org/jira/browse/KAFKA-3260 Project: Kafka Issue Type: Improvement Components: copycat Affects Versions: 0.9.0.1 Reporter: Jeremy Custenborder Assignee: Ewen Cheslack-Postava As of right now, when commit() is called the developer does not know which messages have been accepted since the last poll. I'm proposing that we extend the SourceTask class to allow records to be committed individually. {code} public void commitRecord(SourceRecord record) throws InterruptedException { // This space intentionally left blank. } {code} This method could be overridden to receive a SourceRecord during the callback of producer.send(). This gives us the messages that have been successfully written to Kafka. The developer then has the capability to commit messages to the source individually or in batch. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
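A sketch of how a task could use the proposed callback to acknowledge records back to the source system; the SourceClient type and its ack() method are assumptions, not part of any Kafka API:

{code:java}
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Abstract on purpose: only the proposed commitRecord hook is sketched here.
public abstract class AckingSourceTask extends SourceTask {

    // Hypothetical handle to the upstream system.
    interface SourceClient {
        void ack(Object sourceOffset);
    }

    protected SourceClient sourceClient;

    @Override
    public void commitRecord(SourceRecord record) throws InterruptedException {
        // Called once the record has been accepted by Kafka, so it is safe to ack upstream.
        sourceClient.ack(record.sourceOffset());
    }
}
{code}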
[jira] [Created] (KAFKA-3263) Add Markdown support for ConfigDef
Jeremy Custenborder created KAFKA-3263: -- Summary: Add Markdown support for ConfigDef Key: KAFKA-3263 URL: https://issues.apache.org/jira/browse/KAFKA-3263 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.9.0.1 Reporter: Jeremy Custenborder Priority: Minor The ability to output markdown for ConfigDef would be nice given a lot of people use README.md files in their repositories. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3347) Configure java to prefer ipv4
Jeremy Custenborder created KAFKA-3347: -- Summary: Configure java to prefer ipv4 Key: KAFKA-3347 URL: https://issues.apache.org/jira/browse/KAFKA-3347 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.9.0.1 Reporter: Jeremy Custenborder Priority: Minor I've noticed that ports are sometimes binding on IPv6 addresses rather than the IPv4 address I'm expecting. Can we change this so we bind to the IPv4 address rather than the IPv6 address? I'm proposing to add this to KAFKA_JVM_PERFORMANCE_OPTS. {code} -Djava.net.preferIPv4Stack=true {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-9537) Abstract transformations in configurations cause unfriendly error message.
Jeremy Custenborder created KAFKA-9537: -- Summary: Abstract transformations in configurations cause unfriendly error message. Key: KAFKA-9537 URL: https://issues.apache.org/jira/browse/KAFKA-9537 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.4.0 Reporter: Jeremy Custenborder Assignee: Jeremy Custenborder I was working with a coworker who had a bash script posting a config to Connect with {code:java}org.apache.kafka.connect.transforms.ExtractField.$Key{code} in the script. Bash removed the $Key because it wasn't escaped properly, so {code:java} org.apache.kafka.connect.transforms.ExtractField.{code} is what made it to the REST interface. A Class was created for the abstract implementation of ExtractField and passed to getConfigDefFromTransformation. It tried to call newInstance, which threw an exception. The following gets returned via the REST interface. {code} { "error_code": 400, "message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class org.apache.kafka.connect.transforms.ExtractField for configuration transforms.extractString.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`" } {code} It would be a much better user experience if we returned something like {code} { "error_code": 400, "message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class org.apache.kafka.connect.transforms.ExtractField for configuration transforms.extractString.type: Error getting config definition from Transformation: Transformation is abstract and cannot be created.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`" } {code} or {code} { "error_code": 400, "message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class org.apache.kafka.connect.transforms.ExtractField for configuration transforms.extractString.type: Error getting config definition from Transformation: Transformation is abstract and cannot be created. Did you mean ExtractField$Key, ExtractField$Value?\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`" } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
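A sketch of the kind of guard that would produce the clearer message; the method name and its placement are illustrative only:

{code:java}
import java.lang.reflect.Modifier;

import org.apache.kafka.common.config.ConfigException;

public class AbstractTransformationCheck {
    static void requireConcrete(String configName, Class<?> transformationClass) {
        if (Modifier.isAbstract(transformationClass.getModifiers())) {
            throw new ConfigException(configName, transformationClass.getName(),
                    "Transformation is abstract and cannot be created. Did you mean "
                            + transformationClass.getName() + "$Key or " + transformationClass.getName() + "$Value?");
        }
    }
}
{code}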
[jira] [Created] (KAFKA-12301) Support for enum validation in configuration
Jeremy Custenborder created KAFKA-12301: --- Summary: Support for enum validation in configuration Key: KAFKA-12301 URL: https://issues.apache.org/jira/browse/KAFKA-12301 Project: Kafka Issue Type: Improvement Components: config Reporter: Jeremy Custenborder Assignee: Jeremy Custenborder Several configuration elements are mapped to internal enums. A typo in configuration will yield error messages that are not descriptive and require the user to find valid values. For example: {code:java} Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:479) at org.apache.kafka.clients.admin.Admin.create(Admin.java:61) at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39) ... Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.security.auth.SecurityProtocol.SASL_PLAINTEXTA at java.lang.Enum.valueOf(Enum.java:238) at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26) at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72) at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:103) at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:454) ... 7 more {code} This is easier to troubleshoot. {code:java} Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:479) at org.apache.kafka.clients.admin.Admin.create(Admin.java:61) at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39) ... Caused by: org.apache.kafka.common.config.ConfigException: Invalid value SASL_PLAINTEXTA for security.protocol. Enum value not found. Valid values are: PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, SSL at java.lang.Enum.valueOf(Enum.java:238) at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26) at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72) at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:103) at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:454) ... 7 more {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
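For illustration, a generic forName-style helper that produces the friendlier message; this is a sketch, not the API addition the issue proposes:

{code:java}
import java.util.Arrays;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;

public class EnumConfigValue {
    static <T extends Enum<T>> T parseEnum(String configName, String value, Class<T> enumClass) {
        try {
            return Enum.valueOf(enumClass, value);
        } catch (IllegalArgumentException e) {
            throw new ConfigException(configName, value,
                    "Enum value not found. Valid values are: " + Arrays.toString(enumClass.getEnumConstants()));
        }
    }

    public static void main(String[] args) {
        // Throws ConfigException listing PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
        parseEnum("security.protocol", "SASL_PLAINTEXTA", SecurityProtocol.class);
    }
}
{code}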