2022년 9월 4일 일요일

SpringBoot 버전 2.7 이상에서 MariaDB(RDS)의 Aurora 옵션 미지원

AWS RDS에서 MariaDB 기반의 Aurora 클러스터를 사용한다면, FailOver를 위해 보통 DB Connection URL에 aurora 옵션을 넣어서 사용한다.

이 URL의 의미는 아래와 같다.

그러나,

스프링부트 최신 버전 (2.7.x 이상)에서는 이 옵션을 인식하지 못해서 Application이 실행되지 않는다.

이유인즉슨 aurora 옵션은 Maria DB Driver 1.2까지만 지원하는데, 스프링부트 2.7 부터는 Maria DB Driver 3.0 버전을 사용하기 때문이다. FailOver 기능 유지를 위해서 aurora 옵션을 sequential, replication, loadbalance 옵션 등으로 변경하여 사용할 수 있다.

2022년 7월 8일 금요일

SQS DLQ의 메시지를 처리

SQS Queue로 전송된 메시지를 어떤 이유로 인해 컨슈머에서 수신 처리를 하지 못하면 DLQ(Dead Letter Queue)로 Re-Drive를 하게 설정할 수 있다. 그러니까 단순히 말하면 처리에 실패한 메시지를 다른 Queue로 보내서 개발자나 시스템 운영자가 개별 처리할 수 있게끔 한 것인데 보통은 수기로 처리하겠지만 시스템을 이용하여 메시지를 다시 소비할 수 있는 방법도 고안하는 것이 맞다고 생각해서 관련된 자료를 찾아보고 궁리를 해보았다.

열심히 구글링 해보았지만 대부분 메시지가 소비가 잘 안될 때 DLQ를 설정해서 ReDrive하는 것에 대한 설명이 주류이고 이것을 우리는 어떻게 시스템적으로 처리한다, 이런 자료는 거의 나오지를 않았다. 사실 개발자나 운영자가 직접 AWS 콘솔에서 확인하고 수작업(?)으로 조치하는 것이 일반적이고 안전한 방법일 것이다.

여러가지 이유가 있겠지만 보통 DLQ에 메시지가 쌓이게 되는 원인은 보통 2가지다.

  • 컨슈머 소스코드의 문제로 메시지 처리 도중 Exception이 발생하여 정상적인 처리에 실패했을 때
    • SQL의 문제가 있을 수 있다. (필드명을 잘못 써서 없는 컬럼을 참조하려 CRUD에러가 났거나..)
    • 메시지 형식이 JSON이라면 매퍼를 이용해서 DTO나 Entity 객체로 컨버팅할텐데 Data Type 등의 차이로 파싱할 때 에러가 날 수 있다.
      • → 이 경우는 컨슈머를 수정하여 배포한 후 DLQ에 들어있는 메시지를 꺼내 원래 적재됐었던 본래의 Queue로 보내고 난 뒤 DLQ에서 삭제
  • 메시지 내용의 문제로 컨슈머에서 처리 불가능할 때
    • 메시지를 받아서 RDBMS 같은 것에 적재할 경우, primary key에 해당되는 내용이 미비되어 INSERT시 에러가 발생할 수 있다. 보통 Producer 쪽을 수정해줘야 한다.
      • → 내용 확인하고 Producer 수정한 뒤 재전송하고 나서 DLQ에 적재된 메시지 삭제 (사실 이게 가장 안전하고 주로 쓰게 되는 방법)
      • → DLQ에 적재된 메시지를 변조하고 나서 Queue로 보내고 난 뒤 DLQ에서 삭제

그런데 이슈가 있다.

SQS에는 메시지마다 할당된 고유의 ID가 있는데… 이 ID를 가지고 특정 메시지만을 선별해서 다른 Queue로 보내거나 내용을 변조해서 재전송하거나 할 수 있을까? 하면

그것이 불가능하다.

No. It is not possible to retrieve a specific message from an Amazon SQS queue. You can call ReceiveMessage() to get 1 to 10 messages, but you cannot choose which messages to receive. 

https://stackoverflow.com/questions/61785839/get-particular-message-from-amazon-sqs

SQS에서 특정 메시지만 수신받는 것은 불가능하다. AWS에서 제공하고 있는 SDK의 Amazon SQS Client에서는 한번에 수신받을 수 있는 메시지 수를 설정할 수 있는데 최대치가 10이다..

10이 넘는 값을 셋팅하면 어떻게 될까? 에러 뱉어낸다..

고로 최대 10개의 수신된 메시지 중 메시지 ID를 가지고 Filter를 걸어서 해당 메시지를 가져와서 처리하는 것만 가능하다.

그래서 2개의 처리 로직을 고안해보았다.

  1. DLQ의 메시지를 일괄 다른 Queue로 보내고 삭제
  2. 메시지ID를 가지고 특정 메시지만을 변조한 뒤 다른 Queue로 보냄

1번은 메시지 내용의 문제는 없고 컨슈머의 문제를 처리하고 나서 조치할 때 쓸 수 있다. 2번은 메시지 내용에 문제가 있고 프로듀서의 문제가 빨리 해결되지 않고 있는데 처리는 시급할 때 몸 사려가며(…) 써볼 수 있다. 만약을 위해 만들어 놓고 가급적 쓰지 않는 것이 좋다.

DLQ의 메시지를 일괄 다른 Queue로 보내고 삭제

프로세스의 흐름은 DLQ의 메시지를 수신 → 다른 Queue로 전송 → 전송이 완료되면 DLQ 메시지를 삭제 이 작업을 DLQ에 적재된 메시지가 없어질 때까지 반복하는 것이다.

SQS Client의 MaxNumberOfMessages의 최대값이 10이니 10개 메시지 단위씩 이 작업이 처리된다. Service 클래스를 하나 만들고 로직을 작성한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import io.awspring.cloud.messaging.core.QueueMessagingTemplate;
import org.apache.commons.lang.StringUtils;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
 
import java.util.List;
 
@Service
public class DlqProcessingService {
    public static final String DLQ = "DLQ.fifo";    // DLQ 대기열 이름
 
    private final AmazonSQS amazonSQS;
    private final QueueMessagingTemplate messagingTemplate;
 
    public ListenerServiceImpl(AmazonSQS amazonSQS, QueueMessagingTemplate messagingTemplate) {
        this.amazonSQS = amazonSQS;
        this.messagingTemplate = messagingTemplate;
    }
 
    public void transferDlQMessages(String targetQueueName) {   
        // DLQ 메시지를 보낼 Queue 이름을 매개변수로 받음
        boolean isContinue = true;
        while (isContinue) {    // isContinue가 true이면 계속 반복됨
            final ReceiveMessageRequest request = new ReceiveMessageRequest(DLQ).withMaxNumberOfMessages(10);
            final List<Message> messages = amazonSQS.receiveMessage(request).getMessages();     // DLQ에서 메시지 수신 (최대 10개)
            if (messages.size() == 0) {
                // 수신 메시지가 0개이면 순환문을 빠져나온다. (isContinue = false)
                isContinue = false;
                return;
            }
            messages.forEach(message -> {
                messagingTemplate.send(targetQueueName, MessageBuilder.withPayload(message.getBody()).build()); 
                // 다른 Queue로 메시지 내용을 그대로 전송한다.
                amazonSQS.deleteMessage(new DeleteMessageRequest(DLQ, message.getReceiptHandle()));             
                // 전송하고 나서 DLQ 메시지를 삭제한다.
            });
        }
    }
}
cs

이렇게 하면 DLQ에 적재된 메시지가 순환문을 통해 모두 지정된 Queue로 보내진다. Controller를 만들어서 외부에서 호출해서 쓸 수 있게 해도 되고, Exception 처리나 처리 후 결과값을 리턴하는 등의 처리를 추가하면 실무에도 써먹을 수 있을 것 같다.

메시지ID를 가지고 특정 메시지만을 변조한 뒤 다른 Queue로 보냄

메시지 내용 중 일부 잘못된 형식이 동일한 패턴으로 다수 발송되어서 소비 실패로 DLQ로 보내진 것이라면, String의 replaceAll로 잘못 들어간 키워드를 올바른 키워드로 치환시켜 재전송하면 될 것이다.

그런데 가급적이면 쓰지 않고 차라리 Producer를 수정한 뒤 재전송하거나 하는게 나을 것 같다. SQS Client에서는 10개 메시지 수신이 최대치이기 때문에 그 10개 메시지 안에 해당하는 메시지 ID가 없다면 처리가 안되기에 완벽한 구현이 되지 않기도 하고… (솔직히 비효율적이다)

그래도 일단 코딩 해보면

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    private String getTargetMessage(String messageId) { // messageId를 매개변수로 받아 Id가 일치하는 메시지를 리턴한다.
        final ReceiveMessageRequest request = new ReceiveMessageRequest(DLQ).withMaxNumberOfMessages(10);
        final List<Message> messages = amazonSQS.receiveMessage(request).getMessages();     // DLQ에서 메시지 수신 (최대 10개)
        return messages.stream()
                    .filter(message -> messageId.equals(message.getMessageId()))
                    .findAny()
                    .orElse(StringUtils.EMPTY); // messageId가 일치하는 메시지가 없으면 빈 String ""을 반환한다.
    }
 
    public void fixAndTransferDlqMessage(String targetQueueName, String messageId, String regex, String replacement) {
        final String targetMessage = getTargetMessage(messageId);   // Id가 일치하는 메시지를 가져옴
        if (StringUtils.isEmpty(targetMessage)) {
            return;
        }
        final String fixedMessage = targetMessage.replaceAll(regex, replacement);   // replaceAll 로 메시지 내용 중 특정 키워드를 다른 키워드로 치환
        messagingTemplate.send(targetQueueName, MessageBuilder.withPayload(fixedMessage).build());  // 변조한 메시지를 다른 Queue로 전송
    }
cs

어지간하면 이걸 쓰는 일이 없게 사전에 준비 잘하자…

2022년 5월 28일 토요일

Queue 메시지를 받아 처리하는 Consumer 개발

Producer를 개발해서 메시지를 Queue에 보낼 수 있게 해놓았으면 이게 그 메시지를 받아 DB에 적재하든지, 아니면 다른 방법으로 활용하기 위한 Consumer를 개발해야 할 것이다.

이것도 수신 기능을 구현하는 것 자체는 해보면 간단하다.. SpringBoot 프로젝트를 하나 새로 생성한다.

Kafka Consumer

1.build.gradle 에 spring-kafka 라이브러리 의존성을 추가한다.

1
2
3
4
dependencies {
    // Kafka
    implementation 'org.springframework.kafka:spring-kafka:2.8.2'
}
cs

2.application.yml에 메시지를 수신할 Kafka Broker 서버 정보와, 컨슈머 그룹ID를 설정한다.

1
2
3
4
5
spring:
  kafka:
    consumer:
      group-id: test_consumer_group_01
      bootstrap-servers: kafka.01.server.com:9092,kafka.02.server.com:9092,kafka.03.server.com:9092
cs

컨슈머 그룹ID는 임의의 값을 직접 설정해서 쓰면 된다. 같은 컨슈머 그룹 내에서는 메시지를 중복 수신하지 않는다. 다른 컨슈머 그룹끼리는 동일한 메시지를 중복 수신할 수 있다.

컨슈머 그룹은 Kafka Topic의 offset에 등록되어 메시지가 모든 컨슈머 그룹에서 수신이 완료되지 않았을 경우 서버에 Lag으로 쌓이게 된다.

3.Listener 역할을 할 클래스를 만들고 @KafkaListener 애노테이션으로 메시지 수신 후 로직을 담을 메서드를 만든다.

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
@Component
public class KafkaConsume {
     
    @KafkaListener(topics = "TEST-TOPIC", concurrency = "1", autoStartup = "false")
    public void listener(ConsumerRecord<StringString> payload) {
        System.out.println("Received Key : " + payload.key());
        System.out.println("Received Message : " + payload.value());
    }
}
cs

옵션

  • topics : 수신 대상이 되는 Kafka Topic의 이름을 넣는다.
  • concurrency : 이 옵션에 적시할 수대로 thread가 발생하여 돌아간다. 통상 partition의 숫자만큼 thread가 동작하는 것이 가장 좋다고 한다.
  • autoStartup : true 로 지정 시 애플리케이션이 start될 때 자동으로 컨슈밍이 시작되며, false 로 되어 있으면 수동으로 시작 메서드를 실행시켜줘야 한다.

4.Graceful Up/Down 만들기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;
 
@Service
public class KafkaListenerService {
 
    private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
 
    public KafkaListenerService(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
        this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
    }
 
    public void stopListener() {
        kafkaListenerEndpointRegistry.stop();
    }
 
    public void startListener() {
        for (MessageListenerContainer listenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            if (!listenerContainer.isRunning()) {
                listenerContainer.start();
            }
        }
    }
}
cs

적당히 서비스 클래스를 만들고 KafkaListenerEndpointRegistry 빈 의존성을 주입한다.

kafkaListenerEndpointRegistry 객체의 stop() 메서드를 실행하면 모든 컨슈밍 작동이 중단되며, 등록된 listenerContainer 객체들의 start() 메서드를 실행시켜 주면 컨슈밍이 재개된다.

SQS Consumer

1.build.gradle에 Spring Cloud AWS의 라이브러리 중 SQS에 관련된 요소의 의존성을 추가한다.

1
2
3
4
5
6
dependencies {
    // Sqs
    implementation 'io.awspring.cloud:spring-cloud-aws-autoconfigure:2.4.1'
    implementation 'io.awspring.cloud:spring-cloud-aws-messaging:2.4.1'
    implementation 'io.awspring.cloud:spring-cloud-starter-aws:2.4.1'
}
cs

2.application.yml에 AWS Credential 과 Region 관련 설정을 넣는다.

1
2
3
4
5
6
7
8
9
10
cloud:
  aws:
    credentials:
      accessKey: # access-key 사용시 적시
      secretKey: # secret-key 사용시 적시
      use-default-aws-credentials-chain: true
    region:
      static: ap-northeast-2
    stack:
      auto: false
cs

cloud.aws.stack.auto는 기본값 설정이 true인데 AWS CloudFormation이 셋팅되어 있지 않으면 에러를 발생시키므로 false로 설정한다.

위 설정에서 accessKey와 secretKey는 AWS가 아닌 다른 환경에서 구동할 때 필요하다.

AWS SDK에서는 credential 관련해서 6가지의 인증 옵션을 제공하는데, 아무런 설정을 하지 않으면 우선 순위에 의해서 인증 옵션이 순차적으로 적용된다.(Provider Chain)

Default Credential Provider Chain : https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html

별도의 설정을 하고 싶지 않으면 use-default-aws-credentials-chain의 값을 true로 지정해줘야 Provider Chain이 작동된다. (기본값이 false)

특정한 인증수단을 지정해주고 싶으면, SQS나 S3의 Client 빈을 별도로 생성해주고 인증수단을 적시해놓는다.(아래 예시)

1
2
3
4
5
6
7
8
@Bean
public AmazonSQS amazonSQS() {
    return AmazonSQSAsyncClientBuilder
            .standard()
            .withCredentials(new WebIdentityTokenCredentialsProvider())
            .withRegion(Regions.AP_NORTHEAST_2)
            .build();
}
cs

3.Listener 클래스를 만들고 @SqsListener 애노테이션으로 수신 후 로직을 담을 메서드를 만든다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
 
import java.util.Map;
 
import static org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy.ON_SUCCESS;
 
@Component
public class SqsConsume {
 
    @SqsListener(value = "TEST-QUEUE", deletionPolicy = ON_SUCCESS)
    public void listener(@Payload String payload, @Headers Map<StringString> headers) {
        log.info("Received Header : {}", headers);
        log.info("Received Payload : {}", payload);
    }
}
cs

매개변수 중 대개는 payload만 활용한다. (Message Body)

옵션

  • value : 수신 대상이 되는 SQS 대기열(queue)의 이름이나, URL을 넣는다.
  • deletionPolicy : 수신 후 대기열에 남아있는 메시지를 어떻게 할 것인지 정책.
    • ALWAYS : 메서드로 메시지가 수신되면 바로 삭제한다.
    • NEVER : 메시지를 삭제하지 않는다.
    • NO_REDRIVE : DLQ(Dead-letter queue) 가 설정되지 않으면 삭제한다.
    • ON_SUCCESS : Exception이 발생하지 않고 메서드가 정상 종료되면 삭제한다.
    • DEFAULT : 아무 것도 정의하지 않으면 DEFAULT로 구동되는데, NO_REDRIVE로 작동한다.

deletionPolicy를 NEVER로 하고 Acknowledgment 파라메터의 메서드로 메시지를 삭제하는 CASE

1
2
3
4
5
6
7
8
9
@SqsListener(value = "TEST-QUEUE", deletionPolicy = NEVER)
public void onMessage(@Payload String payload, Acknowledgment ack) {
    try {
        transferService.convertAndTransfer(payload);
        ack.acknowledge();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
cs

4.Graceful Up/Down 만들기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.stereotype.Service;
 
import javax.annotation.PostConstruct;
 
@Service
public class ConsumeSwitchService {
 
    private final SimpleMessageListenerContainer simpleMessageListenerContainer;
 
    public ConsumeSwitchService(SimpleMessageListenerContainer simpleMessageListenerContainer) {
        this.simpleMessageListenerContainer = simpleMessageListenerContainer;
    }
 
    public void start() {
        simpleMessageListenerContainer.start();
    }
 
    public void stop() {
        simpleMessageListenerContainer.stop();
    }
 
    public boolean isRunning() {
        return simpleMessageListenerContainer.isRunning();
    }
 
}
cs

적당히 서비스 클래스를 만들고 SimpleMessageListenerContainer 빈 의존성을 주입한다.

simpleMessageListenerContainer 객체의 start() 메서드를 실행하면 컨슈밍이 시작되고, stop() 메서드를 실행하면 컨슈밍이 중단된다.

API를 만들어 외부에서 호출하여 Start/Stop을 컨트롤할 수 있게 해줘도 좋을 것이다.

simpleMessageListenerContainer는 기본 설정이 autoStartup = true 이다. 빈 생성시나 서비스가 시작할 때 false로 별도 셋팅해주면 된다.

1
2
3
4
5
6
7
8
9
10
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSqs, QueueMessageHandler queueMessageHandler) {
    final SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory = new SimpleMessageListenerContainerFactory();
    simpleMessageListenerContainerFactory.setAmazonSqs(amazonSqs);
    SimpleMessageListenerContainer simpleMessageListenerContainer = simpleMessageListenerContainerFactory.createSimpleMessageListenerContainer();
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    simpleMessageListenerContainer.setAutoStartup(false);       // 자동 시작 여부
    simpleMessageListenerContainer.setMaxNumberOfMessages(10);  // Consume 1회시 수신할 수 있는 최대의 메시지 갯수 : 10이 Maximun 값
    return simpleMessageListenerContainer;
}
cs

2022년 5월 27일 금요일

Queue에 메시지 전송하는 Producer 개발

Kafka와 SQS에 메시지를 발송하는 Producer의 개발 방법이다.

솔직히 해보면 매우 간단하고, 단일 프로젝트에 같이 구현하는 것도 가능하다. SpringBoot로 기본적인 API 프로젝트를 만들고 Controller에서 호출할 메시지 송신 메서드를 구현해본다.

Kafka Producer

1.build.gradle 에 spring-kafka 라이브러리 의존성을 추가한다.

1
2
3
4
dependencies {
    // Kafka
    implementation 'org.springframework.kafka:spring-kafka:2.8.2'
}
cs

2.application.yml에 메시지를 송신할 Kafka Broker의 서버 정보를 설정한다.

1
2
3
4
spring:
  kafka:
    producer:
      bootstrap-servers: kafka.01.server.com:9092,kafka.02.server.com:9092,kafka.03.server.com:9092
cs

3.Service 클래스를 하나 만들고 KafkaTemplate 빈 의존성을 주입한다.

1
2
3
4
5
6
7
8
@Service
public class KafkaSendingService {
    private final KafkaTemplate kafkaTemplate;
 
    public KafkaSendingService(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
}
cs

KafkaTemplate 빈은 자동 생성되므로 Override 할 필요가 없다면 따로 생성하지 않아도 된다.

4.TOPIC명과 메시지 내용을 매개변수로 받아 Kafka Broker로 보내는 메서드를 아래와 같이 작성한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    public void sendMessage(String topicName, String message) {
        final ListenableFuture<SendResult<StringString>> future = kafkaTemplate.send(topicName, message);
        future.addCallback(new ListenableFutureCallback<SendResult<StringString>>() {
            @Override
            public void onFailure(Throwable ex) {
                ex.printStackTrace();
                throw new RuntimeException();
            }
 
            @Override
            public void onSuccess(SendResult<StringString> result) {
                log.info("Kafka sent message='{}'", message);
            }
        });
    }
cs

Controller나 다른 요소에서 위 sendMessage 메서드를 호출하면 메시지 전송이 된다.

SQS Producer

1.build.gradle에 Spring Cloud AWS의 라이브러리 중 SQS에 관련된 요소의 의존성을 추가한다.

1
2
3
4
5
6
dependencies {
    // Sqs
    implementation 'io.awspring.cloud:spring-cloud-aws-autoconfigure:2.4.1'
    implementation 'io.awspring.cloud:spring-cloud-aws-messaging:2.4.1'
    implementation 'io.awspring.cloud:spring-cloud-starter-aws:2.4.1'
}
cs

2.application.yml에 AWS Credential 과 Region 관련 설정을 넣는다.

1
2
3
4
5
6
7
8
9
10
cloud:
  aws:
    credentials:
      accessKey: # access-key 사용시 적시
      secretKey: # secret-key 사용시 적시
      use-default-aws-credentials-chain: true
    region:
      static: ap-northeast-2
    stack:
      auto: false
cs

cloud.aws.stack.auto는 기본값 설정이 true인데 AWS CloudFormation이 셋팅되어 있지 않으면 에러를 발생시키므로 false로 설정한다.

위 설정에서 accessKey와 secretKey는 AWS가 아닌 다른 환경에서 구동할 때 필요하다.

AWS SDK에서는 credential 관련해서 6가지의 인증 옵션을 제공하는데, 아무런 설정을 하지 않으면 우선 순위에 의해서 인증 옵션이 순차적으로 적용된다.(Provider Chain)

Default Credential Provider Chain : https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html

별도의 설정을 하고 싶지 않으면 use-default-aws-credentials-chain의 값을 true로 지정해줘야 Provider Chain이 작동된다. (기본값이 false)

특정한 인증수단을 지정해주고 싶으면, SQS나 S3의 Client 빈을 별도로 생성해주고 인증수단을 적시해놓는다.(아래 예시)

1
2
3
4
5
6
7
8
@Bean
public AmazonSQS amazonSQS() {
    return AmazonSQSAsyncClientBuilder
            .standard()
            .withCredentials(new WebIdentityTokenCredentialsProvider())
            .withRegion(Regions.AP_NORTHEAST_2)
            .build();
}
cs

3.QueueMessagingTemplate 빈을 생성한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import io.awspring.cloud.messaging.core.QueueMessagingTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class SqsConfig {
    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQS amazonSQS) {
        return new QueueMessagingTemplate((AmazonSQSAsync) amazonSQS);
    }
}
cs

4.Service 클래스를 하나 만들고 QueueMessagingTemplate 빈 의존성을 주입한다.

1
2
3
4
5
6
7
8
@Service
public class SqsSendingService {
    private final QueueMessagingTemplate queueMessagingTemplate;
 
    public SqsSendingService(QueueMessagingTemplate queueMessagingTemplate) {
        this.queueMessagingTemplate = queueMessagingTemplate;
    }
}
cs

5.Queue 이름과 메시지를 매개변수로 받아 SQS로 보내는 메서드를 아래와 같이 작성한다.

1
2
3
4
public void sendMessage(String queueName, String message) {
    final Message<String> newMessage = MessageBuilder.withPayload(message).build();
    queueMessagingTemplate.send(queueName, newMessage);
}
cs

여기서 Queue 이름만 넣어서 전송하면 메시지는 당연하게도(?) credential 설정으로 인증된 AWS Account의 SQS에서 같은 이름의 Queue를 찾아간다.

만약 Queue가 없거나 하면 Error가 발생하고, 다른 AWS Account에서 생성된 SQS Queue로 보내고자 한다면 Queue의 Full URL을 넣어서 전송해야 한다. 서울 리전이라면 SQS의 Full URL은 아래의 형식으로 되어 있을 것이다.

https://sqs.ap-northeast2.amazonaws.com/{aws account 번호}/{Queue 대기열 이름}

Queue의 엑세스 정책에도 특정한 타 AWS Account에서도 접근이 가능하게 설정이 되어 있어야 한다. Controller나 다른 요소에서 위 sendMessage 메서드를 호출하면 메시지 전송이 된다.

Kotlin, SpringBoot 3, GraalVM 환경에서 Native Image로 컴파일하여 애플리케이션 실행

Spring Boot 3부터, GraalVM Native Image를 공식 지원하여 애플리케이션의 시작 속도와 메모리 사용량을 크게 줄일 수 있다. Native Image란 기존의 JVM 기반 위에서 돌아가는 Java 애플리케이션과는 달리 JVM 없이...