[ https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886680#comment-15886680 ]
ASF GitHub Bot commented on KAFKA-4794:
---------------------------------------

GitHub user fhussonnois opened a pull request:

    https://github.com/apache/kafka/pull/2604

    KAFKA-4794: Add access to OffsetStorageReader from SourceConnector

    This is a first attempt to implement access to OffsetStorageReader from
    SourceConnector. I am not sure if I did it right. I would like your
    feedback before writing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhussonnois/kafka KAFKA-4794

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2604.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2604

----
commit e2bb682afd052cf1c83c68933c1165b584a0acc9
Author: Florian Hussonnois <florian.hussonn...@gmail.com>
Date:   2017-02-27T22:35:33Z

    KAFKA-4794: Add access to OffsetStorageReader from SourceConnector

----

> Add access to OffsetStorageReader from SourceConnector
> ------------------------------------------------------
>
>                 Key: KAFKA-4794
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4794
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.0
>            Reporter: Florian Hussonnois
>            Priority: Minor
>
> Currently the offset storage is only accessible from SourceTask, so that
> tasks can be initialized properly after a restart, a crash or a
> reconfiguration request.
> To implement more complex connectors that need to track the progress of
> each task, it would be helpful to have access to an OffsetStorageReader
> instance from the SourceConnector.
> That way, we could have a background thread that requests a task
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to
> periodically scan directories on shared storage to detect new files and
> stream them into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories.
> When new input files are detected, a task reconfiguration is requested.
> Then the connector assigns a subset of the files to each task.
> Each task stores source offsets for the last sent record. The source
> offset data are:
>  - the size of the file
>  - the byte offset
>  - the byte size
> Tasks become idle when the assigned files are completed (that is, when
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> The connector should then be able to track offsets for each assigned file.
> When all tasks have finished, the connector can stop them or assign new
> files by requesting a task reconfiguration.
> Moreover, another advantage of monitoring source offsets from the
> connector is the ability to detect slow or failed tasks and, if necessary,
> to restart all tasks.
> If you think this improvement is OK, I can work on a pull request.
> Thanks,

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
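
For readers of the archive, the completion condition quoted above (recordBytesOffsets + recordBytesSize = fileBytesSize) could be checked roughly as in the sketch below. It is an illustration only, not code from the pull request: the class FileProgress, its methods, and the use of the three field names as map keys are assumptions. Offsets are modelled as plain maps, similar in shape to the source offsets a task stores, so the sketch has no Kafka dependency.

```java
import java.util.Collection;
import java.util.Map;

// Hypothetical helper, not part of Kafka Connect or of the pull request.
final class FileProgress {
    // Assumed key names, taken from the formula in the issue description.
    static final String FILE_SIZE = "fileBytesSize";
    static final String RECORD_OFFSET = "recordBytesOffsets";
    static final String RECORD_SIZE = "recordBytesSize";

    private FileProgress() {}

    // True when the last sent record ends exactly at the end of the file.
    static boolean isComplete(Map<String, ?> offset) {
        if (offset == null) {
            return false; // no offset stored yet, so nothing has been sent
        }
        long fileSize = asLong(offset.get(FILE_SIZE));
        long recordOffset = asLong(offset.get(RECORD_OFFSET));
        long recordSize = asLong(offset.get(RECORD_SIZE));
        return recordOffset + recordSize == fileSize;
    }

    // True when every assigned file is complete (also true for an empty collection).
    static boolean allComplete(Collection<? extends Map<String, ?>> offsets) {
        for (Map<String, ?> offset : offsets) {
            if (!isComplete(offset)) {
                return false;
            }
        }
        return true;
    }

    // Offsets may come back as Integer or Long, so accept any Number.
    private static long asLong(Object value) {
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException("missing or non-numeric offset field: " + value);
        }
        return ((Number) value).longValue();
    }
}
```

A connector with access to the stored offsets could run a check like allComplete from its background thread and request a task reconfiguration once it returns true. How those offsets would be obtained from the connector is exactly what this issue proposes, so that part is left out here.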