0%

在CMD命令行中切换到管理员权限模式

方式1:

搜索CMD Ctrl+Shift+Enter

方式2:

打开CMD,输入

1
runas /noprofile /user:Administrator cmd

输入Administrator账户的密码

runas 允许用户用其他权限运行指定的工具和程序

/noprofile 指定不加载用户的配置文件

/user:UserAccountName 指定在其下运行程序的账户

常见问题

运行runas 指令输入密码报错“无法启动服务,原因可能是已被禁用或与其关联的设备没有启动。”

这是因为“Secondary Logo”服务没有启动,这个服务是”在不同凭据下启用启动过程“。直接在cmd中输入services.msc,将服务从禁用改为手动就好了,之后再次输入runas命令就可以使用administrator账户运行。

识别接口名称

1
2
# 需要 net-tools
ifconfig

如果使用标准的ifconfig命令没有显示出接口,尝试使用带有-a选项的相同的命令。这个选项强制这个工具去显示系统检测到的所有的网络接口,不管他们是up或down状态。如果ifconfig -a没有提供结果,则硬件有错误或者接口驱动没有加载到内核中。

1
2
# 新版本系统大部分支持
ip addr

dhcp

DHCP(动态主机配置协议)使自动接受网络信息(IP地址、掩码、广播地址、网关、名称服务器等)变得容易。这只在网络中有DHCP服务器(或者如果ISP提供商提供一个DHCP服务)时有用.

1
dhcpcd eth0 # eth0 为网口名称,根据上一步识别出的接口名称修改

ifconfig命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 启用/禁用网卡
ifconfig eth0 up/down

# 设置IP地址及掩码
ifconfig eth0 {IP地址} netmask {掩码} up

# 设置默认网关
route add default gw {网关}

# 配置DNS
nano -w /etc.resolv.conf

#使用下边模板填充
nameserver {名称服务器}

花括号中内容使用具体的地址填充

ip命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 启用/禁用网卡
ip link set dev eth0 up/down

# 设置Ip地址及掩码,掩码一般用 24 相当于255.255.255.0
ip addr add {IP地址}/{掩码} dev eth0

# 删除
ip addr del dev eth0 {IP}/{掩码}

# 刷新接口IP(删除所有)
ip addr flush eth0

# 设置默认网关
ip route add default via {网关}

网关的配置参考

ip route命令

Linux上添加路由,删除路由,修改路由配置(route add, route del, 路由表项基本知识)

| 子网掩码用来划分网络区域
| 子网掩码非0的位对应的ip上的数字表示这个ip的网络位
| 子网掩码0位对应的数字是ip的主机位
| 网络位表示网络区域
| 主机位表示网络区域里某台主机
|
| 11111111.11111111.11111111.00000000 = 255.255.255.0 = 24
| —————————————— —————
| 网络位 主机位

| 网络位一致,主机位不一致的2个IP可以直接通讯
|
| 172.25.254.10/24 #24=255.255.255.0
|
| 172.25.254.20/24
|
| 172.25.0.1/16 #16=255.255.0.0
| 前两个可以直接通讯,最后一个与其他俩个不能直接通讯

无线网连接

当使用一块无线(802.11)网卡,在继续之前需要先配置无线设置。要查看当前无线网卡的设置,你可以使用iw

1
2
3
4
5
6
7
8
# 查看连接信息
iw dev wlan0 info

# 检查连接状态
iw dev wlan0 link

# 连接网络 (确保接口处于活动状态)
iw dev wlan0 connect -w {网络名称} key 0:d:{密码}

如果无线网络配置为WPA或WPA2,则需要使用wpa_supplicant

1
2
3
4
5
6
7
8
9
10
11
12
# 查找附近热点
wpa_cli -i wlan0 scan

# 生成连接配置文件
wpa_passphrase {网络名称} {密码} > /etc/wpa_supplicant.conf

# 连接网络
# -D 驱动程序名称(可以是多个驱动程序:nl80211,wext)
# -i 接口名称
# -c 配置文件
# -B 在后台运行守护进程
wpa_supplicant -D nl80211 -i wlan0 -c /etc/wpa_supplicant.conf -B

SSH配置

1
2
3
4
5
6
7
8
9
nano -w /etc/ssh/sshd_config

# 放开注释
PasswordAuthentication yes
PermitRootLogin yes

# 启用SSH密钥对登录,取消如下行的注释符
PubkeyAuthentication yes
AuthorizeKeysFile .ssh/authorized_keys

启动SSHD

1
2
# 启动SSH服务(需要有可登录的账户)
/etc/init.d/sshd start

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

先罗列一下主流开源流媒体服务器

  1. 流媒体解决方案 Live555
  2. 流媒体平台框架 EasyDarwin
  3. 实时流媒体播放服务器程序DarwinStreamingSrvr
  4. 流媒体实时传输开发包 jrtplib
  5. 多媒体处理工具 ffmpeg
  6. 多媒体编码工具包Libav
  7. Flash流媒体服务器 Red5
  8. 流媒体服务器 Open Streaming Server
  9. FMS流媒体服务器
  10. Wowza流媒体服务器
  11. 开源流媒体平台FreeCast
  12. 最后补充一个 Ngix+RTMP插件

这里我选择 Darwin Streaming Server (达尔文),原因在于:

  • 因为它是很老牌产品,稳定
  • C++写的,性能好。
  • 以前用过,配置方便

一、概要

Darwin Streaming Server简称DSS。DSS是Apple公司提供的开源实时流媒体播放服务器程序。整个程序使用C++编写,在设计上遵循高性能,简单,模块化等程序设计原则,务求做到程序高效,可扩充性好。并且DSS是一个开放源代码的,基于标准的流媒体服务器,可以运行在Windows NT和Windows 2000,以及几个UNIX实现上,包括Mac OS X,Linux,FreeBSD,和Solaris操作系统上的。

二、Darwin streaming server的特性

  • 支持MP4、3GPP等文件格式;
  • 支持MPEG-4、H.264等视频编解码格式;
  • 支持RTSP流控协议,支持HTTP协议;
  • 支持RTP流媒体传输协议;
  • 支持单播和组播;
  • 支持基于Web的管理;
  • 具有完备的日志功能。

三、DDS安装配置

第一步:安装Darwin

  • 从:http://dss.macosforge.org/downloads/DarwinStreamingSrvr5.5.5-Windows.exe (只有5.5的) 这里下载 DSS for Windows
  • 下载后解压,会看到一个 Install.bat 的文件,Win10下最好从CMD管理员运行,直接运行可能存在路径拷贝问题。
  • 执行批处理后会安装到 C:\Program Files\Darwin Streaming Server 并还会在 系统服务里面加一个号Darwin Streaming Server 的服务程序,这个就是 DSS 的 RTSP 服务器。

第二步:安装Perl解释器

注意:如果安装后perl路径没有自动添加到Path,就自己添加一下。

第三步:配置管理的用户密码

# 根据提示创建 WebAdmin 的账号和密码

C:\Program Files\Darwin Streaming Server> perl WinPasswdAssistant.pl

比如 用户 admin 密码 123456

# 运行 WebAdmin 管理器

C:\Program Files\Darwin Streaming Server> perl streamingadminserver.pl

第四步:进入管理界面对dss服务器进行管理

1)在浏览器中,输入打http://127.0.0.1:1220/,打开管理界面

2)选择流媒体存放路径,默认存放在流媒体服务器下的:c:\Program Files\Darwin Streaming Server\目录下

3)更改服务器服务端口,可以在streaingloadtool.cfg文件中指定其他端口;

第五步:播放测试

安装vcl播放器,检测dss能不能正常播放

rtsp://localhost/sample_300kbit.mp4

四、流化处理

DSS提供的视频发现都能用,自己考个视频进去咋就播放不了呢?这里涉及到一个概念叫“流化 ”。DSS本身不提供素材的流化操作,但是我们可以借助第三方工具进行处理。

然后执行命令:

C:\Program Files\Darwin Streaming Server\Movies> mp4box mymovie.mp4 -hint

流媒体视频就转换好了,现在文件大小就会有变动,变大了一些。

然后再用VLC打开就可以播放了:

rtsp://localhost/mymovie.mp4

先决条件
本教程假定 RabbitMQ 已经安装,并运行在localhost 标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。

从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们

在第 教程[2] 中,我们学习了如何使用工作队列在多个工作单元之间分配耗时任务。

但是如果我们想要运行一个在远程计算机上的函数并等待其结果呢?这将是另外一回事了。这种模式通常被称为 远程过程调用RPC

在本篇教程中,我们将使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有什么耗时任务值得分发,那干脆就创建一个返回斐波那契数列的虚拟 RPC 服务吧。

客户端接口#

为了说明如何使用 RPC 服务,我们将创建一个简单的客户端类。该类将暴露一个名为Call的方法,用来发送 RPC 请求并且保持阻塞状态,直到接收到应答为止。

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

关于 RPC 的说明

尽管 RPC 在计算机中是一种很常见的模式,但它经常受到批评。问题出现在当程序员不知道一个函数是本地调用还是一个耗时的 RPC 请求。这样的混淆,会导致系统不可预测,以及给调试增加不必要的复杂性。误用 RPC 可能会导致不可维护的混乱代码,而不是简化软件。

牢记这些限制,请考虑如下建议:

  • 确保可以明显区分哪些函数是本地调用,哪些是远程调用。
  • 为您的系统编写文档,明确组件之间的依赖关系。
  • 捕获异常,当 RPC 服务长时间宕机时客户端该如何应对。

当有疑问的时候可以先避免使用 RPC。如果可以的话,考虑使用异步管道 - 而不是类似 RPC 的阻塞,其会将结果以异步的方式推送到下一个计算阶段。

回调队列#

一般来讲,基于 RabbitMQ 进行 RPC 通信是非常简单的,客户端发送一个请求消息,然后服务端用一个响应消息作为应答。为了能接收到响应,我们需要在发送请求过程中指定一个’callback’队列地址。

var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;

var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "rpc_queue",
                     basicProperties: props,
                     body: messageBytes);


消息属性

AMQP 0-9-1 协议在消息中预定义了一个包含 14 个属性的集合,大多数属性很少使用,但以下情况除外:
Persistent:将消息标记为持久的(值为2)或者瞬时的(其他值),可以参考 教程[2]
DeliveryMode:熟悉 AMQP 协议的人可以选择此属性而不是熟悉协议的人可以选择使用此属性而不是Persistent,它们控制的东西是一样的。
ContentType:用于描述编码的 mime 类型。例如,对于经常使用的 JSON 编码,将此属性设置为:application/json是一种很好的做法。
ReplyTo:通常用于命名回调队列。
CorrelationId:用于将 RPC 响应与请求相关联。

关联ID#

在上面介绍的方法中,我们建议为每个 RPC 请求创建一个回调队列,但是这种方式效率低。幸运的是我们有一种更好的方式,那就是为每个客户端创建一个独立的回调队列。

这种方式会引出一个新的问题,在收到响应的回调队列中,它无法区分响应属于哪一个请求,此时便是CorrelationId属性的所用之处。我们将为每个请求的CorrelationId设置一个唯一值。之后当我们在回调队列接收到响应的时候,再去检查下这个属性是否和请求中的值匹配,如此一来,我们就可以把响应和请求关联起来了。如果出现一个未知的CorrelationId值,我们可以安全的销毁这个消息,因为这个消息不属于我们的请求。

你可能会问,为什么我们应该忽略回调队列中的未知的消息,而不是用错误来标识失败呢?这是因为于服务器端可能存在竞争条件。虽然不太可能,但是 RPC 服务器可能在仅发送了响应消息而未发送消息确认的情况下挂掉,如果出现这种情况,RPC 服务器重启之后将会重新处理该请求。这就是为什么在客户端上我们必须优雅地处理重复的响应,并且理想情况下 RPC 应该是幂等的。

总结#

我们的 RPC 会是这样工作:

  • 客户端启动时,会创建一个匿名的独占回调队列。
  • 对于 RPC 请求,客户端发送带有两个属性的消息:ReplyTo(设置为回调队列)和CorrelationId(为每个请求设置唯一值)。
  • 请求被发送到rpc_queue队列。
  • RPC 工作线程(或者叫:服务器)正在等待该队列上的请求。当出现请求时,它会执行该作业,并使用ReplyTo属性设置的队列将带有结果的消息发送回客户端。
  • 客户端等待回调队列上的数据。出现消息时,它会检查CorrelationId属性。如果它与请求中的值匹配,则返回对应用程序的响应。

组合在一起#

斐波纳契 任务:

private static int fib(int n)
{
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
}

我们宣布我们的斐波那契函数。并假定只允许有效的正整数输入。 (不要期望这个适用于大数字,它可能是最慢的递归实现)。

我们的 RPC 服务端代码 RPCServer.cs 看起来如下所示:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class RPCServer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "rpc_queue", durable: false,
              exclusive: false, autoDelete: false, arguments: null);
            channel.BasicQos(0, 1, false);
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicConsume(queue: "rpc_queue",
              autoAck: false, consumer: consumer);
            Console.WriteLine(" [x] Awaiting RPC requests");

            consumer.Received += (model, ea) =>
            {
                string response = null;

                var body = ea.Body;
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;

                try
                {
                    var message = Encoding.UTF8.GetString(body);
                    int n = int.Parse(message);
                    Console.WriteLine(" [.] fib({0})", message);
                    response = fib(n).ToString();
                }
                catch (Exception e)
                {
                    Console.WriteLine(" [.] " + e.Message);
                    response = "";
                }
                finally
                {
                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                      basicProperties: replyProps, body: responseBytes);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag,
                      multiple: false);
                }
            };

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }







​ private static int fib(int n)
​ {
​ if (n == 0 || n == 1)
​ {
​ return n;
​ }

​ return fib(n - 1) + fib(n - 2);
​ }
​ }

服务端代码非常简单:

  • 像往常一样,首先建立连接,通道和声明队列。
  • 我们可能希望运行多个服务器进程。为了在多个服务器上平均分配负载,我们需要设置channel.BasicQos中的prefetchCount值。
  • 使用BasicConsume访问队列,然后注册一个交付处理程序,并在其中完成工作并发回响应。

我们的 RPC 客户端 RPCClient.cs 代码:

using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class RpcClient
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
    private readonly IBasicProperties props;

public RpcClient()
{
        var factory = new ConnectionFactory() { HostName = "localhost" };

        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        props = channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var response = Encoding.UTF8.GetString(body);
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                respQueue.Add(response);
            }
        };
    }

    public string Call(string message)
    {
        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(
            exchange: "",
            routingKey: "rpc_queue",
            basicProperties: props,
            body: messageBytes);

        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);

        return respQueue.Take(); ;
    }

    public void Close()
    {
        connection.Close();
    }
}

public class Rpc
{
    public static void Main()
    {
        var rpcClient = new RpcClient();

        Console.WriteLine(" [x] Requesting fib(30)");
        var response = rpcClient.Call("30");

        Console.WriteLine(" [.] Got '{0}'", response);
        rpcClient.Close();
    }
}

客户端代码稍微复杂一些:

  • 建立连接和通道,并为响应声明一个独有的 ‘callback’ 队列。
  • 订阅这个 ‘callback’ 队列,以便可以接收到 RPC 响应。
  • Call方法用来生成实际的 RPC 请求。
  • 在这里,我们首先生成一个唯一的CorrelationId编号并保存它,while 循环会使用该值来捕获匹配的响应。
  • 接下来,我们发布请求消息,其中包含两个属性:ReplyToCorrelationId
  • 此时,我们可以坐下来稍微一等,直到指定的响应到来。
  • while 循环做的工作非常简单,对于每个响应消息,它都会检查CorrelationId是否是我们正在寻找的那一个。如果是这样,它就会保存该响应。
  • 最后,我们将响应返回给用户。

客户发出请求:

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

现在是查看 RPCClient.csRPCServer.cs 的完整示例源代码(包括基本异常处理)的好时机哦。

像往常一样设置(请参见 教程[1]):

我们的 RPC 服务现已准备就绪,现在可以启动服务端:

cd RPCServer
dotnet run

要请求斐波纳契数,请运行客户端:

cd RPCClient
dotnet run

这里介绍的设计并不是 RPC 服务的唯一可能实现,但它仍具有一些重要优势:

  • 如果 RPC 服务器太慢,您可以通过运行另一个服务器来扩展。尝试在新开一个控制台,运行第二个 RPCServer。
  • 在客户端,RPC 只需要发送和接收一条消息。不需要像QueueDeclare一样同步调用。因此,对于单个 RPC 请求,RPC 客户端只需要一次网络往返。

我们的代码很简单,也并没有尝试去解决更复杂(但很重要)的问题,比如就像:

  • 如果服务端没有运行,客户端应该如何反应?
  • 客户端是否应该为 RPC 设置某种超时机制?
  • 如果服务端出现故障并引发异常,是否应将其转发给客户端?
  • 在处理之前防止无效的传入消息(例如:检查边界、类型)。

如果您想进行实验,您可能会发现 管理 UI 对于查看队列非常有用。

写在最后#

本文翻译自 RabbitMQ 官方教程 C# 版本。如本文介绍内容与官方有所出入,请以官方最新内容为准。水平有限,翻译的不好请见谅,如有翻译错误还请指正。

先决条件
本教程假定 RabbitMQ 已经安装,并运行在localhost 标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。

从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们

路由#

(使用.NET客户端)

教程[3] 中,我们构建了一个简单的日志系统,可以向多个接收者广播消息。

在本教程中,我们会为日志系统再添加一个特性,使其可以只订阅消息的一个子集。例如,将所有日志消息打印到
控制台,同时只会将严重错误消息写入日志文件(保存到磁盘空间)。

绑定#

在前面的例子中,我们创建过_绑定_。不知道您是否还记得下面的代码:

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

绑定是指交换器和队列之间的关联关系。可以简单地理解为:某个队列对来自此交换器的消息感兴趣。

绑定可以采用额外的routingKey参数,为了避免与BasicPublish方法中相同参数混淆,我们将其称为binding key(这里是指路由键从声明角度的一种别称,绑定键)。下面即是如何使用绑定键 建立一个绑定:

channel.QueueBind(queue: queueName,
                  exchange: "direct_logs",
                  routingKey: "black");

绑定键的含义取决于交换器类型。像我们前面使用的fanout 交换器,忽略了它的值(依据fanout交换器的特性,它会把消息广播到所有订阅的队列,所以就算指定routingKey也不会根据其过滤消息)。

Direct交换器#

在上篇教程中,我们的日志系统会把所有消息广播给所有消费者,现在我们想要扩展使其可以根据消息的严重性过滤消息。例如,我们希望将日志消息写入磁盘的脚本仅接收严重错误的消息,而不是在警告或者信息类型的消息上浪费磁盘空间。

之前我们使用的是fanout交换器,它没有给我们足够的灵活性 - 它只能进行无意识的广播。

现在我们要用direct交换器替换它,direct交换器背后的路由算法很简单 - 消息会进入其binding key恰好与routing key相匹配的队列。
为了说明这一点,请参考以下设置:

在上面的设置中,我们可以看到direct交换器X与两个队列绑定。第一个队列通过键orange绑定,第二个队列有两个绑定,一个通过键black绑定、另外一个通过键green绑定。

如此设置,发布使用路由键orange的消息到交换器最终会被路由到队列Q1,路由键为blackgreen的消息会去向队列Q2,而其他所有的消息会被丢弃。

多重绑定#

使用相同的绑定键绑定多个队列是完全合法的。在示例中,我们可以在XQ1之间添加一个键为black的绑定。这种情况下,direct交换器会像fanout交换器一样,把消息广播到所有匹配的队列,路由键为black的消息会被分别发送到队列Q1Q2

发送日志#

我们将在日志系统中使用上述消息模型,在发送消息时使用direct交换机来替换fanout交换器。同时我们会把日志的严重性作为路由键,这样的话,接收脚本就可以选择性地接收它期望严重性的消息。首先我们来关注如何发送日志。

同样地,我们需要先创建一个交换器:

channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);

准备好发送消息:

var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
                     routingKey: severity,
                     basicProperties: null,
                     body: body);

简单起见,我们先假定severity可以是infowarningerror任意一值。

订阅#

马上就可以像前面的教程接收消息了,但有一点不同, 我们需要为我们感兴趣的每种日志严重性级别的消息建立一个新的绑定。

var queueName = channel.QueueDeclare().QueueName;

foreach(var severity in args)
{
    channel.QueueBind(queue: queueName,
                      exchange: "direct_logs",
                      routingKey: severity);
}

组合在一起#

EmitLogDirect.cs类的代码:

using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;

class EmitLogDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");

            var severity = (args.Length > 0) ? args[0] : "info";
            var message = (args.Length > 1)
                          ? string.Join(" ", args.Skip(1).ToArray())
                          : "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "direct_logs",
                                 routingKey: severity,
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

ReceiveLogsDirect.cs类的代码:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogsDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");
            var queueName = channel.QueueDeclare().QueueName;

            if(args.Length < 1)
            {
                Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                                        Environment.GetCommandLineArgs()[0]);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            foreach(var severity in args)
            {
                channel.QueueBind(queue: queueName,
                                  exchange: "direct_logs",
                                  routingKey: severity);
            }

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'",
                                  routingKey, message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

请像往常一样创建项目(请参阅 教程[1])。

如果您想将warningerror(不包括info)日志消息保存到文件,只需打开控制台并输入:

cd ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log

如果您想在屏幕上看到所有日志消息,请打开一个新终端并执行以下操作:

cd ReceiveLogsDirect
dotnet run info warning error

例如,想要发出error日志消息,只需要输入:

cd EmitLogDirect
dotnet run error "Run. Run. Or it will explode."

EmitLogDirect.csReceiveLogsDirect.cs 的完整源代码。

跳转到 教程[5],了解如何基于模式监听消息。

写在最后#

本文翻译自 RabbitMQ 官方教程 C# 版本。如本文介绍内容与官方有所出入,请以官方最新内容为准。水平有限,翻译的不好请见谅,如有翻译错误还请指正。

  • 原文链接:RabbitMQ tutorial - Routing
  • 实验环境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code
  • 最后更新:2018-08-31

先决条件
本教程假定 RabbitMQ 已经安装,并运行在localhost 标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。

从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们

工作队列#

(使用 .NET Client)

教程[1] 中,我们编写了两个程序,用于从一个指定的队列发送和接收消息。在本文中,我们将创建一个_工作队列_,用于在多个工作线程间分发耗时的任务。

工作队列(又名:任务队列)背后的主要想法是避免立即执行资源密集型、且必须等待其完成的任务。相反的,我们把这些任务安排在稍后完成。我们可以将任务封装为消息并把它发送到队列中,在后台运行的工作进程将从队列中取出任务并最终执行。当您运行多个工作线程,这些任务将在这些工作线程之间共享。

这个概念在Web应用程序中特别有用,因为在一个 HTTP 请求窗口中无法处理复杂的任务。

准备#

我们将略微修改上一个示例中的_Send_程序,以其可以在命令行发送任意消息。
这个程序将调度任务到我们的工作队列中,所以让我们把它命名为NewTask

教程[1]一样,我们需要生成两个项目:

dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs

dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs

cd NewTask
dotnet add package RabbitMQ.Client
dotnet restore

cd ../Worker
dotnet add package RabbitMQ.Client
dotnet restore


var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

channel.BasicPublish(exchange: "",
                     routingKey: "task_queue",
                     basicProperties: properties,
                     body: body);

从命令行参数获取消息的帮助方法:

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

我们旧的Receive.cs脚本也需要进行一些更改:它需要为消息体中的每个点模拟一秒种的时间消耗。它将处理由 RabbitMQ 发布的消息,并执行任务,因此我们把它复制到Worker项目并修改:

var consumer = new EventingBasicConsumer(channel);


​ consumer.Received += (model, ea) =>
​ {
​ var body = ea.Body;
​ var message = Encoding.UTF8.GetString(body);
​ Console.WriteLine(“ [x] Received {0}”, message);


​ int dots = message.Split(‘.’).Length - 1;
​ Thread.Sleep(dots * 1000);

​ Console.WriteLine(“ [x] Done”);
​ };

channel.BasicConsume(queue: “task_queue”, autoAck: true, consumer: consumer);

模拟虚拟任务的执行时间:

int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);

循环调度#

使用任务队列的优点之一是能够轻松地并行工作。如果我们正在积累积压的工作,我们仅要增加更多的工作者,并以此方式可以轻松扩展。

首先,我们尝试同时运行两个Worker实例。他们都会从队列中获取消息,但究竟如何?让我们来看看。

您需要打开三个控制台,两个运行Worker程序,这些控制台作为我们的两个消费者 - C1和C2。

cd Worker
dotnet run

cd Worker
dotnet run

在第三个控制台中,我们将发布一些新的任务。一旦你已经运行了消费者,你可以尝试发布几条消息:

cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."

让我们看看有什么发送到了我们的Worker程序:

# shell 1
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'


# shell 2
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。消费者数量平均的情况下,每个消费者将会获得相同数量的消息。这种分配消息的方式称为循环(Round-Robin)。请尝试开启三个或更多的Worker程序来验证。

消息确认#

处理一项任务可能会需要几秒钟的时间。如果其中一个消费者开启了一项长期的任务并且只完成了部分就挂掉了,您可能想知道会发生什么?在我们当前的代码中,一旦 RabbitMQ 把消息分发给了消费者,它会立即将这条消息标记为删除。在这种情况下,如果您停掉某一个 Worker,我们将会丢失这条正在处理的消息,也将丢失所有分发到该 Worker 但尚未处理的消息。

但是我们不想丢失任何一个任务。如果一个 Worker 挂掉了,我们希望这个任务能被重新分发给其他 Worker。

为了确保消息永远不会丢失,RabbitMQ 支持 消息确认 机制。消费者回发一个确认信号 Ack(nowledgement) 给 RabbitMQ,告诉它某个消息已经被接收、处理并且可以自由删除它。

如果一个消费者在还没有回发确认信号之前就挂了(其通道关闭,连接关闭或者 TCP 连接丢失),RabbitMQ 会认为该消息未被完全处理,并将其重新排队。如果有其他消费者同时在线,该消息将会被会迅速重新分发给其他消费者。这样,即便 Worker 意外挂掉,也可以确保消息不会丢失。

没有任何消息会超时;当消费者死亡时,RabbitMQ 将会重新分发消息。即使处理消息需要非常非常长的时间也没关系。

默认情况下,手动消息确认 模式是开启的。在前面的例子中,我们通过将autoAck(“自动确认模式”)参数设置为true来明确地关闭手动消息确认模式。一旦完成任务,是时候删除这个标志并且从 Worker 手动发送一个恰当的确认信号给RabbitMQ。

var consumer = new EventingBasicConsumer(channel);


​ consumer.Received += (model, ea) =>
​ {
​ var body = ea.Body;
​ var message = Encoding.UTF8.GetString(body);
​ Console.WriteLine(“ [x] Received {0}”, message);


​ int dots = message.Split(‘.’).Length - 1;
​ Thread.Sleep(dots * 1000);

​ Console.WriteLine(“ [x] Done”);


​ channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
​ };



​ channel.BasicConsume(queue: “task_queue”, autoAck: false, consumer: consumer);

使用上面这段代码,我们可以确定的是,即使一个 Worker 在处理消息时,我们通过使用CTRL + C来终止它,也不会丢失任何消息。Worker 挂掉不久,所有未确认的消息将会被重新分发。

忘记确认
遗漏BasicAck是一个常见的错误。这是一个很简单的错误,但导致的后果却是严重的。当客户端退出时(看起来像是随机分发的),消息将会被重新分发,但是RabbitMQ会吃掉越来越多的内存,因为它不能释放未确认的消息。
为了调试这种错误,您可以使用rabbitmqctl来打印messages_unacknowledged字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows上,删除sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久化#

我们已经学习了如何确保即使消费者挂掉,任务也不会丢失。但是如果 RabbitMQ 服务器停止,我们的任务还是会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记已存在的队列和消息,除非告诉它不要这样做。为了确保消息不会丢失,有两件事是必须的:我们需要将队列和消息标记为持久

首先,我们需要确保 RabbitMQ 永远不会丢失我们的队列。为了做到这一点,我们需要把队列声明是_持久的(Durable)_:

// 声明队列,通过指定 durable 参数为 true,对消息进行持久化处理。 
channel.QueueDeclare(queue: "hello",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

虽然这个命令本身是正确的,但是它在当前设置中不会起作用。那是因为我们已经定义过一个名为hello的队列,并且这个队列不是持久化的。RabbitMQ 不允许使用不同的参数重新定义已经存在的队列,并会向尝试执行该操作的程序返回一个错误。但有一个快速的解决办法 - 让我们用不同的名称声明一个队列,例如task_queue

channel.QueueDeclare(queue: "task_queue",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

注意,该声明队列QueueDeclare方法的更改需要同时应用于生产者和消费者代码。

此时,我们可以确定的是,即使 RabbitMQ 重新启动,task_queue队列也不会丢失。现在我们需要将我们的消息标记为_持久的(Persistent)_ - 通过将IBasicProperties.Persistent设置为true

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

关于消息持久性的说明
将消息标记为Persistent并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但当 RabbitMQ 接收到消息并且尚未保存消息时仍有一段时间间隔。此外,RabbitMQ 不会为每条消息执行fsync(2) - 它可能只是保存到缓存中,并没有真正写入磁盘。消息的持久化保证并不健壮,但对于简单的任务队列来说已经足够了。如果您需要一个更加健壮的保证,可以使用 发布者确认

公平调度#

您可能已经注意到调度仍然无法完全按照我们期望的方式工作。例如,在有两个 Worker 的情况下,假设所有奇数消息都很庞大、偶数消息都很轻量,那么一个 Worker 将会一直忙碌,而另一个 Worker 几乎不做任何工作。是的,RabbitMQ 并不知道存在这种情况,它仍然会平均地分发消息。

发生这种情况是因为 RabbitMQ 只是在消息进入队列后就将其分发。它不会去检查每个消费者所拥有的未确认消息的数量。它只是盲目地将第 n 条消息分发给第 n 位消费者。

为了改变上述这种行为,我们可以使用参数设置prefetchCount = 1basicQos方法。

这就告诉 RabbitMQ 同一时间不要给一个 Worker 发送多条消息。或者换句话说,不要向一个 Worker 发送新的消息,直到它处理并确认了前一个消息。
相反,它会这个消息调度给下一个不忙碌的 Worker。

channel.BasicQos(0, 1, false);

关于队列大小的说明
如果所有的 Worker 都很忙,您的队列可能会被填满。请留意这一点,可以尝试添加更多的 Worker,或者使用其他策略。

组合在一起#

我们NewTask.cs类的最终代码:

using System;
using RabbitMQ.Client;
using System.Text;

class NewTask
{
    public static void Main(string[] args)
    {
        
        var factory = new ConnectionFactory() { HostName = "localhost" };


​ using(var connection = factory.CreateConnection())
​ using(var channel = connection.CreateModel())
​ {

​ channel.QueueDeclare(queue: “task_queue”,
​ durable: true,
​ exclusive: false,
​ autoDelete: false,
​ arguments: null);


​ var message = GetMessage(args);
​ var body = Encoding.UTF8.GetBytes(message);


​ var properties = channel.CreateBasicProperties();
​ properties.Persistent = true;


​ channel.BasicPublish(exchange: “”,
​ routingKey: “task_queue”,
​ basicProperties: properties,
​ body: body);

​ Console.WriteLine(“ [x] Sent {0}”, message);
​ }

Console.WriteLine(“ Press [enter] to exit.”);
Console.ReadLine();
}

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
    }
}

(NewTask.cs 源码)

还有我们的Worker.cs

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;

class Worker
{
    public static void Main()
    {
        
        var factory = new ConnectionFactory() { HostName = "localhost" };


​ using(var connection = factory.CreateConnection())
​ using(var channel = connection.CreateModel())
​ {

​ channel.QueueDeclare(queue: “task_queue”,
​ durable: true,
​ exclusive: false,
​ autoDelete: false,
​ arguments: null);


​ channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

​ Console.WriteLine(“ [*] Waiting for messages.”);


​ var consumer = new EventingBasicConsumer(channel);


​ consumer.Received += (model, ea) =>
​ {
​ var body = ea.Body;
​ var message = Encoding.UTF8.GetString(body);
​ Console.WriteLine(“ [x] Received {0}”, message);


​ int dots = message.Split(‘.’).Length - 1;
​ Thread.Sleep(dots * 1000);

​ Console.WriteLine(“ [x] Done”);


​ channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
​ };

​ channel.BasicConsume(queue: “task_queue”,
​ autoAck: false,
​ consumer: consumer);

Console.WriteLine(“ Press [enter] to exit.”);
Console.ReadLine();
}
}
}

(Worker.cs 源码)

使用消息确认机制和BasicQ您可以创建一个工作队列。即使 RabbitMQ 重新启动,通过持久性选项也可让任务继续存在。

有关IModel方法和IBasicProperties的更多信息,您可以在线浏览 RabbitMQ .NET客户端API参考

现在,我们可以继续阅读 教程[3],学习如何向多个消费者发送相同的消息。

写在最后#

本文翻译自 RabbitMQ 官方教程 C# 版本。如本文介绍内容与官方有所出入,请以官方最新内容为准。水平有限,翻译的不好请见谅,如有翻译错误还请指正。

先决条件
本教程假定 RabbitMQ 已经安装,并运行在localhost 标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。

从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们

发布/订阅#

(使用 .NET Client)

教程[2] 中,我们创建了一个工作队列,假设在工作队列中的每一个任务都只被分发给一个 Worker。那么在这一章节,我们要做与之完全不同的事,那就是我们将要把一条消息分发给多个消费者。这种模式被称为“发布/订阅”。

为了说明、体现这种模式,我们将会建一个简单的日志系统。它将会包含两个程序 - 第一个用来发送日志消息,第二个用来接收并打印它们。

在我们建立的日志系统中,每个接收程序的运行副本都会收到消息。这样我们就可以运行一个接收程序接收消息并将日志写入磁盘;同时运行另外一个接收程序接收消息并将日志打印到屏幕上。

实质上,发布的日志消息将会被广播给所有的接收者。

交换器#

在教程的前几部分,我们是发送消息到队列并从队列中接收消息。现在是时候介绍 Rabbit 中完整的消息传递模型了。

让我们快速回顾一下前面教程中的内容:

  • _生产者_是发送消息的用户应用程序。
  • _队列_是存储消息的缓冲区。
  • _消费者_是接收消息的用户应用程序。

在 RabbitMQ 中,消息传递模型的核心理念是生产者从来不会把任何消息直接发送到队列,其实,通常生产者甚至不知道消息是否会被分发到任何队列中。

然而,生产者只能把消息发送给_交换器_。交换器非常简单,一方面它接收来自生产者的消息,另一方面又会把接收的消息推送到队列中。交换器必须明确知道该如何处理收到的消息,应该追加到一个特定队列中?还是应该追加到多个队列中?或者应该把它丢弃?这些规则都被定义在_交换器类型_中。

Exchanges

目前交换器类型有这几种:directtopicheadersfanout。我们先重点关注最后一个fanout,我们创建一个这种类型的交换器,将其命名为logs

channel.ExchangeDeclare("logs", "fanout");

fanout类型交换器非常简单,正如您可能从名字中猜出的那样,它会把收到的所有消息广播到它已知的所有队列中。这恰巧是我们的日志系统目前所需要的。

列举交换器
要列举出服务器上的交换器,您可以使用非常有用的rabbitmqctl命令行工具:

sudo rabbitmqctl list_exchanges

执行上述命令后,出现的列表中将会有一些amq.*交换器和默认(未命名)交换器。这些是默认创建的,不过目前您可能用不到它们。

默认交换器
在教程的前些部分,我们对交换器这一概念还一无所知,但仍然可以把消息发送到队列。之所以这样,是因为我们使用了一个用空字符串("")标识的默认交换器。

回顾一下我们之前如何发布消息:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "hello",
                     basicProperties: null,
                     body: body);

第一个参数就是交换器的名称,空字符串表示默认或匿名交换器:将消息路由到routingKey指定的队列(如果存在)中。

现在,我们可以把消息发布到我们指定的交换器:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
                     routingKey: "",
                     basicProperties: null,
                     body: body);

临时队列#

您是否还记得之前我们使用过的队列,它们都有一个特定的名称(记得应该是hellotask_queue吧)。给队列命名对我们来说是至关重要的 – 因为我们可能需要多个 Worker 指向同一个队列;当您想要在生产者和消费者之间共享队列时,给队列一个名称也是非常重要的。

但是,我们创建的日志系统并不希望如此。我们希望监听所有的日志消息,而不仅仅是其中一部分。我们也只对目前流动的消息感兴趣,而不是旧消息。为解决这个问题,我们需要做好两件事。

首先,我们无论何时连接 Rabbit,都需要一个新的、空的队列。要做到这一点,我们可以使用随机名称来创建队列,或许,甚至更好的方案是让服务器为我们选择一个随机队列名称。

其次,一旦我们与消费者断开连接,与之相关的队列应该被自动删除。

在 .NET 客户端中,如果不向QueueDeclare()方法提供任何参数,实际上就是创建了一个非持久化、独占、且自动删除的随机命名队列:

var queueName = channel.QueueDeclare().QueueName;

您可以在 队列指南 中了解更多关于exclusive参数和其他队列属性的信息。

此时,queueName包含一个随机队列名称。例如,它看起来可能像amq.gen-JzTY20BRgKO-HjmUJj0wLg

绑定#

Bindings

我们已经创建好了一个fanout 交换器和一个队列。现在我们需要告诉交换器把消息发送到我们的队列。而交换器和队列之间的关系就称之为_绑定_。

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

从现在起,logs交换器会把消息追加到我们的队列中。

列举绑定
您可以使用(您或许已经猜到了),列举出现有的绑定。

sudo rabbitmqctl list_bindings

组合在一起#

生产者程序负责分发消息,这与之前的教程看起来没有太大区别。

最重要的变化是我们现在想把消息发布到我们的logs交换器,而不是匿名交换器。在发送时我们需要提供一个路由键routingKey,但是对于fanout交换器,它的值可以被忽略。这里是EmitLog.cs文件的代码:

using System;
using RabbitMQ.Client;
using System.Text;

class EmitLog
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "logs",
                                 routingKey: "",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0)
               ? string.Join(" ", args)
               : "info: Hello World!");
    }
}

EmitLog.cs 源码)

如你所见,在建立连接后,我们声明了交换器。这一步非常有必要,因为发布消息到一个不存在的交换器,这种情况是被禁止的。

如果没有队列绑定到交换器上,消息将会丢失,但这对我们来说并没有什么没问题;如果没有消费者正在监听,我们是可以放心地把消息丢弃的。

ReceiveLogs.cs的代码:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,
                              exchange: "logs",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

ReceiveLogs.cs 源码)

按照 教程[1]中的设置说明生成EmitLogsReceiveLogs 项目。

如果您想把日志保存到文件中,只需打开一个控制台并输入:

cd ReceiveLogs
dotnet run > logs_from_rabbit.log

如果你想在屏幕上看到日志,我可以新开一个终端并运行:

cd ReceiveLogs
dotnet run

当然,分发日志需要输入:

cd EmitLog
dotnet run

使用rabbitmqctl list_bindings命令,您可以验证代码是否真正创建了我们想要的绑定和队列。当有两个ReceiveLogs.cs程序运行时,您应该看到如下所示的内容:

sudo rabbitmqctl list_bindings




对执行结果的解释简洁明了:来自logs交换器的数据转发到了两个由服务器随机分配名称的队列。这正是我们期待的结果。

想要了解如何监听消息的这一块内容,让我们继续阅读 教程[4]

写在最后#

本文翻译自 RabbitMQ 官方教程 C# 版本。如本文介绍内容与官方有所出入,请以官方最新内容为准。水平有限,翻译的不好请见谅,如有翻译错误还请指正。

先决条件
本教程假定 RabbitMQ 已经安装,并运行在localhost 标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。

从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们

介绍#

RabbitMQ 是一个消息中间件:它接收并转发消息。您可以把它想象为一个邮局:当您把需要寄出的邮件投递到邮箱,邮差最终会把邮件送给您的收件人。在这个比喻中,RabbitMQ 就是一个邮箱,也可以理解成邮局和邮递员。

RabbitMQ 和邮局的主要区别在于它不处理纸张,而是接收、存储和转发二进制数据块 - _消息_。

RabbitMQ 和消息传递通常使用一些术语。

生产 的意思无非就是发送。发送消息的程序就是一个 _生产者_:

Producer

队列 就是 RabbitMQ 内部“邮箱”的名称。虽然消息流经 RabbitMQ 和您的应用程序,但它们只能存储在 队列 中。_队列_ 只受主机的内存和磁盘的限制,它本质上就是一个很大的消息缓冲区。多个 生产者 可以发送消息到一个队列,并且多个 消费者 可以尝试从一个 队列 接收数据。这就是我们代表队列的方式:

Queue

消费 与接收有相似的含义,等待接收消息的程序就是一个 _消费者_:

Consumer

注意:生产者、消费者和中间件不是必须部署在同一主机上,实际上在大多数应用程序中它们也不是这样的。

“Hello World”#

使用 .NET / C#Client

在教程的这一部分,我们将用 C# 编写两个程序:一个发送单条消息的生产者,以及接收消息并将其打印出来的消费者。我们将忽略 .NET 客户端 API 中的一些细节,专注于更简单的开始。这是一个消息传递的“Hello World”。

在下图中,P是我们的生产者,C是我们的消费者。中间的盒子是队列 - RabbitMQ 代表消费者保存的消息缓冲区。

.NET 客户端库

RabbitMQ 支持多种协议,本教程使用AMQP 0-9-1,它是一种开放的、通用的消息传递协议。RabbitMQ 提供了一些针对不同 语言环境 的客户端,我们将使用 RabbitMQ 提供的 .NET 客户端。

客户端支持 .NET Core 以及 .NET Framework 4.5.1+。本教程将使用 .NET Core,因此您需要确保客户端已 安装 并且路径添加到PATH系统变量。

您也可以使用 .NET Framework 来完成本教程,但设置步骤会有所不同。

RabbitMQ .NET 客户端 5.0 及更高版本通过 nuget 发布。

本教程假定您在 Windows 上使用 PowerShell。在 MacOS 和 Linux 上,几乎所有 shell 也都可以正常工作。

安装#

首先让我们验证您在PATH系统变量是否有 .NET Core 工具链:

dotnet --help

应该产生帮助信息。

现在,让我们生成两个项目,一个用于发布者,另一个用于消费者:

dotnet new console --name Send
mv Send/Program.cs Send/Send.cs
dotnet new console --name Receive
mv Receive/Program.cs Receive/Receive.cs

这将创建两个名为SendReceive的新目录。

然后,我们添加客户端依赖项。

cd Send
dotnet add package RabbitMQ.Client
dotnet restore
cd ../Receive
dotnet add package RabbitMQ.Client
dotnet restore

我们已经建立了 .NET 项目,现在我们可以编写一些代码。

发送#

我们将调用我们的消息发布者(发送者)Send.cs和我们的消息消费者(接收者)Receive.cs。发布者将连接到 RabbitMQ,发送一条消息,然后退出。

Send.cs 中,我们需要使用一些命名空间:

using System;
using RabbitMQ.Client;
using System.Text;

设置类:

class Send
{
    public static void Main()
    {
        ...
    }
}

然后,我们可以创建一个连接,连接到服务器:

class Send
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                ...
            }
        }
    }
}

该连接抽象了套接字连接,并为我们处理协议版本的协商和身份验证等。在这里,我们连接的是本地机器上的代理, 因此是localhost。如果我们想连接到其他机器上的代理,我们只需在此指定其名称或 IP 地址。

接下来,我们创建一个通道,该 API 的主要功能是把获得信息保存起来。

想要发送消息,我们必须为需要发送的消息声明一个队列,然后我们就可以把消息发布到队列中:

using System;
using RabbitMQ.Client;
using System.Text;

class Send
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            string message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "",
                                 routingKey: "hello",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

声明队列是 幂等 的 - 只有当它不存在时才会被创建。消息内容是一个字节数组,所以您可以用喜欢的任意方式编码。

当上面的代码完成运行时,通道和连接将被释放。这就是我们的发布者。

Send.cs 源码)

发送不起作用!

如果这是您第一次使用 RabbitMQ,并且您没有看到“已发送”消息,那么您可能会挠着头想知道错误出在什么地方。也许是代理程序启动时没有足够的可用磁盘空间(默认情况下,它至少需要50 MB空闲空间),因此拒绝接收消息。
必要时检查代理程序日志文件来确认和减少限制。配置文件 文档 将告诉您如何设置disk_free_limit

接收#

至于消费者,它是把消息从 RabbitMQ 拉取过来。因此,与发布消息的发布者不同,我们会保持消费者持续不断地运行,监听消息并将其打印出来。

代码(在 Receive.cs 中)具有与Send差不多一样的using声明:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

设置与发布者相同;我们开启一个连接和一个通道,并声明我们将要使用的队列。请注意,这需要与Send发布到的队列相匹配。

class Receive
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                ...
            }
        }
    }
}

请注意,我们在这里也声明了队列。因为我们可能会在发布者之前启动消费者,所以我们希望在我们尝试从它中消费消息之前确保队列已存在。

我们即将告诉服务器将队列中的消息传递给我们。由于它会异步推送消息,因此我们提供了一个回调。这就是EventingBasicConsumer.Received事件处理程序所做的事情。

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

class Receive
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            channel.BasicConsume(queue: "hello",
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

Receive.cs 源码)

组合在一起#

打开两个终端。

运行消费者:

cd Receive
dotnet run

运行生产者:

cd Send
dotnet run

消费者将打印它通过 RabbitMQ 从发布者处获得的消息。消费者将继续运行、等待新消息(按Ctrl-C将其停止),可以尝试从开启另一个终端运行发布者。

接下来可以跳转到 教程[2],构建一个简单的工作队列。

写在最后#

本文翻译自 RabbitMQ 官方教程 C# 版本。如本文介绍内容与官方有所出入,请以官方最新内容为准。水平有限,翻译的不好请见谅,如有翻译错误还请指正。

先决条件
本教程假定 RabbitMQ 已经安装,并运行在localhost 标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。

从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们

主题#

(使用 .NET 客户端)

教程[4] 中,我们改进了我们日志系统。我们用direct交换器替换了只能呆滞广播消息的fanout交换器,从而可以有选择性的接收日志。

虽然使用direct交换器改进了我们的系统,但它仍然有局限性 - 不能基于多个标准进行路由。

在我们的日志系统中,我们可能不仅要根据日志的严重性订阅日志,可能还要根据日志分发源来订阅日志。或许您可能从 unix syslog 工具中了解过这种概念,syslog 工具在路由日志的时候是可以既基于严重性(info/warn/crit…)又基于设备(auth/cron/kern…)的。

这种机制会给我们带来极大的灵活性 - 我们可以仅监听来自cron的关键错误日志,与此同时,监听来自kern的所有日志。

要在我们的日志系统中实现这一特性,我们需要学习更复杂的topic交换器。

Topic交换器#

发送到topic交换器的消息不能随意指定routing key,它必须是一个由点分割的单词列表,这些单词可以是任意内容,但通常会在其中指定一些与消息相关的特性。请看一些合法的路由键示例:stock.usd.nysenyse.vmwquick.orange.rabbit,路由键可以包含任意数量的单词,但不能超过255个字节的上限。

binding key也必须是相同的形式,topic交换器的背后逻辑与direct交换器类似 - 使用指定路由键发送的消息会被分发到与其绑定键匹配的所有队列中。不过对于绑定键来说,有两个重要的特殊情况需要注意:

  • *(星号)可以代替一个单词。
  • #(哈希)可以代替零个或多个单词。

下图示例是对上述内容最简单的解释:

在这个示例中,我们打算发送的消息全是用来描述动物的,这些消息会使用由三个单词(两个点)组成的路由键来发送。在路由键中,第一个单词用来描述行动速度、第二个是颜色、第三个是物种,即:<speed>.<colour>.<species>

我们创建了三个绑定:Q1绑定了键.orange.,Q2绑定了键*.*.rabbitlazy.#

这些绑定可以被概括为:

  • Q1对所有橙色的动物感兴趣。
  • Q2对兔子以及所有行动缓慢的动物感兴趣。

路由键为quick.orange.rabbit的消息会被发送到这两个队列,消息lazy.orange.elephant也会被发送到这两个队列。另外,quick.orange.fox只会进入第一个队列,lazy.brown.fox只会进入第二个队列。lazy.pink.rabbit只会被发送到第二个队列一次,尽管它匹配了两个绑定(避免了消息重复)。quick.brown.fox没有匹配的绑定,因此它将会被丢弃。

如果我们打破约定,发送使用一个或四个单词(例如:orangequick.orange.male.rabbit)作路由键的消息会发生什么?答案是,这些消息因为没有匹配到任何绑定,将被丢弃。

但是,另外,例如路由键为lazy.orange.male.rabbit的消息,尽管它有四个单词,也会匹配最后一个绑定,并将被发送到第二个队列。

Topics 交换器
topic交换器的功能是很强大的,它可以表现出一些其他交换器的行为。
当一个队列与键(哈希)绑定时, 它会忽略路由键,接收所有消息,这就像fanout交换器一样。
当特殊字符*(星号)和(哈希)未在绑定中使用时,topic交换器的行为就像direct交换器一样。

组合在一起#

我们将要在我们的日志系统中使用topic交换器,首先假设日志的路由键有两个单词组成:<facility>.<severity>

代码与上一篇 教程 中的代码几乎相同。

EmitLogTopic.cs的代码:

using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;

class EmitLogTopic
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "topic_logs",
                                    type: "topic");

            var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
            
            var message = (args.Length > 1)
                          ? string.Join(" ", args.Skip(1).ToArray())
                          : "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            
            channel.BasicPublish(exchange: "topic_logs",
                                 routingKey: routingKey,
                                 basicProperties: null,
                                 body: body);
                                 
            Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
        }
    }
}

ReceiveLogsTopic.cs的代码:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogsTopic
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
            var queueName = channel.QueueDeclare().QueueName;

            if(args.Length < 1)
            {
                Console.Error.WriteLine("Usage: {0} [binding_key...]",
                                        Environment.GetCommandLineArgs()[0]);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            foreach(var bindingKey in args)
            {
                channel.QueueBind(queue: queueName,
                                  exchange: "topic_logs",
                                  routingKey: bindingKey);
            }

            Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'",
                                  routingKey,
                                  message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

请运行以下示例:

要接收所有日志:

cd ReceiveLogsTopic
dotnet run "#"

要接收来自设备kern的所有日志:

cd ReceiveLogsTopic
dotnet run "kern.*"

或者,如果您只想监听级别为critical的日志:

cd ReceiveLogsTopic
dotnet run "*.critical"

您可以创建多个绑定:

cd ReceiveLogsTopic
dotnet run "kern.*" "*.critical"

使用路由键kern.critical发出日志:

cd EmitLogTopic
dotnet run "kern.critical" "A critical kernel error"

希望运行这些程序能让您玩得开心。要注意的是,这些代码没有针对路由键和绑定键做任何预设,您可以尝试使用两个以上的路由键参数。

EmitLogTopic.csReceiveLogsTopic.cs 的完整源码)

接下来,在 教程[6] 中将了解如何将往返消息作为远程过程调用。

写在最后#

本文翻译自 RabbitMQ 官方教程 C# 版本。如本文介绍内容与官方有所出入,请以官方最新内容为准。水平有限,翻译的不好请见谅,如有翻译错误还请指正。

  • 原文链接:RabbitMQ tutorial - Topics
  • 实验环境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code
  • 最后更新:2018-09-06