0%

复制代码

public class MQHelper
{ private const string CacheKey_MQConnectionSetting = “MQConnectionSetting”; private const string CacheKey_MQMaxConnectionCount = “MQMaxConnectionCount”; // 空闲连接对象队列
private readonly static ConcurrentQueue FreeConnectionQueue; //使用中(忙)连接对象字典
private readonly static ConcurrentDictionary<IConnection, bool> BusyConnectionDic; //连接池使用率
private readonly static ConcurrentDictionary<IConnection, int> MQConnectionPoolUsingDicNew; private readonly static Semaphore MQConnectionPoolSemaphore; //释放和添加连接时的锁对象
private readonly static object freeConnLock = new object(), addConnLock = new object(), getConnLock = new object(); // 连接总数
private static int connCount = 0; //默认最大保持可用连接数
public const int DefaultMaxConnectionCount = 50; //默认最大连接数可访问次数
public const int DefaultMaxConnectionUsingCount = 10000; public const int DefaultRetryConnectionCount = 1;//默认重试连接次数

    /// <summary>
    /// 初始化最大连接数 /// </summary>
    private static int MaxConnectionCount
    { get { //if (HttpRuntime.Cache\[CacheKey\_MQMaxConnectionCount\] != null) //{ // return Convert.ToInt32(HttpRuntime.Cache\[CacheKey\_MQMaxConnectionCount\]); //} //else //{ // int mqMaxConnectionCount = 0; // string mqMaxConnectionCountStr = ConfigurationManager.AppSettings\[CacheKey\_MQMaxConnectionCount\]; // if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0) // { // mqMaxConnectionCount = DefaultMaxConnectionCount; // } // string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config"); // HttpRuntime.Cache.Insert(CacheKey\_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath));
                return 50; //}

}

    } /// <summary>
    /// 建立连接 /// </summary>
    /// <param name="hostName">服务器地址</param>
    /// <param name="userName">登录账号</param>
    /// <param name="passWord">登录密码</param>
    /// <returns></returns>
    private ConnectionFactory CrateFactory()
    { var mqConfigDom = MqConfigDomFactory.CreateConfigDomInstance(); //获取MQ的配置
        var connectionfactory = new ConnectionFactory();
        connectionfactory.HostName \= mqConfigDom.MqHost;
        connectionfactory.UserName \= mqConfigDom.MqUserName;
        connectionfactory.Password \= mqConfigDom.MqPassword;
        connectionfactory.Port \= mqConfigDom.MqPort;
        connectionfactory.VirtualHost \= mqConfigDom.MqVirtualHost; return connectionfactory;
    } /// <summary>
    /// 创建connection连接 /// </summary>
    /// <returns></returns>
    public IConnection CreateMQConnection()
    { var factory = CrateFactory();
        factory.AutomaticRecoveryEnabled \= true;//自动重连
        var connection = factory.CreateConnection();
        connection.AutoClose \= false; return connection;
    } /// <summary>
    /// 初始化 /// </summary>
    static MQHelper()
    {
        FreeConnectionQueue \= new ConcurrentQueue<IConnection>();
        BusyConnectionDic \= new ConcurrentDictionary<IConnection, bool\>();
        MQConnectionPoolUsingDicNew \= new ConcurrentDictionary<IConnection, int\>();//连接池使用率
        string semaphoreName = "MQConnectionPoolSemaphore"; try { if (null == MQConnectionPoolSemaphore)
            { bool semaphoreWasCreated;
                SemaphoreSecurity sems \= new SemaphoreSecurity();
                MQConnectionPoolSemaphore \= new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore", out semaphoreWasCreated);//信号量,控制同时并发可用线程数

                if (semaphoreWasCreated)
                {
                    MQConnectionPoolSemaphore \= Semaphore.OpenExisting("MQConnectionPoolSemaphore", SemaphoreRights.FullControl);
                }

            }
        } catch (WaitHandleCannotBeOpenedException)
        { bool semaphoreWasCreated; string user = Environment.UserDomainName + "\\\\" + Environment.UserName;
            SemaphoreSecurity semSec \= new SemaphoreSecurity();

            SemaphoreAccessRule rule \= new SemaphoreAccessRule(user, SemaphoreRights.Synchronize | SemaphoreRights.Modify, AccessControlType.Deny);
            semSec.AddAccessRule(rule);

            rule \= new SemaphoreAccessRule(user, SemaphoreRights.ReadPermissions | SemaphoreRights.ChangePermissions, AccessControlType.Allow);
            semSec.AddAccessRule(rule); // Create a Semaphore object that represents the system // semaphore named by the constant 'semaphoreName', with // maximum count three, initial count three, and the // specified security access. The Boolean value that // indicates creation of the underlying system object is // placed in semaphoreWasCreated. //
            MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, semaphoreName, out semaphoreWasCreated, semSec);
        } catch (UnauthorizedAccessException ex)
        {
            MQConnectionPoolSemaphore \= Semaphore.OpenExisting(semaphoreName, SemaphoreRights.ReadPermissions | SemaphoreRights.ChangePermissions); // Get the current ACL. This requires // SemaphoreRights.ReadPermissions.
            SemaphoreSecurity semSec = MQConnectionPoolSemaphore.GetAccessControl(); string user = Environment.UserDomainName + "\\\\" + Environment.UserName;
            SemaphoreAccessRule rule \= new SemaphoreAccessRule(user, SemaphoreRights.Synchronize | SemaphoreRights.Modify, AccessControlType.Deny);
            semSec.RemoveAccessRule(rule);//移除 // Now grant the user the correct rights.
            rule = new SemaphoreAccessRule(user, SemaphoreRights.Synchronize | SemaphoreRights.Modify, AccessControlType.Allow);
            semSec.AddAccessRule(rule); //重新授权

MQConnectionPoolSemaphore.SetAccessControl(semSec);
MQConnectionPoolSemaphore = Semaphore.OpenExisting(semaphoreName);
}
}

    //释放连接 public void CreateNewConnection2FreeQueue()
    {
        IConnection mqConnection \= null; if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//如果已有连接数小于最大可用连接数,则直接创建新连接

{ lock (addConnLock)
{ if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)
{
mqConnection = CreateMQConnection();
FreeConnectionQueue.Enqueue(mqConnection);//加入到空闲队列连接集合中
MQConnectionPoolUsingDicNew[mqConnection] = 0;
}
}
}
} public string MqConnectionInfo()
{ int scount = 0; try {
scount=MQConnectionPoolSemaphore.Release();
MQConnectionPoolSemaphore.WaitOne(1);
scount -= 1;
} catch(SemaphoreFullException ex)
{
scount = MaxConnectionCount;
} return $”当前信号量计数={scount},当前空闲连接长度 ={ FreeConnectionQueue.Count},当前忙连接长度 ={ BusyConnectionDic.Count},连接使用频率信息如下:已达最大使用次数的有:{MQConnectionPoolUsingDicNew.Where(l=>l.Value>= DefaultMaxConnectionUsingCount).Count()},剩余{MQConnectionPoolUsingDicNew.Where(l => l.Value < DefaultMaxConnectionUsingCount).Count()}\r\n {JsonConvert.SerializeObject(MQConnectionPoolUsingDicNew)}”;
} ///


/// 在mq连接池中创建新连接 ///

///
public IConnection CreateMQConnectionInPoolNew(ref StringBuilder spanMsg)
{
Stopwatch sw = new Stopwatch(); long spanSum = 0;
sw.Start(); //
IConnection mqConnection = null; bool waitFree = false; int tryTimeCount = 0; try {
TryEnter:
waitFree = MQConnectionPoolSemaphore.WaitOne(10);//当<MaxConnectionCount时,会直接进入,否则会等待10ms继续监测直到空闲连接信号出现
if(!waitFree)
{
tryTimeCount++;
spanMsg.AppendLine($”阻塞10ms,空闲信号=[{waitFree}],进入第[{tryTimeCount}]次尝试,”); if (tryTimeCount <= 99)
{ goto TryEnter;
}
}
spanMsg.Append($”空闲信号=[{waitFree}],”); if (!FreeConnectionQueue.TryDequeue(out mqConnection)) //没有可用的
{
sw.Stop();
spanSum += sw.ElapsedMilliseconds;
spanMsg.Append($”尝试获取可用空闲连接,没有可用空闲连接,span:{sw.ElapsedMilliseconds}ms,”);
sw.Restart(); if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//如果已有连接数小于最大可用连接数,则直接创建新连接
{ lock (addConnLock)
{ if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)
{
mqConnection = CreateMQConnection();
BusyConnectionDic[mqConnection] = true;//加入到忙连接集合中
MQConnectionPoolUsingDicNew[mqConnection] = 1;
sw.Stop();
spanSum += sw.ElapsedMilliseconds;
spanMsg.Append($”创建一个新连接,并加到使用中连接集合中,span:{sw.ElapsedMilliseconds}ms,”); return mqConnection;
}
}
}
sw.Stop();
spanSum += sw.ElapsedMilliseconds;
spanMsg.Append($”没有空闲连接,已到最大连接数{FreeConnectionQueue.Count + BusyConnectionDic.Count},等待连接释放,span:{sw.ElapsedMilliseconds}ms,”); if(waitFree)//重试需要释放之前占用的信号量
{ int scount=MQConnectionPoolSemaphore.Release();
waitFree = false;
spanMsg.Append($”释放空闲信号,当前信号计数={scount},”);
} return CreateMQConnectionInPoolNew();
} else if (MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //如果取到空闲连接,判断是否使用次数是否超过最大限制,超过则释放连接并重新创建
{ if (mqConnection.IsOpen)
{
mqConnection.Close();
}
mqConnection.Dispose();
mqConnection = CreateMQConnection();
MQConnectionPoolUsingDicNew[mqConnection] = 0;
sw.Stop();
spanSum += sw.ElapsedMilliseconds;
spanMsg.Append($”获取到的可用空闲连接可能因累计使用通道次数{MQConnectionPoolUsingDicNew[mqConnection] + 1 }>最大可用通道次数{DefaultMaxConnectionUsingCount},或连接状态处于不是开启状态={mqConnection.IsOpen},释放当前连接并重建一个连接,span:{sw.ElapsedMilliseconds}ms,”);
}
sw.Restart();
BusyConnectionDic[mqConnection] = true;//加入到忙连接集合中
MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//使用次数加1
sw.Stop();
spanSum += sw.ElapsedMilliseconds;
spanMsg.AppendLine($”将获取到得空闲连接放入到忙集合中,并累加使用次数+1={MQConnectionPoolUsingDicNew[mqConnection] + 1},span:{sw.ElapsedMilliseconds}ms,”); return mqConnection;

        } catch(UnauthorizedAccessException ex)
        { throw ex;
        } catch (Exception ex)
        { if (null != mqConnection)
            {
                ResetMQConnectionToFree(mqConnection);
            } else  if(waitFree)//信号量没释放,则进行释放

{
MQConnectionPoolSemaphore.Release();
} return null;
} finally {
spanMsg.AppendLine( $”获取一个可用连接过程耗费{spanSum}ms,当前空闲连接长度={FreeConnectionQueue.Count},当前忙连接长度={BusyConnectionDic.Count}”); }
} ///


/// 在mq连接池中创建新连接 ///

///
public IConnection CreateMQConnectionInPoolNew()
{ //string spanMsg = string.Empty;
StringBuilder spanMsg = new StringBuilder(); return CreateMQConnectionInPoolNew(ref spanMsg);
} ///
/// 释放连接池中的连接 ///

///
private void ResetMQConnectionToFree(IConnection connection)
{ try { lock (freeConnLock)
{ bool result = false; if (BusyConnectionDic.TryRemove(connection, out result)) //从忙队列中取出
{
} else { //if(!BusyConnectionDic.TryRemove(connection,out result))
} if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//如果因为高并发出现极少概率的>MaxConnectionCount,则直接释放该连接
{
connection.Close();
connection.Dispose();
} else if (connection.IsOpen) //如果OPEN状态才加入空闲链接队列
{
FreeConnectionQueue.Enqueue(connection);//加入到空闲队列,以便持续提供连接服务
}
}
} catch { throw;
} finally {
MQConnectionPoolSemaphore.Release();//释放一个空闲连接信号
}
} ///
/// 发送消息 ///

/// 消息队列连接对象
/// 消息类型
/// 队列名称
/// 是否持久化
/// 消息
///
public string SendMsg(IConnection connection, string queueName, string msg, bool durable = true, string exchange = “”, string type = “fanout”)
{ bool reTry = false; int reTryCount = 0; string sendErrMsg = string.Empty; do {
reTry = false; try { using (var channel = connection.CreateModel())//建立通讯信道
{ // 参数从前面开始分别意思为:队列名称,是否持久化,独占的队列,不使用时是否自动删除,其他参数
channel.QueueDeclare(queueName, durable, false, false, null); if (!exchange.IsNullOrEmpty())
{
channel.ExchangeDeclare(exchange: exchange, type: type, durable: durable);
}
channel.QueueBind(queueName, exchange, “”); //ExchangeDeclare(model, exchange, RabbitMqProxyConfig.ExchangeType.Fanout, isProperties); //QueueDeclare(model, queue, isProperties); //model.QueueBind(queue, exchange, routingKey);

                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode \= 2;//1表示不持久,2.表示持久化
                                                ////properties.Type = ""; ////properties.CorrelationId

                    if (!durable)
                        properties \= null; var body = Encoding.UTF8.GetBytes(msg);
                    channel.BasicPublish(exchange, queueName, properties, body);
                }
                sendErrMsg \= string.Empty;
            } catch (Exception ex)
            { if ((++reTryCount) <= DefaultRetryConnectionCount)
                {
                    ResetMQConnectionToFree(connection);
                    connection \= CreateMQConnectionInPoolNew();
                    reTry \= true;
                } return ex.ToString();
            } finally { if (!reTry)
                {
                    ResetMQConnectionToFree(connection);
                }
            }
        } while (reTry); return sendErrMsg;
    } /// <summary>
    /// 消费消息 /// </summary>
    /// <param name="connection">消息队列连接对象</param>
    /// <param name="queueName">队列名称</param>
    /// <param name="durable">是否持久化</param>
    /// <param name="dealMessage">消息处理函数</param>
    /// <param name="saveLog">保存日志方法,可选</param>
    public void ConsumeMsg(IConnection connection, string queueName, bool durable, Func<string, ConsumeAction> dealMessage, Action<string, Exception> saveLog = null)
    { try { using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queueName, durable, false, false, null); //获取队列 
                channel.BasicQos(0, 1, false); //分发机制为触发式

                var consumer = new QueueingBasicConsumer(channel); //建立消费者 // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者
                channel.BasicConsume(queueName, false, consumer); while (true)  //如果队列中有消息

{
ConsumeAction consumeResult = ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息
string message = null; try { var body = ea.Body;
message = Encoding.UTF8.GetString(body);
consumeResult = dealMessage(message);
} catch (Exception ex)
{ if (saveLog != null)
{
saveLog(message, ex);
}
} if (consumeResult == ConsumeAction.ACCEPT)
{
channel.BasicAck(ea.DeliveryTag, false); //消息从队列中删除
} else if (consumeResult == ConsumeAction.RETRY)
{
channel.BasicNack(ea.DeliveryTag, false, true); //消息重回队列
} else {
channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丢弃
}
}
}

        } catch (Exception ex)
        { if (saveLog != null)
            {
                saveLog("QueueName:" + queueName, ex);
            } throw ex;
        } finally {
            ResetMQConnectionToFree(connection);
        }
    } /// <summary>
    /// 依次获取单个消息 /// </summary>
    /// <param name="connection">消息队列连接对象</param>
    /// <param name="QueueName">队列名称</param>
    /// <param name="durable">持久化</param>
    /// <param name="dealMessage">处理消息委托</param>
    public void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, Func<string, ConsumeAction> dealMessage)
    { try { using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(QueueName, durable, false, false, null); //获取队列 
                channel.BasicQos(0, 1, false); //分发机制为触发式

                uint msgCount = channel.MessageCount(QueueName); if (msgCount > 0)
                { var consumer = new QueueingBasicConsumer(channel); //建立消费者 // 从左到右参数意思分别是:队列名称、是否读取消息后直接删除消息,消费者
                    channel.BasicConsume(QueueName, false, consumer);

                    ConsumeAction consumeResult \= ConsumeAction.RETRY; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //获取消息
                    try { var body = ea.Body; var message = Encoding.UTF8.GetString(body);
                        consumeResult \= dealMessage(message);
                    } catch (Exception ex)
                    { throw ex;
                    } finally { if (consumeResult == ConsumeAction.ACCEPT)
                        {
                            channel.BasicAck(ea.DeliveryTag, false);  //消息从队列中删除

} else if (consumeResult == ConsumeAction.RETRY)
{
channel.BasicNack(ea.DeliveryTag, false, true); //消息重回队列
} else {
channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丢弃
}
}
} else {
dealMessage(string.Empty);
}
}

        } catch (Exception ex)
        { throw ex;
        } finally {
            ResetMQConnectionToFree(connection);
        }
    } /// <summary>
    /// 获取队列消息数 /// </summary>
    /// <param name="connection"></param>
    /// <param name="QueueName"></param>
    /// <returns></returns>
    public int GetMessageCount(IConnection connection, string QueueName)
    { int msgCount = 0; bool reTry = false; int reTryCount = 0; do {
            reTry \= false; try { using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(QueueName, true, false, false, null); //获取队列
                    msgCount = (int)channel.MessageCount(QueueName);
                }
            } catch (Exception ex)
            { //if (BaseUtil.IsIncludeException<SocketException>(ex))

{ if ((++reTryCount) <= DefaultRetryConnectionCount)//可重试1次
{
ResetMQConnectionToFree(connection);
connection = CreateMQConnectionInPoolNew();
reTry = true;
}
} throw ex;
} finally { if (!reTry)
{
ResetMQConnectionToFree(connection);
}
}

        } while (reTry); return msgCount;
    }
} public enum ConsumeAction
{
    ACCEPT, // 消费成功
    RETRY,   // 消费失败,可以放回队列重新消费
    REJECT,  // 消费失败,直接丢弃
}

复制代码

在站点开启多个工作进程的情况下,信号量的控制显得很重要,它可以有效的控制并发。信号量是针对单机而言的。通过计数可以有效控制使用中的连接个数。从而有效控制连接池的总体数量,防止过度的创建带来的毁灭性打击。