2022년 5월 28일 토요일

Queue 메시지를 받아 처리하는 Consumer 개발

Producer를 개발해서 메시지를 Queue에 보낼 수 있게 해놓았으면 이게 그 메시지를 받아 DB에 적재하든지, 아니면 다른 방법으로 활용하기 위한 Consumer를 개발해야 할 것이다.

이것도 수신 기능을 구현하는 것 자체는 해보면 간단하다.. SpringBoot 프로젝트를 하나 새로 생성한다.

Kafka Consumer

1.build.gradle 에 spring-kafka 라이브러리 의존성을 추가한다.

1
2
3
4
dependencies {
    // Kafka
    implementation 'org.springframework.kafka:spring-kafka:2.8.2'
}
cs

2.application.yml에 메시지를 수신할 Kafka Broker 서버 정보와, 컨슈머 그룹ID를 설정한다.

1
2
3
4
5
spring:
  kafka:
    consumer:
      group-id: test_consumer_group_01
      bootstrap-servers: kafka.01.server.com:9092,kafka.02.server.com:9092,kafka.03.server.com:9092
cs

컨슈머 그룹ID는 임의의 값을 직접 설정해서 쓰면 된다. 같은 컨슈머 그룹 내에서는 메시지를 중복 수신하지 않는다. 다른 컨슈머 그룹끼리는 동일한 메시지를 중복 수신할 수 있다.

컨슈머 그룹은 Kafka Topic의 offset에 등록되어 메시지가 모든 컨슈머 그룹에서 수신이 완료되지 않았을 경우 서버에 Lag으로 쌓이게 된다.

3.Listener 역할을 할 클래스를 만들고 @KafkaListener 애노테이션으로 메시지 수신 후 로직을 담을 메서드를 만든다.

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
@Component
public class KafkaConsume {
     
    @KafkaListener(topics = "TEST-TOPIC", concurrency = "1", autoStartup = "false")
    public void listener(ConsumerRecord<StringString> payload) {
        System.out.println("Received Key : " + payload.key());
        System.out.println("Received Message : " + payload.value());
    }
}
cs

옵션

  • topics : 수신 대상이 되는 Kafka Topic의 이름을 넣는다.
  • concurrency : 이 옵션에 적시할 수대로 thread가 발생하여 돌아간다. 통상 partition의 숫자만큼 thread가 동작하는 것이 가장 좋다고 한다.
  • autoStartup : true 로 지정 시 애플리케이션이 start될 때 자동으로 컨슈밍이 시작되며, false 로 되어 있으면 수동으로 시작 메서드를 실행시켜줘야 한다.

4.Graceful Up/Down 만들기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;
 
@Service
public class KafkaListenerService {
 
    private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
 
    public KafkaListenerService(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
        this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
    }
 
    public void stopListener() {
        kafkaListenerEndpointRegistry.stop();
    }
 
    public void startListener() {
        for (MessageListenerContainer listenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            if (!listenerContainer.isRunning()) {
                listenerContainer.start();
            }
        }
    }
}
cs

적당히 서비스 클래스를 만들고 KafkaListenerEndpointRegistry 빈 의존성을 주입한다.

kafkaListenerEndpointRegistry 객체의 stop() 메서드를 실행하면 모든 컨슈밍 작동이 중단되며, 등록된 listenerContainer 객체들의 start() 메서드를 실행시켜 주면 컨슈밍이 재개된다.

SQS Consumer

1.build.gradle에 Spring Cloud AWS의 라이브러리 중 SQS에 관련된 요소의 의존성을 추가한다.

1
2
3
4
5
6
dependencies {
    // Sqs
    implementation 'io.awspring.cloud:spring-cloud-aws-autoconfigure:2.4.1'
    implementation 'io.awspring.cloud:spring-cloud-aws-messaging:2.4.1'
    implementation 'io.awspring.cloud:spring-cloud-starter-aws:2.4.1'
}
cs

2.application.yml에 AWS Credential 과 Region 관련 설정을 넣는다.

1
2
3
4
5
6
7
8
9
10
cloud:
  aws:
    credentials:
      accessKey: # access-key 사용시 적시
      secretKey: # secret-key 사용시 적시
      use-default-aws-credentials-chain: true
    region:
      static: ap-northeast-2
    stack:
      auto: false
cs

cloud.aws.stack.auto는 기본값 설정이 true인데 AWS CloudFormation이 셋팅되어 있지 않으면 에러를 발생시키므로 false로 설정한다.

위 설정에서 accessKey와 secretKey는 AWS가 아닌 다른 환경에서 구동할 때 필요하다.

AWS SDK에서는 credential 관련해서 6가지의 인증 옵션을 제공하는데, 아무런 설정을 하지 않으면 우선 순위에 의해서 인증 옵션이 순차적으로 적용된다.(Provider Chain)

Default Credential Provider Chain : https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html

별도의 설정을 하고 싶지 않으면 use-default-aws-credentials-chain의 값을 true로 지정해줘야 Provider Chain이 작동된다. (기본값이 false)

특정한 인증수단을 지정해주고 싶으면, SQS나 S3의 Client 빈을 별도로 생성해주고 인증수단을 적시해놓는다.(아래 예시)

1
2
3
4
5
6
7
8
@Bean
public AmazonSQS amazonSQS() {
    return AmazonSQSAsyncClientBuilder
            .standard()
            .withCredentials(new WebIdentityTokenCredentialsProvider())
            .withRegion(Regions.AP_NORTHEAST_2)
            .build();
}
cs

3.Listener 클래스를 만들고 @SqsListener 애노테이션으로 수신 후 로직을 담을 메서드를 만든다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
 
import java.util.Map;
 
import static org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy.ON_SUCCESS;
 
@Component
public class SqsConsume {
 
    @SqsListener(value = "TEST-QUEUE", deletionPolicy = ON_SUCCESS)
    public void listener(@Payload String payload, @Headers Map<StringString> headers) {
        log.info("Received Header : {}", headers);
        log.info("Received Payload : {}", payload);
    }
}
cs

매개변수 중 대개는 payload만 활용한다. (Message Body)

옵션

  • value : 수신 대상이 되는 SQS 대기열(queue)의 이름이나, URL을 넣는다.
  • deletionPolicy : 수신 후 대기열에 남아있는 메시지를 어떻게 할 것인지 정책.
    • ALWAYS : 메서드로 메시지가 수신되면 바로 삭제한다.
    • NEVER : 메시지를 삭제하지 않는다.
    • NO_REDRIVE : DLQ(Dead-letter queue) 가 설정되지 않으면 삭제한다.
    • ON_SUCCESS : Exception이 발생하지 않고 메서드가 정상 종료되면 삭제한다.
    • DEFAULT : 아무 것도 정의하지 않으면 DEFAULT로 구동되는데, NO_REDRIVE로 작동한다.

deletionPolicy를 NEVER로 하고 Acknowledgment 파라메터의 메서드로 메시지를 삭제하는 CASE

1
2
3
4
5
6
7
8
9
@SqsListener(value = "TEST-QUEUE", deletionPolicy = NEVER)
public void onMessage(@Payload String payload, Acknowledgment ack) {
    try {
        transferService.convertAndTransfer(payload);
        ack.acknowledge();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
cs

4.Graceful Up/Down 만들기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.stereotype.Service;
 
import javax.annotation.PostConstruct;
 
@Service
public class ConsumeSwitchService {
 
    private final SimpleMessageListenerContainer simpleMessageListenerContainer;
 
    public ConsumeSwitchService(SimpleMessageListenerContainer simpleMessageListenerContainer) {
        this.simpleMessageListenerContainer = simpleMessageListenerContainer;
    }
 
    public void start() {
        simpleMessageListenerContainer.start();
    }
 
    public void stop() {
        simpleMessageListenerContainer.stop();
    }
 
    public boolean isRunning() {
        return simpleMessageListenerContainer.isRunning();
    }
 
}
cs

적당히 서비스 클래스를 만들고 SimpleMessageListenerContainer 빈 의존성을 주입한다.

simpleMessageListenerContainer 객체의 start() 메서드를 실행하면 컨슈밍이 시작되고, stop() 메서드를 실행하면 컨슈밍이 중단된다.

API를 만들어 외부에서 호출하여 Start/Stop을 컨트롤할 수 있게 해줘도 좋을 것이다.

simpleMessageListenerContainer는 기본 설정이 autoStartup = true 이다. 빈 생성시나 서비스가 시작할 때 false로 별도 셋팅해주면 된다.

1
2
3
4
5
6
7
8
9
10
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSqs, QueueMessageHandler queueMessageHandler) {
    final SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory = new SimpleMessageListenerContainerFactory();
    simpleMessageListenerContainerFactory.setAmazonSqs(amazonSqs);
    SimpleMessageListenerContainer simpleMessageListenerContainer = simpleMessageListenerContainerFactory.createSimpleMessageListenerContainer();
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    simpleMessageListenerContainer.setAutoStartup(false);       // 자동 시작 여부
    simpleMessageListenerContainer.setMaxNumberOfMessages(10);  // Consume 1회시 수신할 수 있는 최대의 메시지 갯수 : 10이 Maximun 값
    return simpleMessageListenerContainer;
}
cs

2022년 5월 27일 금요일

Queue에 메시지 전송하는 Producer 개발

Kafka와 SQS에 메시지를 발송하는 Producer의 개발 방법이다.

솔직히 해보면 매우 간단하고, 단일 프로젝트에 같이 구현하는 것도 가능하다. SpringBoot로 기본적인 API 프로젝트를 만들고 Controller에서 호출할 메시지 송신 메서드를 구현해본다.

Kafka Producer

1.build.gradle 에 spring-kafka 라이브러리 의존성을 추가한다.

1
2
3
4
dependencies {
    // Kafka
    implementation 'org.springframework.kafka:spring-kafka:2.8.2'
}
cs

2.application.yml에 메시지를 송신할 Kafka Broker의 서버 정보를 설정한다.

1
2
3
4
spring:
  kafka:
    producer:
      bootstrap-servers: kafka.01.server.com:9092,kafka.02.server.com:9092,kafka.03.server.com:9092
cs

3.Service 클래스를 하나 만들고 KafkaTemplate 빈 의존성을 주입한다.

1
2
3
4
5
6
7
8
@Service
public class KafkaSendingService {
    private final KafkaTemplate kafkaTemplate;
 
    public KafkaSendingService(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
}
cs

KafkaTemplate 빈은 자동 생성되므로 Override 할 필요가 없다면 따로 생성하지 않아도 된다.

4.TOPIC명과 메시지 내용을 매개변수로 받아 Kafka Broker로 보내는 메서드를 아래와 같이 작성한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    public void sendMessage(String topicName, String message) {
        final ListenableFuture<SendResult<StringString>> future = kafkaTemplate.send(topicName, message);
        future.addCallback(new ListenableFutureCallback<SendResult<StringString>>() {
            @Override
            public void onFailure(Throwable ex) {
                ex.printStackTrace();
                throw new RuntimeException();
            }
 
            @Override
            public void onSuccess(SendResult<StringString> result) {
                log.info("Kafka sent message='{}'", message);
            }
        });
    }
cs

Controller나 다른 요소에서 위 sendMessage 메서드를 호출하면 메시지 전송이 된다.

SQS Producer

1.build.gradle에 Spring Cloud AWS의 라이브러리 중 SQS에 관련된 요소의 의존성을 추가한다.

1
2
3
4
5
6
dependencies {
    // Sqs
    implementation 'io.awspring.cloud:spring-cloud-aws-autoconfigure:2.4.1'
    implementation 'io.awspring.cloud:spring-cloud-aws-messaging:2.4.1'
    implementation 'io.awspring.cloud:spring-cloud-starter-aws:2.4.1'
}
cs

2.application.yml에 AWS Credential 과 Region 관련 설정을 넣는다.

1
2
3
4
5
6
7
8
9
10
cloud:
  aws:
    credentials:
      accessKey: # access-key 사용시 적시
      secretKey: # secret-key 사용시 적시
      use-default-aws-credentials-chain: true
    region:
      static: ap-northeast-2
    stack:
      auto: false
cs

cloud.aws.stack.auto는 기본값 설정이 true인데 AWS CloudFormation이 셋팅되어 있지 않으면 에러를 발생시키므로 false로 설정한다.

위 설정에서 accessKey와 secretKey는 AWS가 아닌 다른 환경에서 구동할 때 필요하다.

AWS SDK에서는 credential 관련해서 6가지의 인증 옵션을 제공하는데, 아무런 설정을 하지 않으면 우선 순위에 의해서 인증 옵션이 순차적으로 적용된다.(Provider Chain)

Default Credential Provider Chain : https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html

별도의 설정을 하고 싶지 않으면 use-default-aws-credentials-chain의 값을 true로 지정해줘야 Provider Chain이 작동된다. (기본값이 false)

특정한 인증수단을 지정해주고 싶으면, SQS나 S3의 Client 빈을 별도로 생성해주고 인증수단을 적시해놓는다.(아래 예시)

1
2
3
4
5
6
7
8
@Bean
public AmazonSQS amazonSQS() {
    return AmazonSQSAsyncClientBuilder
            .standard()
            .withCredentials(new WebIdentityTokenCredentialsProvider())
            .withRegion(Regions.AP_NORTHEAST_2)
            .build();
}
cs

3.QueueMessagingTemplate 빈을 생성한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import io.awspring.cloud.messaging.core.QueueMessagingTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class SqsConfig {
    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQS amazonSQS) {
        return new QueueMessagingTemplate((AmazonSQSAsync) amazonSQS);
    }
}
cs

4.Service 클래스를 하나 만들고 QueueMessagingTemplate 빈 의존성을 주입한다.

1
2
3
4
5
6
7
8
@Service
public class SqsSendingService {
    private final QueueMessagingTemplate queueMessagingTemplate;
 
    public SqsSendingService(QueueMessagingTemplate queueMessagingTemplate) {
        this.queueMessagingTemplate = queueMessagingTemplate;
    }
}
cs

5.Queue 이름과 메시지를 매개변수로 받아 SQS로 보내는 메서드를 아래와 같이 작성한다.

1
2
3
4
public void sendMessage(String queueName, String message) {
    final Message<String> newMessage = MessageBuilder.withPayload(message).build();
    queueMessagingTemplate.send(queueName, newMessage);
}
cs

여기서 Queue 이름만 넣어서 전송하면 메시지는 당연하게도(?) credential 설정으로 인증된 AWS Account의 SQS에서 같은 이름의 Queue를 찾아간다.

만약 Queue가 없거나 하면 Error가 발생하고, 다른 AWS Account에서 생성된 SQS Queue로 보내고자 한다면 Queue의 Full URL을 넣어서 전송해야 한다. 서울 리전이라면 SQS의 Full URL은 아래의 형식으로 되어 있을 것이다.

https://sqs.ap-northeast2.amazonaws.com/{aws account 번호}/{Queue 대기열 이름}

Queue의 엑세스 정책에도 특정한 타 AWS Account에서도 접근이 가능하게 설정이 되어 있어야 한다. Controller나 다른 요소에서 위 sendMessage 메서드를 호출하면 메시지 전송이 된다.

2022년 5월 25일 수요일

MSK vs SQS

애플리케이션들을 AWS 클라우드로 이관하고 Pub/Sub 체계를 구축하는데 있어서 우리는 이미 온프레미스 환경에서 Kafka를 사용한 경험이 있었기에 AWS에서 SaaS형태로 제공하는 MSK와 SQS를 놓고 검토를 하게 되었는데 두 솔루션의 장단점이 명확하므로 여기에 정리를 해본다.

Amazon MSK (Managed Streaming for Apache Kafka)

  • AWS에서 제공하는 완전 관리형 Apache Kafka 서비스
  • 기존에 On-Promise에서 사용하던 혹은 EC2로 관리하던 Apache Kafka를 SaaS형태로 사용할 수 있음
  • Apache Kafka의 특정 버젼을 그대로 사용할 수 있기 때문에 Vanila Apache Kafka의 버젼별 Api spec을 따라서 사용 가능
  • MSK 모니터링 제공
  • Apache Kafka의 특징을 그대로 활용
    • 같은 partition 내에서만 순서 보장이 됨. → partition이 다르면 순서를 보장하지 못한다는 의미.
    • Pub/Sub 아키텍처 구축에 많이 사용되어 레퍼런스 자료가 풍부.
    • 컨슈머 그룹을 분리하여 운영한다면 같은 메시지를 서로 다른 컨슈머에서 각각 수신하여 사용할 수 있다.
  • 장점
    • 아키텍처 구성이 간편하고 구축 시간을 줄일 수 있음.
    • AWS에서 관리툴 제공하므로 모니터링이 용이할 듯함.
    • 자동복구 및 패치 기능이 있어 애플리케이션 지속성 향상, 안정적임.
  • 단점
    • 쓰는 만큼 비용이 나가고 그 비용이 다소 비쌈, 강제로 VPC/AZ에 broker 들이 할당되어서 제약사항이 많음.
    • 로컬 개발환경에서 MSK 접속이 안됨.

Amazon SQS (Simple Queue Service)

  • 지속적이고 가용성이 뛰어난 보안 호스팅 대기열을 제공하며 이를 통해 분산 소프트웨어 시스템과 구성 요소를 통합 및 분리
  • 이미 만들어지고 운영 중인 AWS 서비스에서 Queue 채널만 추가 생성하는 개념으로 이해
  • 일반적인 Queue는 순서 보장을 하지 않으며, FIFO Queue는 순서 보장이 됨.
    • 단 FIFO Queue는 초당 3,000개 메시지까지만 처리 가능
  • 256KB 용량 제한
    • 256KB를 초과하는 메시지는 S3에 업로드하는 방식으로 우회하여 처리하는 방법이 있고, AWS에서도 해당 기능을 가진 라이브러리를 제공 중
    • https://github.com/awslabs/amazon-sqs-java-extended-client-lib
  • 장점
    • 관리와 사용이 매우 쉽고 간편 단점 256KB의 용량 제한, 초당 처리량 제한 등의 성능 이슈
    • 기능이 Kafka만큼 High Level로 제공되지 않음
    • 하나의 메시지를 여러 개의 컨슈머가 동시에 수신할 수 없음

Note: With Amazon SQS, multiple consumers can’t receive the same message simultaneously. A message can only be received from a queue by one consumer that’s ready to process and then delete the message received. A message can be retained in queues for 14 days maximum.

2022년 5월 19일 목요일

HTTP Header Referrer 관리 및 Google Chrome 정책

A 사이트에서 B 사이트를 연결할 때 HTTP 헤더의 Referrer에는 요청된 웹사이트의 URL이나 도메인이 들어감.

Referrer를 어떤 형태로 보낼 것인지는 요청 사이트에서 meta tag로 제어할 수 있다.

아래는 NAVER의 블로그 서비스에 정의된 Referrer 정책이다.

Referrer 정책이 “always” 로 지정되어 있고, 이는 NAVER 블로그에서 다른 웹사이트로 HTTP 요청을 보낼 때 항상 Referrer 에 Full URL을 전송한다는 의미이다.

HTTP 요청의 헤더에서 Referrer 항목을 로그로 출력해보면 위와 같이 요청 사이트의 URL이 나온다.

NAVER 블로그에서 Link로 연결된 모든 웹사이트에서는 Referrer 주소를 이와 같이 취득이 가능하다 는 얘기다.

Referrer는 통상 Apache나 Nginx 등 웹서버에서 정책적으로 관리하지만 HTML의 meta tag로 제어할 경우에는 아래와 같은 옵션으로 관리한다.

브라우저별 지원 여부 : https://caniuse.com/mdn-html_elements_meta_name_referrer

Chrome에서는 no-referrer-when-downgrade 를 기본 정책으로 사용하였다가 85버전부터는 strict-origin-when-cross-origin 으로 정책을 변경하였고, 타 브라우저도 보안 강화 흐름에 따라 점차 엄격한 기준으로 변경할 것이 확실시됨.

Notice 원문 : https://developer.chrome.com/blog/referrer-policy-new-chrome-default/

Up until recently, no-referrer-when-downgrade has been a widespread default policy across browsers. But now many browsers are in some stage of moving to more privacy-enhancing defaults. Chrome plans to switch its default policy from no-referrer-when-downgrade to strict-origin-when-cross-origin, starting in version 85.

HTTP 헤더의 Referer 정보를 사용한 개발 시 이슈

  • Chrome 웹브라우저의 기본 정책 상으로는 요청 사이트의 Full URL을 알 수 없음.
    • 도메인 단위의 필터링은 가능할 수 있으나, 더 세분화된 path 레벨의 필터링은 보편적으로 어려움.
  • Referrer 정책은 웹서버의 설정으로 통해 제어하는 것이 일반적이기 때문에, 단순 HTML 태그의 내용만으로는 단정지을 수 없고, 사이트마다 CASE를 만들어 사전 테스트가 필요함.
    • 원하는 결과가 나오지 않았을 때는 해당되는 요청 웹사이트의 개발/관리부서에 설정 변경 요청이 필요할 수 있으며 (요청 내용이) 해당 기업의 보안 정책과 상충될 소지가 있음.
    • 사내망에서 Link를 타고 접속 시 외부망으로 나가는 트래픽에 대해 Proxy에서 HTTP 헤더 내용의 변조 등의 보안 정책이 있을 수 있음
      • 접속 장소에 따라 Referrer 내용이 들어올 수도, 안들어올 수도 있음.
      • AWS에 탑재된 웹사이트의 경우 AWS에서 HTTP 헤더의 내용을 변조하기 때문에 별도의 조치가 필요함 → 기획전이 이에 해당.
    • 웹브라우저 정책이 점차 보안을 강화해나가는 추세이고 제각각이기 때문에, 향후 정책 변경에 대한 대응에 어려움이 있음.

→ Referrer 체크의 활용은 현재에는 제한적으로 사용가능하나,

각각의 웹브라우저 버전 및 정책, 요청 웹사이트 회사의 보안 정책과 인프라 환경 등에 대한 변수가 상존하기 때문에 (구동에 관한) 완전한 개런티 보장은 되지 않음.

Kotlin, SpringBoot 3, GraalVM 환경에서 Native Image로 컴파일하여 애플리케이션 실행

Spring Boot 3부터, GraalVM Native Image를 공식 지원하여 애플리케이션의 시작 속도와 메모리 사용량을 크게 줄일 수 있다. Native Image란 기존의 JVM 기반 위에서 돌아가는 Java 애플리케이션과는 달리 JVM 없이...