RabbitMq-C客户端的使用
RabbitMq C++客户端的使用
1.RabbitMq介绍
RabbitMQ 是一款开源的消息队列中间件,基于 AMQP(高级消息队列协议)实现,支持多种编程语言和平台。以下是其核心特点和介绍:
核心特点
- 多语言支持 提供 Java、Python、C#、Go、JavaScript 等语言的客户端库,适配不同开发场景。
- 高可靠性
- 支持消息持久化,确保数据不丢失。
- 提供消息确认机制(ACK),保证消息被正确处理。
- 通过集群模式实现高可用性和负载均衡。
- 灵活的路由机制
- 通过 Exchange(交换器) 管理消息路由规则,支持 Direct、Fanout、Topic、Headers 等类型。
- 消息最终被路由到 Queue(队列) 中供消费者处理。
- 支持多种协议 除 AMQP 外,还支持 MQTT、STOMP、HTTP 等协议,适配物联网、实时通信等场景。
- 轻量级与低延迟 适合中小企业和对延迟敏感的应用,如订单处理、支付系统等。
架构组成
- Producer(生产者) :发送消息到 RabbitMQ。
- Consumer(消费者) :从队列中获取消息并处理。
- Broker :RabbitMQ 服务器实例,负责接收、存储和转发消息。
- Exchange :根据路由规则将消息分发到队列。
- Queue :存储消息的缓冲区,消费者从队列中拉取消息。
典型应用场景
- 异步处理 解耦服务间依赖,例如用户下单后异步发送短信或邮件。
- 削峰填谷 应对突发流量,如秒杀活动中暂存订单请求,避免系统过载。
- 日志处理 集中收集和分发日志,支持多服务订阅。
- 微服务通信 作为服务间异步通信的桥梁,降低耦合度。
优缺点
- 优点 :
- 高可靠性和灵活的路由策略。
- 社区活跃,文档完善。
- 支持多种协议和编程语言。
- 缺点 :
- 吞吐量相对 Kafka 较低,不适合超大规模数据处理。
- 部署和配置相对复杂(尤其在集群模式下)。
总结
RabbitMQ 是一款功能强大、成熟稳定的消息队列,适合对可靠性和灵活性要求较高的中小型项目。如果需要处理海量数据或追求极致性能,可考虑 Kafka 或 RocketMQ 等其他方案。
2.安装****RabbitMQ
apt install rabbitmq-server
3.RabbitMQ****的简单使用
启动服务
sudo systemctl start rabbitmq-server.service
查看服务状态
sudo systemctl status rabbitmq-server.service
安装完成的时候默认有个用户 guest ,但是权限不够,要创建一个
administrator 用户,才可以做为远程登录和发表订阅消息: #添加用户 sudo rabbitmqctl add_user root 123456 #设置用户 tag sudo rabbitmqctl set_user_tags root administrator #设置用户权限 sudo rabbitmqctl set_permissions -p / root “.” “.” “.*”
RabbitMQ 自带了 web 管理界面,执行下面命令开启
sudo rabbitmq-plugins enable rabbitmq_management
**4.安装RabbitMQ的****C++**客户端库
sudo apt install libev-dev #libev 网络库组件 git clone cd AMQP-CPP/ make make install
5.AMQP-CPP****库的简单使用
5.1介绍
• AMQP-CPP 是用于与 RabbitMq 消息中间件通信的 c++ 库。它能解析从 RabbitMq 服务发送来的数据,也可以生成发向 RabbitMq 的数据包。 AMQP-CPP 库不会向 RabbitMq 建立网络连接,所有的网络 io 由用户完成。 • 当然,AMQP-CPP 提供了可选的网络层接口,它预定义了 TCP 模块,用户就不 用自己实现网络 io ,我们也可以选择 libevent 、 libev 、 libuv 、 asio 等异步通信组件, 需要手动安装对应的组件。 • AMQP-CPP 完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能 应用中。 • 注意:它需要 c++17 的支持。
5.2使用
AMQP-CPP 的使用有两种模式: • 使用默认的 TCP 模块进行网络通信 • 使用扩展的 libevent 、 libev 、 libuv 、 asio 异步通信组件进行通信
5.3TCP****模式
• 实现一个类继承自 AMQP::TcpHandler 类, 它负责网络层的 TCP 连接 • 重写相关函数, 其中必须重写 monitor 函数 • 在 monitor 函数中需要实现的是将 fd 放入 eventloop(select 、 epoll) 中监控, 当 fd 可写可读就绪之后, 调用 AMQP-CPP 的 connection->process(fd, flags) 方法
5.4扩展模式
以 libev 为例, 我们不必要自己实现 monitor 函数, 可以直接使用 AMQP::LibEvHandler
6.常用类与接口介绍
6.1Channel
channel 是一个虚拟连接,一个连接上可以建立多个通道。并且所有的 RabbitMq 指令 都是通过 channel 传输,所以连接建立后的第一步,就是建立 channel 。因为所有操作 是异步的,所以在 channel 上执行指令的返回值并不能作为操作执行结果,实际上它 返回的是 Deferred 类,可以使用它安装处理函数。 namespace AMQP { /**
- Generic callbacks that are used by many deferred objects / using SuccessCallback = std::function; using ErrorCallback = std::function; using FinalizeCallback = std::function; /*
- Declaring and deleting a queue / using QueueCallback = std::function; using DeleteCallback = std::function; using MessageCallback = std::function; //当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用 AckCallback using AckCallback = std::function; //使用确认包裹通道时,当消息被 ack/nacked 时,会调用这些回调 using PublishAckCallback = std::function; using PublishNackCallback = std::function; using PublishLostCallback = std::function; class Channel { Channel(Connection connection); bool connected() / *声明交换机 *如果提供了一个空名称,则服务器将分配一个名称。 *以下 flags 可用于交换机:
*-durable 持久化,重启后交换机依然有效 *-autodelete 删除所有连接的队列后,自动删除交换 *-passive 仅被动检查交换机是否存在 *-internal 创建内部交换 * *@param name 交换机的名称 *@param-type 交换类型 enum ExchangeType { fanout, 广播交换,绑定的队列都能拿到消息 direct, 直接交换,只将消息交给 routingkey 一致的队列 topic, 主题交换,将消息交给符合 bindingkey 规则的队 列 headers, consistent_hash, message_deduplication }; *@param flags 交换机标志 *@param arguments 其他参数 * 此函数返回一个延迟处理程序。可以安装回调 using onSuccess(), onError() and onFinalize() methods. / Deferred &declareExchange( const std::string_view &name, ExchangeType type, int flags, const Table &arguments) / *声明队列 *如果不提供名称,服务器将分配一个名称。 *flags 可以是以下值的组合: * *-durable 持久队列在代理重新启动后仍然有效 *-autodelete 当所有连接的使用者都离开时,自动删除队列 *-passive 仅被动检查队列是否存在 *-exclusive 队列仅存在于此连接,并且在连接断开时自动删除 * *@param name 队列的名称 *@param flags 标志组合 *@param arguments 可选参数 * *此函数返回一个延迟处理程序。可以安装回调 *使用 onSuccess()、onError()和 onFinalize()方法。 * Deferred &onError(const char *message) * *可以安装的 onSuccess()回调应该具有以下签名: void myCallback(const std::string &name, uint32_t messageCount, uint32_t consumerCount); 例如: channel.declareQueue(“myqueue”).onSuccess( [](const std::string &name, uint32_t messageCount, uint32_t consumerCount) { std::cout « “Queue ‘” « name « “’ “; std::cout « “has been declared with “; std::cout « messageCount; std::cout « " messages and “; std::cout « consumerCount; std::cout « " consumers” « std::endl;
- }); / DeferredQueue &declareQueue( const std::string_view &name, int flags, const Table &arguments) /* *将队列绑定到交换机
*@param exchange 源交换机 *@param queue 目标队列 *@param routingkey 路由密钥 *@param arguments 其他绑定参数 * *此函数返回一个延迟处理程序。可以安装回调 使用 onSuccess()、onError()和 onFinalize()方法。 /Deferred &bindQueue( const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey, const Table &arguments) / *将消息发布到 exchange *您必须提供交换机的名称和路由密钥。 然后,RabbitMQ 将尝试将消息发送到一个或多个队列。 使用可选的 flags 参数,可以指定如果消息无法路由到队列时应该发生 的情况。 默认情况下,不可更改的消息将被静默地丢弃。 * *如果设置了’mandatory’或’immediate’标志, 则无法处理的消息将返回到应用程序。 在开始发布之前,请确保您已经调用了 recall()-方法, 并设置了所有适当的处理程序来处理这些返回的消息。 * *可以提供以下 flags: * *-mandatory 如果设置,服务器将返回未发送到队列的消息 *-immediate 如果设置,服务器将返回无法立即转发给使用者的消息。 *@param exchange 要发布到的交易所 *@param routingkey 路由密钥 *@param envelope 要发送的完整信封 *@param message 要发送的消息 *@param size 消息的大小 @param flags 可选标志 / bool publish( const std::string_view &exchange, const std::string_view &routingKey, const std::string &message, int flags = 0) / *告诉 RabbitMQ 服务器我们已准备好使用消息-也就是订阅队列消息 * *调用此方法后,RabbitMQ 开始向客户端应用程序传递消息。 consumer tag 是一个字符串标识符, 如果您以后想通过 channel::cancel()调用停止它, 可以使用它来标识使用者。 *如果您没有指定使用者 tag,服务器将为您分配一个。 * *支持以下 flags: * *-nolocal 如果设置了,则不会同时消耗在此通道上发布的消息 *-noack 如果设置了,则不必对已消费的消息进行确认 *-exclusive 请求独占访问,只有此使用者可以访问队列 * *@param queue 您要使用的队列 *@param tag 将与此消费操作关联的消费者标记 *@param flags 其他标记 *@param arguments 其他参数 * 此函数返回一个延迟处理程序。 可以使用 onSuccess()、onError()和 onFinalize()方法安装回 调。 可以安装的 onSuccess()回调应该具有以下格式: void myCallback(const std::string_view&tag); 样例: channel.consume(“myqueue”).onSuccess( [](const std::string_view& tag) { std::cout « “Started consuming under tag “; std::cout « tag « std::endl; }); / DeferredConsumer &consume( const std::string_view &queue, const std::string_view &tag, int flags, const Table &arguments) / *确认接收到的消息 * *当在 DeferredConsumer::onReceived()方法中接收到消息时, 必须确认该消息, 以便 RabbitMQ 将其从队列中删除(除非使用 noack 选项消费)。 * *支持以下标志: * *-多条确认多条消息:之前传递的所有未确认消息也会得到确认 * *@param deliveryTag 消息的唯一 delivery 标签 *@param flags 可选标志 *@return bool / bool ack(uint64_t deliveryTag, int flags=0) } class DeferredConsumer { / 注册一个回调函数,该函数在消费者启动时被调用。 void onSuccess(const std::string &consumertag) / DeferredConsumer &onSuccess(const ConsumeCallback& callback) / 注册回调函数,用于接收到一个完整消息的时候被调用 void MessageCallback(const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) / DeferredConsumer &onReceived(const MessageCallback& callback) / Alias for onReceived() / DeferredConsumer &onMessage(const MessageCallback& callback) / 注册要在服务器取消消费者时调用的函数 void CancelCallback(const std::string &tag) */ DeferredConsumer &onCancelled(const CancelCallback& callback) } class Message : public Envelope{ const std::string &exchange() const std::string &routingkey():q } class Envelope : public MetaData{ const char *body() uint64_t bodySize() } }
6.2ev
typedef struct ev_async { EV_WATCHER (ev_async) EV_ATOMIC_T sent; /* private / } ev_async; //break type enum { EVBREAK_CANCEL = 0, / undo unloop / EVBREAK_ONE = 1, / unloop once / EVBREAK_ALL = 2 / unloop all loops */ }; struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0))
define EV_DEFAULT ev_default_loop (0)
int ev_run (struct ev_loop loop); / break out of the loop */ void ev_break (struct ev_loop *loop, int32_t break_type) ; void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents) void ev_async_init(ev_async *w, callback cb); void ev_async_start(struct ev_loop *loop, ev_async *w) ; void ev_async_send(struct ev_loop *loop, ev_async *w) ;
7.简单使用
publish.cpp
#include
#include
#include
#include
#include
int main()
{
// 创建一个事件循环对象
auto *loop = EV_DEFAULT;
// 创建一个LibEvHandler对象
AMQP::LibEvHandler handler(loop);
// 设置RabbitMQ服务器的地址
AMQP::Address address(“amqp://root:123456@127.0.0.1:5672/”);
// 创建一个TcpConnection对象,连接到RabbitMQ服务器
AMQP::TcpConnection connection(&handler, address);
// 创建一个Channel对象,用于与RabbitMQ服务器进行通信
AMQP::TcpChannel channel(&connection);
// 声明一个交换机
// 设置声明交换机失败时的回调函数
channel.declareExchange(“test_exchange”, AMQP::direct)
.onError([](const std::string& message){
std::cout « “Error1: " « message « std::endl;
return -1;
})
.onSuccess(
{
std::cout « “Exchange declared” « std::endl;
});
channel.declareQueue(“test_queue”)
.onError([](const std::string& message){
std::cout « “Error2: " « message « std::endl;
return -1;
})
.onSuccess(
{
std::cout « “Queue declared” « std::endl;
});
// 绑定队列到交换机
channel.bindQueue(“test_exchange”, “test_queue”, “test_queue”)
.onError([](const std::string& message){
std::cout « “Error2: " « message « std::endl;
return -1;
})
.onSuccess(
{
std::cout « “Queue bind” « std::endl;
});
// 发送10条消息
for(int i = 1; i <= 10; i++)
{
std::string message = “Hello RabbitMQ " + std::to_string(i);
// 发布消息到RabbitMQ服务器
bool ret = channel.publish(“test_exchange”, “test_queue”, message);
if (!ret)
{
std::cout « “Publish failed” « std::endl;
return -1;
}
}
// 运行事件循环
ev_run(loop, 0);
return 0;
}
consume.cpp
#include
#include
#include
#include
#include
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message,uint64_t deliveryTag, bool redelivered)
{
std::string body = message.body();
std::cout « “Received a message with content: " « body « std::endl;
channel->ack(deliveryTag);
}
int main()
{
// 创建一个事件循环对象
auto *loop = EV_DEFAULT;
// 创建一个LibEvHandler对象
AMQP::LibEvHandler handler(loop);
// 设置RabbitMQ服务器的地址
AMQP::Address address(“amqp://root:123456@127.0.0.1:5672/”);
// 创建一个TcpConnection对象,连接到RabbitMQ服务器
AMQP::TcpConnection connection(&handler, address);
// 创建一个Channel对象,用于与RabbitMQ服务器进行通信
AMQP::TcpChannel channel(&connection);
// 声明一个交换机
// 设置声明交换机失败时的回调函数
channel.declareExchange(“test_exchange”, AMQP::direct)
.onError([](const std::string& message){
std::cout « “Error: " « message « std::endl;
return -1;
})
.onSuccess(
{
std::cout « “Exchange declared” « std::endl;
});
// 绑定队列到交换机
channel.bindQueue(“test_exchange”, “test_queue”, “test_queue”)
.onError([](const std::string& message){
std::cout « “Error: " « message « std::endl;
return -1;
})
.onSuccess(
{
std::cout « “Queue bound” « std::endl;
});
auto call_back=std::bind(MessageCb,&channel,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);
channel.consume(“test_queue”)
.onReceived(call_back)
.onError([](const std::string& message){
std::cout « “Error: " « message « std::endl;
return -1;
});
// 运行事件循环
ev_run(loop, 0);
return 0;
}
makefile
all : publish consume publish:publish.cpp g++ publish.cpp -o publish -std=c++17 -lamqpcpp -lev -lpthread -ldl -lssl -lcrypto consume:consume.cpp g++ consume.cpp -o consume -std=c++17 -lamqpcpp -lev -lpthread -ldl -lssl -lcrypto @PHONY:clean clean: rm -f publish consume
验证截图
至此大家就可以简单安全和使用RabbitMq!