스프링부트

spring boot kafka 연동하기

알쓸개잡 2023. 10. 8.

Spring에서 kafka 연동은 Apache kafka의 spring-kafka 프로젝트의 auto configuration 제공을 통해서 지원된다. kafka 설정은 spring.kafka.*의 외부 구성 속성에 의해서 설정할 수 있다. 이번 포스팅에서는 kafka messeage 송수신을 위한 auto configuration에 대해서 기록한다.

 

dependency

spring boot에서 kafka를 사용하기 위해서 필요한 dependency는 아래와 같다.

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.4</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

...
<dependencies>
	<dependency>
    	<groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
	<dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    ...
</dependencies>

만약 org.springframework.boot:spring-boot-starter-web 을 dependency를 추가하였다면 org.springframework.boot:spring-boot-starter는 추가할 필요는 없다.

 

spring kafka properties

spring에서 kafka 사용을 위한 설정은 spring-boot-autoconfigure artifact의 org.springframework.boot.autoconfigure.kafka.KafkaProperties 클래스를 통해서 외부 설정과 연동된다.

지원되는 옵션은 github의 KafkaProperties 파일의 코드를 참고하기 바란다.

org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration 클래스 파일의 클래스 annotation은 아래와 같다.

@AutoConfiguration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })

@EnableConfigurationProperties(KafkaProperties.class) 를 통해서 kafka 연동 관련 속성은 KafkaProperties 클래스에서 처리함을 알 수 있다. KafkaProperties의 클래스 annotation은 아래와 같다.

@ConfigurationProperties(prefix = "spring.kafka")
 public class KafkaProperties { ... }

application.yml 혹은 application.properties 의 속성에서 spring.kafka.* 의 속성은 kafka 관련 속성으로 처리함을 알 수 있다.

KafkaProperties 클래스의 멤버 변수는 아래와 같다.

private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

/**
 * ID to pass to the server when making requests. Used for server-side logging.
 */
private String clientId;

/**
 * Additional properties, common to producers and consumers, used to configure the
 * client.
 */
private final Map<String, String> properties = new HashMap<>();

private final Consumer consumer = new Consumer();

private final Producer producer = new Producer();

private final Admin admin = new Admin();

private final Streams streams = new Streams();

private final Listener listener = new Listener();

private final Ssl ssl = new Ssl();

private final Jaas jaas = new Jaas();

private final Template template = new Template();

private final Security security = new Security();

private final Retry retry = new Retry();

예로

Consumer consumer는 spring.kafka.consumer.* 속성과 연결된다.

Producer producer 는 spring.kafka.producer.* 속성과 연결된다.

KafkaProperties의 몇가지 메서드를 보면 

private Map<String, Object> buildCommonProperties()
public Map<String, Object> buildConsumerProperties()
public Map<String, Object> buildProducerProperties()
public Map<String, Object> buildAdminProperties()
public Map<String, Object> buildStreamsProperties()

메서드가 있는데 위 메서드들은 spring boot kafka 관련 설정을 kafka의 설정항목과 mapping 하여 변경하는 작업을 수행한다.

예를 들어 spring.kafka.bootstrap-servers 설정은 kafka의 bootstrap.servers 설정으로 지정한다.

kafka의 설정 항목은 kafka 공식 문서를 참고하면 도움이 된다.

 

KafkaProperties 에서 할 수 있는 설정 항목 이외의 추가적인 설정은

spring.kafka.properties.* : 공통 추가 설정

spring.kafka.consumer.properties.* : consumer 관련 추가 설정

spring.kafka.producer.properties.* : producer 관련 추가 설정

spring.kafka.admin.properties.* : admin 관련 추가 설정

spring.kafka.streams.properties.* : stream 관련 추가 설정

으로 지정할 수 있다.

spring:
  kafka:
    consumer:
      properties:
        max.partition.fetch.bytes: 1048576
        receive.buffer.bytes: 65536
    producer:
      properties:
        delivery.timeout.ms: 120000
        linger.ms: 0

 

spring kafka autoconfiguration

spring boot에서는 spring-boot-autoconfigure artifact의 org.springframework.boot.autoconfigure.kafka 패키지에 있는 클래스 파일을 통해서 kafka 관련 빈들을 제공한다. 관련된 configuration 클래스 파일은 아래와 같다.

  • KafkaAutoConfiguration 
    • KafkaTemplate Bean 생성
    • LoggingProducerListener Bean 생성
    • DefaultKafkaConsumerFactory Bean 생성
    • DefaultKafkaProducerFactory Bean 생성
  • KafkaAnnotationDrivenConfiguration
    • ConcurrentKafkaListenerContainerFactory Bean 생성

DefaultKafkaConsumerFactory Bean을 생성하는 코드는 아래와 같은데

@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(KafkaConnectionDetails connectionDetails,
        ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
    Map<String, Object> properties = this.properties.buildConsumerProperties();
    applyKafkaConnectionDetailsForConsumer(properties, connectionDetails);
    DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(properties);
    customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
    return factory;
}

별도의 ConsumerFactory Bean이 생성되지 않은 경우 자동으로 생성된다.

메서드 마지막 라인에 보면 customizers.orderedStream().forEach((customizer) -> customizer.customize(factory)) 코드를 통해서 programmatical 하게 consumer factory에 설정을 적용할 수 있다.

@Bean
public DefaultKafkaConsumerFactoryCustomizer defaultKafkaConsumerFactoryCustomizer() {
    //deserializer 설정
    return consumerFactory ->
        consumerFactory.setValueDeserializer(new JsonDeserializer<>());
}

위와 같은 코드를 통해서 생성된 ConsumerFactory Bean에 추가적인 설정을 적용할 수 있다.

ProducerFactory에도 DefaultKafkaProducerFactoryCustomizer를 통해서 추가적인 설정을 적용할 수 있다.

@Bean
public DefaultKafkaProducerFactoryCustomizer defaultKafkaProducerFactoryCustomizer() {
    //serializer 설정
    return producerFactory ->
        producerFactory.setValueSerializer(new JsonSerializer<>());
}

 

message send and receive

kafka message 전송은 auto configuration을 통해서 주입되는 KafkaTemplate Bean을 통해서 간단하게 전송할 수 있다.

kafka message 수신은 @KafkaListener annotation을 통해서 간단하게 수신할 수 있다.

 

send message

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

...

String key = UUID.randomUUID().toString();
kafkaTemplate.send("topic name", key, message)
    .whenComplete((result, throwable) -> {
        if (throwable != null) {
            //메시지 전송 실패 코드
        } else {
            //메시지 전송 성공 코드
        }
    });

kafkaTemplate를 통해서 메시지를 전송할 때 별도의 key를 지정하지 않으면 항상 동일한 partition에 데이터가 전송된다.

 

receive message

@Component
public class ListenerService {

...

    @KafkaListener(
        topics = "topic name",
        clientIdPrefix = "topic1-listener",
        groupId = "${spring.kafka.consumer.group-id}"
    )
    public void listen(Message message) {
        log.info("received topic1 message: {}", message);
    }
...
}

@KafkaListener 어노테이션을 통해 메시지 수신 listener를 지정할 수 있다.

@KafkaListener 어노테이션의 속성에 대한 정보는 해당 파일에서 확인해 보기 바란다. 주석 설명이 잘 되어 있다.

Listener는 kafkaListenerContainerFactory Bean을 통해서 생성되는데 kafkaListenerContainerFactory Bean을 별도로 정의하지 않으면 spring.kafka.listener.* 설정을 기반으로 auto configuration을 통해서 자동으로 Bean이 생성된다.

@KafkaListener 어노테이션을 사용하기 위해서는 @Configuration 클래스에 @EnableKafka 어노테이션이 지정되어야 한다.

listener container factory는 기본적으로 ConcurrentMessageListenerContainer를 구성하는 데 사용된다.

아래 코드는 KafkaAnnotationDrivenConfiguration 클래스에 정의된 kafkaListenerContainerFactory Bean을 생성하는 코드이다.

@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
        ObjectProvider<ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>>> kafkaContainerCustomizer) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory
        .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
    kafkaContainerCustomizer.ifAvailable(factory::setContainerCustomizer);
    return factory;
}

kafkaListenerContainerFactory Bean이 정의되지 않은 경우 생성되며 ConcurrentKafkaListenerContainerFactory가 생성된다.

 

spring boot kafka sample code

kafka에 test1, test2 두개의 토픽을 생성하여 각 토픽에 메시지를 송수신하는 샘플 코드다.

로컬 kafka 설치

docker compose를 이용하여 로컬에 kafka를 설치하였다.

version: "3.8"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:3.4
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ENABLE_KRAFT=no
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENERS=INTERNAL://kafka:9094,EXTERNAL://kafka:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9094,EXTERNAL://127.0.0.1:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
    depends_on:
      - zookeeper

로컬 loopback으로 접속할 것이므로 9092 포트를 통해서 접속할 것이다.

 

application.yml

kafka 관련 설정이다.

spring:
  kafka:
    bootstrap-servers: localhost:9092
    #producer.bootstrap-servers가 지정되지 않으면 kafka.bootstrap-servers가 사용된다.
    producer:
      bootstrap-servers: localhost:9092
      #producer key serializer 설정
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #producer value serializer 설정
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      #kafka broker에서 producer를 식별하기 위한 식별값
      client-id: kafka-test-producer
    #consumer.bootstrap-servers가 지정되지 않으면 kafka.bootstrap-servers가 사용된다.
    consumer:
      bootstrap-servers: localhost:9092
      #consumer key deserializer 설정
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #consumer value deserializer 설정
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      #consumer group id 설정
      group-id: test-group-id
      auto-offset-reset: earliest
      #kafka broker에서 consumer를 식별하기 위한 식별값
      client-id: kafka-test-consumer
      properties:
        spring.json.trusted.packages: com.example.spring.kafka.dto
    listener:
      concurrency: 3
      ack-mode: record

spring.kafka.consumer.properties의 spring.json.trusted.packages 설정은 지정된 package에 클래스에 대해서 deserialization을 허용하겠다는 설정이다.

 

spring.kafka.listener.* 는 메시지를 수신하기 위한 Listener 설정인데 concurrency를 3으로 설정하여 3개의 concurrent listener를 동작시키기 위한 설정이다.

ack-mode는 offset commit을 위한 설정인데 관련 내용은 spring 문서의 Committing Offsets를 참고하면 도움이 된다.

spring kafka listener container 관련 설정은 spring 공식 문서를 참고하면 도움이 된다.

 

configuration for topic

package com.example.spring.kafka.configuration;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

@Configuration
@Slf4j
//for a auto-configuration listener container factory
@EnableKafka
public class KafkaConfig {

	public static final String TOPIC1 = "test1";
	public static final String TOPIC2 = "test2";


	@Bean
	public NewTopic topic1() {
		return new NewTopic(TOPIC1, 3, CreateTopicsRequest.NO_REPLICATION_FACTOR);
	}

	@Bean
	public NewTopic topic2() {
		return new NewTopic(TOPIC2, 3, CreateTopicsRequest.NO_REPLICATION_FACTOR);
	}
}

NewTopic 타입의 bean을 생성하여 test1, test2 이름의 토픽을 생성하도록 한다. test1, test2 토픽이 이미 존재하는 경우 topic1, topic2 Bean은 무시된다.

NewTopic 생성자의 각 파라미터는 아래와 같다.

parameter1 : topic 이름

parameter2 : partition 개수

parameter3 : replication factor 개수 (kafka 클러스터 환경에서는 2개 이상의 값을 설정하는 것을 권장한다.)

 

message payload dto

package com.example.spring.kafka.dto;

import com.fasterxml.jackson.annotation.JsonProperty;

public record Topic1Dto(
	@JsonProperty("id") String id,
	@JsonProperty("message") String message) {
}

test1 topic으로 전송할 메시지 payload 정의이다.

package com.example.spring.kafka.dto;

import com.fasterxml.jackson.annotation.JsonProperty;

public record Topic2Dto(
	@JsonProperty("id") String id,
	@JsonProperty("message") String message) {
}

test2 topic 으로 전송할 메시지 payload 정의이다.

Topic1Dto, Topic2Dto 는 

spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: com.example.spring.kafka.dto

위 설정으로 이 샘플코드 에서는 deserialization을 수행할 DTO 클래스는 com.example.spring.kafka.dto 패키지에 있어야 한다.

 

controller

package com.example.spring.kafka.controller;

import com.example.spring.kafka.dto.Topic1Dto;
import com.example.spring.kafka.dto.Topic2Dto;
import com.example.spring.kafka.producer.ProducerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
@RequiredArgsConstructor
public class KafkaController {
	private final ProducerService producerService;

	@PostMapping(value = "/topic1", consumes = MediaType.APPLICATION_JSON_VALUE)
	public void produceTopic1(@RequestBody Topic1Dto topic1Dto) {
		producerService.produceTopic1(topic1Dto);
	}

	@PostMapping(value = "/topic2", consumes = MediaType.APPLICATION_JSON_VALUE)
	public void produceTopic2(@RequestBody Topic2Dto topic2Dto) {
		producerService.produceTopic2(topic2Dto);
	}
}

POST /topic1 호출은 test1 topic으로 메시지를 전송하는 API이다.

POST /topic2 호출은 test2 topic으로 메시지를 전송하는 API이다.

 

Message Producer Service

package com.example.spring.kafka.producer;

import com.example.spring.kafka.configuration.KafkaConfig;
import com.example.spring.kafka.dto.Topic1Dto;
import com.example.spring.kafka.dto.Topic2Dto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
@Slf4j
@RequiredArgsConstructor
public class ProducerService {
	private final KafkaTemplate<String, Object> kafkaTemplate;

	public void produceTopic1(final Topic1Dto topic1Dto) {
		String key = UUID.randomUUID().toString();
		kafkaTemplate.send(KafkaConfig.TOPIC1, key, topic1Dto)
			.whenComplete((result, throwable) -> {
				if (throwable != null) {
					log.error("fail to send message, {}", throwable.getMessage());
				} else {
					RecordMetadata metadata = result.getRecordMetadata();
					log.info("send message: {}", topic1Dto);
					log.info("send topic: {}", metadata.topic());
					log.info("send partition: {}", metadata.partition());
					log.info("send offset: {}", metadata.offset());
				}
			});
	}

	public void produceTopic2(final Topic2Dto topic2Dto) {
		String key = UUID.randomUUID().toString();
		kafkaTemplate.send(KafkaConfig.TOPIC2, key, topic2Dto)
			.whenComplete((result, throwable) -> {
				if (throwable != null) {
					log.error("fail to send message, {}", throwable.getMessage());
				} else {
					RecordMetadata metadata = result.getRecordMetadata();
					log.info("send message: {}", topic2Dto);
					log.info("send topic: {}", metadata.topic());
					log.info("send partition: {}", metadata.partition());
					log.info("send offset: {}", metadata.offset());
					log.info("success to send message, topic: {}, partition: {}, offset: {}",
						metadata.topic(), metadata.partition(), metadata.offset());
				}
			});
	}
}

kafkaTemplate.send는 CompletableFuture를 리턴하여 비동기로 동작한다. whenComplete 메서드를 호출하여 전송 결과 처리를 수행한다.

 

Message Consumer Service

package com.example.spring.kafka.consumer;

import com.example.spring.kafka.configuration.KafkaConfig;
import com.example.spring.kafka.dto.Topic1Dto;
import com.example.spring.kafka.dto.Topic2Dto;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ConsumerService {

	@KafkaListener(
		topics = KafkaConfig.TOPIC1,
		//consumer.client-id 의 설정을 override하여 consumer를 생성한다.
		clientIdPrefix = "topic1-listener",
		groupId = "${spring.kafka.consumer.group-id}"
	)
	public void listenTopic1(Topic1Dto topic1Dto, ConsumerRecordMetadata metadata) {
		log.info("received message: {}", topic1Dto);
		log.info("received topic: {}", metadata.topic());
		log.info("received partition: {}", metadata.partition());
		log.info("received offset: {}", metadata.offset());
	}

	@KafkaListener(
		topics = KafkaConfig.TOPIC2,
		//consumer.client-id 의 설정을 override하여 consumer를 생성한다.
		clientIdPrefix = "topic2-listener",
		groupId = "${spring.kafka.consumer.group-id}"
	)
	public void listenTopic2(Topic2Dto topic2Dto, ConsumerRecordMetadata metadata) {
		log.info("received message: {}", topic2Dto);
		log.info("received topic: {}", metadata.topic());
		log.info("received partition: {}", metadata.partition());
		log.info("received offset: {}", metadata.offset());
	}
}

@KafkaListener 어노테이션을 지정하여 메시지 consumer listener container를 생성한다. ConsumerRecordMetadata 타입의 인수를 통해서 수신된 메시지의 메타 정보를 확인할 수 있다.

consumer listener는 아래와 같이 생성된다.

o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=topic1-listener-0, groupId=test-group-id] Subscribed to topic(s): test1
o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=topic1-listener-1, groupId=test-group-id] Subscribed to topic(s): test1
o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=topic1-listener-2, groupId=test-group-id] Subscribed to topic(s): test1
o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=topic2-listener-0, groupId=test-group-id] Subscribed to topic(s): test2
o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=topic2-listener-1, groupId=test-group-id] Subscribed to topic(s): test2
o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=topic2-listener-2, groupId=test-group-id] Subscribed to topic(s): test2

test1, test2 토픽에 대해서 각각 concurrency 개수인 3개의 consumer 가 생성된다.

 

실행 결과

postman을 통해서 아래와 같이 /topic1 호출하였다.

POST http://localhost:8080/topic1

message body
{
    "id": "test1",
    "message": "topic1 message"
}

로그는 아래와 같이 생성된다.

[test-producer-1] c.e.s.kafka.producer.ProducerService     : send message: Topic1Dto[id=test1, message=topic1 message]
[test-producer-1] c.e.s.kafka.producer.ProducerService     : send topic: test1
[test-producer-1] c.e.s.kafka.producer.ProducerService     : send partition: 0
[test-producer-1] c.e.s.kafka.producer.ProducerService     : send offset: 6
[ntainer#0-0-C-1] c.e.s.kafka.consumer.ConsumerService     : received message: Topic1Dto[id=test1, message=topic1 message]
[ntainer#0-0-C-1] c.e.s.kafka.consumer.ConsumerService     : received topic: test1
[ntainer#0-0-C-1] c.e.s.kafka.consumer.ConsumerService     : received partition: 0
[ntainer#0-0-C-1] c.e.s.kafka.consumer.ConsumerService     : received offset: 6

 

postman을 통해서 아래와 같이 /topic2 호출하였다.

POST http://localhost:8080/topic2

message body
{
    "id": "test2",
    "message": "topic2 message"
}

로그는 아래와 같이 생성된다.

[test-producer-1] c.e.s.kafka.producer.ProducerService     : send message: Topic2Dto[id=test2, message=topic2 message]
[test-producer-1] c.e.s.kafka.producer.ProducerService     : send topic: test2
[test-producer-1] c.e.s.kafka.producer.ProducerService     : send partition: 1
[test-producer-1] c.e.s.kafka.producer.ProducerService     : send offset: 0
[test-producer-1] c.e.s.kafka.producer.ProducerService     : success to send message, topic: test2, partition: 1, offset: 0
[ntainer#1-1-C-1] c.e.s.kafka.consumer.ConsumerService     : received message: Topic2Dto[id=test2, message=topic2 message]
[ntainer#1-1-C-1] c.e.s.kafka.consumer.ConsumerService     : received topic: test2
[ntainer#1-1-C-1] c.e.s.kafka.consumer.ConsumerService     : received partition: 1
[ntainer#1-1-C-1] c.e.s.kafka.consumer.ConsumerService     : received offset: 0

 


참고링크

https://docs.spring.io/spring-kafka/docs/current/reference/html/#preface

 

Spring for Apache Kafka

When using Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot’s dependency management. If you wish to use a different version of kafka-clients or kafka-streams, and use the embedded ka

docs.spring.io

https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka

 

Messaging

The Spring Framework provides extensive support for integrating with messaging systems, from simplified use of the JMS API using JmsTemplate to a complete infrastructure to receive messages asynchronously. Spring AMQP provides a similar feature set for the

docs.spring.io

https://kafka.apache.org/documentation/#gettingStarted

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

'스프링부트' 카테고리의 다른 글

STOMP와 ACTIVEMQ를 이용한 메시지 broadcasting  (0) 2023.10.11
spring boot scheduler basic  (0) 2023.10.09
spring bean scope  (0) 2023.10.03
application events and listeners  (0) 2023.10.02
spring boot JMS activemq connection factory  (0) 2023.09.28

댓글

💲 추천 글