SSE (Server Sent Event)는 웹 어플리케이션에서 실시간 업데이트를 제공하는 데 사용되는 웹 기술 중 하나이다. Spring에서는 Spring MVC에서 사용되는 SseEmitter 클래스를 제공하여 손쉽게 SSE 스트림을 생성하고 클라이언트에게 이벤트를 푸시할 수 있도록 한다. 이를 통해 클라이언트가 연결을 유지하면서 서버로부터 데이터를 지속적으로 수신할 수 있도록 한다. 이번 포스팅에서는 Spring에서 SSE를 사용하는 방법에 대해서 기록한다.
SSE (Server Sent Event) 특징
- 서버 -> 클라이언트 단방향으로 이벤트를 스트리밍 할 수 있도록 대부분의 브라우저에서 채택한 사양
- 이벤트 데이터는 UTF-8로 인코딩된 텍스트 데이터 스트림
- 이벤트 데이터 형식은 줄 바꿈으로 구분된 일련의 키-값(id, retry, data, event)으로 구성
- 데이터 페이로드 형식을 어떤 식으로든 제한하지 않으며, 간단한 문자열이나 복잡한 JSON 또는 XML 구조를 사용가능
- 데이터 전송 MIME Type은 text/event-stream
- event stream 데이터는 항상 UTF-8로 인코딩 해야 한다.
자세한 내용은 SSE spec 정의 문서를 참고
SSE 서버 구현
SSE는 SseEmitter를 통해서 client subscription을 관리한다.
client와 연결을 맺고 이벤트를 broadcasting 하는 Controller
package com.example.stream.controller;
import com.example.stream.dto.EventPayload;
import com.example.stream.service.SseEmitterService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.UUID;
@RestController
@RequiredArgsConstructor
@Slf4j
public class SSEController {
private final SseEmitterService sseEmitterService;
//응답 mime type 은 반드시 text/event-stream 이여야 한다.
//클라이언트로 부터 SSE subscription 을 수락한다.
@GetMapping(path = "/v1/sse/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> subscribe() {
String sseId = UUID.randomUUID().toString();
SseEmitter emitter = sseEmitterService.subscribe(sseId);
return ResponseEntity.ok(emitter);
}
//eventPayload 를 SSE 로 연결된 모든 클라이언트에게 broadcasting 한다.
@PostMapping(path = "/v1/sse/broadcast")
public ResponseEntity<Void> broadcast(@RequestBody EventPayload eventPayload) {
sseEmitterService.broadcast(eventPayload);
return ResponseEntity.ok().build();
}
}
SseEmitter를 관리하는 Service
package com.example.stream.service;
import com.example.stream.dto.EventPayload;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
public class SseEmitterService {
// thread-safe 한 컬렉션 객체로 sse emitter 객체를 관리해야 한다.
private final Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();
private static final long TIMEOUT = 60 * 1000;
private static final long RECONNECTION_TIMEOUT = 1000L;
public SseEmitter subscribe(String id) {
SseEmitter emitter = createEmitter();
//연결 세션 timeout 이벤트 핸들러 등록
emitter.onTimeout(() -> {
log.info("server sent event timed out : id={}", id);
//onCompletion 핸들러 호출
emitter.complete();
});
//에러 핸들러 등록
emitter.onError(e -> {
log.info("server sent event error occurred : id={}, message={}", id, e.getMessage());
//onCompletion 핸들러 호출
emitter.complete();
});
//SSE complete 핸들러 등록
emitter.onCompletion(() -> {
if (emitterMap.remove(id) != null) {
log.info("server sent event removed in emitter cache: id={}", id);
}
log.info("disconnected by completed server sent event: id={}", id);
});
emitterMap.put(id, emitter);
//초기 연결시에 응답 데이터를 전송할 수도 있다.
try {
SseEmitter.SseEventBuilder event = SseEmitter.event()
//event 명 (event: event example)
.name("event example")
//event id (id: id-1) - 재연결시 클라이언트에서 `Last-Event-ID` 헤더에 마지막 event id 를 설정
.id(String.valueOf("id-1"))
//event data payload (data: SSE connected)
.data("SSE connected")
//SSE 연결이 끊어진 경우 재접속 하기까지 대기 시간 (retry: <RECONNECTION_TIMEOUT>)
.reconnectTime(RECONNECTION_TIMEOUT);
emitter.send(event);
} catch (IOException e) {
log.error("failure send media position data, id={}, {}", id, e.getMessage());
}
return emitter;
}
public void broadcast(EventPayload eventPayload) {
emitterMap.forEach((id, emitter) -> {
try {
emitter.send(SseEmitter.event()
.name("broadcast event")
.id("broadcast event 1")
.reconnectTime(RECONNECTION_TIMEOUT)
.data(eventPayload, MediaType.APPLICATION_JSON));
log.info("sended notification, id={}, payload={}", id, eventPayload);
} catch (IOException e) {
//SSE 세션이 이미 해제된 경우
log.error("fail to send emitter id={}, {}", id, e.getMessage());
}
});
}
private SseEmitter createEmitter() {
return new SseEmitter(TIMEOUT);
}
}
연결 세션에 대한 timeout 은 1분을 설정하였다. 1분 동안 아무런 데이터 전송이 없는 경우 SseEmitter의 onTimeout 핸들러가 호출되고 아래와 같은 로그가 출력된다.
c.e.stream.service.SseEmitterService : server sent event timed out : id=db76eb66-a8ed-4dc9-890f-4dcd71018821
c.e.stream.service.SseEmitterService : server sent event removed in emitter cache: id=db76eb66-a8ed-4dc9-890f-4dcd71018821
c.e.stream.service.SseEmitterService : disconnected by completed server sent event: id=db76eb66-a8ed-4dc9-890f-4dcd71018821
SSE Event payload 정의
package com.example.stream.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
public record EventPayload(@JsonProperty("memberId") String memberId,
@JsonProperty("memberName") String memberName,
@JsonProperty("memberAge") String memberAge) {
}
테스트
위 예제 코드의 서버 포트는 8081이다.
postman 테스트
1. GET http://localhost:80801/v1/sse/subscribe를 호출하여 연결을 수립한다.
2. 이벤트를 발생시켜 본다.
POST http://localhost:8081/v1/sse/broadcast 를 호출하고 아래와 같은 JSON body 를 payload 로 전송한다.
{
"memberId": "test-id",
"memberName": "test-name",
"memberAge": "test-age"
}
1번에서 연결된 세션으로 발생시킨 이벤트를 수신한다.
브라우저 테스트
두 개의 크롬 브라우저 창을 열어 각각 /v1/sse/subscribe 로 연결을 수립한다.
이벤트를 발생시키면 두개의 브라우저에서 이벤트 메시지를 수신한다.
서버 로그는 아래와 같이 출력된다.
c.e.stream.service.SseEmitterService : sended notification, id=bf166289-a20c-4434-93b6-e51843a1afb9, payload=EventPayload[memberId=test-id, memberName=test-name, memberAge=test-age]
c.e.stream.service.SseEmitterService : sended notification, id=dbe4b71a-1b12-48a7-b029-bfd17bd067f8, payload=EventPayload[memberId=test-id, memberName=test-name, memberAge=test-age]
마지막으로 포스팅에서 작성된 예제는 단일 서버 환경에서는 크게 문제가 없겠지만 멀티 서버 환경의 경우에는 SseEmitter를 통합 관리할 수 있는 방법을 고민해봐야 한다.
'스프링부트' 카테고리의 다른 글
Spring boot embedded server 의 default 포트 변경 (0) | 2023.09.16 |
---|---|
Spring에서 Jasypt를 이용하여 설정 정보 암호화 하기 (0) | 2023.09.09 |
Spring Boot GraalVM Native Image 빌드 하기 (0) | 2023.09.04 |
spring boot resource 파일 access (0) | 2023.08.27 |
spring-boot-starter-parent 와 spring-boot-dependencies (0) | 2023.08.15 |
댓글