Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Utilize all insert results of the new recv buffer. #2535

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1048,18 +1048,16 @@ void CRcvBuffer::updateTsbPdTimeBase(uint32_t usPktTimestamp)
m_tsbpd.updateTsbPdTimeBase(usPktTimestamp);
}

string CRcvBuffer::strFullnessState(bool enable_debug_log, int iFirstUnackSeqNo, const time_point& tsNow) const
string CRcvBuffer::strFullnessState(bool enable_debug_log, const time_point& tsNow) const
{
stringstream ss;

if (enable_debug_log)
{
ss << "iFirstUnackSeqNo=" << iFirstUnackSeqNo << " m_iStartSeqNo=" << m_iStartSeqNo
<< " m_iStartPos=" << m_iStartPos << " m_iMaxPosInc=" << m_iMaxPosInc << ". ";
ss << "m_iStartSeqNo=" << m_iStartSeqNo << " m_iStartPos=" << m_iStartPos << " m_iMaxPosInc=" << m_iMaxPosInc
<< ". ";
}

ss << "Space avail " << getAvailSize(iFirstUnackSeqNo) << "/" << m_szSize << " pkts. ";

if (m_tsbpd.isEnabled() && m_iMaxPosInc > 0)
{
const PacketInfo nextValidPkt = getFirstValidPacketInfo();
Expand Down
3 changes: 1 addition & 2 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class CRcvBuffer
/// Similar to CRcvBuffer::addData(CUnit* unit, int offset)
///
/// @param [in] unit pointer to a data unit containing new packet
/// @param [in] offset offset from last ACK point.
///
/// @return 0 on success, -1 if packet is already in buffer, -2 if packet is before m_iStartSeqNo.
/// -3 if a packet is offset is ahead the buffer capacity.
Expand Down Expand Up @@ -343,7 +342,7 @@ class CRcvBuffer

/// Form a string of the current buffer fullness state.
/// number of packets acknowledged, TSBPD readiness, etc.
std::string strFullnessState(bool enable_debug_log, int iFirstUnackSeqNo, const time_point& tsNow) const;
std::string strFullnessState(bool enable_debug_log, const time_point& tsNow) const;

private:
CTsbpdTime m_tsbpd;
Expand Down
63 changes: 29 additions & 34 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9721,76 +9721,69 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&
const int pktrexmitflag = m_bPeerRexmitFlag ? (rpkt.getRexmitFlag() ? 1 : 0) : 2;
const bool retransmitted = pktrexmitflag == 1;

bool adding_successful = true;

// m_iRcvLastSkipAck is the base sequence number for the receiver buffer.
// This is the offset in the buffer; if this is negative, it means that
// this sequence is already in the past and the buffer is not interested.
// Meaning, this packet will be rejected, even if it could potentially be
// one of missing packets in the transmission.
int32_t offset = CSeqNo::seqoff(m_iRcvLastSkipAck, rpkt.m_iSeqNo);

IF_HEAVY_LOGGING(const char *exc_type = "EXPECTED");
const int insert_offset = CSeqNo::seqoff(m_pRcvBuffer->getStartSeqNo(), rpkt.m_iSeqNo);
const int ack_offset = CSeqNo::seqoff(m_iRcvLastSkipAck, rpkt.m_iSeqNo);

if (offset < 0)
// If this is negative, it means that this sequence is already in the past
// and the buffer is not interested. Meaning, this packet will be rejected,
// even if it could potentially be one of missing packets in the transmission.
if (insert_offset < 0 || ack_offset < 0)
{
IF_HEAVY_LOGGING(exc_type = "BELATED");
time_point pts = getPktTsbPdTime(NULL, rpkt);

enterCS(m_StatsLock);
const double bltime = (double) CountIIR<uint64_t>(
uint64_t(m_stats.traceBelatedTime) * 1000,
count_microseconds(steady_clock::now() - pts), 0.2);

m_stats.traceBelatedTime = bltime / 1000.0;
m_stats.rcvr.recvdBelated.count(rpkt.getLength());
leaveCS(m_StatsLock);

HLOGC(qrlog.Debug,
log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo << " offset=" << offset << " (BELATED/"
<< s_rexmitstat_str[pktrexmitflag] << ") FLAGS: " << rpkt.MessageFlagStr());
log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo << " insert_offset=" << insert_offset
<< " ack_offset=" << ack_offset << " (BELATED/" << s_rexmitstat_str[pktrexmitflag]
<< ") FLAGS: " << rpkt.MessageFlagStr());
continue;
}

const int avail_bufsize = (int) getAvailRcvBufferSizeNoLock();
IF_HEAVY_LOGGING(const char *exc_type = "EXPECTED");
bool adding_successful = true;

if (offset >= avail_bufsize)
const int insert_res = m_pRcvBuffer->insert(u);
if (insert_res == -3)
{
// This is already a sequence discrepancy. Probably there could be found
// some way to make it continue reception by overriding the sequence and
// make a kinda TLKPTDROP, but there has been found no reliable way to do this.
// The insert() result is -3 if the insert offset exceeds capacity.
if (m_bTsbPd && m_bTLPktDrop && m_pRcvBuffer->empty())
{
// This is already a sequence discrepancy. Probably there could be found
// some way to make it continue reception by overriding the sequence and
// make a kinda TLKPTDROP, but there has been found no reliable way to do this.
// Only in live mode. In File mode this shall not be possible
// because the sender should stop sending in this situation.
// In Live mode this means that there is a gap between the
// lowest sequence in the empty buffer and the incoming sequence
// that exceeds the buffer size. Receiving data in this situation
// is no longer possible and this is a point of no return.

LOGC(qrlog.Error, log << CONID() <<
"SEQUENCE DISCREPANCY. BREAKING CONNECTION."
" seq=" << rpkt.m_iSeqNo
<< " buffer=(" << m_iRcvLastSkipAck
<< ":" << m_iRcvCurrSeqNo // -1 = size to last index
<< "+" << CSeqNo::incseq(m_iRcvLastSkipAck, int(m_pRcvBuffer->capacity()) - 1)
<< "), " << (offset-avail_bufsize+1)
<< "), " << (insert_offset - int(m_pRcvBuffer->capacity()) + 1)
<< " past max. Reception no longer possible. REQUESTING TO CLOSE.");

return -2;
}
else
{
LOGC(qrlog.Warn, log << CONID() << "No room to store incoming packet seqno " << rpkt.m_iSeqNo
<< ", insert offset " << offset << ". "
<< m_pRcvBuffer->strFullnessState(qrlog.Warn.CheckEnabled(), m_iRcvLastAck, steady_clock::now())
);

LOGC(qrlog.Warn,
log << CONID() << "No room to store incoming packet. seqno=" << rpkt.m_iSeqNo
<< " insert_offset=" << insert_offset << " ack_offset=" << ack_offset << ". "
<< m_pRcvBuffer->strFullnessState(qrlog.Warn.CheckEnabled(), steady_clock::now()));
return -1;
}
}

int buffer_add_result = m_pRcvBuffer->insert(u);
if (buffer_add_result < 0)
else if (insert_res == -1)
{
// The insert() result is -1 if at the position evaluated from this packet's
// sequence number there already is a packet.
Expand All @@ -9800,10 +9793,12 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&
}
else
{
SRT_ASSERT(insert_res == 0);
w_new_inserted = true;

IF_HEAVY_LOGGING(exc_type = "ACCEPTED");
excessive = false;

if (u->m_Packet.getMsgCryptoFlags() != EK_NOENC)
{
// TODO: reset and restore the timestamp if TSBPD is disabled.
Expand Down Expand Up @@ -9857,8 +9852,7 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&

if (m_pRcvBuffer)
{
bufinfo << " BUFr=" << avail_bufsize
<< " avail=" << getAvailRcvBufferSizeNoLock()
bufinfo << " avail=" << getAvailRcvBufferSizeNoLock()
<< " buffer=(" << m_iRcvLastSkipAck
<< ":" << m_iRcvCurrSeqNo // -1 = size to last index
<< "+" << CSeqNo::incseq(m_iRcvLastSkipAck, m_pRcvBuffer->capacity()-1)
Expand All @@ -9869,7 +9863,8 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&
// There's no way to obtain this information here.

LOGC(qrlog.Debug, log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo
<< " offset=" << offset
<< " insert_offset=" << insert_offset
<< " ack_offset=" << ack_offset
<< bufinfo.str()
<< " RSL=" << expectspec.str()
<< " SN=" << s_rexmitstat_str[pktrexmitflag]
Expand Down