From f33887dfdabd746ad7165951f6f877abfd39d303 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Fri, 28 Oct 2022 18:28:05 +0200 Subject: [PATCH 1/6] Refax: smaller fixes and added utilities --- srtcore/api.cpp | 8 -- srtcore/api.h | 9 ++ srtcore/buffer_rcv.h | 14 +++ srtcore/buffer_snd.cpp | 6 +- srtcore/common.h | 20 +++++ srtcore/core.cpp | 180 ++++++++------------------------------- srtcore/core.h | 22 +++-- srtcore/group.cpp | 14 ++- srtcore/group.h | 20 +++-- srtcore/handshake.h | 2 + srtcore/list.cpp | 75 ++++++++++++++-- srtcore/list.h | 31 +++++++ srtcore/logging.h | 2 +- srtcore/packet.cpp | 117 +++++++++++++++++-------- srtcore/queue.cpp | 85 ++++++++---------- srtcore/socketconfig.cpp | 33 ++++--- srtcore/socketconfig.h | 3 + srtcore/sync.h | 2 +- srtcore/tsbpd_time.cpp | 11 ++- srtcore/utilities.h | 54 +++++++++++- test/filelist.maf | 2 +- testing/testmedia.cpp | 36 ++++++-- 22 files changed, 458 insertions(+), 288 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index ea61a297a..c5a48ea3b 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1481,14 +1481,6 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i int isn = g.currentSchedSequence(); - // Don't synchronize ISN in case of synch on msgno. Every link - // may send their own payloads independently. - if (g.synconmsgno()) - { - HLOGC(aclog.Debug, log << "groupConnect: NOT synchronizing sequence numbers: will sync on msgno"); - isn = -1; - } - // Set it the groupconnect option, as all in-group sockets should have. ns->core().m_config.iGroupConnect = 1; diff --git a/srtcore/api.h b/srtcore/api.h index ca812bf9e..551590e3e 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -193,6 +193,15 @@ class CUDTSocket /// to finish sending the data that were scheduled for sending so far. void setClosed(); + // This is necessary to be called from the group before the group clears + // the connection with the socket. As for managed groups (and there are + // currently no other group types), a socket disconnected from the group + // is no longer usable. + void setClosing() + { + core().m_bClosing = true; + } + /// This does the same as setClosed, plus sets the m_bBroken to true. /// Such a socket can still be read from so that remaining data from /// the receiver buffer can be read, but no longer sends anything. diff --git a/srtcore/buffer_rcv.h b/srtcore/buffer_rcv.h index 52e927f22..1a5a78e26 100644 --- a/srtcore/buffer_rcv.h +++ b/srtcore/buffer_rcv.h @@ -225,6 +225,18 @@ class CRcvBuffer inline int incPos(int pos, int inc = 1) const { return (pos + inc) % m_szSize; } inline int decPos(int pos) const { return (pos - 1) >= 0 ? (pos - 1) : int(m_szSize - 1); } inline int offPos(int pos1, int pos2) const { return (pos2 >= pos1) ? (pos2 - pos1) : int(m_szSize + pos2 - pos1); } + inline int cmpPos(int pos2, int pos1) const + { + // XXX maybe not the best implementation, but this keeps up to the rule + int off1 = pos1 >= m_iStartPos ? pos1 - m_iStartPos : pos1 + m_szSize - m_iStartPos; + int off2 = pos2 >= m_iStartPos ? pos2 - m_iStartPos : pos2 + m_szSize - m_iStartPos; + + return off2 - off1; + } + + // NOTE: Assumes that pUnit != NULL + CPacket& packetAt(int pos) { return m_entries[pos].pUnit->m_Packet; } + const CPacket& packetAt(int pos) const { return m_entries[pos].pUnit->m_Packet; } private: void countBytes(int pkts, int bytes); @@ -342,6 +354,8 @@ class CRcvBuffer time_point getTsbPdTimeBase(uint32_t usPktTimestamp) const; void updateTsbPdTimeBase(uint32_t usPktTimestamp); + bool isTsbPd() const { return m_tsbpd.isEnabled(); } + /// Form a string of the current buffer fullness state. /// number of packets acknowledged, TSBPD readiness, etc. std::string strFullnessState(bool enable_debug_log, int iFirstUnackSeqNo, const time_point& tsNow) const; diff --git a/srtcore/buffer_snd.cpp b/srtcore/buffer_snd.cpp index e945b166c..4eef47932 100644 --- a/srtcore/buffer_snd.cpp +++ b/srtcore/buffer_snd.cpp @@ -359,7 +359,11 @@ int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime, continue; } - HLOGC(bslog.Debug, log << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send"); + HLOGC(bslog.Debug, log << CONID() << "CSndBuffer: picked up packet to send: size=" << readlen + << " #" << w_packet.getMsgSeq() + << " %" << w_packet.m_iSeqNo + << " !" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); + break; } diff --git a/srtcore/common.h b/srtcore/common.h index 227a91861..c8de517c0 100644 --- a/srtcore/common.h +++ b/srtcore/common.h @@ -1416,6 +1416,26 @@ inline std::string SrtVersionString(int version) bool SrtParseConfig(std::string s, SrtConfig& w_config); + +inline std::string FormatLossArray(const std::vector< std::pair >& lra) +{ + std::ostringstream os; + + os << "[ "; + for (std::vector< std::pair >::const_iterator i = lra.begin(); i != lra.end(); ++i) + { + int len = CSeqNo::seqoff(i->first, i->second); + os << "%" << i->first; + if (len > 1) + os << "+" << len; + os << " "; + } + + os << "]"; + return os.str(); +} + + } // namespace srt #endif diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 7be1515de..6b5901230 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -955,9 +955,11 @@ void srt::CUDT::open() m_tsLastRspAckTime = currtime; m_tsLastSndTime.store(currtime); +#if ENABLE_BONDING m_tsUnstableSince = steady_clock::time_point(); m_tsFreshActivation = steady_clock::time_point(); m_tsWarySince = steady_clock::time_point(); +#endif m_iReXmitCount = 1; m_iPktCount = 0; @@ -1286,12 +1288,6 @@ size_t srt::CUDT::fillHsExtGroup(uint32_t* pcmdspec) SRT_GROUP_TYPE tp = m_parent->m_GroupOf->type(); uint32_t flags = 0; - // Note: if agent is a listener, and the current version supports - // both sync methods, this flag might have been changed according to - // the wish of the caller. - if (m_parent->m_GroupOf->synconmsgno()) - flags |= SRT_GFLAG_SYNCONMSG; - // NOTE: this code remains as is for historical reasons. // The initial implementation stated that the peer id be // extracted so that it can be reported and possibly the @@ -3391,29 +3387,22 @@ void srt::CUDT::synchronizeWithGroup(CUDTGroup* gp) // with updateAfterSrtHandshake(). updateSrtSndSettings(); - if (gp->synconmsgno()) + // These are the values that are normally set initially by setters. + int32_t snd_isn = m_iSndLastAck, rcv_isn = m_iRcvLastAck; + if (!gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn))) { - HLOGC(gmlog.Debug, log << CONID() << "synchronizeWithGroup: NOT synchronizing sequence numbers."); + HLOGC(gmlog.Debug, + log << CONID() << "synchronizeWithGroup: DERIVED ISN: RCV=%" << m_iRcvLastAck << " -> %" << rcv_isn + << " (shift by " << CSeqNo::seqcmp(rcv_isn, m_iRcvLastAck) << ") SND=%" << m_iSndLastAck + << " -> %" << snd_isn << " (shift by " << CSeqNo::seqcmp(snd_isn, m_iSndLastAck) << ")"); + setInitialRcvSeq(rcv_isn); + setInitialSndSeq(snd_isn); } else { - // These are the values that are normally set initially by setters. - int32_t snd_isn = m_iSndLastAck, rcv_isn = m_iRcvLastAck; - if (!gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn))) - { - HLOGC(gmlog.Debug, - log << CONID() << "synchronizeWithGroup: DERIVED ISN: RCV=%" << m_iRcvLastAck << " -> %" << rcv_isn - << " (shift by " << CSeqNo::seqcmp(rcv_isn, m_iRcvLastAck) << ") SND=%" << m_iSndLastAck - << " -> %" << snd_isn << " (shift by " << CSeqNo::seqcmp(snd_isn, m_iSndLastAck) << ")"); - setInitialRcvSeq(rcv_isn); - setInitialSndSeq(snd_isn); - } - else - { - HLOGC(gmlog.Debug, - log << CONID() << "synchronizeWithGroup: DEFINED ISN: RCV=%" << m_iRcvLastAck << " SND=%" - << m_iSndLastAck); - } + HLOGC(gmlog.Debug, + log << CONID() << "synchronizeWithGroup: DEFINED ISN: RCV=%" << m_iRcvLastAck << " SND=%" + << m_iSndLastAck); } } #endif @@ -9292,7 +9281,7 @@ bool srt::CUDT::isRetransmissionAllowed(const time_point& tnow SRT_ATR_UNUSED) return true; } -std::pair srt::CUDT::packData(CPacket& w_packet) +bool srt::CUDT::packData(CPacket& w_packet, steady_clock::time_point& w_nexttime) { int payload = 0; bool probe = false; @@ -9300,6 +9289,8 @@ std::pair srt::CUDT::packData(CPacket& w_packet) const steady_clock::time_point enter_time = steady_clock::now(); + w_nexttime = enter_time; + if (!is_zero(m_tsNextSendTime) && enter_time > m_tsNextSendTime) { m_tdSendTimeDiff = m_tdSendTimeDiff.load() + (enter_time - m_tsNextSendTime); @@ -9314,7 +9305,7 @@ std::pair srt::CUDT::packData(CPacket& w_packet) // start the dissolving process, this process will // not be started until this function is finished. if (!m_bOpened) - return std::make_pair(false, enter_time); + return false; payload = isRetransmissionAllowed(enter_time) ? packLostData((w_packet)) @@ -9342,7 +9333,7 @@ std::pair srt::CUDT::packData(CPacket& w_packet) { m_tsNextSendTime = steady_clock::time_point(); m_tdSendTimeDiff = steady_clock::duration(); - return std::make_pair(false, enter_time); + return false; } new_packet_packed = true; @@ -9424,7 +9415,9 @@ std::pair srt::CUDT::packData(CPacket& w_packet) #endif } - return std::make_pair(payload >= 0, m_tsNextSendTime); + w_nexttime = m_tsNextSendTime; + + return payload >= 0; // XXX shouldn't be > 0 ? == 0 is only when buffer range exceeded. } bool srt::CUDT::packUniqueData(CPacket& w_packet) @@ -9435,7 +9428,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) if (cwnd <= flightspan) { HLOGC(qslog.Debug, - log << CONID() << "packData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow + log << CONID() << "packUniqueData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow << ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan); return false; } @@ -9453,10 +9446,13 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) { // Some packets were skipped due to TTL expiry. m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo, pktskipseqno); + HLOGC(qslog.Debug, log << "packUniqueData: reading skipped " << pktskipseqno << " seq up to %" << m_iSndCurrSeqNo + << " due to TTL expiry"); } if (pld_size == 0) { + HLOGC(qslog.Debug, log << "packUniqueData: nothing extracted from the buffer"); return false; } @@ -9482,7 +9478,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) // no ACK to be awaited. We can screw up all the variables that are // initialized from ISN just after connection. LOGC(qslog.Note, - log << CONID() << "packData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo + log << CONID() << "packUniqueData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo << " from SCHEDULING sequence " << w_packet.m_iSeqNo << " for the first packet: DIFF=" << packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); } @@ -9490,7 +9486,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) { // There will be a serious data discrepancy between the agent and the peer. LOGC(qslog.Error, - log << CONID() << "IPE: packData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo + log << CONID() << "IPE: packUniqueData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo << " from SCHEDULING sequence " << w_packet.m_iSeqNo << " in the middle of transition: DIFF=" << packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); } @@ -9503,7 +9499,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) seqpair[1] = CSeqNo::decseq(w_packet.m_iSeqNo); const int32_t no_msgno = 0; LOGC(qslog.Debug, - log << CONID() << "packData: Sending DROPREQ: SEQ: " << seqpair[0] << " - " << seqpair[1] << " (" + log << CONID() << "packUniqueData: Sending DROPREQ: SEQ: " << seqpair[0] << " - " << seqpair[1] << " (" << packetspan << " packets)"); sendCtrl(UMSG_DROPREQ, &no_msgno, seqpair, sizeof(seqpair)); // In case when this message is lost, the peer will still get the @@ -9532,7 +9528,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) #endif { HLOGC(qslog.Debug, - log << CONID() << "packData: Applying EXTRACTION sequence " << m_iSndCurrSeqNo + log << CONID() << "packUniqueData: Applying EXTRACTION sequence " << m_iSndCurrSeqNo << " over SCHEDULING sequence " << w_packet.m_iSeqNo << " for socket not in group:" << " DIFF=" << CSeqNo::seqcmp(m_iSndCurrSeqNo, w_packet.m_iSeqNo) << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); @@ -10299,67 +10295,6 @@ void srt::CUDT::updateIdleLinkFrom(CUDT* source) setInitialRcvSeq(source->m_iRcvLastSkipAck); } -// XXX This function is currently unused. It should be fixed and put into use. -// See the blocked call in CUDT::processData(). -// XXX REVIEW LOCKS WHEN REACTIVATING! -srt::CUDT::loss_seqs_t srt::CUDT::defaultPacketArrival(void* vself, CPacket& pkt) -{ -// [[using affinity(m_pRcvBuffer->workerThread())]]; - CUDT* self = (CUDT*)vself; - loss_seqs_t output; - - // XXX When an alternative packet arrival callback is installed - // in case of groups, move this part to the groupwise version. - - if (self->m_parent->m_GroupOf) - { - groups::SocketData* gi = self->m_parent->m_GroupMemberData; - if (gi->rcvstate < SRT_GST_RUNNING) // PENDING or IDLE, tho PENDING is unlikely - { - HLOGC(qrlog.Debug, log << "defaultPacketArrival: IN-GROUP rcv state transition to RUNNING. NOT checking for loss"); - gi->rcvstate = SRT_GST_RUNNING; - return output; - } - } - - const int initial_loss_ttl = (self->m_bPeerRexmitFlag) ? self->m_iReorderTolerance : 0; - - int seqdiff = CSeqNo::seqcmp(pkt.m_iSeqNo, self->m_iRcvCurrSeqNo); - - HLOGC(qrlog.Debug, log << "defaultPacketArrival: checking sequence " << pkt.m_iSeqNo - << " against latest " << self->m_iRcvCurrSeqNo << " (distance: " << seqdiff << ")"); - - // Loss detection. - if (seqdiff > 1) // packet is later than the very subsequent packet - { - const int32_t seqlo = CSeqNo::incseq(self->m_iRcvCurrSeqNo); - const int32_t seqhi = CSeqNo::decseq(pkt.m_iSeqNo); - - { - // If loss found, insert them to the receiver loss list - ScopedLock lg (self->m_RcvLossLock); - self->m_pRcvLossList->insert(seqlo, seqhi); - - if (initial_loss_ttl) - { - // pack loss list for (possibly belated) NAK - // The LOSSREPORT will be sent in a while. - self->m_FreshLoss.push_back(CRcvFreshLoss(seqlo, seqhi, initial_loss_ttl)); - HLOGF(qrlog.Debug, "defaultPacketArrival: added loss sequence %d-%d (%d) with tolerance %d", seqlo, seqhi, - 1+CSeqNo::seqcmp(seqhi, seqlo), initial_loss_ttl); - } - } - - if (!initial_loss_ttl) - { - // old code; run immediately when tolerance = 0 - // or this feature isn't used because of the peer - output.push_back(make_pair(seqlo, seqhi)); - } - } - - return output; -} #endif /// This function is called when a packet has arrived, which was behind the current @@ -10438,53 +10373,8 @@ void srt::CUDT::unlose(const CPacket &packet) if (m_bPeerRexmitFlag == 0 || m_iReorderTolerance == 0) return; - size_t i = 0; - int had_ttl = 0; - for (i = 0; i < m_FreshLoss.size(); ++i) - { - had_ttl = m_FreshLoss[i].ttl; - switch (m_FreshLoss[i].revoke(sequence)) - { - case CRcvFreshLoss::NONE: - continue; // Not found. Search again. - - case CRcvFreshLoss::STRIPPED: - goto breakbreak; // Found and the modification is applied. We're done here. - - case CRcvFreshLoss::DELETE: - // No more elements. Kill it. - m_FreshLoss.erase(m_FreshLoss.begin() + i); - // Every loss is unique. We're done here. - goto breakbreak; - - case CRcvFreshLoss::SPLIT: - // Oh, this will be more complicated. This means that it was in between. - { - // So create a new element that will hold the upper part of the range, - // and this one modify to be the lower part of the range. - - // Keep the current end-of-sequence value for the second element - int32_t next_end = m_FreshLoss[i].seq[1]; - - // seq-1 set to the end of this element - m_FreshLoss[i].seq[1] = CSeqNo::decseq(sequence); - // seq+1 set to the begin of the next element - int32_t next_begin = CSeqNo::incseq(sequence); - - // Use position of the NEXT element because insertion happens BEFORE pointed element. - // Use the same TTL (will stay the same in the other one). - m_FreshLoss.insert(m_FreshLoss.begin() + i + 1, - CRcvFreshLoss(next_begin, next_end, m_FreshLoss[i].ttl)); - } - goto breakbreak; - } - } - - // Could have made the "return" instruction instead of goto, but maybe there will be something - // to add in future, so keeping that. -breakbreak:; - - if (i != m_FreshLoss.size()) + int had_ttl = 0; + if (CRcvFreshLoss::removeOne((m_FreshLoss), sequence, (&had_ttl))) { HLOGF(qrlog.Debug, "sequence %d removed from belated lossreport record", sequence); } @@ -11031,7 +10921,10 @@ int srt::CUDT::checkNAKTimer(const steady_clock::time_point& currtime) * not knowing what to retransmit when the only NAK sent by receiver is lost, * all packets past last ACK are retransmitted (rexmitMethod() == SRM_FASTREXMIT). */ + enterCS(m_RcvLossLock); const int loss_len = m_pRcvLossList->getLossLength(); + leaveCS(m_RcvLossLock); + SRT_ASSERT(loss_len >= 0); int debug_decision = BECAUSE_NO_REASON; @@ -11108,7 +11001,7 @@ bool srt::CUDT::checkExpTimer(const steady_clock::time_point& currtime, int chec // Application will detect this when it calls any UDT methods next time. // HLOGC(xtlog.Debug, - log << CONID() << "CONNECTION EXPIRED after " << count_milliseconds(currtime - last_rsp_time) << "ms"); + log << CONID() << "CONNECTION EXPIRED after " << FormatDuration(currtime - last_rsp_time) << " - BREAKING"); m_bClosing = true; m_bBroken = true; m_iBrokenCounter = 30; @@ -11331,6 +11224,7 @@ void srt::CUDT::completeBrokenConnectionDependencies(int errorcode) { // XXX This somehow can cause a deadlock // uglobal()->close(m_parent); + LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group."); m_parent->setBrokenClosed(); } #endif diff --git a/srtcore/core.h b/srtcore/core.h index c6d7cbaf2..50e6287b4 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -312,8 +312,10 @@ class CUDT int32_t schedSeqNo() const { return m_iSndNextSeqNo; } bool overrideSndSeqNo(int32_t seq); +#if ENABLE_BONDING sync::steady_clock::time_point lastRspTime() const { return m_tsLastRspTime.load(); } sync::steady_clock::time_point freshActivationStart() const { return m_tsFreshActivation; } +#endif int32_t rcvSeqNo() const { return m_iRcvCurrSeqNo; } int flowWindowSize() const { return m_iFlowWindowSize; } @@ -387,6 +389,11 @@ class CUDT return (int32_t) sync::count_microseconds(from_time - tsStartTime); } + static void setPacketTS(CPacket& p, const time_point& start_time, const time_point& ts) + { + p.m_iTimeStamp = makeTS(ts, start_time); + } + /// @brief Set the timestamp field of the packet using the provided value (no check) /// @param p the packet structure to set the timestamp on. /// @param ts timestamp to use as a source for packet timestamp. @@ -1044,13 +1051,12 @@ class CUDT /// Pack in CPacket the next data to be send. /// - /// @param packet [in, out] a CPacket structure to fill + /// @param packet [out] a CPacket structure to fill + /// @param nexttime [out] Time when this socket should be next time picked up for processing. /// - /// @return A pair of values is returned (is_payload_valid, timestamp). - /// If is_payload_valid is false, there was nothing packed for sending, - /// and the timestamp value should be ignored. - /// The timestamp is the full source/origin timestamp of the data. - std::pair packData(CPacket& packet); + /// @retval true A packet was extracted for sending, the socket should be rechecked at @a nexttime + /// @retval false Nothing was extracted for sending, @a nexttime should be ignored + bool packData(CPacket& packet, time_point& nexttime); int processData(CUnit* unit); void processClose(); @@ -1111,10 +1117,12 @@ class CUDT static const int PACKETPAIR_MASK = 0xF; private: // Timers functions +#if ENABLE_BONDING time_point m_tsFreshActivation; // GROUPS: time of fresh activation of the link, or 0 if past the activation phase or idle time_point m_tsUnstableSince; // GROUPS: time since unexpected ACK delay experienced, or 0 if link seems healthy time_point m_tsWarySince; // GROUPS: time since an unstable link has first some response - +#endif + static const int BECAUSE_NO_REASON = 0, // NO BITS BECAUSE_ACK = 1 << 0, BECAUSE_LITEACK = 1 << 1, diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 40f85a908..88b8427ef 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -251,7 +251,6 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) : m_Global(CUDT::uglobal()) , m_GroupID(-1) , m_PeerGroupID(-1) - , m_bSyncOnMsgNo(false) , m_type(gtype) , m_listener() , m_iBusy() @@ -279,8 +278,8 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_iLastSchedMsgNo(SRT_MSGNO_NONE) { setupMutex(m_GroupLock, "Group"); - setupMutex(m_RcvDataLock, "RcvData"); - setupCond(m_RcvDataCond, "RcvData"); + setupMutex(m_RcvDataLock, "G/RcvData"); + setupCond(m_RcvDataCond, "G/RcvData"); m_RcvEID = m_Global.m_EPoll.create(&m_RcvEpolld); m_SndEID = m_Global.m_EPoll.create(&m_SndEpolld); @@ -336,6 +335,7 @@ void CUDTGroup::GroupContainer::erase(CUDTGroup::gli_t it) } } m_List.erase(it); + --m_SizeCache; } void CUDTGroup::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen) @@ -867,7 +867,7 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side) // Get the latency (possibly fixed against the opposite side) // from the first socket (core.m_iTsbPdDelay_ms), // and set it on the current socket. - set_latency(core.m_iTsbPdDelay_ms * int64_t(1000)); + set_latency_us(core.m_iTsbPdDelay_ms * int64_t(1000)); } void CUDTGroup::close() @@ -893,6 +893,12 @@ void CUDTGroup::close() HLOGC(smlog.Debug, log << "group/close: IPE(NF): group member @" << ig->id << " already deleted"); continue; } + + // XXX This is not true in case of non-managed groups, which + // only collect sockets, but also non-managed groups should not + // use common group buffering and tsbpd. + s->setClosing(); + s->m_GroupOf = NULL; s->m_GroupMemberData = NULL; HLOGC(smlog.Debug, log << "group/close: CUTTING OFF @" << ig->id << " (found as @" << s->m_SocketID << ") from the group"); diff --git a/srtcore/group.h b/srtcore/group.h index b8b774b97..7c71ffedd 100644 --- a/srtcore/group.h +++ b/srtcore/group.h @@ -406,7 +406,9 @@ class CUDTGroup SRTSOCKET m_PeerGroupID; struct GroupContainer { - std::list m_List; + private: + std::list m_List; + sync::atomic m_SizeCache; /// This field is used only by some types of groups that need /// to keep track as to which link was lately used. Note that @@ -414,8 +416,11 @@ class CUDTGroup /// must be appropriately reset. gli_t m_LastActiveLink; + public: + GroupContainer() - : m_LastActiveLink(m_List.end()) + : m_SizeCache(0) + , m_LastActiveLink(m_List.end()) { } @@ -425,18 +430,18 @@ class CUDTGroup gli_t begin() { return m_List.begin(); } gli_t end() { return m_List.end(); } bool empty() { return m_List.empty(); } - void push_back(const SocketData& data) { m_List.push_back(data); } + void push_back(const SocketData& data) { m_List.push_back(data); ++m_SizeCache; } void clear() { m_LastActiveLink = end(); m_List.clear(); + m_SizeCache = 0; } - size_t size() { return m_List.size(); } + size_t size() { return m_SizeCache; } void erase(gli_t it); }; GroupContainer m_Group; - const bool m_bSyncOnMsgNo; // It goes into a dedicated HS field. Could be true for balancing groups (not implemented). SRT_GROUP_TYPE m_type; CUDTSocket* m_listener; // A "group" can only have one listener. srt::sync::atomic m_iBusy; @@ -609,7 +614,7 @@ class CUDTGroup private: // Fields required for SRT_GTYPE_BACKUP groups. - senderBuffer_t m_SenderBuffer; + senderBuffer_t m_SenderBuffer; // This mechanism is to be removed on group-common sndbuf int32_t m_iSndOldestMsgNo; // oldest position in the sender buffer sync::atomic m_iSndAckedMsgNo; uint32_t m_uOPT_MinStabilityTimeout_us; @@ -812,8 +817,7 @@ class CUDTGroup SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRT_GROUP_TYPE, type, m_type); SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentSchedSequence, m_iLastSchedSeqNo); SRTU_PROPERTY_RRW(std::set&, epollset, m_sPollID); - SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency, m_iTsbPdDelay_us); - SRTU_PROPERTY_RO(bool, synconmsgno, m_bSyncOnMsgNo); + SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency_us, m_iTsbPdDelay_us); SRTU_PROPERTY_RO(bool, closing, m_bClosing); }; diff --git a/srtcore/handshake.h b/srtcore/handshake.h index 93a351f39..87c9d5761 100644 --- a/srtcore/handshake.h +++ b/srtcore/handshake.h @@ -311,6 +311,8 @@ class CHandShake // Applicable only when m_iVersion == HS_VERSION_SRT1 int32_t flags() { return m_iType; } + bool v5() { return m_iVersion > 4; } + public: int32_t m_iVersion; // UDT version (HS_VERSION_* symbols) int32_t m_iType; // UDT4: socket type (only UDT_DGRAM is valid); SRT1: extension flags diff --git a/srtcore/list.cpp b/srtcore/list.cpp index 78ee99ecb..8ac032028 100644 --- a/srtcore/list.cpp +++ b/srtcore/list.cpp @@ -97,15 +97,7 @@ srt::CSndLossList::~CSndLossList() void srt::CSndLossList::traceState() const { - int pos = m_iHead; - while (pos != SRT_SEQNO_NONE) - { - std::cout << pos << ":[" << m_caSeq[pos].seqstart; - if (m_caSeq[pos].seqend != SRT_SEQNO_NONE) - std::cout << ", " << m_caSeq[pos].seqend; - std::cout << "], "; - pos = m_caSeq[pos].inext; - } + traceState(std::cout); std::cout << "\n"; } @@ -506,6 +498,10 @@ srt::CRcvLossList::~CRcvLossList() int srt::CRcvLossList::insert(int32_t seqno1, int32_t seqno2) { + SRT_ASSERT(seqno1 != SRT_SEQNO_NONE && seqno2 != SRT_SEQNO_NONE); + // Make sure that seqno2 isn't earlier than seqno1. + SRT_ASSERT(CSeqNo::seqcmp(seqno1, seqno2) <= 0); + // Data to be inserted must be larger than all those in the list if (m_iLargestSeq != SRT_SEQNO_NONE && CSeqNo::seqcmp(seqno1, m_iLargestSeq) <= 0) { @@ -865,3 +861,64 @@ srt::CRcvFreshLoss::Emod srt::CRcvFreshLoss::revoke(int32_t lo, int32_t hi) return DELETE; } + +bool srt::CRcvFreshLoss::removeOne(std::deque& w_container, int32_t sequence, int* pw_had_ttl) +{ + size_t i = 0; + int had_ttl = 0; + for (i = 0; i < w_container.size(); ++i) + { + had_ttl = w_container[i].ttl; + switch (w_container[i].revoke(sequence)) + { + case CRcvFreshLoss::NONE: + continue; // Not found. Search again. + + case CRcvFreshLoss::STRIPPED: + goto breakbreak; // Found and the modification is applied. We're done here. + + case CRcvFreshLoss::DELETE: + // No more elements. Kill it. + w_container.erase(w_container.begin() + i); + // Every loss is unique. We're done here. + goto breakbreak; + + case CRcvFreshLoss::SPLIT: + // Oh, this will be more complicated. This means that it was in between. + { + // So create a new element that will hold the upper part of the range, + // and this one modify to be the lower part of the range. + + // Keep the current end-of-sequence value for the second element + int32_t next_end = w_container[i].seq[1]; + + // seq-1 set to the end of this element + w_container[i].seq[1] = CSeqNo::decseq(sequence); + // seq+1 set to the begin of the next element + int32_t next_begin = CSeqNo::incseq(sequence); + + // Use position of the NEXT element because insertion happens BEFORE pointed element. + // Use the same TTL (will stay the same in the other one). + w_container.insert(w_container.begin() + i + 1, + CRcvFreshLoss(next_begin, next_end, w_container[i].ttl)); + } + goto breakbreak; + } + } + + // Could have made the "return" instruction instead of goto, but maybe there will be something + // to add in future, so keeping that. +breakbreak: + ; + + if (pw_had_ttl) + *pw_had_ttl = had_ttl; + + if (i != w_container.size()) + { + return true; + } + + return false; +} + diff --git a/srtcore/list.h b/srtcore/list.h index c3b9d5089..2e99b29ab 100644 --- a/srtcore/list.h +++ b/srtcore/list.h @@ -53,6 +53,8 @@ modified by #ifndef INC_SRT_LIST_H #define INC_SRT_LIST_H +#include + #include "udt.h" #include "common.h" @@ -82,8 +84,33 @@ class CSndLossList /// @return The seq. no. or -1 if the list is empty. int32_t popLostSeq(); + template + Stream& traceState(Stream& sout) const + { + int pos = m_iHead; + while (pos != SRT_SEQNO_NONE) + { + sout << "[" << pos << "]:" << m_caSeq[pos].seqstart; + if (m_caSeq[pos].seqend != SRT_SEQNO_NONE) + sout << ":" << m_caSeq[pos].seqend; + if (m_caSeq[pos].inext == -1) + sout << "=|"; + else + sout << "->[" << m_caSeq[pos].inext << "]"; + sout << ", "; + pos = m_caSeq[pos].inext; + } + sout << " {len:" << m_iLength << " head:" << m_iHead << " last:" << m_iLastInsertPos << "}"; + return sout; + } void traceState() const; + // Debug/unittest support. + + int head() const { return m_iHead; } + int next(int loc) const { return m_caSeq[loc].inext; } + int last() const { return m_iLastInsertPos; } + private: struct Seq { @@ -118,6 +145,8 @@ class CSndLossList /// @param seqno2 last sequence number in range (SRT_SEQNO_NONE if no range) bool updateElement(int pos, int32_t seqno1, int32_t seqno2); + static const int LOC_NONE = -1; + private: CSndLossList(const CSndLossList&); CSndLossList& operator=(const CSndLossList&); @@ -264,6 +293,8 @@ struct CRcvFreshLoss Emod revoke(int32_t sequence); Emod revoke(int32_t lo, int32_t hi); + + static bool removeOne(std::deque& w_container, int32_t sequence, int* had_ttl = NULL); }; } // namespace srt diff --git a/srtcore/logging.h b/srtcore/logging.h index e79785b46..683dada3e 100644 --- a/srtcore/logging.h +++ b/srtcore/logging.h @@ -74,7 +74,7 @@ written by #define HLOGP LOGP #define HLOGF LOGF -#define IF_HEAVY_LOGGING(instr) instr +#define IF_HEAVY_LOGGING(instr,...) instr,##__VA_ARGS__ #else diff --git a/srtcore/packet.cpp b/srtcore/packet.cpp index 813d07995..cbe4dd90d 100644 --- a/srtcore/packet.cpp +++ b/srtcore/packet.cpp @@ -172,8 +172,10 @@ extern Logger inlog; } using namespace srt_logging; +namespace srt { + // Set up the aliases in the constructure -srt::CPacket::CPacket() +CPacket::CPacket() : m_extra_pad() , m_data_owned(false) , m_iSeqNo((int32_t&)(m_nHeader[SRT_PH_SEQNO])) @@ -194,12 +196,12 @@ srt::CPacket::CPacket() m_PacketVector[PV_DATA].set(NULL, 0); } -char* srt::CPacket::getData() +char* CPacket::getData() { return (char*)m_PacketVector[PV_DATA].dataRef(); } -void srt::CPacket::allocate(size_t alloc_buffer_size) +void CPacket::allocate(size_t alloc_buffer_size) { if (m_data_owned) { @@ -213,14 +215,14 @@ void srt::CPacket::allocate(size_t alloc_buffer_size) m_data_owned = true; } -void srt::CPacket::deallocate() +void CPacket::deallocate() { if (m_data_owned) delete[](char*) m_PacketVector[PV_DATA].data(); m_PacketVector[PV_DATA].set(NULL, 0); } -char* srt::CPacket::release() +char* CPacket::release() { // When not owned, release returns NULL. char* buffer = NULL; @@ -234,7 +236,7 @@ char* srt::CPacket::release() return buffer; } -srt::CPacket::~CPacket() +CPacket::~CPacket() { // PV_HEADER is always owned, PV_DATA may use a "borrowed" buffer. // Delete the internal buffer only if it was declared as owned. @@ -242,30 +244,78 @@ srt::CPacket::~CPacket() delete[](char*) m_PacketVector[PV_DATA].data(); } -size_t srt::CPacket::getLength() const +size_t CPacket::getLength() const { return m_PacketVector[PV_DATA].size(); } -void srt::CPacket::setLength(size_t len) +void CPacket::setLength(size_t len) { m_PacketVector[PV_DATA].setLength(len); } -void srt::CPacket::setLength(size_t len, size_t cap) +void CPacket::setLength(size_t len, size_t cap) { SRT_ASSERT(len <= cap); setLength(len); m_zCapacity = cap; } -void srt::CPacket::pack(UDTMessageType pkttype, const int32_t* lparam, void* rparam, size_t size) +#if ENABLE_HEAVY_LOGGING +// Debug only +static std::string FormatNumbers(UDTMessageType pkttype, const int32_t* lparam, void* rparam, size_t size) +{ + // This may be changed over time, so use special interpretation + // only for certain types, and still display all data, no matter + // if it is expected to provide anything or not. + std::ostringstream out; + + out << "ARG="; + if (lparam) + out << *lparam; + else + out << "none"; + + if (size == 0) + { + out << " [no data]"; + return out.str(); + } + else if (!rparam) + { + out << " [ {" << size << "} ]"; + return out.str(); + } + + bool interp_as_seq = (pkttype == UMSG_LOSSREPORT || pkttype == UMSG_DROPREQ); + + out << " [ "; + for (size_t i = 0; i < size; ++i) + { + int32_t val = ((int32_t*)rparam)[i]; + if (interp_as_seq) + { + if (val & LOSSDATA_SEQNO_RANGE_FIRST) + out << "<" << (val & (~LOSSDATA_SEQNO_RANGE_FIRST)) << ">"; + else + out << val; + } + else + { + out << std::showpos << std::hex << val << "/" << std::dec << val; + } + } + + out << "]"; + return out.str(); +} +#endif + +void CPacket::pack(UDTMessageType pkttype, const int32_t* lparam, void* rparam, size_t size) { // Set (bit-0 = 1) and (bit-1~15 = type) setControl(pkttype); - HLOGC(inlog.Debug, - log << "pack: type=" << MessageTypeStr(pkttype) << " ARG=" << (lparam ? Sprint(*lparam) : std::string("NULL")) - << " [ " << (rparam ? Sprint(*(int32_t*)rparam) : std::string()) << " ]"); + HLOGC(inlog.Debug, log << "pack: type=" << MessageTypeStr(pkttype) << FormatNumbers(pkttype, lparam, rparam, size)); // Set additional information and control information field switch (pkttype) @@ -371,7 +421,7 @@ void srt::CPacket::pack(UDTMessageType pkttype, const int32_t* lparam, void* rpa } } -void srt::CPacket::toNL() +void CPacket::toNL() { // XXX USE HtoNLA! if (isControl()) @@ -389,7 +439,7 @@ void srt::CPacket::toNL() } } -void srt::CPacket::toHL() +void CPacket::toHL() { // convert back into local host order uint32_t* p = m_nHeader; @@ -406,22 +456,22 @@ void srt::CPacket::toHL() } } -srt::IOVector* srt::CPacket::getPacketVector() +IOVector* CPacket::getPacketVector() { return m_PacketVector; } -srt::UDTMessageType srt::CPacket::getType() const +UDTMessageType CPacket::getType() const { return UDTMessageType(SEQNO_MSGTYPE::unwrap(m_nHeader[SRT_PH_SEQNO])); } -int srt::CPacket::getExtendedType() const +int CPacket::getExtendedType() const { return SEQNO_EXTTYPE::unwrap(m_nHeader[SRT_PH_SEQNO]); } -int32_t srt::CPacket::getAckSeqNo() const +int32_t CPacket::getAckSeqNo() const { // read additional information field // This field is used only in UMSG_ACK and UMSG_ACKACK, @@ -430,7 +480,7 @@ int32_t srt::CPacket::getAckSeqNo() const return m_nHeader[SRT_PH_MSGNO]; } -uint16_t srt::CPacket::getControlFlags() const +uint16_t CPacket::getControlFlags() const { // This returns exactly the "extended type" value, // which is not used at all in case when the standard @@ -439,17 +489,17 @@ uint16_t srt::CPacket::getControlFlags() const return SEQNO_EXTTYPE::unwrap(m_nHeader[SRT_PH_SEQNO]); } -srt::PacketBoundary srt::CPacket::getMsgBoundary() const +PacketBoundary CPacket::getMsgBoundary() const { return PacketBoundary(MSGNO_PACKET_BOUNDARY::unwrap(m_nHeader[SRT_PH_MSGNO])); } -bool srt::CPacket::getMsgOrderFlag() const +bool CPacket::getMsgOrderFlag() const { return 0 != MSGNO_PACKET_INORDER::unwrap(m_nHeader[SRT_PH_MSGNO]); } -int32_t srt::CPacket::getMsgSeq(bool has_rexmit) const +int32_t CPacket::getMsgSeq(bool has_rexmit) const { if (has_rexmit) { @@ -461,18 +511,18 @@ int32_t srt::CPacket::getMsgSeq(bool has_rexmit) const } } -bool srt::CPacket::getRexmitFlag() const +bool CPacket::getRexmitFlag() const { return 0 != MSGNO_REXMIT::unwrap(m_nHeader[SRT_PH_MSGNO]); } -void srt::CPacket::setRexmitFlag(bool bRexmit) +void CPacket::setRexmitFlag(bool bRexmit) { const int32_t clr_msgno = m_nHeader[SRT_PH_MSGNO] & ~MSGNO_REXMIT::mask; m_nHeader[SRT_PH_MSGNO] = clr_msgno | MSGNO_REXMIT::wrap(bRexmit? 1 : 0); } -srt::EncryptionKeySpec srt::CPacket::getMsgCryptoFlags() const +EncryptionKeySpec CPacket::getMsgCryptoFlags() const { return EncryptionKeySpec(MSGNO_ENCKEYSPEC::unwrap(m_nHeader[SRT_PH_MSGNO])); } @@ -480,19 +530,19 @@ srt::EncryptionKeySpec srt::CPacket::getMsgCryptoFlags() const // This is required as the encryption/decryption happens in place. // This is required to clear off the flags after decryption or set // crypto flags after encrypting a packet. -void srt::CPacket::setMsgCryptoFlags(EncryptionKeySpec spec) +void CPacket::setMsgCryptoFlags(EncryptionKeySpec spec) { int32_t clr_msgno = m_nHeader[SRT_PH_MSGNO] & ~MSGNO_ENCKEYSPEC::mask; m_nHeader[SRT_PH_MSGNO] = clr_msgno | EncryptionKeyBits(spec); } -uint32_t srt::CPacket::getMsgTimeStamp() const +uint32_t CPacket::getMsgTimeStamp() const { // SRT_DEBUG_TSBPD_WRAP may enable smaller timestamp for faster wraparoud handling tests return (uint32_t)m_nHeader[SRT_PH_TIMESTAMP] & TIMESTAMP_MASK; } -srt::CPacket* srt::CPacket::clone() const +CPacket* CPacket::clone() const { CPacket* pkt = new CPacket; memcpy((pkt->m_nHeader), m_nHeader, HDR_SIZE); @@ -503,9 +553,6 @@ srt::CPacket* srt::CPacket::clone() const return pkt; } -namespace srt -{ - // Useful for debugging std::string PacketMessageFlagStr(uint32_t msgno_field) { @@ -534,10 +581,8 @@ inline void SprintSpecialWord(std::ostream& os, int32_t val) os << val; } -} // namespace srt - #if ENABLE_LOGGING -std::string srt::CPacket::Info() +std::string CPacket::Info() { std::ostringstream os; os << "TARGET=@" << m_iID << " "; @@ -592,3 +637,5 @@ std::string srt::CPacket::Info() return os.str(); } #endif + +} // end namespace srt diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 5011a241c..2eb268736 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -481,6 +481,25 @@ bool srt::CSndQueue::getBind(char* dst, size_t len) const } #endif +#if defined(SRT_DEBUG_SNDQ_HIGHRATE) +static void CSndQueueDebugHighratePrint(const CSndQueue* self, const steady_clock::time_point currtime) +{ + if (self->m_ullDbgTime <= currtime) + { + fprintf(stdout, + "SndQueue %lu slt:%lu nrp:%lu snt:%lu nrt:%lu ctw:%lu\n", + self->m_WorkerStats.lIteration, + self->m_WorkerStats.lSleepTo, + self->m_WorkerStats.lNotReadyPop, + self->m_WorkerStats.lSendTo, + self->m_WorkerStats.lNotReadyTs, + self->m_WorkerStats.lCondWait); + memset(&self->m_WorkerStats, 0, sizeof(self->m_WorkerStats)); + self->m_ullDbgTime = currtime + self->m_ullDbgPeriod; + } +} +#endif + void* srt::CSndQueue::worker(void* param) { CSndQueue* self = (CSndQueue*)param; @@ -492,34 +511,30 @@ void* srt::CSndQueue::worker(void* param) #endif #if defined(SRT_DEBUG_SNDQ_HIGHRATE) +#define IF_DEBUG_HIGHRATE(statement) statement CTimer::rdtsc(self->m_ullDbgTime); self->m_ullDbgPeriod = uint64_t(5000000) * CTimer::getCPUFrequency(); self->m_ullDbgTime += self->m_ullDbgPeriod; +#else +#define IF_DEBUG_HIGHRATE(statement) (void)0 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */ while (!self->m_bClosing) { const steady_clock::time_point next_time = self->m_pSndUList->getNextProcTime(); -#if defined(SRT_DEBUG_SNDQ_HIGHRATE) - self->m_WorkerStats.lIteration++; -#endif /* SRT_DEBUG_SNDQ_HIGHRATE */ + IF_DEBUG_HIGHRATE(self->m_WorkerStats.lIteration++); if (is_zero(next_time)) { -#if defined(SRT_DEBUG_SNDQ_HIGHRATE) - self->m_WorkerStats.lNotReadyTs++; -#endif /* SRT_DEBUG_SNDQ_HIGHRATE */ + IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyTs++); // wait here if there is no sockets with data to be sent THREAD_PAUSED(); if (!self->m_bClosing) { self->m_pSndUList->waitNonEmpty(); - -#if defined(SRT_DEBUG_SNDQ_HIGHRATE) - self->m_WorkerStats.lCondWait++; -#endif /* SRT_DEBUG_SNDQ_HIGHRATE */ + IF_DEBUG_HIGHRATE(self->m_WorkerStats.lCondWait++); } THREAD_RESUMED(); @@ -529,43 +544,23 @@ void* srt::CSndQueue::worker(void* param) // wait until next processing time of the first socket on the list const steady_clock::time_point currtime = steady_clock::now(); -#if defined(SRT_DEBUG_SNDQ_HIGHRATE) - if (self->m_ullDbgTime <= currtime) - { - fprintf(stdout, - "SndQueue %lu slt:%lu nrp:%lu snt:%lu nrt:%lu ctw:%lu\n", - self->m_WorkerStats.lIteration, - self->m_WorkerStats.lSleepTo, - self->m_WorkerStats.lNotReadyPop, - self->m_WorkerStats.lSendTo, - self->m_WorkerStats.lNotReadyTs, - self->m_WorkerStats.lCondWait); - memset(&self->m_WorkerStats, 0, sizeof(self->m_WorkerStats)); - self->m_ullDbgTime = currtime + self->m_ullDbgPeriod; - } -#endif /* SRT_DEBUG_SNDQ_HIGHRATE */ - - THREAD_PAUSED(); + IF_DEBUG_HIGHRATE(CSndQueueDebugHighratePrint(self, currtime)); if (currtime < next_time) { + THREAD_PAUSED(); self->m_pTimer->sleep_until(next_time); - -#if defined(HAI_DEBUG_SNDQ_HIGHRATE) - self->m_WorkerStats.lSleepTo++; -#endif /* SRT_DEBUG_SNDQ_HIGHRATE */ + THREAD_RESUMED(); + IF_DEBUG_HIGHRATE(self->m_WorkerStats.lSleepTo++); } - THREAD_RESUMED(); // Get a socket with a send request if any. CUDT* u = self->m_pSndUList->pop(); if (u == NULL) { -#if defined(SRT_DEBUG_SNDQ_HIGHRATE) - self->m_WorkerStats.lNotReadyPop++; -#endif /* SRT_DEBUG_SNDQ_HIGHRATE */ + IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyPop++); continue; } - + #define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " " HLOGC(qslog.Debug, log << "CSndQueue: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening) @@ -575,36 +570,30 @@ void* srt::CSndQueue::worker(void* param) if (!u->m_bConnected || u->m_bBroken) { -#if defined(SRT_DEBUG_SNDQ_HIGHRATE) - self->m_WorkerStats.lNotReadyPop++; -#endif /* SRT_DEBUG_SNDQ_HIGHRATE */ + IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyPop++); continue; } // pack a packet from the socket CPacket pkt; - const std::pair res_time = u->packData((pkt)); + steady_clock::time_point next_send_time; + bool valid = u->packData((pkt), (next_send_time)); // Check if payload size is invalid. - if (res_time.first == false) + if (!valid) { -#if defined(SRT_DEBUG_SNDQ_HIGHRATE) - self->m_WorkerStats.lNotReadyPop++; -#endif /* SRT_DEBUG_SNDQ_HIGHRATE */ + IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyPop++); continue; } const sockaddr_any addr = u->m_PeerAddr; - const steady_clock::time_point next_send_time = res_time.second; if (!is_zero(next_send_time)) self->m_pSndUList->update(u, CSndUList::DO_RESCHEDULE, next_send_time); HLOGC(qslog.Debug, log << self->CONID() << "chn:SENDING: " << pkt.Info()); self->m_pChannel->sendto(addr, pkt); -#if defined(SRT_DEBUG_SNDQ_HIGHRATE) - self->m_WorkerStats.lSendTo++; -#endif /* SRT_DEBUG_SNDQ_HIGHRATE */ + IF_DEBUG_HIGHRATE(self->m_WorkerStats.lSendTo++); } THREAD_EXIT(); diff --git a/srtcore/socketconfig.cpp b/srtcore/socketconfig.cpp index 35c6057b4..0dd4b623c 100644 --- a/srtcore/socketconfig.cpp +++ b/srtcore/socketconfig.cpp @@ -52,6 +52,27 @@ written by #include "srt.h" #include "socketconfig.h" +namespace srt +{ +int RcvBufferSizeOptionToValue(int val, int flightflag, int mss) +{ + // Mimimum recv buffer size is 32 packets + const int mssin_size = mss - CPacket::UDP_HDR_SIZE; + + int bufsize; + if (val > mssin_size * CSrtConfig::DEF_MIN_FLIGHT_PKT) + bufsize = val / mssin_size; + else + bufsize = CSrtConfig::DEF_MIN_FLIGHT_PKT; + + // recv buffer MUST not be greater than FC size + if (bufsize > flightflag) + bufsize = flightflag; + + return bufsize; +} +} + using namespace srt; extern const int32_t SRT_DEF_VERSION = SrtParseVersion(SRT_VERSION); @@ -122,17 +143,7 @@ struct CSrtConfigSetter if (val <= 0) throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); - // Mimimum recv buffer size is 32 packets - const int mssin_size = co.iMSS - CPacket::UDP_HDR_SIZE; - - if (val > mssin_size * co.DEF_MIN_FLIGHT_PKT) - co.iRcvBufSize = val / mssin_size; - else - co.iRcvBufSize = co.DEF_MIN_FLIGHT_PKT; - - // recv buffer MUST not be greater than FC size - if (co.iRcvBufSize > co.iFlightFlagSize) - co.iRcvBufSize = co.iFlightFlagSize; + co.iRcvBufSize = srt::RcvBufferSizeOptionToValue(val, co.iFlightFlagSize, co.iMSS); } }; diff --git a/srtcore/socketconfig.h b/srtcore/socketconfig.h index 8e9086915..7127afc54 100644 --- a/srtcore/socketconfig.h +++ b/srtcore/socketconfig.h @@ -355,6 +355,9 @@ inline bool cast_optval(const void* optval, int optlen) return false; } + +int RcvBufferSizeOptionToValue(int optval, int flightflag, int mss); + } // namespace srt struct SRT_SocketOptionObject diff --git a/srtcore/sync.h b/srtcore/sync.h index 5d62536eb..840fc47c2 100644 --- a/srtcore/sync.h +++ b/srtcore/sync.h @@ -769,7 +769,7 @@ struct DurationUnitName template inline std::string FormatDuration(const steady_clock::duration& dur) { - return Sprint(DurationUnitName::count(dur)) + DurationUnitName::name(); + return Sprint(std::fixed, DurationUnitName::count(dur)) + DurationUnitName::name(); } inline std::string FormatDuration(const steady_clock::duration& dur) diff --git a/srtcore/tsbpd_time.cpp b/srtcore/tsbpd_time.cpp index 046c90b74..162fc7ac7 100644 --- a/srtcore/tsbpd_time.cpp +++ b/srtcore/tsbpd_time.cpp @@ -220,7 +220,16 @@ CTsbpdTime::time_point CTsbpdTime::getTsbPdTimeBase(uint32_t timestamp_us) const CTsbpdTime::time_point CTsbpdTime::getPktTsbPdTime(uint32_t usPktTimestamp) const { - return getPktTsbPdBaseTime(usPktTimestamp) + m_tdTsbPdDelay + microseconds_from(m_DriftTracer.drift()); + time_point value = getPktTsbPdBaseTime(usPktTimestamp) + m_tdTsbPdDelay + microseconds_from(m_DriftTracer.drift()); + + /* + HLOGC(brlog.Debug, log << "getPktTsbPdTime:" + << " BASE=" << FormatTime(m_tsTsbPdTimeBase) + << " TS=" << usPktTimestamp << "us, lat=" << FormatDuration(m_tdTsbPdDelay) + << " DRF=" << m_DriftTracer.drift() << "us = " << FormatTime(value)); + */ + + return value; } CTsbpdTime::time_point CTsbpdTime::getPktTsbPdBaseTime(uint32_t usPktTimestamp) const diff --git a/srtcore/utilities.h b/srtcore/utilities.h index 31e05b205..203b1bf8f 100644 --- a/srtcore/utilities.h +++ b/srtcore/utilities.h @@ -534,6 +534,34 @@ namespace srt_pair_op } } +namespace any_op +{ + template + struct AnyProxy + { + const T& value; + bool result; + + AnyProxy(const T& x, bool res): value(x), result(res) {} + + AnyProxy& operator,(const T& val) + { + if (result) + return *this; + result = value == val; + return *this; + } + + operator bool() { return result; } + }; + + template inline + AnyProxy EqualAny(const T& checked_val) + { + return AnyProxy(checked_val, false); + } +} + #if HAVE_CXX11 template @@ -666,6 +694,15 @@ inline std::string Sprint(const Arg1& arg) return sout.str(); } +// Ok, let it be 2-arg, in case when a manipulator is needed +template +inline std::string Sprint(const Arg1& arg1, const Arg2& arg2) +{ + std::ostringstream sout; + sout << arg1 << arg2; + return sout.str(); +} + template inline std::string Printable(const Container& in) { @@ -748,6 +785,19 @@ inline void insert_uniq(std::vector& v, const ArgValue& val) v.push_back(val); } +template +inline std::pair Tie(Type1& var1, Type2& var2) +{ + return std::pair(var1, var2); +} + +template +inline void FringeValues(const Container& from, std::map& out) +{ + for (typename Container::const_iterator i = from.begin(); i != from.end(); ++i) + ++out[*i]; +} + template struct CallbackHolder { @@ -1058,11 +1108,11 @@ inline ValueType avg_iir_w(ValueType old_value, ValueType new_value, size_t new_ // This relies only on a convention, which is the following: // // V x = object.prop(); <-- get the property's value -// object.prop(x); <-- set the property a value +// object.set_prop(x); <-- set the property a value // // Properties might be also chained when setting: // -// object.prop1(v1).prop2(v2).prop3(v3); +// object.set_prop1(v1).set_prop2(v2).set_prop3(v3); // // Properties may be defined various even very complicated // ways, which is simply providing a method with body. In order diff --git a/test/filelist.maf b/test/filelist.maf index c986ef56f..2749c682d 100644 --- a/test/filelist.maf +++ b/test/filelist.maf @@ -3,7 +3,6 @@ any.hpp SOURCES test_buffer_rcv.cpp -test_bonding.cpp test_common.cpp test_connection_timeout.cpp test_crypto.cpp @@ -30,3 +29,4 @@ test_reuseaddr.cpp # Tests for bonding only - put here! SOURCES - ENABLE_BONDING +test_bonding.cpp diff --git a/testing/testmedia.cpp b/testing/testmedia.cpp index 5193cf5fa..a0f665477 100755 --- a/testing/testmedia.cpp +++ b/testing/testmedia.cpp @@ -944,16 +944,36 @@ void TransmitGroupSocketConnect(void* srtcommon, SRTSOCKET sock, int error, cons Verb() << " IPE: LINK NOT FOUND???]"; } -void SrtCommon::OpenGroupClient() +SRT_GROUP_TYPE ResolveGroupType(const string& name) { - SRT_GROUP_TYPE type = SRT_GTYPE_UNDEFINED; + static struct + { + string name; + SRT_GROUP_TYPE type; + } table [] { +#define E(n) {#n, SRT_GTYPE_##n} + E(BROADCAST), + E(BACKUP) - // Resolve group type. - if (m_group_type == "broadcast") - type = SRT_GTYPE_BROADCAST; - else if (m_group_type == "backup") - type = SRT_GTYPE_BACKUP; - else +#undef E + }; + + typedef int charxform(int c); + + string uname; + transform(name.begin(), name.end(), back_inserter(uname), (charxform*)(&toupper)); + + for (auto& x: table) + if (x.name == uname) + return x.type; + + return SRT_GTYPE_UNDEFINED; +} + +void SrtCommon::OpenGroupClient() +{ + SRT_GROUP_TYPE type = ResolveGroupType(m_group_type); + if (type == SRT_GTYPE_UNDEFINED) { Error("With //group, type='" + m_group_type + "' undefined"); } From 01ef79965cb0232f0a7069c01b792f12a90996d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 3 Nov 2022 09:48:42 +0100 Subject: [PATCH 2/6] Fixed spare code for debugging high bitrate sending --- srtcore/queue.cpp | 12 ++++++------ srtcore/queue.h | 10 ++++++---- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 2eb268736..9cfe66007 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -482,9 +482,9 @@ bool srt::CSndQueue::getBind(char* dst, size_t len) const #endif #if defined(SRT_DEBUG_SNDQ_HIGHRATE) -static void CSndQueueDebugHighratePrint(const CSndQueue* self, const steady_clock::time_point currtime) +static void CSndQueueDebugHighratePrint(const srt::CSndQueue* self, const steady_clock::time_point currtime) { - if (self->m_ullDbgTime <= currtime) + if (self->m_DbgTime <= currtime) { fprintf(stdout, "SndQueue %lu slt:%lu nrp:%lu snt:%lu nrt:%lu ctw:%lu\n", @@ -495,7 +495,7 @@ static void CSndQueueDebugHighratePrint(const CSndQueue* self, const steady_cloc self->m_WorkerStats.lNotReadyTs, self->m_WorkerStats.lCondWait); memset(&self->m_WorkerStats, 0, sizeof(self->m_WorkerStats)); - self->m_ullDbgTime = currtime + self->m_ullDbgPeriod; + self->m_DbgTime = currtime + self->m_DbgPeriod; } } #endif @@ -512,9 +512,9 @@ void* srt::CSndQueue::worker(void* param) #if defined(SRT_DEBUG_SNDQ_HIGHRATE) #define IF_DEBUG_HIGHRATE(statement) statement - CTimer::rdtsc(self->m_ullDbgTime); - self->m_ullDbgPeriod = uint64_t(5000000) * CTimer::getCPUFrequency(); - self->m_ullDbgTime += self->m_ullDbgPeriod; + self->m_DbgTime = sync::steady_clock::now(); + self->m_DbgPeriod = sync::microseconds_from(5000000); + self->m_DbgTime += self->m_DbgPeriod; #else #define IF_DEBUG_HIGHRATE(statement) (void)0 #endif /* SRT_DEBUG_SNDQ_HIGHRATE */ diff --git a/srtcore/queue.h b/srtcore/queue.h index 2057353f7..6553b5a72 100644 --- a/srtcore/queue.h +++ b/srtcore/queue.h @@ -451,9 +451,10 @@ class CSndQueue sync::atomic m_bClosing; // closing the worker +public: #if defined(SRT_DEBUG_SNDQ_HIGHRATE) //>>debug high freq worker - uint64_t m_ullDbgPeriod; - uint64_t m_ullDbgTime; + sync::steady_clock::duration m_DbgPeriod; + mutable sync::steady_clock::time_point m_DbgTime; struct { unsigned long lIteration; // @@ -462,14 +463,15 @@ class CSndQueue unsigned long lSendTo; unsigned long lNotReadyTs; unsigned long lCondWait; // block on m_WindowCond - } m_WorkerStats; + } mutable m_WorkerStats; #endif /* SRT_DEBUG_SNDQ_HIGHRATE */ +private: + #if ENABLE_LOGGING static int m_counter; #endif -private: CSndQueue(const CSndQueue&); CSndQueue& operator=(const CSndQueue&); }; From a8ef62f39f25a29006f6aa8a8d8ec6fd5df0e0bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Fri, 10 Feb 2023 10:14:59 +0100 Subject: [PATCH 3/6] Applied a suggestion from the code review --- srtcore/list.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/srtcore/list.cpp b/srtcore/list.cpp index 1b5498ba0..0a175ee9d 100644 --- a/srtcore/list.cpp +++ b/srtcore/list.cpp @@ -99,8 +99,7 @@ srt::CSndLossList::~CSndLossList() void srt::CSndLossList::traceState() const { - traceState(std::cout); - std::cout << "\n"; + traceState(std::cout) << "\n"; } int srt::CSndLossList::insert(int32_t seqno1, int32_t seqno2) From 130f19388e3957c51c0c75fbfd9917855e203571 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Mon, 13 Feb 2023 09:36:19 +0100 Subject: [PATCH 4/6] Renamed v5 function --- srtcore/handshake.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srtcore/handshake.h b/srtcore/handshake.h index 87c9d5761..c6a5731e4 100644 --- a/srtcore/handshake.h +++ b/srtcore/handshake.h @@ -311,7 +311,7 @@ class CHandShake // Applicable only when m_iVersion == HS_VERSION_SRT1 int32_t flags() { return m_iType; } - bool v5() { return m_iVersion > 4; } + bool v5orHigher() { return m_iVersion > 4; } public: int32_t m_iVersion; // UDT version (HS_VERSION_* symbols) From 26bcde804aaac7b6d2e6314263238207e42688a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 22 Feb 2023 11:50:48 +0100 Subject: [PATCH 5/6] Renamed expection --- srtcore/utilities.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/srtcore/utilities.h b/srtcore/utilities.h index 203b1bf8f..5a8fdeb67 100644 --- a/srtcore/utilities.h +++ b/srtcore/utilities.h @@ -432,7 +432,7 @@ class FixedArray const T& operator[](size_t index) const { if (index >= m_size) - raise_expection(index); + throw_invalid_index(index); return m_entries[index]; } @@ -440,7 +440,7 @@ class FixedArray T& operator[](size_t index) { if (index >= m_size) - raise_expection(index); + throw_invalid_index(index); return m_entries[index]; } @@ -448,7 +448,7 @@ class FixedArray const T& operator[](int index) const { if (index < 0 || static_cast(index) >= m_size) - raise_expection(index); + throw_invalid_index(index); return m_entries[index]; } @@ -456,7 +456,7 @@ class FixedArray T& operator[](int index) { if (index < 0 || static_cast(index) >= m_size) - raise_expection(index); + throw_invalid_index(index); return m_entries[index]; } @@ -478,7 +478,7 @@ class FixedArray FixedArray(const FixedArray& ); FixedArray& operator=(const FixedArray&); - void raise_expection(int i) const + void throw_invalid_index(int i) const { std::stringstream ss; ss << "Index " << i << "out of range"; From 8a7fa07af006be615e1d57357c29bcc99ebb6a23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 13 Sep 2023 13:32:10 +0200 Subject: [PATCH 6/6] Added one more utility --- srtcore/utilities.h | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/srtcore/utilities.h b/srtcore/utilities.h index 5a8fdeb67..5f9d62898 100644 --- a/srtcore/utilities.h +++ b/srtcore/utilities.h @@ -791,6 +791,19 @@ inline std::pair Tie(Type1& var1, Type2& var2) return std::pair(var1, var2); } +// This can be used in conjunction with Tie to simplify the code +// in loops around a whole container: +// list::const_iterator it, end; +// Tie(it, end) = All(list_container); +template +std::pair +inline All(Container& c) { return std::make_pair(c.begin(), c.end()); } + +template +std::pair +inline All(const Container& c) { return std::make_pair(c.begin(), c.end()); } + + template inline void FringeValues(const Container& from, std::map& out) {