0%

live555的客服端流程:建立任务计划对象--建立环境对象--处理用户输入的参数(RTSP地址)--创建RTSPClient实例--发出DESCRIBE–发出SETUP–发出PLAY--进入Loop循环接收数据–发出TEARDOWN结束连接。

可以抽成3个函数接口:rtspOpen rtspRead rtspClose。

首先我们来分析rtspOpen的过程

复制代码

int rtspOpen(rtsp_object_t *p_obj, int tcpConnect)
{
     … … TRACE1_DEC(“BasicTaskScheduler::createNew !!!\n” ); if( ( p_sys->scheduler = BasicTaskScheduler::createNew() ) == NULL )
{
TRACE1_DEC(“BasicTaskScheduler::createNew failed\n” ); goto error;
} if( !( p_sys->env = BasicUsageEnvironment::createNew(*p_sys->scheduler) ) )
{
TRACE1_DEC(“BasicUsageEnvironment::createNew failed\n”); goto error;
} if( ( i_return = Connect( p_obj ) ) != RTSP_SUCCESS )
{
TRACE1_DEC( “Failed to connect with %s\n”, p_obj->rtspURL ); goto error;
} if( p_sys->p_sdp == NULL )
{
TRACE1_DEC( “Failed to retrieve the RTSP Session Description\n” ); goto error;
} if( ( i_return = SessionsSetup( p_obj ) ) != RTSP_SUCCESS )
{
TRACE1_DEC( “Nothing to play for rtsp://%s\n”, p_obj->rtspURL ); goto error;
} if( ( i_return = Play( p_obj ) ) != RTSP_SUCCESS ) goto error; … … }

复制代码

1> BasicTaskScheduler::createNew()

2> BasicUsageEnvironment::createNew()

3> connect

复制代码

static int Connect( rtsp_object_t *p_demux )
{
     … …
sprintf(appName, “LibRTSP%d”, p_demux->id); if( ( p_sys->rtsp = RTSPClient::createNew( *p_sys->env, 1, appName, i_http_port ) ) == NULL )
{
TRACE1_DEC( “RTSPClient::createNew failed (%s)\n”,
p_sys->env->getResultMsg() );

            i\_ret \= RTSP\_ERROR; goto connect\_error;                 
    }    

    psz\_options \= p\_sys->rtsp->sendOptionsCmd( p\_demux->rtspURL, psz\_user, psz\_pwd ); if(psz\_options == NULL)
            TRACE1\_DEC("RTSP Option commend error!!\\n");

    delete \[\] psz\_options;

    p\_sdp \= p\_sys->rtsp->describeURL( p\_demux->rtspURL );

     … … }

复制代码

connect中做了三件事:RTSPClient类的实例,发送“OPTIONS”请求,发送“describeURL”请求。

sendOptionsCmd()函数首先调用openConnectionFromURL()函数进程tcp连接,然后组包发送:

OPTIONS rtsp://120.90.0.50:8552/h264_ch2 RTSP/1.0
CSeq: 493 User-Agent: LibRTSP4 (LIVE555 Streaming Media v2008.04.02)

  收到服务器的应答:

RTSP/1.0 200 OK
CSeq: 493 Date: Mon, May 26 2014 13:27:07 GMT
Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE

  describeURL()函数首先也会调用openConnectionFromURL()函数进行TCP连接(这里可以看出先发OPTIONS请求,也可以先发describeURL请求),然后组包发送:

DESCRIBE rtsp://120.90.0.50:8552/h264_ch2 RTSP/1.0
CSeq: 494 Accept: application/sdp
User-Agent: LibRTSP4 (LIVE555 Streaming Media v2008.04.02)

  收到服务器应答:

复制代码

DESCRIBE rtsp://120.90.0.50:8552/h264_ch2 RTSP/1.0
CSeq: 494 Accept: application/sdp
User-Agent: LibRTSP4 (LIVE555 Streaming Media v2008.04.02)

Received DESCRIBE response:
RTSP/1.0 200 OK
CSeq: 494 Date: Mon, May 26 2014 13:27:07 GMT
Content-Base: rtsp://192.168.103.51:8552/h264_ch2/
Content-Type: application/sdp
Content-Length: 509 Need to read 509 extra bytes
Read 509 extra bytes: v=0 o=- 1401092685794152 1 IN IP4 192.168.103.51 s=RTSP/RTP stream from NETRA
i=h264_ch2
t=0 0 a=tool:LIVE555 Streaming Media v2008.04.02 a=type:broadcast
a=control:* a=range:npt=0- a=x-qt-text-nam:RTSP/RTP stream from NETRA
a=x-qt-text-inf:h264_ch2
m=video 0 RTP/AVP 96 c=IN IP4 0.0.0.0 a=rtpmap:96 H264/90000 a=fmtp:96 packetization-mode=1;profile-level-id=000042;sprop-parameter-sets=h264
a=control:track1
m=audio 0 RTP/AVP 96 c=IN IP4 0.0.0.0 a=rtpmap:96 PCMU/48000/2 a=control:track2

复制代码

4> SessionsSetup

复制代码

static int SessionsSetup( rtsp_object_t *p_demux )
{
     … …
// unsigned const thresh = 1000000;
if( !( p_sys->ms = MediaSession::createNew( *p_sys->env, p_sys->p_sdp ) ) )
{
TRACE1_DEC( “Could not create the RTSP Session: %s\n”, p_sys->env->getResultMsg() ); return RTSP_ERROR;
} /* Initialise each media subsession */ iter = new MediaSubsessionIterator( *p_sys->ms ); while( ( sub = iter->next() ) != NULL )
{
 … … bInit = sub->initiate(); if( !bInit )
{
TRACE1_DEC( “RTP subsession ‘%s/%s’ failed (%s)\n”,
sub->mediumName(), sub->codecName(), p_sys->env->getResultMsg() );
} else { … … /* Issue the SETUP */
if( p_sys->rtsp )
{ if( !p_sys->rtsp->setupMediaSubsession( *sub, False, b_rtsp_tcp, False ) )
{ /* if we get an unsupported transport error, toggle TCP
* use and try again */
if( !strstr(p_sys->env->getResultMsg(),”461 Unsupported Transport”) || !p_sys->rtsp->setupMediaSubsession( *sub, False, b_rtsp_tcp, False ) )
{
TRACE1_DEC( “SETUP of’%s/%s’ failed %s\n”, sub->mediumName(), sub->codecName(), p_sys->env->getResultMsg() ); continue;
}
}
}

            … …/* Value taken from mplayer */
if( !strcmp( sub->mediumName(), “audio” ) )
{ if( !strcmp( sub->codecName(), “MP4A-LATM” ) )
{
… … } else if( !strcmp( sub->codecName(), “PCMA” ) || !strcmp( sub->codecName(), “PCMU” ))
{
tk->fmt.i_extra = 0;
tk->fmt.i_codec = RTSP_CODEC_PCMU;
}
} else if( !strcmp( sub->mediumName(), “video” ) )
{ if( !strcmp( sub->codecName(), “H264” ) )
{
… … } else if( !strcmp( sub->codecName(), “MP4V-ES” ) )
{
… … } else if( !strcmp( sub->codecName(), “JPEG” ) )
{
tk->fmt.i_codec = RTSP_CODEC_MJPG;
}
}
               … … }
} … … }

复制代码

  这个函数做了四件事:创建MediaSession类的实例,创建MediaSubsessionIterator类的实例,MediaSubsession的初始化,发送”SETUP”请求。

  创建MediaSession实例的同时,会调用initializeWithSDP()函数去解析SDP,解析出”s=”相对应的fSessionName,解析出”s=”相对应的fSessionName,解析出”i=”相对应的fSessionDescription,解析出”c=”相对应的connectionEndpointName,解析出”a=type:”相对应的fMediaSessionType等等。创建MediaSubsession类的实例,并且加入到fSubsessionsHead链表中,从上面的SDP描述来看,有两个MediaSubsession,一个video,一个audio。

  创建MediaSubsessionIterator类的实例,并且调用reset函数,将fOurSession.fSubsessionsHead赋值给fNextPtr,也就是将链表的头结点赋值给fNextPtr。当执行while循环的时候,执行了两次,一次video,一次audio。

  initiate函数,根据fSourceFilterAddr来判断是否是SSM,还是ASM,然后调用Groupsock的不同构造函数来创建实例fRTPSocket、fRTCPSocket;然后根据协议类型fProtocolName(这个值在sdp中的“m=”)来判断是基于udp还是rtp,我们只分析RTP,如果是RTP,则根据相应的编码类型fCodecName(这个值在sdp中的“a=rtpmap:”)来判断相应的fRTPSource,这里我们创建了H264和PCMU的RTPSource实例fRTPSource;创建RTCPInstance类的实例fRTCPInstance。

  setupMediaSubsession()函数,主要是发送“SETUP”请求,通过SDP的描述,知道我们采用的是RTP协议,根据rtspOpen传入的参数streamUsingTCP来请求rtp是基于udp传输,还是tcp传输,如果是TCP传输,只能是单播,如果udp传输,则根据connectionEndpointName和传入的参数forceMulticastOnUnspecified来判断是否多播还是单播,我们的服务端值支持单播,而且传入的参数false,所以这里采用单播;组包发送“SETUP”请求:

SETUP rtsp://192.168.103.51:8552/h264_ch2/track1 RTSP/1.0
CSeq: 495 Transport: RTP/AVP;unicast;client_port=33482-33483 User-Agent: LibRTSP4 (LIVE555 Streaming Media v2008.04.02)

   服务器应答:

RTSP/1.0 200 OK
CSeq: 495 Date: Mon, May 26 2014 13:27:07 GMT
Transport: RTP/AVP;unicast;destination=14.214.248.17;source=192.168.103.51;client_port=33482-33483;server_port=6970-6971 Session: 151

  最后,如果采用TCP传输,则调用setStreamSocket()->RTPInterface::setStreamSocket()->addStreamSocket()函数将RTSP的socket值fInputSocketNum加入到fTCPStreams链表中;如果是UDP传输的话,组播地址为空,则用服务端地址保存到fDests中,如果组播地址不为空,则加入组播组。

复制代码

    ... ...  

     if (streamUsingTCP) { // Tell the subsession to receive RTP (and send/receive RTCP) // over the RTSP stream:
if (subsession.rtpSource() != NULL)
subsession.rtpSource()->setStreamSocket(fInputSocketNum, subsession.rtpChannelId); if (subsession.rtcpInstance() != NULL)
subsession.rtcpInstance()->setStreamSocket(fInputSocketNum, subsession.rtcpChannelId);
} else { // Normal case. // Set the RTP and RTCP sockets’ destination address and port // from the information in the SETUP response:
subsession.setDestinations(fServerAddress);
}
… …

复制代码

5> play

复制代码

static int Play( rtsp_object_t *p_demux )
{ … … if( p_sys->rtsp )
{ /* The PLAY */
if( !p_sys->rtsp->playMediaSession( *p_sys->ms, p_sys->i_npt_start, -1, 1 ) )
{
TRACE1_DEC( “RTSP PLAY failed %s\n”, p_sys->env->getResultMsg() ); return RTSP_ERROR;;
}
}
… …return RTSP_SUCCESS;
}

复制代码

  playMediaSession()函数,就是发送“PLAY”请求:

PLAY rtsp://120.90.0.50:8552/h264_ch2/ RTSP/1.0
CSeq: 497 Session: 151 Range: npt=0.000- User-Agent: LibRTSP4 (LIVE555 Streaming Media v2008.04.02)

 服务器应答:

复制代码

RTSP/1.0 200 OK
CSeq: 497 Date: Mon, May 26 2014 13:27:07 GMT
Range: npt=0.000- Session: 151 RTP-Info: url=rtsp://192.168.103.51:8552/h264_ch2/track1;seq=63842;rtptime=1242931431,url=rtsp://192.168.103.51:8552/h264_ch2/track2;seq=432;rtptime=3179210581

复制代码

接着我们分析rtspRead过程:

复制代码

int rtspRead(rtsp_object_t *p_obj)
{ … … if(p_sys != NULL)
{ /* First warn we want to read data */ p_sys->event = 0; for( i = 0; i < p_sys->i_track; i++ )
{
live_track_t *tk = p_sys->track[i];if( tk->waiting == 0 )
{
tk->waiting = 1;
tk->sub->readSource()->getNextFrame( tk->p_buffer, tk->i_buffer,
StreamRead, tk, StreamClose, tk );
}
}

            /\* Create a task that will be called if we wait more than 300ms \*/ task \= p\_sys->scheduler->scheduleDelayedTask( 300000, TaskInterrupt, p\_obj ); /\* Do the read \*/ p\_sys\->scheduler->doEventLoop( &p\_sys->event ); /\* remove the task \*/ p\_sys\->scheduler->unscheduleDelayedTask( task );    

            p\_sys\->b\_error ? ret = RTSP\_ERROR : ret = RTSP\_SUCCESS;
    } return ret;

}

复制代码

  这个函数首先要知道readSource()函数的fReadSource的值在哪里复制,在前面的initiate()函数里面有:

复制代码

    

       … …
       } else if (strcmp(fCodecName, “H264”) == 0) {
fReadSource = fRTPSource = H264VideoRTPSource::createNew(env(), fRTPSocket,
fRTPPayloadFormat,
fRTPTimestampFrequency);
} else if (strcmp(fCodecName, “JPEG”) == 0) { // motion JPEG
          … … } else if ( strcmp(fCodecName, “PCMU”) == 0 // PCM u-law audio
|| strcmp(fCodecName, “GSM”) == 0 // GSM audio
|| strcmp(fCodecName, “PCMA”) == 0 // PCM a-law audio
|| strcmp(fCodecName, “L16”) == 0 // 16-bit linear audio
|| strcmp(fCodecName, “MP1S”) == 0 // MPEG-1 System Stream
|| strcmp(fCodecName, “MP2P”) == 0 // MPEG-2 Program Stream
|| strcmp(fCodecName, “L8”) == 0 // 8-bit linear audio
|| strcmp(fCodecName, “G726-16”) == 0 // G.726, 16 kbps
|| strcmp(fCodecName, “G726-24”) == 0 // G.726, 24 kbps
|| strcmp(fCodecName, “G726-32”) == 0 // G.726, 32 kbps
|| strcmp(fCodecName, “G726-40”) == 0 // G.726, 40 kbps
|| strcmp(fCodecName, “SPEEX”) == 0 // SPEEX audio
) {
createSimpleRTPSource = True;
useSpecialRTPoffset = 0;
} else if (useSpecialRTPoffset >= 0) {
  … … } if (createSimpleRTPSource) { char* mimeType = new char[strlen(mediumName()) + strlen(codecName()) + 2] ;
sprintf(mimeType, “%s/%s”, mediumName(), codecName());
fReadSource = fRTPSource = SimpleRTPSource::createNew(env(), fRTPSocket, fRTPPayloadFormat,
fRTPTimestampFrequency, mimeType,
(unsigned)useSpecialRTPoffset,
doNormalMBitRule);
delete[] mimeType;
}
}

复制代码

    如果是h264编码方式,则getNextFrame函数定义在FramedSource::getNextFrame:

复制代码

void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,
afterGettingFunc* afterGettingFunc, void* afterGettingClientData,
onCloseFunc* onCloseFunc, void* onCloseClientData)
{ // Make sure we’re not already being read:
if (fIsCurrentlyAwaitingData) {
envir() << “FramedSource[“ << this << “]::getNextFrame(): attempting to read more than once at the same time!\n”;
exit(1);
}

fTo \= to;
fMaxSize \= maxSize;
fNumTruncatedBytes \= 0; // by default; could be changed by doGetNextFrame()
fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()
fAfterGettingFunc = afterGettingFunc;
fAfterGettingClientData \= afterGettingClientData;
fOnCloseFunc \= onCloseFunc;
fOnCloseClientData \= onCloseClientData;
fIsCurrentlyAwaitingData \= True;

doGetNextFrame();

}

复制代码

  doGetNextFrame()函数定义在MultiFramedRTPSource::doGetNextFrame():

复制代码

void MultiFramedRTPSource::doGetNextFrame()
{ if (!fAreDoingNetworkReads) { // Turn on background read handling of incoming packets:
fAreDoingNetworkReads = True;
TaskScheduler::BackgroundHandlerProc* handler = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler;
fRTPInterface.startNetworkReading(handler);
}

fSavedTo \= fTo;
fSavedMaxSize \= fMaxSize;
fFrameSize \= 0; // for now
fNeedDelivery = True;

doGetNextFrame1();

}

复制代码

  doGetNextFrame1()函数定义在MultiFramedRTPSource::doGetNextFrame1():

复制代码

void MultiFramedRTPSource::doGetNextFrame1()
{ while (fNeedDelivery) { // If we already have packet data available, then deliver it now.
Boolean packetLossPrecededThis;
BufferedPacket* nextPacket = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis); if (nextPacket == NULL) break;

    fNeedDelivery \= False; if (nextPacket->useCount() == 0) { // Before using the packet, check whether it has a special header // that needs to be processed:

unsigned specialHeaderSize; if (!processSpecialHeader(nextPacket, specialHeaderSize)) { // Something’s wrong with the header; reject the packet:
fReorderingBuffer->releaseUsedPacket(nextPacket);
fNeedDelivery = True; break;
}
nextPacket->skip(specialHeaderSize);
} // Check whether we’re part of a multi-packet frame, and whether // there was packet loss that would render this packet unusable:
if (fCurrentPacketBeginsFrame) { if (packetLossPrecededThis || fPacketLossInFragmentedFrame) { // We didn’t get all of the previous frame. // Forget any data that we used from it:
fTo = fSavedTo; fMaxSize = fSavedMaxSize;
fFrameSize = 0;
}
fPacketLossInFragmentedFrame = False;
} else if (packetLossPrecededThis) { // We’re in a multi-packet frame, with preceding packet loss
fPacketLossInFragmentedFrame = True;
} if (fPacketLossInFragmentedFrame) { // This packet is unusable; reject it:
fReorderingBuffer->releaseUsedPacket(nextPacket);
fNeedDelivery = True; break;
} // The packet is usable. Deliver all or part of it to our caller:
unsigned frameSize;
nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
fCurPacketMarkerBit);
fFrameSize += frameSize; if (!nextPacket->hasUsableData()) { // We’re completely done with this packet now
fReorderingBuffer->releaseUsedPacket(nextPacket);
} if (fCurrentPacketCompletesFrame || fNumTruncatedBytes > 0) { // We have all the data that the client wants.
if (fNumTruncatedBytes > 0) {
envir() << “MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client’s buffer size (“
<< fSavedMaxSize << “). “<< fNumTruncatedBytes << “ bytes of trailing data will be dropped!\n”;
} // Call our own ‘after getting’ function, so that the downstream object can consume the data:
if (fReorderingBuffer->isEmpty()) { // Common case optimization: There are no more queued incoming packets, so this code will not get // executed again without having first returned to the event loop. Call our ‘after getting’ function // directly, because there’s no risk of a long chain of recursion (and thus stack overflow):
afterGetting(this);
} else { // Special case: Call our ‘after getting’ function via the event loop.
nextTask() = envir().taskScheduler().scheduleDelayedTask(0, (TaskFunc*)FramedSource::afterGetting, this);
}
} else { // This packet contained fragmented data, and does not complete // the data that the client wants. Keep getting data:
fTo += frameSize; fMaxSize -= frameSize;
fNeedDelivery = True;
}
}
}

复制代码

   FramedSource::afterGetting(FramedSource* source) :

复制代码

void FramedSource::afterGetting(FramedSource* source)
{
source->fIsCurrentlyAwaitingData = False; // indicates that we can be read again // Note that this needs to be done here, in case the “fAfterFunc” // called below tries to read another frame (which it usually will)

if (source->fAfterGettingFunc != NULL) {
    (\*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
                               source\->fFrameSize, 
                               source\->fNumTruncatedBytes,
                               source\->fPresentationTime,
                               source\->fDurationInMicroseconds);
}

}

复制代码

  fAfterGettingFunc函数指针在FramedSource::getNextFrame()中被赋值afterGettingFunc,afterGettingFunc的值则是rtspRead()函数调用getNextFrame()函数时,传入的StreamRead()。这样就获取了一帧数据。

     在MultiFramedRTPSource::doGetNextFrame()函数中,我们发现了fRTPInterface.startNetworkReading(handler),这个函数主要做了什么作用?

复制代码

void RTPInterface::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc)
{ // Normal case: Arrange to read UDP packets:
envir().taskScheduler().turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner); // Also, receive RTP over TCP, on each of our TCP connections:
fReadHandlerProc = handlerProc; for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) { // Get a socket descriptor for “streams->fStreamSocketNum”:
SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum); if (socketDescriptor == NULL) {
socketDescriptor = new SocketDescriptor(envir(), streams->fStreamSocketNum);
socketHashTable(envir())->Add((char const*)(long)(streams->fStreamSocketNum), socketDescriptor);
} // Tell it about our subChannel:
socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
}
}

复制代码

  这个函数主要做了两个作用,一个是注册UDP socket的读取任务函数MultiFramedRTPSource::networkReadHandler()到任务队列,一个是注册TCP socket的读取任务函数SocketDescriptor::tcpReadHandler()到任务队列,最终还是会调用MultiFramedRTPSource::networkReadHandler()函数获取一帧数据。

live555支持单播和组播,我们先分析单播的流媒体服务端,后面分析组播的流媒体服务端。

一、单播的流媒体服务端:

复制代码

// Create the RTSP server:
RTSPServer* rtspServer = NULL; // Normal case: Streaming from a built-in RTSP server:
rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL); if (rtspServer == NULL) { *env << “Failed to create RTSP server: “ << env->getResultMsg() << “\n”;
exit(1);
} *env << “…done initializing \n”; if( streamingMode == STREAMING_UNICAST )
{
ServerMediaSession* sms = ServerMediaSession::createNew(*env,
H264StreamName[video_type],
H264StreamName[video_type],
streamDescription,
streamingMode == STREAMING_MULTICAST_SSM);
sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type], H264VideoBitrate));
sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type]));

            rtspServer\->addServerMediaSession(sms); char \*url = rtspServer->rtspURL(sms); \*env << "Play this stream using the URL:\\t" << url << "\\n";
            delete\[\] url;  
    }

      // Begin the LIVE555 event loop:
      env->taskScheduler().doEventLoop(&watchVariable); // does not return

复制代码

我们一步一步分析:

1>  rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);

复制代码

RTSPServer* RTSPServer::createNew(UsageEnvironment& env, Port ourPort,
UserAuthenticationDatabase* authDatabase,
unsigned reclamationTestSeconds)
{ int ourSocket = -1; do { int ourSocket = setUpOurSocket(env, ourPort); if (ourSocket == -1) break; return new RTSPServer(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds);
} while (0); if (ourSocket != -1) ::closeSocket(ourSocket); return NULL;
}

复制代码

  此函数首先创建一个rtsp协议的socket,并且监听rtspServerPortNum端口,创建RTSPServer类的实例。下面我们看下RTSPServer的构造函数:

复制代码

RTSPServer::RTSPServer(UsageEnvironment& env, int ourSocket, Port ourPort,
UserAuthenticationDatabase* authDatabase,
unsigned reclamationTestSeconds)
: Medium(env),
fServerSocket(ourSocket), fServerPort(ourPort),
fAuthDB(authDatabase), fReclamationTestSeconds(reclamationTestSeconds),
fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
fSessionIdCounter(0)
{
#ifdef USE_SIGNALS // Ignore the SIGPIPE signal, so that clients on the same host that are killed // don’t also kill us:
signal(SIGPIPE, SIG_IGN); #endif

    // Arrange to handle connections from others:
    env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket, (TaskScheduler::BackgroundHandlerProc\*)&incomingConnectionHandler, this);

}

复制代码

  RTSPServer构造函数,初始化fServerMediaSessions为创建的HashTable,初始化fServerSocket为我们前面创建的tcp socket,fServerPort为我们监听的端口rtspServerPortNum,并且向taskScheduler注册fServerSocket的任务函数incomingConnectionHandler,这个任务函数主要监听是否有新的客服端连接accept,如果有新的客服端接入,创建RTSPClientSession的实例。

  RTSPClientSession要提供什么功能呢?可以想象:需要监听客户端的rtsp请求并回应它,需要在DESCRIBE请求中返回所请求的流的信息,需要在SETUP请求中建立起RTP会话,需要在TEARDOWN请求中关闭RTP会话,等等…

复制代码

RTSPServer::RTSPClientSession::RTSPClientSession(RTSPServer& ourServer, unsigned sessionId, int clientSocket, struct sockaddr_in clientAddr)
: fOurServer(ourServer), fOurSessionId(sessionId),
fOurServerMediaSession(NULL),
fClientSocket(clientSocket), fClientAddr(clientAddr),
fLivenessCheckTask(NULL),
fIsMulticast(False), fSessionIsActive(True), fStreamAfterSETUP(False),
fTCPStreamIdCount(0), fNumStreamStates(0), fStreamStates(NULL)
{ // Arrange to handle incoming requests:
resetRequestBuffer();
envir().taskScheduler().turnOnBackgroundReadHandling(fClientSocket,(TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler, this);
noteLiveness();
}

复制代码

  上面这个函数是RTSPClientSession的构造函数,初始化sessionId为++fSessionIdCounter,初始化fClientSocket为accept创建的socket(clientSocket),初始化fClientAddr为accept接收的客服端地址,也向taskScheduler注册了fClientSocket的认为函数incomingRequestHandler。

  incomingRequestHandler会调用incomingRequestHandler1,incomingRequestHandler1函数定义如下:

复制代码

void RTSPServer::RTSPClientSession::incomingRequestHandler1()
{
noteLiveness(); struct sockaddr_in dummy; // ‘from’ address, meaningless in this case
Boolean endOfMsg = False;
unsigned char* ptr = &fRequestBuffer[fRequestBytesAlreadySeen]; int bytesRead = readSocket(envir(), fClientSocket, ptr, fRequestBufferBytesLeft, dummy); if (bytesRead <= 0 || (unsigned)bytesRead >= fRequestBufferBytesLeft) { // Either the client socket has died, or the request was too big for us. // Terminate this connection:
#ifdef DEBUG
fprintf(stderr, “RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes (of %d); terminating connection!\n”, this, bytesRead, fRequestBufferBytesLeft); #endif delete this; return;
}
#ifdef DEBUG
ptr[bytesRead] = ‘\0’;
fprintf(stderr, “RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes:%s\n”, this, bytesRead, ptr); #endif

    // Look for the end of the message: <CR><LF><CR><LF>
    unsigned char \*tmpPtr = ptr; if (fRequestBytesAlreadySeen > 0) --tmpPtr; // in case the last read ended with a <CR>
    while (tmpPtr < &ptr\[bytesRead-1\]) { if (\*tmpPtr == '\\r' && \*(tmpPtr+1) == '\\n') { if (tmpPtr - fLastCRLF == 2) { // This is it:
                            endOfMsg = 1; break;
                    }
                    fLastCRLF \= tmpPtr;
            } ++tmpPtr;
    }

    fRequestBufferBytesLeft \-= bytesRead;
    fRequestBytesAlreadySeen += bytesRead; if (!endOfMsg) return; // subsequent reads will be needed to complete the request // Parse the request string into command name and 'CSeq', // then handle the command:
    fRequestBuffer\[fRequestBytesAlreadySeen\] = '\\0'; char cmdName\[RTSP\_PARAM\_STRING\_MAX\]; char urlPreSuffix\[RTSP\_PARAM\_STRING\_MAX\]; char urlSuffix\[RTSP\_PARAM\_STRING\_MAX\]; char cseq\[RTSP\_PARAM\_STRING\_MAX\]; if (!parseRTSPRequestString((char\*)fRequestBuffer, fRequestBytesAlreadySeen,
                                                    cmdName, sizeof cmdName,
                                                    urlPreSuffix, sizeof urlPreSuffix,
                                                    urlSuffix, sizeof urlSuffix,
                                                    cseq, sizeof cseq)) 
    {

#ifdef DEBUG
fprintf(stderr, “parseRTSPRequestString() failed!\n”); #endif handleCmd_bad(cseq);
} else {
#ifdef DEBUG
fprintf(stderr, “parseRTSPRequestString() returned cmdName \“%s\“, urlPreSuffix \“%s\“, urlSuffix \“%s\“\n”, cmdName, urlPreSuffix, urlSuffix); #endif
if (strcmp(cmdName, “OPTIONS”) == 0) {
handleCmd_OPTIONS(cseq);
} else if (strcmp(cmdName, “DESCRIBE”) == 0) {
printf(“incomingRequestHandler1 ~~~~~~~~~~~~~~\n”);
handleCmd_DESCRIBE(cseq, urlSuffix, (char const*)fRequestBuffer);
} else if (strcmp(cmdName, “SETUP”) == 0) {
handleCmd_SETUP(cseq, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
} else if (strcmp(cmdName, “TEARDOWN”) == 0
|| strcmp(cmdName, “PLAY”) == 0
|| strcmp(cmdName, “PAUSE”) == 0
|| strcmp(cmdName, “GET_PARAMETER”) == 0) {
handleCmd_withinSession(cmdName, urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer);
} else {
handleCmd_notSupported(cseq);
}
}

#ifdef DEBUG
fprintf(stderr, “sending response: %s”, fResponseBuffer); #endif send(fClientSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0); if (strcmp(cmdName, “SETUP”) == 0 && fStreamAfterSETUP) { // The client has asked for streaming to commence now, rather than after a // subsequent “PLAY” command. So, simulate the effect of a “PLAY” command:
handleCmd_withinSession(“PLAY”, urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer);
}

    resetRequestBuffer(); // to prepare for any subsequent request
    if (!fSessionIsActive) delete this;

}

复制代码

  此函数,我们可以看到rtsp的协议的各个命令的接收处理和应答。

2> ServerMediaSession* sms = ServerMediaSession::createNew(… …)

   创建ServerMediaSession类的实例,初始化fStreamName为”h264_ch1”,fInfoSDPString为”h264_ch1”,fDescriptionSDPString为”RTSP/RTP stream from NETRA”,fMiscSDPLines为null,fCreationTime获取的时间,fIsSSM为false。

3> sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(… …);

  WISH264VideoServerMediaSubsession::createNew():这个函数的主要目的是创建OnDemandServerMediaSubsession类的实例,这个类在前面已经分析,是单播时候必须创建的,初始化fWISInput为*H264InputDevice[video_type]。

  sms->addSubsession() 是将WISH264VideoServerMediaSubsession类的实例加入到fSubsessionsTail链表首节点中。

4> sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(… …);

  WISPCMAudioServerMediaSubsession::createNew():这个函数的主要目的是创建OnDemandServerMediaSubsession类的实例,这个类在前面已经分析,是单播时候必须创建的,初始化fWISInput为*H264InputDevice[video_type]。

  sms->addSubsession() 是将WISPCMAudioServerMediaSubsession类的实例加入到fSubsessionsTail->fNext中。

5> rtspServer->addServerMediaSession(sms)

  将rtspServer加入到fServerMediaSessions的哈希表中。

6> env->taskScheduler().doEventLoop(&watchVariable);

这个doEventLoop在前面已经分析过,主要处理socket任务和延迟任务。   

二、组播的流媒体服务器:

复制代码

    // Create the RTSP server:
    RTSPServer\* rtspServer = NULL; // Normal case: Streaming from a built-in RTSP server:
    rtspServer = RTSPServer::createNew(\*env, rtspServerPortNum, NULL); if (rtspServer == NULL) { \*env << "Failed to create RTSP server: " << env->getResultMsg() << "\\n";
            exit(1);
    } \*env << "...done initializing \\n"; if( streamingMode == STREAMING\_UNICAST )
    {

        … … } else { if (streamingMode == STREAMING_MULTICAST_SSM)
{ if (multicastAddress == 0)
multicastAddress = chooseRandomIPv4SSMAddress(*env);
} else if (multicastAddress != 0) {
streamingMode = STREAMING_MULTICAST_ASM;
} struct in_addr dest;
     dest.s_addr = multicastAddress; const unsigned char ttl = 255; // For RTCP:
const unsigned maxCNAMElen = 100;
unsigned char CNAME[maxCNAMElen + 1];
gethostname((char *) CNAME, maxCNAMElen);
CNAME[maxCNAMElen] = ‘\0’; // just in case
ServerMediaSession* sms;
sms = ServerMediaSession::createNew(*env, H264StreamName[video_type], H264StreamName[video_type], streamDescription,streamingMode == STREAMING_MULTICAST_SSM); /* VIDEO Channel initial */
if(1)
{ // Create ‘groupsocks’ for RTP and RTCP:
const Port rtpPortVideo(videoRTPPortNum); const Port rtcpPortVideo(videoRTPPortNum+1);

                rtpGroupsockVideo \= new Groupsock(\*env, dest, rtpPortVideo, ttl);
                rtcpGroupsockVideo \= new Groupsock(\*env, dest, rtcpPortVideo, ttl); if (streamingMode == STREAMING\_MULTICAST\_SSM) {
                        rtpGroupsockVideo\->multicastSendOnly();
                        rtcpGroupsockVideo\->multicastSendOnly();
                }
                
                setVideoRTPSinkBufferSize();
                sinkVideo \= H264VideoRTPSink::createNew(\*env, rtpGroupsockVideo,96, 0x42, "h264"); // Create (and start) a 'RTCP instance' for this RTP sink:
                unsigned totalSessionBandwidthVideo = (Mpeg4VideoBitrate+500)/1000; // in kbps; for RTCP b/w share
                rtcpVideo = RTCPInstance::createNew(\*env, rtcpGroupsockVideo,
                                                     totalSessionBandwidthVideo, CNAME,
                                                     sinkVideo, NULL /\* we're a server \*/ ,
                                                     streamingMode \== STREAMING\_MULTICAST\_SSM); // Note: This starts RTCP running automatically
                sms->addSubsession(PassiveServerMediaSubsession::createNew(\*sinkVideo, rtcpVideo));

                sourceVideo \= H264VideoStreamFramer::createNew(\*env, H264InputDevice\[video\_type\]->videoSource()); // Start streaming:
                sinkVideo->startPlaying(\*sourceVideo, NULL, NULL);
            } /\* AUDIO Channel initial \*/
            if(1)
            { // there's a separate RTP stream for audio // Create 'groupsocks' for RTP and RTCP:
                    const Port rtpPortAudio(audioRTPPortNum); const Port rtcpPortAudio(audioRTPPortNum+1);

                    rtpGroupsockAudio \= new Groupsock(\*env, dest, rtpPortAudio, ttl);
                    rtcpGroupsockAudio \= new Groupsock(\*env, dest, rtcpPortAudio, ttl); if (streamingMode == STREAMING\_MULTICAST\_SSM) 
                    {
                            rtpGroupsockAudio\->multicastSendOnly();
                            rtcpGroupsockAudio\->multicastSendOnly();
                    } if( audioSamplingFrequency == 16000 )
                            sinkAudio \= SimpleRTPSink::createNew(\*env, rtpGroupsockAudio, 96, audioSamplingFrequency, "audio", "PCMU", 1); else sinkAudio \= SimpleRTPSink::createNew(\*env, rtpGroupsockAudio, 0, audioSamplingFrequency, "audio", "PCMU", 1); // Create (and start) a 'RTCP instance' for this RTP sink:
                    unsigned totalSessionBandwidthAudio = (audioOutputBitrate+500)/1000; // in kbps; for RTCP b/w share
                    rtcpAudio = RTCPInstance::createNew(\*env, rtcpGroupsockAudio,
                                                          totalSessionBandwidthAudio, CNAME,
                                                          sinkAudio, NULL /\* we're a server \*/,
                                                          streamingMode \== STREAMING\_MULTICAST\_SSM); // Note: This starts RTCP running automatically
                sms->addSubsession(PassiveServerMediaSubsession::createNew(\*sinkAudio, rtcpAudio));

                   sourceAudio \= H264InputDevice\[video\_type\]->audioSource(); // Start streaming:
                    sinkAudio->startPlaying(\*sourceAudio, NULL, NULL);
            }

            rtspServer\->addServerMediaSession(sms);
            
            { struct in\_addr dest; dest.s\_addr = multicastAddress; char \*url = rtspServer->rtspURL(sms); //char \*url2 = inet\_ntoa(dest);
                    \*env << "Mulicast Play this stream using the URL:\\n\\t" << url << "\\n"; //\*env << "2 Mulicast addr:\\n\\t" << url2 << "\\n";

delete[] url;
}
} // Begin the LIVE555 event loop:
env->taskScheduler().doEventLoop(&watchVariable); // does not return

复制代码

1> rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);

同前面单播的分析一样。

2> sms = ServerMediaSession::createNew(… …)

  同前面单播的分析一样。

3> 视频

1. 创建视频rtp、rtcp的Groupsock类的实例,实现rtp和rtcp的udp通信socket。这里应该了解下ASM和SSM。

  2. 创建RTPSink类的实例,实现视频数据的RTP打包传输。

  3. 创建RTCPInstance类的实例,实现RTCP打包传输。

  4. 创建PassiveServerMediaSubsession类的实例,并加入到fSubsessionsTail链表中的首节点。

  5. 创建FramedSource类的实例,实现一帧视频数据的获取。

  5. 开始发送RTP和RTCP数据到组播地址。

4> 音频

1. 创建音频rtp、rtcp的Groupsock类的实例,实现rtp和rtcp的udp通信socket。这里应该了解下ASM和SSM。

  2. 创建RTPSink类的实例,实现音频数据的RTP打包传输。

  3. 创建RTCPInstance类的实例,实现RTCP打包传输。

  4. 创建PassiveServerMediaSubsession类的实例,并加入到fSubsessionsTail链表中的下一个节点。

  5. 创建FramedSource类的实例,实现一帧音频数据的获取。

  5. 开始发送RTP和RTCP数据到组播地址。

5> rtspServer->addServerMediaSession(sms)

同前面单播的分析一样。

6> env->taskScheduler().doEventLoop(&watchVariable)

同前面单播的分析一样。

三、单播和组播的区别

1> 创建socket的时候,组播一开始就创建了,而单播的则是根据收到的“SETUP”命令创建相应的socket。

2> startPlaying的时候,组播一开始就发送数据到组播地址,而单播则是根据收到的“PLAY”命令开始startPlaying。

四、startPlaying分析

首先分析组播:

sinkVideo->startPlaying()实现不在H264VideoRTPSink类中,也不在RTPSink类中,而是在MediaSink类中实现:

复制代码

Boolean MediaSink::startPlaying(MediaSource& source,
afterPlayingFunc* afterFunc, void* afterClientData)
{ // Make sure we’re not already being played:
if (fSource != NULL) {
envir().setResultMsg(“This sink is already being played”); return False;
} // Make sure our source is compatible:
if (!sourceIsCompatibleWithUs(source)) {
envir().setResultMsg(“MediaSink::startPlaying(): source is not compatible!”); return False;
}
fSource = (FramedSource*)&source;

fAfterFunc \= afterFunc;
fAfterClientData \= afterClientData; return continuePlaying();

}

复制代码

  这里发现调用了continuePlaying()函数,那这个函数在哪里实现的呢?因为sinkVideo是通过 H264VideoRTPSink::createNew()实现,返回的H264VideoRTPSink类的实例,因此我们可以判定这个continuePlaying()在H264VideoRTPSink类实现。

复制代码

Boolean H264VideoRTPSink::continuePlaying()
{ // First, check whether we have a ‘fragmenter’ class set up yet. // If not, create it now:
if (fOurFragmenter == NULL) {
fOurFragmenter = new H264FUAFragmenter(envir(), fSource, OutPacketBuffer::maxSize, ourMaxPacketSize() - 12/*RTP hdr size*/);
fSource = fOurFragmenter;
} //printf(“function=%s line=%d\n”,__func__,__LINE__); // Then call the parent class’s implementation:
return MultiFramedRTPSink::continuePlaying();
}

复制代码

  看到这里我们发现调用的是MultiFramedRTPSink类的成员函数continuePlaying,看下这个函数的实现:

复制代码

Boolean MultiFramedRTPSink::continuePlaying()
{ // Send the first packet. // (This will also schedule any future sends.)
buildAndSendPacket(True); return True;
}

复制代码

  这里我们发现了buildAndSendPacket(),这个函数实现:

复制代码

void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket)
{ //此函数中主要是准备rtp包的头,为一些需要跟据实际数据改变的字段留出位置。
fIsFirstPacket = isFirstPacket; // Set up the RTP header:
unsigned rtpHdr = 0x80000000; // RTP version 2; marker (‘M’) bit not set (by default; it can be set later)
rtpHdr |= (fRTPPayloadType << 16);
rtpHdr |= fSeqNo; // sequence number
fOutBuf->enqueueWord(rtpHdr);//向包中加入一个字 // Note where the RTP timestamp will go. // (We can’t fill this in until we start packing payload frames.)
fTimestampPosition = fOutBuf->curPacketSize();
fOutBuf->skipBytes(4); // leave a hole for the timestamp 在缓冲中空出时间戳的位置
fOutBuf->enqueueWord(SSRC()); // Allow for a special, payload-format-specific header following the // RTP header:
fSpecialHeaderPosition = fOutBuf->curPacketSize();
fSpecialHeaderSize = specialHeaderSize();
fOutBuf->skipBytes(fSpecialHeaderSize); // Begin packing as many (complete) frames into the packet as we can:
fTotalFrameSpecificHeaderSizes = 0;
fNoFramesLeft = False;
fNumFramesUsedSoFar = 0; // 一个包中已打入的帧数。 //头准备好了,再打包帧数据
packFrame();
}

复制代码

  继续看packFrame():

复制代码

void MultiFramedRTPSink::packFrame()
{ // First, see if we have an overflow frame that was too big for the last pkt
if (fOutBuf->haveOverflowData()) { //如果有帧数据,则使用之。OverflowData是指上次打包时剩下的帧数据,因为一个包可能容纳不了一个帧。 // Use this frame before reading a new one from the source
unsigned frameSize = fOutBuf->overflowDataSize(); struct timeval presentationTime = fOutBuf->overflowPresentationTime();
unsigned durationInMicroseconds =fOutBuf->overflowDurationInMicroseconds();
fOutBuf->useOverflowData();

    afterGettingFrame1(frameSize, 0, presentationTime,durationInMicroseconds);
} else { //一点帧数据都没有,跟source要吧。 // Normal case: we need to read a new frame from the source
    if (fSource == NULL) return; //更新缓冲中的一些位置
    fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
    fCurFrameSpecificHeaderSize \= frameSpecificHeaderSize();
    fOutBuf\->skipBytes(fCurFrameSpecificHeaderSize);
    fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize; //从source获取下一帧
    fSource->getNextFrame(fOutBuf->curPtr(),//新数据存放开始的位置
            fOutBuf->totalBytesAvailable(),//缓冲中空余的空间大小
            afterGettingFrame,    //因为可能source中的读数据函数会被放在任务调度中,所以把获取帧后应调用的函数传给source
            this,
            ourHandleClosure, //这个是source结束时(比如文件读完了)要调用的函数。
            this);
}

}

复制代码

  fSource定义在MediaSink类中,在这个类中startPlaying()函数中,给fSource赋值为传入的参数sourceVideo,sourceVideo实现getNextFrame()函数在FramedSource中,这是一个虚函数:

复制代码

void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,
afterGettingFunc* afterGettingFunc, void* afterGettingClientData,
onCloseFunc* onCloseFunc, void* onCloseClientData)
{ // Make sure we’re not already being read:
if (fIsCurrentlyAwaitingData) {
envir() << “FramedSource[“ << this << “]::getNextFrame(): attempting to read more than once at the same time!\n”;
exit(1);
}

fTo \= to;
fMaxSize \= maxSize;
fNumTruncatedBytes \= 0; // by default; could be changed by doGetNextFrame()
fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()
fAfterGettingFunc = afterGettingFunc;
fAfterGettingClientData \= afterGettingClientData;
fOnCloseFunc \= onCloseFunc;
fOnCloseClientData \= onCloseClientData;
fIsCurrentlyAwaitingData \= True;

doGetNextFrame();

}

复制代码

  sourceVideo通过实现H264VideoStreamFramer::createNew()实例化,发现doGetNextFrame()函数实现在H264VideoStreamFramer类中:

复制代码

void H264VideoStreamFramer::doGetNextFrame()
{ //fParser->registerReadInterest(fTo, fMaxSize); //continueReadProcessing();
fInputSource->getNextFrame(fTo, fMaxSize,
afterGettingFrame, this,
FramedSource::handleClosure, this);
}

复制代码

  这fInputSource在H264VideoStreamFramer的基类StreamParser中被初始化为传入的参数H264InputDevice[video_type]->videoSource(),VideoOpenFileSource类继承OpenFileSource类,因此这个doGetNextFrame再一次FramedSource类中的getNextFrame()函数,这次getNextFrame函数中调用的doGetNextFrame()函数则是在OpenFileSource类实现的:

复制代码

void OpenFileSource::incomingDataHandler1() { int ret; if (!isCurrentlyAwaitingData()) return; // we’re not ready for the data yet
ret = readFromFile(); if (ret < 0) {
handleClosure(this);
fprintf(stderr,”In Grab Image, the source stops being readable!!!!\n”);
} else if (ret == 0)
{ if( uSecsToDelay >= uSecsToDelayMax )
{
uSecsToDelay = uSecsToDelayMax;
}else{
uSecsToDelay *= 2;
}
nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)incomingDataHandler, this);
} else {
nextTask() = envir().taskScheduler().scheduleDelayedTask(0, (TaskFunc*)afterGetting, this);
}
}

复制代码

  获取一帧数据后,执行延迟队列中的afterGetting()函数,此函数实现父类FramedSource中:

复制代码

void FramedSource::afterGetting(FramedSource* source)
{
source->fIsCurrentlyAwaitingData = False; // indicates that we can be read again // Note that this needs to be done here, in case the “fAfterFunc” // called below tries to read another frame (which it usually will)

if (source->fAfterGettingFunc != NULL) {
    (\*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
                               source\->fFrameSize, 
                               source\->fNumTruncatedBytes,
                               source\->fPresentationTime,
                               source\->fDurationInMicroseconds);
}

}

复制代码

  fAfterGettingFunc函数指针在getNextFrame()函数被赋值,在MultiFramedRTPSink::packFrame() 函数中,被赋值MultiFramedRTPSink::afterGettingFrame():

复制代码

void MultiFramedRTPSink::afterGettingFrame(void* clientData, unsigned numBytesRead,
unsigned numTruncatedBytes, struct timeval presentationTime,
unsigned durationInMicroseconds)
{
MultiFramedRTPSink* sink = (MultiFramedRTPSink*)clientData;
sink->afterGettingFrame1(numBytesRead, numTruncatedBytes,
presentationTime, durationInMicroseconds);
}

复制代码

  继续看afterGettingFrame1实现:

复制代码

void MultiFramedRTPSink::afterGettingFrame1(
unsigned frameSize,
unsigned numTruncatedBytes, struct timeval presentationTime,
unsigned durationInMicroseconds)
{ if (fIsFirstPacket) { // Record the fact that we’re starting to play now:
gettimeofday(&fNextSendTime, NULL);
} //如果给予一帧的缓冲不够大,就会发生截断一帧数据的现象。但也只能提示一下用户
if (numTruncatedBytes > 0) {

    unsigned const bufferSize = fOutBuf->totalBytesAvailable();
    envir() << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
            << bufferSize << "). "
            << numTruncatedBytes << " bytes of trailing data was dropped!  Correct this by increasing \\"OutPacketBuffer::maxSize\\" to at least "
            << OutPacketBuffer::maxSize + numTruncatedBytes << ", \*before\* creating this 'RTPSink'.  (Current value is "
            << OutPacketBuffer::maxSize << ".)\\n";
}
unsigned curFragmentationOffset \= fCurFragmentationOffset;
unsigned numFrameBytesToUse \= frameSize;
unsigned overflowBytes \= 0; //如果包只已经打入帧数据了,并且不能再向这个包中加数据了,则把新获得的帧数据保存下来。 // If we have already packed one or more frames into this packet, // check whether this new frame is eligible to be packed after them. // (This is independent of whether the packet has enough room for this // new frame; that check comes later.)
if (fNumFramesUsedSoFar > 0) { //如果包中已有了一个帧,并且不允许再打入新的帧了,则只记录下新的帧。
    if ((fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment()) || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize))
    { // Save away this frame for next time:
        numFrameBytesToUse = 0;
        fOutBuf\->setOverflowData(fOutBuf->curPacketSize(), frameSize,
                presentationTime, durationInMicroseconds);
    }
} //表示当前打入的是否是上一个帧的最后一块数据。
fPreviousFrameEndedFragmentation = False; //下面是计算获取的帧中有多少数据可以打到当前包中,剩下的数据就作为overflow数据保存下来。
if (numFrameBytesToUse > 0) { // Check whether this frame overflows the packet
    if (fOutBuf->wouldOverflow(frameSize)) { // Don't use this frame now; instead, save it as overflow data, and // send it in the next packet instead.  However, if the frame is too // big to fit in a packet by itself, then we need to fragment it (and // use some of it in this packet, if the payload format permits this.)
        if (isTooBigForAPacket(frameSize) && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) { // We need to fragment this frame, and use some of it now:
            overflowBytes = computeOverflowForNewFrame(frameSize);
            numFrameBytesToUse \-= overflowBytes;
            fCurFragmentationOffset += numFrameBytesToUse;
        } else { // We don't use any of this frame now:
            overflowBytes = frameSize;
            numFrameBytesToUse \= 0;
        }
        fOutBuf\->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse,
                overflowBytes, presentationTime, durationInMicroseconds);
    } else if (fCurFragmentationOffset > 0) { // This is the last fragment of a frame that was fragmented over // more than one packet.  Do any special handling for this case:
        fCurFragmentationOffset = 0;
        fPreviousFrameEndedFragmentation \= True;
    }
} if (numFrameBytesToUse == 0 && frameSize > 0) { //如果包中有数据并且没有新数据了,则发送之。(这种情况好像很难发生啊!) // Send our packet now, because we have filled it up:

sendPacketIfNecessary();
} else { //需要向包中打入数据。 // Use this frame in our outgoing packet:
unsigned char* frameStart = fOutBuf->curPtr();
fOutBuf->increment(numFrameBytesToUse); // do this now, in case “doSpecialFrameHandling()” calls “setFramePadding()” to append padding bytes // Here’s where any payload format specific processing gets done:
doSpecialFrameHandling(curFragmentationOffset, frameStart,
numFrameBytesToUse, presentationTime, overflowBytes); ++fNumFramesUsedSoFar; // Update the time at which the next packet should be sent, based // on the duration of the frame that we just packed into it. // However, if this frame has overflow data remaining, then don’t // count its duration yet.
if (overflowBytes == 0) {
fNextSendTime.tv_usec += durationInMicroseconds;
fNextSendTime.tv_sec += fNextSendTime.tv_usec / 1000000;
fNextSendTime.tv_usec %= 1000000;
} //如果需要,就发出包,否则继续打入数据。 // Send our packet now if (i) it’s already at our preferred size, or // (ii) (heuristic) another frame of the same size as the one we just // read would overflow the packet, or // (iii) it contains the last fragment of a fragmented frame, and we // don’t allow anything else to follow this or // (iv) one frame per packet is allowed:
if (fOutBuf->isPreferredSize() || fOutBuf->wouldOverflow(numFrameBytesToUse) || (fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment()) || !frameCanAppearAfterPacketStart(
fOutBuf->curPtr() - frameSize, frameSize)) { // The packet is ready to be sent now
sendPacketIfNecessary();
} else { // There’s room for more frames; try getting another:
packFrame();
}
}
}

复制代码

看一下发送数据的函数:

复制代码

void MultiFramedRTPSink::sendPacketIfNecessary()
{ //发送包
if (fNumFramesUsedSoFar > 0) { // Send the packet:
#ifdef TEST_LOSS if ((our_random()%10) != 0) // simulate 10% packet loss #####
#endif
if (!fRTPInterface.sendPacket(fOutBuf->packet(),fOutBuf->curPacketSize())) { // if failure handler has been specified, call it
if (fOnSendErrorFunc != NULL)
(*fOnSendErrorFunc)(fOnSendErrorData);
} ++fPacketCount;
fTotalOctetCount += fOutBuf->curPacketSize();
fOctetCount += fOutBuf->curPacketSize() - rtpHeaderSize - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes; ++fSeqNo; // for next time
} //如果还有剩余数据,则调整缓冲区
if (fOutBuf->haveOverflowData() && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize() / 2) { // Efficiency hack: Reset the packet start pointer to just in front of // the overflow data (allowing for the RTP header and special headers), // so that we probably don’t have to “memmove()” the overflow data // into place when building the next packet:
unsigned newPacketStart = fOutBuf->curPacketSize()- (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
fOutBuf->adjustPacketStart(newPacketStart);
} else { // Normal case: Reset the packet start pointer back to the start:
fOutBuf->resetPacketStart();
}
fOutBuf->resetOffset();
fNumFramesUsedSoFar = 0; if (fNoFramesLeft) { //如果再没有数据了,则结束之 // We’re done:
onSourceClosure(this);
} else { //如果还有数据,则在下一次需要发送的时间再次打包发送。 // We have more frames left to send. Figure out when the next frame // is due to start playing, then make sure that we wait this long before // sending the next packet.
struct timeval timeNow;
gettimeofday(&timeNow, NULL); int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec;
int64_t uSecondsToGo = secsDiff * 1000000
+ (fNextSendTime.tv_usec - timeNow.tv_usec); if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative:
uSecondsToGo = 0;
} // Delay this amount of time:
nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo,
(TaskFunc*) sendNext, this);
}
}

复制代码

  当一帧数据发送完,在doEventLoop()函数执行任务函数sendNext(),继续发送一包,进行下一个循环。音频数据的发送也是如此。

总结一下调用过程(参考牛搞大神):

单播数据发送:
  单播的时候,只有收到客服端的“PLAY”的命令时,才开始发送数据,在RTSPClientSession类中handleCmd_PLAY()函数中调用

复制代码

void RTSPServer::RTSPClientSession
::handleCmd_PLAY(ServerMediaSubsession* subsession, char const* cseq, char const* fullRequestStr)
{

  ... ...

    fStreamStates[i].subsession->startStream(fOurSessionId,
fStreamStates[i].streamToken,
(TaskFunc*)noteClientLiveness, this,
rtpSeqNum,
rtpTimestamp);
   … …
}

复制代码

  startStream()函数定义在OnDemandServerMediaSubsession类中:

复制代码

void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId, void* streamToken,
TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
unsigned short& rtpSeqNum,
unsigned& rtpTimestamp)
{
  StreamState* streamState = (StreamState*)streamToken;
  Destinations* destinations = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId)); if (streamState != NULL) {
    streamState->startPlaying(destinations, rtcpRRHandler, rtcpRRHandlerClientData); if (streamState->rtpSink() != NULL) {
      rtpSeqNum = streamState->rtpSink()->currentSeqNo();
      rtpTimestamp = streamState->rtpSink()->presetNextTimestamp();
}
}
}

复制代码

  startPlaying函数实现在StreamState类中:

复制代码

void StreamState::startPlaying(Destinations* dests,
TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData)
{ if (dests == NULL) return; if (!fAreCurrentlyPlaying && fMediaSource != NULL) { if (fRTPSink != NULL) {
fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
fAreCurrentlyPlaying = True;
} else if (fUDPSink != NULL) {
fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
fAreCurrentlyPlaying = True;
}
} if (fRTCPInstance == NULL && fRTPSink != NULL) { // Create (and start) a ‘RTCP instance’ for this RTP sink:
fRTCPInstance = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
fTotalBW, (unsigned char*)fMaster.fCNAME,
fRTPSink, NULL /* we’re a server */); // Note: This starts RTCP running automatically
} if (dests->isTCP) { // Change RTP and RTCP to use the TCP socket instead of UDP:
if (fRTPSink != NULL) {
fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
} if (fRTCPInstance != NULL) {
fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
rtcpRRHandler, rtcpRRHandlerClientData);
}
} else { // Tell the RTP and RTCP ‘groupsocks’ about this destination // (in case they don’t already have it):
if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort); if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort); if (fRTCPInstance != NULL) {
fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
rtcpRRHandler, rtcpRRHandlerClientData);
}
}
}

复制代码

  这个函数就会去调用RTPSink类中的startPlaying()函数,但是RTPSink没有实现,直接调用父类MediaSink中的startPlaying函数。后面就跟组播一样的采集,组包,发送数据了。

九、Locust运行与配置_运行locust-CSDN博客

Excerpt

文章浏览阅读1.8k次。本文介绍了Locust的配置方式,包括通过环境变量、配置文件和命令行参数设置测试选项。Locust支持无网络界面运行、设置测试时间和退出策略。文章还提到了分布式运行时如何设置主节点和工作节点,以及自定义参数和统计设置的定制。此外,强调了在没有WebUI的情况下控制用户数和任务完成。


1. 配置

1.1 环境变量

也可以通过环境变量设置选项。它们通常与命令行参数相同,但大写并带有前缀LOCUST_

在 Linux/macOS 上:

1
$ LOCUST_LOCUSTFILE=custom_locustfile.py locust

在 Windows 上:

1
> set LOCUST_LOCUSTFILE=custom_locustfile.py > locust

1.2 配置文件

也可以在配置文件 格式的配置文件中设置选项。

Locust默认会查找~/.locust.conf和./locust.conf,你可以使用–config标志指定一个额外的文件。

例子:

1
# master.conf in current directory locustfile = locust_files/my_locust_file.py headless = true master = true expect-workers = 5 host = http://target-system users = 100 spawn-rate = 10 run-time = 10m
1
> locust --config=master.conf

注意: 配置值按以下顺序读取(重写)。

1
~/locust.conf -> ./locust.conf -> (file specified using --conf) -> env vars -> cmd args

1.3 所有可用的配置选项

这里有一个所有可用的配置选项的表格,以及它们相应的环境和配置文件的键。

命令行 环境变量 配置文件 描述
-f, --locustfile LOCUST_LOCUSTFILE locustfile 可以是一个.py文件,也可以是多个以逗号分隔的.py文件,或者是一个软件包目录。默认为’locustfile’。
-H, –host LOCUST_HOST host 要加载测试的主机,格式如下:http://10.21.32.33
-u, –users LOCUST_USERS users Locust并发用户的峰值数量。主要是与-headless或-autostart一起使用。在测试过程中可以通过键盘输入w, W (产生1, 10个用户)和s, S (停止1, 10个用户)来改变。
-r, –spawn-rate LOCUST_SPAWN_RATE spawn-rate 催生用户的速度(每秒用户数)。主要是与-headless或-autostart一起使用。
-t, –run-time LOCUST_RUN_TIME run-time 在指定的时间后停止,例如(300s、20m、3h、1h30m,等等)。只与-headless或-autostart一起使用。默认为永远运行。
--web-host LOCUST_WEB_HOST web-host 绑定网络接口的主机。默认为’*‘(所有接口)。
--web-port, -P LOCUST_WEB_PORT web-port 运行网络主机的端口
--headless LOCUST_HEADLESS headless 禁用网络界面,并立即开始测试。使用-u和-t来控制用户数和运行时间
--autostart LOCUST_AUTOSTART autostart 立即启动测试(就像-headless,但不会禁用网络用户界面)。
--autoquit LOCUST_AUTOQUIT autoquit 完全退出Locust,在运行结束后X秒。只和-autostart一起使用。默认情况是保持Locust运行,直到你用CTRL+C关闭它。
--web-auth LOCUST_WEB_AUTH web-auth 开启网络界面的基本认证。应以下列格式提供:用户名:密码
--tls-cert LOCUST_TLS_CERT tls-cert 可选的TLS证书路径,用于通过HTTPS提供服务
--tls-key LOCUST_TLS_KEY tls-key 可选的TLS私钥路径,用于通过HTTPS提供服务
--class-picker LOCUST_USERCLASS_PICKER class-picker 在网页界面上启用选择框,从所有可用的用户类和形状类中进行选择。
--master LOCUST_MODE_MASTER master 设置locust以分布式模式运行,并以该进程为主。
--master-bind-host LOCUST_MASTER_BIND_HOST master-bind-host locust master应该绑定的接口(hostname, ip)。只在与-master一起运行时使用。默认为*(所有可用的接口)。
--master-bind-port LOCUST_MASTER_BIND_PORT master-bind-port locust master应该绑定的端口。只在运行-master时使用。默认为5557。
--expect-workers LOCUST_EXPECT_WORKERS expect-workers 在开始测试之前,Master应该期望连接多少个worker(仅当使用-headless/autostart时)。
--expect-workers-max-wait LOCUST_EXPECT_WORKERS_MAX_WAIT expect-workers-max-wait 在放弃之前,主站应该等待工人连接多长时间。默认为永远等待
--worker LOCUST_MODE_WORKER worker 设置locust在分布式模式下运行,并将此进程作为工作者。
--master-host LOCUST_MASTER_NODE_HOST master-host 用于分布式负载测试的蝗虫主站的主机或 IP 地址。只在与-worker一起运行时使用。默认为127.0.0.1。
--master-port LOCUST_MASTER_NODE_PORT master-port 连接的端口,该端口被locust master用于分布式负载测试。只在与-worker一起运行时使用。默认为5557。
-T, --tags LOCUST_TAGS tags 要包括在测试中的标签列表,因此只有具有任何匹配标签的任务才会被执行。
-E, --exclude-tags LOCUST_EXCLUDE_TAGS exclude-tags 要从测试中排除的标签列表,因此只有没有匹配标签的任务才会被执行。
--csv LOCUST_CSV csv 将当前的请求统计信息以CSV格式存储到文件中。设置这个选项将生成三个文件。[CSV_PREFIX]_stats.csv, [CSV_PREFIX]_stats_history.csv and [CSV_PREFIX]_failures.csv
--csv-full-history LOCUST_CSV_FULL_HISTORY csv-full-history 将每个统计条目以CSV格式存储到_stats_history.csv文件。你还必须指定’-csv’参数来启用它。
--print-stats LOCUST_PRINT_STATS print-stats 启用在用户界面运行中定期打印请求统计信息的功能
--only-summary LOCUST_ONLY_SUMMARY only-summary 在-headless运行期间禁止定期打印请求统计信息
--reset-stats LOCUST_RESET_STATS reset-stats 一旦产卵完成,就重置统计数据。当以分布式模式运行时,应该在主站和工作站都设置。
--html LOCUST_HTML html 将HTML报告存储到指定的文件路径
--skip-log-setup LOCUST_SKIP_LOG_SETUP skip-log-setup 禁用Locust的日志设置。相反,配置是由Locust测试或Python默认提供的。
--loglevel, -L LOCUST_LOGLEVEL loglevel 在DEBUG/INFO/WARNING/ERROR/CRITICAL之间选择。默认是INFO。
--logfile LOCUST_LOGFILE logfile 日志文件的路径。如果不设置,日志将转到stderr。
--exit-code-on-error LOCUST_EXIT_CODE_ON_ERROR exit-code-on-error 当测试结果包含任何失败或错误时,设置进程退出代码。
-s, --stop-timeout LOCUST_STOP_TIMEOUT stop-timeout 在退出之前,等待模拟用户完成任何执行任务的秒数。默认是立即终止。这个参数只需要在运行Locust分布式时为主进程指定。

1.4 在没有网络用户界面的情况下运行

你可以通过使用 –headless 标志以及 -u/–users 和 -r/–spawn-rate,在没有网页界面的情况下运行蝗虫。

1
> locust -f locust_files/my_locust_file.py --headless -u 100 -r 5 [2021-07-24 10:41:10,947] .../INFO/locust.main: No run time limit set, use CTRL+C to interrupt. [2021-07-24 10:41:10,947] .../INFO/locust.main: Starting Locust 2.14.2 [2021-07-24 10:41:10,949] .../INFO/locust.runners: Ramping to 100 users using a 5.00 spawn rate Name # reqs # fails | Avg Min Max Median | req/s failures/s ---------------------------------------------------------------------------------------------- GET /hello 1 0(0.00%) | 115 115 115 115 | 0.00 0.00 GET /world 1 0(0.00%) | 119 119 119 119 | 0.00 0.00 ---------------------------------------------------------------------------------------------- Aggregated 2 0(0.00%) | 117 115 119 117 | 0.00 0.00 [2021-07-24 10:44:42,484] .../INFO/locust.runners: All users spawned: {"HelloWorldUser": 100} (100 total users)

即使在无头模式下,你也可以在测试运行时改变用户数。按w增加1个用户或按W增加10个。按s删除1个或S删除10个。

1.4.1 为测试设定一个时间限制

要指定一个测试的运行时间,使用-t/–运行时间。

1
locust -f --headless -u 100 --run-time 1h30m locust -f --headless -u 100 --run-time 60 # 默认单位为秒

一旦时间到了,Locust就会关闭。时间是从测试开始计算的(而不是从升温结束后计算)。

1.4.2 允许任务在关机时完成其迭代

默认情况下,Locust会立即停止你的任务(甚至不需要等待请求完成)。要给正在运行的任务一些时间来完成它们的迭代,可以使用-s/-stop-timeout。

1
$ locust --headless --run-time 1h30m --stop-timeout 10s
1.4.3 在没有网络界面下运行分布式

如果你想在没有Web UI的情况下运行Locust分布式,你应该在启动主节点时指定–expect-workers选项,以指定预期连接的工作节点的数量。然后,它将等待,直到有这么多的工作节点连接,才开始测试。

1.4.4 控制蝗虫进程的退出代码

默认情况下,如果有任何失败的样本,蝗虫进程将给出一个退出代码1(使用``-exit-code-on-error`来改变这一行为)。

你也可以通过设置环境实例的process_exit_code来手动控制你测试脚本中的退出代码。当把Locust作为一个自动/计划的测试运行时,例如作为CI管道的一部分,这一点特别有用。

下面是一个例子,如果满足以下任何条件,将退出代码设置为非零。

  • 超过1%的请求失败
  • 平均响应时间超过200毫秒
  • 响应时间的第95百分位数大于800毫秒
1
import logging from locust import events @events.quitting.add_listener def _(environment, **kw): if environment.stats.total.fail_ratio > 0.01: logging.error("Test failed due to failure ratio > 1%") environment.process_exit_code = 1 elif environment.stats.total.avg_response_time > 200: logging.error("Test failed due to average response time ratio > 200 ms") environment.process_exit_code = 1 elif environment.stats.total.get_response_time_percentile(0.95) > 800: logging.error("Test failed due to 95th percentile response time > 800 ms") environment.process_exit_code = 1 else: environment.process_exit_code = 0

请注意,这段代码可以进入 locustfile.py 或任何其他被导入 locustfile 的文件中

1.5 同时使用多个Locustfiles

-f/–locustfile选项接受一个单一的locustfiles目录作为选项。
蝗虫将递归搜索该目录中的*.py文件,忽略名为locust.py或以”_“开头的文件

例子:

具有以下文件结构。

1
├── locustfiles/ │ ├── locustfile1.py │ ├── locustfile2.py │ └── more_files/ │ ├── locustfile3.py │ ├── locust.py │ ├── _ignoreme.py
1
> locust -f locustfiles

蝗虫将使用 locustfile1.py, locustfile2.py & more_files/locustfile3.py

此外,-f/–locustfile接受多个以逗号分隔的locustfile。

Example:

1
$ locust -f locustfiles/locustfile1.py,locustfiles/locustfile2.py,locustfiles/more_files/locustfile3.py

蝗虫将使用 locustfile1.py, locustfile2.py & more_files/locustfile3.py

1.6 使用用户类UI选取器运行Locust

--class-picker你可以选择运行带有标志的 locust 时在 WebUI 中运行哪个 Shape 类和哪个 User 类。没有选择使用所有可用的用户类。

例子:

具有以下文件结构:

1
├── src/ │ ├── some_file.py ├── locustfiles/ │ ├── locustfile1.py │ ├── locustfile2.py │ └── more_files/ │ ├── locustfile3.py │ ├── locust.py │ ├── _ignoreme.py │ └── shape_classes/ │ ├── DoubleWaveShape.py │ ├── StagesShape.py
1
> locust -f locustfiles --class-picker

Web 用户界面将显示:

1.7 自定义参数

你可以使用init_command_line_parser事件向Locust添加你自己的命令行参数。

自定义参数也可以在Web UI中显示和编辑。

1
from locust import HttpUser, task, events @events.init_command_line_parser.add_listener def _(parser): parser.add_argument("--my-argument", type=str, env_var="LOCUST_MY_ARGUMENT", default="", help="It's working") # 如果要隐藏web ui,请将“include_in_web_ui”设置为False parser.add_argument("--my-ui-invisible-argument", include_in_web_ui=False, default="I am invisible") @events.test_start.add_listener def _(environment, **kw): print(f"Custom argument supplied: {environment.parsed_options.my_argument}") class WebsiteUser(HttpUser): @task def my_task(self): print(f"my_argument={self.environment.parsed_options.my_argument}") print(f"my_ui_invisible_argument={self.environment.parsed_options.my_ui_invisible_argument}")

当运行Locust分布式时,自定义参数会在运行开始时自动转发给工作者(但在此之前不会,所以你不能在测试实际开始之前依赖转发的参数)。

1.8 统计设置的定制

蝗虫统计的默认配置被设置在stats.py文件的常量中。它可以通过覆盖这些值来适应特定的要求。要做到这一点,请导入locust.stats模块并覆盖需要的设置。

1
import locust.stats locust.stats.CONSOLE_STATS_INTERVAL_SEC = 15

它可以直接在Locust文件中完成,也可以提取到单独的文件中,供所有Locust文件共同使用。

可以修改的统计参数列表是:

参数名字 宗旨
STATS_NAME_WIDTH 控制台输出中的请求名称列的宽度
STATS_TYPE_WIDTH 控制台输出中请求类型的列的宽度
CSV_STATS_INTERVAL_SEC 如果配置了这个选项,CSV文件的写入频率的间隔。
CONSOLE_STATS_INTERVAL_SEC 将结果写入控制台的频率的时间间隔
CURRENT_RESPONSE_TIME_PERCENTILE_WINDOW 计算当前响应时间百分位数时的窗口大小/分辨率–以秒为单位
PERCENTILES_TO_REPORT 要计算和报告的响应时间百分位数列表

更多教程,更多实战案例,请阅读:https://edu.csdn.net/course/detail/38449
在这里插入图片描述

更多教程,更多实战案例,请阅读:https://edu.csdn.net/course/detail/38449

背景叙述

在传统的基于 .Net Framework 框架下进行的 MEF 开发,大多是使用 MEF 1,对应的命名空间是 _System.ComponentModel.Composition_。在 DotNet Core 中,微软为了伟大的跨平台策略,引入了 MEF 2,其对应的命名空间是 _System.Composition_,这个需要开发者自己在 Nuget 上进行下载安装 Microsoft.Composition。2 与 1 相比,无论是在支持平台上还是性能上都有改进,值得我们探讨一下。

动手实验

实验1:在 DotNetCore 控制台程序中尝试使用 MEF2

首先,我们创建一个 DotNet Core 控制台应用程序,然后为其添加 MEF2 对应的 Package:Microsoft.Composition;

然后,我们创建一个示例接口:

1
2
3
4
public interface IMessageSender
{
void Send(string message);
}

接着,我们再创建一个示例类来实现该接口,并尝试将其导出:

1
2
3
4
5
6
7
8
[Export(typeof(IMessageSender))]
public class EmailSender : IMessageSender
{
public void Send(string message)
{
Console.WriteLine(message);
}
}

最后,我们在主程序中进行调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Program
{
static void Main(string[] args)
{

var assembiles = new[] { typeof(Program).GetTypeInfo().Assembly };


var configuration = new ContainerConfiguration()
.WithAssembly(typeof(Program).GetTypeInfo().Assembly);
using (var container = configuration.CreateContainer())
{

IMessageSender messageSender = container.GetExport<IMessageSender>();

messageSender.Send("Hello MEF2");
}
Console.ReadKey();
}
}

此时,如果一切正常的话,程序会输入如下结果:

实验2:在 DotNetCore 控制台程序中尝试使用 MEF2 加载外部组件

由于微软在 DotNetCore 中为开发者提供了新的程序集加载方式 AssemblyLoadContext。它允许多次加载相同的程序集,并创建相互独立的副本,并且它比 AppDomain 重量轻得多。因此我在本次实验中,笔者尝试使用这种新的加载方式进行实验。

首先,我们创建一个如下图所示的解决方案:

  • DotNetCoreMEF:控制台程序,安装 Microsoft.Composition,并引用 DotNetCoreMEF.Core
  • DotNetCoreMEF.Core:核心类库,用于定义相关接口;
  • DotNetCoreMEF.Plugin1:插件类库,安装 Microsoft.Composition,并引用 DotNetCoreMEF.Core
  • DotNetCoreMEF.Plugin2:插件类库,安装 Microsoft.Composition,并引用 DotNetCoreMEF.Core

注意:请确保上述项目的生成目录保持一致。

相关示例代码如下所示:

IMessageSender.cs

1
2
3
4
public interface IMessageSender
{
void Send(string message);
}

EmailSender.cs

1
2
3
4
5
6
7
8
[Export(typeof(IMessageSender))]
public class EmailSender : IMessageSender
{
public void Send(string message)
{
Console.WriteLine($"Email:{message}");
}
}

SMSSender.cs

1
2
3
4
5
6
7
8
[Export(typeof(IMessageSender))]
public class SMSSender : IMessageSender
{
public void Send(string message)
{
Console.WriteLine($"SMS:{message}");
}
}

Program.cs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Program
{
static void Main(string[] args)
{
var assembiles = Directory.GetFiles(AppContext.BaseDirectory, "*.dll", SearchOption.TopDirectoryOnly)
.Select(AssemblyLoadContext.Default.LoadFromAssemblyPath);

var conventions = new ConventionBuilder();
conventions.ForTypesDerivedFrom<IMessageSender>()
.Export<IMessageSender>()
.Shared();

var configuration = new ContainerConfiguration()
.WithAssemblies(assembiles, conventions);

using (var container = configuration.CreateContainer())
{
IEnumerable<IMessageSender> senders = container.GetExports<IMessageSender>();
foreach (var sender in senders)
{
sender.Send("Hello World");
}
}

Console.ReadKey();
}
}

此时,我们将项目全部重新编译一下,可通过 VS 调试运行,看到相应的输出结果。当然,我们也可以通过命令行的方式运行程序,前提是我们需要将我们的程序发布一下。发布好后我们可以执行 dotnet DotNetCoreMEF.dll 看到输出结果:

总结

上述展示的只是 MEF 在 DotNet Core 中的简单应用,其中需要注意的是 AssemblyLoadContext ,此外,关于模块的 延迟记载元数据的获取 ,感兴趣的朋友可参考我之前的一篇博客进行参考:MEF 插件式开发 - WPF 初体验

其实,如果对 DotNet Core 有一定了解的朋友是知道的,上述这种方式虽然实现了插件式的开发模式,但是并没有完全发挥 DotNet Core 本身所具有优势:内置 DI。所以,我们完全可以使用更高效的方式来实现。在下篇博客中,我们将感受一下 DotNet Core 中强大的 DI 。

相关参考

一、   什么是MEF

  MEF(Managed Extensibility Framework)是一个用于创建可扩展的轻型应用程序的库。 应用程序开发人员可利用该库发现并使用扩展,而无需进行配置。 扩展开发人员还可以利用该库轻松地封装代码,避免生成脆弱的硬依赖项。 通过 MEF,不仅可以在应用程序内重用扩展,还可以在应用程序之间重用扩展。(摘自MSDN)

  我的理解:应用_/_插件均使用约定好的协议(接口)进行开发。系统将自动扫描指定文件夹,并按协议自动导入。

二、   MEF简单例子

1、例子一

a、定义接口

public interface DemoOneInterface
{ void Send(string msg);
}

b、使用接口

复制代码

public class DemoOne
{
   \[Import\]
    DemoOneInterface DO; public void Run()
    {
        DO.Send("DemoOne.Run");
    }
}

复制代码

使用[Import]标记需要导入属性(DemoOneInterface DO;),如果不标记,则MEF不会进行导入。

c、创建插件类

\[Export(typeof(DemoOneInterface))\] public class DemoOneInherit1 : DemoOneInterface
{ #region DemoOneInterface Members

    public void Send(string msg)
    {
        Console.WriteLine("DemoOneInherit1 send {0}", msg);
    } #endregion }

插件

插件类需要使用Export标记,并且声称导出类型。

d、查看效果

static void Main(string[] args)
{ new DemoOne().Run();

        Console.ReadLine();
    }

原来我们使用MEF,但并没有通知MEF去寻找插件。

我们对Main函数进行修改:

复制代码

var demo = new DemoOne(); var catalog = new AggregateCatalog();

        catalog.Catalogs.Add(new AssemblyCatalog(typeof(Program).Assembly)); //catalog.Catalogs.Add(new DirectoryCatalog("Addin")); //遍历运行目录下的Addin文件夹,查找所需的插件。

        var \_container = new CompositionContainer(catalog);

        \_container.ComposeParts(demo);
        
        demo.Run();

复制代码

修改后再次运行看看效果。

OK,运行起来了,和预期一样。

2、例子二

运行例子一,没有问题,但2个插件使用同一个的时候,会报错。

因此我们可以为Export加入别名(contractName),并且Import的时候也指定别名,MEF就会根据别名自动进行加载。

修改后代码如下:

public class DemoOne
{
[Import(“2”)]
DemoOneInterface DO; public void Run()
{
DO.Send(“DemoOne.Run”);
}
} public interface DemoOneInterface
{ void Send(string msg);
}

\[Export("1",typeof(DemoOneInterface))\] public class DemoOneInherit1 : DemoOneInterface
{ #region DemoOneInterface Members

    public void Send(string msg)
    {
        Console.WriteLine("DemoOneInherit1 send {0}", msg);
    } #endregion }



\[Export("2", typeof(DemoOneInterface))\] public class DemoOneInherit12 : DemoOneInterface
{ #region DemoOneInterface Members

    public void Send(string msg)
    {
        Console.WriteLine("DemoOneInherit2 send {0}", msg);
    } #endregion }

View Code

运行效果:

3、例子三

有时我们希望一个同时使用多个插件,比如:输出log。

这时我们可以将Import改为ImportMany,并且修改Do的类型为IEnumerable来导入多个插件。

修改后代码:

public class DemoOne
{
[ImportMany]
IEnumerable DoList; public void Run()
{ foreach (var _do in DoList)
{
_do.Send(“DemoOne.Run”);
}
}
} public interface DemoOneInterface
{ void Send(string msg);
}

\[Export(typeof(DemoOneInterface))\] public class DemoOneInherit1 : DemoOneInterface
{ #region DemoOneInterface Members

    public void Send(string msg)
    {
        Console.WriteLine("DemoOneInherit1 send {0}", msg);
    } #endregion }



\[Export(typeof(DemoOneInterface))\] public class DemoOneInherit12 : DemoOneInterface
{ #region DemoOneInterface Members

    public void Send(string msg)
    {
        Console.WriteLine("DemoOneInherit2 send {0}", msg);
    } #endregion }

View Code

运行效果:

4、例子四

现在有很多插件使用同一个约定,但我想根据配置在同一个方法中调用某个插件。

这时我们需要使用ExportMetadata来为插件的特殊属性进行标记。

使用到Lazy,来进行延迟加载,并且获取插件标记的信息。(关于Lazy具体信息请自行查找)

a、新增插件描述类

public interface DemoOneInterfaceDepict
{ string Depict{get;}
}

b、为插件定义描述

[Export(typeof(DemoOneInterface))]
[ExportMetadata(“Depict”, “1”)] public class DemoOneInherit1 : DemoOneInterface
{ #region DemoOneInterface Members

    public void Send(string msg)
    {
        Console.WriteLine("DemoOneInherit1 send {0}", msg);
    } #endregion }



\[Export(typeof(DemoOneInterface))\]
\[ExportMetadata("Depict", "2")\] public class DemoOneInherit12 : DemoOneInterface
{ #region DemoOneInterface Members

    public void Send(string msg)
    {
        Console.WriteLine("DemoOneInherit2 send {0}", msg);
    } #endregion }

View Code

c、修改DoList

IEnumerable<Lazy<DemoOneInterface,DemoOneInterfaceDepict>> DoList;

d、根据配置调用

复制代码

public class DemoOne
{
[ImportMany]
IEnumerable<Lazy<DemoOneInterface,DemoOneInterfaceDepict>> DoList; public void Run()
{ foreach (var _do in DoList.Where(item=>item.Metadata.Depict == ReadXml()))
{
_do.Value.Send(“DemoOne.Run”);
}
} string ReadXml()
{ return “2”;
}
}

复制代码

运行结果:

三、简化调用

上述4个例子运行正常,但我们一直没去在意Main函数里面的内容。

复制代码

var demo = new DemoOne(); var catalog = new AggregateCatalog();

        catalog.Catalogs.Add(new AssemblyCatalog(typeof(Program).Assembly)); //catalog.Catalogs.Add(new DirectoryCatalog("Addin")); //遍历运行目录下的Addin文件夹,查找所需的插件。

        var \_container = new CompositionContainer(catalog);

        \_container.ComposeParts(demo);
        
        demo.Run();

复制代码

看着头就晕了,难道每次构造一个函数,都这么写吗?那不是非常痛苦?!!!

重新设计一下:

1、使用基类

复制代码

public abstract class BaseClass
{ public BaseClass()
    { var catalog = new AggregateCatalog();

        catalog.Catalogs.Add(new AssemblyCatalog(typeof(Program).Assembly)); var \_container = new CompositionContainer(catalog);

        \_container.ComposeParts(this);
    }
}

复制代码

修改DemoOne类继承BaseClass

public class DemoOne : BaseClass

简化调用

var demo = new DemoOne();
demo.Run();

运行 ok。

2、使用扩展方法

每个类都要继承这个基类,由于C#只有单继承,已经继承了一个基类后,就比较麻烦。

因此衍生出第二种方法,新增扩展方法。

扩展方法

复制代码

public static class ObjectExt
{ public static T ComposePartsSelf(this T obj) where T : class { var catalog = new AggregateCatalog();

        catalog.Catalogs.Add(new AssemblyCatalog(typeof(Program).Assembly));
        catalog.Catalogs.Add(new DirectoryCatalog(".")); //catalog.Catalogs.Add(new DirectoryCatalog("addin"));

        var \_container = new CompositionContainer(catalog);

        \_container.ComposeParts(obj); return obj;
    }
}

复制代码

修改DemoOne类,新增构造函数,并且调用扩展方法

复制代码

public class DemoOne
{ public DemoOne()
{ this.ComposePartsSelf();
}

    \[ImportMany\]
    IEnumerable<Lazy<DemoOneInterface,DemoOneInterfaceDepict>> DoList; public void Run()
    { foreach (var \_do in DoList.Where(item=>item.Metadata.Depict == ReadXml()))
        {
            \_do.Value.Send("DemoOne.Run");
        }
    } string ReadXml()
    { return "2";
    }
}

复制代码

简化调用

var demo = new DemoOne();
demo.Run();

运行 ok。

MSBuild 教程(2)

2014-06-27 11:40  yanqinqiang  阅读(2145)  评论()  编辑  收藏

如果你没有看过第一部分教程,请先看完后再回到这里来继续我们的第二部分。

我们下一步的工作是发布我们的网站,也就是创建Publish target. 为了能使我们的target工作,我们需要给它传递两个属性,WebProjectOutputDir 和 OutDir, 这两个变量将确保我们能把我们的网站文件发布到正确的目录下面。

MSbuild 允许我们声明一个PropertyGroup, 在这个PropertyGroup中,我们可以创建一些中间变量用来存储一些设置数据,当我们进行读写操作的时候,我们会需要这些设置数据。

好了,看例子:

复制代码

1
2 <Project xmlns=“http://schemas.microsoft.com/developer/msbuild/2003“ DefaultTargets=“Run”>
3 <PropertyGroup>
4 <OutputFolder>Output</OutputFolder>
5 </PropertyGroup>
6
7 <Target Name=“Run”>
8 <CallTarget Targets=“Clean” />
9 <CallTarget Targets=“Publish” />
10 </Target>
11
12 <Target Name=“Clean”>
13 <ItemGroup>
14 <BinFiles Include=“bin\*.*“ />
15 </ItemGroup>
16 <Delete Files=“@(BinFiles)” />
17 </Target>
18
19 <Target Name=“Publish”>
20 <RemoveDir Directories=“$(OutputFolder)” ContinueOnError=“true” />
21 <MSBuild Projects=“BuildDemoSite.csproj”
22 Targets=“ResolveReferences;_CopyWebApplication”
23 Properties=“WebProjectOutputDir=$(OutputFolder);OutDir=$(WebProjectOutputDir)\“ />
24 </Target>
25 </Project>

复制代码

正如你看到的,ResolveReferences target也被调用了,这是为了确保第三方的依赖也能够复制到Output目录下。

执行命令:msbuild build.xml, 结果如下

image

但是你也许注意到,在Output目录下,一些我不想复制的文件也一并复制了进来,比如Build.xml脚本本身,还有一些环境相关的配置文件:Live.Config & Test.Config. 那是因为它们的build action设置成了content, 你只要把build action设置成None,然后在重新执行下脚本,就不会出现上面的问题了。

image

现在这个网站基本上能够部署了,但是我们还需要把环境相关的配置复制到 web.config中来,那我们现在就开始这个教程

正如我们在 第一部分教程中提到的,除了web.config 文件,我们还有live.config 和 test.config文件。 同时web.config包含关于网站的所有配置,live.config和 test.config包含一些与部署环境相关的一些配置,接下来的工作就是要把部署环境相关的值正确的复制到web.config中来。

为了实现这个目的,我们通常的思路是用XPath从一个文件中读取配置信息,然后再通过XPath把这个信息写到另一个文件中去。但是快速浏览下内置的Task,没有找到有这个功能的,庆幸的是,MSBuild Community Tasks Project拯救了我们,它提供了XmlRead 和 XmllUpdate task。那先让我们下载,安装它。

在用他们之前,我们要确保下面的XML已经插入到我们的build.xml的根节点上了

<Import Project=“C:\Program Files (x86)\MSBuild\MSBuildCommunityTasks\MSBuild.Community.Tasks.Targets”/>

然后我们增加一个Environment的中间变量在PropertyGroup中,当前它是被硬编码的写成了Test,我们会在以后的教程成修改这个,让它可以配置。

<PropertyGroup>
<Environment>Test</Environment>

<OutputFolder>Output</OutputFolder>
</PropertyGroup>

好,我们第一步是用XmlRead task去获取环境配置的数据,当前我只列举了XmlRead的一个例子, 你可以添加更多

复制代码

1 <Target Name=“GetConfig”>
2 <XmlRead XPath=“configuration/system.web/compilation/@debug”
3 XmlFileName=“$(Environment).config”>
4 <Output TaskParameter=“Value” PropertyName=“CompilationDebug” />
5 </XmlRead>
6 </Target>

复制代码

  1. GetConfig target包含所有的Xmlread task
  2. XmlFileName属性有个变量 $(Environment).config,这个变量指向的是我们的前面硬编码的Test.
  3. Output是用来把XmlRead的返回值存储到CompilationDebug属性中。

几乎同样的,我们的XmlUpdate采用同样的语法, 其中注意的是 DependsOnTargets=”GetConfig”是确保复制数据的时候,要先读再写。

1 <Target Name=“SetConfig” DependsOnTargets=“GetConfig”>
2 <XmlUpdate XPath=“configuration/system.web/compilation/@debug”
3 XmlFileName=“Output\web.config”
4 Value=“$(CompilationDebug)” />
5 </Target>

好了,让我们开始见证奇迹的时候,执行下面的命令

msbuild Build.xml /t:SetConfig

检查一下是不是所有的命令都工作,打开output文件下的web.config文件,看看数据是不是跟Test.config中的一致。

你可以查看整个build.xml的代码

测试系统: windows 10

首先需要msys2的安装包,可以去官网下载安装包

官网地址: http://www.msys2.org/
本次下载的是 msys2-x86_64-20180531.exe

注意:1.msys2不可以安装在FAT*分区
2.msys2不能安装在win XP系统上

官方下载地址

http://www.msys2.org/,这官网下载超级慢,

推荐使用:https://mirrors-wan.geekpie.club/msys2/distrib/i686/(上海科技大学的镜像)

指定好安装路径(一般D根目录即可)本人使用的是默认路径,win10的c盘,一路下一步就好。

安装好后,不要直接运行,先配置,软件源,否则下载超级慢

安装完成后点击完 成退出安装程序

注意: 此处不要勾选”立即运行 MSYS2 64bit”, 因为接下来要配置国内更新源
这里写图片描述

二、配置国内镜像

使用[清华大学开源软件镜像站]中的地址,修改\etc\pacman.d目录下的三个文件。

配置教程 https://mirrors.tuna.tsinghua.edu.cn/help/msys2/

1、mirrorlist.msys 文件

复制代码

复制代码

MSYS2 repository mirrorlist

Primary

msys2.org

Server = http://repo.msys2.org/msys/$arch

Server = http://downloads.sourceforge.net/project/msys2/REPOS/MSYS2/$arch

Server = http://www2.futureware.at/~nickoe/msys2-mirror/msys/$arch/

Server = https://mirrors.tuna.tsinghua.edu.cn/msys2/msys/$arch

复制代码

复制代码

2、mirrorlist.mingw32 文件

复制代码

复制代码

32-bit Mingw-w64 repository mirrorlist

Primary

msys2.org

Server = http://repo.msys2.org/mingw/i686

Server = http://downloads.sourceforge.net/project/msys2/REPOS/MINGW/i686

Server = http://www2.futureware.at/~nickoe/msys2-mirror/i686/

Server = https:

复制代码

复制代码

3、mirrorlist.mingw64 文件

复制代码

复制代码

64-bit Mingw-w64 repository mirrorlist

Primary

msys2.org

Server = http://repo.msys2.org/mingw/x86\_64

Server = http://downloads.sourceforge.net/project/msys2/REPOS/MINGW/x86\_64

Server = http://www2.futureware.at/~nickoe/msys2-mirror/x86\_64/

Server = https://mirrors.tuna.tsinghua.edu.cn/msys2/mingw/x86_64

复制代码

复制代码

设置窗体修改颜色

目录下D:\msys64\etc 文件 pacman.conf

将其中的 #Color 注释去掉。

    

打开菜单中的“MSYS2 MinGW 64-bit”

习惯了打开64位的图标,所以此处打开“MSYS2 MinGW 64-bit”,以后的使用也一直使用“MSYS2 MinGW 64-bit”就好了

这里写图片描述

更新Msys2系统环境

pacman -Sy
#更新源
pacman -Syu
pacman -Su

安装示例如下:

这里写图片描述
输入“Y”键开始更新
注意: 更新时会遇到下图的警告,这时一定要点击右上角的×来关闭窗口
这里写图片描述

关闭窗口时会弹出一个对话框(如下图),点击OK就可以了

这里写图片描述
上面的更新没有结束,需要继续更新,
继续更新之前要重新配置更新源,因为目录“安装位置\etc\pacman.d”中的三个配置文件mirrorlist.msys,mirrorlist.mingw32,mirrorlist.mingw64会由于上面的更新而被更新(这么说别扭吗),所以需要按第3步重新配置更新源.

重新配置好更新源后,再次打开“MSYS2 MinGW 64-bit”,
安装MinGW-w64 和 MinGW-x86版本,如果在64位系统,只安装x86_64的即可。

安装编译环境

安装这个msys2就是要写程序编译程序用的,因此编译环境的安装还是必要的

5.1 安装gcc, 执行下面的命令安装与MinGW-w64匹配的gcc
命令: pacman -S mingw-w64-x86_64-gcc

这里写图片描述

注意:如果想要编译的程序在windows环境下运行,记得要静态编译

5.2 执行pacman -S make安装make

这里写图片描述

pacman -S mingw-w64-i686-toolchain    可以不安装
pacman -S mingw-w64-x86_64-toolchain

三、安装常用工具

pacman -S base-devel git wget perl ruby python2

安装GTK

pacman -S mingw-w64-i686-gtk3  可以不安装

pacman -S mingw-w64-x86_64-gtk3

安装Glade

pacman -S mingw-w64-i686-glade
pacman -S mingw-w64-x86_64-glade

四、设置系统环境变量

在Path值中加入参数

32位

64位

五、设置mingw的环境变量

变量MINGW_HOME

变量C_INCLUDE_PATH

变量LIBRARY_PATH

变量Path中添加如下值

MSbuild 教程

2014-06-23 21:39  yanqinqiang  阅读(7581)  评论()  编辑  收藏

在.Net framework中,一个不太被大家熟知的工具就是MSBuild。这个命令行工具通过执行XML脚本可以自动生成软件工程文件。

但问题来了,“我为什么想要我的生成过程自动化?”。 其实,大部分普通的的项目都有可能包含一系列的生成过程,例如:

  • · 增加版本号
  • · 编译源代码
  • · 针对不同的部署环境选择正确的设置(例如,测试或者现场)
  • · 在安装包中引入第三方的依赖

像这些工作,如果手动生成我们想要的一定是一个会非常耗时,容易出错的过程。并且我们一开始在创建脚本时花的时间完全可以被后来的自动化过程给抵消掉,还有如果我们采用手工的方式的话,我们会经常出错,并且做很做重复的工作。

好了,现在希望在你的心里已经有个理念就是我们可以从自动化生成中获得很多好处,那么我们现在开始我们的第一个脚本,让我们看一下我们的第一个例子,下面是一个ASP .NET MVC web site.

注意看,除了web.config文件, 还存在live.config 和 test.config 文件。现在的想法是:web.config包含所有的关于网站的配置, live.config和test.config仅仅包含那些跟部署环境相关的配置。这样能避免不同文件中的重复配置导致的头痛问题。 但是这样也给我们增加了挑战,那就是当我进行生成操作时,我们需要把部署环境相关的配置合并到web.confg中。

在开始之前,下面的列出了 构成项目生成的几个步骤

  • ·        清理删除bin目录下的文件
  • ·        编译工程
  • ·        复制所有的部署需要的文件到一个新的输出目录
  • ·        合并配置文件
  • ·        部署生成的输出文件到正确的环境

每一个工程都拥有它自己的生成过程, 但是为了更好的展示,我们的这个相当简单。

然后,让我们开始今天的教程,第一步就是添加一个新的XML文件到该网站的根目录,添加完后, 我们下一步就是清理删除bin目录, 目的是服务下一个编译过程。我们的脚本会是下面这样

复制代码

1 <Project xmlns=“http://schemas.microsoft.com/developer/msbuild/2003"\>
2 <Target Name=“Clean”>
3 <ItemGroup>
4 <BinFiles Include=“bin\*.*“ />
5 </ItemGroup>
6 <Delete Files=“@(BinFiles)” />
7 </Target>
8 </Project>

复制代码

在这有一些点需要明确的是:

  1. 引入命名空间是让Visual Studio 能为MSBuild的功能提供智能提示功能。
  2. MSBuild 脚本主要是由targets组成,每一个targets都包含一个或tasks。 在这个例子中,我们的target是Clean, 任务是Delete.
  3. ItemGroups用来定义一组items. 在我们的例子中它定义的是存放在bin文件夹下的所有文件。
  4. 所有的路径名,像 bin\ *.*,都是相对于生成脚本自己的。

好的。既然我们已经创建完了第一个target. 让我们开始执行它。 打开命令行窗口并且把路径切换到包含你的生成脚本的文件下,执行下面的语句

msbuild Build.xml /t:Clean

如果你遇到MSBuild could not be found的错误, 你需要给它指定个全路径名(我的是C:\Program Files (x86)\MSBuild\12.0\Bin), 或者是添加它的路径到Path环境变量中, 我个人比较推荐后面一种方式。

运行完这个命令,bin文件夹下应该是空的。为了让自己看到效果,最好在执行该命令之前确保bin文件夹下是有东西的。

好,让我们继续下一步:编译该工程,为了实现该功能,我们创建了一个Compile的target. 然后用MSBuild task去执行这个编译过程。

<Target Name=“Compile” DependsOnTargets=“Clean”>
<MSBuild Projects=“BuildDemoSite.csproj” />
</Target>

同样的,我们执行下面的命令,你就可以从命令提示框中看到执行的过程,然后检查一下自己的项目已经被编译完成。

msbuild Build.xml /t:Compile

好了,直到现在我们已经完成了:

  • 了解生成过程包含什么?

  • 决定去实现生成自动化。

  • 创建了删除bin文件夹和编译工程的targets.

  • 标签 MSbuild , Visual studio , .net

作者:[美]Adam Freeman 来源:《精通ASP.NET MVC 4》

3.Visual Studio 的单元测试

有很多.NET单元测试包,其中很多是开源和免费的。本文打算使用 Visual Studio 附带的内建单元测试支持,但其他一些.NET单元测试包也是可用的。

为了演示Visual Studio的单元测试支持,本例打算对示例项目添加一个 IDiscountHelper 接口的新实现。 在 Models 文件夹下新建类文件 MinimumDiscountHelper.cs :

复制代码

namespace EssentiaTools.Models
{ public class MinimumDiscountHelper:IDiscountHelper
{ public decimal ApplyDiscount(decimal totalParam)
{ throw new NotImplementedException();
}
}
}

复制代码

此例的的目标是让 MinimumDiscountHelper 演示以下行为:

· 总额大于 $100时,折扣为10%

· 总额介于(并包括)$10~$100之间时,折扣为$5

· 总额小于$10时,无折扣

· 总额为负值时,抛出 ArgumentOutOfRangeException

3.1 创建单元测试项目

承接  【MVC 4】3.MVC 基本工具(创建示例项目、使用 Ninject) 的项目“EssentiaTools”,右击解决方案资源管理器中的顶级条目,从弹出的菜单中选择“Add New Project(新建项目)”

在弹出的对话框中,添加“Unit Test Project(单元测试项目)”,将项目名设置为EssentiaTools.Tests

 

然后对这一测试项目添加一个引用,以便能够对MVC 项目中的类执行测试。

3.2 创建单元测试

在 Essential.Tests 项目的 UnitTest1.cs 文件中添加单元测试:

复制代码

using System; using Microsoft.VisualStudio.TestTools.UnitTesting; using EssentiaTools.Models; namespace EssentiaTools.Tests
{
[TestClass] public class UnitTest1
{ **private IDiscountHelper getTestObject()
{ return new MinimumDiscountHelper();
}

    \[TestMethod\] public void Discount\_Above\_100()
    { //准备
        IDiscountHelper target = getTestObject(); decimal total = 200; //动作
        var discountedTotal = target.ApplyDiscount(total); //断言
        Assert.AreEqual(total \* 0.9M****, discountedTotal);
    }**
}

}

复制代码

只添加了一个单元测试。含有测试的类是用 TestClass 注解属性进行注释的,其中的各个测试都是用 TestMethod 注解属性进行注释方法。并不是单元测试类中的所有方法都是单元测试。例如 getTestObject 方法因为该方法没有 TestMethod 注解属性,故 Visual Studio 不会把它当作一个单元测试。

可以看出,单元测试方法遵循了“准备/动作/断言(A/A/A)”模式。

上述测试方法是通过调用 getTestObject 方法建立起来的,getTestObject 方法创建了一个待测试的实例 —— 本例为 MinimumDiscountHelper 类。另外还定义了要进行检查的 total 值,这是单元测试的“准备(Arrange)” 部分。

对于测试的“动作(Act)”部分,调用 MinimumDiscountHelper.AppleDiscount 方法,并将结果赋给 discountedTotal 变量。最后,对于测试的“断言(Assert)”部分使用了 Assert.AreEqual 方法,以检查从 AppleDiscount 方法得到的值是最初总额的90% 。

Assert 类有一系列可以在测试中使用的静态方法。这个类位于 Microsoft.VisualStudio.TestTools.UnitTesting 命名空间,该命名空间还包含了一些对建立和执行测试有用的其他类。有关该命名空间的类,可以参阅:https://msdn.microsoft.com/en-us/library/ms182530.aspx

Assert 类是用的最多的一个,其中重要的一些方法如下:

Assert 类中的每一个静态方法都可以检查单元测试的某个方面。如果断言失败,将抛出一个异常,这意味着整个单元测试失败。由于每一个单元测试都是独立进行处理的,因此其他单元测试将被继续执行。

上述的每一个方法都有一个string 为参数的重载,该字符串作为断言失败时的消息元素。 AreEqual 和 AreNotEqual 方法有几个重载,以满足特定类型的比较。例如,有一个版本可以比较字符串, 而不需要考虑其他情况。

提示:Microsoft.VisualStudio.TestTools.UnitTesting 命名空间中一个值得注意的成员是 ExpectedException 属性。这是一个断言,只有当单元测试抛出 ExceptionType 参数指定类型的异常时,该断言才是成功的。这是一种确保单元测试抛出异常的整洁方式,而不需要在单元测试中构造 try..catch 块

为了验证前述 MinimumDiscountHelper 的其他行为,修改文件 UnitTest1.cs 如下:

复制代码

using System; using Microsoft.VisualStudio.TestTools.UnitTesting; using EssentiaTools.Models; namespace EssentiaTools.Tests
{
[TestClass] public class UnitTest1
{ private IDiscountHelper getTestObject()
{ return new MinimumDiscountHelper();
}

    \[TestMethod\] public void Discount\_Above\_100()
    { //准备
        IDiscountHelper target = getTestObject(); decimal total = 200; //动作
        var discountedTotal = target.ApplyDiscount(total); //断言
        Assert.AreEqual(total \* 0.9M, discountedTotal);
    }

  **\[TestMethod\]** **public void Discount\_Between\_10\_And\_100()
    { //准备
        IDiscountHelper target = getTestObject(); //动作
        decimal TenDollarDiscount = target.ApplyDiscount(10); decimal HundredDollarDiscount = target.ApplyDiscount(100); decimal FiftyDollarDiscount = target.ApplyDiscount(50); //断言
        Assert.AreEqual(5, TenDollarDiscount, "$10 discount is wrong");
        Assert.AreEqual(95, HundredDollarDiscount, "$100 discoutn is wrong");
        Assert.AreEqual(45, **FiftyDollarDiscount**, "$50 discount is wrong");
    }

    \[TestMethod\] public void Discount\_Less\_Than\_10()
    {
        IDiscountHelper target \= getTestObject(); decimal discount5 = target.ApplyDiscount(5); decimal discount0 = target.ApplyDiscount(0);

        Assert.AreEqual(5, discount5);
        Assert.AreEqual(0, discount0);
    }

    \[TestMethod\]
    \[ExpectedException(typeof(ArgumentOutOfRangeException))\] public void Discount\_Negative\_Total()
    {
        IDiscountHelper target \= getTestObject();

        target.ApplyDiscount(\-1****);
    }**
}

}

复制代码

3.3 运行单元测试(并失败)

Visual Studio 2012 为管理和运行测试引入了一个更为有用的“Test Explorer(测试资源管理器)”窗口。从 Visual Studio 的“Test(测试)”菜单中选择“Window(窗口)”—>”Test Explorer(测试资源管理器)”,便可以看到这一新窗口,点击左上角附近的“RunAll(全部运行)”按钮,会看到下图效果:

可以在该窗口的左侧面板中看到所定义的测试列表。所有的测试都失败了,这是当然的,因为所测试的这些方法还未实现。可以点其中任意测试,测试失败的原因和细节会显示在窗口的右侧面板中。

3.4 实现特性

现在,到了实现特性的时候了。当编码工作完成时,基本上可以确信代码是能够按预期工作的。有了之前的准备,MinimumDiscountHelper 类的实现相当简单:

复制代码

using System; using System.Collections.Generic; using System.Linq; using System.Web; namespace EssentiaTools.Models
{ public class MinimumDiscountHelper : IDiscountHelper
{ public decimal ApplyDiscount(decimal totalParam)
{ if (totalParam < 0)
{ throw new ArgumentOutOfRangeException();
} else if (totalParam > 100)
{ return totalParam * 0.9M;
} else if (totalParam > 10 && totalParam <= 100)
{ return totalParam - 5;
} else { return
totalParam;
}
}
}

}

复制代码

3.5 测试并修正代码

为了演示如何利用 Visual Studio 进行单元测试迭代,上述代码故意留下了一个错误。如果点击“测试资源管理器”窗口中的“全部运行”按钮,则可以看到该错误的效果。测试结果如下:

可以看到,三个单元测试得到了通过,但 Discount_Between_10_And_100 测试方法检测到了一个问题。当点击这一失败的测试时,可以看到测试期望得到的是5,但实际得到的是10。

此刻,重新审视代码便会发现,并未得到适当的实现——特别是总额是10或100的折扣,未做适当处理。问题出在 MinimumDiscountHelper 类的这句语句上:

… else if (totalParam > 10 && totalParam <= 100)

虽然目标是建立介于(包括)$10~$100 直接的行为,但实际却排除了等于$10 的情况,修改成:

… else if (totalParam >= 10 && totalParam <= 100)

重新运行测试,所有测试代码都已通过:

4. 使用 Moq

前面的单元测试如此简单的原因之一是因为测试的是一个不依赖于其他类而起作用的单一的类。当然,实际项目中有这样的类,但往往还需要测试一些不能孤立运行的对象。在这些情况下,需要将注意力于感兴趣的类或方法上,才能不必对依赖类也进行隐式测试。

一个有用的办法是使用模仿对象,它能够以一种特殊而受控的的方式,来模拟项目中实际对象的功能。模仿对象能够缩小测试的侧重点,以使用户只检查感兴趣的功能。

4.1 理解问题

在开始使用 Moq 之前,本例想演示一个试图要修正的问题。下面打算对 LinqValueCalculator 类进行单元测试,LinqValueCalculator 在前面出现过,具体代码为:

复制代码

using System; using System.Collections.Generic; using System.Linq; using System.Web; namespace EssentiaTools.Models
{ public class LinqValueCalculator : IValueCalculator
{ private IDiscountHelper discounter; public LinqValueCalculator(IDiscountHelper discountParam)
{
discounter = discountParam;
} public decimal ValueProducts(IEnumerable products)
{ return discounter.ApplyDiscount(products.Sum(p => p.Price));
}
}
}

复制代码

为了,测试这个类,在单元测试项目中新增单元测试文件 UnitTest2.cs :

复制代码

using System; using Microsoft.VisualStudio.TestTools.UnitTesting; using EssentiaTools.Models; using System.Linq; namespace EssentiaTools.Tests
{
[TestClass] public class UnitTest2
{ private Product[] products = { new Product{Name=”Kayak”,Catogory=”Watersports”,Price=275M}, new Product{Name=”Lifejacket”,Catogory=”Watersports”,Price=48.95M}, new Product{Name=”Soccer ball”,Catogory=”Soccer”,Price=19.50M}, new Product{Name=”Corner flag”,Catogory=”Soccer”,Price=34.95M}
};

    \[TestMethod\] public void Sum\_Products\_Correctly()
    { //准备
        var discounter = new MinimumDiscountHelper(); var target = new LinqValueCalculator(discounter); var goalTotal = products.Sum(e => e.Price); //动作
        var result = target.ValueProducts(products); //断言

Assert.AreEqual(goalTotal, result);
}
}
}

复制代码

现在面临的问题是,LinqValueCalculator 类依赖于 IDiscountHelper 接口的实现才能进行操作。此例使用了 MinimumDiscountHelper 类(这是 IDiscountHelper 接口的实现类),它表现了两个不同的问题。

第一个问题是单元测试变得复杂和脆弱。为了创建一个能够进行工作的单元测试,需要考虑 IDiscountHelper 实现中的折扣逻辑,以便判断出 ValueProducts 方法的预期值。脆弱来自这样一个事实:一旦该实现中的折扣逻辑发生变化,测试便会失败。

第二个也是最令人担忧的问题是已经延展了这一单元测试的范围,使它的隐式的包含了 MinimumDiscountHelper 类。当单元测试失败时,用户不知道问题是出在 LinqValueCalculator 类中,还是在 MinimumDiscountHelper 类中。

当单元测试简单且焦点集中时,会工作的很好,而当前的设置会让这两个特征都不能得到满足。而在MVC项目中添加并运用 Moq ,能够避免这些问题。

4.2 将 Moq 添加到VisualStudio 项目

和前面的 Ninject 一样,在测试项目中 搜索并添加 NuGet 程序包 Moq 。

4.3 对单元测试添加模仿对象

对单元测试添加模仿对象,其目的是告诉 Moq,用户想使用哪一种对象。对它的行为进行配置,然后将该对象运用于测试目的。

在单元测试中使用 Mock 对象,为 LinqValueCalculator 的单元测试添加模仿对象,修改 UnitTest2.cs 文件:

复制代码

using System; using Microsoft.VisualStudio.TestTools.UnitTesting; using EssentiaTools.Models; using System.Linq; using Moq; namespace EssentiaTools.Tests
{
[TestClass] public class UnitTest2
{ private Product[] products = { new Product{Name=”Kayak”,Catogory=”Watersports”,Price=275M}, new Product{Name=”Lifejacket”,Catogory=”Watersports”,Price=48.95M}, new Product{Name=”Soccer ball”,Catogory=”Soccer”,Price=19.50M}, new Product{Name=”Corner flag”,Catogory=”Soccer”,Price=34.95M}
};

    \[TestMethod\] public void Sum\_Products\_Correctly()
    { //准备
      **Mock<IDiscountHelper> mock = new Mock<IDiscountHelper>();
        mock.Setup(m \=> m.ApplyDiscount(It.IsAny<decimal\>())).Returns<decimal\>(total => total); var target = new LinqValueCalculator(mock.Object);** //动作
        var result = target.ValueProducts(products); //断言
        Assert.AreEqual(products.Sum(e => e.Price), result);
    }
}

}

复制代码

第一次使用 Moq 时,可能会觉得其语法有点奇怪,下面将演示该过程的每个步骤。

(1) 创建模仿对象

第一步是要告诉 Moq,用户想使用的是哪种模仿对象。 Moq 十分依赖于泛型的类型参数,从以下语句可以看到这种参数的使用方式,这是告诉 Moq,要模仿的对象时 IDiscountHelper 实现。


Mock mock = new Mock();

创建一个强类型的的 Mock 对象,目的是告诉 Moq 库,它要处理的是哪种类型——当然,这便是用于该单元测试的 IDiscountHelper 接口。单为了改善单元测试的侧重点,这可以是想要隔离出来的任何类型。

(2) 选择方法

除了创建强类型的Mock对象外,还需要指定它的行为方式——这是模仿过程的核心,它可以建立模仿所需要的基准行为,用户可以将这种行为用于对单元测试中目标对象的功能进行测试。以下是单元测试中的语句,它为模仿对象建立了用户所希望的行为。


mock.Setup(m => m.ApplyDiscount(It.IsAny<decimal>())).Returns<decimal>(total => total);

用 Setup 方法给模仿对象添加一个方法。 Moq 使用 LINQ 和 lambda 表达式进行工作。在调用 Setup 方法时,Moq 会传递要求它的接口。它巧妙地封装了一些本书不打算细说的LINQ 魔力,这种魔力让用户可以选择想要通过 lambda 表达式进行配置或检查的方法。对于该单元测试,希望定义 AppleDiscount 方法的行为,它是 IDiscountHelper 接口的唯一方法,也是对 LinqValueCalculator 类进行测试所需要的方法。

必须告诉 Moq 用户感兴趣的参数值是什么,这是要用 It 类要做的事情,如以下加粗部分所示。


mock.Setup(m => m.ApplyDiscount(It.IsAny<decimal>())).Returns<decimal>(total => total);

这个It 类定义了许多以泛型类型参数进行使用的方法。此例用 decimal 作为泛型类型调用了 IsAny 方式。这是告诉 Moq ,当以任何十进制为参数来调用 ApplyDiscount 方法时,它应该运用我们定义的这一行为。
下面给出了 It 类所提供的方法,所有的这些方法都是静态的。

 

(3) 定义结果

Returns 方法让用户指定在调用模仿方法时要返回的结果。其类型参数用以指定结果的类型,而用 lambda 表达式来指定结果。如下:


mock.Setup(m => m.ApplyDiscount(It.IsAny<decimal>())).Returns<decimal>(total => total);

通过调用带有 decimal 类型参数的 Returns 方法(即 Returns),这是告诉 Moq 要返回一个十进制的值。对于 lambda 表达式,Moq 传递了一个在ApplyDiscount 方法中接收的类型值 —— 此例创建了一个穿透方法,该方法返回了传递给模仿的 ApplyDiscount 方法的值,并未对这个值执行任何操作。

上述过程的思想是:

为了对 LinqValueCalculator 进行单元测试,如果创建一个 IDiscountHelper 模仿对象,便可以在单元测试中排除 IDiscountHelper 接口的实现类 MinimumDiscountHelper ,从而使单元测试更为简单容易。用 Moq 创建模仿对象的整个过程包括了以下几个步骤:a. 用 Mock 创建模仿对象; b. 用Setup 方法建立模仿对象的行为; c. 用 It 类设置行为的参数; d. 用Return 方法指定行为的返回类型; e. 用 lambda 表达式在Return 方法中建立具体行为。

(4) 使用模仿对象

最后一个步骤是在单元测试中使用这个模仿对象,通过读取 Mock 对象的Object 属性值来实现

… var target = new LinqValueCalculator(mock.Object);

总结下,在上述示例中,Object 属性返回 IDiscountHelper 接口的实现,该实现中的 ApplyDiscount 方法返回它传递的十进制参数的值。

这使单元测试很容易执行,因为用户可以自行求取 Product 对象的价格总和,并检查 LinqValueCalculator 对象得到了相同的值。


Assert.AreEqual(products.Sum(e => e.Price), result);

以这种方式使用 Moq 的好处是,单元测试只检查 LinqValueCalculator 对象的行为,并不依赖任何 Models 文件夹中 IDiscountHelper 接口的真实实现。这意味着当测试失败时,用户便知道问题出在 LinqValueCalculator 实现中,或建立模仿对象的方式中。而解决源自这些方面的问题,比处理实际对象链及其相互交互,要更叫简单而容易。

4.4 创建更复杂的模仿对象

前面展示了一个十分简单的模仿对象,但 Moq 最漂亮的部分是快速建立复杂行为以便对不同情况进行测试的能力。在 UnitTest2.cs  中新建一个单元测试,模仿更加复杂的 IDiscountHelper 接口实现。

复制代码

using System; using Microsoft.VisualStudio.TestTools.UnitTesting; using EssentiaTools.Models; using System.Linq; using Moq; namespace EssentiaTools.Tests
{
[TestClass] public class UnitTest2
{ private Product[] products = { new Product{Name=”Kayak”,Catogory=”Watersports”,Price=275M}, new Product{Name=”Lifejacket”,Catogory=”Watersports”,Price=48.95M}, new Product{Name=”Soccer ball”,Catogory=”Soccer”,Price=19.50M}, new Product{Name=”Corner flag”,Catogory=”Soccer”,Price=34.95M}
};

    \[TestMethod\] public void Sum\_Products\_Correctly()
    { //准备
        Mock<IDiscountHelper> mock = new Mock<IDiscountHelper>();
        mock.Setup(m \=> m.ApplyDiscount(It.IsAny<decimal\>())).Returns<decimal\>(total => total); var target = new LinqValueCalculator(mock.Object); //动作
        var result = target.ValueProducts(products); //断言
        Assert.AreEqual(products.Sum(e => e.Price), result);
    } **private Product\[\] createProduct(decimal value)
    { return new\[\] { new Product { Price = value } };
    }

    \[TestMethod\]
    \[ExpectedException(typeof(System.ArgumentOutOfRangeException))\] public void Pass\_Through\_Variable\_Discounts()
    {

        Mock<IDiscountHelper> mock = new Mock<IDiscountHelper>();
        mock.Setup(m \=> m.ApplyDiscount(It.IsAny<decimal\>())).Returns<decimal\>(total => total);
        mock.Setup(m \=> m.ApplyDiscount(It.Is<decimal\>(v => v == 0))).Throws<System.ArgumentOutOfRangeException>();
        mock.Setup(m \=> m.ApplyDiscount(It.Is<decimal\>(v => v > 100))).Returns<decimal\>(total => (total \* 0.9M));
        mock.Setup(m \=> m.ApplyDiscount(It.IsInRange<decimal\>(10, 100, Range.Inclusive))).Returns<decimal\>(total => total - 5); var target = new LinqValueCalculator(mock.Object); decimal FiveDollarDiscount = target.ValueProducts(createProduct(5)); decimal TenDollarDiscount = target.ValueProducts(createProduct(10)); decimal FiftyDollarDiscount = target.ValueProducts(createProduct(50)); decimal HundredDollarDiscount = target.ValueProducts(createProduct(100)); decimal FiveHundredDollarDiscount = target.ValueProducts(createProduct(500));

        Assert.AreEqual(5, FiveDollarDiscount, "$5 Fail");
        Assert.AreEqual(5, TenDollarDiscount, "$10 Fail");
        Assert.AreEqual(45, FiftyDollarDiscount, "$50 Fail");
        Assert.AreEqual(95, HundredDollarDiscount, "$100 Fail");
        Assert.AreEqual(450, FiveHundredDollarDiscount, "$500 Fail");
        target.ValueProducts(createProduct(0****));
    }**
}

}

复制代码

在单元测试期间,复制另一个模型类期望的行为似乎是在做一个奇怪的事情,但这能够完美演示 Moq 的一些不同用法。

可以看出,根据所接收到的参数值,定义了 ApplyDiscount 方法的四个不同的行为。最简单的行为是“全匹配”,它直接返回任意的decimal 值,如下:

mock.Setup(m => m.ApplyDiscount(It.IsAny<decimal>())).Returns<decimal>(total => total);

这是用于上一示例的同一行为,把它放在这是因为调用 Setup 方法的顺序会影响模仿对象的行为。Moq 会以相反的顺序评估所给定的行为,因此会考虑调用最后一个 Setup 方法。这意味着,用户必须按从最一般到最特殊的顺序,小心地创建模仿行为。 It.IsAny 是此例所定义的最一般的条件,因而首先运用它。如果颠倒调用 Setup 的顺序,该行为将能匹配对 ApplyDiscount 方法的所有调用,并生成错误的模仿结果。

(1) 模仿特定值(并抛出异常)

对于 Setup 方法第二个调用,使用了 It.Is 方法

mock.Setup(m => m.ApplyDiscount(It.Is<decimal>(v => v == 0))).Throws<System.ArgumentOutOfRangeException>();

若传递给 ApplyDiscount 方法的值是0,则 Is方法的谓词便返回 true。这里并未返回一个结果,而是使用了 Throws 方法,这会让 Moq 抛出一个用类型参数指定的异常实例。

示例还用 Is 方法捕捉了大于100的值:

mock.Setup(m => m.ApplyDiscount(It.Is<decimal>(v => v > 100))).Returns<decimal>(total => (total * 0.9M));

Is.It 方法是为不同参数值建立指定行为最灵活的方式,因为用户可以使用任意谓词来返回 true 或 false 。在创建复杂模仿对象的,这是最常用的方法。

(2) 模仿值的范围

It 对象最后是和 IsInRange 方法一起使用的,它让用户能够捕捉参数值的范围。

mock.Setup(m => m.ApplyDiscount(It.IsInRange<decimal>(10, 100, Range.Inclusive))).Returns<decimal>(total => total - 5);

这里介绍这一方法是出于完整性,如果是在用户自己的项目,可以使用 It 方法和一个谓词来做同样的事情,如下所示:

mock.Setup(m => m.ApplyDiscount(It.Is<decimal>(v=>v>=10&&v<=100))).Returns<decimal>(total => total - 5);

效果是相同的,但谓词方法更为灵活。Moq 有一系列非常有用的特性,阅读https://github.com/Moq/moq4/wiki/Quickstart上提供的入门指南,可以看到许多用法。

源码地址:https://github.com/YeXiaoChao/EssentiaTools

工作原理此处不作讲解,自己去官方网站学习(http://www.linuxvirtualserver.org),这里重点讲如何配置!
注:最好从官方网站对其进行了解,不至于会对某些问题产生误解,尽管是英文的!

环境:
192.168.1.1  GateWay
192.168.1.10  LVS_VIP(VIP:Virtual IP)
192.168.1.14  LVS**_**Master      
192.168.1.15  LVS_Backup
192.168.1.16  WEB1_RealServer
192.168.1.17  WEB2_RealServer

LINUX(CentOS 5.6)配置

1. 安装CentOS(此处我使用的版本为:CentOS-5.6-x86_64)
2. 安装IPVSADM
    知识点:IPVSADM理解为IPVS管理工具;LVS(Linux Virtual Server)的核心为IPVS(IP Virtual Server),从Linux内核版本2.6起,IPVS模块已经编译进了Linux内核  
    > 使用yum命令进行安装,系统会选择最适合内核版本的ipvsadm
       yum -y install ipvsadm
3. 配置(使用KeepAlived时,此步可省略,因为KeepAlived提供了更简单的配置方式来实现负载均衡。不过仍然建议使用此种方式配置一遍。)
    > ifconfig eth0:0 192.168.1.10 broadcast 192.168.1.10 netmask 255.255.255.255 up
    > route add -host 192.168.1.10 dev eth0:0
    > ipvsadm -A -t 192.168.1.10:80 -s wrr
    > ipvsadm -a -t 192.168.1.10:80 -r 192.168.1.16:80 -g
    > ipvsadm -a -t 192.168.1.10:80 -r 192.168.1.17:80 -g
4. 防火墙设置
    > service iptables stop
    或 在防火墙规则表中加入一条记录
    > vi /etc/sysconfig/iptables
    > -A RH-Firewall-1-INPUT -m state –state NEW -m -tcp -p tcp –dport 80 -j ACCEPT
    > service iptables restart

WINDOWS 2008 SERVER R2 配置

1. 创建windows环回网卡(如何创建,请自己Google)
2. 设置环回网卡IP地址
    > IP地址:    192.168.1.10
    > 子网掩码: 255.255.255.255
    其它不用设置了
3. 修改客户端网卡接口、环回接口连接模式(至关重要)
    > netsh interface ipv4 set interface “网卡名称” weakhostreceive=enabled
    > netsh interface ipv4 set interface “网卡名称” weakhostsend=enabled
    > netsh interface ipv4 set interface “环回网卡名称” weakhostreceive=enabled
    > netsh interface ipv4 set interface “环回网卡名称” weakhostsend=enabled

接下来,我们在浏览器地址栏中输入 http://192.168.1.10,你会发现你的访问请求被转移到了192.168.1.16或192.168.1.17,这时我们尝试停掉其中任何一台服务器,你再来访问 http://192.168.1.10,你会发现有时正常,有时不正常。原因很简单,因为其中一台机器被你停掉了,但是IPVS无法发现这种错误,所以还是会把请求均衡负载到当初配置的真实服务器列表中;针对这种问题,我们该如何来解决呢?此时该是KEEPALIVED登场的时候了!

KEEPALIVED 配置

知识点:KeepAlived是一个路由软件,它主要的目的是让我们通过简单的配置,实现高可用负载均衡,当然负载均衡依赖于Linux虚拟服务器(IPVS)的内核模块,其高可用性
           使用VRRP协议来实现,KeepAlived不仅会检测负载均衡服务器池中每台机器的健康状况并通知IPVS将非健康的机器从池中移除掉;同时它还能对负载均衡调度器本身
           实现健康状态检查,当主负载均衡调度器出现问题时,备用负载均衡调度器顶替主进行工作。
1. 安装KeepAlived(KeepAlived依赖openssl,所以在安装KeepAlived之前需要先安装openssl)
    > yum -y install openssl-devel
    > wget http://www.keepalived.org/software/keepalived-1.2.7.tar.gz(KeepAlived版本大家自己去 http://www.keepalived.org 上查看,下载最新版本即可)
    > tar zxvf keepalived-1.2.7.tar.gz
    > cd keepalived-1.2.7
    > ./configure -prefix=/usr/local/keepalived
    > make && make install
2. 配置KeepAlived
    The First:打开IP Forward 功能(LVS现有三种负载均衡规则都需要打开此功能,如果不打开此功能,下面的配置配得再好都无济于事。)
    > vi /etc/sysctl.conf
    > net.ipv4.ip_forward = 1
    > sysctl -p(使设置立即生效)
    KeepAlived配置分三部分
    > 全局定义块–global_defs
       . 不建议使用email通知,改用nagios来进行监控
       . router_id:负载均衡器标识。在一个网络内,它应该是唯一的。
    > VRRP定义块–vrrp_instance
       . state
                 均衡器状态。只有MASTER和BACKUP两种状态,并且需要大写这些单词;其中MASTER为工作状态,BACKUP为备用状态;当MASTER所在服务器
                 失效时,BACKUP所在系统会自动把它的状态由BACKUP变为MASTER;当失效的MASTER所在系统恢复时,BACKUP从MASTER恢复到BACKUP状态。
       . interface
                 对外提供服务的网络接口,如eth0,eth1;当前的主流服务器一般都有2个或2个以上的接口,在选择服务接口时,一定要核实清楚。
       . virtual_router_id
                 这个标识是个数字,并且同一个VRRP实例使用唯一的标识。即同一个vrrp_instance,MASTER和BACKUP的virtual_router_id是一致的。
       . priority
                 优先级。这个标识也是个数字,数值越大,优先级越高;在同一个vrrp_instance里,MASTER的优先级高于BACKUP。
       . advert_int
                 同步通知间隔。MASTER与BACKUP负载均衡器之间同步检查的时间间隔,单位为秒。
       . authentication
                 验证。包含验证类型和验证密码。类型主要有PASS、AH两种,通常使用PASS类型,据说AH使用时有问题;验证密码为明文,同一个vrrp_instance
                 实例MASTER与BACKUP使用相同的密码才能正常通信。
       . virtual_ipaddress
                 虚拟IP地址。可以有多个地址,每个地址占一行。
    > 虚拟服务器定义块–virtual_server
       . delay_loop
                  健康检查时间间隔,单位为秒。
       . lb_algo
                  负载均衡调度算法(rr、wrr、dh、sh、lc、wlc、sed、nq、lblc、lblcr)。
                  算法分两大类:静态算法、动态算法
                  静态算法:只是根据算法进行调度并不考虑后端REALSERVER的实际连接情况
                  —- rr(轮叫调度 - Round-Robin Scheduling)
                        调度器通过”轮叫”调度算法将外部请求按顺序轮流分配到集群中的真实服务器上,它均等地对待每一台服务器,而不管服务器上实际的连接数
                        和系统负载。
                  —- wrr(加权轮叫调度 - Weighted Round-Robin Scheduling)
                        调度器通过”加权轮叫”调度算法根据真实服务器的不同处理能力来调度访问请求。这样可以保证处理能力强的服务器处理更多的访问流量。调
                        度器可以自动问询真实服务器的负载情况,并动态地调整其权值
                  —- dh(目标地址散列调度 - Destination Hashing Scheduling)
                        “目标地址散列”调度算法根据请求的目标IP地址,作为散列键(Hash Key)从静态分配的散列表找出对应的服务器,若该服务器是可用的且
                        未超载,将请求发送到该服务器,否则返回空。
                  —- sh(源地址散列调度 - Source Hashing Scheduling)
                        “源地址散列”调度算法根据请求的源IP地址,作为散列键(Hash Key)从静态分配的散列表找出对应的服务器,若该服务器是可用的且未超
                        载,将请求发送到该服务器,否则返回空。
                  动态算法:前端的调度器会根据后端REALSERVER的实际连接情况来分配请求
                  —- lc(最小连接调度 - Least-Connection Scheduling)
                        当一个用户请求过来的时候,就计算下哪台RS的链接谁最小,那么这台RS就获得了下次响应客户端请求的机会,计算的方法
                        Overhead=active*256+inactive,如果两者的结果是相同的则从LVS中的规则依次往下选择RS。这种算法也是不考虑服务器的性能的。
                  —- wlc(加权最小连接调度 - Weighted Least-Connection Scheduling)
                        这个就是加了权重的LC,考虑了RS的性能,即是性能好的就给的权重值大一些,不好的给的权重值小一些。缺点就是如果Overhead相同,
                        则会按规则表中的顺序,由上而下选择RS,Overhead=(active*256+inactive)/weight
                  —- sed(最短预期延时调度 - Shortest Expected Delay Scheduling)
                        就是对WLC的情况的补充,Overhead=(active+1)*256/weight,加一,就是为了让其能够比较出大小。
                  —- nq(不排队调度 - Never Queue Scheduling)
                        never queue 基本和SED相同,避免了SED当中的性能差的服务器长时间被空闲的弊端,它是第一个请求给性能好的服务器,第二个请求
                        一定是给的空闲服务器不论它的性能的好与坏。以后还是会把请求给性能好的服务器
                  —- lblc(基于局部性的最少链接 - Locality-Based Least Connections Scheduling)
                        它就是动态DH和LC的组合,适用于cache群,对于从来没有来过的那些新的请求会分给当前连接数较少的那台服务器。
                  —- lblcr(带复制的基于局部性最少链接 - Locality-Based Least Connections with Replication Scheduling)
                        带有复制功能的LBLC,它的适用场景这里举例说明一下,比如说现在有RS1和RS2,第一次访问RS1的5个请求第二次又来了,理所应到
                        Director将会将其交给RS1,而此时在RS2是非常闲的,所以此时最好的处理方法就是可以将后来的这5个请求分别交给RS1和RS2,所以
                        此时就需要把客户端第一次请求的资源复制下来。(特殊情况)
                  小解:活动链接active和非活动链接inactive
                  这里以http为例,http本身是一种无状态的链接,当客户端请求访问的时候,有个等待响应过程,这个时段可以称其为活动链接active状态。
                  当服务器端给与响应后,请求因为keepalive而并未断开,则此段时间的状态就是非活动链接状态。
       . lb_kind
                  负载均衡转发规则(DR、NET、TUN),常用DR。
       . persistence_timeout
                  会话保持时间,单位是秒。这个选项对动态网站很有用处,当用户用账号登录网站时,有了这个会话保持功能,就能把用户的请求转发给同一个应用服务
                  器。在这里,我们来做一个假设,假定现在有一个LVS环境,使用DR转发模式,真实服务器有3台,如果负载均衡器不启用会话保持功能,当用户第一次
                  访问的时候,他的访问请求分发给某个真实服务器,这样他看到一个登录页面,第一次访问完毕;接着他在登录框填写用户名和密码,然后提交,这个时
                  候,问题就可能出现了:登录不成功,因为没有会话保持,负载均衡器可能把第2次的请求转发到其它服务器。
       . protocol
                  转发协议。一般有TCP和UDP两种,UDP没尝试过。
       . real_server
                  真实服务器,也即服务器词。real_server的值包括IP和端口。
                  * weight
                             权重值,它是一个数字,数值越大,权重越高。使用不同的权重值的目的在于为不同性能的机器分配不同的负载。
                  * tcp_check
                             TCP方式检查机器健康状态
    我这里把我配好的截图给大家看看(包含负载均衡器的高可用)
    MASTER:
   
   
   
    BACKUP:
   
   
   
    MASTER
BACKUP配置仅三处不同:global_defs中的router_id、vrrp_instance中的state、priority
3. 启动KeepAlived
    /usr/local/keepalived/sbin/keepalived -D
    此处需要注意:KeepAlived默认会去/etc/keepalived下面找它的配置文件,要么你将keepalived.conf文件copy到该目录下,要么在启动时带上 -f 参数来指
    定keepalived.conf文件位置。
4. 到这里,KeepAlived就安装成功了!接下来我们可以使用一些命令来检查一下。
    > 查看进程:ps aux | grep keepalived
       Keepalived正常运行时,共启动3个进程,其中一个进程是父进程,负责监控其子进程;一个是vrrp子进程;另外一个是checkers子进程。
       我们也可以通过命令(pstree | grep keepalived)查看进程相关性来验证上面的说法
    > 查看日志:tail -f /var/log/messages
    > 查看请求转发情况:ipvsadm -lcn | grep 虚拟IP
5. 优化。
    最后,我们需要做一下优化,那就是将KeepAlived做成服务,随机启动,这样我们就免去了每次手动去启动的麻烦
    将KeepAlived加入系统服务
    > ln -s /usr/local/keepalived/etc/rc.d/init.d/keepalived /etc/rc.d/init.d/
    > ln -s /usr/local/keepalived/etc/sysconfig/keepalived /etc/sysconfig/
    > mkdir /etc/keepalived
    > ln -s /usr/local/keepalived/etc/keepalived/keepalived.conf /etc/keepalived/
    > ln -s /usr/local/keepalived/sbin/keepalived /usr/sbin/  
    设置KeepAlived系统服务随机启动
    > chkconfig –add keepalived
    > chkconfig keepalived on
    > 查看:chkconfig –list keepalived

到这里,整个配置就完成了,最后我们尝试关闭Master机器或Master上的keepalived服务,看看Backup机器是否会代替Master来工作。