Kafka와 SQS에 메시지를 발송하는 Producer의 개발 방법이다.
솔직히 해보면 매우 간단하고, 단일 프로젝트에 같이 구현하는 것도 가능하다. SpringBoot로 기본적인 API 프로젝트를 만들고 Controller에서 호출할 메시지 송신 메서드를 구현해본다.
Kafka Producer
1.build.gradle 에 spring-kafka 라이브러리 의존성을 추가한다.
1 2 3 4 | dependencies { // Kafka implementation 'org.springframework.kafka:spring-kafka:2.8.2' } | cs |
2.application.yml에 메시지를 송신할 Kafka Broker의 서버 정보를 설정한다.
1 2 3 4 | spring: kafka: producer: bootstrap-servers: kafka.01.server.com:9092,kafka.02.server.com:9092,kafka.03.server.com:9092 | cs |
3.Service 클래스를 하나 만들고 KafkaTemplate 빈 의존성을 주입한다.
1 2 3 4 5 6 7 8 | @Service public class KafkaSendingService { private final KafkaTemplate kafkaTemplate; public KafkaSendingService(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } } | cs |
KafkaTemplate 빈은 자동 생성되므로 Override 할 필요가 없다면 따로 생성하지 않아도 된다.
4.TOPIC명과 메시지 내용을 매개변수로 받아 Kafka Broker로 보내는 메서드를 아래와 같이 작성한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | public void sendMessage(String topicName, String message) { final ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(Throwable ex) { ex.printStackTrace(); throw new RuntimeException(); } @Override public void onSuccess(SendResult<String, String> result) { log.info("Kafka sent message='{}'", message); } }); } | cs |
Controller나 다른 요소에서 위 sendMessage 메서드를 호출하면 메시지 전송이 된다.
SQS Producer
1.build.gradle에 Spring Cloud AWS의 라이브러리 중 SQS에 관련된 요소의 의존성을 추가한다.
1 2 3 4 5 6 | dependencies { // Sqs implementation 'io.awspring.cloud:spring-cloud-aws-autoconfigure:2.4.1' implementation 'io.awspring.cloud:spring-cloud-aws-messaging:2.4.1' implementation 'io.awspring.cloud:spring-cloud-starter-aws:2.4.1' } | cs |
2.application.yml에 AWS Credential 과 Region 관련 설정을 넣는다.
1 2 3 4 5 6 7 8 9 10 | cloud: aws: credentials: accessKey: # access-key 사용시 적시 secretKey: # secret-key 사용시 적시 use-default-aws-credentials-chain: true region: static: ap-northeast-2 stack: auto: false | cs |
cloud.aws.stack.auto는 기본값 설정이 true인데 AWS CloudFormation이 셋팅되어 있지 않으면 에러를 발생시키므로 false로 설정한다.
위 설정에서 accessKey와 secretKey는 AWS가 아닌 다른 환경에서 구동할 때 필요하다.
AWS SDK에서는 credential 관련해서 6가지의 인증 옵션을 제공하는데, 아무런 설정을 하지 않으면 우선 순위에 의해서 인증 옵션이 순차적으로 적용된다.(Provider Chain)
Default Credential Provider Chain : https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
별도의 설정을 하고 싶지 않으면 use-default-aws-credentials-chain의 값을 true로 지정해줘야 Provider Chain이 작동된다. (기본값이 false)
특정한 인증수단을 지정해주고 싶으면, SQS나 S3의 Client 빈을 별도로 생성해주고 인증수단을 적시해놓는다.(아래 예시)
1 2 3 4 5 6 7 8 | @Bean public AmazonSQS amazonSQS() { return AmazonSQSAsyncClientBuilder .standard() .withCredentials(new WebIdentityTokenCredentialsProvider()) .withRegion(Regions.AP_NORTHEAST_2) .build(); } | cs |
3.QueueMessagingTemplate 빈을 생성한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 | import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSAsync; import io.awspring.cloud.messaging.core.QueueMessagingTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class SqsConfig { @Bean public QueueMessagingTemplate queueMessagingTemplate(AmazonSQS amazonSQS) { return new QueueMessagingTemplate((AmazonSQSAsync) amazonSQS); } } | cs |
4.Service 클래스를 하나 만들고 QueueMessagingTemplate 빈 의존성을 주입한다.
1 2 3 4 5 6 7 8 | @Service public class SqsSendingService { private final QueueMessagingTemplate queueMessagingTemplate; public SqsSendingService(QueueMessagingTemplate queueMessagingTemplate) { this.queueMessagingTemplate = queueMessagingTemplate; } } | cs |
5.Queue 이름과 메시지를 매개변수로 받아 SQS로 보내는 메서드를 아래와 같이 작성한다.
1 2 3 4 | public void sendMessage(String queueName, String message) { final Message<String> newMessage = MessageBuilder.withPayload(message).build(); queueMessagingTemplate.send(queueName, newMessage); } | cs |
여기서 Queue 이름만 넣어서 전송하면 메시지는 당연하게도(?) credential 설정으로 인증된 AWS Account의 SQS에서 같은 이름의 Queue를 찾아간다.
만약 Queue가 없거나 하면 Error가 발생하고, 다른 AWS Account에서 생성된 SQS Queue로 보내고자 한다면 Queue의 Full URL을 넣어서 전송해야 한다. 서울 리전이라면 SQS의 Full URL은 아래의 형식으로 되어 있을 것이다.
https://sqs.ap-northeast2.amazonaws.com/{aws account 번호}/{Queue 대기열 이름}
Queue의 엑세스 정책에도 특정한 타 AWS Account에서도 접근이 가능하게 설정이 되어 있어야 한다. Controller나 다른 요소에서 위 sendMessage 메서드를 호출하면 메시지 전송이 된다.
댓글 없음:
댓글 쓰기