2022년 5월 28일 토요일

Queue 메시지를 받아 처리하는 Consumer 개발

Producer를 개발해서 메시지를 Queue에 보낼 수 있게 해놓았으면 이게 그 메시지를 받아 DB에 적재하든지, 아니면 다른 방법으로 활용하기 위한 Consumer를 개발해야 할 것이다.

이것도 수신 기능을 구현하는 것 자체는 해보면 간단하다.. SpringBoot 프로젝트를 하나 새로 생성한다.

Kafka Consumer

1.build.gradle 에 spring-kafka 라이브러리 의존성을 추가한다.

1
2
3
4
dependencies {
    // Kafka
    implementation 'org.springframework.kafka:spring-kafka:2.8.2'
}
cs

2.application.yml에 메시지를 수신할 Kafka Broker 서버 정보와, 컨슈머 그룹ID를 설정한다.

1
2
3
4
5
spring:
  kafka:
    consumer:
      group-id: test_consumer_group_01
      bootstrap-servers: kafka.01.server.com:9092,kafka.02.server.com:9092,kafka.03.server.com:9092
cs

컨슈머 그룹ID는 임의의 값을 직접 설정해서 쓰면 된다. 같은 컨슈머 그룹 내에서는 메시지를 중복 수신하지 않는다. 다른 컨슈머 그룹끼리는 동일한 메시지를 중복 수신할 수 있다.

컨슈머 그룹은 Kafka Topic의 offset에 등록되어 메시지가 모든 컨슈머 그룹에서 수신이 완료되지 않았을 경우 서버에 Lag으로 쌓이게 된다.

3.Listener 역할을 할 클래스를 만들고 @KafkaListener 애노테이션으로 메시지 수신 후 로직을 담을 메서드를 만든다.

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
@Component
public class KafkaConsume {
     
    @KafkaListener(topics = "TEST-TOPIC", concurrency = "1", autoStartup = "false")
    public void listener(ConsumerRecord<StringString> payload) {
        System.out.println("Received Key : " + payload.key());
        System.out.println("Received Message : " + payload.value());
    }
}
cs

옵션

  • topics : 수신 대상이 되는 Kafka Topic의 이름을 넣는다.
  • concurrency : 이 옵션에 적시할 수대로 thread가 발생하여 돌아간다. 통상 partition의 숫자만큼 thread가 동작하는 것이 가장 좋다고 한다.
  • autoStartup : true 로 지정 시 애플리케이션이 start될 때 자동으로 컨슈밍이 시작되며, false 로 되어 있으면 수동으로 시작 메서드를 실행시켜줘야 한다.

4.Graceful Up/Down 만들기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;
 
@Service
public class KafkaListenerService {
 
    private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
 
    public KafkaListenerService(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
        this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
    }
 
    public void stopListener() {
        kafkaListenerEndpointRegistry.stop();
    }
 
    public void startListener() {
        for (MessageListenerContainer listenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            if (!listenerContainer.isRunning()) {
                listenerContainer.start();
            }
        }
    }
}
cs

적당히 서비스 클래스를 만들고 KafkaListenerEndpointRegistry 빈 의존성을 주입한다.

kafkaListenerEndpointRegistry 객체의 stop() 메서드를 실행하면 모든 컨슈밍 작동이 중단되며, 등록된 listenerContainer 객체들의 start() 메서드를 실행시켜 주면 컨슈밍이 재개된다.

SQS Consumer

1.build.gradle에 Spring Cloud AWS의 라이브러리 중 SQS에 관련된 요소의 의존성을 추가한다.

1
2
3
4
5
6
dependencies {
    // Sqs
    implementation 'io.awspring.cloud:spring-cloud-aws-autoconfigure:2.4.1'
    implementation 'io.awspring.cloud:spring-cloud-aws-messaging:2.4.1'
    implementation 'io.awspring.cloud:spring-cloud-starter-aws:2.4.1'
}
cs

2.application.yml에 AWS Credential 과 Region 관련 설정을 넣는다.

1
2
3
4
5
6
7
8
9
10
cloud:
  aws:
    credentials:
      accessKey: # access-key 사용시 적시
      secretKey: # secret-key 사용시 적시
      use-default-aws-credentials-chain: true
    region:
      static: ap-northeast-2
    stack:
      auto: false
cs

cloud.aws.stack.auto는 기본값 설정이 true인데 AWS CloudFormation이 셋팅되어 있지 않으면 에러를 발생시키므로 false로 설정한다.

위 설정에서 accessKey와 secretKey는 AWS가 아닌 다른 환경에서 구동할 때 필요하다.

AWS SDK에서는 credential 관련해서 6가지의 인증 옵션을 제공하는데, 아무런 설정을 하지 않으면 우선 순위에 의해서 인증 옵션이 순차적으로 적용된다.(Provider Chain)

Default Credential Provider Chain : https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html

별도의 설정을 하고 싶지 않으면 use-default-aws-credentials-chain의 값을 true로 지정해줘야 Provider Chain이 작동된다. (기본값이 false)

특정한 인증수단을 지정해주고 싶으면, SQS나 S3의 Client 빈을 별도로 생성해주고 인증수단을 적시해놓는다.(아래 예시)

1
2
3
4
5
6
7
8
@Bean
public AmazonSQS amazonSQS() {
    return AmazonSQSAsyncClientBuilder
            .standard()
            .withCredentials(new WebIdentityTokenCredentialsProvider())
            .withRegion(Regions.AP_NORTHEAST_2)
            .build();
}
cs

3.Listener 클래스를 만들고 @SqsListener 애노테이션으로 수신 후 로직을 담을 메서드를 만든다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
 
import java.util.Map;
 
import static org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy.ON_SUCCESS;
 
@Component
public class SqsConsume {
 
    @SqsListener(value = "TEST-QUEUE", deletionPolicy = ON_SUCCESS)
    public void listener(@Payload String payload, @Headers Map<StringString> headers) {
        log.info("Received Header : {}", headers);
        log.info("Received Payload : {}", payload);
    }
}
cs

매개변수 중 대개는 payload만 활용한다. (Message Body)

옵션

  • value : 수신 대상이 되는 SQS 대기열(queue)의 이름이나, URL을 넣는다.
  • deletionPolicy : 수신 후 대기열에 남아있는 메시지를 어떻게 할 것인지 정책.
    • ALWAYS : 메서드로 메시지가 수신되면 바로 삭제한다.
    • NEVER : 메시지를 삭제하지 않는다.
    • NO_REDRIVE : DLQ(Dead-letter queue) 가 설정되지 않으면 삭제한다.
    • ON_SUCCESS : Exception이 발생하지 않고 메서드가 정상 종료되면 삭제한다.
    • DEFAULT : 아무 것도 정의하지 않으면 DEFAULT로 구동되는데, NO_REDRIVE로 작동한다.

deletionPolicy를 NEVER로 하고 Acknowledgment 파라메터의 메서드로 메시지를 삭제하는 CASE

1
2
3
4
5
6
7
8
9
@SqsListener(value = "TEST-QUEUE", deletionPolicy = NEVER)
public void onMessage(@Payload String payload, Acknowledgment ack) {
    try {
        transferService.convertAndTransfer(payload);
        ack.acknowledge();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
cs

4.Graceful Up/Down 만들기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.stereotype.Service;
 
import javax.annotation.PostConstruct;
 
@Service
public class ConsumeSwitchService {
 
    private final SimpleMessageListenerContainer simpleMessageListenerContainer;
 
    public ConsumeSwitchService(SimpleMessageListenerContainer simpleMessageListenerContainer) {
        this.simpleMessageListenerContainer = simpleMessageListenerContainer;
    }
 
    public void start() {
        simpleMessageListenerContainer.start();
    }
 
    public void stop() {
        simpleMessageListenerContainer.stop();
    }
 
    public boolean isRunning() {
        return simpleMessageListenerContainer.isRunning();
    }
 
}
cs

적당히 서비스 클래스를 만들고 SimpleMessageListenerContainer 빈 의존성을 주입한다.

simpleMessageListenerContainer 객체의 start() 메서드를 실행하면 컨슈밍이 시작되고, stop() 메서드를 실행하면 컨슈밍이 중단된다.

API를 만들어 외부에서 호출하여 Start/Stop을 컨트롤할 수 있게 해줘도 좋을 것이다.

simpleMessageListenerContainer는 기본 설정이 autoStartup = true 이다. 빈 생성시나 서비스가 시작할 때 false로 별도 셋팅해주면 된다.

1
2
3
4
5
6
7
8
9
10
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSqs, QueueMessageHandler queueMessageHandler) {
    final SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory = new SimpleMessageListenerContainerFactory();
    simpleMessageListenerContainerFactory.setAmazonSqs(amazonSqs);
    SimpleMessageListenerContainer simpleMessageListenerContainer = simpleMessageListenerContainerFactory.createSimpleMessageListenerContainer();
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    simpleMessageListenerContainer.setAutoStartup(false);       // 자동 시작 여부
    simpleMessageListenerContainer.setMaxNumberOfMessages(10);  // Consume 1회시 수신할 수 있는 최대의 메시지 갯수 : 10이 Maximun 값
    return simpleMessageListenerContainer;
}
cs

댓글 없음:

댓글 쓰기

Kotlin, SpringBoot 3, GraalVM 환경에서 Native Image로 컴파일하여 애플리케이션 실행

Spring Boot 3부터, GraalVM Native Image를 공식 지원하여 애플리케이션의 시작 속도와 메모리 사용량을 크게 줄일 수 있다. Native Image란 기존의 JVM 기반 위에서 돌아가는 Java 애플리케이션과는 달리 JVM 없이...