0%

live555支持单播和组播,我们先分析单播的流媒体服务端,后面分析组播的流媒体服务端。

一、单播的流媒体服务端:

复制代码

// Create the RTSP server (normal case: streaming from a built-in RTSP server).
RTSPServer* rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);
if (rtspServer == NULL) {
  *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
  exit(1);
}
*env << "...done initializing \n";

if (streamingMode == STREAMING_UNICAST) {
  // One ServerMediaSession per stream; the same string is passed for the
  // second and third arguments.
  ServerMediaSession* sms = ServerMediaSession::createNew(*env,
      H264StreamName[video_type],
      H264StreamName[video_type],
      streamDescription,
      streamingMode == STREAMING_MULTICAST_SSM); // always false inside the unicast branch

  // Add one video and one audio subsession, both fed by the same input device.
  sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type], H264VideoBitrate));
  sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type]));

  rtspServer->addServerMediaSession(sms);

  // rtspURL() hands back an array that this code releases with delete[].
  char* url = rtspServer->rtspURL(sms);
  *env << "Play this stream using the URL:\t" << url << "\n";
  delete[] url;
}

// Begin the LIVE555 event loop:
env->taskScheduler().doEventLoop(&watchVariable); // does not return

复制代码

我们一步一步分析:

1>  rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);

复制代码

// Factory for RTSPServer.
//
// Sets up the server socket for `ourPort` via setUpOurSocket() and, on
// success, returns a newly allocated RTSPServer that takes over that socket.
// Returns NULL if the socket could not be set up.
//
// The original declared a second `ourSocket` inside the do/while body, which
// shadowed the outer variable and left the trailing closeSocket() cleanup
// unreachable. Since the only failure path is "no socket was created", there
// is nothing to close and the function reduces to a single guard.
RTSPServer* RTSPServer::createNew(UsageEnvironment& env, Port ourPort,
                                  UserAuthenticationDatabase* authDatabase,
                                  unsigned reclamationTestSeconds)
{
  int const ourSocket = setUpOurSocket(env, ourPort);
  if (ourSocket == -1) return NULL;

  return new RTSPServer(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds);
}

复制代码

  此函数首先创建一个rtsp协议的socket,并且监听rtspServerPortNum端口,创建RTSPServer类的实例。下面我们看下RTSPServer的构造函数:

复制代码

// Constructs the server around an already-created listening socket.
//
// Stores the socket, port, authentication database and reclamation interval,
// creates the string-keyed hash table that holds the server media sessions,
// and registers incomingConnectionHandler() with the task scheduler as the
// background read handler for the server socket.
RTSPServer::RTSPServer(UsageEnvironment& env, int ourSocket, Port ourPort,
                       UserAuthenticationDatabase* authDatabase,
                       unsigned reclamationTestSeconds)
  : Medium(env),
    fServerSocket(ourSocket), fServerPort(ourPort),
    fAuthDB(authDatabase), fReclamationTestSeconds(reclamationTestSeconds),
    fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
    fSessionIdCounter(0)
{
#ifdef USE_SIGNALS
  // Ignore the SIGPIPE signal, so that clients on the same host that are killed
  // don't also kill us:
  signal(SIGPIPE, SIG_IGN);
#endif

  // Arrange to handle connections from others:
  env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket,
      (TaskScheduler::BackgroundHandlerProc*)&incomingConnectionHandler, this);
}

复制代码

  RTSPServer构造函数,初始化fServerMediaSessions为创建的HashTable,初始化fServerSocket为我们前面创建的tcp socket,fServerPort为我们监听的端口rtspServerPortNum,并且向taskScheduler注册fServerSocket的任务函数incomingConnectionHandler。这个任务函数主要负责accept新的客户端连接:如果有新的客户端接入,就创建RTSPClientSession的实例。

  RTSPClientSession要提供什么功能呢?可以想象:需要监听客户端的rtsp请求并回应它,需要在DESCRIBE请求中返回所请求的流的信息,需要在SETUP请求中建立起RTP会话,需要在TEARDOWN请求中关闭RTP会话,等等…

复制代码

// Builds the per-client session object for a connection that has just been
// accepted: records the owning server, the session id, the client socket and
// the client address, and starts with no media session and no stream states.
RTSPServer::RTSPClientSession::RTSPClientSession(RTSPServer& ourServer,
                                                 unsigned sessionId,
                                                 int clientSocket,
                                                 struct sockaddr_in clientAddr)
  : fOurServer(ourServer), fOurSessionId(sessionId),
    fOurServerMediaSession(NULL),
    fClientSocket(clientSocket), fClientAddr(clientAddr),
    fLivenessCheckTask(NULL),
    fIsMulticast(False), fSessionIsActive(True), fStreamAfterSETUP(False),
    fTCPStreamIdCount(0), fNumStreamStates(0), fStreamStates(NULL)
{
  // Begin with an empty request buffer.
  resetRequestBuffer();

  // Have the scheduler call incomingRequestHandler() whenever the client
  // socket becomes readable.
  TaskScheduler::BackgroundHandlerProc* const requestHandler
    = (TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler;
  envir().taskScheduler().turnOnBackgroundReadHandling(fClientSocket, requestHandler, this);

  // Mark the client as alive as of now.
  noteLiveness();
}

复制代码

  上面这个函数是RTSPClientSession的构造函数,fOurSessionId初始化为传入的sessionId(即++fSessionIdCounter),初始化fClientSocket为accept创建的socket(clientSocket),初始化fClientAddr为accept接收的客户端地址,也向taskScheduler注册了fClientSocket的任务函数incomingRequestHandler。

  incomingRequestHandler会调用incomingRequestHandler1,incomingRequestHandler1函数定义如下:

复制代码

void RTSPServer::RTSPClientSession::incomingRequestHandler1()
{
noteLiveness(); struct sockaddr_in dummy; // ‘from’ address, meaningless in this case
Boolean endOfMsg = False;
unsigned char* ptr = &fRequestBuffer[fRequestBytesAlreadySeen]; int bytesRead = readSocket(envir(), fClientSocket, ptr, fRequestBufferBytesLeft, dummy); if (bytesRead <= 0 || (unsigned)bytesRead >= fRequestBufferBytesLeft) { // Either the client socket has died, or the request was too big for us. // Terminate this connection:
#ifdef DEBUG
fprintf(stderr, “RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes (of %d); terminating connection!\n”, this, bytesRead, fRequestBufferBytesLeft); #endif delete this; return;
}
#ifdef DEBUG
ptr[bytesRead] = ‘\0’;
fprintf(stderr, “RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes:%s\n”, this, bytesRead, ptr); #endif

    // Look for the end of the message: <CR><LF><CR><LF>
    unsigned char \*tmpPtr = ptr; if (fRequestBytesAlreadySeen > 0) --tmpPtr; // in case the last read ended with a <CR>
    while (tmpPtr < &ptr\[bytesRead-1\]) { if (\*tmpPtr == '\\r' && \*(tmpPtr+1) == '\\n') { if (tmpPtr - fLastCRLF == 2) { // This is it:
                            endOfMsg = 1; break;
                    }
                    fLastCRLF \= tmpPtr;
            } ++tmpPtr;
    }

    fRequestBufferBytesLeft \-= bytesRead;
    fRequestBytesAlreadySeen += bytesRead; if (!endOfMsg) return; // subsequent reads will be needed to complete the request // Parse the request string into command name and 'CSeq', // then handle the command:
    fRequestBuffer\[fRequestBytesAlreadySeen\] = '\\0'; char cmdName\[RTSP\_PARAM\_STRING\_MAX\]; char urlPreSuffix\[RTSP\_PARAM\_STRING\_MAX\]; char urlSuffix\[RTSP\_PARAM\_STRING\_MAX\]; char cseq\[RTSP\_PARAM\_STRING\_MAX\]; if (!parseRTSPRequestString((char\*)fRequestBuffer, fRequestBytesAlreadySeen,
                                                    cmdName, sizeof cmdName,
                                                    urlPreSuffix, sizeof urlPreSuffix,
                                                    urlSuffix, sizeof urlSuffix,
                                                    cseq, sizeof cseq)) 
    {

#ifdef DEBUG
fprintf(stderr, “parseRTSPRequestString() failed!\n”); #endif handleCmd_bad(cseq);
} else {
#ifdef DEBUG
fprintf(stderr, “parseRTSPRequestString() returned cmdName \“%s\“, urlPreSuffix \“%s\“, urlSuffix \“%s\“\n”, cmdName, urlPreSuffix, urlSuffix); #endif
if (strcmp(cmdName, “OPTIONS”) == 0) {
handleCmd_OPTIONS(cseq);
} else if (strcmp(cmdName, “DESCRIBE”) == 0) {
printf(“incomingRequestHandler1 ~~~~~~~~~~~~~~\n”);
handleCmd_DESCRIBE(cseq, urlSuffix, (char const*)fRequestBuffer);
} else if (strcmp(cmdName, “SETUP”) == 0) {
handleCmd_SETUP(cseq, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
} else if (strcmp(cmdName, “TEARDOWN”) == 0
|| strcmp(cmdName, “PLAY”) == 0
|| strcmp(cmdName, “PAUSE”) == 0
|| strcmp(cmdName, “GET_PARAMETER”) == 0) {
handleCmd_withinSession(cmdName, urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer);
} else {
handleCmd_notSupported(cseq);
}
}

#ifdef DEBUG
fprintf(stderr, “sending response: %s”, fResponseBuffer); #endif send(fClientSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0); if (strcmp(cmdName, “SETUP”) == 0 && fStreamAfterSETUP) { // The client has asked for streaming to commence now, rather than after a // subsequent “PLAY” command. So, simulate the effect of a “PLAY” command:
handleCmd_withinSession(“PLAY”, urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer);
}

    resetRequestBuffer(); // to prepare for any subsequent request
    if (!fSessionIsActive) delete this;

}

复制代码

  此函数,我们可以看到rtsp的协议的各个命令的接收处理和应答。

2> ServerMediaSession* sms = ServerMediaSession::createNew(… …)

   创建ServerMediaSession类的实例,初始化fStreamName为”h264_ch1”,fInfoSDPString为”h264_ch1”,fDescriptionSDPString为”RTSP/RTP stream from NETRA”,fMiscSDPLines为null,fCreationTime获取的时间,fIsSSM为false。

3> sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(… …);

  WISH264VideoServerMediaSubsession::createNew():这个函数的主要目的是创建OnDemandServerMediaSubsession类的实例,这个类在前面已经分析,是单播时候必须创建的,初始化fWISInput为*H264InputDevice[video_type]。

  sms->addSubsession() 是将WISH264VideoServerMediaSubsession类的实例加入到fSubsessionsTail链表首节点中。

4> sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(… …);

  WISPCMAudioServerMediaSubsession::createNew():这个函数的主要目的是创建OnDemandServerMediaSubsession类的实例,这个类在前面已经分析,是单播时候必须创建的,初始化fWISInput为*H264InputDevice[video_type]。

  sms->addSubsession() 是将WISPCMAudioServerMediaSubsession类的实例加入到fSubsessionsTail->fNext中。

5> rtspServer->addServerMediaSession(sms)

  将sms加入到rtspServer的fServerMediaSessions哈希表中。

6> env->taskScheduler().doEventLoop(&watchVariable);

这个doEventLoop在前面已经分析过,主要处理socket任务和延迟任务。   

二、组播的流媒体服务器:

复制代码

    // Create the RTSP server:
    RTSPServer\* rtspServer = NULL; // Normal case: Streaming from a built-in RTSP server:
    rtspServer = RTSPServer::createNew(\*env, rtspServerPortNum, NULL); if (rtspServer == NULL) { \*env << "Failed to create RTSP server: " << env->getResultMsg() << "\\n";
            exit(1);
    } \*env << "...done initializing \\n"; if( streamingMode == STREAMING\_UNICAST )
    {

        … … } else { if (streamingMode == STREAMING_MULTICAST_SSM)
{ if (multicastAddress == 0)
multicastAddress = chooseRandomIPv4SSMAddress(*env);
} else if (multicastAddress != 0) {
streamingMode = STREAMING_MULTICAST_ASM;
} struct in_addr dest;
     dest.s_addr = multicastAddress; const unsigned char ttl = 255; // For RTCP:
const unsigned maxCNAMElen = 100;
unsigned char CNAME[maxCNAMElen + 1];
gethostname((char *) CNAME, maxCNAMElen);
CNAME[maxCNAMElen] = ‘\0’; // just in case
ServerMediaSession* sms;
sms = ServerMediaSession::createNew(*env, H264StreamName[video_type], H264StreamName[video_type], streamDescription,streamingMode == STREAMING_MULTICAST_SSM); /* VIDEO Channel initial */
if(1)
{ // Create ‘groupsocks’ for RTP and RTCP:
const Port rtpPortVideo(videoRTPPortNum); const Port rtcpPortVideo(videoRTPPortNum+1);

                rtpGroupsockVideo \= new Groupsock(\*env, dest, rtpPortVideo, ttl);
                rtcpGroupsockVideo \= new Groupsock(\*env, dest, rtcpPortVideo, ttl); if (streamingMode == STREAMING\_MULTICAST\_SSM) {
                        rtpGroupsockVideo\->multicastSendOnly();
                        rtcpGroupsockVideo\->multicastSendOnly();
                }
                
                setVideoRTPSinkBufferSize();
                sinkVideo \= H264VideoRTPSink::createNew(\*env, rtpGroupsockVideo,96, 0x42, "h264"); // Create (and start) a 'RTCP instance' for this RTP sink:
                unsigned totalSessionBandwidthVideo = (Mpeg4VideoBitrate+500)/1000; // in kbps; for RTCP b/w share
                rtcpVideo = RTCPInstance::createNew(\*env, rtcpGroupsockVideo,
                                                     totalSessionBandwidthVideo, CNAME,
                                                     sinkVideo, NULL /\* we're a server \*/ ,
                                                     streamingMode \== STREAMING\_MULTICAST\_SSM); // Note: This starts RTCP running automatically
                sms->addSubsession(PassiveServerMediaSubsession::createNew(\*sinkVideo, rtcpVideo));

                sourceVideo \= H264VideoStreamFramer::createNew(\*env, H264InputDevice\[video\_type\]->videoSource()); // Start streaming:
                sinkVideo->startPlaying(\*sourceVideo, NULL, NULL);
            } /\* AUDIO Channel initial \*/
            if(1)
            { // there's a separate RTP stream for audio // Create 'groupsocks' for RTP and RTCP:
                    const Port rtpPortAudio(audioRTPPortNum); const Port rtcpPortAudio(audioRTPPortNum+1);

                    rtpGroupsockAudio \= new Groupsock(\*env, dest, rtpPortAudio, ttl);
                    rtcpGroupsockAudio \= new Groupsock(\*env, dest, rtcpPortAudio, ttl); if (streamingMode == STREAMING\_MULTICAST\_SSM) 
                    {
                            rtpGroupsockAudio\->multicastSendOnly();
                            rtcpGroupsockAudio\->multicastSendOnly();
                    } if( audioSamplingFrequency == 16000 )
                            sinkAudio \= SimpleRTPSink::createNew(\*env, rtpGroupsockAudio, 96, audioSamplingFrequency, "audio", "PCMU", 1); else sinkAudio \= SimpleRTPSink::createNew(\*env, rtpGroupsockAudio, 0, audioSamplingFrequency, "audio", "PCMU", 1); // Create (and start) a 'RTCP instance' for this RTP sink:
                    unsigned totalSessionBandwidthAudio = (audioOutputBitrate+500)/1000; // in kbps; for RTCP b/w share
                    rtcpAudio = RTCPInstance::createNew(\*env, rtcpGroupsockAudio,
                                                          totalSessionBandwidthAudio, CNAME,
                                                          sinkAudio, NULL /\* we're a server \*/,
                                                          streamingMode \== STREAMING\_MULTICAST\_SSM); // Note: This starts RTCP running automatically
                sms->addSubsession(PassiveServerMediaSubsession::createNew(\*sinkAudio, rtcpAudio));

                   sourceAudio \= H264InputDevice\[video\_type\]->audioSource(); // Start streaming:
                    sinkAudio->startPlaying(\*sourceAudio, NULL, NULL);
            }

            rtspServer\->addServerMediaSession(sms);
            
            { struct in\_addr dest; dest.s\_addr = multicastAddress; char \*url = rtspServer->rtspURL(sms); //char \*url2 = inet\_ntoa(dest);
                    \*env << "Mulicast Play this stream using the URL:\\n\\t" << url << "\\n"; //\*env << "2 Mulicast addr:\\n\\t" << url2 << "\\n";

delete[] url;
}
} // Begin the LIVE555 event loop:
env->taskScheduler().doEventLoop(&watchVariable); // does not return

复制代码

1> rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);

同前面单播的分析一样。

2> sms = ServerMediaSession::createNew(… …)

  同前面单播的分析一样。

3> 视频

1. 创建视频rtp、rtcp的Groupsock类的实例,实现rtp和rtcp的udp通信socket。这里应该了解下ASM和SSM。

  2. 创建RTPSink类的实例,实现视频数据的RTP打包传输。

  3. 创建RTCPInstance类的实例,实现RTCP打包传输。

  4. 创建PassiveServerMediaSubsession类的实例,并加入到fSubsessionsTail链表中的首节点。

  5. 创建FramedSource类的实例,实现一帧视频数据的获取。

  6. 开始发送RTP和RTCP数据到组播地址。

4> 音频

1. 创建音频rtp、rtcp的Groupsock类的实例,实现rtp和rtcp的udp通信socket。这里应该了解下ASM和SSM。

  2. 创建RTPSink类的实例,实现音频数据的RTP打包传输。

  3. 创建RTCPInstance类的实例,实现RTCP打包传输。

  4. 创建PassiveServerMediaSubsession类的实例,并加入到fSubsessionsTail链表中的下一个节点。

  5. 创建FramedSource类的实例,实现一帧音频数据的获取。

  6. 开始发送RTP和RTCP数据到组播地址。

5> rtspServer->addServerMediaSession(sms)

同前面单播的分析一样。

6> env->taskScheduler().doEventLoop(&watchVariable)

同前面单播的分析一样。

三、单播和组播的区别

1> 创建socket的时候,组播一开始就创建了,而单播的则是根据收到的“SETUP”命令创建相应的socket。

2> startPlaying的时候,组播一开始就发送数据到组播地址,而单播则是根据收到的“PLAY”命令开始startPlaying。

四、startPlaying分析

首先分析组播:

sinkVideo->startPlaying()实现不在H264VideoRTPSink类中,也不在RTPSink类中,而是在MediaSink类中实现:

复制代码

// Attaches `source` to this sink and starts consuming from it.
//
// Fails (returns False, with a result message set on the environment) if the
// sink already has a source or if sourceIsCompatibleWithUs() rejects `source`.
// Otherwise records the source and the completion callback with its client
// data, and returns whatever continuePlaying() returns.
Boolean MediaSink::startPlaying(MediaSource& source,
                                afterPlayingFunc* afterFunc, void* afterClientData)
{
  // Make sure we're not already being played:
  if (fSource != NULL) {
    envir().setResultMsg("This sink is already being played");
    return False;
  }

  // Make sure our source is compatible:
  if (!sourceIsCompatibleWithUs(source)) {
    envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
    return False;
  }
  fSource = (FramedSource*)&source;

  fAfterFunc = afterFunc;
  fAfterClientData = afterClientData;
  return continuePlaying();
}

复制代码

  这里发现调用了continuePlaying()函数,那这个函数在哪里实现的呢?因为sinkVideo是通过 H264VideoRTPSink::createNew()实现,返回的H264VideoRTPSink类的实例,因此我们可以判定这个continuePlaying()在H264VideoRTPSink类实现。

复制代码

// Makes sure the H264FUAFragmenter exists before delegating to the parent.
// On first use the fragmenter is created around the current fSource and then
// takes its place as fSource.
Boolean H264VideoRTPSink::continuePlaying()
{
  bool const fragmenterMissing = (fOurFragmenter == NULL);
  if (fragmenterMissing) {
    // The last argument is the maximum packet size less the 12-byte RTP header.
    fOurFragmenter = new H264FUAFragmenter(envir(),
                                           fSource,
                                           OutPacketBuffer::maxSize,
                                           ourMaxPacketSize() - 12/*RTP hdr size*/);
    fSource = fOurFragmenter;
  }

  // Hand over to the parent class's implementation.
  return MultiFramedRTPSink::continuePlaying();
}

复制代码

  看到这里我们发现调用的是MultiFramedRTPSink类的成员函数continuePlaying,看下这个函数的实现:

复制代码

// Starts output by building and sending the first packet (flagged as such);
// that call also arranges for any later sends. Always reports True.
Boolean MultiFramedRTPSink::continuePlaying()
{
  Boolean const isFirstPacket = True;
  buildAndSendPacket(isFirstPacket);

  return True;
}

复制代码

  这里我们发现了buildAndSendPacket(),这个函数实现:

复制代码

// Lays out the header of a new outgoing RTP packet, leaving holes for the
// fields that depend on the payload, then starts packing frames into it.
void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket)
{
  fIsFirstPacket = isFirstPacket;

  // First header word: RTP version 2 (marker 'M' bit clear for now; it can be
  // set later), the payload type, and the sequence number.
  unsigned const firstHeaderWord = 0x80000000 | (fRTPPayloadType << 16) | fSeqNo;
  fOutBuf->enqueueWord(firstHeaderWord);

  // Remember where the timestamp belongs and skip over it; it cannot be
  // filled in until payload frames are being packed.
  fTimestampPosition = fOutBuf->curPacketSize();
  fOutBuf->skipBytes(4);

  fOutBuf->enqueueWord(SSRC());

  // Reserve room for a payload-format-specific header after the RTP header.
  fSpecialHeaderPosition = fOutBuf->curPacketSize();
  fSpecialHeaderSize = specialHeaderSize();
  fOutBuf->skipBytes(fSpecialHeaderSize);

  // Reset the per-packet bookkeeping.
  fTotalFrameSpecificHeaderSizes = 0;
  fNoFramesLeft = False;
  fNumFramesUsedSoFar = 0; // number of frames packed into this packet so far

  // Header is laid out; pack as many complete frames as will fit.
  packFrame();
}

复制代码

  继续看packFrame():

复制代码

// Obtains the next frame to place in the packet under construction.
//
// If data left over from the previous packet is waiting in the output buffer
// it is consumed immediately via afterGettingFrame1(); otherwise a new frame
// is requested from fSource, which reports back through afterGettingFrame()
// (or ourHandleClosure() when the source closes).
void MultiFramedRTPSink::packFrame()
{
  // First, see if we have an overflow frame that was too big for the last pkt
  if (fOutBuf->haveOverflowData()) {
    // Overflow data is the part of a frame that did not fit into the previous
    // packet.  Use this frame before reading a new one from the source.
    unsigned frameSize = fOutBuf->overflowDataSize();
    struct timeval presentationTime = fOutBuf->overflowPresentationTime();
    unsigned durationInMicroseconds = fOutBuf->overflowDurationInMicroseconds();
    fOutBuf->useOverflowData();

    afterGettingFrame1(frameSize, 0, presentationTime, durationInMicroseconds);
  } else {
    // Normal case: we need to read a new frame from the source
    if (fSource == NULL) return;

    // Reserve room for this frame's frame-specific header:
    fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
    fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
    fOutBuf->skipBytes(fCurFrameSpecificHeaderSize);
    fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;

    // Ask the source for the next frame.  The callbacks are passed along
    // because the source may deliver the data later, from the task scheduler.
    fSource->getNextFrame(fOutBuf->curPtr(),               // where the new data is to be stored
                          fOutBuf->totalBytesAvailable(),  // free space left in the buffer
                          afterGettingFrame, this,         // called once a frame has been delivered
                          ourHandleClosure, this);         // called when the source ends (e.g. end of file)
  }
}

复制代码

  fSource定义在MediaSink类中,在该类的startPlaying()函数中被赋值为传入的参数sourceVideo。这里调用的getNextFrame()函数实现在FramedSource类中,它内部会调用虚函数doGetNextFrame(),由具体的子类实现:

复制代码

// Requests one frame from this source.
//
// Records the destination buffer, its capacity, and the completion/closure
// callbacks with their client data, marks the source as awaiting data, and
// calls doGetNextFrame() to perform the read.  Only one read may be
// outstanding: a second request while one is pending prints a diagnostic and
// terminates the process.
void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,
                                afterGettingFunc* afterGettingFunc, void* afterGettingClientData,
                                onCloseFunc* onCloseFunc, void* onCloseClientData)
{
  // Make sure we're not already being read:
  if (fIsCurrentlyAwaitingData) {
    envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n";
    exit(1);
  }

  fTo = to;
  fMaxSize = maxSize;
  fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()
  fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()
  fAfterGettingFunc = afterGettingFunc;
  fAfterGettingClientData = afterGettingClientData;
  fOnCloseFunc = onCloseFunc;
  fOnCloseClientData = onCloseClientData;
  fIsCurrentlyAwaitingData = True;

  doGetNextFrame();
}

复制代码

  sourceVideo通过实现H264VideoStreamFramer::createNew()实例化,发现doGetNextFrame()函数实现在H264VideoStreamFramer类中:

复制代码

// Passes the read request straight through to the upstream input source,
// using the destination buffer and size recorded for this framer.  (The
// parser-based path is disabled.)
void H264VideoStreamFramer::doGetNextFrame()
{
  //fParser->registerReadInterest(fTo, fMaxSize);
  //continueReadProcessing();
  fInputSource->getNextFrame(fTo,
                             fMaxSize,
                             afterGettingFrame,
                             this,
                             FramedSource::handleClosure,
                             this);
}

复制代码

  这fInputSource在H264VideoStreamFramer的基类StreamParser中被初始化为传入的参数H264InputDevice[video_type]->videoSource(),VideoOpenFileSource类继承OpenFileSource类,因此这个doGetNextFrame再一次调用FramedSource类中的getNextFrame()函数,这次getNextFrame函数中调用的doGetNextFrame()函数则是在OpenFileSource类中实现的:

复制代码

// Tries to read one frame from the underlying file/device.
//
//  - read error (ret < 0): the source is closed via handleClosure();
//  - nothing available (ret == 0): retry later, doubling the delay each time
//    up to uSecsToDelayMax;
//  - data read (ret > 0): schedule afterGetting() to deliver the frame.
//
// The back-off is clamped after doubling so the delay never exceeds
// uSecsToDelayMax (previously it could overshoot for one round).
void OpenFileSource::incomingDataHandler1()
{
  int ret;

  if (!isCurrentlyAwaitingData()) return; // we're not ready for the data yet

  ret = readFromFile();
  if (ret < 0) {
    handleClosure(this);
    fprintf(stderr, "In Grab Image, the source stops being readable!!!!\n");
  } else if (ret == 0) {
    if (uSecsToDelay >= uSecsToDelayMax) {
      uSecsToDelay = uSecsToDelayMax;
    } else {
      uSecsToDelay *= 2;
      if (uSecsToDelay > uSecsToDelayMax) uSecsToDelay = uSecsToDelayMax;
    }
    nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)incomingDataHandler, this);
  } else {
    nextTask() = envir().taskScheduler().scheduleDelayedTask(0, (TaskFunc*)afterGetting, this);
  }
}

复制代码

  获取一帧数据后,执行延迟队列中的afterGetting()函数,此函数实现在父类FramedSource中:

复制代码

// Completion hook run once a frame has been placed in the reader's buffer.
// Clears the "awaiting data" flag and then, if a completion callback was
// registered, invokes it with the frame size, truncated-byte count,
// presentation time and duration recorded on `source`.
void FramedSource::afterGetting(FramedSource* source)
{
  source->fIsCurrentlyAwaitingData = False;
  // indicates that we can be read again
  // Note that this needs to be done here, in case the "fAfterFunc"
  // called below tries to read another frame (which it usually will)

  if (source->fAfterGettingFunc != NULL) {
    (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
                                   source->fFrameSize,
                                   source->fNumTruncatedBytes,
                                   source->fPresentationTime,
                                   source->fDurationInMicroseconds);
  }
}

复制代码

  fAfterGettingFunc函数指针在getNextFrame()函数被赋值,在MultiFramedRTPSink::packFrame() 函数中,被赋值MultiFramedRTPSink::afterGettingFrame():

复制代码

void MultiFramedRTPSink::afterGettingFrame(void* clientData, unsigned numBytesRead,
unsigned numTruncatedBytes, struct timeval presentationTime,
unsigned durationInMicroseconds)
{
MultiFramedRTPSink* sink = (MultiFramedRTPSink*)clientData;
sink->afterGettingFrame1(numBytesRead, numTruncatedBytes,
presentationTime, durationInMicroseconds);
}

复制代码

  继续看afterGettingFrame1实现:

复制代码

void MultiFramedRTPSink::afterGettingFrame1(
unsigned frameSize,
unsigned numTruncatedBytes, struct timeval presentationTime,
unsigned durationInMicroseconds)
{ if (fIsFirstPacket) { // Record the fact that we’re starting to play now:
gettimeofday(&fNextSendTime, NULL);
} //如果给予一帧的缓冲不够大,就会发生截断一帧数据的现象。但也只能提示一下用户
if (numTruncatedBytes > 0) {

    unsigned const bufferSize = fOutBuf->totalBytesAvailable();
    envir() << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
            << bufferSize << "). "
            << numTruncatedBytes << " bytes of trailing data was dropped!  Correct this by increasing \\"OutPacketBuffer::maxSize\\" to at least "
            << OutPacketBuffer::maxSize + numTruncatedBytes << ", \*before\* creating this 'RTPSink'.  (Current value is "
            << OutPacketBuffer::maxSize << ".)\\n";
}
unsigned curFragmentationOffset \= fCurFragmentationOffset;
unsigned numFrameBytesToUse \= frameSize;
unsigned overflowBytes \= 0; //如果包只已经打入帧数据了,并且不能再向这个包中加数据了,则把新获得的帧数据保存下来。 // If we have already packed one or more frames into this packet, // check whether this new frame is eligible to be packed after them. // (This is independent of whether the packet has enough room for this // new frame; that check comes later.)
if (fNumFramesUsedSoFar > 0) { //如果包中已有了一个帧,并且不允许再打入新的帧了,则只记录下新的帧。
    if ((fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment()) || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize))
    { // Save away this frame for next time:
        numFrameBytesToUse = 0;
        fOutBuf\->setOverflowData(fOutBuf->curPacketSize(), frameSize,
                presentationTime, durationInMicroseconds);
    }
} //表示当前打入的是否是上一个帧的最后一块数据。
fPreviousFrameEndedFragmentation = False; //下面是计算获取的帧中有多少数据可以打到当前包中,剩下的数据就作为overflow数据保存下来。
if (numFrameBytesToUse > 0) { // Check whether this frame overflows the packet
    if (fOutBuf->wouldOverflow(frameSize)) { // Don't use this frame now; instead, save it as overflow data, and // send it in the next packet instead.  However, if the frame is too // big to fit in a packet by itself, then we need to fragment it (and // use some of it in this packet, if the payload format permits this.)
        if (isTooBigForAPacket(frameSize) && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) { // We need to fragment this frame, and use some of it now:
            overflowBytes = computeOverflowForNewFrame(frameSize);
            numFrameBytesToUse \-= overflowBytes;
            fCurFragmentationOffset += numFrameBytesToUse;
        } else { // We don't use any of this frame now:
            overflowBytes = frameSize;
            numFrameBytesToUse \= 0;
        }
        fOutBuf\->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse,
                overflowBytes, presentationTime, durationInMicroseconds);
    } else if (fCurFragmentationOffset > 0) { // This is the last fragment of a frame that was fragmented over // more than one packet.  Do any special handling for this case:
        fCurFragmentationOffset = 0;
        fPreviousFrameEndedFragmentation \= True;
    }
} if (numFrameBytesToUse == 0 && frameSize > 0) { //如果包中有数据并且没有新数据了,则发送之。(这种情况好像很难发生啊!) // Send our packet now, because we have filled it up:

sendPacketIfNecessary();
} else { //需要向包中打入数据。 // Use this frame in our outgoing packet:
unsigned char* frameStart = fOutBuf->curPtr();
fOutBuf->increment(numFrameBytesToUse); // do this now, in case “doSpecialFrameHandling()” calls “setFramePadding()” to append padding bytes // Here’s where any payload format specific processing gets done:
doSpecialFrameHandling(curFragmentationOffset, frameStart,
numFrameBytesToUse, presentationTime, overflowBytes); ++fNumFramesUsedSoFar; // Update the time at which the next packet should be sent, based // on the duration of the frame that we just packed into it. // However, if this frame has overflow data remaining, then don’t // count its duration yet.
if (overflowBytes == 0) {
fNextSendTime.tv_usec += durationInMicroseconds;
fNextSendTime.tv_sec += fNextSendTime.tv_usec / 1000000;
fNextSendTime.tv_usec %= 1000000;
} //如果需要,就发出包,否则继续打入数据。 // Send our packet now if (i) it’s already at our preferred size, or // (ii) (heuristic) another frame of the same size as the one we just // read would overflow the packet, or // (iii) it contains the last fragment of a fragmented frame, and we // don’t allow anything else to follow this or // (iv) one frame per packet is allowed:
if (fOutBuf->isPreferredSize() || fOutBuf->wouldOverflow(numFrameBytesToUse) || (fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment()) || !frameCanAppearAfterPacketStart(
fOutBuf->curPtr() - frameSize, frameSize)) { // The packet is ready to be sent now
sendPacketIfNecessary();
} else { // There’s room for more frames; try getting another:
packFrame();
}
}
}

复制代码

看一下发送数据的函数:

复制代码

// Sends the packet under construction (if it holds at least one frame),
// updates the packet/octet counters and sequence number, resets the output
// buffer for the next packet, and then either signals source closure or
// schedules sendNext() for the time the next packet is due.
//
// The seconds-to-microseconds conversion is carried out in 64-bit arithmetic;
// the original multiplied two `int`s first, which can overflow for gaps longer
// than about 35 minutes before the result is widened.
void MultiFramedRTPSink::sendPacketIfNecessary()
{
  if (fNumFramesUsedSoFar > 0) {
    // Send the packet:
#ifdef TEST_LOSS
    if ((our_random()%10) != 0) // simulate 10% packet loss #####
#endif
    if (!fRTPInterface.sendPacket(fOutBuf->packet(), fOutBuf->curPacketSize())) {
      // if failure handler has been specified, call it
      if (fOnSendErrorFunc != NULL)
        (*fOnSendErrorFunc)(fOnSendErrorData);
    }
    ++fPacketCount;
    fTotalOctetCount += fOutBuf->curPacketSize();
    fOctetCount += fOutBuf->curPacketSize() - rtpHeaderSize - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;

    ++fSeqNo; // for next time
  }

  // If overflow data remains, reposition the buffer around it.
  if (fOutBuf->haveOverflowData()
      && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize() / 2) {
    // Efficiency hack: Reset the packet start pointer to just in front of
    // the overflow data (allowing for the RTP header and special headers),
    // so that we probably don't have to "memmove()" the overflow data
    // into place when building the next packet:
    unsigned newPacketStart = fOutBuf->curPacketSize()
      - (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
    fOutBuf->adjustPacketStart(newPacketStart);
  } else {
    // Normal case: Reset the packet start pointer back to the start:
    fOutBuf->resetPacketStart();
  }
  fOutBuf->resetOffset();
  fNumFramesUsedSoFar = 0;

  if (fNoFramesLeft) {
    // We're done:
    onSourceClosure(this);
  } else {
    // We have more frames left to send.  Figure out when the next frame
    // is due to start playing, then make sure that we wait this long before
    // sending the next packet.
    struct timeval timeNow;
    gettimeofday(&timeNow, NULL);
    int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec;
    int64_t uSecondsToGo = static_cast<int64_t>(secsDiff) * 1000000
      + (fNextSendTime.tv_usec - timeNow.tv_usec);
    if (uSecondsToGo < 0 || secsDiff < 0) {
      // sanity check: Make sure that the time-to-delay is non-negative:
      uSecondsToGo = 0;
    }

    // Delay this amount of time:
    nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo,
                                                             (TaskFunc*)sendNext, this);
  }
}

复制代码

  当一帧数据发送完,在doEventLoop()函数执行任务函数sendNext(),继续发送一包,进行下一个循环。音频数据的发送也是如此。

总结一下调用过程(参考牛搞大神):

单播数据发送:
  单播的时候,只有收到客户端的“PLAY”命令时,才开始发送数据,在RTSPClientSession类的handleCmd_PLAY()函数中调用startStream():

复制代码

// Excerpt of the PLAY handler; the code before and after the call shown here
// is elided in this listing.
// The visible statement calls startStream() on the subsession stored in
// fStreamStates[i], passing this session's id, that entry's stream token, and
// noteClientLiveness/this as the RTCP receiver-report handler and its client
// data. rtpSeqNum and rtpTimestamp are passed by name; presumably they are
// filled in by the callee — their declarations are not shown here.
void RTSPServer::RTSPClientSession
::handleCmd_PLAY(ServerMediaSubsession* subsession, char const* cseq, char const* fullRequestStr)
{

  ... ...

    fStreamStates[i].subsession->startStream(fOurSessionId,
fStreamStates[i].streamToken,
(TaskFunc*)noteClientLiveness, this,
rtpSeqNum,
rtpTimestamp);
   … …
}

复制代码

  startStream()函数定义在OnDemandServerMediaSubsession类中:

复制代码

void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId, void* streamToken,
TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
unsigned short& rtpSeqNum,
unsigned& rtpTimestamp)
{
  StreamState* streamState = (StreamState*)streamToken;
  Destinations* destinations = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId)); if (streamState != NULL) {
    streamState->startPlaying(destinations, rtcpRRHandler, rtcpRRHandlerClientData); if (streamState->rtpSink() != NULL) {
      rtpSeqNum = streamState->rtpSink()->currentSeqNo();
      rtpTimestamp = streamState->rtpSink()->presetNextTimestamp();
}
}
}

复制代码

  startPlaying函数实现在StreamState类中:

复制代码

// Starts (if not already started) delivery of this stream and registers the
// destination described by `dests`.
//
// Steps: start the RTP sink - or, failing that, the UDP sink - on the media
// source; create the RTCP instance on first use when an RTP sink exists; then
// attach the destination, either as an interleaved TCP stream socket or as a
// UDP address/port pair on the RTP and RTCP groupsocks.  Does nothing when
// `dests` is NULL.
void StreamState::startPlaying(Destinations* dests,
                               TaskFunc* rtcpRRHandler,
                               void* rtcpRRHandlerClientData)
{
  if (dests == NULL) return;

  // Kick off playing exactly once, preferring the RTP sink over the UDP sink.
  if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
    if (fRTPSink != NULL) {
      fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
      fAreCurrentlyPlaying = True;
    } else if (fUDPSink != NULL) {
      fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
      fAreCurrentlyPlaying = True;
    }
  }

  // Create (and thereby start) the RTCP instance for the RTP sink on first use.
  if (fRTCPInstance == NULL && fRTPSink != NULL) {
    fRTCPInstance = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
                                            fTotalBW, (unsigned char*)fMaster.fCNAME,
                                            fRTPSink, NULL /* we're a server */);
    // Note: This starts RTCP running automatically
  }

  if (dests->isTCP) {
    // Carry RTP and RTCP over the TCP socket instead of UDP.
    if (fRTPSink != NULL) {
      fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
    }
    if (fRTCPInstance != NULL) {
      fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
      fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
                                          rtcpRRHandler, rtcpRRHandlerClientData);
    }
    return;
  }

  // UDP: tell the RTP and RTCP groupsocks about this destination
  // (in case they don't already have it).
  if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
  if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
  if (fRTCPInstance != NULL) {
    fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
                                        rtcpRRHandler, rtcpRRHandlerClientData);
  }
}

复制代码

  这个函数就会去调用RTPSink类中的startPlaying()函数,但是RTPSink没有实现,直接调用父类MediaSink中的startPlaying函数。后面就跟组播一样的采集,组包,发送数据了。