Skip to content

Commit

Permalink
[core] fix applyGroupSequences
Browse files Browse the repository at this point in the history
  • Loading branch information
gou4shi1 committed May 18, 2023
1 parent 3cefede commit 07c9105
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 155 deletions.
4 changes: 0 additions & 4 deletions docs/dev/low-level-info.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ CRcvQueue::worker_TryAsyncRend_OrStore
[IF Responder]
{
CUDT::makeMePeerOf
[LOCKS m_GroupLock]
CUDTGroup::syncWithSocket
CUDTGroup::find --> [LOCKED m_GroupLock]
}
debugGroup -- > [LOCKED m_GroupLock]
Expand Down Expand Up @@ -189,8 +187,6 @@ CRcvQueue::worker_ProcessConnectionRequest
[IF Responder]
{
CUDT::makeMePeerOf
[LOCKS m_GroupLock]
CUDTGroup::syncWithSocket
CUDTGroup::find --> [LOCKED m_GroupLock]
}
debugGroup -- > [LOCKED m_GroupLock]
Expand Down
47 changes: 15 additions & 32 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3151,10 +3151,10 @@ bool srt::CUDT::interpretGroup(const int32_t groupdata[], size_t data_size SRT_A
log << CONID() << "HS/RSP: group $" << pg->id() << " -> peer $" << pg->peerid()
<< ", copying characteristic data");

// The call to syncWithSocket is copying
// The call to syncWithFirstSocket is copying
// some interesting data from the first connected
// socket. This should be only done for the first successful connection.
pg->syncWithSocket(*this, HSD_INITIATOR);
pg->syncWithFirstSocket(*this, HSD_INITIATOR);
}
// Otherwise the peer id must be the same as existing, otherwise
// this group is considered already bound to another peer group.
Expand Down Expand Up @@ -3236,7 +3236,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3

// Check if there exists a group that this one is a peer of.
CUDTGroup* gp = uglobal().findPeerGroup_LOCKED(peergroup);
bool was_empty = true;
if (gp)
{
if (gp->type() != gtp)
Expand All @@ -3248,9 +3247,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3
}

HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: group for peer=$" << peergroup << " found: $" << gp->id());

if (!gp->groupEmpty())
was_empty = false;
}
else
{
Expand All @@ -3273,6 +3269,7 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3

gp->set_peerid(peergroup);
gp->deriveSettings(this);
gp->syncWithFirstSocket(s->core(), HSD_RESPONDER);

// This can only happen on a listener (it's only called on a site that is
// HSD_RESPONDER), so it was a response for a groupwise connection.
Expand All @@ -3284,19 +3281,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3
<< gp->id());
}

{
ScopedLock glock (*gp->exp_groupLock());
if (gp->closing())
{
HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: group $" << gp->id() << " is being closed, can't process");
}

if (was_empty)
{
gp->syncWithSocket(s->core(), HSD_RESPONDER);
}
}

// Setting non-blocking reading for group socket.
s->core().m_config.bSynRecving = false;
s->core().m_config.bSynSending = false;
Expand Down Expand Up @@ -3396,21 +3380,15 @@ void srt::CUDT::synchronizeWithGroup(CUDTGroup* gp)

// These are the values that are normally set initially by setters.
int32_t snd_isn = m_iSndLastAck, rcv_isn = m_iRcvLastAck;
if (!gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn)))
{
HLOGC(gmlog.Debug,
log << CONID() << "synchronizeWithGroup: DERIVED ISN: RCV=%" << m_iRcvLastAck << " -> %" << rcv_isn
<< " (shift by " << CSeqNo::seqcmp(rcv_isn, m_iRcvLastAck) << ") SND=%" << m_iSndLastAck
<< " -> %" << snd_isn << " (shift by " << CSeqNo::seqcmp(snd_isn, m_iSndLastAck) << ")");
gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn));
HLOGC(gmlog.Debug,
log << CONID() << "synchronizeWithGroup: RCV-ISN=%" << m_iRcvLastAck << " -> %" << rcv_isn << " (shift by "
<< CSeqNo::seqoff(m_iRcvLastAck, rcv_isn) << ") SND-ISN=%" << m_iSndLastAck << " -> %" << snd_isn
<< " (shift by " << CSeqNo::seqoff(m_iSndLastAck, snd_isn) << ")");
if (rcv_isn != m_iRcvLastAck)
setInitialRcvSeq(rcv_isn);
if (snd_isn != m_iSndLastAck)
setInitialSndSeq(snd_isn);
}
else
{
HLOGC(gmlog.Debug,
log << CONID() << "synchronizeWithGroup: DEFINED ISN: RCV=%" << m_iRcvLastAck << " SND=%"
<< m_iSndLastAck);
}
}
#endif

Expand Down Expand Up @@ -7686,7 +7664,12 @@ void srt::CUDT::dropToGroupRecvBase()
// Note that getRcvBaseSeqNo() will lock m_GroupOf->m_GroupLock,
// but this is an intended order.
if (m_parent->m_GroupOf)
{
group_recv_base = m_parent->m_GroupOf->getRcvBaseSeqNo();
// To reduce overhead, especially the m_GlobControlLock,
// the group wise recv seq is updated here.
m_parent->m_GroupOf->updateRcvCurrSeqNo(m_iRcvCurrSeqNo);
}
}
if (group_recv_base == SRT_SEQNO_NONE)
return;
Expand Down
126 changes: 35 additions & 91 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,81 +63,19 @@ bool CUDTGroup::getBufferTimeBase(CUDT* forthesakeof,
}

// [[using locked(this->m_GroupLock)]];
bool CUDTGroup::applyGroupSequences(SRTSOCKET target, int32_t& w_snd_isn, int32_t& w_rcv_isn)
void CUDTGroup::applyGroupSequences(SRTSOCKET /* not sure if needed */, int32_t& w_snd_isn, int32_t& w_rcv_isn)
{
if (m_bConnected) // You are the first one, no need to change.
{
IF_HEAVY_LOGGING(string update_reason = "what?");
// Find a socket that is declared connected and is not
// the socket that caused the call.
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
if (gi->id == target)
continue;

CUDT& se = gi->ps->core();
if (!se.m_bConnected)
continue;

// Found it. Get the following sequences:
// For sending, the sequence that is about to be sent next.
// For receiving, the sequence of the latest received packet.

// SndCurrSeqNo is initially set to ISN-1, this next one is
// the sequence that is about to be stamped on the next sent packet
// over that socket. Using this field is safer because it is atomic
// and its affinity is to the same thread as the sending function.

// NOTE: the groupwise scheduling sequence might have been set
// already. If so, it means that it was set by either:
// - the call of this function on the very first conencted socket (see below)
// - the call to `sendBroadcast` or `sendBackup`
// In both cases, we want THIS EXACTLY value to be reported
if (m_iLastSchedSeqNo != -1)
{
w_snd_isn = m_iLastSchedSeqNo;
IF_HEAVY_LOGGING(update_reason = "GROUPWISE snd-seq");
}
else
{
w_snd_isn = se.m_iSndNextSeqNo;

// Write it back to the groupwise scheduling sequence so that
// any next connected socket will take this value as well.
m_iLastSchedSeqNo = w_snd_isn;
IF_HEAVY_LOGGING(update_reason = "existing socket not yet sending");
}

// RcvCurrSeqNo is increased by one because it happens that at the
// synchronization moment it's already past reading and delivery.
// This is redundancy, so the redundant socket is connected at the moment
// when the other one is already transmitting, so skipping one packet
// even if later transmitted is less troublesome than requesting a
// "mistakenly seen as lost" packet.
w_rcv_isn = CSeqNo::incseq(se.m_iRcvCurrSeqNo);

HLOGC(gmlog.Debug,
log << "applyGroupSequences: @" << target << " gets seq from @" << gi->id << " rcv %" << (w_rcv_isn)
<< " snd %" << (w_snd_isn) << " as " << update_reason);
return false;
}
}

// If the GROUP (!) is not connected, or no running/pending socket has been found.
// // That is, given socket is the first one.
// The group data should be set up with its own data. They should already be passed here
// in the variables.
//
// Override the schedule sequence of the group in this case because whatever is set now,
// it's not valid.

HLOGC(gmlog.Debug,
log << "applyGroupSequences: no socket found connected and transmitting, @" << target
<< " not changing sequences, storing snd-seq %" << (w_snd_isn));

set_currentSchedSequence(w_snd_isn);

return true;
// LastSchedSeqNo is initially set to the ISN of first connected socket,
// its' also updated in group send functions to the next scheduling seq.
w_snd_isn = m_iLastSchedSeqNo;

// RcvCurrSeqNo is increased by one because it happens that at the
// synchronization moment it's already past reading and delivery.
// This is redundancy, so the redundant socket is connected at the moment
// when the other one is already transmitting, so skipping one packet
// even if later transmitted is less troublesome than requesting a
// "mistakenly seen as lost" packet.
w_rcv_isn = CSeqNo::incseq(m_iRcvCurrSeqNo);
}

// NOTE: This function is now for DEBUG PURPOSES ONLY.
Expand Down Expand Up @@ -263,7 +201,6 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
, m_bSynSending(true)
, m_bTsbPd(true)
, m_bTLPktDrop(true)
, m_iTsbPdDelay_us(0)
// m_*EID and m_*Epolld fields will be initialized
// in the constructor body.
, m_iSndTimeOut(-1)
Expand All @@ -276,6 +213,7 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
, m_bClosing(false)
, m_iLastSchedSeqNo(SRT_SEQNO_NONE)
, m_iLastSchedMsgNo(SRT_MSGNO_NONE)
, m_iRcvCurrSeqNo(SRT_SEQNO_NONE)
{
setupMutex(m_GroupLock, "Group");
setupMutex(m_RcvDataLock, "RcvData");
Expand Down Expand Up @@ -833,8 +771,7 @@ SRT_SOCKSTATUS CUDTGroup::getStatus()
return SRTS_BROKEN;
}

// [[using locked(m_GroupLock)]];
void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side)
void CUDTGroup::syncWithFirstSocket(const CUDT& core, const HandshakeSide side)
{
if (side == HSD_RESPONDER)
{
Expand All @@ -845,23 +782,14 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side)
set_currentSchedSequence(core.ISN());
}

// XXX
// Might need further investigation as to whether this isn't
// wrong for some cases. By having this -1 here the value will be
// laziliy set from the first reading one. It is believed that
// it covers all possible scenarios, that is:
//
m_iRcvCurrSeqNo = core.m_iRcvCurrSeqNo.load();

// By having this -1 here the value will be laziliy set from the first reading one.
// It is believed that it covers all possible scenarios, that is:
// - no readers - no problem!
// - have some readers and a new is attached - this is set already
// - connect multiple links, but none has read yet - you'll be the first.
//
// Previous implementation used setting to: core.m_iPeerISN
resetInitialRxSequence();

// Get the latency (possibly fixed against the opposite side)
// from the first socket (core.m_iTsbPdDelay_ms),
// and set it on the current socket.
set_latency(core.m_iTsbPdDelay_ms * int64_t(1000));
m_RcvBaseSeqNo = SRT_SEQNO_NONE;
}

void CUDTGroup::close()
Expand Down Expand Up @@ -2089,6 +2017,22 @@ void CUDTGroup::updateReadState(SRTSOCKET /* not sure if needed */, int32_t sequ
}
}

void CUDTGroup::updateRcvCurrSeqNo(int32_t seq)
{
ScopedLock lg(m_GroupLock);

if (m_iRcvCurrSeqNo == SRT_SEQNO_NONE)
{
LOGC(grlog.Error,
log << "IPE: CUDTGroup::m_iRcvCurrSeqNo was not initialized by the first member, setting to %" << seq);
m_iRcvCurrSeqNo = seq;
}
else if (CSeqNo::seqcmp(seq, m_iRcvCurrSeqNo) > 0)
{
m_iRcvCurrSeqNo = seq;
}
}

int32_t CUDTGroup::getRcvBaseSeqNo()
{
ScopedLock lg(m_GroupLock);
Expand Down
32 changes: 4 additions & 28 deletions srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,23 +162,8 @@ class CUDTGroup
{
m_Group.erase(f);

// Reset sequence numbers on a dead group so that they are
// initialized anew with the new alive connection within
// the group.
// XXX The problem is that this should be done after the
// socket is considered DISCONNECTED, not when it's being
// closed. After being disconnected, the sequence numbers
// are no longer valid, and will be reinitialized when the
// socket is connected again. This may stay as is for now
// as in SRT it's not predicted to do anything with the socket
// that was disconnected other than immediately closing it.
if (m_Group.empty())
{
// When the group is empty, there's no danger that this
// number will collide with any ISN provided by a socket.
// Also since now every socket will derive this ISN.
m_iLastSchedSeqNo = generateISN();
resetInitialRxSequence();
empty = true;
}
}
Expand Down Expand Up @@ -346,6 +331,7 @@ class CUDTGroup
/// @param sock member socket ID (unused)
/// @param sequence the latest packet sequence number available for reading.
void updateReadState(SRTSOCKET sock, int32_t sequence);
void updateRcvCurrSeqNo(int32_t seq);

void updateWriteState();
void updateFailedLink();
Expand All @@ -372,7 +358,7 @@ class CUDTGroup
/// @param ack The past-the-last-received ACK sequence number
void readyPackets(srt::CUDT* core, int32_t ack);

void syncWithSocket(const srt::CUDT& core, const HandshakeSide side);
void syncWithFirstSocket(const srt::CUDT& core, const HandshakeSide side);
int getGroupData(SRT_SOCKGROUPDATA* pdata, size_t* psize);
int getGroupData_LOCKED(SRT_SOCKGROUPDATA* pdata, size_t* psize);

Expand Down Expand Up @@ -630,7 +616,6 @@ class CUDTGroup
bool m_bSynSending;
bool m_bTsbPd;
bool m_bTLPktDrop;
int64_t m_iTsbPdDelay_us;
int m_RcvEID;
class CEPollDesc* m_RcvEpolld;
int m_SndEID;
Expand Down Expand Up @@ -692,6 +677,7 @@ class CUDTGroup
sync::Mutex m_RcvDataLock;
sync::atomic<int32_t> m_iLastSchedSeqNo; // represetnts the value of CUDT::m_iSndNextSeqNo for each running socket
sync::atomic<int32_t> m_iLastSchedMsgNo;
sync::atomic<int32_t> m_iRcvCurrSeqNo; // Represetnts the max value of CUDT::m_iRcvCurrSeqNo for all sockets in this group.
// Statistics

struct Stats
Expand Down Expand Up @@ -764,15 +750,6 @@ class CUDTGroup
#endif
}

void resetInitialRxSequence()
{
// The app-reader doesn't care about the real sequence number.
// The first provided one will be taken as a good deal; even if
// this is going to be past the ISN, at worst it will be caused
// by TLPKTDROP.
m_RcvBaseSeqNo = SRT_SEQNO_NONE;
}

bool applyGroupTime(time_point& w_start_time, time_point& w_peer_start_time)
{
using srt::sync::is_zero;
Expand Down Expand Up @@ -803,7 +780,7 @@ class CUDTGroup

// Live state synchronization
bool getBufferTimeBase(srt::CUDT* forthesakeof, time_point& w_tb, bool& w_wp, duration& w_dr);
bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn);
void applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn);

/// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference.
/// @param srcMember a reference for synchronization.
Expand All @@ -817,7 +794,6 @@ class CUDTGroup
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRT_GROUP_TYPE, type, m_type);
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentSchedSequence, m_iLastSchedSeqNo);
SRTU_PROPERTY_RRW(std::set<int>&, epollset, m_sPollID);
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency, m_iTsbPdDelay_us);
SRTU_PROPERTY_RO(bool, closing, m_bClosing);
};

Expand Down

0 comments on commit 07c9105

Please sign in to comment.