Kafka Offset Commit
Hi Samza Community,

This is my first email, so please forgive my lack of knowledge about Samza. I am running a test job in my environment in local mode. The job processes data, but it does not commit offsets on the Kafka side. I use the Apache Beam Samza runner.

My pipeline simply reads from Kafka and writes to a GCS bucket. Do you have any idea where I should look to debug this issue?

This is my job.properties file:

app.runner.class=org.apache.samza.runtime.LocalApplicationRunner
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
job.coordinator.zk.connect=localhost:2181
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
job.config.rewriters=env-config
job.config.rewriter.env-config.class=org.apache.samza.config.EnvironmentConfigRewriter
job.default.system=filereader
systems.filereader.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
job.container.thread.pool.size=300
job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory
task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
task.checkpoint.path=/home/checkpoints

Thanks in advance for your help.

Talat
Re: Kafka Offset Commit
Hi Talat,

In your job.properties, task.checkpoint.factory is set to org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory rather than org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory. That is why the job writes its checkpoints to the filesystem (at the location controlled by task.checkpoint.path).

https://samza.apache.org/learn/documentation/1.0.0/container/checkpointing.html has details on the configs you need to add to enable checkpointing to Kafka for a job.

thanks

On Tue, Aug 10, 2021 at 5:03 PM Talat Uyarer wrote:
> [...] my job is processing data however it does not commit offset on Kafka
> side. [...]
>
> task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
> task.checkpoint.path=/home/checkpoints
> [...]

--
thanks
rayman
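For reference, the change described in the reply above might look roughly like the fragment below. This is only a sketch under assumptions, not a tested configuration: it reuses the system name "filereader" from the original job.properties, and the replication factor and the localhost addresses are placeholders for a single-broker local setup that do not appear anywhere in the thread. The checkpointing page linked above is the authoritative list of keys.

```properties
# Sketch only: replaces the two task.checkpoint.* lines in the original file.
# Use the Kafka-backed checkpoint manager instead of the filesystem one.
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory

# Kafka system that hosts the checkpoint topic. "filereader" is the system
# name already defined in the original job.properties.
task.checkpoint.system=filereader

# Assumption: a single local broker, so the checkpoint topic cannot be
# replicated more than once.
task.checkpoint.replication.factor=1

# Assumption: local ZooKeeper and broker addresses; adjust to your cluster.
systems.filereader.consumer.zookeeper.connect=localhost:2181
systems.filereader.producer.bootstrap.servers=localhost:9092
```

With settings along these lines, checkpoints would be written to a checkpoint topic on the named Kafka system rather than to files under task.checkpoint.path, which would no longer be needed.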
Re: Kafka Offset Commit
Thank you, rayman. My question, though, is about the job's Kafka consumer group: when I check it, I don't see any offset movement. I chose to store checkpoints on the file system. Do you think that is why the offsets of my job's consumer group do not move?

On Tue, Aug 10, 2021, 9:32 PM rayman preet wrote:
> Hi Talat,
>
> Since in the job.properties the task.checkpoint.factory is set to
> FileSystemCheckpointManagerFactory
> and not org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory.
> That is why its writing checkpoints to the filesystem (with its. location
> controlled by task.checkpoint.path).
>
> https://samza.apache.org/learn/documentation/1.0.0/container/checkpointing.html
> has details on the configs we need to add to enable checkpointing to kafka
> for a job.
> [...]