日期:2014-05-17 浏览次数:21718 次
1.首先需要下载.net下的驱动类。rabbitmq.client.dll.安装后可以使用提供的文档。当然也可以直接从别的地方只下载rabbitmq.client.dll使用。
下载地址:http://www.rabbitmq.com/dotnet.html
?文档和安装程序都有了。
2.然后建立项目导入引用
???????? 一、首先建立一个消息的发送者类Sender
?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Content;
using System.Collections;
namespace Sender
{
/// <summary>
/// 向队列中写入一个消息
/// </summary>
public class ProduceMQ
{
static void Main(string[] args)
{
//服务器所在的主机ip
?Uri uri = new Uri("amqp://192.168.1.99:8688/");
string exchange = "routing";//路由
string exchangeType = "direct";//交换模式
string routingKey = "rk";//路由关键字
//是否对消息队列持久化保存
bool persistMode = true;
ConnectionFactory cf = new ConnectionFactory();
cf.UserName = "gyg";//某个vhost下的用户
cf.Password = "123456";
cf.VirtualHost = "gyg001";//vhost
cf.RequestedHeartbeat = 0;
cf.Endpoint = new AmqpTcpEndpoint(uri);
? //创建一个连接到具体总结点的连接
using (IConnection conn = cf.CreateConnection())
{ //创建并返回一个新连接到具体节点的通道
using (IModel ch = conn.CreateModel())
{
if (exchangeType != null)
{//声明一个路由
ch.ExchangeDeclare(exchange, exchangeType);
//声明一个队列
ch.QueueDeclare("q", true, false, false, null);
//将一个队列和一个路由绑定起来。并制定路由关键字
ch.QueueBind("q1", exchange, routingKey);
}
///构造消息实体对象并发布到消息队列上
IMapMessageBuilder b = new MapMessageBuilder(ch);
IDictionary target = b.Headers;
target["header"] = "hello world";
IDictionary targerBody = b.Body;
targerBody["body"] = "hello world";//这个才是具体的发送内容
if (persistMode)
{
((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2;
//设定传输模式
}
//写入
ch.BasicPublish(exchange, routingKey, (IBasicProperties)b.GetContentHeader(), b.GetContentBody());
Console.WriteLine("写入成功");
}
}
}
}
}
?二、创建一个接受者:receiver
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Util;
namespace receiver
{
public class receiver
{
static void Main(string[] args)
{
// Uri uri = new Uri("amqp://127.0.0.1:8688/");
string exchange = "routing";
string exchangeType = "direct";
string routingKey = "rk";
string serverAddress ="amqp://127.0.0.1:8688/";
ConnectionFactory cf = new ConnectionFactory();
cf.Uri = serverAddress;
cf.UserName = "gyg";
cf.Password = "123456";
cf.VirtualHost = "gyg001";
cf.RequestedHeartbeat = 0;
//cf.Endpoint = new AmqpTcpEndpoint(uri);
using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
//普通使用方式BasicGet
//noAck = true,不需要回复,接收到消息后,queue上的消息就会清除
//noAck = false,需要回复,接收到消息后,queue上的消息不会被清除,
//直到调用channel.basicAck(deliveryTag, false);
//queue上的消息才会被清除 而且,在当前连接断开以前,其它客户端将不能收到此queue上的消息
BasicGetResult res = c