Skip to main content

☕ Java Kafka Üretici (Producer) Kurulumu: Programatik Veri Gönderme

Bu rehberde, Java ve Apache Maven kullanarak Kafka kümesine mesaj gönderen bir üretici uygulaması (Producer) geliştireceğiz.
Uygulama, mesaj gönderimini asenkron şekilde yapacak ve Callback (geri çağrım) yöntemiyle gönderilen mesajın küme içindeki metadata bilgilerini (zaman, bölüt, ofset) kaydedecek.


🧠 Teknik Özet

Ana konu: Java ile Apache Kafka Producer uygulaması geliştirme.
Çözdüğü problem: Komut satırı betikleri yerine, esnek ve programatik bir yöntemle Kafka konularına mesaj göndermeyi sağlar.
Kullanıcı adımları:

  1. Maven projesi oluşturmak
  2. Kafka bağımlılıklarını eklemek
  3. Producer sınıfını yazmak
  4. Mesajı Kafka’ya göndermek
  5. Callback ile metadata bilgilerini almak

🎯 Bu yapı, özellikle mikro servislerin Kafka aracılığıyla veri üretmesini isteyen geliştiriciler için idealdir.


🧩 Ön Koşullar

  • Sanal Sunucu: En az 4 GB RAM, 2 CPU (örnek: tr1-node01.rabisu.cloud)
  • Java JDK: Sürüm 8 veya üzeri
  • Apache Kafka: Kurulu ve yapılandırılmış olmalı
  • Maven: Paket yönetimi aracı kurulu
  • Kafka Bilgisi: Temel topic, partition, offset kavramlarına aşinalık

💡 Henüz Kafka kurmadıysan, Kafka Kurulumu Rehberi ile başlayabilirsin.


⚙️ Adım 1 – Maven Projesi Oluşturma

Önce sistemini güncelle ve Maven’ı kur:

sudo apt update
sudo apt install maven -y

Kurulumu doğrula:


mvn --version

Yeni bir klasör oluştur ve içine gir:


mkdir ~/kafka-projeleri
cd ~/kafka-projeleri

Yeni bir proje başlat:


mvn archetype:generate \
-DgroupId=com.rabisucloud \
-DartifactId=rabisu-kafka-uyg \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=false

Oluşturulan proje klasörüne gir:


cd rabisu-kafka-uyg

📦 Adım 2 – Kafka Bağımlılıklarını Eklemek

Kafka istemci kütüphanelerini pom.xml dosyasına ekleyin:


<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.12</version>
</dependency>
</dependencies>

Ayrıca bağımlılıkların projeyle birlikte paketlenmesini sağlamak için build kısmına şu eklentiyi ekleyin:


<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Projeyi derleyip bağımlılıkları indirin:


mvn package

✅ Çıktı: BUILD SUCCESS — Proje başarıyla derlendi.


☕ Adım 3 – Kafka Üretici (Producer) Sınıfını Yazmak

Varsayılan App.java dosyasını silin:


rm src/main/java/com/rabisucloud/App.java

Yeni dosyayı oluşturun:


nano src/main/java/com/rabisucloud/ProducerDemo.java

Aşağıdaki kodu ekleyin:


package com.rabisucloud;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);

public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topicName = "java_demo";

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "Rabisu Bulut'tan selamlar 👋");

// Mesajı asenkron olarak gönder
producer.send(record);

// Gönderim için bekleyen tüm mesajları zorla gönder
producer.flush();

// Producer nesnesini kapat
producer.close();

log.info("Mesaj başarıyla gönderildi!");
}
}

💬 Bu sınıf, “java_demo” adlı konuya mesaj gönderen basit bir Kafka üreticisidir.


🧱 Adım 4 – Çalıştırma Betiği Oluşturma

Uygulamayı kolayca çalıştırmak için bir bash betiği oluşturun:


nano run-producer.sh

İçeriği ekleyin:


#!/bin/bash
mvn clean
mvn package
java -cp target/rabisu-kafka-uyg-1.0-SNAPSHOT.jar:target/lib/* com.rabisucloud.ProducerDemo

Betik izinlerini ayarlayın ve çalıştırın:


chmod +x run-producer.sh
./run-producer.sh

💡 Artık Kafka’ya mesaj gönderiyorsunuz. 🎉


🔄 Adım 5 – Callback (Geri Çağrım) ile Metadata Alma

send() metodunu callback desteğiyle güncelleyin:


producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
log.error("Hata oluştu!", e);
return;
}

log.info(String.format("Zaman: %s | Bölüt: %s | Ofset: %s",
metadata.timestamp(),
metadata.partition(),
metadata.offset()));
}
});

💬 Bu callback, mesajın kümeye ulaştığı zamanı, bölütü ve ofseti loglar.

Çıktı örneği:


[ProducerDemo] Zaman: 1710181831814 | Bölüt: 0 | Ofset: 3

❓ Sıkça Sorulan Sorular (SSS)

  1. flush() ne işe yarar?

Bekleyen mesajların anında gönderilmesini sağlar. Asenkron işlemlerde veri kaybını önler.

  1. Callback kullanmak neden önemlidir?

Mesajın gerçekten kümeye ulaşıp ulaşmadığını doğrular ve hata yönetimi sağlar.

  1. ProducerRecord nedir?

Kafka’ya gönderilecek mesajı (topic, key, value) temsil eden veri modelidir.

  1. “UNKNOWN_TOPIC_OR_PARTITION” hatası ne anlama gelir?

Topic mevcut değil. kafka-topics.sh ile oluşturman gerekir.

  1. Performansı nasıl artırabilirim?

acks, batch.size ve linger.ms ayarlarını optimize ederek throughput yükseltebilirsin.


🏁 Sonuç

Bu rehberde:

Maven ile Java Kafka projesi oluşturmayı,

Kafka istemcisi eklemeyi,

Producer yazmayı ve callback kullanmayı öğrendiniz.

💡 Şimdi sıra sizde! Rabisu Bulut üzerinde Kafka kümeleri kurarak veri akışlarınızı test edin.