博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ学习小结(五)—— Topics[Python]
阅读量:3514 次
发布时间:2019-05-20

本文共 3554 字,大约阅读时间需要 11 分钟。

1. 简介

上节我们提到直连交换机,并基于我们的日志系统使用直连交换机,而本节我们使用rabbitmq内置的另外一种交换机——主题交换机。

所谓主题交换机是什么呢?通俗点来讲就是直连交换机的升级版,我们可以认为直连交换机就是她其中的一种形式。
直连交换机需要通过绑定(bindings)和队列建立起关系,而其主要通过的就是routing_key,并且routing_key是要与生产者提供的routing_key完全匹配才能得到消息。主题交换机对其进行了升级,升级的方式就是通过两个符号"#""*"以及对routing_key的长度限制和格式限制,完成了模糊匹配,从而达到消费者可以更灵活的匹配要处理的消息,下面再具体针对实例来进行主题交换机的学习。

2. 主题交换机

发送到主题交换机(topic exchange)的消息携带的路由键(routing_key)必须是一个由.分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。以下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。词语的个数可以随意,但是不要超过255字节。

绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似 —— 一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。但是它的绑定键和路由键有两个特殊应用方式:

"*" (星号) 用来表示一个单词。

"#" (井号) 用来表示任意数量(零个或多个)单词。

大家可以根据下图,猜测哪些路由键会被发送到相应的队列,就是类似于正则的匹配,但是比正则简单太多了,有兴趣的可以到官网查看完整内容,我就不再阐述了。

主题交换机

当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能

总结

我们基于整个流程进行一下总结

生产者

创建连接
创建交换机(主题交换机)
发送消息到指定交换机,并将消息中指定routing_key(注意key的规则)

消费者

创建连接
创建交换机(主题交换机)
创建匿名队列(每个消费者都要创建的独立的队列,用于接收路由后消息)
创建绑定(通过路由键绑定交换机队列关系,确定队列处理交换机中的哪些消息,注意路由键的正则规则)
在指定交换机处理消息

代码整合

emit_logs_topic.py
#!/usr/bin/env python# -*- coding: utf-8 -*-# @Date    : 2016-02-28 21:28:17# @Author  : mx (mx472756841@gmail.com)# @Link    : http://www.shujutiyu.com/# @Version : $Id$import osimport pikaimport sysconn = Nonerouting_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'message = ' '.join(sys.argv[2:]) or 'Hello World!'try:    # 获取连接    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))    # 获取通道    channel = conn.channel()    # 创建直连交换机    channel.exchange_declare(exchange='topic_logs',                             type='topic')    # 在RabbitMQ中发送消息,指定交换机(exchange),指定routing    ret = channel.basic_publish(exchange='topic_logs',                                routing_key=routing_key,                                body=message,)    print " [x] Sent '{0}'".format(message)    print retexcept Exception, e:    raise efinally:    if conn:        conn.close()
recv_logs_topic.py
#!/usr/bin/env python# -*- coding: utf-8 -*-# @Date    : 2016-02-29 16:30:21# @Author  : mx (mx472756841@gmail.com)# @Link    : http://www.shujutiyu.com/# @Version : $Id$import pikaimport sysconn = Nonebinding_keys = sys.argv[1:]if not binding_keys:    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)    sys.exit(1)def callback(ch, method, properties, body):    """        @ch: channel 通道,是由外部调用时上送        out body        读取队列内容做不同的操作    """    print " [x] method.routing_key {0}".format(method.routing_key)    print " [x] Done %r" % (body, )try:    # get connection    conn = pika.BlockingConnection(pika.ConnectionParameters(        'localhost')    )    # get channel    channel = conn.channel()    # 声明直连交换机    channel.exchange_declare(exchange='topic_logs',                              type='topic')    # 声明临时队列 , param exclusive 互斥    tmp_queue = channel.queue_declare(exclusive=True)    queue_name = tmp_queue.method.queue    # 根据输入routing_key绑定交换机与队列    for binding_key in binding_keys:        channel.queue_bind( exchange='topic_logs',                             queue=queue_name,                            routing_key=binding_key )    # 在队列中读取信息    channel.basic_consume(callback, queue=queue_name, no_ack=True)    print ' [*] Waiting for messages. To exit press CTRL+C'    channel.start_consuming()except Exception, e:    raise efinally:    if conn:        conn.close()

疑问

  • 绑定键为 * 的队列会取到一个路由键为空的消息吗?
  • 绑定键为 #.* 的队列会获取到一个名为..的路由键的消息吗?它会取到一个路由键为单个单词的消息吗?
  • a.*.# 和 a.#的区别在哪儿?
  • 之前提到了队列、消息、交换机可以持久化,有兴趣的不妨试一下

3. 参考资料

官网资料:
中文资料:
你可能感兴趣的文章
springboot+mybatis实现分页
查看>>
leetcode332. 重新安排行程
查看>>
为什么局域网网段不同不能通信?
查看>>
itchat微信助手,kaggle 电影数据集分析,基于内容的电影推荐
查看>>
认识和使用JWT
查看>>
通过springboot框架,自己动手实现oauth2.0授权码模式认证
查看>>
条件表达式于运算符的点点滴滴的积累
查看>>
最短路径最基本的三种算法【此后无良辰】
查看>>
class的点点滴滴的总结
查看>>
vector 的点点滴滴的总结
查看>>
测试用例
查看>>
自动化测试学习步骤
查看>>
自动化测试需要掌握的知识
查看>>
HTTP协议
查看>>
Python问题总结01
查看>>
Python小程序——冒泡排序
查看>>
cmd中输入net start mysql 提示:服务名无效或者MySQL正在启动 MySQL无法启动
查看>>
LeetCode 206反转链表 [javsScript]
查看>>
[LeetCode javaScript] 3. 无重复字符的最长子串
查看>>
[LeetCode javaScript] 6. Z字形变换
查看>>