Producer를 개발해서 메시지를 Queue에 보낼 수 있게 해놓았으면 이게 그 메시지를 받아 DB에 적재하든지, 아니면 다른 방법으로 활용하기 위한 Consumer를 개발해야 할 것이다.
이것도 수신 기능을 구현하는 것 자체는 해보면 간단하다.. SpringBoot 프로젝트를 하나 새로 생성한다.
Kafka Consumer
1.build.gradle 에 spring-kafka 라이브러리 의존성을 추가한다.
| dependencies { // Kafka implementation 'org.springframework.kafka:spring-kafka:2.8.2' } | cs |
2.application.yml에 메시지를 수신할 Kafka Broker 서버 정보와, 컨슈머 그룹ID를 설정한다.
| spring: kafka: consumer: group-id: test_consumer_group_01 bootstrap-servers: kafka.01.server.com:9092,kafka.02.server.com:9092,kafka.03.server.com:9092 | cs |
컨슈머 그룹ID는 임의의 값을 직접 설정해서 쓰면 된다. 같은 컨슈머 그룹 내에서는 메시지를 중복 수신하지 않는다. 다른 컨슈머 그룹끼리는 동일한 메시지를 중복 수신할 수 있다.
컨슈머 그룹은 Kafka Topic의 offset에 등록되어 메시지가 모든 컨슈머 그룹에서 수신이 완료되지 않았을 경우 서버에 Lag으로 쌓이게 된다.
3.Listener 역할을 할 클래스를 만들고 @KafkaListener 애노테이션으로 메시지 수신 후 로직을 담을 메서드를 만든다.
| import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaConsume { @KafkaListener(topics = "TEST-TOPIC", concurrency = "1", autoStartup = "false") public void listener(ConsumerRecord<String, String> payload) { System.out.println("Received Key : " + payload.key()); System.out.println("Received Message : " + payload.value()); } } | cs |
옵션
- topics : 수신 대상이 되는 Kafka Topic의 이름을 넣는다.
- concurrency : 이 옵션에 적시할 수대로 thread가 발생하여 돌아간다. 통상 partition의 숫자만큼 thread가 동작하는 것이 가장 좋다고 한다.
- autoStartup : true 로 지정 시 애플리케이션이 start될 때 자동으로 컨슈밍이 시작되며, false 로 되어 있으면 수동으로 시작 메서드를 실행시켜줘야 한다.
4.Graceful Up/Down 만들기
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.stereotype.Service; @Service public class KafkaListenerService { private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; public KafkaListenerService(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) { this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry; } public void stopListener() { kafkaListenerEndpointRegistry.stop(); } public void startListener() { for (MessageListenerContainer listenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) { if (!listenerContainer.isRunning()) { listenerContainer.start(); } } } } | cs |
적당히 서비스 클래스를 만들고 KafkaListenerEndpointRegistry 빈 의존성을 주입한다.
kafkaListenerEndpointRegistry 객체의 stop() 메서드를 실행하면 모든 컨슈밍 작동이 중단되며, 등록된 listenerContainer 객체들의 start() 메서드를 실행시켜 주면 컨슈밍이 재개된다.
SQS Consumer
1.build.gradle에 Spring Cloud AWS의 라이브러리 중 SQS에 관련된 요소의 의존성을 추가한다.
| dependencies { // Sqs implementation 'io.awspring.cloud:spring-cloud-aws-autoconfigure:2.4.1' implementation 'io.awspring.cloud:spring-cloud-aws-messaging:2.4.1' implementation 'io.awspring.cloud:spring-cloud-starter-aws:2.4.1' } | cs |
2.application.yml에 AWS Credential 과 Region 관련 설정을 넣는다.
| cloud: aws: credentials: accessKey: # access-key 사용시 적시 secretKey: # secret-key 사용시 적시 use-default-aws-credentials-chain: true region: static: ap-northeast-2 stack: auto: false | cs |
cloud.aws.stack.auto는 기본값 설정이 true인데 AWS CloudFormation이 셋팅되어 있지 않으면 에러를 발생시키므로 false로 설정한다.
위 설정에서 accessKey와 secretKey는 AWS가 아닌 다른 환경에서 구동할 때 필요하다.
AWS SDK에서는 credential 관련해서 6가지의 인증 옵션을 제공하는데, 아무런 설정을 하지 않으면 우선 순위에 의해서 인증 옵션이 순차적으로 적용된다.(Provider Chain)
Default Credential Provider Chain : https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
별도의 설정을 하고 싶지 않으면 use-default-aws-credentials-chain의 값을 true로 지정해줘야 Provider Chain이 작동된다. (기본값이 false)
특정한 인증수단을 지정해주고 싶으면, SQS나 S3의 Client 빈을 별도로 생성해주고 인증수단을 적시해놓는다.(아래 예시)
| @Bean public AmazonSQS amazonSQS() { return AmazonSQSAsyncClientBuilder .standard() .withCredentials(new WebIdentityTokenCredentialsProvider()) .withRegion(Regions.AP_NORTHEAST_2) .build(); } | cs |
3.Listener 클래스를 만들고 @SqsListener 애노테이션으로 수신 후 로직을 담을 메서드를 만든다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.util.Map; import static org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy.ON_SUCCESS; @Component public class SqsConsume { @SqsListener(value = "TEST-QUEUE", deletionPolicy = ON_SUCCESS) public void listener(@Payload String payload, @Headers Map<String, String> headers) { log.info("Received Header : {}", headers); log.info("Received Payload : {}", payload); } } | cs |
매개변수 중 대개는 payload만 활용한다. (Message Body)
옵션
- value : 수신 대상이 되는 SQS 대기열(queue)의 이름이나, URL을 넣는다.
- deletionPolicy : 수신 후 대기열에 남아있는 메시지를 어떻게 할 것인지 정책.
- ALWAYS : 메서드로 메시지가 수신되면 바로 삭제한다.
- NEVER : 메시지를 삭제하지 않는다.
- NO_REDRIVE : DLQ(Dead-letter queue) 가 설정되지 않으면 삭제한다.
- ON_SUCCESS : Exception이 발생하지 않고 메서드가 정상 종료되면 삭제한다.
- DEFAULT : 아무 것도 정의하지 않으면 DEFAULT로 구동되는데, NO_REDRIVE로 작동한다.
deletionPolicy를 NEVER로 하고 Acknowledgment 파라메터의 메서드로 메시지를 삭제하는 CASE
| @SqsListener(value = "TEST-QUEUE", deletionPolicy = NEVER) public void onMessage(@Payload String payload, Acknowledgment ack) { try { transferService.convertAndTransfer(payload); ack.acknowledge(); } catch (Exception e) { e.printStackTrace(); } } | cs |
4.Graceful Up/Down 만들기
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; @Service public class ConsumeSwitchService { private final SimpleMessageListenerContainer simpleMessageListenerContainer; public ConsumeSwitchService(SimpleMessageListenerContainer simpleMessageListenerContainer) { this.simpleMessageListenerContainer = simpleMessageListenerContainer; } public void start() { simpleMessageListenerContainer.start(); } public void stop() { simpleMessageListenerContainer.stop(); } public boolean isRunning() { return simpleMessageListenerContainer.isRunning(); } } | cs |
적당히 서비스 클래스를 만들고 SimpleMessageListenerContainer 빈 의존성을 주입한다.
simpleMessageListenerContainer 객체의 start() 메서드를 실행하면 컨슈밍이 시작되고, stop() 메서드를 실행하면 컨슈밍이 중단된다.
API를 만들어 외부에서 호출하여 Start/Stop을 컨트롤할 수 있게 해줘도 좋을 것이다.
simpleMessageListenerContainer는 기본 설정이 autoStartup = true 이다. 빈 생성시나 서비스가 시작할 때 false로 별도 셋팅해주면 된다.
| @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSqs, QueueMessageHandler queueMessageHandler) { final SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory = new SimpleMessageListenerContainerFactory(); simpleMessageListenerContainerFactory.setAmazonSqs(amazonSqs); SimpleMessageListenerContainer simpleMessageListenerContainer = simpleMessageListenerContainerFactory.createSimpleMessageListenerContainer(); simpleMessageListenerContainer.setMessageHandler(queueMessageHandler); simpleMessageListenerContainer.setAutoStartup(false); // 자동 시작 여부 simpleMessageListenerContainer.setMaxNumberOfMessages(10); // Consume 1회시 수신할 수 있는 최대의 메시지 갯수 : 10이 Maximun 값 return simpleMessageListenerContainer; } | cs |