SQS Queue로 전송된 메시지를 어떤 이유로 인해 컨슈머에서 수신 처리를 하지 못하면 DLQ(Dead Letter Queue)로 Re-Drive를 하게 설정할 수 있다. 그러니까 단순히 말하면 처리에 실패한 메시지를 다른 Queue로 보내서 개발자나 시스템 운영자가 개별 처리할 수 있게끔 한 것인데 보통은 수기로 처리하겠지만 시스템을 이용하여 메시지를 다시 소비할 수 있는 방법도 고안하는 것이 맞다고 생각해서 관련된 자료를 찾아보고 궁리를 해보았다.
열심히 구글링 해보았지만 대부분 메시지가 소비가 잘 안될 때 DLQ를 설정해서 ReDrive하는 것에 대한 설명이 주류이고 이것을 우리는 어떻게 시스템적으로 처리한다, 이런 자료는 거의 나오지를 않았다. 사실 개발자나 운영자가 직접 AWS 콘솔에서 확인하고 수작업(?)으로 조치하는 것이 일반적이고 안전한 방법일 것이다.
여러가지 이유가 있겠지만 보통 DLQ에 메시지가 쌓이게 되는 원인은 보통 2가지다.
- 컨슈머 소스코드의 문제로 메시지 처리 도중 Exception이 발생하여 정상적인 처리에 실패했을 때
- SQL의 문제가 있을 수 있다. (필드명을 잘못 써서 없는 컬럼을 참조하려 CRUD에러가 났거나..)
- 메시지 형식이 JSON이라면 매퍼를 이용해서 DTO나 Entity 객체로 컨버팅할텐데 Data Type 등의 차이로 파싱할 때 에러가 날 수 있다.
- → 이 경우는 컨슈머를 수정하여 배포한 후 DLQ에 들어있는 메시지를 꺼내 원래 적재됐었던 본래의 Queue로 보내고 난 뒤 DLQ에서 삭제
- 메시지 내용의 문제로 컨슈머에서 처리 불가능할 때
- 메시지를 받아서 RDBMS 같은 것에 적재할 경우, primary key에 해당되는 내용이 미비되어 INSERT시 에러가 발생할 수 있다. 보통 Producer 쪽을 수정해줘야 한다.
- → 내용 확인하고 Producer 수정한 뒤 재전송하고 나서 DLQ에 적재된 메시지 삭제 (사실 이게 가장 안전하고 주로 쓰게 되는 방법)
- → DLQ에 적재된 메시지를 변조하고 나서 Queue로 보내고 난 뒤 DLQ에서 삭제
그런데 이슈가 있다.
SQS에는 메시지마다 할당된 고유의 ID가 있는데… 이 ID를 가지고 특정 메시지만을 선별해서 다른 Queue로 보내거나 내용을 변조해서 재전송하거나 할 수 있을까? 하면
그것이 불가능하다.
No. It is not possible to retrieve a specific message from an Amazon SQS queue. You can call ReceiveMessage() to get 1 to 10 messages, but you cannot choose which messages to receive.
https://stackoverflow.com/questions/61785839/get-particular-message-from-amazon-sqs
SQS에서 특정 메시지만 수신받는 것은 불가능하다. AWS에서 제공하고 있는 SDK의 Amazon SQS Client에서는 한번에 수신받을 수 있는 메시지 수를 설정할 수 있는데 최대치가 10이다..
10이 넘는 값을 셋팅하면 어떻게 될까? 에러 뱉어낸다..
고로 최대 10개의 수신된 메시지 중 메시지 ID를 가지고 Filter를 걸어서 해당 메시지를 가져와서 처리하는 것만 가능하다.
그래서 2개의 처리 로직을 고안해보았다.
- DLQ의 메시지를 일괄 다른 Queue로 보내고 삭제
- 메시지ID를 가지고 특정 메시지만을 변조한 뒤 다른 Queue로 보냄
1번은 메시지 내용의 문제는 없고 컨슈머의 문제를 처리하고 나서 조치할 때 쓸 수 있다. 2번은 메시지 내용에 문제가 있고 프로듀서의 문제가 빨리 해결되지 않고 있는데 처리는 시급할 때 몸 사려가며(…) 써볼 수 있다. 만약을 위해 만들어 놓고 가급적 쓰지 않는 것이 좋다.
DLQ의 메시지를 일괄 다른 Queue로 보내고 삭제
프로세스의 흐름은 DLQ의 메시지를 수신 → 다른 Queue로 전송 → 전송이 완료되면 DLQ 메시지를 삭제 이 작업을 DLQ에 적재된 메시지가 없어질 때까지 반복하는 것이다.
SQS Client의 MaxNumberOfMessages의 최대값이 10이니 10개 메시지 단위씩 이 작업이 처리된다. Service 클래스를 하나 만들고 로직을 작성한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.DeleteMessageRequest; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import io.awspring.cloud.messaging.core.QueueMessagingTemplate; import org.apache.commons.lang.StringUtils; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import java.util.List; @Service public class DlqProcessingService { public static final String DLQ = "DLQ.fifo"; // DLQ 대기열 이름 private final AmazonSQS amazonSQS; private final QueueMessagingTemplate messagingTemplate; public ListenerServiceImpl(AmazonSQS amazonSQS, QueueMessagingTemplate messagingTemplate) { this.amazonSQS = amazonSQS; this.messagingTemplate = messagingTemplate; } public void transferDlQMessages(String targetQueueName) { // DLQ 메시지를 보낼 Queue 이름을 매개변수로 받음 boolean isContinue = true; while (isContinue) { // isContinue가 true이면 계속 반복됨 final ReceiveMessageRequest request = new ReceiveMessageRequest(DLQ).withMaxNumberOfMessages(10); final List<Message> messages = amazonSQS.receiveMessage(request).getMessages(); // DLQ에서 메시지 수신 (최대 10개) if (messages.size() == 0) { // 수신 메시지가 0개이면 순환문을 빠져나온다. (isContinue = false) isContinue = false; return; } messages.forEach(message -> { messagingTemplate.send(targetQueueName, MessageBuilder.withPayload(message.getBody()).build()); // 다른 Queue로 메시지 내용을 그대로 전송한다. amazonSQS.deleteMessage(new DeleteMessageRequest(DLQ, message.getReceiptHandle())); // 전송하고 나서 DLQ 메시지를 삭제한다. }); } } } | cs |
이렇게 하면 DLQ에 적재된 메시지가 순환문을 통해 모두 지정된 Queue로 보내진다. Controller를 만들어서 외부에서 호출해서 쓸 수 있게 해도 되고, Exception 처리나 처리 후 결과값을 리턴하는 등의 처리를 추가하면 실무에도 써먹을 수 있을 것 같다.
메시지ID를 가지고 특정 메시지만을 변조한 뒤 다른 Queue로 보냄
메시지 내용 중 일부 잘못된 형식이 동일한 패턴으로 다수 발송되어서 소비 실패로 DLQ로 보내진 것이라면, String의 replaceAll로 잘못 들어간 키워드를 올바른 키워드로 치환시켜 재전송하면 될 것이다.
그런데 가급적이면 쓰지 않고 차라리 Producer를 수정한 뒤 재전송하거나 하는게 나을 것 같다. SQS Client에서는 10개 메시지 수신이 최대치이기 때문에 그 10개 메시지 안에 해당하는 메시지 ID가 없다면 처리가 안되기에 완벽한 구현이 되지 않기도 하고… (솔직히 비효율적이다)
그래도 일단 코딩 해보면
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | private String getTargetMessage(String messageId) { // messageId를 매개변수로 받아 Id가 일치하는 메시지를 리턴한다. final ReceiveMessageRequest request = new ReceiveMessageRequest(DLQ).withMaxNumberOfMessages(10); final List<Message> messages = amazonSQS.receiveMessage(request).getMessages(); // DLQ에서 메시지 수신 (최대 10개) return messages.stream() .filter(message -> messageId.equals(message.getMessageId())) .findAny() .orElse(StringUtils.EMPTY); // messageId가 일치하는 메시지가 없으면 빈 String ""을 반환한다. } public void fixAndTransferDlqMessage(String targetQueueName, String messageId, String regex, String replacement) { final String targetMessage = getTargetMessage(messageId); // Id가 일치하는 메시지를 가져옴 if (StringUtils.isEmpty(targetMessage)) { return; } final String fixedMessage = targetMessage.replaceAll(regex, replacement); // replaceAll 로 메시지 내용 중 특정 키워드를 다른 키워드로 치환 messagingTemplate.send(targetQueueName, MessageBuilder.withPayload(fixedMessage).build()); // 변조한 메시지를 다른 Queue로 전송 } | cs |
어지간하면 이걸 쓰는 일이 없게 사전에 준비 잘하자…

.png)