사용툴 분석

RabbitMQ로 우선순위 큐 실제 적용해보자

필만이 2024. 10. 2. 22:07

RabbitMQ는 AMQP (Advanced Message Queuing Protocol) 기반의 메시지 브로커로, 다양한 메시지 큐 기능을 제공합니다. Python에서 RabbitMQ를 사용하는 일반적인 방법은 pika 라이브러리를 사용하는 것입니다. 이를 사용하면 메시지 발행(Publish), 큐 소비(Consume), 교환(Exchange), 라우팅(Routing), 바인딩(Binding) 등을 수행할 수 있다.

다음은 RabbitMQ의 다양한 기능을 pika 라이브러리를 통해 사용하는 방법에 대한 설명과 예제

1. pika 라이브러리 설치

먼저 pika를 설치해야 함:

pip install pika

2. RabbitMQ 연결 설정 및 채널 생성

RabbitMQ에 연결하고 채널을 생성하는 기본 방법입니다. localhost로 현재 로컬에 서버를 열었고,
http://localhost:15672/ 을 입력하면 접속할 수 있음. id/pw 는 둘다 guest

import pika

def connect_to_rabbitmq():
    # localhost로 현재 로컬에 서버를 염
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    return connection, channel

3. 기본 큐 생성 및 메시지 전송

메시지를 큐에 발행(Publish) 하고, 큐를 생성하여 메시지를 수신하는 기본적인 코드

메시지 발행(Publisher)

def publish_message(channel, queue_name, message):
    # 큐 선언 (큐가 없는 경우 생성됨)
    channel.queue_declare(queue=queue_name)

    # 메시지 발행 exchange=''는 RabbitMQ에서 **기본 교환기(default exchange)**를 사용
    # 기본 교환기는 명시적으로 선언할 필요가 없고, 큐를 교환기에 바인딩할 필요도 없습니다. 큐 이름이 라우팅 키와 일치하기만 하면 메시지가 해당 큐로 라우팅됨
    channel.basic_publish(exchange='',
                          routing_key=queue_name,
                          body=message)
    print(f"Sent '{message}' to queue '{queue_name}'")

메시지 소비(Consumer)

def callback(ch, method, properties, body):
    print(f"Received {body}")

def consume_message(channel, queue_name):
    # 큐 선언 (큐가 없는 경우 생성됨)
    channel.queue_declare(queue=queue_name)

    # 큐 소비 시작
    channel.basic_consume(queue=queue_name,
                          on_message_callback=callback,
                          auto_ack=True)

    print(f"Waiting for messages in queue '{queue_name}'. To exit press CTRL+C")
    channel.start_consuming()

4. 교환(Exchange) 사용

RabbitMQ에서 메시지를 큐로 직접 보내는 대신 교환(Exchange) 을 사용하여 메시지를 여러 큐로 라우팅할 수 있음

교환 설정 및 메시지 발행

def publish_message_with_exchange(channel, exchange_name, routing_key, message):
    # 교환 선언 (direct 타입 사용)
    channel.exchange_declare(exchange=exchange_name, exchange_type='direct')

    # 메시지 발행 (교환을 통해 발행)
    channel.basic_publish(exchange=exchange_name,
                          routing_key=routing_key,
                          body=message)
    print(f"Sent '{message}' to exchange '{exchange_name}' with routing key '{routing_key}'")

교환을 통한 큐 바인딩 및 메시지 소비

def consume_message_with_exchange(channel, exchange_name, queue_name, routing_key):
    # 교환 선언
    channel.exchange_declare(exchange=exchange_name, exchange_type='direct')

    # 큐 선언, durable=True를 추가하면 큐서버가 껏다 켜져도 선언한 큐가 유지됨
    channel.queue_declare(queue=queue_name,durable=True)

    # 큐와 교환 바인딩
    channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=routing_key)

    # 메시지 소비 시작
    channel.basic_consume(queue=queue_name,
                          on_message_callback=callback,
                          auto_ack=True)

    print(f"Waiting for messages in queue '{queue_name}'. To exit press CTRL+C")
    channel.start_consuming()

5. 라우팅(Routing)

메시지를 특정한 큐로 라우팅하는 방식으로 라우팅 키를 사용합니다. 이를 통해 여러 큐에 메시지를 보내거나, 특정 큐에만 메시지를 보낼 수 있음

def publish_routing_message(channel, exchange_name, routing_key, message):
    # 메시지를 교환과 라우팅 키를 통해 발행
    channel.basic_publish(exchange=exchange_name,
                          routing_key=routing_key,
                          body=message)
    print(f"Sent '{message}' with routing key '{routing_key}'")

6. 팬아웃(팬아웃 교환)

팬아웃 교환은 메시지를 모든 큐로 브로드캐스트하는 방식

팬아웃 메시지 발행

def publish_fanout_message(channel, exchange_name, message):
    # 팬아웃 교환 선언
    channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')

    # 메시지 발행
    channel.basic_publish(exchange=exchange_name,
                          routing_key='',
                          body=message)
    print(f"Sent '{message}' to exchange '{exchange_name}' (fanout)")

팬아웃 메시지 소비

def consume_fanout_message(channel, exchange_name, queue_name):
    # 팬아웃 교환 선언
    channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')

    # 큐 선언
    channel.queue_declare(queue=queue_name)

    # 큐와 팬아웃 교환 바인딩
    channel.queue_bind(exchange=exchange_name, queue=queue_name)

    # 메시지 소비 시작
    channel.basic_consume(queue=queue_name,
                          on_message_callback=callback,
                          auto_ack=True)

    print(f"Waiting for fanout messages in queue '{queue_name}'. To exit press CTRL+C")
    channel.start_consuming()

7. 우선순위 큐(Priority Queue)

RabbitMQ에서 메시지의 우선순위를 설정할 수 있습니다. 우선순위가 높은 메시지가 먼저 처리됨

우선순위 큐 선언 및 메시지 발행

def declare_priority_queue(channel, queue_name):
    # 우선순위 큐 선언
    channel.queue_declare(queue=queue_name, arguments={'x-max-priority': 10})

def publish_priority_message(channel, queue_name, message, priority):
    # 우선순위를 가진 메시지 발행
    channel.basic_publish(exchange='',
                          routing_key=queue_name,
                          body=message,
                          properties=pika.BasicProperties(priority=priority))
    print(f"Sent '{message}' with priority '{priority}' to queue '{queue_name}'")

8. 메시지 영속성 설정(Persistence)

메시지를 영속성 있게 만들어 RabbitMQ 서버가 재시작되어도 메시지가 사라지지 않도록 설정할 수 있음

영속성 메시지 발행

def publish_persistent_message(channel, queue_name, message):
    # 큐 선언 (영속성 있는 큐)
    channel.queue_declare(queue=queue_name, durable=True)

    # 메시지 발행 (영속성 있는 메시지)
    channel.basic_publish(exchange='',
                          routing_key=queue_name,
                          body=message,
                          properties=pika.BasicProperties(delivery_mode=2))  # 메시지 영속성 설정
    print(f"Sent persistent message '{message}' to queue '{queue_name}'")

9. RabbitMQ 연결 종료

def close_connection(connection):
    connection.close()
    print("Connection closed")

### 정리

- **메시지 발행(Publish)**: 기본 큐나 교환을 통해 메시지를 발행할 수 있음
- **메시지 소비(Consume)**: 큐에서 메시지를 가져와 처리할 수 있음
- **교환(Exchange)**: 다양한 라우팅 전략을 통해 메시지를 큐로 전달
- **우선순위 큐(Priority Queue)**: 특정 메시지의 우선순위를 설정하여 처리
- **영속성(Persistence)**: 서버 재시작 후에도 메시지가 사라지지 않도록 영속성을 설정

### 쓰면서 느낀 팁
- 서버가 갑자기 꺼질 경우도 있으니 큐를 선언할때(queue_declare) durable=True 해놓으면 해당 큐가 유지

```python
channel.queue_declare(queue=queue_name, durable=True)
  • 마찬가지 이유로 auto_ack=True가 아닌 false로 한다. 큐를 처리중에(basic_consume) 서버가 꺼지면, 서버에는 큐를 소비함과 동시에 처리된거로 전달되기 때문. 단 마지막엔 수동으로 큐 서버에 큐를 받았음을 알려줘야(ack : acknowledge)한다.
channel.basic_consume(queue=queue_name,on_message_callback=callback, auto_ack=false)
# 수동 (ack : acknowledge)
channel.basic_ack(delivery_tag=method.delivery_tag)