티스토리 뷰

저번 글에서는 Spring Boot와 Spring Cloud AWS 라이브러리를 이용하여 AWS SQS에 대해 간단히 알아보았습니다.

이번에는 Spring Cloud AWS SQS에서 제공해주는 두 가지 내용들에 대해 주의할 점에 대해 알아보겠습니다.

 

살펴볼 내용은 다음과 같습니다.

1. MappingJackson2MessageConverter

2. ThreadPoolTaskExecutor (@SqsListener Multiple 구성 시 주의할 점)

 

 

1. MappingJackson2MessageConverter

Spring Cloud AWS SQS의 코어 기능들은 Spring Cloud AWS Messaging 라이브러리에 포함되어 있으며

Spring Cloud AWS Messaging 라이브러리 내부에는 org.springframework.messaging이 있습니다.

 

messaging 패키지 내부에는 MappingJackson2MessageConverter라는 이름의 클래스가 있습니다.

이 클래스는 org.springframework.http 패키지에 있는 MappingJackson2HttpMessageConverter와 동일한 역할을 합니다.

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
 
    @Bean
    public ApplicationRunner applicationRunner(
        AmazonSQSAsync amazonSQSAsync,
        @Value("${cloud.aws.sqs.queue-name}"String queueName
    ) {
        return args -> {
            String payload = """
                    {"name":"kimjonghyun", "age":30}
                    """
                    .trim();
 
            Message<String> message = MessageBuilder
                    .withPayload(payload)
                    .setHeader(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                    .build();
            QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSQSAsync);
            queueMessagingTemplate.send(queueName, message);
        };
    }
 
    @SqsListener(value = "${cloud.aws.sqs.queue-name}", deletionPolicy = SqsMessageDeletionPolicy.ALWAYS)
    public void receive(Member member) {
        System.out.println("name : " + member.getName());
        System.out.println("age : " + member.getAge());
    }
}
 
 
import lombok.Getter;
 
@Getter
public class Member {
    private final String name;
 
    private final int age;
 
    public Member(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
cs

여기서 한가지 말씀드리자면 JSON 포맷의 문자열을 전송할 때 header에 contentType을 application/json으로 세팅을 해줘야 하는데 이때 header 이름이 "Content-Type"이 아니라 "contentType" 으로 전송해야 합니다.

 

 

header를 체크할 때 MessageHeaders 클래스에 있는 CONTENT_TYPE이라는 상수로 체크하기 때문입니다.

 

 

JSON 데이터가 정상적으로 Java 객체로 변환되었습니다.

그런데 header에 contentType을 세팅하지 않고도 변환할 방법은 있습니다.

MappingJackson2MessageConverter를 아닌 직접 Bean으로 만들어서 사용하시면 됩니다.

 

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Configuration
@RequiredArgsConstructor
public class AmazonSQSConfiguration {
    private final ObjectMapper objectMapper;
 
    @Bean
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard()
                .withRegion(Regions.AP_NORTHEAST_2)
                .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
                .build();
    }
 
    @Bean
    public QueueMessageHandler queueMessageHandler() {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
        queueMessageHandlerFactory.setMessageConverters(getMessageConverters());
 
        return queueMessageHandlerFactory.createQueueMessageHandler();
    }
 
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync());
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
 
        return simpleMessageListenerContainer;
    }
 
    private List<MessageConverter> getMessageConverters() {
        List<MessageConverter> messageConverters = new ArrayList<>();
 
        MappingJackson2MessageConverter mappingJackson2MessageConverter = new MappingJackson2MessageConverter();
        mappingJackson2MessageConverter.setObjectMapper(this.objectMapper);
        messageConverters.add(mappingJackson2MessageConverter);
 
        return messageConverters;
    }
}
cs

 

setStrictContentTypeMatch 메서드에 false를 세팅해주면 contentType이 없어도 메세지를 변환하도록 해줍니다.

기본값은 false이며 생략해도 되며 Auto Configuration에서 제공되는 건 해당 필드가 true로 되어있습니다.

 

참고로 SimpleMessageListenerContainer 타입의 Bean이 존재하면 SQS Auto Configuration은 작동하지 않습니다.

 

 

2. ThreadPoolTaskExecutor (@SqsListener Multiple 구성 시 주의할 점)

SQS 큐는 큐마다 별도의 쓰레드풀이 생성되는 걸 확인하실 수 있습니다.

애플리케이션에 선언된 SQS Listener의 갯수가 증가할 때 마다 corePoolSzie는 그 갯수의 2배만큼 증가합니다.

DEFAULT_WORKER_THREADS의 값은 2입니다.

 

QueueCapacity를 0으로 해둔 이유는 SQS 큐에 있는 메세지를 오랫동안 보관하지 않고 빨리빨리 처리하기 위해 0으로 해둔 것 같습니다.

 

그런데 corePoolSize를 큐마다 2개씩 해두는 이유는 하나의 SQS 큐에 메세지가 쌓였을 때 동시에 처리하게끔 구성한 것 같습니다. 하지만 상황에 따라 수치를 조정할 필요는 있어 보이지만 왜 2개씩 해뒀는지는 밑에서 알아보도록 하겠습니다.

 

 

@SqsListener에 메세지를 처리할 SQS 큐 이름을 선언할 수 있고 String 배열로 여러 개의 큐 이름을 넣어줄 수 있습니다. 그리고 header에 lookupDestination 값으로 현재 어떤 큐가 처리하는지 큐 이름을 받아올 수 있습니다.

 

 

정상적으로 두 개의 SQS 큐가 메세지를 잘 받아와서 처리한 것 같습니다.

하지만 @SqsListener를 여러 개 사용할 때 주의 할 사항이 있습니다.

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
@RequiredArgsConstructor
public class AmazonSQSConfiguration {
    .....
    .....
 
    @Bean    
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync());
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
 
        simpleMessageListenerContainer.setTaskExecutor(createDefaultTaskExecutor());
        
        return simpleMessageListenerContainer;
    }
 
    private AsyncTaskExecutor createDefaultTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(2);
        threadPoolTaskExecutor.initialize();
 
        return threadPoolTaskExecutor;
    }
}
cs

만약에 ThreadPoolTaskExecutor를 커스텀 하실 때 corePoolSize를 적게 설정해주실 경우 SqsListener가 정상적으로 작동하지 않을 수 있습니다. 일단 corePoolSize를 2개로 해두고 테스트 해보겠습니다.

 

 

일단 값이 출력이 안된 걸 보니 SqsListener가 작동을 안 한 것 같습니다.

제가 맨 처음 이 현상을 봤을 때는 "SQS Listener가 2대고 corePoolSize도 2개로 지정해두었으니 각 큐에 메세지를 1개씩 쌓아두면 하나씩 처리할 거로 생각했습니다."

 

 

 

그래서 코드를 한번 살펴보기로 했습니다. 일단 SQS 갯수만큼 for 문을 돌려서 AsynchronousMessageListener의 run 메서드를 실행합니다.

 

 

run 메서드 내부에서도 또 동일한 쓰레드풀의 쓰레드를 가져다 사용하는 코드가 있습니다. 데드락에 걸려서 작업이 안된 것 같습니다.  

 

 

corePoolSize를 2 -> 3개로 올려서 다시 서버를 구동해보니 이제야 정상적으로 작동합니다.

아까 위에서 ThreadPoolTaskExecutor의 기본설정을 보면 SQS 큐 1개당 2개의 쓰레드풀을 할당하는데

이유는 바로 데드락 때문인 것 같습니다.

 

보통 @SqsListener를 하나의 서버에 N개이상 두는 것보다 N대의 서버에 1개씩 두는 케이스가 많으니 별문제는 없을 것 같습니다. 근데 하나의 서버에 N개 이상 두어야 할 때는 이 점을 참고하면 좋겠습니다.

728x90
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2024/04   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함