diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a2fc63d2..215559326 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -162,6 +162,7 @@ option(ENABLE_PKTINFO "Enable using IP_PKTINFO to allow the listener extracting option(ENABLE_RELATIVE_LIBPATH "Should application contain relative library paths, like ../lib" OFF) option(ENABLE_GETNAMEINFO "In-logs sockaddr-to-string should do rev-dns" OFF) option(ENABLE_UNITTESTS "Enable unit tests" OFF) +option(DISABLE_UNITTESTS_DISCOVERY "Do not discover unit tests when enabled" OFF) option(ENABLE_ENCRYPTION "Enable encryption in SRT" ON) option(ENABLE_AEAD_API_PREVIEW "Enable AEAD API preview in SRT" Off) option(ENABLE_MAXREXMITBW "Enable SRTO_MAXREXMITBW (v1.6.0 API preview)" Off) @@ -1527,7 +1528,9 @@ if (ENABLE_UNITTESTS AND ENABLE_CXX11) #set_tests_properties(test-srt PROPERTIES RUN_SERIAL TRUE) else() set_tests_properties(${tests_srt} PROPERTIES RUN_SERIAL TRUE) - gtest_discover_tests(test-srt) + if (NOT DISABLE_UNITTESTS_DISCOVERY) + gtest_discover_tests(test-srt) + endif() endif() enable_testing() diff --git a/srtcore/api.h b/srtcore/api.h index b5d6be915..7b7782f03 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -134,7 +134,8 @@ class CUDTSocket return m_iBusy; } - + // XXX Controversial as to whether it should be guarded by this lock. + // It is used in many places without the lock, and it is also atomic. SRT_ATTR_GUARDED_BY(m_ControlLock) sync::atomic m_Status; //< current socket state diff --git a/srtcore/buffer_rcv.cpp b/srtcore/buffer_rcv.cpp index 363bd7e9c..927529831 100644 --- a/srtcore/buffer_rcv.cpp +++ b/srtcore/buffer_rcv.cpp @@ -75,22 +75,6 @@ namespace { #define IF_RCVBUF_DEBUG(instr) (void)0 - // Check if iFirstNonreadPos is in range [iStartPos, (iStartPos + iMaxPosOff) % iSize]. - // The right edge is included because we expect iFirstNonreadPos to be - // right after the last valid packet position if all packets are available. - bool isInRange(int iStartPos, int iMaxPosOff, size_t iSize, int iFirstNonreadPos) - { - if (iFirstNonreadPos == iStartPos) - return true; - - const int iLastPos = (iStartPos + iMaxPosOff) % iSize; - const bool isOverrun = iLastPos < iStartPos; - - if (isOverrun) - return iFirstNonreadPos > iStartPos || iFirstNonreadPos <= iLastPos; - - return iFirstNonreadPos > iStartPos && iFirstNonreadPos <= iLastPos; - } } @@ -119,13 +103,15 @@ CRcvBuffer::CRcvBuffer(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool b : m_entries(size) , m_szSize(size) // TODO: maybe just use m_entries.size() , m_pUnitQueue(unitqueue) - , m_iStartSeqNo(initSeqNo) + , m_iStartSeqNo(initSeqNo) // NOTE: SRT_SEQNO_NONE is allowed here. , m_iStartPos(0) + , m_iEndOff(0) + , m_iDropOff(0) , m_iFirstNonreadPos(0) , m_iMaxPosOff(0) , m_iNotch(0) - , m_numOutOfOrderPackets(0) - , m_iFirstReadableOutOfOrder(-1) + , m_numNonOrderPackets(0) + , m_iFirstNonOrderMsgPos(CPos_TRAP) , m_bPeerRexmitFlag(true) , m_bMessageAPI(bMessageAPI) , m_iBytesCount(0) @@ -142,85 +128,352 @@ CRcvBuffer::~CRcvBuffer() { if (!it->pUnit) continue; - + m_pUnitQueue->makeUnitFree(it->pUnit); it->pUnit = NULL; } } -int CRcvBuffer::insert(CUnit* unit) +void CRcvBuffer::debugShowState(const char* source SRT_ATR_UNUSED) +{ + HLOGC(brlog.Debug, log << "RCV-BUF-STATE(" << source + << ") start=" << m_iStartPos + << " end=+" << m_iEndOff + << " drop=+" << m_iDropOff + << " max-off=+" << m_iMaxPosOff + << " seq[start]=%" << m_iStartSeqNo.val()); +} + +CRcvBuffer::InsertInfo CRcvBuffer::insert(CUnit* unit) { SRT_ASSERT(unit != NULL); const int32_t seqno = unit->m_Packet.getSeqNo(); - const int offset = CSeqNo::seqoff(m_iStartSeqNo, seqno); + const COff offset = COff(CSeqNo(seqno) - m_iStartSeqNo); IF_RCVBUF_DEBUG(ScopedLog scoped_log); IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::insert: seqno " << seqno); IF_RCVBUF_DEBUG(scoped_log.ss << " msgno " << unit->m_Packet.getMsgSeq(m_bPeerRexmitFlag)); IF_RCVBUF_DEBUG(scoped_log.ss << " m_iStartSeqNo " << m_iStartSeqNo << " offset " << offset); - if (offset < 0) + if (offset < COff(0)) { IF_RCVBUF_DEBUG(scoped_log.ss << " returns -2"); - return -2; + return InsertInfo(InsertInfo::BELATED); } + IF_HEAVY_LOGGING(string debug_source = "insert %" + Sprint(seqno)); - if (offset >= (int)capacity()) + if (offset >= COff(capacity())) { IF_RCVBUF_DEBUG(scoped_log.ss << " returns -3"); - return -3; + + InsertInfo ireport (InsertInfo::DISCREPANCY); + getAvailInfo((ireport)); + + IF_HEAVY_LOGGING(debugShowState((debug_source + " overflow").c_str())); + + return ireport; } // TODO: Don't do assert here. Process this situation somehow. // If >= 2, then probably there is a long gap, and buffer needs to be reset. SRT_ASSERT((m_iStartPos + offset) / m_szSize < 2); - const int pos = (m_iStartPos + offset) % m_szSize; + const CPos newpktpos = incPos(m_iStartPos, offset); + const COff prev_max_off = m_iMaxPosOff; + bool extended_end = false; if (offset >= m_iMaxPosOff) - m_iMaxPosOff = offset + 1; + { + m_iMaxPosOff = offset + COff(1); + extended_end = true; + } // Packet already exists - SRT_ASSERT(pos >= 0 && pos < int(m_szSize)); - if (m_entries[pos].status != EntryState_Empty) + // (NOTE: the above extension of m_iMaxPosOff is + // possible even before checking that the packet + // exists because existence of a packet beyond + // the current max position is not possible). + SRT_ASSERT(newpktpos >= 0 && newpktpos < int(m_szSize)); + if (m_entries[newpktpos].status != EntryState_Empty) { IF_RCVBUF_DEBUG(scoped_log.ss << " returns -1"); - return -1; + IF_HEAVY_LOGGING(debugShowState((debug_source + " redundant").c_str())); + return InsertInfo(InsertInfo::REDUNDANT); } - SRT_ASSERT(m_entries[pos].pUnit == NULL); + SRT_ASSERT(m_entries[newpktpos].pUnit == NULL); m_pUnitQueue->makeUnitTaken(unit); - m_entries[pos].pUnit = unit; - m_entries[pos].status = EntryState_Avail; + m_entries[newpktpos].pUnit = unit; + m_entries[newpktpos].status = EntryState_Avail; countBytes(1, (int)unit->m_Packet.getLength()); + // Set to a value, if due to insertion there was added + // a packet that is earlier to be retrieved than the earliest + // currently available packet. + time_point earlier_time = updatePosInfo(unit, prev_max_off, offset, extended_end); + + InsertInfo ireport (InsertInfo::INSERTED); + ireport.first_time = earlier_time; + // If packet "in order" flag is zero, it can be read out of order. // With TSBPD enabled packets are always assumed in order (the flag is ignored). if (!m_tsbpd.isEnabled() && m_bMessageAPI && !unit->m_Packet.getMsgOrderFlag()) { - ++m_numOutOfOrderPackets; - onInsertNotInOrderPacket(pos); + ++m_numNonOrderPackets; + onInsertNonOrderPacket(newpktpos); } updateNonreadPos(); + + // This updates only the first_seq and avail_range fields. + getAvailInfo((ireport)); + IF_RCVBUF_DEBUG(scoped_log.ss << " returns 0 (OK)"); - return 0; + IF_HEAVY_LOGGING(debugShowState((debug_source + " ok").c_str())); + + return ireport; } +void CRcvBuffer::getAvailInfo(CRcvBuffer::InsertInfo& w_if) +{ + // This finds the first possible available packet, which is + // preferably at cell 0, but if not available, try also with + // given fallback position, if it's set + if (m_entries[m_iStartPos].status == EntryState_Avail) + { + const CPacket* pkt = &packetAt(m_iStartPos); + SRT_ASSERT(pkt); + w_if.avail_range = m_iEndOff; + w_if.first_seq = CSeqNo(pkt->getSeqNo()); + return; + } + + // If not the first position, probe the skipped positions: + // - for live mode, check the DROP position + // (for potential after-drop reading) + // - for message mode, check the non-order message position + // (for potential out-of-oder message delivery) + + const CPacket* pkt = NULL; + if (m_tsbpd.isEnabled()) + { + // With TSBPD you can rely on drop position, if set + // Drop position must point always to a valid packet. + // Drop position must start from +1; 0 means no drop. + if (m_iDropOff) + { + pkt = &packetAt(incPos(m_iStartPos, m_iDropOff)); + SRT_ASSERT(pkt); + } + } + else + { + // Message-mode: try non-order read position. + if (m_iFirstNonOrderMsgPos != CPos_TRAP) + { + pkt = &packetAt(m_iFirstNonOrderMsgPos); + SRT_ASSERT(pkt); + } + } + + if (!pkt) + { + // This is default, but set just in case + // The default seq is SRT_SEQNO_NONE. + w_if.avail_range = COff(0); + return; + } + + // TODO: we know that at least 1 packet is available, but only + // with m_iEndOff we know where the true range is. This could also + // be implemented for message mode, but still this would employ + // a separate begin-end range declared for a complete out-of-order + // message. + w_if.avail_range = COff(1); + w_if.first_seq = CSeqNo(pkt->getSeqNo()); +} + + +// This function is called exclusively after packet insertion. +// This will update also m_iEndOff and m_iDropOff fields (the latter +// regardless of the TSBPD mode). +CRcvBuffer::time_point CRcvBuffer::updatePosInfo(const CUnit* unit, const COff prev_max_off, + const COff offset, + const bool extended_end) +{ + time_point earlier_time; + + // Update flags + // Case [A]: insertion of the packet has extended the busy region. + if (extended_end) + { + // THIS means that the buffer WAS CONTIGUOUS BEFORE. + if (m_iEndOff == prev_max_off) + { + // THIS means that the new packet didn't CAUSE a gap + if (m_iMaxPosOff == prev_max_off + 1) + { + // This means that m_iEndOff now shifts by 1, + // and m_iDropOff is set to 0 as there's no gap. + m_iEndOff = m_iMaxPosOff; + m_iDropOff = 0; + } + else + { + // Otherwise we have a drop-after-gap candidate + // which is the currently inserted packet. + // Therefore m_iEndOff STAYS WHERE IT IS. + m_iDropOff = m_iMaxPosOff - 1; + } + } + } + // + // Since this place, every 'offset' is in the range + // between m_iEndOff (inclusive) and m_iMaxPosOff. + else if (offset == m_iEndOff) + { + // Case [D]: inserted a packet at the first gap following the + // contiguous region. This makes a potential to extend the + // contiguous region and we need to find its end. + + // If insertion happened at the very first packet, it is the + // new earliest packet now. In any other situation under this + // condition there's some contiguous packet range preceding + // this position. + if (m_iEndOff == 0) + { + earlier_time = getPktTsbPdTime(unit->m_Packet.getMsgTimeStamp()); + } + + updateGapInfo(); + } + else if (offset < m_iDropOff) + { + // Case [C]: the newly inserted packet precedes the + // previous earliest delivery position after drop, + // that is, there is now a "better" after-drop delivery + // candidate. + + // New position updated a valid packet on an earlier + // position than the drop position was before, although still + // following a gap. + // + // We know it because if the position has filled a gap following + // a valid packet, this preceding valid packet would be pointed + // by m_iDropOff, or it would point to some earlier packet in a + // contiguous series of valid packets following a gap, hence + // the above condition wouldn't be satisfied. + m_iDropOff = offset; + + // If there's an inserted packet BEFORE drop-pos (which makes it + // a new drop-pos), while the very first packet is absent (the + // below condition), it means we have a new earliest-available + // packet. Otherwise we would have only a newly updated drop + // position, but still following some earlier contiguous range + // of valid packets - so it's earlier than previous drop, but + // not earlier than the earliest packet. + if (m_iEndOff == 0) + { + earlier_time = getPktTsbPdTime(unit->m_Packet.getMsgTimeStamp()); + } + } + // OTHERWISE: case [B] in which nothing is to be updated. + + return earlier_time; +} + +// This function is called when the m_iEndOff has been set to a new +// position and the m_iDropOff should be calculated since that position again. +void CRcvBuffer::updateGapInfo() +{ + COff from = m_iEndOff; + SRT_ASSERT(m_entries[incPos(m_iStartPos, m_iMaxPosOff)].status == EntryState_Empty); + + CPos pos = incPos(m_iStartPos, from); + + if (m_entries[pos].status == EntryState_Avail) + { + CPos end_pos = incPos(m_iStartPos, m_iMaxPosOff); + + for (; pos != end_pos; pos = incPos(pos)) + { + if (m_entries[pos].status != EntryState_Avail) + break; + } + + m_iEndOff = offPos(m_iStartPos, pos); + } + + // XXX This should be this way, but there are still inconsistencies + // in the message code. + //USE: SRT_ASSERT(m_entries[incPos(m_iStartPos, m_iEndOff)].status == EntryState_Empty); + SRT_ASSERT(m_entries[incPos(m_iStartPos, m_iEndOff)].status != EntryState_Avail); + + // XXX Controversy: m_iDropOff is only used in case when SRTO_TLPKTDROP + // is set. This option is not handled in message mode, only in live mode. + // Dropping by packet makes sense only in case of packetwise reading, + // which isn't the case of neither stream nor message mode. + if (!m_tsbpd.isEnabled()) + { + m_iDropOff = 0; + return; + } + + // Do not touch m_iDropOff if it's still beside the contiguous + // region. DO NOT SEARCH for m_iDropOff if m_iEndOff is max + // because this means that the whole buffer is contiguous. + // That would simply find nothing and only uselessly burden the + // performance by searching for a not present empty cell. + + // Also check if the current drop position is a readable packet. + // If not, start over. + CPos drop_pos = incPos(m_iStartPos, m_iDropOff); + + if (m_iDropOff < m_iEndOff || m_entries[drop_pos].status != EntryState_Avail) + { + m_iDropOff = 0; + if (m_iEndOff < m_iMaxPosOff) + { + CPos start = incPos(m_iStartPos, m_iEndOff + 1), + end = incPos(m_iStartPos, m_iEndOff); + + for (CPos i = start; i != end; i = incPos(i)) + { + if (m_entries[i].status == EntryState_Avail) + { + m_iDropOff = offPos(m_iStartPos, i); + break; + } + } + + // Must be found somewhere, worst case at the position + // of m_iMaxPosOff-1. If no finding loop caught it somehow, + // it will remain at 0. The case when you have empty packets + // in the busy range is only with message mode after reading + // packets out-of-order, but this doesn't use tsbpd mode. + SRT_ASSERT(m_iDropOff != 0); + } + } +} + +/// Request to remove from the receiver buffer +/// all packets with earlier sequence than @a seqno. +/// (Meaning, the packet with given sequence shall +/// be the first packet in the buffer after the operation). std::pair CRcvBuffer::dropUpTo(int32_t seqno) { IF_RCVBUF_DEBUG(ScopedLog scoped_log); IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo); - int len = CSeqNo::seqoff(m_iStartSeqNo, seqno); + COff len = COff(CSeqNo(seqno) - m_iStartSeqNo); if (len <= 0) { IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop."); return std::make_pair(0, 0); } - m_iMaxPosOff -= len; - if (m_iMaxPosOff < 0) - m_iMaxPosOff = 0; + m_iMaxPosOff = decOff(m_iMaxPosOff, len); + m_iEndOff = decOff(m_iEndOff, len); + m_iDropOff = decOff(m_iDropOff, len); int iNumDropped = 0; // Number of dropped packets that were missing. int iNumDiscarded = 0; // The number of dropped packets that existed in the buffer. @@ -229,9 +482,9 @@ std::pair CRcvBuffer::dropUpTo(int32_t seqno) // Note! Dropping a EntryState_Read must not be counted as a drop because it was read. // Note! Dropping a EntryState_Drop must not be counted as a drop because it was already dropped and counted earlier. if (m_entries[m_iStartPos].status == EntryState_Avail) - ++iNumDiscarded; + ++iNumDiscarded; else if (m_entries[m_iStartPos].status == EntryState_Empty) - ++iNumDropped; + ++iNumDropped; dropUnitInPos(m_iStartPos); m_entries[m_iStartPos].status = EntryState_Empty; SRT_ASSERT(m_entries[m_iStartPos].pUnit == NULL && m_entries[m_iStartPos].status == EntryState_Empty); @@ -240,19 +493,23 @@ std::pair CRcvBuffer::dropUpTo(int32_t seqno) } // Update positions - m_iStartSeqNo = seqno; + m_iStartSeqNo = CSeqNo(seqno); // Move forward if there are "read/drop" entries. + // (This call MAY shift m_iStartSeqNo further.) releaseNextFillerEntries(); + updateGapInfo(); + // If the nonread position is now behind the starting position, set it to the starting position and update. // Preceding packets were likely missing, and the non read position can probably be moved further now. - if (!isInRange(m_iStartPos, m_iMaxPosOff, m_szSize, m_iFirstNonreadPos)) + if (!isInUsedRange(m_iFirstNonreadPos)) { m_iFirstNonreadPos = m_iStartPos; updateNonreadPos(); } if (!m_tsbpd.isEnabled() && m_bMessageAPI) - updateFirstReadableOutOfOrder(); + updateFirstReadableNonOrder(); + IF_HEAVY_LOGGING(debugShowState(("drop %" + Sprint(seqno)).c_str())); return std::make_pair(iNumDropped, iNumDiscarded); } @@ -261,7 +518,7 @@ int CRcvBuffer::dropAll() if (empty()) return 0; - const int end_seqno = CSeqNo::incseq(m_iStartSeqNo, m_iMaxPosOff); + const int32_t end_seqno = CSeqNo::incseq(m_iStartSeqNo.val(), m_iMaxPosOff); const std::pair numDropped = dropUpTo(end_seqno); return numDropped.first + numDropped.second; } @@ -274,24 +531,24 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, Dro << m_iStartSeqNo); // Drop by packet seqno range to also wipe those packets that do not exist in the buffer. - const int offset_a = CSeqNo::seqoff(m_iStartSeqNo, seqnolo); - const int offset_b = CSeqNo::seqoff(m_iStartSeqNo, seqnohi); + const int offset_a = CSeqNo(seqnolo) - m_iStartSeqNo; + const int offset_b = CSeqNo(seqnohi) - m_iStartSeqNo; if (offset_b < 0) { LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): nothing to drop. Requested [" << seqnolo << "; " - << seqnohi << "]. Buffer start " << m_iStartSeqNo << "."); + << seqnohi << "]. Buffer start " << m_iStartSeqNo.val() << "."); return 0; } const bool bKeepExisting = (actionOnExisting == KEEP_EXISTING); - int minDroppedOffset = -1; + COff minDroppedOffset (-1); int iDropCnt = 0; - const int start_off = max(0, offset_a); - const int start_pos = incPos(m_iStartPos, start_off); - const int end_off = min((int) m_szSize - 1, offset_b + 1); - const int end_pos = incPos(m_iStartPos, end_off); + const COff start_off = COff(max(0, offset_a)); + const CPos start_pos = incPos(m_iStartPos, start_off); + const COff end_off = COff(min((int) m_szSize - 1, offset_b + 1)); + const CPos end_pos = incPos(m_iStartPos, end_off); bool bDropByMsgNo = msgno > SRT_MSGNO_CONTROL; // Excluding both SRT_MSGNO_NONE (-1) and SRT_MSGNO_CONTROL (0). - for (int i = start_pos; i != end_pos; i = incPos(i)) + for (CPos i = start_pos; i != end_pos; i = incPos(i)) { // Check if the unit was already dropped earlier. if (m_entries[i].status == EntryState_Drop) @@ -306,7 +563,7 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, Dro if (bKeepExisting && bnd == PB_SOLO) { bDropByMsgNo = false; // Solo packet, don't search for the rest of the message. - LOGC(rbuflog.Debug, + HLOGC(rbuflog.Debug, log << "CRcvBuffer::dropMessage(): Skipped dropping an existing SOLO packet %" << packetAt(i).getSeqNo() << "."); continue; @@ -332,6 +589,14 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, Dro minDroppedOffset = offPos(m_iStartPos, i); } + if (end_off > m_iMaxPosOff) + { + HLOGC(rbuflog.Debug, log << "CRcvBuffer::dropMessage: requested to drop up to %" << seqnohi + << " with highest in the buffer %" << CSeqNo::incseq(m_iStartSeqNo.val(), end_off) + << " - updating the busy region"); + m_iMaxPosOff = end_off; + } + if (bDropByMsgNo) { // If msgno is specified, potentially not the whole message was dropped using seqno range. @@ -339,8 +604,8 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, Dro // The sender should have the last packet of the message it is requesting to be dropped. // Therefore we don't search forward, but need to check earlier packets in the RCV buffer. // Try to drop by the message number in case the message starts earlier than @a seqnolo. - const int stop_pos = decPos(m_iStartPos); - for (int i = start_pos; i != stop_pos; i = decPos(i)) + const CPos stop_pos = decPos(m_iStartPos); + for (CPos i = start_pos; i != stop_pos; i = decPos(i)) { // Can't drop if message number is not known. if (!m_entries[i].pUnit) // also dropped earlier. @@ -349,7 +614,7 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, Dro const PacketBoundary bnd = packetAt(i).getMsgBoundary(); const int32_t msgseq = packetAt(i).getMsgSeq(m_bPeerRexmitFlag); if (msgseq != msgno) - break; + break; if (bKeepExisting && bnd == PB_SOLO) { @@ -372,9 +637,26 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, Dro IF_RCVBUF_DEBUG(scoped_log.ss << " iDropCnt " << iDropCnt); } + if (iDropCnt) + { + // We don't need the drop position, if we allow to drop messages by number + // and with that value we risk that drop was pointing to a dropped packet. + // Theoretically to make it consistent we need to shift the value to the + // next found packet, but we don't need this information if we use the message + // mode (because drop-by-packet is not supported in this mode) and this + // will burden the performance for nothing. + m_iDropOff = 0; + } + // Check if units before m_iFirstNonreadPos are dropped. const bool needUpdateNonreadPos = (minDroppedOffset != -1 && minDroppedOffset <= getRcvDataSize()); releaseNextFillerEntries(); + + updateGapInfo(); + + IF_HEAVY_LOGGING(debugShowState( + ("dropmsg off %" + Sprint(seqnolo) + " #" + Sprint(msgno)).c_str())); + if (needUpdateNonreadPos) { m_iFirstNonreadPos = m_iStartPos; @@ -382,24 +664,46 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, Dro } if (!m_tsbpd.isEnabled() && m_bMessageAPI) { - if (!checkFirstReadableOutOfOrder()) - m_iFirstReadableOutOfOrder = -1; - updateFirstReadableOutOfOrder(); + if (!checkFirstReadableNonOrder()) + m_iFirstNonOrderMsgPos = CPos_TRAP; + updateFirstReadableNonOrder(); } + IF_HEAVY_LOGGING(debugShowState(("dropmsg off %" + Sprint(seqnolo)).c_str())); return iDropCnt; } -int CRcvBuffer::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) +bool CRcvBuffer::getContiguousEnd(int32_t& w_seq) const +{ + if (m_iEndOff == 0) + { + // Initial contiguous region empty (including empty buffer). + HLOGC(rbuflog.Debug, log << "CONTIG: empty, give up base=%" << m_iStartSeqNo.val()); + w_seq = m_iStartSeqNo.val(); + return m_iMaxPosOff > 0; + } + + w_seq = CSeqNo::incseq(m_iStartSeqNo.val(), m_iEndOff); + + HLOGC(rbuflog.Debug, log << "CONTIG: endD=" << m_iEndOff + << " maxD=" << m_iMaxPosOff + << " base=%" << m_iStartSeqNo.val() + << " end=%" << w_seq); + + return (m_iEndOff < m_iMaxPosOff); +} + +int CRcvBuffer::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl, pair* pw_seqrange) { const bool canReadInOrder = hasReadableInorderPkts(); - if (!canReadInOrder && m_iFirstReadableOutOfOrder < 0) + if (!canReadInOrder && m_iFirstNonOrderMsgPos == CPos_TRAP) { LOGC(rbuflog.Warn, log << "CRcvBuffer.readMessage(): nothing to read. Ignored isRcvDataReady() result?"); return 0; } - const int readPos = canReadInOrder ? m_iStartPos : m_iFirstReadableOutOfOrder; + const CPos readPos = canReadInOrder ? m_iStartPos : m_iFirstNonOrderMsgPos; + const bool isReadingFromStart = (readPos == m_iStartPos); // Indicates if the m_iStartPos can be changed IF_RCVBUF_DEBUG(ScopedLog scoped_log); IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::readMessage. m_iStartSeqNo " << m_iStartSeqNo << " m_iStartPos " << m_iStartPos << " readPos " << readPos); @@ -408,8 +712,21 @@ int CRcvBuffer::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) char* dst = data; int pkts_read = 0; int bytes_extracted = 0; // The total number of bytes extracted from the buffer. - const bool updateStartPos = (readPos == m_iStartPos); // Indicates if the m_iStartPos can be changed - for (int i = readPos;; i = incPos(i)) + + int32_t out_seqlo = SRT_SEQNO_NONE; + int32_t out_seqhi = SRT_SEQNO_NONE; + + // As we have a green light for reading, it is already known that + // we're going to either remove or extract packets from the buffer, + // so drop position won't count anymore. + // + // The END position should be updated, that is: + // - remain just updated by the shifted start position if it's still ahead + // - recalculated from 0 again otherwise + m_iDropOff = 0; + int nskipped = 0; + + for (CPos i = readPos;; i = incPos(i)) { SRT_ASSERT(m_entries[i].pUnit); if (!m_entries[i].pUnit) @@ -422,6 +739,11 @@ int CRcvBuffer::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) const size_t pktsize = packet.getLength(); const int32_t pktseqno = packet.getSeqNo(); + if (out_seqlo == SRT_SEQNO_NONE) + out_seqlo = pktseqno; + + out_seqhi = pktseqno; + // unitsize can be zero const size_t unitsize = std::min(remain, pktsize); memcpy(dst, packet.m_pcData, unitsize); @@ -434,8 +756,8 @@ int CRcvBuffer::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) if (m_tsbpd.isEnabled()) updateTsbPdTimeBase(packet.getMsgTimeStamp()); - if (m_numOutOfOrderPackets && !packet.getMsgOrderFlag()) - --m_numOutOfOrderPackets; + if (m_numNonOrderPackets && !packet.getMsgOrderFlag()) + --m_numNonOrderPackets; const bool pbLast = packet.getMsgBoundary() & PB_LAST; if (msgctrl && (packet.getMsgBoundary() & PB_FIRST)) @@ -450,12 +772,11 @@ int CRcvBuffer::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) msgctrl->pktseq = pktseqno; releaseUnitInPos(i); - if (updateStartPos) + if (isReadingFromStart) { m_iStartPos = incPos(i); - --m_iMaxPosOff; - SRT_ASSERT(m_iMaxPosOff >= 0); - m_iStartSeqNo = CSeqNo::incseq(pktseqno); + m_iStartSeqNo = CSeqNo(pktseqno) + 1; + ++nskipped; } else { @@ -465,26 +786,44 @@ int CRcvBuffer::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) if (pbLast) { - if (readPos == m_iFirstReadableOutOfOrder) - m_iFirstReadableOutOfOrder = -1; + if (readPos == m_iFirstNonOrderMsgPos) + { + m_iFirstNonOrderMsgPos = CPos_TRAP; + m_iDropOff = 0; // always set to 0 in this mode. + } break; } } + if (nskipped) + { + // This means that m_iStartPos HAS BEEN shifted by that many packets. + // Update offset variables + m_iMaxPosOff -= nskipped; + + // This is checked as the PB_LAST flag marked packet should still + // be extracted in the existing period. + SRT_ASSERT(m_iMaxPosOff >= 0); + + m_iEndOff = decOff(m_iEndOff, len); + } countBytes(-pkts_read, -bytes_extracted); releaseNextFillerEntries(); - if (!isInRange(m_iStartPos, m_iMaxPosOff, m_szSize, m_iFirstNonreadPos)) + // This will update the end position + updateGapInfo(); + + if (!isInUsedRange( m_iFirstNonreadPos)) { m_iFirstNonreadPos = m_iStartPos; //updateNonreadPos(); } if (!m_tsbpd.isEnabled()) - // We need updateFirstReadableOutOfOrder() here even if we are reading inorder, + // We need updateFirstReadableNonOrder() here even if we are reading inorder, // incase readable inorder packets are all read out. - updateFirstReadableOutOfOrder(); + updateFirstReadableNonOrder(); const int bytes_read = int(dst - data); if (bytes_read < bytes_extracted) @@ -494,6 +833,10 @@ int CRcvBuffer::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) IF_RCVBUF_DEBUG(scoped_log.ss << " pldi64 " << *reinterpret_cast(data)); + if (pw_seqrange) + *pw_seqrange = make_pair(out_seqlo, out_seqhi); + + IF_HEAVY_LOGGING(debugShowState("readmsg")); return bytes_read; } @@ -527,8 +870,8 @@ namespace { int CRcvBuffer::readBufferTo(int len, copy_to_dst_f funcCopyToDst, void* arg) { - int p = m_iStartPos; - const int end_pos = m_iFirstNonreadPos; + CPos p = m_iStartPos; + const CPos end_pos = m_iFirstNonreadPos; const bool bTsbPdEnabled = m_tsbpd.isEnabled(); const steady_clock::time_point now = (bTsbPdEnabled ? steady_clock::now() : steady_clock::time_point()); @@ -538,7 +881,7 @@ int CRcvBuffer::readBufferTo(int len, copy_to_dst_f funcCopyToDst, void* arg) { if (!m_entries[p].pUnit) { - p = incPos(p); + // REDUNDANT? p = incPos(p); // Return abandons the loop anyway. LOGC(rbuflog.Error, log << "readBufferTo: IPE: NULL unit found in file transmission"); return -1; } @@ -554,7 +897,7 @@ int CRcvBuffer::readBufferTo(int len, copy_to_dst_f funcCopyToDst, void* arg) << " PKT TS=" << FormatTime(tsPlay)); if ((tsPlay > now)) - break; /* too early for this unit, return whatever was copied */ + break; // too early for this unit, return whatever was copied } const int pktlen = (int)pkt.getLength(); @@ -573,7 +916,10 @@ int CRcvBuffer::readBufferTo(int len, copy_to_dst_f funcCopyToDst, void* arg) m_iStartPos = p; --m_iMaxPosOff; SRT_ASSERT(m_iMaxPosOff >= 0); - m_iStartSeqNo = CSeqNo::incseq(m_iStartSeqNo); + m_iEndOff = decOff(m_iEndOff, 1); + m_iDropOff = decOff(m_iDropOff, 1); + + ++m_iStartSeqNo; } else m_iNotch += rs; @@ -588,16 +934,18 @@ int CRcvBuffer::readBufferTo(int len, copy_to_dst_f funcCopyToDst, void* arg) // Update positions // Set nonread position to the starting position before updating, // because start position was increased, and preceding packets are invalid. - if (!isInRange(m_iStartPos, m_iMaxPosOff, m_szSize, m_iFirstNonreadPos)) + if (!isInUsedRange( m_iFirstNonreadPos)) { m_iFirstNonreadPos = m_iStartPos; } if (iBytesRead == 0) { - LOGC(rbuflog.Error, log << "readBufferTo: 0 bytes read. m_iStartPos=" << m_iStartPos << ", m_iFirstNonreadPos=" << m_iFirstNonreadPos); + LOGC(rbuflog.Error, log << "readBufferTo: 0 bytes read. m_iStartPos=" << m_iStartPos + << ", m_iFirstNonreadPos=" << m_iFirstNonreadPos); } + IF_HEAVY_LOGGING(debugShowState("readbuf")); return iBytesRead; } @@ -613,15 +961,12 @@ int CRcvBuffer::readBufferToFile(fstream& ofs, int len) bool CRcvBuffer::hasAvailablePackets() const { - return hasReadableInorderPkts() || (m_numOutOfOrderPackets > 0 && m_iFirstReadableOutOfOrder != -1); + return hasReadableInorderPkts() || (m_numNonOrderPackets > 0 && m_iFirstNonOrderMsgPos != CPos_TRAP); } int CRcvBuffer::getRcvDataSize() const { - if (m_iFirstNonreadPos >= m_iStartPos) - return m_iFirstNonreadPos - m_iStartPos; - - return int(m_szSize + m_iFirstNonreadPos - m_iStartPos); + return offPos(m_iStartPos, m_iFirstNonreadPos); } int CRcvBuffer::getTimespan_ms() const @@ -632,7 +977,7 @@ int CRcvBuffer::getTimespan_ms() const if (m_iMaxPosOff == 0) return 0; - int lastpos = incPos(m_iStartPos, m_iMaxPosOff - 1); + CPos lastpos = incPos(m_iStartPos, m_iMaxPosOff - 1); // Normally the last position should always be non empty // if TSBPD is enabled (reading out of order is not allowed). // However if decryption of the last packet fails, it may be dropped @@ -642,11 +987,11 @@ int CRcvBuffer::getTimespan_ms() const { lastpos = decPos(lastpos); } - + if (m_entries[lastpos].pUnit == NULL) return 0; - int startpos = m_iStartPos; + CPos startpos = m_iStartPos; while (m_entries[startpos].pUnit == NULL && startpos != lastpos) { startpos = incPos(startpos); @@ -676,33 +1021,42 @@ int CRcvBuffer::getRcvDataSize(int& bytes, int& timespan) const CRcvBuffer::PacketInfo CRcvBuffer::getFirstValidPacketInfo() const { - const int end_pos = incPos(m_iStartPos, m_iMaxPosOff); - for (int i = m_iStartPos; i != end_pos; i = incPos(i)) - { - // TODO: Maybe check status? - if (!m_entries[i].pUnit) - continue; + // Default: no packet available. + PacketInfo pi = { SRT_SEQNO_NONE, false, time_point() }; - const CPacket& packet = packetAt(i); - const PacketInfo info = { packet.getSeqNo(), i != m_iStartPos, getPktTsbPdTime(packet.getMsgTimeStamp()) }; - return info; + const CPacket* pkt = NULL; + + // Very first packet available with no gap. + if (m_entries[m_iStartPos].status == EntryState_Avail) + { + SRT_ASSERT(m_entries[m_iStartPos].pUnit); + pkt = &packetAt(m_iStartPos); + } + // If not, get the information from the drop + else if (m_iDropOff) + { + CPos drop_pos = incPos(m_iStartPos, m_iDropOff); + SRT_ASSERT(m_entries[drop_pos].pUnit); + pkt = &packetAt(drop_pos); + pi.seq_gap = true; // Available, but after a drop. + } + else + { + // If none of them point to a valid packet, + // there is no packet available; + return pi; } - const PacketInfo info = { -1, false, time_point() }; - return info; + pi.seqno = pkt->getSeqNo(); + pi.tsbpd_time = getPktTsbPdTime(pkt->getMsgTimeStamp()); + return pi; } std::pair CRcvBuffer::getAvailablePacketsRange() const { - const int seqno_last = CSeqNo::incseq(m_iStartSeqNo, (int) countReadable()); - return std::pair(m_iStartSeqNo, seqno_last); -} - -size_t CRcvBuffer::countReadable() const -{ - if (m_iFirstNonreadPos >= m_iStartPos) - return m_iFirstNonreadPos - m_iStartPos; - return m_szSize + m_iFirstNonreadPos - m_iStartPos; + const COff nonread_off = offPos(m_iStartPos, m_iFirstNonreadPos); + const CSeqNo seqno_last = m_iStartSeqNo + nonread_off; + return std::pair(m_iStartSeqNo.val(), seqno_last.val()); } bool CRcvBuffer::isRcvDataReady(time_point time_now) const @@ -713,8 +1067,8 @@ bool CRcvBuffer::isRcvDataReady(time_point time_now) const if (haveInorderPackets) return true; - SRT_ASSERT((!m_bMessageAPI && m_numOutOfOrderPackets == 0) || m_bMessageAPI); - return (m_numOutOfOrderPackets > 0 && m_iFirstReadableOutOfOrder != -1); + SRT_ASSERT((!m_bMessageAPI && m_numNonOrderPackets == 0) || m_bMessageAPI); + return (m_numNonOrderPackets > 0 && m_iFirstNonOrderMsgPos != CPos_TRAP); } if (!haveInorderPackets) @@ -738,11 +1092,11 @@ CRcvBuffer::PacketInfo CRcvBuffer::getFirstReadablePacketInfo(time_point time_no const PacketInfo info = {packet.getSeqNo(), false, time_point()}; return info; } - SRT_ASSERT((!m_bMessageAPI && m_numOutOfOrderPackets == 0) || m_bMessageAPI); - if (m_iFirstReadableOutOfOrder >= 0) + SRT_ASSERT((!m_bMessageAPI && m_numNonOrderPackets == 0) || m_bMessageAPI); + if (m_iFirstNonOrderMsgPos != CPos_TRAP) { - SRT_ASSERT(m_numOutOfOrderPackets > 0); - const CPacket& packet = packetAt(m_iFirstReadableOutOfOrder); + SRT_ASSERT(m_numNonOrderPackets > 0); + const CPacket& packet = packetAt(m_iFirstNonOrderMsgPos); const PacketInfo info = {packet.getSeqNo(), true, time_point()}; return info; } @@ -763,7 +1117,7 @@ CRcvBuffer::PacketInfo CRcvBuffer::getFirstReadablePacketInfo(time_point time_no int32_t CRcvBuffer::getFirstNonreadSeqNo() const { const int offset = offPos(m_iStartPos, m_iFirstNonreadPos); - return CSeqNo::incseq(m_iStartSeqNo, offset); + return m_iStartSeqNo.inc(offset).val(); } void CRcvBuffer::countBytes(int pkts, int bytes) @@ -780,7 +1134,7 @@ void CRcvBuffer::countBytes(int pkts, int bytes) } } -void CRcvBuffer::releaseUnitInPos(int pos) +void CRcvBuffer::releaseUnitInPos(CPos pos) { CUnit* tmp = m_entries[pos].pUnit; m_entries[pos] = Entry(); // pUnit = NULL; status = Empty @@ -788,7 +1142,7 @@ void CRcvBuffer::releaseUnitInPos(int pos) m_pUnitQueue->makeUnitFree(tmp); } -bool CRcvBuffer::dropUnitInPos(int pos) +bool CRcvBuffer::dropUnitInPos(CPos pos) { if (!m_entries[pos].pUnit) return false; @@ -798,27 +1152,49 @@ bool CRcvBuffer::dropUnitInPos(int pos) } else if (m_bMessageAPI && !packetAt(pos).getMsgOrderFlag()) { - --m_numOutOfOrderPackets; - if (pos == m_iFirstReadableOutOfOrder) - m_iFirstReadableOutOfOrder = -1; + --m_numNonOrderPackets; + if (pos == m_iFirstNonOrderMsgPos) + m_iFirstNonOrderMsgPos = CPos_TRAP; } releaseUnitInPos(pos); return true; } -void CRcvBuffer::releaseNextFillerEntries() +int CRcvBuffer::releaseNextFillerEntries() { - int pos = m_iStartPos; + CPos pos = m_iStartPos; + int nskipped = 0; + while (m_entries[pos].status == EntryState_Read || m_entries[pos].status == EntryState_Drop) { - m_iStartSeqNo = CSeqNo::incseq(m_iStartSeqNo); + if (nskipped == m_iMaxPosOff) + { + // This should never happen. All the previously read- or drop-marked + // packets should be contained in the range up to m_iMaxPosOff. Do not + // let the buffer ride any further and report the problem. Still stay there. + LOGC(rbuflog.Error, log << "releaseNextFillerEntries: IPE: Read/Drop status outside the busy range!"); + break; + } + + ++m_iStartSeqNo; releaseUnitInPos(pos); pos = incPos(pos); m_iStartPos = pos; - --m_iMaxPosOff; - if (m_iMaxPosOff < 0) - m_iMaxPosOff = 0; + ++nskipped; + } + + if (!nskipped) + { + return nskipped; } + + m_iMaxPosOff -= nskipped; + m_iEndOff = decOff(m_iEndOff, nskipped); + + // Drop off will be updated after that call, if needed. + m_iDropOff = 0; + + return nskipped; } // TODO: Is this function complete? There are some comments left inside. @@ -827,21 +1203,25 @@ void CRcvBuffer::updateNonreadPos() if (m_iMaxPosOff == 0) return; - const int end_pos = incPos(m_iStartPos, m_iMaxPosOff); // The empty position right after the last valid entry. + const CPos end_pos = incPos(m_iStartPos, m_iMaxPosOff); // The empty position right after the last valid entry. - int pos = m_iFirstNonreadPos; + CPos pos = m_iFirstNonreadPos; while (m_entries[pos].pUnit && m_entries[pos].status == EntryState_Avail) { if (m_bMessageAPI && (packetAt(pos).getMsgBoundary() & PB_FIRST) == 0) break; - for (int i = pos; i != end_pos; i = incPos(i)) + for (CPos i = pos; i != end_pos; i = incPos(i)) { if (!m_entries[i].pUnit || m_entries[pos].status != EntryState_Avail) { break; } + // m_iFirstNonreadPos is moved to the first position BEHIND + // the PB_LAST packet of the message. There's no guaratnee that + // the cell at this position isn't empty. + // Check PB_LAST only in message mode. if (!m_bMessageAPI || packetAt(i).getMsgBoundary() & PB_LAST) { @@ -857,9 +1237,9 @@ void CRcvBuffer::updateNonreadPos() } } -int CRcvBuffer::findLastMessagePkt() +CPos CRcvBuffer::findLastMessagePkt() { - for (int i = m_iStartPos; i != m_iFirstNonreadPos; i = incPos(i)) + for (CPos i = m_iStartPos; i != m_iFirstNonreadPos; i = incPos(i)) { SRT_ASSERT(m_entries[i].pUnit); @@ -869,12 +1249,12 @@ int CRcvBuffer::findLastMessagePkt() } } - return -1; + return CPos_TRAP; } -void CRcvBuffer::onInsertNotInOrderPacket(int insertPos) +void CRcvBuffer::onInsertNonOrderPacket(CPos insertPos) { - if (m_numOutOfOrderPackets == 0) + if (m_numNonOrderPackets == 0) return; // If the following condition is true, there is already a packet, @@ -883,7 +1263,7 @@ void CRcvBuffer::onInsertNotInOrderPacket(int insertPos) // // There might happen that the packet being added precedes the previously found one. // However, it is allowed to re bead out of order, so no need to update the position. - if (m_iFirstReadableOutOfOrder >= 0) + if (m_iFirstNonOrderMsgPos != CPos_TRAP) return; // Just a sanity check. This function is called when a new packet is added. @@ -896,34 +1276,34 @@ void CRcvBuffer::onInsertNotInOrderPacket(int insertPos) //if ((boundary & PB_FIRST) && (boundary & PB_LAST)) //{ // // This packet can be read out of order - // m_iFirstReadableOutOfOrder = insertPos; + // m_iFirstNonOrderMsgPos = insertPos; // return; //} const int msgNo = pkt.getMsgSeq(m_bPeerRexmitFlag); // First check last packet, because it is expected to be received last. - const bool hasLast = (boundary & PB_LAST) || (-1 < scanNotInOrderMessageRight(insertPos, msgNo)); + const bool hasLast = (boundary & PB_LAST) || (scanNonOrderMessageRight(insertPos, msgNo) != CPos_TRAP); if (!hasLast) return; - const int firstPktPos = (boundary & PB_FIRST) + const CPos firstPktPos = (boundary & PB_FIRST) ? insertPos - : scanNotInOrderMessageLeft(insertPos, msgNo); - if (firstPktPos < 0) + : scanNonOrderMessageLeft(insertPos, msgNo); + if (firstPktPos == CPos_TRAP) return; - m_iFirstReadableOutOfOrder = firstPktPos; + m_iFirstNonOrderMsgPos = firstPktPos; return; } -bool CRcvBuffer::checkFirstReadableOutOfOrder() +bool CRcvBuffer::checkFirstReadableNonOrder() { - if (m_numOutOfOrderPackets <= 0 || m_iFirstReadableOutOfOrder < 0 || m_iMaxPosOff == 0) + if (m_numNonOrderPackets <= 0 || m_iFirstNonOrderMsgPos == CPos_TRAP || m_iMaxPosOff == COff(0)) return false; - const int endPos = incPos(m_iStartPos, m_iMaxPosOff); + const CPos endPos = incPos(m_iStartPos, m_iMaxPosOff); int msgno = -1; - for (int pos = m_iFirstReadableOutOfOrder; pos != endPos; pos = incPos(pos)) + for (CPos pos = m_iFirstNonOrderMsgPos; pos != endPos; pos = incPos(pos)) // ++pos) { if (!m_entries[pos].pUnit) return false; @@ -944,30 +1324,31 @@ bool CRcvBuffer::checkFirstReadableOutOfOrder() return false; } -void CRcvBuffer::updateFirstReadableOutOfOrder() +void CRcvBuffer::updateFirstReadableNonOrder() { - if (hasReadableInorderPkts() || m_numOutOfOrderPackets <= 0 || m_iFirstReadableOutOfOrder >= 0) + if (hasReadableInorderPkts() || m_numNonOrderPackets <= 0 || m_iFirstNonOrderMsgPos != CPos_TRAP) return; if (m_iMaxPosOff == 0) return; // TODO: unused variable outOfOrderPktsRemain? - int outOfOrderPktsRemain = (int) m_numOutOfOrderPackets; + int outOfOrderPktsRemain = (int) m_numNonOrderPackets; // Search further packets to the right. // First check if there are packets to the right. - const int lastPos = (m_iStartPos + m_iMaxPosOff - 1) % m_szSize; + const CPos lastPos = incPos(m_iStartPos, m_iMaxPosOff - 1); - int posFirst = -1; - int posLast = -1; + CPos posFirst = CPos_TRAP; + CPos posLast = CPos_TRAP; int msgNo = -1; - for (int pos = m_iStartPos; outOfOrderPktsRemain; pos = incPos(pos)) + for (CPos pos = m_iStartPos; outOfOrderPktsRemain; pos = incPos(pos)) { if (!m_entries[pos].pUnit) { - posFirst = posLast = msgNo = -1; + posFirst = posLast = CPos_TRAP; + msgNo = -1; continue; } @@ -975,7 +1356,8 @@ void CRcvBuffer::updateFirstReadableOutOfOrder() if (pkt.getMsgOrderFlag()) // Skip in order packet { - posFirst = posLast = msgNo = -1; + posFirst = posLast = CPos_TRAP; + msgNo = -1; continue; } @@ -990,13 +1372,14 @@ void CRcvBuffer::updateFirstReadableOutOfOrder() if (pkt.getMsgSeq(m_bPeerRexmitFlag) != msgNo) { - posFirst = posLast = msgNo = -1; + posFirst = posLast = CPos_TRAP; + msgNo = -1; continue; } if (boundary & PB_LAST) { - m_iFirstReadableOutOfOrder = posFirst; + m_iFirstNonOrderMsgPos = posFirst; return; } @@ -1007,15 +1390,15 @@ void CRcvBuffer::updateFirstReadableOutOfOrder() return; } -int CRcvBuffer::scanNotInOrderMessageRight(const int startPos, int msgNo) const +CPos CRcvBuffer::scanNonOrderMessageRight(const CPos startPos, int msgNo) const { // Search further packets to the right. // First check if there are packets to the right. - const int lastPos = (m_iStartPos + m_iMaxPosOff - 1) % m_szSize; + const CPos lastPos = incPos(m_iStartPos, m_iMaxPosOff - 1); if (startPos == lastPos) - return -1; + return CPos_TRAP; - int pos = startPos; + CPos pos = startPos; do { pos = incPos(pos); @@ -1027,7 +1410,7 @@ int CRcvBuffer::scanNotInOrderMessageRight(const int startPos, int msgNo) const if (pkt.getMsgSeq(m_bPeerRexmitFlag) != msgNo) { LOGC(rbuflog.Error, log << "Missing PB_LAST packet for msgNo " << msgNo); - return -1; + return CPos_TRAP; } const PacketBoundary boundary = pkt.getMsgBoundary(); @@ -1035,30 +1418,30 @@ int CRcvBuffer::scanNotInOrderMessageRight(const int startPos, int msgNo) const return pos; } while (pos != lastPos); - return -1; + return CPos_TRAP; } -int CRcvBuffer::scanNotInOrderMessageLeft(const int startPos, int msgNo) const +CPos CRcvBuffer::scanNonOrderMessageLeft(const CPos startPos, int msgNo) const { // Search preceding packets to the left. // First check if there are packets to the left. if (startPos == m_iStartPos) - return -1; + return CPos_TRAP; - int pos = startPos; + CPos pos = startPos; do { pos = decPos(pos); if (!m_entries[pos].pUnit) - return -1; + return CPos_TRAP; const CPacket& pkt = packetAt(pos); if (pkt.getMsgSeq(m_bPeerRexmitFlag) != msgNo) { LOGC(rbuflog.Error, log << "Missing PB_FIRST packet for msgNo " << msgNo); - return -1; + return CPos_TRAP; } const PacketBoundary boundary = pkt.getMsgBoundary(); @@ -1066,7 +1449,7 @@ int CRcvBuffer::scanNotInOrderMessageLeft(const int startPos, int msgNo) const return pos; } while (pos != m_iStartPos); - return -1; + return CPos_TRAP; } bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t usTimestamp, const time_point& tsPktArrival, int usRTTSample) @@ -1104,11 +1487,11 @@ void CRcvBuffer::updateTsbPdTimeBase(uint32_t usPktTimestamp) m_tsbpd.updateTsbPdTimeBase(usPktTimestamp); } -string CRcvBuffer::strFullnessState(int iFirstUnackSeqNo, const time_point& tsNow) const +string CRcvBuffer::strFullnessState(int32_t iFirstUnackSeqNo, const time_point& tsNow) const { stringstream ss; - ss << "iFirstUnackSeqNo=" << iFirstUnackSeqNo << " m_iStartSeqNo=" << m_iStartSeqNo + ss << "iFirstUnackSeqNo=" << iFirstUnackSeqNo << " m_iStartSeqNo=" << m_iStartSeqNo.val() << " m_iStartPos=" << m_iStartPos << " m_iMaxPosOff=" << m_iMaxPosOff << ". "; ss << "Space avail " << getAvailSize(iFirstUnackSeqNo) << "/" << m_szSize << " pkts. "; @@ -1120,7 +1503,7 @@ string CRcvBuffer::strFullnessState(int iFirstUnackSeqNo, const time_point& tsNo if (!is_zero(nextValidPkt.tsbpd_time)) { ss << count_milliseconds(nextValidPkt.tsbpd_time - tsNow) << "ms"; - const int iLastPos = incPos(m_iStartPos, m_iMaxPosOff - 1); + const CPos iLastPos = incPos(m_iStartPos, m_iMaxPosOff - 1); if (m_entries[iLastPos].pUnit) { ss << ", timespan "; @@ -1169,4 +1552,90 @@ void CRcvBuffer::updRcvAvgDataSize(const steady_clock::time_point& now) m_mavg.update(now, pkts, bytes, timespan_ms); } +int32_t CRcvBuffer::getFirstLossSeq(int32_t fromseq, int32_t* pw_end) +{ + // This means that there are no lost seqs at all, no matter + // from which position they would have to be checked. + if (m_iEndOff == m_iMaxPosOff) + return SRT_SEQNO_NONE; + + COff offset = COff(CSeqNo(fromseq) - m_iStartSeqNo); + + // Check if it's still inside the buffer. + // Skip the region from 0 to m_iEndOff because this + // region is by definition contiguous and contains no loss. + if (offset < m_iEndOff || offset >= m_iMaxPosOff) + { + HLOGC(rbuflog.Debug, log << "getFirstLossSeq: offset=" << offset << " for %" << fromseq + << " (with max=" << m_iMaxPosOff << ") - NO LOSS FOUND"); + return SRT_SEQNO_NONE; + } + + // Check if this offset is equal to m_iEndOff. If it is, + // then you have the loss sequence exactly the one that + // was passed. Skip now, pw_end was not requested. + if (offset == m_iEndOff) + { + if (pw_end) + { + // If the offset is exactly at m_iEndOff, then + // m_iDropOff will mark the end of gap. + if (m_iDropOff) + *pw_end = CSeqNo::incseq(m_iStartSeqNo.val(), m_iDropOff); + else + { + LOGC(rbuflog.Error, log << "getFirstLossSeq: IPE: drop-off=0 while seq-off == end-off != max-off"); + *pw_end = fromseq; + } + } + return fromseq; + } + + int ret_seq = SRT_SEQNO_NONE; + int loss_off = 0; + // Now find the first empty position since here, + // up to m_iMaxPosOff. Checking against m_iDropOff + // makes no sense because if it is not 0, you'll + // find it earlier by checking packet presence. + for (int off = offset; off < m_iMaxPosOff; ++off) + { + CPos ipos ((m_iStartPos + off) % m_szSize); + if (m_entries[ipos].status == EntryState_Empty) + { + ret_seq = CSeqNo::incseq(m_iStartSeqNo.val(), off); + loss_off = off; + break; + } + } + + if (ret_seq == SRT_SEQNO_NONE) + { + // This is theoretically possible if we search from behind m_iEndOff, + // after m_iDropOff. This simply means that we are trying to search + // behind the last gap in the buffer. + return ret_seq; + } + + // We get this position, so search for the end of gap + if (pw_end) + { + for (int off = loss_off+1; off < m_iMaxPosOff; ++off) + { + CPos ipos ((m_iStartPos + off) % m_szSize); + if (m_entries[ipos].status != EntryState_Empty) + { + *pw_end = CSeqNo::incseq(m_iStartSeqNo.val(), off); + return ret_seq; + } + } + + // Should not be possible to not find an existing packet + // following the gap, otherwise there would be no gap. + LOGC(rbuflog.Error, log << "getFirstLossSeq: IPE: gap since %" << ret_seq << " not covered by existing packet"); + *pw_end = ret_seq; + } + return ret_seq; +} + + } // namespace srt diff --git a/srtcore/buffer_rcv.h b/srtcore/buffer_rcv.h index c5fca428b..80fe1d1ec 100644 --- a/srtcore/buffer_rcv.h +++ b/srtcore/buffer_rcv.h @@ -15,32 +15,390 @@ #include "common.h" #include "queue.h" #include "tsbpd_time.h" +#include "utilities.h" + +#define USE_WRAPPERS 0 +#define USE_OPERATORS 0 namespace srt { -/* - * Circular receiver buffer. - * - * |<------------------- m_szSize ---------------------------->| - * | |<------------ m_iMaxPosOff ----------->| | - * | | | | - * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+ - * | 0 | 0 | 1 | 1 | 1 | 0 | 1 | 1 | 1 | 1 | 0 | 1 | 0 |...| 0 | m_pUnit[] - * +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+ - * | | - * | \__last pkt received - * | - * \___ m_iStartPos: first message to read - * - * m_pUnit[i]->status_: 0: free, 1: good, 2: read, 3: dropped (can be combined with read?) - * - * thread safety: - * start_pos_: CUDT::m_RecvLock - * first_unack_pos_: CUDT::m_AckLock - * max_pos_inc_: none? (modified on add and ack - * first_nonread_pos_: - */ +// DEVELOPMENT TOOL - TO BE MOVED ELSEWHERE (like common.h) + +// NOTE: This below series of definitions for CPos and COff +// are here for development support only, but they are not in +// use in the release code - there CPos and COff are aliases to int. +#if USE_WRAPPERS +struct CPos +{ + int value; +#if USE_OPERATORS + const size_t* psize; + int isize() const {return *psize;} +#endif + +#if USE_OPERATORS + explicit CPos(const size_t* ps SRT_ATR_UNUSED, int val) + : value(val) + , psize(ps) + {} + +#else + explicit CPos(int val): value(val) {} +#endif + + int val() const { return value; } + explicit operator int() const {return value;} + + CPos(const CPos& src): value(src.value) +#if USE_OPERATORS + , psize(src.psize) +#endif + {} + CPos& operator=(const CPos& src) + { +#if USE_OPERATORS + psize = src.psize; +#endif + value = src.value; + return *this; + } + +#if USE_OPERATORS + int cmp(CPos other, CPos start) const + { + int pos2 = value; + int pos1 = other.value; + + const int off1 = pos1 >= start.value ? pos1 - start.value : pos1 + start.isize() - start.value; + const int off2 = pos2 >= start.value ? pos2 - start.value : pos2 + start.isize() - start.value; + + return off2 - off1; + } + + CPos& operator--() + { + if (value == 0) + value = isize() - 1; + else + --value; + return *this; + } + + CPos& operator++() + { + ++value; + if (value == isize()) + value = 0; + return *this; + } +#endif + + bool operator == (CPos other) const { return value == other.value; } + bool operator != (CPos other) const { return value != other.value; } +}; + +struct COff +{ + int value; + explicit COff(int v): value(v) {} + COff& operator=(int v) { value = v; return *this; } + + int val() const { return value; } + explicit operator int() const {return value;} + + COff& operator--() { --value; return *this; } + COff& operator++() { ++value; return *this; } + + COff operator--(int) { int v = value; --value; return COff(v); } + COff operator++(int) { int v = value; ++value; return COff(v); } + + COff operator+(COff other) const { return COff(value + other.value); } + COff operator-(COff other) const { return COff(value - other.value); } + COff& operator+=(COff other) { value += other.value; return *this;} + COff& operator-=(COff other) { value -= other.value; return *this;} + + bool operator == (COff other) const { return value == other.value; } + bool operator != (COff other) const { return value != other.value; } + bool operator < (COff other) const { return value < other.value; } + bool operator > (COff other) const { return value > other.value; } + bool operator <= (COff other) const { return value <= other.value; } + bool operator >= (COff other) const { return value >= other.value; } + + // Exceptionally allow modifications of COff by a bare integer + COff operator+(int other) const { return COff(value + other); } + COff operator-(int other) const { return COff(value - other); } + COff& operator+=(int other) { value += other; return *this;} + COff& operator-=(int other) { value -= other; return *this;} + + bool operator == (int other) const { return value == other; } + bool operator != (int other) const { return value != other; } + bool operator < (int other) const { return value < other; } + bool operator > (int other) const { return value > other; } + bool operator <= (int other) const { return value <= other; } + bool operator >= (int other) const { return value >= other; } + + friend bool operator == (int value, COff that) { return value == that.value; } + friend bool operator != (int value, COff that) { return value != that.value; } + friend bool operator < (int value, COff that) { return value < that.value; } + friend bool operator > (int value, COff that) { return value > that.value; } + friend bool operator <= (int value, COff that) { return value <= that.value; } + friend bool operator >= (int value, COff that) { return value >= that.value; } + + operator bool() const { return value != 0; } +}; + +#if USE_OPERATORS + +inline CPos operator+(const CPos& pos, COff off) +{ + int val = pos.value + off.value; + while (val >= pos.isize()) + val -= pos.isize(); + return CPos(pos.psize, val); +} + +inline CPos operator-(const CPos& pos, COff off) +{ + int val = pos.value - off.value; + while (val < 0) + val += pos.isize(); + return CPos(pos.psize, val); +} + +// Should verify that CPos use the same size! +inline COff operator-(CPos later, CPos earlier) +{ + if (later.value < earlier.value) + return COff(later.value + later.isize() - earlier.value); + + return COff(later.value - earlier.value); +} + +inline CSeqNo operator+(CSeqNo seq, COff off) +{ + int32_t val = CSeqNo::incseq(seq.val(), off.val()); + return CSeqNo(val); +} + +inline CSeqNo operator-(CSeqNo seq, COff off) +{ + int32_t val = CSeqNo::decseq(seq.val(), off.val()); + return CSeqNo(val); +} + + +#endif +const CPos CPos_TRAP (-1); + +#else +typedef int CPos; +typedef int COff; +const int CPos_TRAP = -1; +#endif + +// +// Circular receiver buffer. +// +// ICR = Initial Contiguous Region: all cells here contain valid packets +// SCRAP REGION: Region with possibly filled or empty cells +// NOTE: in scrap region, the first cell is empty and the last one filled. +// SPARE REGION: Region without packets +// +// | BUSY REGION | +// | | | | +// | ICR | SCRAP REGION | SPARE REGION...-> +// ......->| | | | +// | /FIRST-GAP | | +// |<------------------- m_szSize ---------------------------->| +// | |<------------ m_iMaxPosOff ----------->| | +// | | | | | | +// +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+ +// | 0 | 0 | 1 | 1 | 1 | 0 | 1 | 1 | 1 | 1 | 0 | 1 | 0 |...| 0 | m_pUnit[] +// +---+---+---+---+---+---+---+---+---+---+---+---+---+ +---+ +// | | | | +// | | | \__last pkt received +// |<------------->| m_iDropOff | +// | | | +// |<--------->| m_iEndOff | +// | +// \___ m_iStartPos: first packet position in the buffer +// +// m_pUnit[i]->status: +// EntryState_Empty: No packet was ever received here +// EntryState_Avail: The packet is ready for reading +// EntryState_Read: The packet is non-order-read +// EntryState_Drop: The packet was requested to drop +// +// thread safety: +// m_iStartPos: CUDT::m_RecvLock +// first_unack_pos_: CUDT::m_AckLock +// m_iMaxPosOff: none? (modified on add and ack +// m_iFirstNonreadPos: +// +// +// m_iStartPos: the first packet that should be read (might be empty) +// m_iEndOff: shift to the end of contiguous range. This points always to an empty cell. +// m_iDropPos: shift a packet available for retrieval after a drop. If 0, no such packet. +// +// Operational rules: +// +// Initially: +// m_iStartPos = 0 +// m_iEndOff = 0 +// m_iDropOff = 0 +// +// When a packet has arrived, then depending on where it landed: +// +// 1. Position: next to the last read one and newest +// +// m_iStartPos unchanged. +// m_iEndOff shifted by 1 +// m_iDropOff = 0 +// +// 2. Position: after a loss, newest. +// +// m_iStartPos unchanged. +// m_iEndOff unchanged. +// m_iDropOff: +// - set to this packet, if m_iDropOff == 0 or m_iDropOff is past this packet +// - otherwise unchanged +// +// 3. Position: after a loss, but belated (retransmitted) -- not equal to m_iEndPos +// +// m_iStartPos unchanged. +// m_iEndPos unchanged. +// m_iDropPos: +// - if m_iDropPos == m_iEndPos, set to this +// - if m_iDropPos %> this sequence, set to this +// - otherwise unchanged +// +// 4. Position: after a loss, sealing -- seq equal to position of m_iEndPos +// +// m_iStartPos unchanged. +// m_iEndPos: +// - since this position, search the first free cell +// - if reached the end of filled region (m_iMaxPosOff), stay there. +// m_iDropPos: +// - start from the value equal to m_iEndPos +// - walk at maximum to m_iMaxPosOff +// - find the first existing packet +// NOTE: +// If there are no "after gap" packets, then m_iMaxPosOff == m_iEndPos. +// If there is one existing packet, then one loss, then one packet, it +// should be that m_iEndPos = m_iStartPos %+ 1, m_iDropPos can reach +// to m_iStartPos %+ 2 position, and m_iMaxPosOff == m_iStartPos %+ 3. +// +// To wrap up: +// +// Let's say we have the following possibilities in a general scheme: +// +// +// [D] [C] [B] [A] (insertion cases) +// | (start) --- (end) ===[gap]=== (after-loss) ... (max-pos) | +// +// See the CRcvBuffer::updatePosInfo method for detailed implementation. +// +// WHEN INSERTING A NEW PACKET: +// +// If the incoming sequence maps to newpktpos that is: +// +// * newpktpos <% (start) : discard the packet and exit +// * newpktpos %> (size) : report discrepancy, discard and exit +// * newpktpos %> (start) and: +// * EXISTS: discard and exit (NOTE: could be also < (end)) +// [A]* seq == m_iMaxPosOff +// --> INC m_iMaxPosOff +// * m_iEndPos == previous m_iMaxPosOff +// * previous m_iMaxPosOff + 1 == m_iMaxPosOff +// --> m_iEndPos = m_iMaxPosOff +// --> m_iDropPos = m_iEndPos +// * otherwise (means the new packet caused a gap) +// --> m_iEndPos REMAINS UNCHANGED +// --> m_iDropPos = POSITION(m_iMaxPosOff) +// COMMENT: +// If this above condition isn't satisfied, then there are +// gaps, first at m_iEndPos, and m_iDropPos is at furthest +// equal to m_iMaxPosOff %- 1. The inserted packet is outside +// both the contiguous region and the following scratched region, +// so no updates on m_iEndPos and m_iDropPos are necessary. +// +// NOTE +// SINCE THIS PLACE seq cannot be a sequence of an existing packet, +// which means that earliest newpktpos == m_iEndPos, up to == m_iMaxPosOff -% 2. +// +// * otherwise (newpktpos <% max-pos): +// [D]* newpktpos == m_iEndPos: +// --> (search FIRST GAP and FIRST AFTER-GAP) +// --> m_iEndPos: increase until reaching m_iMaxPosOff +// * m_iEndPos <% m_iMaxPosOff: +// --> m_iDropPos = first VALID packet since m_iEndPos +% 1 +// * otherwise: +// --> m_iDropPos = m_iEndPos +// [B]* newpktpos %> m_iDropPos +// --> store, but do not update anything +// [C]* otherwise (newpktpos %> m_iEndPos && newpktpos <% m_iDropPos) +// --> store +// --> set m_iDropPos = newpktpos +// COMMENT: +// It is guaratneed that between m_iEndPos and m_iDropPos +// there is only a gap (series of empty cells). So wherever +// this packet lands, if it's next to m_iEndPos and before m_iDropPos +// it will be the only packet that violates the gap, hence this +// can be the only drop pos preceding the previous m_iDropPos. +// +// -- information returned to the caller should contain: +// 1. Whether adding to the buffer was successful. +// 2. Whether the "freshest" retrievable packet has been changed, that is: +// * in live mode, a newly added packet has earlier delivery time than one before +// * in stream mode, the newly added packet was at cell[0] +// * in message mode, if the newly added packet has: +// * completed the very first message +// * completed any message further than first that has out-of-order flag +// +// The information about a changed packet is important for the caller in +// live mode in order to notify the TSBPD thread. +// +// +// +// WHEN CHECKING A PACKET +// +// 1. Check the position at m_iStartPos. If there is a packet, +// return info at its position. +// +// 2. If position on m_iStartPos is empty, get the value of m_iDropPos. +// +// NOTE THAT: +// * if the buffer is empty, m_iDropPos == m_iStartPos and == m_iEndPos; +// note that m_iDropPos == m_iStartPos suffices to check that +// * if there is a packet in the buffer, but the first cell is empty, +// then m_iDropPos points to this packet, while m_iEndPos == m_iStartPos. +// Check then m_iStartPos == m_iEndPos to recognize it, and if then +// m_iDropPos isn't equal to them, you can read with dropping. +// * If cell[0] is valid, there could be only at worst cell[1] empty +// and cell[2] pointed by m_iDropPos. +// +// 3. In case of time-based checking for live mode, return empty packet info, +// if this packet's time is later than given time. +// +// WHEN EXTRACTING A PACKET +// +// 1. Extraction is only possible if there is a packet at cell[0]. +// 2. If there's no packet at cell[0], the application may request to +// drop up to the given packet, or drop the whole message up to +// the beginning of the next message. +// 3. In message mode, extraction can only extract a full message, so +// if there's no full message ready, nothing is extracted. +// 4. When the extraction region is defined, the m_iStartPos is shifted +// by the number of extracted packets. +// 5. If m_iEndPos <% m_iStartPos (after update), m_iEndPos should be +// set by searching from m_iStartPos up to m_iMaxPosOff for an empty cell. +// 6. m_iDropPos must be always updated. If m_iEndPos == m_iMaxPosOff, +// m_iDropPos is set to their value. Otherwise start from m_iEndPos +// and search a valid packet up to m_iMaxPosOff. +// 7. NOTE: m_iMaxPosOff is a delta, hence it must be set anew after update +// for m_iStartPos. +// class CRcvBuffer { @@ -53,16 +411,70 @@ class CRcvBuffer ~CRcvBuffer(); public: - /// Insert a unit into the buffer. - /// Similar to CRcvBuffer::addData(CUnit* unit, int offset) + + void debugShowState(const char* source); + + struct InsertInfo + { + enum Result { INSERTED = 0, REDUNDANT = -1, BELATED = -2, DISCREPANCY = -3 } result; + + // Below fields are valid only if result == INSERTED. Otherwise they have trap repro. + + CSeqNo first_seq; // sequence of the first available readable packet + time_point first_time; // Time of the new, earlier packet that appeared ready, or null-time if this didn't change. + COff avail_range; + + InsertInfo(Result r, int fp_seq = SRT_SEQNO_NONE, int range = 0, + time_point fp_time = time_point()) + : result(r), first_seq(fp_seq), first_time(fp_time), avail_range(range) + { + } + + InsertInfo() + : result(REDUNDANT), first_seq(SRT_SEQNO_NONE), avail_range(0) + { + } + + }; + + /// Inserts the unit with the data packet into the receiver buffer. + /// The result inform about the situation with the packet attempted + /// to be inserted and the readability of the buffer. /// - /// @param [in] unit pointer to a data unit containing new packet - /// @param [in] offset offset from last ACK point. + /// @param [PASS] unit The unit that should be placed in the buffer /// - /// @return 0 on success, -1 if packet is already in buffer, -2 if packet is before m_iStartSeqNo. - /// -3 if a packet is offset is ahead the buffer capacity. - // TODO: Previously '-2' also meant 'already acknowledged'. Check usage of this value. - int insert(CUnit* unit); + /// @return The InsertInfo structure where: + /// * result: the result of insertion, which is: + /// * INSERTED: successfully placed in the buffer + /// * REDUNDANT: not placed, the packet is already there + /// * BELATED: not placed, its sequence is in the past + /// * DISCREPANCY: not placed, the sequence is far future or OOTB + /// * first_seq: the earliest sequence number now avail for reading + /// * avail_range: how many packets are available for reading (1 if unknown) + /// * first_time: the play time of the earliest read-available packet + /// If there is no available packet for reading, first_seq == SRT_SEQNO_NONE. + /// + InsertInfo insert(CUnit* unit); + + time_point updatePosInfo(const CUnit* unit, const COff prev_max_off, const COff offset, const bool extended_end); + void getAvailInfo(InsertInfo& w_if); + + /// Update the values of `m_iEndPos` and `m_iDropPos` in + /// case when `m_iEndPos` was updated to a position of a + /// nonempty cell. + /// + /// This function should be called after having m_iEndPos + /// has somehow be set to position of a non-empty cell. + /// This can happen by two reasons: + /// + /// - the cell has been filled by incoming packet + /// - the value has been reset due to shifted m_iStartPos + /// + /// This means that you have to search for a new gap and + /// update the m_iEndPos and m_iDropPos fields, or set them + /// both to the end of range if there are no loss gaps. + /// + void updateGapInfo(); /// Drop packets in the receiver buffer from the current position up to the seqno (excluding seqno). /// @param [in] seqno drop units up to this sequence number @@ -98,16 +510,28 @@ class CRcvBuffer /// @return the number of packets actually dropped. int dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, DropActionIfExists actionOnExisting); + /// Extract the "expected next" packet sequence. + /// Extract the past-the-end sequence for the first packet + /// that is expected to arrive next with preserving the packet order. + /// If the buffer is empty or the very first cell is lacking a packet, + /// it returns the sequence assigned to the first cell. Otherwise it + /// returns the sequence representing the first empty cell (the next + /// cell to the last received packet, if there are no loss-holes). + /// @param [out] w_seq: returns the sequence (always valid) + /// @return true if this sequence is followed by any valid packets + bool getContiguousEnd(int32_t& w_seq) const; + /// Read the whole message from one or several packets. /// - /// @param [in,out] data buffer to write the message into. + /// @param [out] data buffer to write the message into. /// @param [in] len size of the buffer. - /// @param [in,out] message control data + /// @param [out,opt] message control data to be filled + /// @param [out,opt] pw_seqrange range of sequence numbers for packets belonging to the message /// /// @return actual number of bytes extracted from the buffer. /// 0 if nothing to read. /// -1 on failure. - int readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl = NULL); + int readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl = NULL, std::pair* pw_seqrange = NULL); /// Read acknowledged data into a user buffer. /// @param [in, out] dst pointer to the target user buffer. @@ -123,24 +547,25 @@ class CRcvBuffer public: /// Get the starting position of the buffer as a packet sequence number. - int getStartSeqNo() const { return m_iStartSeqNo; } + int32_t getStartSeqNo() const { return m_iStartSeqNo.val(); } /// Sets the start seqno of the buffer. /// Must be used with caution and only when the buffer is empty. - void setStartSeqNo(int seqno) { m_iStartSeqNo = seqno; } + void setStartSeqNo(int32_t seqno) { m_iStartSeqNo = CSeqNo(seqno); } /// Given the sequence number of the first unacknowledged packet /// tells the size of the buffer available for packets. /// Effective returns capacity of the buffer minus acknowledged packet still kept in it. // TODO: Maybe does not need to return minus one slot now to distinguish full and empty buffer. - size_t getAvailSize(int iFirstUnackSeqNo) const + size_t getAvailSize(int32_t iFirstUnackSeqNo) const { // Receiver buffer allows reading unacknowledged packets. // Therefore if the first packet in the buffer is ahead of the iFirstUnackSeqNo // then it does not have acknowledged packets and its full capacity is available. // Otherwise subtract the number of acknowledged but not yet read packets from its capacity. - const int iRBufSeqNo = getStartSeqNo(); + const int32_t iRBufSeqNo = m_iStartSeqNo.val(); if (CSeqNo::seqcmp(iRBufSeqNo, iFirstUnackSeqNo) >= 0) // iRBufSeqNo >= iFirstUnackSeqNo + //if (iRBufSeqNo >= CSeqNo(iFirstUnackSeqNo)) { // Full capacity is available. return capacity(); @@ -198,11 +623,11 @@ class CRcvBuffer /// @note CSeqNo::seqoff(first, second) is 0 if nothing to read. std::pair getAvailablePacketsRange() const; - size_t countReadable() const; + int32_t getFirstLossSeq(int32_t fromseq, int32_t* opt_end = NULL); bool empty() const { - return (m_iMaxPosOff == 0); + return (m_iMaxPosOff == COff(0)); } /// Returns the currently used number of cells, including @@ -255,54 +680,104 @@ class CRcvBuffer const CUnit* peek(int32_t seqno); private: - inline int incPos(int pos, int inc = 1) const { return (pos + inc) % m_szSize; } - inline int decPos(int pos) const { return (pos - 1) >= 0 ? (pos - 1) : int(m_szSize - 1); } - inline int offPos(int pos1, int pos2) const { return (pos2 >= pos1) ? (pos2 - pos1) : int(m_szSize + pos2 - pos1); } + //* + CPos incPos(CPos pos, COff inc = COff(1)) const { return CPos((pos + inc) % m_szSize); } + CPos decPos(CPos pos) const { return (pos - 1) >= 0 ? CPos(pos - 1) : CPos(m_szSize - 1); } + COff offPos(CPos pos1, CPos pos2) const + { + int diff = pos2 - pos1; + if (diff >= 0) + { + return COff(diff); + } + return COff(m_szSize + diff); + } + + COff posToOff(CPos pos) const { return offPos(m_iStartPos, pos); } + + static COff decOff(COff val, int shift) + { + int ival = val - shift; + if (ival < 0) + return COff(0); + return COff(ival); + } /// @brief Compares the two positions in the receiver buffer relative to the starting position. /// @param pos2 a position in the receiver buffer. /// @param pos1 a position in the receiver buffer. /// @return a positive value if pos2 is ahead of pos1; a negative value, if pos2 is behind pos1; otherwise returns 0. - inline int cmpPos(int pos2, int pos1) const + inline COff cmpPos(CPos pos2, CPos pos1) const { // XXX maybe not the best implementation, but this keeps up to the rule. // Maybe use m_iMaxPosOff to ensure a position is not behind the m_iStartPos. - const int off1 = pos1 >= m_iStartPos ? pos1 - m_iStartPos : pos1 + (int)m_szSize - m_iStartPos; - const int off2 = pos2 >= m_iStartPos ? pos2 - m_iStartPos : pos2 + (int)m_szSize - m_iStartPos; - return off2 - off1; + return posToOff(pos2) - posToOff(pos1); + } + // */ + + // Check if iFirstNonreadPos is in range [iStartPos, (iStartPos + iMaxPosOff) % iSize]. + // The right edge is included because we expect iFirstNonreadPos to be + // right after the last valid packet position if all packets are available. + static bool isInRange(CPos iStartPos, COff iMaxPosOff, size_t iSize, CPos iFirstNonreadPos) + { + if (iFirstNonreadPos == iStartPos) + return true; + + const CPos iLastPos = CPos((iStartPos + iMaxPosOff) % int(iSize)); + const bool isOverrun = iLastPos < iStartPos; + + if (isOverrun) + return iFirstNonreadPos > iStartPos || iFirstNonreadPos <= iLastPos; + + return iFirstNonreadPos > iStartPos && iFirstNonreadPos <= iLastPos; + } + + bool isInUsedRange(CPos iFirstNonreadPos) + { + if (iFirstNonreadPos == m_iStartPos) + return true; + + // DECODE the iFirstNonreadPos + int diff = iFirstNonreadPos - m_iStartPos; + if (diff < 0) + diff += m_szSize; + + return diff <= m_iMaxPosOff; } // NOTE: Assumes that pUnit != NULL - CPacket& packetAt(int pos) { return m_entries[pos].pUnit->m_Packet; } - const CPacket& packetAt(int pos) const { return m_entries[pos].pUnit->m_Packet; } + CPacket& packetAt(CPos pos) { return m_entries[pos].pUnit->m_Packet; } + const CPacket& packetAt(CPos pos) const { return m_entries[pos].pUnit->m_Packet; } private: void countBytes(int pkts, int bytes); void updateNonreadPos(); - void releaseUnitInPos(int pos); + void releaseUnitInPos(CPos pos); /// @brief Drop a unit from the buffer. /// @param pos position in the m_entries of the unit to drop. /// @return false if nothing to drop, true if the unit was dropped successfully. - bool dropUnitInPos(int pos); + bool dropUnitInPos(CPos pos); /// Release entries following the current buffer position if they were already /// read out of order (EntryState_Read) or dropped (EntryState_Drop). - void releaseNextFillerEntries(); + /// + /// @return the range for which the start pos has been shifted + int releaseNextFillerEntries(); bool hasReadableInorderPkts() const { return (m_iFirstNonreadPos != m_iStartPos); } /// Find position of the last packet of the message. - int findLastMessagePkt(); + CPos findLastMessagePkt(); /// Scan for availability of out of order packets. - void onInsertNotInOrderPacket(int insertpos); - // Check if m_iFirstReadableOutOfOrder is still readable. - bool checkFirstReadableOutOfOrder(); - void updateFirstReadableOutOfOrder(); - int scanNotInOrderMessageRight(int startPos, int msgNo) const; - int scanNotInOrderMessageLeft(int startPos, int msgNo) const; + void onInsertNonOrderPacket(CPos insertpos); + // Check if m_iFirstNonOrderMsgPos is still readable. + bool checkFirstReadableNonOrder(); + void updateFirstReadableNonOrder(); + CPos scanNonOrderMessageRight(CPos startPos, int msgNo) const; + CPos scanNonOrderMessageLeft(CPos startPos, int msgNo) const; typedef bool copy_to_dst_f(char* data, int len, int dst_offset, void* arg); @@ -358,15 +833,20 @@ class CRcvBuffer const size_t m_szSize; // size of the array of units (buffer) CUnitQueue* m_pUnitQueue; // the shared unit queue - int m_iStartSeqNo; - int m_iStartPos; // the head position for I/O (inclusive) - int m_iFirstNonreadPos; // First position that can't be read (<= m_iLastAckPos) - int m_iMaxPosOff; // the furthest data position - int m_iNotch; // the starting read point of the first unit + CSeqNo m_iStartSeqNo; + CPos m_iStartPos; // the head position for I/O (inclusive) + COff m_iEndOff; // past-the-end of the contiguous region since m_iStartOff + COff m_iDropOff; // points past m_iEndOff to the first deliverable after a gap, or == m_iEndOff if no such packet + CPos m_iFirstNonreadPos; // First position that can't be read (<= m_iLastAckPos) + COff m_iMaxPosOff; // the furthest data position + int m_iNotch; // index of the first byte to read in the first ready-to-read packet (used in file/stream mode) + + size_t m_numNonOrderPackets; // The number of stored packets with "inorder" flag set to false - size_t m_numOutOfOrderPackets; // The number of stored packets with "inorder" flag set to false - int m_iFirstReadableOutOfOrder; // In case of out ouf order packet, points to a position of the first such packet to - // read + /// Points to the first packet of a message that has out-of-order flag + /// and is complete (all packets from first to last are in the buffer). + /// If there is no such message in the buffer, it contains -1. + CPos m_iFirstNonOrderMsgPos; bool m_bPeerRexmitFlag; // Needed to read message number correctly const bool m_bMessageAPI; // Operation mode flag: message or stream. @@ -396,7 +876,7 @@ class CRcvBuffer /// Form a string of the current buffer fullness state. /// number of packets acknowledged, TSBPD readiness, etc. - std::string strFullnessState(int iFirstUnackSeqNo, const time_point& tsNow) const; + std::string strFullnessState(int32_t iFirstUnackSeqNo, const time_point& tsNow) const; private: CTsbpdTime m_tsbpd; diff --git a/srtcore/common.h b/srtcore/common.h index 6a8912118..e0d7212cc 100644 --- a/srtcore/common.h +++ b/srtcore/common.h @@ -580,6 +580,8 @@ class CSeqNo explicit CSeqNo(int32_t v): value(v) {} + int32_t val() const { return value; } + // Comparison bool operator == (const CSeqNo& other) const { return other.value == value; } bool operator < (const CSeqNo& other) const @@ -683,14 +685,20 @@ class CSeqNo inline static int32_t incseq(int32_t seq) {return (seq == m_iMaxSeqNo) ? 0 : seq + 1;} + CSeqNo inc() const { return CSeqNo(incseq(value)); } + inline static int32_t decseq(int32_t seq) {return (seq == 0) ? m_iMaxSeqNo : seq - 1;} + CSeqNo dec() const { return CSeqNo(decseq(value)); } + inline static int32_t incseq(int32_t seq, int32_t inc) {return (m_iMaxSeqNo - seq >= inc) ? seq + inc : seq - m_iMaxSeqNo + inc - 1;} // m_iMaxSeqNo >= inc + sec --- inc + sec <= m_iMaxSeqNo // if inc + sec > m_iMaxSeqNo then return seq + inc - (m_iMaxSeqNo+1) + CSeqNo inc(int32_t i) const { return CSeqNo(incseq(value, i)); } + inline static int32_t decseq(int32_t seq, int32_t dec) { // Check if seq - dec < 0, but before it would have happened @@ -703,6 +711,8 @@ class CSeqNo return seq - dec; } + CSeqNo dec(int32_t i) const { return CSeqNo(decseq(value, i)); } + static int32_t maxseq(int32_t seq1, int32_t seq2) { if (seqcmp(seq1, seq2) < 0) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index eca2b2069..b0ab9376c 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -8022,12 +8022,19 @@ void srt::CUDT::sendCtrl(UDTMessageType pkttype, const int32_t* lparam, void* rp m_tsLastSndTime.store(steady_clock::now()); } -// [[using locked(m_RcvBufferLock)]] bool srt::CUDT::getFirstNoncontSequence(int32_t& w_seq, string& w_log_reason) { + if (!m_pRcvBuffer) + { + LOGP(cnlog.Error, "IPE: ack can't be sent, buffer doesn't exist and no group membership"); + return false; + } if (m_config.bTSBPD || !m_config.bMessageAPI) { - // The getFirstNonreadSeqNo() function retuens the sequence number of the first packet + // NOTE: it's not only about protecting the buffer itself, it's also protecting + // the section where the m_iRcvCurrSeqNo is updated. + ScopedLock buflock (m_RcvBufferLock); + // The getFirstNonreadSeqNo() function returns the sequence number of the first packet // that cannot be read. In cases when a message can consist of several data packets, // an existing packet of partially available message also cannot be read. // If TSBPD mode is enabled, a message must consist of a single data packet only. @@ -8045,28 +8052,17 @@ bool srt::CUDT::getFirstNoncontSequence(int32_t& w_seq, string& w_log_reason) return true; } - - { - ScopedLock losslock (m_RcvLossLock); - const int32_t seq = m_pRcvLossList->getFirstLostSeq(); - if (seq != SRT_SEQNO_NONE) - { - HLOGC(xtlog.Debug, log << "NONCONT-SEQUENCE: first loss %" << seq << " (loss len=" << - m_pRcvLossList->getLossLength() << ")"); - w_seq = seq; - w_log_reason = "first lost"; - return true; - } - } - - w_seq = CSeqNo::incseq(m_iRcvCurrSeqNo); - HLOGC(xtlog.Debug, log << "NONCONT-SEQUENCE: past-recv %" << w_seq); - w_log_reason = "expected next"; + + ScopedLock buflock (m_RcvBufferLock); + bool has_followers = m_pRcvBuffer->getContiguousEnd((w_seq)); + if (has_followers) + w_log_reason = "first lost"; + else + w_log_reason = "expected next"; return true; } - int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) { int nbsent = 0; @@ -8084,16 +8080,14 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) local_prevack = m_iDebugPrevLastAck; #endif - string reason; // just for "a reason" of giving particular % for ACK - // The TSBPD thread may change the first lost sequence record (TLPKTDROP). - // To avoid it the m_RcvBufferLock has to be acquired. - UniqueLock bufflock(m_RcvBufferLock); + // NOTE: the below calls do locking on m_RcvBufferLock. + // Hence up to the handling of lite ACK, the scoped lock is not applied. // The full ACK should be sent to indicate there is now available space in the RCV buffer // since the last full ACK. It should unblock the sender to proceed further. - const bool bNeedFullAck = (m_bBufferWasFull && getAvailRcvBufferSizeNoLock() > 0); + const bool bNeedFullAck = (m_bBufferWasFull && !isRcvBufferFull()); int32_t ack; // First unacknowledged packet sequence number (acknowledge up to ack). - + string reason; // just for "a reason" of giving particular % for ACK if (!getFirstNoncontSequence((ack), (reason))) return nbsent; @@ -8107,7 +8101,6 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) // to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number if (size == SEND_LITE_ACK && !bNeedFullAck) { - bufflock.unlock(); ctrlpkt.pack(UMSG_ACK, NULL, &ack, size); ctrlpkt.set_id(m_PeerID); nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt, m_SourceAddr); @@ -8115,6 +8108,16 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) return nbsent; } + // Lock the group existence until this function ends. This will be useful + // also on other places. +#if ENABLE_BONDING + CUDTUnited::GroupKeeper gkeeper (uglobal(), m_parent); +#endif + + // There are new received packets to acknowledge, update related information. + /* tsbpd thread may also call ackData when skipping packet so protect code */ + UniqueLock bufflock(m_RcvBufferLock); + // IF ack %> m_iRcvLastAck // There are new received packets to acknowledge, update related information. if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0) @@ -8181,23 +8184,22 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) } } #endif - // If TSBPD is enabled, then INSTEAD OF signaling m_RecvDataCond, - // signal m_RcvTsbPdCond. This will kick in the tsbpd thread, which - // will signal m_RecvDataCond when there's time to play for particular - // data packet. + // Signalling m_RecvDataCond is not done when TSBPD is on. + // This signalling is done in file mode in order to keep the + // API reader thread sleeping until there is a "bigger portion" + // of data to read. In TSBPD mode this isn't done because every + // packet has its individual delivery time and its readiness is signed + // off by the TSBPD thread. HLOGC(xtlog.Debug, log << CONID() << "ACK: clip %" << m_iRcvLastAck << "-%" << ack << ", REVOKED " << CSeqNo::seqoff(ack, m_iRcvLastAck) << " from RCV buffer"); - if (m_bTsbPd) - { - /* Newly acknowledged data, signal TsbPD thread */ - CUniqueSync tslcc (m_RecvLock, m_RcvTsbPdCond); - // m_bTsbPdAckWakeup is protected by m_RecvLock in the tsbpd() thread - if (m_bTsbPdNeedsWakeup) - tslcc.notify_one(); - } - else + // There's no need to update TSBPD in the wake-on-recv state + // from ACK because it is being done already in the receiver thread + // when a newly inserted packet caused provision of a new candidate + // that could be delivered soon. Also, this flag is only used in TSBPD + // mode and can be only set to true in the TSBPD thread. + if (!m_bTsbPd) { { CUniqueSync rdcc (m_RecvLock, m_RecvDataCond); @@ -9087,14 +9089,27 @@ void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt) m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt)); } } - // When the drop request was received, it means that there are - // packets for which there will never be ACK sent; if the TSBPD thread - // is currently in the ACK-waiting state, it may never exit. - if (m_bTsbPd) - { - HLOGP(inlog.Debug, "DROPREQ: signal TSBPD"); - rcvtscc.notify_one(); - } + + // NOTE: + // PREVIOUSLY done: notify on rcvtscc if m_bTsbPd + // OLD COMMENT: + // // When the drop request was received, it means that there are + // // packets for which there will never be ACK sent; if the TSBPD thread + // // is currently in the ACK-waiting state, it may never exit. + // + // Likely this is no longer necessary because: + // + // 1. If there's a play-ready packet, either in cell 0 or + // after a drop, TSBPD is sleeping timely, up to the play-time + // of the next ready packet (and the drop region concerned here + // is still a gap to be skipped for this). + // 2. TSBPD sleeps forever when the buffer is empty, in which case + // it will be always woken up on packet reception (see m_bTsbPdNeedsWakeup). + // And dropping won't happen in that case anyway. Note that the drop + // request will not drop packets that are already received. + // 3. TSBPD sleeps forever when the API call didn't extract the + // data that are ready to play. This isn't a problem if nothing + // except the API call would wake it up. } dropFromLossLists(dropdata[0], dropdata[1]); @@ -10064,7 +10079,7 @@ CUDT::time_point srt::CUDT::getPktTsbPdTime(void*, const CPacket& packet) SRT_ATR_UNUSED static const char *const s_rexmitstat_str[] = {"ORIGINAL", "REXMITTED", "RXS-UNKNOWN"}; // [[using locked(m_RcvBufferLock)]] -int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& w_new_inserted, bool& w_was_sent_in_order, CUDT::loss_seqs_t& w_srt_loss_seqs) +int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& w_new_inserted, time_point& w_next_tsbpd, bool& w_was_sent_in_order, CUDT::loss_seqs_t& w_srt_loss_seqs) { bool excessive SRT_ATR_UNUSED = true; // stays true unless it was successfully added @@ -10074,7 +10089,7 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& // Loop over all incoming packets that were filtered out. // In case when there is no filter, there's just one packet in 'incoming', // the one that came in the input of this function. - for (vector::const_iterator unitIt = incoming.begin(); unitIt != incoming.end(); ++unitIt) + for (vector::const_iterator unitIt = incoming.begin(); unitIt != incoming.end() && !m_bBroken; ++unitIt) { CUnit * u = *unitIt; CPacket &rpkt = u->m_Packet; @@ -10163,7 +10178,26 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& } } - const int buffer_add_result = m_pRcvBuffer->insert(u); + CRcvBuffer::InsertInfo info = m_pRcvBuffer->insert(u); + + // Remember this value in order to CHECK if there's a need + // to request triggering TSBPD in case when TSBPD is in the + // state of waiting forever and wants to know if there's any + // possible time to wake up known earlier than that. + + // Note that in case of the "builtin group reader" (its own + // buffer), there's no need to do it here because it has also + // its own TSBPD thread. + + if (info.result == CRcvBuffer::InsertInfo::INSERTED) + { + // This may happen multiple times in the loop, so update only if earlier. + if (w_next_tsbpd == time_point() || w_next_tsbpd > info.first_time) + w_next_tsbpd = info.first_time; + w_new_inserted = true; + } + const int buffer_add_result = int(info.result); + if (buffer_add_result < 0) { // The insert() result is -1 if at the position evaluated from this packet's @@ -10174,8 +10208,6 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& } else { - w_new_inserted = true; - IF_HEAVY_LOGGING(exc_type = "ACCEPTED"); excessive = false; if (u->m_Packet.getMsgCryptoFlags() != EK_NOENC) @@ -10484,6 +10516,8 @@ int srt::CUDT::processData(CUnit* in_unit) } #endif + // NULL time by default + time_point next_tsbpd_avail; bool new_inserted = false; if (m_PacketFilter) @@ -10512,6 +10546,7 @@ int srt::CUDT::processData(CUnit* in_unit) const int res = handleSocketPacketReception(incoming, (new_inserted), + (next_tsbpd_avail), (was_sent_in_order), (srt_loss_seqs)); @@ -10586,6 +10621,42 @@ int srt::CUDT::processData(CUnit* in_unit) return -1; } + // 1. This is set to true in case when TSBPD during the last check + // has seen no packet candidate to ever deliver, hence it needs + // an update on that. Note that this is also false if TSBPD thread + // isn't running. + // 2. If next_tsbpd_avail is set, it means that in the buffer there is + // a new packet that precedes the previously earliest available packet. + // This means that if TSBPD was sleeping up to the time of this earliest + // delivery (after drop), this time we have received a packet to be delivered + // earlier than that, so we need to notify TSBPD immediately so that it + // updates this itself, not sleep until the previously set time. + + // The meaning of m_bTsbPdNeedsWakeup: + // - m_bTsbPdNeedsWakeup is set by TSBPD thread and means that it wishes to be woken up + // on every received packet. Hence we signal always if a new packet was inserted. + // - even if TSBPD doesn't wish to be woken up on every reception (because it sleeps + // until the play time of the next deliverable packet), it will be woken up when + // next_tsbpd_avail is set because it means this time is earlier than the time until + // which TSBPD sleeps, so it must be woken up prematurely. It might be more performant + // to simply update the sleeping end time of TSBPD, but there's no way to do it, so + // we simply wake TSBPD up and count on that it will update its sleeping settings. + + // XXX Consider: as CUniqueSync locks m_RecvLock, it means that the next instruction + // gets run only when TSBPD falls asleep again. Might be a good idea to record the + // TSBPD end sleeping time - as an alternative to m_bTsbPdNeedsWakeup - and after locking + // a mutex check this time again and compare it against next_tsbpd_avail; might be + // that if this difference is smaller than "dirac" (could be hard to reliably compare + // this time, unless it's set from this very value), there's no need to wake the TSBPD + // thread because it will wake up on time requirement at the right time anyway. + if (m_bTsbPd && ((m_bTsbPdNeedsWakeup && new_inserted) || next_tsbpd_avail != time_point())) + { + HLOGC(qrlog.Debug, log << "processData: will SIGNAL TSBPD for socket. WakeOnRecv=" << m_bTsbPdNeedsWakeup + << " new_inserted=" << new_inserted << " next_tsbpd_avail=" << FormatTime(next_tsbpd_avail)); + CUniqueSync tsbpd_cc(m_RecvLock, m_RcvTsbPdCond); + tsbpd_cc.notify_all(); + } + if (incoming.empty()) { // Treat as excessive. This is when a filter cumulates packets @@ -10601,16 +10672,6 @@ int srt::CUDT::processData(CUnit* in_unit) HLOGC(qrlog.Debug, log << CONID() << "WILL REPORT LOSSES (SRT): " << Printable(srt_loss_seqs)); sendLossReport(srt_loss_seqs); } - - if (m_bTsbPd) - { - HLOGC(qrlog.Debug, log << CONID() << "loss: signaling TSBPD cond"); - CSync::lock_notify_one(m_RcvTsbPdCond, m_RecvLock); - } - else - { - HLOGC(qrlog.Debug, log << CONID() << "loss: socket is not TSBPD, not signaling"); - } } // Separately report loss records of those reported by a filter. @@ -10622,12 +10683,6 @@ int srt::CUDT::processData(CUnit* in_unit) { HLOGC(qrlog.Debug, log << CONID() << "WILL REPORT LOSSES (filter): " << Printable(filter_loss_seqs)); sendLossReport(filter_loss_seqs); - - if (m_bTsbPd) - { - HLOGC(qrlog.Debug, log << CONID() << "loss: signaling TSBPD cond"); - CSync::lock_notify_one(m_RcvTsbPdCond, m_RecvLock); - } } // Now review the list of FreshLoss to see if there's any "old enough" to send UMSG_LOSSREPORT to it. diff --git a/srtcore/core.h b/srtcore/core.h index ed250c641..9d9be4351 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -700,6 +700,7 @@ class CUDT void unlose(const CPacket& oldpacket); void dropFromLossLists(int32_t from, int32_t to); + SRT_ATTR_EXCLUDES(m_RcvBufferLock) SRT_ATTR_REQUIRES(m_RecvAckLock) bool getFirstNoncontSequence(int32_t& w_seq, std::string& w_log_reason); @@ -999,7 +1000,7 @@ class CUDT SRT_ATTR_GUARDED_BY(m_RcvTsbPdStartupLock) sync::CThread m_RcvTsbPdThread; // Rcv TsbPD Thread handle sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock - bool m_bTsbPdNeedsWakeup; // Signal TsbPd thread to wake up on RCV buffer state change. + sync::atomic m_bTsbPdNeedsWakeup; // Expected to wake up TSBPD when a read-ready data packet is received. sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creation and joining. CallbackHolder m_cbAcceptHook; @@ -1142,16 +1143,17 @@ class CUDT /// /// @param incoming [in] The packet coming from the network medium /// @param w_new_inserted [out] Set false, if the packet already exists, otherwise true (packet added) + /// @param w_next_tsbpd [out] Get the TSBPD time of the earliest playable packet after insertion /// @param w_was_sent_in_order [out] Set false, if the packet was belated, but had no R flag set. /// @param w_srt_loss_seqs [out] Gets inserted a loss, if this function has detected it. /// /// @return 0 The call was successful (regardless if the packet was accepted or not). /// @return -1 The call has failed: no space left in the buffer. /// @return -2 The incoming packet exceeds the expected sequence by more than a length of the buffer (irrepairable discrepancy). - int handleSocketPacketReception(const std::vector& incoming, bool& w_new_inserted, bool& w_was_sent_in_order, CUDT::loss_seqs_t& w_srt_loss_seqs); + int handleSocketPacketReception(const std::vector& incoming, bool& w_new_inserted, sync::steady_clock::time_point& w_next_tsbpd, bool& w_was_sent_in_order, CUDT::loss_seqs_t& w_srt_loss_seqs); - /// Get the packet's TSBPD time - - /// the time when it is passed to the reading application. + // This function is to return the packet's play time (time when + // it is submitted to the reading application) of the given packet. /// The @a grp passed by void* is not used yet /// and shall not be used when ENABLE_BONDING=0. time_point getPktTsbPdTime(void* grp, const CPacket& packet); diff --git a/srtcore/utilities.h b/srtcore/utilities.h index 1786cf0ae..a90e13e66 100644 --- a/srtcore/utilities.h +++ b/srtcore/utilities.h @@ -417,7 +417,7 @@ struct DynamicStruct /// Fixed-size array template class. namespace srt { -template +template class FixedArray { public: @@ -433,22 +433,23 @@ class FixedArray } public: - const T& operator[](size_t index) const + const T& operator[](Indexer index) const { - if (index >= m_size) - throw_invalid_index(index); + if (int(index) >= int(m_size)) + throw_invalid_index(int(index)); - return m_entries[index]; + return m_entries[int(index)]; } - T& operator[](size_t index) + T& operator[](Indexer index) { - if (index >= m_size) - throw_invalid_index(index); + if (int(index) >= int(m_size)) + throw_invalid_index(int(index)); - return m_entries[index]; + return m_entries[int(index)]; } + /* const T& operator[](int index) const { if (index < 0 || static_cast(index) >= m_size) @@ -464,6 +465,7 @@ class FixedArray return m_entries[index]; } + */ size_t size() const { return m_size; } diff --git a/test/test_buffer_rcv.cpp b/test/test_buffer_rcv.cpp index 511c2dcb1..cd00f371c 100644 --- a/test/test_buffer_rcv.cpp +++ b/test/test_buffer_rcv.cpp @@ -76,7 +76,10 @@ class CRcvBufferReadMsg EXPECT_TRUE(packet.getMsgOrderFlag()); } - return m_rcv_buffer->insert(unit); + auto info = m_rcv_buffer->insert(unit); + // XXX extra checks? + + return int(info.result); } /// @returns 0 on success, the result of rcv_buffer::insert(..) once it failed @@ -643,6 +646,77 @@ TEST_F(CRcvBufferReadMsg, MsgOutOfOrderDrop) EXPECT_EQ(m_unit_queue->size(), m_unit_queue->capacity()); } +TEST_F(CRcvBufferReadMsg, MsgOrderScraps) +{ + // Ok, in this test we're filling the message this way: + // 1. We have an empty packet in the first cell. + // 2. This is followed by a 5-packet message that is valid. + // 3. This is followed by empty, valid, empty, valid, valid packet, + // where all valid packets belong to the same message. + // 4. After that there should be 3-packet valid messsage. + // 5. We deploy drop request to that second scrapped message. + // 6. We read one message. Should be the first message. + // 7. We read one message. Should be the last message. + + auto& rcv_buffer = *m_rcv_buffer.get(); + + // 1, 2 + addMessage(5,// packets + 2, // msgno + m_init_seqno + 1, + true); + + // LAYOUT: 10 11 12 13 + // [0] [1] [2] [3] [4] [5] [6] [7] [8] [9] [A] [B] [C] [D] [E] [F] + // * (2 2 2 2 2) * 3 * 3 3) (4 4 4) + + // 3 + addPacket( + m_init_seqno + 7, + 3, + false, false, // subsequent + true); + + addPacket( + m_init_seqno + 9, + 3, + false, false, // subsequent + true); + + addPacket( + m_init_seqno + 10, + 3, + false, true, // last + true); + + // 4 + addMessage(3, // packets + 4, // msgno + m_init_seqno + 11, + true); + + // 5 + EXPECT_GT(rcv_buffer.dropMessage(m_init_seqno+8, m_init_seqno+8, 3, CRcvBuffer::KEEP_EXISTING), 0); + + // 6 + array buff; + SRT_MSGCTRL mc; + pair seqrange; + EXPECT_TRUE(rcv_buffer.readMessage(buff.data(), buff.size(), (&mc), (&seqrange)) == m_payload_sz*5); + EXPECT_EQ(mc.msgno, 2); + EXPECT_EQ(seqrange, make_pair(m_init_seqno+1, m_init_seqno+5)); + + CRcvBuffer::InsertInfo ii; + rcv_buffer.getAvailInfo((ii)); + EXPECT_EQ(ii.first_seq.val(), m_init_seqno+11); + + // 7 + EXPECT_TRUE(rcv_buffer.readMessage(buff.data(), buff.size(), (&mc), (&seqrange)) == m_payload_sz*3); + EXPECT_EQ(mc.msgno, 4); + EXPECT_EQ(seqrange, make_pair(m_init_seqno+11, m_init_seqno+13)); + +} + // One message (4 packets) is added to the buffer after a message with "in order" flag. // Read in order TEST_F(CRcvBufferReadMsg, MsgOutOfOrderAfterInOrder)