본문 바로가기

Web/Kafka

[kafka] Elastic Stack과 Kafka 실습해보자! - (2)(produce and consume)

<1편>

https://marrrang.tistory.com/40?category=927204

 

[kafka] Elastic Stack과 Kafka 실습해보자! - (1)(설치 및 구성)

카프카만 하기엔 아쉬운데? ES도 한번 도전해보자 카프카만 하기엔 시간이 아깝기에 ES도 함께 적용시켜보는 아주아주 간단한 실습을 진행해보려합니다. 우선은 카프카부터 시작해봅시다. 모든

marrrang.tistory.com

데이터 소통이 있어야 ES도 쓸곳이 생기겠지

앞서서 카프카를 설치했으니 사용을 해보겠습니다. 이번 실습에서는 Spring Boot 프로젝트를 생성하고 간단하게 Kafka로 메세지를 전송해보겠습니다.

1. SpringBoot 프로젝트로 시작

스프링 부트 프로젝트를 생성하고 시작하겠습니다. 이 실습에서 boot 프로젝트 생성하는 부분은 생략하고 넘어가겠습니다. 저는 아래의 디렉토리 구조를 만들고 시작하겠습니다.

 

com
 └ example
 	 └ kafkatest
     		└ config
        	└ controller

2. Dependency 추가 (Maven 기준)

kafka Dependency를 추가해줍니다.

 

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

3. 카프카 설정 추가

간단하게 설정하기 위해 application.properties에 설정을 추가해서 사용하겠습니다.

 

//producer 설정
// 여러대의 서버 설정시 ','로 나누어 적는다
spring.kafka.producer.bootstrap-servers= [카프카 설치해놓은 서버ip:port]
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

//consumer 설정
spring.kafka.consumer.bootstrap-servers= [카프카 설치해놓은 서버ip:port]
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.group-id=kafka-test-group
spring.kafka.consumer.client-id=kafka-test-client
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=5
spring.kafka.producer.acks=all
spring.kafka.consumer.fetch-min-size=2

4. MessageSender 클래스 작성

Message produce에 사용할 클래스를 작성해 볼까 합니다. 그냥 KafkaTemplate.send(...) 해서 사용해도 되지만 message를 보내면서 다양한 작업들을 진행하려면 Custom 클래스가 있는게 좋을것 같습니다.

 

package com.example.kafkatest.config;

@Component
public class MessageSender {
	private final KafkaTemplate<String, String> kafkaTemplate;

	public MessageSender(KafkaTemplate<String, String> kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	public void send(String topic, String message) {
		System.out.println("Produce message : " + message);
		kafkaTemplate.send(topic, message);
	}
}

 

5. Controller 작성

메세지 전송 요청을 할 Controller 클래스를 작성해봅시다. 간단하게 메세지를 보내는 API 하나만 만들어보겠습니다.

 

package com.example.kafkatest.controller;

@RestController
public class KafkaController {
    private final MessageSender messageSender;
    
    @Autowired
    public kafkaController(MessageSender messageSender) {
    	this.messageSender = messageSender;
    }
    
    @GetMapping("/send")
    public String sendMessage(@RequestParam String message) {
    	//"test-kafka" 이 부분은 저번 실습에서 정한 Topic명으로
    	messageSender.send("test-kafka", message);
        return "success";
    }
}

 

6. Consumer 작성

나중에 Elastic Search로 Consume을 할것이지만 그 전에 테스트를 위해 Consumer를 하나 만들어 봅시다.

kafkaListener를 사용해서 작성해보겠습니다.

 

package com.example.kafkatest;

@Component
public class KafkaConsumerListener {
	@KafkaListener(
    	topics = "test-kafka",
        groupId = "kafka-group",
        clientIdPrefix = "kafka-client"
    )
    public void kafkaTestItemIncoming(String message) {
    	System.out.println("Received Message : " + message);
    }
}

 

Consumer 까지 작성했으니 테스트 해보겠습니다. Postman 등의 툴을 이용해 서버에 메세지를 보내보겠습니다.

확인보시면 아래와 같은 Console log를 확인할 수 있습니다.

 

Produce message : Marrrang Dev Blog!
Received Message : Marrrang Dev Blog!

 

7. 다음으로

다음번에는 ES가 저희가 보낸 메세지를 consume 하거나 log를 가져가서 표시하도록 해보겠습니다.

부족한 저의 실습을 봐주셔서 감사합니다 ㅎㅎ

 


Kafka 초보가 테스트한 내용을 정리한 글입니다

오류나 수정이 필요한 사항 있으면 댓글 감사히 받겠습니다!

반응형