티스토리 뷰

Spring Boot와 AWS SQS를 활용한 간단한 애플리케이션을 만들고자 합니다. 

 

제목에 Spring Boot 2.4.x 이상 환경이라고 말씀드린 이유는 여기에서 확인해보실 수 있습니다. 대략 내용을 말씀드리자면 Spring Cloud AWS 2.3 버전이 출시되었는데 Spring Boot 2.4.x 이상 버전과 함께 사용할 수 있다고 합니다.

 

기존에 Spring Cloud AWS 관련 라이브러리를 사용할 때 org.springframework.cloud.aws 라이브러리를 사용했지만

지금은 업데이트가 중단되었습니다.

 

Spring Boot 2.4.x 이상 환경에서 호환되는 Spring Cloud AWS 2.3에서는 org.springframework.cloud.aws 패키지에 있던 모든 클래스들이 io.awspring.cloud로 이전되어 꾸준히 업데이트 되고있습니다.

 

여기를 확인해보시면 Spring Cloud AWS 버전과 호환되는 Spring Cloud, Spring Boot 버전을 확인하실 수 있으며

이제 Spring Boot 예제코드와 AWS SQS 세팅해보겠습니다. 순서대로 따라 하시면 문제없이 진행하실 수 있습니다.

 

AWS IAM 사용자 생성 및 키 발급

IAM 사용자를 생성해줍니다.

 

 

AmazonSQSFullAccess 권한을 적용해줍니다.

 

 

사용자 상세 페이지에 들어가셔서 액세스 키 발급 버튼을 누르면 위와 같은 화면이 나옵니다.

5번째 항목을 선택 후 다음으로 넘어갑니다.

 

 

AccessKey와 SecretKey를 복사해둡니다. 이 값은 절대 외부에 노출이 되면 안 되니 관리를 잘하셔야 합니다.

 

 

 

AWS SQS 생성

 

대기열에 메세지를 보낼 큐를 생성합니다. 유형은 '표준'으로 지정하고 이름은 'TEST_QUEUE'로 지정해두고

구성정보는 기본값으로 생성합니다.  구성에 대한 각 정보는 다음과 같습니다.

 

표시 제한 시간

listener에서 수신한 메세지를 다른 listener에서는 중복으로 수신할 수 없도록 지정하는 시간입니다.

예를 들어 30초일 경우 A listener에서 메세지를 처리하는 시간이 30초가 넘어가 버리면 B listener에서도 동일한 메세지를 중복으로 수신할 수 있습니다.

따라서 SQS Listener를 이용해 메세지를 처리할 경우 처리시간을 고려해서 시간을 조정하시면 될 것 같습니다.

 

전송 지연

대기열에 메세지를 보내는데 지연시킬 시간입니다.

예를들어 listener가 많은 메세지를 처리하고 있는데 추가시간이 필요할 경우 이 시간을 지연시킬 수 있습니다.

 

메세지 수신 대기 시간

메세지를 수신할 수 있을 때까지의 대기시간을 말합니다.

대기열에 메세지를 보내고 바로 수신하는 게 아니라 지정된 시간 이후에 수신해서 처리합니다.

 

메세지 보존 기간

기본적으로 대기열의 메세지는 4일간 보관됩니다. 만약 4일동안 메세지가 처리되지 않는다면 자동으로 삭제됩니다.

 

최대 메세지 크기

대기열 메세지의 최대 크기는 256KB입니다. 이 크기보다 더 큰 메세지를 전송할 때는 Amazon SQS Extended Client Library를 이용하면 됩니다. (참고링크)

 

 

Spring Boot 예제코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
plugins {
    id 'org.springframework.boot' version '2.6.8'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}
...
...
dependencyManagement {
    imports {
        mavenBom("io.awspring.cloud:spring-cloud-aws-dependencies:2.3.3")
    }
}
 
dependencies {
    implementation "io.awspring.cloud:spring-cloud-starter-aws-messaging"
}
cs

저는 Spring Boot 2.6.8 버전, io.awspring.cloud 2.3.3 버전을 사용하고 있습니다. 실무에서도 동일한 버전을 사용하고 있고 정상적으로 서비스를 운영하고 있습니다. Spring Boot 2.4.x 버전과 Spring Boot 2.6.8은 Spring Framework 5.3.x이랑 호환이 되고 있기 때문에 버전과 관련된 문제는 없습니다. 위 사진처럼 라이브러리를 추가해줍니다.

 

 

 

1
2
3
4
5
6
7
8
9
cloud:
  aws:
    region:
      static: ap-northeast-2
    credentials:
      access-key: 발급된 AccessKey
      secret-key: 발급된 SecretKey
    sqs:
      queue-name: TEST_QUEUE
cs

application.yml 파일에는 다음과 같이 설정해줍니다.

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
 
    @Bean
    public ApplicationRunner applicationRunner(
        AmazonSQSAsync amazonSQSAsync,
        @Value("${cloud.aws.sqs.queue-name}"String queueName
    ) {
        return args -> {
            String payload = "Hello, World!";
 
            QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSQSAsync);
            queueMessagingTemplate.convertAndSend(queueName, payload);
        };
    }
 
    @SqsListener(value = "${cloud.aws.sqs.queue-name}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void receive(String payload) {
        System.out.println("payload : " + payload);
    }
}
cs

간단하게 Hello, World 라는 메세지를 대기열에 넣고 수신 측에서 받은 메세지를 출력해보겠습니다.

 

 

메세지가 정상적으로 수신되었습니다.

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
 
    @Bean
    public ApplicationRunner applicationRunner(
        AmazonSQSAsync amazonSQSAsync,
        @Value("${cloud.aws.sqs.queue-name}"String queueName
    ) {
        return args -> {
            Member payload = new Member();
            payload.setName("김종현");
            payload.setAge(30);
 
            Map<String, Object> header = Map.of(
                "header1""value1",
                "header2""value2"
            );
 
            QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSQSAsync);
            queueMessagingTemplate.convertAndSend(queueName, payload, header);
        };
    }
 
    @SqsListener(value = "${cloud.aws.sqs.queue-name}", deletionPolicy = SqsMessageDeletionPolicy.ALWAYS)
    public void receive(@Payload Member member, @Headers Map<String, Object> header, @Header("header1"String header1) {
        System.out.println("payload : " + member);
 
        header.forEach((k, v) -> System.out.println("header name : " + k  + ", header value : " + v));
        System.out.println("header1 : " + header1);
    }
}
cs

메세지에 Java 객체를 넘겨줄 수 있고 header 정보도 담아서 넘겨줄 수 있습니다.

이때 receive 쪽에서는 @Payload (생략가능)을 이용하여 대기열 큐에 있는 메세지 본문의 값을 매핑시켜 줍니다.

그리고 header 정보를 모두 얻어오는 @Headers,  특정 header의 값을 얻어올 수 있는 @Header을 제공해줍니다.

 

 

Java 객체를 payload에 담아서 전송할 때는 JSON 포맷의 문자열로 직렬화해서 전달됩니다.

 

 

그리고 위 코드를 실행했을 때 출력되는 값들입니다. 메세지 본문과 헤더의 정보가 잘 넘어온 걸 확인하실 수 있습니다.

 

 

 

AWS SQS 메세지 삭제 정책

공식문서에도 나와 있지만 기본적으로 SQS에서는 대기열에 있는 메세지를 자동으로 삭제하지 않습니다.

메세지를 삭제하려면 소비자가 메세지를 수신하고 처리한 후 대기열에서 삭제해야 하는데요.

Spring Cloud AWS의 @SqsListener에는 4가지의 메세지 삭제 정책을 제공해줍니다.

 

 

ALWAYS

listener 메서드에서 메세지를 처리하는 동안 문제없이 처리하거나 혹은 예외가 발생해도 무조건 메세지를 삭제합니다.

 

NEVER

listener 메서드에서 메세지를 처리하는 동안 문제없이 처리하거나 혹은 예외가 발생하면 메세지를 삭제하지 않습니다.

단, 이 정책을 사용할 땐 메서드 매개변수에 Acknowledgment 타입의 변수를 받아서 수동으로 삭제해줘야 합니다.

 

NO_REDRIVE

이 정책은 DLQ가 정의되어 있지 않으면 메세지를 삭제합니다.

DLQ에 대해 간단히 설명해 드리면 해당 listener 메서드에서 메세지를 정상적으로 처리하지 못할 경우 메세지가 DLQ로 전달됩니다. 애플리케이션에서 메세지를 제대로 처리하지 못했을 때 디버깅 목적으로 활용되는 큐입니다.

 

메세지를 제대로 처리하지 못했을 경우 사용된다고 생각하시면 됩니다.

기본적으로 listener 메서드에서 메세지 처리에 실패할 경우 10번 재시도를 하게됩니다.  (참고링크)

만약 10번 이내에 처리가 정상적으로 되지 않는다면 DLQ로 이동됩니다.

 

한가지 고려하셔야 할 점은 '표준' 큐에서 DLQ를 정의할 때 해당 DLQ도 '표준' 큐여야 합니다. FIFO 큐도 마찬가지입니다.

DLQ에 동작 원리는 여기를 참고하시면 됩니다.

 

ON_SUCCESS

listener 메서드에서 메세지를 정상적으로 처리했을 경우에만 메세지를 삭제합니다.

 

 

AWS SQS를 사용할 경우 메세지는 멱등성을 보장하도록 처리하기

SQS의 아키텍처는 분산 대기열 구조입니다. 즉, 대기열에 메세지가 전송되면 메세지와 그 메세지의 사본을 여러 대기열에 분산해서 저장하게 되는데요. 이때 위 설명에서 나온 문제가 발생할 수 있습니다.

그래서 애플리케이션이 멱등성이 보장되도록 설계해야 합니다.

 

 

728x90
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2024/04   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함