EasyDarwin RTSP會話流程解析

在main.cpp的main函數里:

...
// This function starts, runs, and shuts down the server
if (::StartServer(&theXMLParser, &theMessagesSource, thePort, statsUpdateInterval,
                  theInitialState, dontFork, debugLevel, debugOptions) != qtssFatalErrorState)
{
    ::RunServer();          // blocks here for the server's whole lifetime
    CleanPid(false);        // remove the pid file on an orderly shutdown
    exit (EXIT_SUCCESS);
}
else
    exit(-1); //Cant start server don't try again
// Builds the global QTSServer singleton, initializes it from the XML prefs,
// and (if nothing fatal happened) starts the worker threads and listeners.
// Returns the resulting server state so main() can decide whether to run.
QTSS_ServerState StartServer(XMLPrefsParser* inPrefsSource, PrefsSource* inMessagesSource,
                             UInt16 inPortOverride, int statsUpdateInterval,
                             QTSS_ServerState inInitialState, Bool16 inDontFork,
                             UInt32 debugLevel, UInt32 debugOptions)
{
    QTSS_ServerState theServerState = qtssStartingUpState;
    sStatusUpdateInterval = statsUpdateInterval;
    ...
    //start the server
    QTSSDictionaryMap::Initialize();
    QTSServerInterface::Initialize();// this must be called before constructing the server object
    sServer = NEW QTSServer();
    sServer->SetDebugLevel(debugLevel);
    sServer->SetDebugOptions(debugOptions);

    // re-parse config file
    inPrefsSource->Parse();
    ...
    // Initialize() creates the listener sockets (CreateListeners) from the prefs
    sServer->Initialize(inPrefsSource, inMessagesSource, inPortOverride,createListeners);
    ...
    if (sServer->GetServerState() != qtssFatalErrorState)
    {
        IdleTask::Initialize();
        Socket::StartThread();      // starts the EventThread that drives epoll/select
        OSThread::Sleep(1000);
        sServer->InitModules(inInitialState);
        // start listening: arm read events on every listener socket
        sServer->StartTasks();
        sServer->SetupUDPSockets(); // udp sockets are set up after the rtcp task is instantiated
        theServerState = sServer->GetServerState();
    }
    return theServerState;
}

在StartServer裡首先創建QTSServer,然後調用了sServer->Initialize函數,在Initialize調用了CreateListeners函數依據xml的配置文件創建了監聽的套接字來監聽RTSP的端口,這兩個函數的實現如下:

// Creates the server's global maps/pools and, when createListeners is true,
// creates (but does not yet arm) the RTSP listener sockets from the prefs.
// Returns true; fatal setup errors are reported through fServerState instead.
Bool16 QTSServer::Initialize(XMLPrefsParser* inPrefsSource, PrefsSource* inMessagesSource,
                             UInt16 inPortOverride, Bool16 createListeners)
{
    ...
    // CREATE GLOBAL OBJECTS
    fSocketPool = new RTPSocketPool();
    fRTPMap = new OSRefTable(kRTPSessionMapSize);
    fHLSMap = new OSRefTable(kHLSSessionMapSize);
    fReflectorSessionMap = new OSRefTable(kReflectorSessionMapSize);
    ...
    // BEGIN LISTENING
    if (createListeners)
    {
        // startListeningNow == false: sockets are created here but only armed
        // later in StartTasks(), after the RTCP task exists.
        if ( !this->CreateListeners(false, fSrvrPrefs, inPortOverride) )
            QTSSModuleUtils::LogError(qtssWarningVerbosity, qtssMsgSomePortsFailed, 0);
    }
    ...
    fServerState = qtssStartingUpState;
    return true;
}
// Walks the port-tracker list built from the prefs and creates one
// RTSPListenerSocket per port that still needs a listener. When
// startListeningNow is set, each new socket's read event is armed immediately.
Bool16 QTSServer::CreateListeners(Bool16 startListeningNow, QTSServerPrefs* inPrefs, UInt16 inPortOverride)
{
    // Create any new listeners we need
    for (UInt32 count3 = 0; count3 < theTotalRTSPPortTrackers; count3++)
    {
        if (theRTSPPortTrackers[count3].fNeedsCreating)
        {
            newListenerArray[curPortIndex] = NEW RTSPListenerSocket();
            // Initialize() creates the underlying TCPSocket (socket/bind/listen)
            QTSS_Error err = newListenerArray[curPortIndex]->Initialize(theRTSPPortTrackers[count3].fIPAddr, theRTSPPortTrackers[count3].fPort);
            ...
            else
            {
                //
                // This listener was successfully created.
                if (startListeningNow)
                    newListenerArray[curPortIndex]->RequestEvent(EV_RE); // arm read event in epoll
                curPortIndex++;
            }
        }
    }
}

然後StartServer裡調用sServer->StartTasks()開始監聽。StartTasks是如何開始監聽的呢?

// Instantiates the RTCP and stats tasks, then arms a read event (EV_RE) on
// every listener created earlier by CreateListeners() — this is the point
// where the server actually begins accepting RTSP connections.
void QTSServer::StartTasks()
{
    fRTCPTask = new RTCPTask();
    fStatsTask = new RTPStatsUpdaterTask();

    //
    // Start listening
    for (UInt32 x = 0; x < fNumListeners; x++)
        fListeners[x]->RequestEvent(EV_RE); // registers each listener fd for read events in epoll
}

這裡的fListeners就是RTSPListenerSocket,其繼承關係如下圖,RequestEvent就是把創建的fFileDesc和EV_RE(讀事件)添加到epoll中.

EasyDarwin RTSP會話流程解析

123.jpg

在RequestEvent裡有這麼兩行代碼

// Bind this EventContext to its unique object ID string...
fRef.Set(fUniqueIDStr, this);
// ...and register it in the event thread's ref table, so that when epoll
// reports an event carrying this ID, the thread can resolve it back to
// this object and call its ProcessEvent().
fEventThread->fRefTable.Register(&fRef);

這個fEventThread是在StartServer裡被創建且開始運行的,在該線程裡epoll會監聽網絡事件,網絡事件來的時候,會調用fRefTable存儲的EventContext指針對象的ProcessEvent處理事件;fListeners向fEventThread的fRefTable註冊了fRef,也就是等網絡事件來的時候調用到fListeners的ProcessEvent處理。

該線程函數的代碼如下:

// Event-dispatch loop (abridged excerpt): waits on the platform event queue
// (epoll on Linux, select elsewhere, waitevent on macOS), resolves the event
// cookie to an EventContext via fRefTable, and calls its ProcessEvent().
void EventThread::Entry()
{
    struct eventreq theCurrentEvent;
    ::memset( &theCurrentEvent, '\0', sizeof(theCurrentEvent) );

    while (true)
    {
        int theErrno = EINTR;
        // retry the wait as long as it was merely interrupted by a signal
        while (theErrno == EINTR)
        {
#if MACOSXEVENTQUEUE
            int theReturnValue = waitevent(&theCurrentEvent, NULL);
#else
    #if defined(__linux__)
            int theReturnValue = epoll_waitevent(&theCurrentEvent, NULL);
    #else
            int theReturnValue = select_waitevent(&theCurrentEvent, NULL);
    #endif
#endif
            ...
        }
        AssertV(theErrno == 0, theErrno);

        //ok, there's data waiting on this socket. Send a wakeup.
        if (theCurrentEvent.er_data != NULL)
        {
            //The cookie in this event is an ObjectID. Resolve that objectID into
            //a pointer.
            StrPtrLen idStr((char*)&theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data));
            OSRef* ref = fRefTable.Resolve(&idStr);
            if (ref != NULL)
            {
                EventContext* theContext = (EventContext*)ref->GetObject();
#if DEBUG
                theContext->fModwatched = false;
#endif
                // dispatch to the registered context (e.g. a listener or a session socket)
                theContext->ProcessEvent(theCurrentEvent.er_eventbits);
                fRefTable.Release(ref);
            }
        }
        ...
}

那在ProcessEvent裡幹了些什麼事情呢?

// Accepts a pending RTSP connection, asks the subclass for a session task
// (GetSessionTask — creates the RTSPSession and its TCPSocket), configures the
// new socket, and arms it for read events so the session receives all further
// network activity for this connection.
void TCPListenerSocket::ProcessEvent(int /*eventBits*/)
{
    ...
    //fSocket data member of TCPSocket.
    int osSocket = accept(fFileDesc, (struct sockaddr*)&addr, &size);
    ...
    theTask = this->GetSessionTask(&theSocket);
    if (theTask == NULL)
    {
        //this should be a disconnect. do an ioctl call?
        close(osSocket);
        if (theSocket)
            theSocket->fState &= ~kConnected; // turn off connected state
    }
    else
    {
        Assert(osSocket != EventContext::kInvalidFileDesc);

        //set options on the socket
        //we are a server, always disable nagle algorithm
        int one = 1;
        int err = ::setsockopt(osSocket, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(int));
        AssertV(err == 0, OSThread::GetErrno());

        err = ::setsockopt(osSocket, SOL_SOCKET, SO_KEEPALIVE, (char*)&one, sizeof(int));
        AssertV(err == 0, OSThread::GetErrno());

        int sndBufSize = 96L * 1024L;
        err = ::setsockopt(osSocket, SOL_SOCKET, SO_SNDBUF, (char*)&sndBufSize, sizeof(int));
        AssertV(err == 0, OSThread::GetErrno());

        //setup the socket. When there is data on the socket,
        //theTask will get an kReadEvent event
        theSocket->Set(osSocket, &addr);          // attach the accepted fd to the session's TCPSocket
        theSocket->InitNonBlocking(osSocket);     // server sockets are always non-blocking
        theSocket->SetTask(theTask);              // events on this socket now wake the RTSPSession task
        theSocket->RequestEvent(EV_RE);           // register the fd's read event with epoll
        theTask->SetThreadPicker(Task::GetBlockingTaskThreadPicker()); //The RTSP Task processing threads
    }
    ...
}

從TCPListenerSocket::ProcessEvent可知,該函數先accept RTSP的連接請求,然後調用GetSessionTask創建RTSPSession;theSocket attach上了osSocket,接著把osSocket設置為非阻塞,並把處理osSocket網絡事件的Task設置為RTSPSession,最後把osSocket的讀事件加入到epoll中。至此我們可以知道,該連接的所有網絡事件都由RTSPSession處理。theSocket是一個TCPSocket指針對象,由RTSPSession創建,而不再是一個RTSPListenerSocket了。

RTSPSession是如何獲取到網絡事件的?從TCPListenerSocket::ProcessEvent中知道,theSocket對象調用了RequestEvent,請求了讀事件,也就是在epoll中等待網絡讀事件,當讀到RTSP會話的對端發送了RTSP的信息時(在EventThread線程函數Entry裡,如下所示),會調用到EventContext的ProcessEvent函數。

// Full version of the event-dispatch loop: wait for an event on the platform
// queue, resolve the event's cookie to an EventContext through fRefTable,
// dispatch ProcessEvent() to it, then optionally yield and record debug stats.
void EventThread::Entry()
{
    struct eventreq theCurrentEvent;
    ::memset( &theCurrentEvent, '\0', sizeof(theCurrentEvent) );

    while (true)
    {
        int theErrno = EINTR;
        // retry the wait as long as it was merely interrupted by a signal
        while (theErrno == EINTR)
        {
#if MACOSXEVENTQUEUE
            int theReturnValue = waitevent(&theCurrentEvent, NULL);
#else
    #if defined(__linux__)
            int theReturnValue = epoll_waitevent(&theCurrentEvent, NULL);
    #else
            int theReturnValue = select_waitevent(&theCurrentEvent, NULL);
    #endif
#endif
            //Sort of a hack. In the POSIX version of the server, waitevent can return
            //an actual POSIX errorcode.
            if (theReturnValue >= 0)
                theErrno = theReturnValue;
            else
                theErrno = OSThread::GetErrno();
        }
        AssertV(theErrno == 0, theErrno);

        //ok, there's data waiting on this socket. Send a wakeup.
        if (theCurrentEvent.er_data != NULL)
        {
            //The cookie in this event is an ObjectID. Resolve that objectID into
            //a pointer.
            StrPtrLen idStr((char*)&theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data));
            OSRef* ref = fRefTable.Resolve(&idStr);
            if (ref != NULL)
            {
                EventContext* theContext = (EventContext*)ref->GetObject();
#if DEBUG
                theContext->fModwatched = false;
#endif
                // dispatch to the registered context (listener or session socket)
                theContext->ProcessEvent(theCurrentEvent.er_eventbits);
                fRefTable.Release(ref);
            }
        }
#if EVENT_CONTEXT_DEBUG
        SInt64 yieldStart = OS::Milliseconds();
#endif

#if 0//defined(__linux__)
#else
        this->ThreadYield();
#endif

#if EVENT_CONTEXT_DEBUG
        SInt64 yieldDur = OS::Milliseconds() - yieldStart;
        static SInt64 numZeroYields;

        if ( yieldDur > 1 )
        {
            qtss_printf( "EventThread time in OSTHread::Yield %i, numZeroYields %i\n", (SInt32)yieldDur, (SInt32)numZeroYields );
            numZeroYields = 0;
        }
        else
            numZeroYields++;
#endif
    }
}

ProcessEvent會給RTSPSession這個Task發送一個kReadEvent的信號,此處的fTask就是RTSPSession。於是RTSPSession這個task會被加入到線程隊列中,等待RTSPSession的run方法被調用去處理網絡的讀事件,也就是處理RTSP會話。

RTSPSession::run函數採用狀態機處理RTSP會話,其狀態機設計如下:

EasyDarwin RTSP會話流程解析

1234.jpg

• kReadingFirstRequest,RTSPSession剛被創建時,都是首先進入這個狀態,然後讀取客戶端發送過來的RTSP request。

• kHTTPFilteringRequest,然後檢查該請求是不是以RTSP-OVER-HTTP的方式進行RTSP交互。

• kHaveNonTunnelMessage,開始加鎖,保證在該請求還沒有處理完的時候,其他的任何RTSP請求都不會被處理。

• kFilteringRequest,會給每個訂閱了QTSS_RTSPFilter_Role角色的模塊調用QTSS_RTSPFilter_Role的回調函數。如果不是OPTIONS命令或者SET_PARAMETER命令,則會創建RTPSession。如果是RTP的數據包,則會調用QTSS_RTSPIncomingData_Role角色的回調,每個訂閱了該角色的模塊都可以獲取到RTP包的內容。

• kPostProcessingRequest,如果創建了RTPSession,則會給所有訂閱了QTSS_RTSPPostProcessor_Role角色的模塊調用QTSS_RTSPPostProcessor_Role回調函數。

• kCleaningUp,在該狀態機裡釋放fSessionMutex和fReadMutex鎖。

• kReadingRequest,除了RTSPSession剛被創建時,讀取request消息是在kReadingFirstRequest狀態裡,其餘所有的request消息都從kReadingRequest狀態開始處理。

• kRoutingRequest,調用QTSS_RTSPRoute_Role角色的回調。

• kPreprocessingRequest,調用QTSS_RTSPPreProcessor_Role角色的回調,如果是TEARDOWN的request,則被調用的模塊可能會調用RTPSession的Teardown方法,這樣就會給RTPSession這個TASK發送一個kKillEvent的事件,RTPSession收到該事件後,會調用QTSS_ClientSessionClosing_Role角色的回調,關閉該會話。

每天定期更新論文和視頻,每天晚上8.30和下午3.30會在騰訊課堂講解其他的乾貨:https://ke.qq.com/course/131973#tuin=b52b9a80


分享到:


相關文章: