☕ Kafka Consumer Kurulumu: Java ile CLI Üzerinden Veri Alma
Bu rehberde Apache Kafka Consumer’ı kullanarak Java üzerinden veri çekmeyi öğreneceksiniz.
Tüketici gruplarıyla yük dengeleme, manuel ofset kaydı (commit) ve güvenli kapanma (graceful shutdown) adımlarını uygulayacağız.
🧠 Teknik Özet
Bu rehber, Maven tabanlı bir Java projesinde Kafka Consumer uygulaması geliştirmeyi gösterir.
Amaç, java_demo konusundan veri okuyup işlem yapmak,
gruplarla yükü dengelemek ve manuel ofset kaydı ile veri kaybını önlemektir.
🧰 Ön Koşullar
- Sunucu: En az 4GB RAM / 2 CPU (örn. Rabisu Bulut’ta
tr1-node01) - Java: JDK 8+ kurulu (
sudo apt install openjdk-17-jdk) - Kafka: Apache Kafka kurulmuş ve yapılandırılmış
- Producer: CLI ile mesaj gönderen Kafka üretici projesi hazır olmalı
🛠️ Adım 1 – Kafka Tüketici Sınıfını Oluşturma
src/main/java/com/dokafka/ConsumerDemo.java dosyasını oluşturun ve açın:
package com.dokafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.*;
import java.time.Duration;
import java.util.*;
public class ConsumerDemo {
private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class);
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topicName = "java_demo";
String groupId = "grup_rabisu";
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topicName));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
log.info(String.format("konu=%s | bölüt=%d | ofset=%d | değer=%s",
record.topic(), record.partition(), record.offset(), record.value()));
}
}
} catch (Exception e) {
log.error("Hata oluştu", e);
} finally {
consumer.close();
}
}
}
💬 Bu kod, java_demo konusundan gelen mesajları alır ve log’lar.
🔧 Adım 2 – Derleme ve Çalıştırma
run-consumer.sh adında bir betik oluşturun:
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ConsumerDemo
Betiği çalıştırılabilir yapın:
chmod +x run-consumer.sh
./run-consumer.sh
💬 Bu betik, Maven projesini temizler, paketler ve tüketiciyi çalıştırır.
🧼 Adım 3 – Güvenli Kapanma (Graceful Shutdown)
Kafka Consumer aniden kapanırsa, bağlantılar açık kalabilir. Bunu önlemek için bir shutdown hook ekleyin:
Thread currentThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.wakeup();
try {
currentThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
try bloğunu güncelleyin:
} catch (WakeupException e) {
log.info("Consumer durduruluyor...");
💬 Bu yöntem, CTRL+C gibi durumlarda tüketicinin düzgünce kapanmasını sağlar.
⚖️ Adım 4 – Tüketici Grupları ve Bölütler (Partitions)
Kafka konuları bölütlere (partition) ayrılır. Aynı Group ID’ye sahip tüketiciler bu bölütleri paylaşarak yükü dengeler.
bin/kafka-topics.sh --create --topic java_demo_partitions \
--bootstrap-server localhost:9092 --partitions 2
💬 Aynı key değerine sahip mesajlar aynı bölüte yazılır — sıralama (ordering) korunur.
🧩 Adım 5 – Manuel Ofset Kaydı (Manual Commit)
Otomatik ofset kaydı yerine commitSync() kullanarak veri kaybını önleyin:
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
for (ConsumerRecord<String, String> record : records) {
log.info(String.format("konu=%s | bölüt=%d | ofset=%d | değer=%s",
record.topic(), record.partition(), record.offset(), record.value()));
consumer.commitSync(); // Manuel ofset kaydı
}
💬 Bu işlem bloklayıcıdır ama en güvenli commit yöntemidir.
❓ Sıkça Sorulan Sorular (SSS)
- Deserializer ne işe yarar?
Bayt dizilerini orijinal veri tipine (örneğin String) dönüştürür.
- Consumer Group ID neden önemlidir?
Aynı ID’li tüketiciler aynı topic’i paylaşır, yük dengelenir.
- AUTO_OFFSET_RESET_CONFIG ne işe yarar?
Tüketici geçmiş konum yoksa earliest ile baştan okur, latest ile yeni mesajlardan başlar.
- commitAsync farkı nedir?
Performanslıdır ancak hata yönetimi zayıftır.
- consumer.poll() ne yapar?
Kafka’dan belirli süre aralığında yeni kayıtları çeker.
🏁 Sonuç
Bu rehberde:
Kafka Consumer oluşturmayı,
Güvenli kapanış (graceful shutdown) yapısını,
Tüketici grupları ve ofset yönetimini öğrendiniz.
💡 Şimdi bu uygulamayı Rabisu Bulut üzerinde deneyin ve kendi Kafka kümenizi kurarak gerçek zamanlı veri akışını yönetin. 🚀