RabbitMQ는 AMQP (Advanced Message Queuing Protocol) 기반의 메시지 브로커로, 다양한 메시지 큐 기능을 제공합니다. Python에서 RabbitMQ를 사용하는 일반적인 방법은 pika
라이브러리를 사용하는 것입니다. 이를 사용하면 메시지 발행(Publish), 큐 소비(Consume), 교환(Exchange), 라우팅(Routing), 바인딩(Binding) 등을 수행할 수 있다.
다음은 RabbitMQ의 다양한 기능을 pika
라이브러리를 통해 사용하는 방법에 대한 설명과 예제
1. pika
라이브러리 설치
먼저 pika
를 설치해야 함:
pip install pika
2. RabbitMQ 연결 설정 및 채널 생성
RabbitMQ에 연결하고 채널을 생성하는 기본 방법입니다. localhost로 현재 로컬에 서버를 열었고,
http://localhost:15672/ 을 입력하면 접속할 수 있음. id/pw 는 둘다 guest
import pika
def connect_to_rabbitmq():
# localhost로 현재 로컬에 서버를 염
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
return connection, channel
3. 기본 큐 생성 및 메시지 전송
메시지를 큐에 발행(Publish) 하고, 큐를 생성하여 메시지를 수신하는 기본적인 코드
메시지 발행(Publisher)
def publish_message(channel, queue_name, message):
# 큐 선언 (큐가 없는 경우 생성됨)
channel.queue_declare(queue=queue_name)
# 메시지 발행 exchange=''는 RabbitMQ에서 **기본 교환기(default exchange)**를 사용
# 기본 교환기는 명시적으로 선언할 필요가 없고, 큐를 교환기에 바인딩할 필요도 없습니다. 큐 이름이 라우팅 키와 일치하기만 하면 메시지가 해당 큐로 라우팅됨
channel.basic_publish(exchange='',
routing_key=queue_name,
body=message)
print(f"Sent '{message}' to queue '{queue_name}'")
메시지 소비(Consumer)
def callback(ch, method, properties, body):
print(f"Received {body}")
def consume_message(channel, queue_name):
# 큐 선언 (큐가 없는 경우 생성됨)
channel.queue_declare(queue=queue_name)
# 큐 소비 시작
channel.basic_consume(queue=queue_name,
on_message_callback=callback,
auto_ack=True)
print(f"Waiting for messages in queue '{queue_name}'. To exit press CTRL+C")
channel.start_consuming()
4. 교환(Exchange) 사용
RabbitMQ에서 메시지를 큐로 직접 보내는 대신 교환(Exchange) 을 사용하여 메시지를 여러 큐로 라우팅할 수 있음
교환 설정 및 메시지 발행
def publish_message_with_exchange(channel, exchange_name, routing_key, message):
# 교환 선언 (direct 타입 사용)
channel.exchange_declare(exchange=exchange_name, exchange_type='direct')
# 메시지 발행 (교환을 통해 발행)
channel.basic_publish(exchange=exchange_name,
routing_key=routing_key,
body=message)
print(f"Sent '{message}' to exchange '{exchange_name}' with routing key '{routing_key}'")
교환을 통한 큐 바인딩 및 메시지 소비
def consume_message_with_exchange(channel, exchange_name, queue_name, routing_key):
# 교환 선언
channel.exchange_declare(exchange=exchange_name, exchange_type='direct')
# 큐 선언, durable=True를 추가하면 큐서버가 껏다 켜져도 선언한 큐가 유지됨
channel.queue_declare(queue=queue_name,durable=True)
# 큐와 교환 바인딩
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=routing_key)
# 메시지 소비 시작
channel.basic_consume(queue=queue_name,
on_message_callback=callback,
auto_ack=True)
print(f"Waiting for messages in queue '{queue_name}'. To exit press CTRL+C")
channel.start_consuming()
5. 라우팅(Routing)
메시지를 특정한 큐로 라우팅하는 방식으로 라우팅 키를 사용합니다. 이를 통해 여러 큐에 메시지를 보내거나, 특정 큐에만 메시지를 보낼 수 있음
def publish_routing_message(channel, exchange_name, routing_key, message):
# 메시지를 교환과 라우팅 키를 통해 발행
channel.basic_publish(exchange=exchange_name,
routing_key=routing_key,
body=message)
print(f"Sent '{message}' with routing key '{routing_key}'")
6. 팬아웃(팬아웃 교환)
팬아웃 교환은 메시지를 모든 큐로 브로드캐스트하는 방식
팬아웃 메시지 발행
def publish_fanout_message(channel, exchange_name, message):
# 팬아웃 교환 선언
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
# 메시지 발행
channel.basic_publish(exchange=exchange_name,
routing_key='',
body=message)
print(f"Sent '{message}' to exchange '{exchange_name}' (fanout)")
팬아웃 메시지 소비
def consume_fanout_message(channel, exchange_name, queue_name):
# 팬아웃 교환 선언
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
# 큐 선언
channel.queue_declare(queue=queue_name)
# 큐와 팬아웃 교환 바인딩
channel.queue_bind(exchange=exchange_name, queue=queue_name)
# 메시지 소비 시작
channel.basic_consume(queue=queue_name,
on_message_callback=callback,
auto_ack=True)
print(f"Waiting for fanout messages in queue '{queue_name}'. To exit press CTRL+C")
channel.start_consuming()
7. 우선순위 큐(Priority Queue)
RabbitMQ에서 메시지의 우선순위를 설정할 수 있습니다. 우선순위가 높은 메시지가 먼저 처리됨
우선순위 큐 선언 및 메시지 발행
def declare_priority_queue(channel, queue_name):
# 우선순위 큐 선언
channel.queue_declare(queue=queue_name, arguments={'x-max-priority': 10})
def publish_priority_message(channel, queue_name, message, priority):
# 우선순위를 가진 메시지 발행
channel.basic_publish(exchange='',
routing_key=queue_name,
body=message,
properties=pika.BasicProperties(priority=priority))
print(f"Sent '{message}' with priority '{priority}' to queue '{queue_name}'")
8. 메시지 영속성 설정(Persistence)
메시지를 영속성 있게 만들어 RabbitMQ 서버가 재시작되어도 메시지가 사라지지 않도록 설정할 수 있음
영속성 메시지 발행
def publish_persistent_message(channel, queue_name, message):
# 큐 선언 (영속성 있는 큐)
channel.queue_declare(queue=queue_name, durable=True)
# 메시지 발행 (영속성 있는 메시지)
channel.basic_publish(exchange='',
routing_key=queue_name,
body=message,
properties=pika.BasicProperties(delivery_mode=2)) # 메시지 영속성 설정
print(f"Sent persistent message '{message}' to queue '{queue_name}'")
9. RabbitMQ 연결 종료
def close_connection(connection):
connection.close()
print("Connection closed")
### 정리
- **메시지 발행(Publish)**: 기본 큐나 교환을 통해 메시지를 발행할 수 있음
- **메시지 소비(Consume)**: 큐에서 메시지를 가져와 처리할 수 있음
- **교환(Exchange)**: 다양한 라우팅 전략을 통해 메시지를 큐로 전달
- **우선순위 큐(Priority Queue)**: 특정 메시지의 우선순위를 설정하여 처리
- **영속성(Persistence)**: 서버 재시작 후에도 메시지가 사라지지 않도록 영속성을 설정
### 쓰면서 느낀 팁
- 서버가 갑자기 꺼질 경우도 있으니 큐를 선언할때(queue_declare) durable=True 해놓으면 해당 큐가 유지
```python
channel.queue_declare(queue=queue_name, durable=True)
- 마찬가지 이유로 auto_ack=True가 아닌 false로 한다. 큐를 처리중에(basic_consume) 서버가 꺼지면, 서버에는 큐를 소비함과 동시에 처리된거로 전달되기 때문. 단 마지막엔 수동으로 큐 서버에 큐를 받았음을 알려줘야(ack : acknowledge)한다.
channel.basic_consume(queue=queue_name,on_message_callback=callback, auto_ack=false)
# 수동 (ack : acknowledge)
channel.basic_ack(delivery_tag=method.delivery_tag)
'사용툴 분석' 카테고리의 다른 글
Type Hinting과 Pydantic 차이 (0) | 2024.11.08 |
---|---|
피클(pkl)과 json의 차이. (2) | 2024.10.03 |
메타데이터 활용한 코딩들 (0) | 2024.10.02 |
TypedDict의 개념과 langsmith (1) | 2024.09.08 |
VScode에 공짜 로컬 GPT 연결하기(continue) (0) | 2024.08.02 |