中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

RabbitMQ實(shí)踐體驗(yàn)-創(chuàng)新互聯(lián)

最近由于業(yè)務(wù)需要進(jìn)行性能升級(jí),將原來需要經(jīng)過http進(jìn)行數(shù)據(jù)交互的方式修改為消息隊(duì)列的形式。于是原來的同步處理的方式變成了異步處理,在一定程度上提升我們系統(tǒng)的性能,不過debug的時(shí)候,不免哭了出來。因?yàn)槊總€(gè)環(huán)節(jié)都需要進(jìn)行詳細(xì)檢查。
對(duì)于RabbitMQ,我們知道,其是AMQP的一種代理服服務(wù)器,具有一套嚴(yán)格的通信方式,即在核心產(chǎn)品進(jìn)行通信的各個(gè)方面幾乎都采用了RPC(Remote Procedure Call, 遠(yuǎn)程過程調(diào)用)模式。

目前成都創(chuàng)新互聯(lián)已為成百上千家的企業(yè)提供了網(wǎng)站建設(shè)、域名、雅安服務(wù)器托管、網(wǎng)站托管、企業(yè)網(wǎng)站設(shè)計(jì)、云陽網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。

AMQ與RabbitMQ進(jìn)行交互

RabbitMQ通信時(shí)用到的類和方法與AMQP協(xié)議層面的類和方法一一對(duì)應(yīng)。因此AMQP本質(zhì)上是RPC的一種傳輸機(jī)制

高級(jí)消息隊(duì)列模型

AMQ(Advanced Message Queuing)模型,這個(gè)模型是針對(duì)代理服務(wù)器軟件例如(RabbitMQ)設(shè)計(jì)的,該模型在邏輯上定義了三種抽象組件用于指定消息的路由行為,分別是:

  • 交換器Exchange,消息代理服務(wù)器中用于把消息路由到隊(duì)列的組件
    接收/發(fā)送到RabbitMQ中的消息并決定把他們投遞到何處。
    定義消息的路由行為,通常這需要檢查消息所攜帶的數(shù)據(jù)特性或者包含在消息體內(nèi)的各種屬性
  • 隊(duì)列Queue,用來存儲(chǔ)消息的數(shù)據(jù)結(jié)構(gòu),位于硬盤或內(nèi)存中,以FIFO的順序進(jìn)行投遞
    負(fù)責(zé)存儲(chǔ)接收到的消息,同時(shí)也可能包含何如處理消息的配置信息。
  • 綁定Binding,一套規(guī)則,用于告訴交換器消息應(yīng)該被存儲(chǔ)到哪個(gè)隊(duì)列
    • 定義隊(duì)列和交換機(jī)之間的關(guān)系
    • 告知一個(gè)交換器應(yīng)該將消息投遞到哪些隊(duì)列中。對(duì)于某些交換器類型,綁定同時(shí)告知交換器如何對(duì)消息進(jìn)行過濾從而決定能夠投遞到隊(duì)列的消息
    • 當(dāng)發(fā)布一條消息到交換器時(shí),應(yīng)用程序使用路由鍵routing-key屬性。路由可以是隊(duì)列名稱,也可以是一串用于描述消息、具有特定語法的字符串。當(dāng)交換器對(duì)一條消息進(jìn)行評(píng)估以決定路由到哪些合適的隊(duì)列時(shí),消息的路由就會(huì)和綁定進(jìn)行比對(duì)。
    • 綁定是綁定隊(duì)列到交換器的粘合劑,而路由則是用于比對(duì)的標(biāo)準(zhǔn)。 RabbitMQ的靈活性來自于消息如何通過交換器路由到隊(duì)列的動(dòng)態(tài)特性,介于交換器和隊(duì)列之間的綁定,以及他們所創(chuàng)建的動(dòng)態(tài)消息路由,構(gòu)成了消息通信架構(gòu)的基本組件。為了把消息路由到合適的目標(biāo)地址,RabbitMQ所需的第一種信息就是用于控制路由的交換

python使用AMQP

在將消息發(fā)布到隊(duì)列之前,我們需要經(jīng)歷過以下若干個(gè)步驟。至少,必須要設(shè)置交換器和隊(duì)列,然后將他們綁定再一起。接下來我們將通過python來實(shí)現(xiàn)AMQP機(jī)制。
我用到了pika這個(gè)庫,需要的話,需要通過以下指令安裝。該庫實(shí)現(xiàn)了絕大部分rabbitmq的api以及提供了相關(guān)的調(diào)優(yōu)參數(shù),后續(xù)有機(jī)會(huì)不妨可以詳談。

pip install pika

1. 聲明交換器

交換器在AMQ模型中是非常重要的角色存在。因此,在AMQP規(guī)范中都有自己的類。聲明一個(gè)交換器,我們可以直接在控制臺(tái)界面進(jìn)行創(chuàng)建。
RabbitMQ實(shí)踐體驗(yàn)
不過這樣僅僅是在極少數(shù)的情況下才適合,動(dòng)手調(diào)戲鼠標(biāo)對(duì)開發(fā)工程師的來說實(shí)在是太蠢啦,能玩鍵盤就別玩鼠標(biāo)啊,我們不妨通過以下代碼來聲明(創(chuàng)建)一個(gè)交換器。pika內(nèi)置函數(shù)會(huì)事先通過get的方式來檢查我們待聲明的交換器是否存在,如果存在則不創(chuàng)建,否則創(chuàng)建一個(gè)新的交換器。

 self.channel.exchange_declare(
            exchange=exchange,
            exchange_type="direct",
            passive=False,
            durable=True,
            auto_delete=False)

2. 聲明隊(duì)列

一旦交換器創(chuàng)建成功,就可以通過發(fā)送類似queue.declare命令讓rabbitmq創(chuàng)建一個(gè)隊(duì)列。同樣的,我們?nèi)匀豢梢栽趫D形化界面里面創(chuàng)建隊(duì)列。
RabbitMQ實(shí)踐體驗(yàn)
還是那句話,動(dòng)手調(diào)戲鼠標(biāo)對(duì)開發(fā)工程師的來說實(shí)在是太蠢啦,能玩鍵盤就別玩鼠標(biāo)啊,我們不妨通過以下代碼來聲明(創(chuàng)建)若干個(gè)隊(duì)列。pika內(nèi)置函數(shù)會(huì)事先通過get的方式來檢查我們待聲明的隊(duì)列是否存在,如果存在則不創(chuàng)建,否則創(chuàng)建一個(gè)新的隊(duì)列。

self.channel.queue_declare(queue=queue, durable=True)

當(dāng)隊(duì)列同名時(shí),即如果我們多次發(fā)送同一個(gè)queue.declare命令并不會(huì)有任何副作用,因?yàn)镽abbitMQ并不會(huì)處理后續(xù)的隊(duì)列聲明,究其原因,每次創(chuàng)建都會(huì)先通過get的方式調(diào)用消息隊(duì)列引擎查詢隊(duì)列是否存在。如果需要返回隊(duì)列相關(guān)的有用信息,則將會(huì)返回隊(duì)列中待處理消息的數(shù)量以及該隊(duì)列的消費(fèi)者數(shù)量。當(dāng)然了如果隊(duì)列同名,而且新隊(duì)列的屬性與原有的隊(duì)列不一樣,那么RabbitMQ將關(guān)閉發(fā)出的RPC請(qǐng)求的信道,返回403錯(cuò)誤

3. 綁定隊(duì)列到交換器

一旦創(chuàng)建了交換器和隊(duì)列,之后就可以將它們綁定在一起了,如同queue.declare命令,將隊(duì)列綁定到交換器Queue.Bind每次只能指定一個(gè)隊(duì)列。我們既可以通過圖形化界面進(jìn)行綁定,也可以通過代碼實(shí)現(xiàn)這個(gè)效果
RabbitMQ實(shí)踐體驗(yàn)

 self.channel.queue_bind(
            queue=queue, exchange=exchange, routing_key=rk)

4. 發(fā)布消息

發(fā)布消息到RabbitMQ時(shí),多個(gè)幀封裝了發(fā)送到服務(wù)器的消息數(shù)據(jù)。在實(shí)際的消息內(nèi)容到達(dá)rabbitMQ之前,客戶端應(yīng)用程序會(huì)發(fā)送一個(gè)basic.publish方法幀、一個(gè)內(nèi)容頭幀和至少一個(gè)消息體幀。
默認(rèn)情況下,只要沒有消費(fèi)者正在監(jiān)聽隊(duì)列,消息就會(huì)被存儲(chǔ)在隊(duì)列中。當(dāng)添加更多消息時(shí),隊(duì)列大小也會(huì)隨之增加。RabbitMQ可以將這些消息保存在內(nèi)存或者寫入磁盤。

def produce(self, body):
        self.channel.basic_publish(exchange=self.exchange, routing_key=self.route_key, body=body,
                                   properties=pika.BasicProperties(content_type='text/plain', delivery_mode=1)
                                   )

5. 消費(fèi)消息

一旦發(fā)布消息被路由并且保存在一個(gè)或者多個(gè)隊(duì)列中,剩下的就是如何對(duì)其進(jìn)行消費(fèi)。注意到,發(fā)送和消費(fèi)是異步的。 消費(fèi)時(shí),可以讓RabbitMQ知道如何消費(fèi)他們
Basic.Consume命令中
no_ack為true時(shí),RabbitMQ將連續(xù)發(fā)送消息直到消費(fèi)者發(fā)送一個(gè)Basic.Cancel命令或者斷開連接為止
如果為false,則需要發(fā)送一個(gè)Basic.Ack來確認(rèn)收到每條消息的請(qǐng)求

def on_message(chan, method_frame, _header_frame, body, userdata=None):
            """Called when a message is received. Log message and ack it."""
            # LOGGER.info('Userdata: %s Message body: %s', userdata, body)
            # print(" [x] Received %r" % body.decode())
            data = body.decode()
            result = alarmFun(data)
            publish = Publish(exchange='spider', queue='alarm', rk='rk-alarm')
            publish.produce(result)
            # chan.basic_ack(delivery_tag=method_frame.delivery_tag)

on_message_callback = functools.partial(on_message)
self.channel.basic_consume(on_message_callback=on_message_callback,
                                   queue=self.queue,
                                   auto_ack=True
                                   )

基于python開發(fā)

經(jīng)過前面的描述,我們需要理論聯(lián)系實(shí)踐,讓我們通過python開發(fā)消費(fèi)者角色和發(fā)布者角色。

發(fā)布者

按照配置流程,我們需要初始化連接、配置交換器、隊(duì)列、綁定,然后才能通過連接件信息推送(publish)到隊(duì)列中。

import logging
from random import randint

import pika

BROKER_USER = os.environ.get('BROKER_USER', 'guest')
BROKER_PASSWD = os.environ.get('BROKER_PASSWD', 'guest')
BROKER_IP = os.environ.get('BROKER_IP', '127.0.0.1')
BROKER_PORT = os.environ.get('BROKER_PORT', '5672')
BROKER_VHOST = os.environ.get('BROKER_VHOST', 'my_vhost')
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
BROKER_URL = 'amqp://{}:{}@{}:{}/{}'.format(BROKER_USER, BROKER_PASSWD, BROKER_IP, BROKER_PORT, BROKER_VHOST)

# logging.basicConfig(level=logging.DEBUG)
# LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
#               '-35s %(lineno) -5d: %(message)s')
# LOGGER = logging.getLogger(__name__)

class Publish(object):
    def __init__(self, exchange, queue, rk):
        # LOGGER.info('Connecting to %s', BROKER_URL)
        # logging.basicConfig(level=logging.DEBUG)
        self.credentials = pika.PlainCredentials(BROKER_USER, BROKER_PASSWD)
        # 通過這個(gè)方式設(shè)置備用鏈路,保證connection穩(wěn)定性
        self.parameters = (
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials),
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials, connection_attempts=5,
                                      retry_delay=1))
        self.connection = pika.BlockingConnection(self.parameters)
        self.channel = self.connection.channel()
        self.exchange = exchange
        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type="direct",
            passive=False,
            durable=True,
            auto_delete=False)
        self.channel.queue_declare(queue=queue, durable=True)
        self.route_key = rk

    def produce(self, body):
        self.channel.basic_publish(exchange=self.exchange, routing_key=self.route_key, body=body,
                                   properties=pika.BasicProperties(content_type='text/plain', delivery_mode=1)
                                   )

    def close(self):
        self.connection.close()

def test():
    publish = Publish(exchange='test_yerik', queue='test_test', rk='rk-test_test')
    for i in range(1, 10000):
        publish.produce(randint(1, 100).__str__())
    publish.close()

if __name__ == '__main__':
    test()

消費(fèi)者

消費(fèi)者的設(shè)計(jì)和生產(chǎn)者在初始化的時(shí)候設(shè)計(jì)大致相同,都是通過建立連接、開啟channel、exange、queue、bind等過程,主要的區(qū)別在于commsum

import functools
import logging
import pika

BROKER_USER = os.environ.get('BROKER_USER', 'guest')
BROKER_PASSWD = os.environ.get('BROKER_PASSWD', 'guest')
BROKER_IP = os.environ.get('BROKER_IP', '127.0.0.1')
BROKER_PORT = os.environ.get('BROKER_PORT', '5672')
BROKER_VHOST = os.environ.get('BROKER_VHOST', 'my_vhost')
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
BROKER_URL = 'amqp://{}:{}@{}:{}/{}'.format(BROKER_USER, BROKER_PASSWD, BROKER_IP, BROKER_PORT, BROKER_VHOST)

# print('pika version: %s' % pika.__version__)

# logging.basicConfig(level=logging.DEBUG)
# LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
#               '-35s %(lineno) -5d: %(message)s')
# LOGGER = logging.getLogger(__name__)
from apps.alarm.alarmfun import alarmFun
from apps.utils.rabbitmq.publish import Publish

class Consummer(object):
    def __init__(self, exchange, queue, rk):
        # LOGGER.info('Connecting to %s', BROKER_URL)
        self.credentials = pika.PlainCredentials(BROKER_USER, BROKER_PASSWD)
        self.parameters = (
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials),
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials, connection_attempts=5,
                                      retry_delay=1))
        self.connection = pika.BlockingConnection(self.parameters)

        self.channel = self.connection.channel()
        self.exchange = exchange
        self.channel.basic_qos(prefetch_count=1)
        self.exchange = exchange
        self.queue = queue
        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type="direct",
            passive=False,
            durable=True,
            auto_delete=False)
        self.channel.queue_declare(queue=queue, durable=True)
        self.channel.queue_bind(
            queue=queue, exchange=exchange, routing_key=rk)
        self.channel.basic_qos(prefetch_count=1)

    def consum_message(self):
        # LOGGER.info('Comsummer by {}'.format(name))
        def on_message(chan, method_frame, _header_frame, body, userdata=None):
            """Called when a message is received. Log message and ack it."""
            # LOGGER.info('Userdata: %s Message body: %s', userdata, body)
            # print(" [x] Received %r" % body.decode())
            data = body.decode()
            result = alarmFun(data)
            publish = Publish(exchange='spider', queue='alarm', rk='rk-alarm')
            publish.produce(result)
            # chan.basic_ack(delivery_tag=method_frame.delivery_tag)

        on_message_callback = functools.partial(on_message)

        self.channel.basic_consume(on_message_callback=on_message_callback,
                                   queue=self.queue,
                                   auto_ack=True
                                   )
        try:
            self.channel.start_consuming()

        except KeyboardInterrupt:
            self.channel.stop_consuming()

    def cancel(self):
        self.connection.close()

def test():
    consummer = Consummer('test_yerik', 'test_test', 'rk-test_test')
    consummer.consum_message()
    print(consummer.receive)

if __name__ == '__main__':
    test()

參考文檔:

  1. 深入RabbitMQ, Gavin M.Roy 著 汪佳南 鄭天民 譯

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。

當(dāng)前文章:RabbitMQ實(shí)踐體驗(yàn)-創(chuàng)新互聯(lián)
URL地址:http://m.rwnh.cn/article24/ddscje.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供移動(dòng)網(wǎng)站建設(shè)企業(yè)建站、標(biāo)簽優(yōu)化、用戶體驗(yàn)、虛擬主機(jī)、網(wǎng)站營銷

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都seo排名網(wǎng)站優(yōu)化
扎赉特旗| 乌海市| 松潘县| 义乌市| 策勒县| 揭阳市| 云林县| 嘉义市| 宜川县| 乌兰察布市| 鄂托克前旗| 澎湖县| 大理市| 星子县| 城步| 武安市| 会同县| 富民县| 益阳市| 怀远县| 天峻县| 昌黎县| 东光县| 顺平县| 水城县| 诸城市| 刚察县| 浙江省| 怀远县| 齐河县| 红桥区| 得荣县| 镇坪县| 安多县| 松原市| 临海市| 巴马| 富顺县| 精河县| 新巴尔虎右旗| 旺苍县|