단순한 기능만 만들려고 했는데, 자꾸 뭔가가 늘어난다.
user1이 server1에서 SSE 연결이 맺어졌다.
이 경우 Emitter 객체는 Server1의 인스턴스 내부에서 관리되고 있다.
그런데 다중화된 서버에서는 User1의 파일 업로드 요청이 Server2로 전달되는 경우가 있다.
지금 내 로직 구조에서는 이 경우 사용자에게 알림을 보낼 방법이 없다.
그래서 두 가지 방법을 생각해봤다.
1. SseEmitter 객체 그 자체, 혹은 내부의 Connection 정보를 DB나 Redis 등에 저장했다가, 메시지가 오면 해당 객체를 복원하거나 꺼내와서 쓸 수는 없는가?
2. Pub/Sub 구조를 이용한 메시지 발행과 수신으로 모든 서버에서 자신이 보관하고 있는 연결 중 해당 사용자가 있으면 응답 전송
1번은 아주 건방진 생각이었다.
SseEmitter 객체는 완벽한 직렬화나 역직렬화가 불가능하다.
어딘가에 저장하기 위해서는 바이트 스트림 형태로 변환하고, 원래의 객체로 복원되어야 한다.
하지만 SseEmitter는 클라이언트와 서버 간의 살아있는 TCP 연결 스트림, 비동기 처리를 위한 스레드 정보 등을 포함하는 런타임 객체다. OS 수준에서 관리되고 있는 네트워크 연결이나 스레드의 상태는 SSE 연결을 맺은 특정 서버에 종속되어 있으므로 SSE 객체의 핵심 내용을 저수준에서 직접 생성한다고 치더라도 다른 서버 인스턴스에서는 해당 연결을 복원할 수도 없고 제어할 수도 없다.
조금 더 자세하게 파고들면 Client에서 들어온 요청은 NIC에 도달하고, OS는 TCP 연결을 거쳐 소켓을 생성한다.
JVM 내부에서 동작하고 있는 톰캣은 JVM 네이티브 메서드를 통해 해당 소켓 연결을 수신한다.
톰캣의 작업 스레드가 요청을 처리하여 스프링 컨트롤러로 전달한다.
컨트롤러는 SseEmitter 객체를 생성하고, 요청 처리 스레드는 즉시 반환된다.
다른 이벤트가 발생하면 이벤트를 처리하는 스레드가 SseEmitter의 send()를 호출한다.
데이터는 SseEmitter에 연결된 Response, 출력 스트림을 통해 클라이언트에게 전송된다.
** Sse가 연결되어 있는 경우, 운영체제에서 확인하면 Sse 연결 갯수 만큼의 ESTABLISHED 상태의 연결을 확인할 수 있다.
** 이 부분을 확인하다가 메모리 누수가 발생하는 지점을 찾았다. 만약 먼저 연결된 SseEmitter가 있다면 명시적으로 삭제한다.
** 이렇게 삭제를 안하더라도 Map에서 제거되면 GC의 대상이 되고, 언젠가는 삭제가 되기는 한다...
public SseEmitter subscribe(Long userId) {
SseEmitter preEmitter = emitterRepository.findByUserId(userId);
if(preEmitter != null) {
emitterRepository.deleteById(userId);
preEmitter.complete();
}
SseEmitter emitter = createEmitter(userId);
sendToClient(emitter, userId, "connection", "SSE 연결이 성공했습니다. [userId=" + userId + "]");
return emitter;
}
2. Redis를 이용한 Pub/Sub 구조 생성
구조는 다음과 같다. Server2에서 사용자 알림이 필요한 메시지를 Redis에 특정 채널에 발행하고, Redis의 채널을 구독하는 모든 구독자들이 해당 메시지를 받아서 처리한다.
1) Redis 라이브러리 추가
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
2) 설정 추가
spring:
data:
redis:
host: localhost
port: 6379
3) Spring에서 Redis 설정
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
template.afterPropertiesSet();
return template;
}
@Bean
public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber) {
return new MessageListenerAdapter(subscriber, "onMessage");
}
@Bean
public RedisMessageListenerContainer redisMessageListener(
RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// "notification_channel" 이라는 채널로부터 메시지를 수신
container.addMessageListener(listenerAdapter, new ChannelTopic("notification_channel"));
return container;
}
}
4) NotificationPayload 생성
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class NotificationPayload {
private Long userId;
private String message;
private String eventName;
}
5) NotificationService 수정
이제부터는 발행과 구독을 이용한다.
발행하는 곳에서 사용할 메시지를 발행하는 send 메서드를 추가한다.
구독하는 곳에서 사용할 알림을 보내는 로직인 sendNotification도 추가한다.
private final RedisTemplate<String, Object> redisTemplate; // Redis 연동
private static final String NOTIFICATION_CHANNEL = "notification_channel";
public void send(Long userId, String message, String eventName) {
redisTemplate.convertAndSend(NOTIFICATION_CHANNEL, new NotificationPayload(userId, message, eventName));
}
public void sendNotification(Long userId, String message, String eventName) {
SseEmitter emitter = emitterRepository.findByUserId(userId);
if (emitter != null) {
sendToClient(emitter, userId, eventName, message);
} else {
log.info("사용자 {}가 연결되어 있지 않습니다.", userId);
}
}
5) 이벤트 발행하는 부분
log.info("수신된 비디오 업로드 이벤트: {}", videoId);
VideoDto dto = videoService.getVideo(videoId);
log.info("파일 조회: {}", dto);
dto.setStatus(UploadStatus.PROCESSING_AUDIO);
dto = videoService.updateVideo(dto);
notificationService.send(dto.getUploader().getId(), dto.getStatus().name(), redisEventName);
6) 이벤트를 구독하는 부분
@Service
@RequiredArgsConstructor
@Slf4j
public class RedisSubscriber implements MessageListener {
private final ObjectMapper objectMapper;
private final NotificationService notificationService;
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String publishedMessage = new String(message.getBody());
NotificationPayload payload = objectMapper.readValue(publishedMessage, NotificationPayload.class);
log.info("메시지를 다음의 채널에서 수신합니다. : {}", publishedMessage);
notificationService.sendNotification(payload.getUserId(), payload.getMessage(), payload.getEventName());
} catch (Exception e) {
log.error("레디스에서 메시지를 받는 도중 에러가 발생했습니다. : {}", e.getMessage());
}
}
}
일단 잘 적용되었다.
Redis를 이용하면 실시간 수준으로 알림을 제공할 수는 있지만, 메시지 유실에 대한 부분을 책임져주진 않는다.
일단 여기서는 Spring-boot-starter-data-redis를 추가하며 기본으로 의존성에 포함되는 Lettuce를 사용했다.
이 외에도 Jedis, Redisson과 같은 유명한 Redis와 SpringBoot 연동 라이브러리들이 존재한다.
Redis는 요즘 듣고 있는 강의를 제대로 다 듣고 나서... 글을 올려야겠다!
'공부' 카테고리의 다른 글
[REST] API 요청, 응답 (2) | 2025.06.12 |
---|---|
[BE] SSE 재연결과 관련된 시간 (0) | 2025.06.12 |
[BE] 알림 기능 리팩토링 - 단일 책임 원칙 (1) | 2025.06.10 |
[React] 참조 동일성 (1) | 2025.06.08 |
[모니터링] APM & Distributed Tracing (2) | 2025.06.07 |