https://www.youtube.com/watch?v=vKxhPUUEDmM
이 글은 위의 링크 많이 참조하여 작성하였습니다. 한번 보시면 좋을것 같아요 ㅎㅎ
Kafka Streams는 뭘까?
카프카에 대한 정의는 분산 이벤트 스트리밍 플랫폼으로써 프로듀서와 컨슈머를 통해 데이터를 생산하고 받아와서 처리하는곳에 사용해왔습니다. 이 기술은 강력한 성능을 가지고 있고 단순히 메시지를 전달하는 것이 아닌 연속적으로 대용량의 메세지를 처리하는 곳에도 사용해왔었습니다.
이제는 컨슈머로 받아와서 처리하는 것보다 더 빠르고 안전하게 실시간으로 처리할 수 있게 카프카에서 지원해준 것이 Kafka Streams입니다.
간단하게는 어떤 Topic으로 들어오는 데이터를 Consume하여 Kafka Streams에서 제공하는 처리 로직을 통해 처리 후 다른 Topic으로 전송하거나 끝내는 부분을 수행해주는 라이브러리입니다.
Kafka Streams의 장점
Kafka Streams는 카프카에 저장된 데이터를 처리하고 분석하기 위해 개발된 자바 라이브러리입니다.
이렇게 라이브러리로 제공되니까 JVM 기반 언어중 아무 언어를 선택해서 사용해도 무관합니다. 이외에도 장점들이 여러가지가 있습니다.
- 카프카와 완벽 호환된다.
- 로그스태시, 스파크와 같은 오픈소스툴과 다르게 매 카프카 버전에 맞춰서 호환을 제공한다.
- 데이터 유실과 중복처리 되지 않고 딱 1번만 처리되는 것을 보장한다.
- 스케쥴링 도구가 필요없다.
- 스파크 스트리밍과 연동해서 사용한다면 마이크로 배치 서비스를 구축 할 수 있지만 이를 위해 클러스터 관리자, 리소스 관리자가 필요하고 또한 서버들도 필요합니다. 하지만 Kafka Streams는 다른 것 필요없이 스트림즈 어플리케이션만 가지고 사용할 수 있습니다.
- 이벤트 처리 기능 Streams DSL, Processor API 제공
- 이벤트 기반 데이터 처리에 필요한 기능들을 제공하기 때문에 스트림즈를 구현하기 편하다.
- 자체 로컬 상태 저장소를 사용한다.
- 상태 기반 처리를 도와주기 위해 rocksDB를 로컬에서 사용하여 상태를 저장한다.
- 로컬 DB에 저장한 상태에 대한 변환 정보는 카프카 변경로그에 저장한다.
- 이를 통해 프로세스에 장애가 발생하더라도 상태가 모두 안전하게 저장되기 때문에 장애 복구를 할 수 있다.
Java 예시
Spring에서 사용하는 방법보다 더 통용적으로 사용 가능한 방법을 보기위해 Spring이 아닌 Java에서의 간단한 예시를 보겠습니다. 제가 만든 예시는 너무나 간단한 구조로 구성되어 있습니다.
public static void main(String[] args) {
producerTest();
topic1Stream();
topic2Stream();
}
private static Properties getKafkaStreamsProperties(String appId) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "카프카 브로커 IP");
//메시지의 키, 값의 직렬화/역직렬화를 위한 설정
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return props;
}
private static void topic1Stream() {
//스트림 토폴로지 정의
//StreamsBuilder를 build 하면 Topology 반환
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("streamTopic");
stream
.mapValues(String::trim)
.filter((String v1, String v2) -> (v1 + v2).length() > 5)
.mapValues((String v1) -> v1 + " 토픽1 처리완료 ")
.to("streamTopic2");
//스트림 생성
KafkaStreams streams = new KafkaStreams(builder.build(), getKafkaStreamsProperties("SecondId"));
streams.start();
}
private static void topic2Stream() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("streamTopic2");
stream
.filter((key, value) -> value.contains("3"))
.mapValues((String v1) -> v1 + " 토픽2 처리완료 ")
.to("streamTopic3");
KafkaStreams streams = new KafkaStreams(builder.build(), getKafkaStreamsProperties("LastId"));
streams.start();
}
private static void producerTest() {
//1. Properties 만들기
Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "카프카 브로커 ID");
kafkaProducerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducerProperties.put(ProducerConfig.ACKS_CONFIG, "-1");
// 2. Producer 생성
Producer<String, String> producer = new KafkaProducer<>(kafkaProducerProperties);
// 3. 전송
for (int index = 0; index < 3; index++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("streamTopic", "stream", "스트림테스트 : " + index);
producer.send(producerRecord);
}
// 4. 닫기
producer.close();
}
코드가 난잡해서 이해가 가셨을지는 모르지만 간단하게 Kafka Streams를 이용해 연속적인 데이터를 가공, Produce까지 해보았습니다. 단순히 데이터를 받아 처리하고 끝내는것이 아니라 연속된 처리를 위한 파이프라인을 구성할 수 있다는 것이 가장 큰 장점이라고 할 수 있습니다.
그리고 위에서 사용한 stream.filter와 같은 메서드 들이 Streams DSL입니다. 확실히 DSL만으로도 충분하다는 느낌이 들 정도로 많은 기능들을 담고 있습니다.
Streams를 잠시 사용해보며 느낀점은 간단하게 Produce부터 Consume을 할 수 있다는 것과 제가 실무에서 다뤄본 카프카로 이벤트 전달을 하는 Task-Step 구조를 대체할 수 있겠다는 생각이 들었습니다.
'Web > Kafka' 카테고리의 다른 글
[Kafka] Kafka를 편하게 Kafkactl 명령어 (2) | 2023.12.08 |
---|---|
[Kafka] Kafka Connect란? (0) | 2022.03.11 |
[kafka] Elastic Stack과 Kafka 실습해보자! - (3)(elk 설치) (0) | 2021.04.13 |
[kafka] Elastic Stack과 Kafka 실습해보자! - (2)(produce and consume) (3) | 2021.03.24 |
[kafka] 카프카 관련 라이브러리 or API (0) | 2021.03.20 |