Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] fix applyGroupSequences #2735

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions docs/dev/low-level-info.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ CRcvQueue::worker_TryAsyncRend_OrStore
[IF Responder]
{
CUDT::makeMePeerOf
[LOCKS m_GroupLock]
CUDTGroup::syncWithSocket
CUDTGroup::find --> [LOCKED m_GroupLock]
}
debugGroup -- > [LOCKED m_GroupLock]
Expand Down Expand Up @@ -189,8 +187,6 @@ CRcvQueue::worker_ProcessConnectionRequest
[IF Responder]
{
CUDT::makeMePeerOf
[LOCKS m_GroupLock]
CUDTGroup::syncWithSocket
CUDTGroup::find --> [LOCKED m_GroupLock]
}
debugGroup -- > [LOCKED m_GroupLock]
Expand Down
47 changes: 15 additions & 32 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3151,10 +3151,10 @@ bool srt::CUDT::interpretGroup(const int32_t groupdata[], size_t data_size SRT_A
log << CONID() << "HS/RSP: group $" << pg->id() << " -> peer $" << pg->peerid()
<< ", copying characteristic data");

// The call to syncWithSocket is copying
// The call to syncWithFirstSocket is copying
// some interesting data from the first connected
// socket. This should be only done for the first successful connection.
pg->syncWithSocket(*this, HSD_INITIATOR);
pg->syncWithFirstSocket(*this, HSD_INITIATOR);
}
// Otherwise the peer id must be the same as existing, otherwise
// this group is considered already bound to another peer group.
Expand Down Expand Up @@ -3236,7 +3236,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3

// Check if there exists a group that this one is a peer of.
CUDTGroup* gp = uglobal().findPeerGroup_LOCKED(peergroup);
bool was_empty = true;
if (gp)
{
if (gp->type() != gtp)
Expand All @@ -3248,9 +3247,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3
}

HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: group for peer=$" << peergroup << " found: $" << gp->id());

if (!gp->groupEmpty())
was_empty = false;
}
else
{
Expand All @@ -3273,6 +3269,7 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3

gp->set_peerid(peergroup);
gp->deriveSettings(this);
gp->syncWithFirstSocket(s->core(), HSD_RESPONDER);

// This can only happen on a listener (it's only called on a site that is
// HSD_RESPONDER), so it was a response for a groupwise connection.
Expand All @@ -3284,19 +3281,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3
<< gp->id());
}

{
ScopedLock glock (*gp->exp_groupLock());
if (gp->closing())
{
HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: group $" << gp->id() << " is being closed, can't process");
}

if (was_empty)
{
gp->syncWithSocket(s->core(), HSD_RESPONDER);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fix the issue, it should keep previous group-wise sequences even if the group is empty.

}
}

// Setting non-blocking reading for group socket.
s->core().m_config.bSynRecving = false;
s->core().m_config.bSynSending = false;
Expand Down Expand Up @@ -3396,21 +3380,15 @@ void srt::CUDT::synchronizeWithGroup(CUDTGroup* gp)

// These are the values that are normally set initially by setters.
int32_t snd_isn = m_iSndLastAck, rcv_isn = m_iRcvLastAck;
if (!gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn)))
{
HLOGC(gmlog.Debug,
log << CONID() << "synchronizeWithGroup: DERIVED ISN: RCV=%" << m_iRcvLastAck << " -> %" << rcv_isn
<< " (shift by " << CSeqNo::seqcmp(rcv_isn, m_iRcvLastAck) << ") SND=%" << m_iSndLastAck
<< " -> %" << snd_isn << " (shift by " << CSeqNo::seqcmp(snd_isn, m_iSndLastAck) << ")");
gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn));
HLOGC(gmlog.Debug,
log << CONID() << "synchronizeWithGroup: RCV-ISN=%" << m_iRcvLastAck << " -> %" << rcv_isn << " (shift by "
<< CSeqNo::seqoff(m_iRcvLastAck, rcv_isn) << ") SND-ISN=%" << m_iSndLastAck << " -> %" << snd_isn
<< " (shift by " << CSeqNo::seqoff(m_iSndLastAck, snd_isn) << ")");
if (rcv_isn != m_iRcvLastAck)
setInitialRcvSeq(rcv_isn);
if (snd_isn != m_iSndLastAck)
setInitialSndSeq(snd_isn);
}
else
{
HLOGC(gmlog.Debug,
log << CONID() << "synchronizeWithGroup: DEFINED ISN: RCV=%" << m_iRcvLastAck << " SND=%"
<< m_iSndLastAck);
}
}
#endif

Expand Down Expand Up @@ -7686,7 +7664,12 @@ void srt::CUDT::dropToGroupRecvBase()
// Note that getRcvBaseSeqNo() will lock m_GroupOf->m_GroupLock,
// but this is an intended order.
if (m_parent->m_GroupOf)
{
group_recv_base = m_parent->m_GroupOf->getRcvBaseSeqNo();
// To reduce overhead, especially the m_GlobControlLock,
// the group wise recv seq is updated here.
m_parent->m_GroupOf->updateRcvCurrSeqNo(m_iRcvCurrSeqNo);
}
}
if (group_recv_base == SRT_SEQNO_NONE)
return;
Expand Down
125 changes: 31 additions & 94 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,81 +63,19 @@ bool CUDTGroup::getBufferTimeBase(CUDT* forthesakeof,
}

// [[using locked(this->m_GroupLock)]];
bool CUDTGroup::applyGroupSequences(SRTSOCKET target, int32_t& w_snd_isn, int32_t& w_rcv_isn)
void CUDTGroup::applyGroupSequences(SRTSOCKET /* not sure if needed */, int32_t& w_snd_isn, int32_t& w_rcv_isn)
{
if (m_bConnected) // You are the first one, no need to change.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fix the issue, it should inherit previous group-wise sequences even if the group is not connected (empty).

{
IF_HEAVY_LOGGING(string update_reason = "what?");
// Find a socket that is declared connected and is not
// the socket that caused the call.
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
if (gi->id == target)
continue;

CUDT& se = gi->ps->core();
if (!se.m_bConnected)
continue;

// Found it. Get the following sequences:
// For sending, the sequence that is about to be sent next.
// For receiving, the sequence of the latest received packet.

// SndCurrSeqNo is initially set to ISN-1, this next one is
// the sequence that is about to be stamped on the next sent packet
// over that socket. Using this field is safer because it is atomic
// and its affinity is to the same thread as the sending function.

// NOTE: the groupwise scheduling sequence might have been set
// already. If so, it means that it was set by either:
// - the call of this function on the very first conencted socket (see below)
// - the call to `sendBroadcast` or `sendBackup`
// In both cases, we want THIS EXACTLY value to be reported
if (m_iLastSchedSeqNo != -1)
{
w_snd_isn = m_iLastSchedSeqNo;
IF_HEAVY_LOGGING(update_reason = "GROUPWISE snd-seq");
}
else
{
w_snd_isn = se.m_iSndNextSeqNo;

// Write it back to the groupwise scheduling sequence so that
// any next connected socket will take this value as well.
m_iLastSchedSeqNo = w_snd_isn;
IF_HEAVY_LOGGING(update_reason = "existing socket not yet sending");
}

// RcvCurrSeqNo is increased by one because it happens that at the
// synchronization moment it's already past reading and delivery.
// This is redundancy, so the redundant socket is connected at the moment
// when the other one is already transmitting, so skipping one packet
// even if later transmitted is less troublesome than requesting a
// "mistakenly seen as lost" packet.
w_rcv_isn = CSeqNo::incseq(se.m_iRcvCurrSeqNo);

HLOGC(gmlog.Debug,
log << "applyGroupSequences: @" << target << " gets seq from @" << gi->id << " rcv %" << (w_rcv_isn)
<< " snd %" << (w_snd_isn) << " as " << update_reason);
return false;
}
}

// If the GROUP (!) is not connected, or no running/pending socket has been found.
// // That is, given socket is the first one.
// The group data should be set up with its own data. They should already be passed here
// in the variables.
//
// Override the schedule sequence of the group in this case because whatever is set now,
// it's not valid.

HLOGC(gmlog.Debug,
log << "applyGroupSequences: no socket found connected and transmitting, @" << target
<< " not changing sequences, storing snd-seq %" << (w_snd_isn));

set_currentSchedSequence(w_snd_isn);

return true;
// LastSchedSeqNo is initially set to the ISN of first connected socket,
// its' also updated in group send functions to the next scheduling seq.
w_snd_isn = m_iLastSchedSeqNo;

// RcvCurrSeqNo is increased by one because it happens that at the
// synchronization moment it's already past reading and delivery.
// This is redundancy, so the redundant socket is connected at the moment
// when the other one is already transmitting, so skipping one packet
// even if later transmitted is less troublesome than requesting a
// "mistakenly seen as lost" packet.
w_rcv_isn = CSeqNo::incseq(m_iRcvCurrSeqNo);
}

// NOTE: This function is now for DEBUG PURPOSES ONLY.
Expand Down Expand Up @@ -263,7 +201,6 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
, m_bSynSending(true)
, m_bTsbPd(true)
, m_bTLPktDrop(true)
, m_iTsbPdDelay_us(0)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was not used.

// m_*EID and m_*Epolld fields will be initialized
// in the constructor body.
, m_iSndTimeOut(-1)
Expand All @@ -276,6 +213,7 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
, m_bClosing(false)
, m_iLastSchedSeqNo(SRT_SEQNO_NONE)
, m_iLastSchedMsgNo(SRT_MSGNO_NONE)
, m_iRcvCurrSeqNo(SRT_SEQNO_NONE)
{
setupMutex(m_GroupLock, "Group");
setupMutex(m_RcvDataLock, "RcvData");
Expand Down Expand Up @@ -833,8 +771,7 @@ SRT_SOCKSTATUS CUDTGroup::getStatus()
return SRTS_BROKEN;
}

// [[using locked(m_GroupLock)]];
void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side)
void CUDTGroup::syncWithFirstSocket(const CUDT& core, const HandshakeSide side)
{
if (side == HSD_RESPONDER)
{
Expand All @@ -845,23 +782,7 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side)
set_currentSchedSequence(core.ISN());
}

// XXX
// Might need further investigation as to whether this isn't
// wrong for some cases. By having this -1 here the value will be
// laziliy set from the first reading one. It is believed that
// it covers all possible scenarios, that is:
//
// - no readers - no problem!
// - have some readers and a new is attached - this is set already
// - connect multiple links, but none has read yet - you'll be the first.
//
// Previous implementation used setting to: core.m_iPeerISN
resetInitialRxSequence();

// Get the latency (possibly fixed against the opposite side)
// from the first socket (core.m_iTsbPdDelay_ms),
// and set it on the current socket.
set_latency(core.m_iTsbPdDelay_ms * int64_t(1000));
m_iRcvCurrSeqNo = core.m_iRcvCurrSeqNo.load();
}

void CUDTGroup::close()
Expand Down Expand Up @@ -2089,6 +2010,22 @@ void CUDTGroup::updateReadState(SRTSOCKET /* not sure if needed */, int32_t sequ
}
}

void CUDTGroup::updateRcvCurrSeqNo(int32_t seq)
{
ScopedLock lg(m_GroupLock);

if (m_iRcvCurrSeqNo == SRT_SEQNO_NONE)
{
LOGC(grlog.Error,
log << "IPE: CUDTGroup::m_iRcvCurrSeqNo was not initialized by the first member, setting to %" << seq);
m_iRcvCurrSeqNo = seq;
}
else if (CSeqNo::seqcmp(seq, m_iRcvCurrSeqNo) > 0)
{
m_iRcvCurrSeqNo = seq;
}
}

int32_t CUDTGroup::getRcvBaseSeqNo()
{
ScopedLock lg(m_GroupLock);
Expand Down
32 changes: 4 additions & 28 deletions srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,23 +162,8 @@ class CUDTGroup
{
m_Group.erase(f);

// Reset sequence numbers on a dead group so that they are
// initialized anew with the new alive connection within
// the group.
// XXX The problem is that this should be done after the
// socket is considered DISCONNECTED, not when it's being
// closed. After being disconnected, the sequence numbers
// are no longer valid, and will be reinitialized when the
// socket is connected again. This may stay as is for now
// as in SRT it's not predicted to do anything with the socket
// that was disconnected other than immediately closing it.
if (m_Group.empty())
{
// When the group is empty, there's no danger that this
// number will collide with any ISN provided by a socket.
// Also since now every socket will derive this ISN.
m_iLastSchedSeqNo = generateISN();
resetInitialRxSequence();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fix the issue, it should keep previous group-wise sequences even if the group is empty.

empty = true;
}
}
Expand Down Expand Up @@ -346,6 +331,7 @@ class CUDTGroup
/// @param sock member socket ID (unused)
/// @param sequence the latest packet sequence number available for reading.
void updateReadState(SRTSOCKET sock, int32_t sequence);
void updateRcvCurrSeqNo(int32_t seq);

void updateWriteState();
void updateFailedLink();
Expand All @@ -372,7 +358,7 @@ class CUDTGroup
/// @param ack The past-the-last-received ACK sequence number
void readyPackets(srt::CUDT* core, int32_t ack);

void syncWithSocket(const srt::CUDT& core, const HandshakeSide side);
void syncWithFirstSocket(const srt::CUDT& core, const HandshakeSide side);
int getGroupData(SRT_SOCKGROUPDATA* pdata, size_t* psize);
int getGroupData_LOCKED(SRT_SOCKGROUPDATA* pdata, size_t* psize);

Expand Down Expand Up @@ -630,7 +616,6 @@ class CUDTGroup
bool m_bSynSending;
bool m_bTsbPd;
bool m_bTLPktDrop;
int64_t m_iTsbPdDelay_us;
int m_RcvEID;
class CEPollDesc* m_RcvEpolld;
int m_SndEID;
Expand Down Expand Up @@ -692,6 +677,7 @@ class CUDTGroup
sync::Mutex m_RcvDataLock;
sync::atomic<int32_t> m_iLastSchedSeqNo; // represetnts the value of CUDT::m_iSndNextSeqNo for each running socket
sync::atomic<int32_t> m_iLastSchedMsgNo;
sync::atomic<int32_t> m_iRcvCurrSeqNo; // Represetnts the max value of CUDT::m_iRcvCurrSeqNo for all sockets in this group.
// Statistics

struct Stats
Expand Down Expand Up @@ -764,15 +750,6 @@ class CUDTGroup
#endif
}

void resetInitialRxSequence()
{
// The app-reader doesn't care about the real sequence number.
// The first provided one will be taken as a good deal; even if
// this is going to be past the ISN, at worst it will be caused
// by TLPKTDROP.
m_RcvBaseSeqNo = SRT_SEQNO_NONE;
}

bool applyGroupTime(time_point& w_start_time, time_point& w_peer_start_time)
{
using srt::sync::is_zero;
Expand Down Expand Up @@ -803,7 +780,7 @@ class CUDTGroup

// Live state synchronization
bool getBufferTimeBase(srt::CUDT* forthesakeof, time_point& w_tb, bool& w_wp, duration& w_dr);
bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn);
void applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn);

/// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference.
/// @param srcMember a reference for synchronization.
Expand All @@ -817,7 +794,6 @@ class CUDTGroup
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRT_GROUP_TYPE, type, m_type);
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentSchedSequence, m_iLastSchedSeqNo);
SRTU_PROPERTY_RRW(std::set<int>&, epollset, m_sPollID);
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency, m_iTsbPdDelay_us);
SRTU_PROPERTY_RO(bool, closing, m_bClosing);
};

Expand Down