설정 (Configure)
@Configuration
@RequiredArgsConstructor
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final CustomWebSocketHandler customWebSocketHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(customWebSocketHandler, "/chat")
.setAllowedOrigins("*");
}
}
먼저 WebSocket 을 사용하기 위해 WebSocketConfigure 를 작성해줍니다. 이때, handler 는 제가 직접 구현한 구현체를 넣기 위해서 다음과 같이 CustomWebSocketHandler를 정의하였습니다.
@Component
@RequiredArgsConstructor
public class CustomWebSocketHandler implements WebSocketHandler {
private final SessionRepo sessionRepo;
private final MessagePublisher messagePublisher;
private final ObjectMapper objectMapper;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessionRepo.save(session);
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
String payload = message.getPayload().toString();
ChatResponse chatResponse = objectMapper.readValue(payload, ChatResponse.class);
messagePublisher.publishMessage(chatResponse);
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
sessionRepo.remove(session.getId());
}
@Override
public boolean supportsPartialMessages() {
return false;
}
}
먼저 클라이언트와 서버 사이의 소켓 연결이 성공된 뒤 호출되는 함수인 afterConnectionEstablished를 보겠습니다.
이전 글에서 말씀드렸다시피 2가지 동작이 필요합니다.
- 클라이언트와 직접 연결된 서버의 세션 저장소에 세션을 저장.
- 모든 세션에 대한 관리를 위한 Redis에도 저장. (이 부분은, 하나의 채팅방에서 모든 사용자가 채팅을 하는 경우에는 큰 의미가 없다고 생각합니다.)
따라서 sessionRepo를 다음과 같이 정의하였습니다.
@Repository
@RequiredArgsConstructor
public class SessionRepo {
private final RedisTemplate<String, Object> redisTemplate;
private final ConcurrentHashMap<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();
private static final String PREFIX = "session:";
public void save(WebSocketSession session) {
sessionMap.put(session.getId(), session);
redisTemplate.opsForValue().set(PREFIX + session.getId(), session.getId());
}
public void remove(String sessionId) {
redisTemplate.delete(sessionId);
sessionMap.remove(sessionId);
}
public List<WebSocketSession> getAllSessions() {
return sessionMap.values().stream().collect(Collectors.toList());
}
public void convertAndSend(String message) {
redisTemplate.convertAndSend("chat-message", message);
}
}
save 함수 호출 시, 서버 자신의 WebSocketSession 저장소에는 <Session ID, WebSocketSession> 타입으로 값을 저장합니다. 이후, Redis에는 Session ID만 저장합니다.
만약, 어느 사용자가 채팅을 전송하면(서버에게 메세지를 보내면) 해당 사용자와 연결된 서버는 해당 메세지에 대한 내용을 Redis를 통해 Publish 할 것입니다. 그럼 해당 메세지를 받은 채팅 서버들은 각자 자신의 WebSocketSession 저장소에 저장된 WebSocketSession 들에게 메세지를 전송해주면 연결된 모든 사용자들이 채팅 메세지를 수신할 수 있게 될 것입니다! (전송한 사용자 포함)
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
String payload = message.getPayload().toString();
ChatResponse chatResponse = objectMapper.readValue(payload, ChatResponse.class);
messagePublisher.publishMessage(chatResponse);
}
해당 코드를 통해, 클라이언트가 전송한 메세지의 내용을 레디스에게 publish 합니다.
@Service
@RequiredArgsConstructor
public class MessageSubscriber {
private final SessionRepo sessionRepo;
private final ObjectMapper objectMapper;
public void onMessage(String message) {
try {
ChatResponse msg = objectMapper.readValue(message, ChatResponse.class);
List<WebSocketSession> sessions = sessionRepo.getAllSessions();
sessions.forEach(session -> {
try {
session.sendMessage(new TextMessage(msg.message()));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
이후, subscriber 객체를 통해 받은 메세지를 위의 설명과 같이 모든 세션들에게 메세지로 뿌려줍니다!
테스트 by PostMan

* 현재 목적은 단일 채팅방이기 때문에 receiverId 는 필요 없을 것 같습니다..!
* 또한, 서버에서 클라이언트에게 메세지를 전송해줄 때에도 sender 에 대한 정보를 포함해주어야 할 것 같네요...!!
이젠, 정말 지저분한 코드를 정리해야할 것 같습니다! 감사합니다!
댓글