본문 바로가기
Web Programming

카프카(kafka) - VSCode에서 Java Producer/Consumer 생성 with Gradle

by 맑은안개 2021. 12. 31.

들어가며..

Docker 환경, 카프카 클러스터 구축 및 토픽생성, 메시지 전송 Docker hub에 등록된 카프카 이미지(bitnami)를 사용하여 카프카 클러스터를 구축하고, 카프카에서 제공하는 쉘스크립트를 사용하여 구축
2021.12.31 - [OSS] - kafka - Docker 사용, 카프카 클러스터 구축 및 토픽생성, 메시지 전송

전 블로그에 이어 Java 프로듀서/컨슈머 애플리케이션을 만들어보자. Visual Studio Code를 사용하여 애플리케이션을 생성한다. Gradle을 사용하여 Java 프로젝트를 생성할 것이므로 설치를 진행한다.

개발환경

  • Java 11
  • Visual studio code ( Java extension, Gradle extension )
  • Docker container ( 전 블로그 kafka 환경 )
  • Gradle 7.3.3

Install Gradle

다음의 Gradle 공식사이트에서 OS에 맞게 Gradle을 설치한다.
https://gradle.org/install/

Install on MacOS

MacOS는 다음 명령으로 쉽게 설치할 수 있다.

brew install gradle
gradle -v

Welcome to Gradle 7.3.3!

Here are the highlights of this release:
 - Easily declare new test suites in Java projects
 - Support for Java 17
 - Support for Scala 3

블로그 작성일 기준 7.3.3 버전이 최신버전으로 배포되고 있다.

gradle init을 사용하여 Producer java app을 생성한다. 프로젝트를 위치할 디렉터리를 생성하고 해당 위치에서 실행한다.

 gradle init
Starting a Gradle Daemon (subsequent builds will be faster)

Select type of project to generate:
  1: basic
  2: application
  3: library
  4: Gradle plugin
Enter selection (default: basic) [1..4] 2

Select implementation language:
  1: C++
  2: Groovy
  3: Java
  4: Kotlin
  5: Scala
  6: Swift
Enter selection (default: Java) [1..6] 3

Split functionality across multiple subprojects?:
  1: no - only one application project
  2: yes - application and library projects
Enter selection (default: no - only one application project) [1..2] 1

Select build script DSL:
  1: Groovy
  2: Kotlin
Enter selection (default: Groovy) [1..2] 1

Generate build using new APIs and behavior (some features may change in the next minor release)? (default: no) [yes, no] 
Select test framework:
  1: JUnit 4
  2: TestNG
  3: Spock
  4: JUnit Jupiter
Enter selection (default: JUnit Jupiter) [1..4] 1

Project name (default: java_producer_dev): 
Source package (default: java_producer_dev): 

위와 같이 입력하면 생성한 프로젝트 폴더에 Java Application template이 생성된다.
org.apache.kafka 라이브러리를 build.gradle에 dependency를 추가하여 임포트한다.

build.gradle

dependencies {
    ... 생략 ...
    implementation 'org.apache.kafka:kafka-clients:2.8.0'
}

프로듀서 애플리케이션 개발

App.java

kafka라이브러리를 사용하여 간단한 Java producer App을 만들어본다.

public class App {

    private static String topicName = "test";

    public static void main(String[] args) {

        Properties conf = new Properties();
        // docker-compose >> KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
        conf.setProperty("bootstrap.servers", "localhost:9093");
        // kafka 메시지는 직렬화하여 전송한다. 
        conf.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        conf.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 프로듀서 객체
        Producer<Integer, String> producer = new KafkaProducer<>(conf);

        // 메시지 생성
        int key = 1;
        String message = "Hello world!! this message was from Java application";
        ProducerRecord<Integer, String> record = new ProducerRecord<Integer,String>(topicName, key, message);

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(metadata != null) {
                    System.out.printf("Success sending message, partition [%d], offset [%d]", metadata.partition(), metadata.offset());
                } else {
                    System.out.printf("Failed to send message: %s", exception.getMessage());
                }
            }
        });

        // 프로듀서 자원(연결) 종료
        producer.close();

    }
}
  • Producer는 Properties 설정을 인자로 받는다.
    • bootstrap.servers는 카프카 클러스터에 환경변수 KAFKA_CFG_ADVERTISED_LISTENERSEXTERNAL로 노출된 port를 입력
    • key.serializer, value.serializer 프로듀서는 카프카 브로커로 직렬화하여 값을 전송하므로 메시지에 사용될 키,값에 대한 직렬화클래스를 지정하여야 한다.
  • Producer<Integer, String>, 키(Integer), 값(String)의 형태를 갖는 Producer 객체를 생성한다. Producer는 send함수를 호출하여 브로커 서버로 메시지를 전달하고, 인자로 전달한 Callback함수로 결과가 전달된다.
  • close 호출로 해당 프로듀서 객체 큐에 담긴 메시지를 안전하게 모두 전송하고 종료한다.

프로듀서 애플리케이션 테스트

전 블로그에서 셋트한 카프카 클러스터를 구동하고 해당 컨테이너로 접속하여 consumer를 실행한다.

 docker ps
CONTAINER ID   IMAGE                   COMMAND                  CREATED       STATUS          PORTS                                                  NAMES
4b70a4bcd986   bitnami/kafka:3         "/opt/bitnami/script…"   2 days ago    Up 2 seconds    9092/tcp, 0.0.0.0:9093->9093/tcp                       kafka
a63fce1f35da   bitnami/zookeeper:3.7   "/opt/bitnami/script…"   2 days ago    Up 3 seconds    2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper

카프카 클러스터 컨테이너 접속

 docker exec -it kafka /bin/bash
I have no name!@kafka:/$

consumer 실행

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
Hello world!! this message was from Java application
  • consumer에서 test토픽에 푸쉬된 데이터를 정상적으로 pull하였다.

인터렉티브 콘솔 App

약간의 재미를 위해 사용자 입력을 프로듀싱하는 인터렉티브한 App을 만들어보자. 카프카 클라이언트의 역할을 간단히 클래스화하여 관리한다.

public class InterectiveApp {
    public static void main(String args[]) throws IOException {

        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));  
        String message; 

        try ( KafkaStringMessageSender kafkaSender = new KafkaStringMessageSender("localhost", 9093) ) {

            while(true) {
                message = br.readLine();
                kafkaSender.sendMessage("test", message);

                if ( "exit".equals(message) ) {
                    break;
                }
            }

        } catch(Exception e) {
            System.err.println(e.getMessage());
        }

    }
}

class KafkaStringMessageSender implements AutoCloseable {

    private Producer<Integer, String> producer;
    private int key = 0;

    public KafkaStringMessageSender(String brokerIp, int brokerPort) {

        Properties conf = new Properties();
        // docker-compose >> KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
        conf.setProperty("bootstrap.servers", String.format("%s:%d", brokerIp, brokerPort));
        // kafka 메시지는 직렬화하여 전송한다. 
        conf.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        conf.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(conf);

    }

    public Future<RecordMetadata> sendMessage(String topic, String message) {
        key++;
        ProducerRecord<Integer, String> record = new ProducerRecord<Integer,String>(topic, key, message);

        return this.producer.send(record);
    }

    @Override
    public void close() throws Exception {
        this.producer.close();
    }

}
  • KafkaStringMessageSender클래스는 bootstrap.servers 값을 생성인자로 받는다.
  • AutoCloseable의 close() 기능을 확장하여 App에서 자동으로 자원을 반납한다.
  • kafkaSender.sendMessage을 호출하여 브로커에 메시지를 push한다.

컨슈머 애플리케이션 개발

이제 외부에서 데이터를 pulling하는 컨슈머 애플리케이션을 만들어보자.

ConsumerApp.java

public class ConsumerApp {
    public static void main(String args[]) throws IOException {

        // Consumer 설정
        Properties conf = new Properties();
        conf.setProperty("bootstrap.servers", "localhost:9093");
        conf.setProperty("group.id", "ConsumerGroup1");
        conf.setProperty("enable.auto.commit", "false");
        conf.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        conf.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Consumer 객체 생성
        try(Consumer<Integer, String> consumer = new KafkaConsumer<>(conf)) {

            // Consumer 객체의 구독 객체 설정
            consumer.subscribe(Collections.singletonList("test"));

            while(true) {

                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1));

                records.forEach(record -> {
                    System.out.printf("Pulled message, key [%d] message [%s] \n", record.key(), record.value());

                    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    OffsetAndMetadata om = new OffsetAndMetadata(record.offset() + 1);
                    Map<TopicPartition, OffsetAndMetadata> commitObjMap = Collections.singletonMap(tp, om);

                    consumer.commitSync(commitObjMap);
                });

                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    //TODO: handle exception
                }
            }

        } catch (Exception e) {
            System.err.println(e.getMessage());
        };

    }
}
  • consumer.subscribe 구독할 토픽을 지정한다.
  • enable.auto.commitfalse로 지정하여 수동으로 커밋한다. commitSync, commitAsync를 사용하여 커밋처리한다.


    마무리하며..

    Java kafka client library를 사용하여 간단한 프로듀서/컨슈머 애플리케이션을 생성해보았다. Java뿐만 아니라 Python, Go등의 인기있는 언어는 대부분 kafka library가 존재한다. 다음 장에서는 Kakfa Connector를 사용하여 file, db데이터를 핸들링하는 방법을 살펴보도록 한다.
반응형