스프링부트

Spring SSE (Server Sent Event) 사용 방법

알쓸개잡 2023. 9. 9.

SSE (Server Sent Event)는 웹 어플리케이션에서 실시간 업데이트를 제공하는 데 사용되는 웹 기술 중 하나이다. Spring에서는 Spring MVC에서 사용되는 SseEmitter 클래스를 제공하여 손쉽게 SSE 스트림을 생성하고 클라이언트에게 이벤트를 푸시할 수 있도록 한다. 이를 통해 클라이언트가 연결을 유지하면서 서버로부터 데이터를 지속적으로 수신할 수 있도록 한다. 이번 포스팅에서는 Spring에서 SSE를 사용하는 방법에 대해서 기록한다.

 

SSE (Server Sent Event) 특징

  • 서버 -> 클라이언트 단방향으로 이벤트를 스트리밍 할 수 있도록 대부분의 브라우저에서 채택한 사양
  • 이벤트 데이터는 UTF-8로 인코딩된 텍스트 데이터 스트림
  • 이벤트 데이터 형식은 줄 바꿈으로 구분된 일련의 키-값(id, retry, data, event)으로 구성
  • 데이터 페이로드 형식을 어떤 식으로든 제한하지 않으며, 간단한 문자열이나 복잡한 JSON 또는 XML 구조를 사용가능
  • 데이터 전송 MIME Type은 text/event-stream
    • event stream 데이터는 항상 UTF-8로 인코딩 해야 한다.

자세한 내용은 SSE spec 정의 문서를 참고

 

HTML Standard

This section is non-normative. To enable servers to push data to web pages over HTTP or using dedicated server-push protocols, this specification introduces the EventSource interface. Using this API consists of creating an EventSource object and registerin

html.spec.whatwg.org

 

SSE 서버 구현

SSE는 SseEmitter를 통해서 client subscription을 관리한다.

client와 연결을 맺고 이벤트를 broadcasting 하는 Controller

package com.example.stream.controller;

import com.example.stream.dto.EventPayload;
import com.example.stream.service.SseEmitterService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.UUID;

@RestController
@RequiredArgsConstructor
@Slf4j
public class SSEController {
	private final SseEmitterService sseEmitterService;

	//응답 mime type 은 반드시 text/event-stream 이여야 한다.
	//클라이언트로 부터 SSE subscription 을 수락한다.
	@GetMapping(path = "/v1/sse/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public ResponseEntity<SseEmitter> subscribe() {
		String sseId = UUID.randomUUID().toString();
		SseEmitter emitter = sseEmitterService.subscribe(sseId);
		return ResponseEntity.ok(emitter);
	}

	//eventPayload 를 SSE 로 연결된 모든 클라이언트에게 broadcasting 한다.
	@PostMapping(path = "/v1/sse/broadcast")
	public ResponseEntity<Void> broadcast(@RequestBody EventPayload eventPayload) {
		sseEmitterService.broadcast(eventPayload);
		return ResponseEntity.ok().build();
	}
}

 

SseEmitter를 관리하는 Service

package com.example.stream.service;

import com.example.stream.dto.EventPayload;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
@Slf4j
public class SseEmitterService {
	// thread-safe 한 컬렉션 객체로 sse emitter 객체를 관리해야 한다.
	private final Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();
	private static final long TIMEOUT = 60 * 1000;
	private static final long RECONNECTION_TIMEOUT = 1000L;

	public SseEmitter subscribe(String id) {
		SseEmitter emitter = createEmitter();
		//연결 세션 timeout 이벤트 핸들러 등록
		emitter.onTimeout(() -> {
			log.info("server sent event timed out : id={}", id);
            //onCompletion 핸들러 호출
			emitter.complete();
		});

		//에러 핸들러 등록
		emitter.onError(e -> {
			log.info("server sent event error occurred : id={}, message={}", id, e.getMessage());
            //onCompletion 핸들러 호출
			emitter.complete();
		});

		//SSE complete 핸들러 등록
		emitter.onCompletion(() -> {
			if (emitterMap.remove(id) != null) {
				log.info("server sent event removed in emitter cache: id={}", id);
			}

			log.info("disconnected by completed server sent event: id={}", id);
		});

		emitterMap.put(id, emitter);

		//초기 연결시에 응답 데이터를 전송할 수도 있다.
		try {
			SseEmitter.SseEventBuilder event = SseEmitter.event()
				//event 명 (event: event example)
				.name("event example")
				//event id (id: id-1) - 재연결시 클라이언트에서 `Last-Event-ID` 헤더에 마지막 event id 를 설정
				.id(String.valueOf("id-1"))
				//event data payload (data: SSE connected)
				.data("SSE connected")
				//SSE 연결이 끊어진 경우 재접속 하기까지 대기 시간 (retry: <RECONNECTION_TIMEOUT>)
				.reconnectTime(RECONNECTION_TIMEOUT);
			emitter.send(event);
		} catch (IOException e) {
			log.error("failure send media position data, id={}, {}", id, e.getMessage());
		}
		return emitter;
	}

	public void broadcast(EventPayload eventPayload) {
		emitterMap.forEach((id, emitter) -> {
			try {
				emitter.send(SseEmitter.event()
					.name("broadcast event")
					.id("broadcast event 1")
					.reconnectTime(RECONNECTION_TIMEOUT)
					.data(eventPayload, MediaType.APPLICATION_JSON));
				log.info("sended notification, id={}, payload={}", id, eventPayload);
			} catch (IOException e) {
				//SSE 세션이 이미 해제된 경우
				log.error("fail to send emitter id={}, {}", id, e.getMessage());
			}
		});
	}

	private SseEmitter createEmitter() {
		return new SseEmitter(TIMEOUT);
	}
}

연결 세션에 대한 timeout 은 1분을 설정하였다. 1분 동안 아무런 데이터 전송이 없는 경우 SseEmitter의 onTimeout 핸들러가 호출되고 아래와 같은 로그가 출력된다.

c.e.stream.service.SseEmitterService     : server sent event timed out : id=db76eb66-a8ed-4dc9-890f-4dcd71018821
c.e.stream.service.SseEmitterService     : server sent event removed in emitter cache: id=db76eb66-a8ed-4dc9-890f-4dcd71018821
c.e.stream.service.SseEmitterService     : disconnected by completed server sent event: id=db76eb66-a8ed-4dc9-890f-4dcd71018821

SSE Event payload 정의

package com.example.stream.dto;

import com.fasterxml.jackson.annotation.JsonProperty;

public record EventPayload(@JsonProperty("memberId") String memberId,
						   @JsonProperty("memberName") String memberName,
						   @JsonProperty("memberAge") String memberAge) {
}

 

테스트

위 예제 코드의 서버 포트는 8081이다.

postman 테스트

1. GET http://localhost:80801/v1/sse/subscribe를 호출하여 연결을 수립한다.

응답헤더

2. 이벤트를 발생시켜 본다.

POST http://localhost:8081/v1/sse/broadcast 를 호출하고 아래와 같은 JSON body 를 payload 로 전송한다.

 

{
    "memberId": "test-id",
    "memberName": "test-name",
    "memberAge": "test-age"
}

1번에서 연결된 세션으로 발생시킨 이벤트를 수신한다.

수신된 event 데이터 payload

 

브라우저 테스트

두 개의 크롬 브라우저 창을 열어 각각 /v1/sse/subscribe 로 연결을 수립한다.

이벤트를 발생시키면 두개의 브라우저에서 이벤트 메시지를 수신한다.

서버 로그는 아래와 같이 출력된다.

c.e.stream.service.SseEmitterService     : sended notification, id=bf166289-a20c-4434-93b6-e51843a1afb9, payload=EventPayload[memberId=test-id, memberName=test-name, memberAge=test-age]
c.e.stream.service.SseEmitterService     : sended notification, id=dbe4b71a-1b12-48a7-b029-bfd17bd067f8, payload=EventPayload[memberId=test-id, memberName=test-name, memberAge=test-age]

 

마지막으로 포스팅에서 작성된 예제는 단일 서버 환경에서는 크게 문제가 없겠지만 멀티 서버 환경의 경우에는 SseEmitter를 통합 관리할 수 있는 방법을 고민해봐야 한다.

댓글

💲 추천 글