들어가며..
Docker 환경, 카프카 클러스터 구축 및 토픽생성, 메시지 전송 Docker hub에 등록된 카프카 이미지(bitnami)를 사용하여 카프카 클러스터를 구축하고, 카프카에서 제공하는 쉘스크립트를 사용하여 구축
2021.12.31 - [OSS] - kafka - Docker 사용, 카프카 클러스터 구축 및 토픽생성, 메시지 전송
전 블로그에 이어 Java 프로듀서/컨슈머 애플리케이션을 만들어보자. Visual Studio Code를 사용하여 애플리케이션을 생성한다. Gradle을 사용하여 Java 프로젝트를 생성할 것이므로 설치를 진행한다.
개발환경
- Java 11
- Visual studio code ( Java extension, Gradle extension )
- Docker container ( 전 블로그 kafka 환경 )
- Gradle 7.3.3
Install Gradle
다음의 Gradle 공식사이트에서 OS에 맞게 Gradle을 설치한다.
https://gradle.org/install/
Install on MacOS
MacOS는 다음 명령으로 쉽게 설치할 수 있다.
brew install gradle
gradle -v
Welcome to Gradle 7.3.3!
Here are the highlights of this release:
- Easily declare new test suites in Java projects
- Support for Java 17
- Support for Scala 3
블로그 작성일 기준 7.3.3 버전이 최신버전으로 배포되고 있다.
gradle init
을 사용하여 Producer java app을 생성한다. 프로젝트를 위치할 디렉터리를 생성하고 해당 위치에서 실행한다.
gradle init
Starting a Gradle Daemon (subsequent builds will be faster)
Select type of project to generate:
1: basic
2: application
3: library
4: Gradle plugin
Enter selection (default: basic) [1..4] 2
Select implementation language:
1: C++
2: Groovy
3: Java
4: Kotlin
5: Scala
6: Swift
Enter selection (default: Java) [1..6] 3
Split functionality across multiple subprojects?:
1: no - only one application project
2: yes - application and library projects
Enter selection (default: no - only one application project) [1..2] 1
Select build script DSL:
1: Groovy
2: Kotlin
Enter selection (default: Groovy) [1..2] 1
Generate build using new APIs and behavior (some features may change in the next minor release)? (default: no) [yes, no]
Select test framework:
1: JUnit 4
2: TestNG
3: Spock
4: JUnit Jupiter
Enter selection (default: JUnit Jupiter) [1..4] 1
Project name (default: java_producer_dev):
Source package (default: java_producer_dev):
위와 같이 입력하면 생성한 프로젝트 폴더에 Java Application template이 생성된다.org.apache.kafka
라이브러리를 build.gradle
에 dependency를 추가하여 임포트한다.
build.gradle
dependencies {
... 생략 ...
implementation 'org.apache.kafka:kafka-clients:2.8.0'
}
프로듀서 애플리케이션 개발
App.java
kafka
라이브러리를 사용하여 간단한 Java producer App을 만들어본다.
public class App {
private static String topicName = "test";
public static void main(String[] args) {
Properties conf = new Properties();
// docker-compose >> KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
conf.setProperty("bootstrap.servers", "localhost:9093");
// kafka 메시지는 직렬화하여 전송한다.
conf.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
conf.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 프로듀서 객체
Producer<Integer, String> producer = new KafkaProducer<>(conf);
// 메시지 생성
int key = 1;
String message = "Hello world!! this message was from Java application";
ProducerRecord<Integer, String> record = new ProducerRecord<Integer,String>(topicName, key, message);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(metadata != null) {
System.out.printf("Success sending message, partition [%d], offset [%d]", metadata.partition(), metadata.offset());
} else {
System.out.printf("Failed to send message: %s", exception.getMessage());
}
}
});
// 프로듀서 자원(연결) 종료
producer.close();
}
}
- Producer는
Properties
설정을 인자로 받는다.bootstrap.servers
는 카프카 클러스터에 환경변수KAFKA_CFG_ADVERTISED_LISTENERS
에EXTERNAL
로 노출된 port를 입력key.serializer
,value.serializer
프로듀서는 카프카 브로커로 직렬화하여 값을 전송하므로 메시지에 사용될 키,값에 대한 직렬화클래스를 지정하여야 한다.
- Producer<Integer, String>, 키(Integer), 값(String)의 형태를 갖는 Producer 객체를 생성한다. Producer는
send
함수를 호출하여 브로커 서버로 메시지를 전달하고, 인자로 전달한Callback
함수로 결과가 전달된다. close
호출로 해당 프로듀서 객체 큐에 담긴 메시지를 안전하게 모두 전송하고 종료한다.
프로듀서 애플리케이션 테스트
전 블로그에서 셋트한 카프카 클러스터를 구동하고 해당 컨테이너로 접속하여 consumer를 실행한다.
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
4b70a4bcd986 bitnami/kafka:3 "/opt/bitnami/script…" 2 days ago Up 2 seconds 9092/tcp, 0.0.0.0:9093->9093/tcp kafka
a63fce1f35da bitnami/zookeeper:3.7 "/opt/bitnami/script…" 2 days ago Up 3 seconds 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp zookeeper
카프카 클러스터 컨테이너 접속
docker exec -it kafka /bin/bash
I have no name!@kafka:/$
consumer 실행
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
Hello world!! this message was from Java application
consumer
에서test
토픽에 푸쉬된 데이터를 정상적으로pull
하였다.
인터렉티브 콘솔 App
약간의 재미를 위해 사용자 입력을 프로듀싱하는 인터렉티브한 App을 만들어보자. 카프카 클라이언트의 역할을 간단히 클래스화하여 관리한다.
public class InterectiveApp {
public static void main(String args[]) throws IOException {
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
String message;
try ( KafkaStringMessageSender kafkaSender = new KafkaStringMessageSender("localhost", 9093) ) {
while(true) {
message = br.readLine();
kafkaSender.sendMessage("test", message);
if ( "exit".equals(message) ) {
break;
}
}
} catch(Exception e) {
System.err.println(e.getMessage());
}
}
}
class KafkaStringMessageSender implements AutoCloseable {
private Producer<Integer, String> producer;
private int key = 0;
public KafkaStringMessageSender(String brokerIp, int brokerPort) {
Properties conf = new Properties();
// docker-compose >> KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
conf.setProperty("bootstrap.servers", String.format("%s:%d", brokerIp, brokerPort));
// kafka 메시지는 직렬화하여 전송한다.
conf.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
conf.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(conf);
}
public Future<RecordMetadata> sendMessage(String topic, String message) {
key++;
ProducerRecord<Integer, String> record = new ProducerRecord<Integer,String>(topic, key, message);
return this.producer.send(record);
}
@Override
public void close() throws Exception {
this.producer.close();
}
}
KafkaStringMessageSender
클래스는bootstrap.servers
값을 생성인자로 받는다.AutoCloseable
의 close() 기능을 확장하여 App에서 자동으로 자원을 반납한다.kafkaSender.sendMessage
을 호출하여 브로커에 메시지를 push한다.
컨슈머 애플리케이션 개발
이제 외부에서 데이터를 pulling하는 컨슈머 애플리케이션을 만들어보자.
ConsumerApp.java
public class ConsumerApp {
public static void main(String args[]) throws IOException {
// Consumer 설정
Properties conf = new Properties();
conf.setProperty("bootstrap.servers", "localhost:9093");
conf.setProperty("group.id", "ConsumerGroup1");
conf.setProperty("enable.auto.commit", "false");
conf.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
conf.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Consumer 객체 생성
try(Consumer<Integer, String> consumer = new KafkaConsumer<>(conf)) {
// Consumer 객체의 구독 객체 설정
consumer.subscribe(Collections.singletonList("test"));
while(true) {
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1));
records.forEach(record -> {
System.out.printf("Pulled message, key [%d] message [%s] \n", record.key(), record.value());
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata om = new OffsetAndMetadata(record.offset() + 1);
Map<TopicPartition, OffsetAndMetadata> commitObjMap = Collections.singletonMap(tp, om);
consumer.commitSync(commitObjMap);
});
try {
Thread.sleep(500);
} catch (Exception e) {
//TODO: handle exception
}
}
} catch (Exception e) {
System.err.println(e.getMessage());
};
}
}
consumer.subscribe
구독할 토픽을 지정한다.enable.auto.commit
을false
로 지정하여 수동으로 커밋한다.commitSync
,commitAsync
를 사용하여 커밋처리한다.
마무리하며..
Java kafka client library를 사용하여 간단한 프로듀서/컨슈머 애플리케이션을 생성해보았다. Java뿐만 아니라 Python, Go등의 인기있는 언어는 대부분 kafka library가 존재한다. 다음 장에서는 Kakfa Connector를 사용하여 file, db데이터를 핸들링하는 방법을 살펴보도록 한다.
'Web Programming' 카테고리의 다른 글
gRPC - From Java(Spring) Client to Python Server 예제 (0) | 2022.09.02 |
---|---|
문제해결: No ParameterResolver registered for parameter (0) | 2022.06.07 |
카프카(kafka) - Docker + 카프카 클러스터 구축 및 토픽생성, 메시지 전송 (0) | 2021.12.31 |
카프카(kafka) 개요 - 주요 용어 개념 정리 (0) | 2021.12.27 |
Nginx log 관련 설정 파헤치기 (feat. AWS EC2) (0) | 2021.12.15 |