반응형

새로운 회사로 오자마자, 세미나를 하라그러길래 (....)

이것저것 문서를 만들고, 데모 샘플을 만들기 위해 Spring Boot AMQP를 써보던중 삽질했던 부분이 몇 가지 있어서 정리하게 됨.


데모 샘플의 목표는 RabbitMQ 공식 홈에 있는 Tutorial 중 Worker, Publish/Subscribe에 대한 구현이었다.


Java소스를 제공은 하지만, Spring Boot가 어마어마하게 숨겨놓아서..

삽질이 필요하게 됨.


Spring Boot AMQP가 기본적으로 해주는건, RabbitTemplate에 대해서 기본설정을 해주기 때문에,

그냥 @Autowired만 붙여서 끄집어와서 사용만 하면된다.


하지만, Host가 localhost 기본으로 잡혀있기 때문에, remote 환경에 있는 경우에는 변경을 해주어야 할 것이고,

Connection관련된 세부적인 튜닝도 해주어야 할 것이다.


Host, Port등 바꾸는것은 매우 간단하다!

application.properties에서 Ctrl + Space눌러가며 입맛에 맞는 설정을 하면 되기 때문이다. sts기준..

intelliJ는 지원안할지도 모르겠다. 그래서 정리해봄.


spring.rabbitmq.host=192.168.33.10       // host (어차피 가상머신이라서 그냥 ip공개)
spring.rabbitmq.port=5672              // RabbitMQ 기본 포트 (굳이 안적어도 될듯 하지만 혹시나 실패 떨어지는걸 보기싫어서...)
spring.rabbitmq.username=jss              // localhost접근이 아니라서, 인증 정보가 필요하더군. admin페이지와 마찬가지
spring.rabbitmq.password=1234            // 비번도 유저네임과 같은 이유

저기 없는 값들은, 공식홈 가면 정말 자세하게 나와있음. 영어라는게 문제..


Host랑 인증정보는 성공적으로 커스터마이징 했다.

이제 커넥션 관련된 여러가지 옵션들을 튜닝을 해줘야 하는데.

org.springframework.amqp.rabbit.listener패키지에 있는 SimpleMessageListenerContainer가 옵션을 오버라이드 가능하도록 지원을 해준다.

그냥 아래처럼 Bean으로 만들어서 등록해버리면, 옵션 값 지정이 가능하다. 

RabbitMQ 공식홈에서 Java Client받아서 해본사람들은 알겠지만, Spring Boot AMQP가 설정까지도 여러가지 바꿔놨다.

비권장 설정들을 빼고, 좀 더 편하게 바꿔놓은 듯 하니 믿고 사용하자.


@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("testQueue");
    container.setMessageListener(listenerAdapter);
    container.setMaxConcurrentConsumers(1);      // 최대 컨슈머 수 인데.. 어떤 기준인지 모르겠다.
    container.setReceiveTimeout(3000L);         // 메시지 받을 때 타임아웃 값 (ms)
    container.setRecoveryInterval(3000L);        // 연결이 끊어졌을 시 Recover 시도를 어느 주기로 할지에 대한 term (ms)
    return container;
}


여기까지 지정해주고, 바로 rabbitTemplate가지고 convertAndSend를 날리면,

지정한 queue이름에 해당하는 queue가 없다고 뭐라뭐라 에러가 떨어질 것이다.


이 문제는, Queue를 Bean으로 등록해주면 해결이 된다.


@Bean
Queue queue() {
     return new Queue("testQueue", false);      // 두번째 인자값은 디스크에 저장할지, Ram에 저장할지에 대한 flag (false : ram) 
}


위의 설정으로 인해 Queue가 존재하지 않으면, 알아서 생성을 하도록 구성이 되어, 정상적으로 Send가 될 수 있는 환경이 만들어졌다!


받는 부분은 RPC형식으로 구현을 하는 사람들은, rabbitTemplate으로 recv를 하면 되지만,

보통 Queue에 메시지가 있는지 없는지 체크하면서, 가져와서 뭔가를 하는 행위가 많기 때문에

MessageListenerAdapter라는 리스너를 지원한다. (org.springframework.amqp.rabbit.listener 패키지에 있음.)


@Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); }



그냥 복붙하면, Receiver클래스가 없다고 나올 것인데, 만들어주면 되고

Bean생성 시 인자값에는 방금 만든 POJO를 넣어주면 되고,

MessageListenerAdapter생성자의 두번째 인자값에 있는 method를 만들어주고, 

Queue로부터, 메시지가 도착했을 경우에 할 일을 구현해주면 된다.


여기까지 구현하고, 받는 부분 2개이상 띄우고

보내는 부분으로 메시지를 쭉 보내보면, 보내는 부분에서 bind되어 있는 채널들에다가 나눠서 뿌려주게 되고,

받는 부분에서는 그에따라 나눠서 받을 수 있게 된다. 이렇게 쓸 일이 있을지는 잘 모르겠다.. -_-;;

(Worker 구현 완료)


Pub/Sub 패턴 같은 경우에는 비교적 간단한 Worker와는 다르게 Fanout Exchange라는 것을 추가로 이용해야 한다.


일단 받는 쪽에서는 고칠 게 Queue이름밖에 없다. (Queue가 여러개 있을 때 제대로 BroadCasting을 하는지 보기위해)


보내는 쪽에서는 FanoutExchange를 등록을 해주어야 하는데, FanoutExchange를 Bean으로 만들어주고, 

만들어진 Bean을 Binding해주는 작업을 해주면 된다. 

테스트를 위해 2개의 Queue를 바인딩한다.


@Bean
FanoutExchange exchange() {
      return new FanoutExchange("test-exchange");
}

@Bean
Binding binding() {  
      return BindingBuilder.bind(queue1()).to(exchange());
}

@Bean
Binding binding_2() {
      return BindingBuilder.bind(queue2()).to(exchange());
}



그리고 가장 중요한.. rabbitTemplate에서 setExchange("test-exchange")를 해줘야 정상적으로 동작을 한다.


서로 다른 Queue를 바라보고 있는 Receiver Application을 띄우고, (보내는 쪽에서 exchange로 바인딩 된 Queue여야 한다.)

메시지를 보내보면, 메시지가 각 Queue에 BroadCasting되는 것을 확인할 수 있다.

반응형
,