Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fixes and rework concerning TSA reports #2986

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
12 changes: 8 additions & 4 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
releaseMutex(m_ControlLock);
}

SRT_TSA_DISABLED // Uses m_Status that should be guarded, but for reading it is enough to be atomic
SRT_SOCKSTATUS srt::CUDTSocket::getStatus()
{
// TTL in CRendezvousQueue::updateConnStatus() will set m_bConnecting to false.
Expand Down Expand Up @@ -128,6 +129,7 @@
setClosed();
}

SRT_TSA_DISABLED // Uses m_Status that should be guarded, but for reading it is enough to be atomic
void srt::CUDTSocket::setClosed()
{
m_Status = SRTS_CLOSED;
Expand Down Expand Up @@ -3430,12 +3432,12 @@
// This doesn't have argument of GroupType due to header file conflicts.

// [[using locked(s_UDTUnited.m_GlobControlLock)]]
srt::CUDTGroup& srt::CUDT::newGroup(const int type)
srt::CUDTGroup& srt::CUDTUnited::newGroup(const int type)
{
const SRTSOCKET id = uglobal().generateSocketID(true);
const SRTSOCKET id = generateSocketID(true);

// Now map the group
return uglobal().addGroup(id, SRT_GROUP_TYPE(type)).set_id(id);
return addGroup(id, SRT_GROUP_TYPE(type)).set_id(id);
}

SRTSOCKET srt::CUDT::createGroup(SRT_GROUP_TYPE gt)
Expand All @@ -3447,7 +3449,7 @@
try
{
srt::sync::ScopedLock globlock(uglobal().m_GlobControlLock);
return newGroup(gt).id();
return uglobal().newGroup(gt).id();
// Note: potentially, after this function exits, the group
// could be deleted, immediately, from a separate thread (tho
// unlikely because the other thread would need some handle to
Expand Down Expand Up @@ -4314,15 +4316,17 @@
}
}

/* UNUSED. Restore if needed. NOTE: SRT_TSA_NEEDS_LOCKED(CUDTUnited::m_GlobControlLock)
vector<SRTSOCKET> srt::CUDT::existingSockets()
{
vector<SRTSOCKET> out;
for (CUDTUnited::sockets_t::iterator i = uglobal().m_Sockets.begin(); i != uglobal().m_Sockets.end(); ++i)
{
out.push_back(i->first);
}
return out;
}
*/
Fixed Show fixed Hide fixed

SRT_SOCKSTATUS srt::CUDT::getsockstate(SRTSOCKET u)
{
Expand Down
50 changes: 35 additions & 15 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ class CUDTSocket

void construct();

SRT_ATTR_GUARDED_BY(m_ControlLock)
// Controversial whether it should stand. This lock is mainly
// for API things connected to this socket, while status is also
// set as atomic to allow multi-thread access.
// SRT_TSA_GUARDED_BY(m_ControlLock)
sync::atomic<SRT_SOCKSTATUS> m_Status; //< current socket state

/// Time when the socket is closed.
Expand Down Expand Up @@ -256,6 +259,13 @@ class CUDTUnited
/// @return The new UDT socket ID, or INVALID_SOCK.
SRTSOCKET newSocket(CUDTSocket** pps = NULL);

#if ENABLE_BONDING
// This is an internal function; 'type' should be pre-checked if it has a correct value.
// This doesn't have argument of GroupType due to header file conflicts.

SRT_TSA_NEEDS_LOCKED(m_GlobControlLock)
srt::CUDTGroup& newGroup(const int type);
#endif
/// Create (listener-side) a new socket associated with the incoming connection request.
/// @param [in] listen the listening socket ID.
/// @param [in] peer peer address.
Expand Down Expand Up @@ -324,7 +334,7 @@ class CUDTUnited
int epoll_release(const int eid);

#if ENABLE_BONDING
// [[using locked(m_GlobControlLock)]]
SRT_TSA_NEEDS_LOCKED(m_GlobControlLock)
CUDTGroup& addGroup(SRTSOCKET id, SRT_GROUP_TYPE type)
{
// This only ensures that the element exists.
Expand All @@ -343,10 +353,13 @@ class CUDTUnited
return *g;
}

SRT_TSA_NEEDS_NONLOCKED(m_GlobControlLock)
void deleteGroup(CUDTGroup* g);

SRT_TSA_NEEDS_LOCKED(m_GlobControlLock)
void deleteGroup_LOCKED(CUDTGroup* g);

// [[using locked(m_GlobControlLock)]]
SRT_TSA_NEEDS_LOCKED(m_GlobControlLock)
CUDTGroup* findPeerGroup_LOCKED(SRTSOCKET peergroup)
{
for (groups_t::iterator i = m_Groups.begin(); i != m_Groups.end(); ++i)
Expand Down Expand Up @@ -385,38 +398,44 @@ class CUDTUnited

private:
typedef std::map<SRTSOCKET, CUDTSocket*> sockets_t; // stores all the socket structures
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_GUARDED_BY(m_GlobControlLock)
sockets_t m_Sockets;

#if ENABLE_BONDING
typedef std::map<SRTSOCKET, CUDTGroup*> groups_t;
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_GUARDED_BY(m_GlobControlLock)
groups_t m_Groups;
#endif

SRT_TSA_LOCK_ORDERS_AFTER(CUDT::m_ConnectionLock)
sync::Mutex m_GlobControlLock; // used to synchronize UDT API

sync::Mutex m_IDLock; // used to synchronize ID generation

SRTSOCKET m_SocketIDGenerator; // seed to generate a new unique socket ID
SRTSOCKET m_SocketIDGenerator_init; // Keeps track of the very first one

SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_GUARDED_BY(m_GlobControlLock)
std::map<int64_t, std::set<SRTSOCKET> >
m_PeerRec; // record sockets from peers to avoid repeated connection request, int64_t = (socker_id << 30) + isn

private:
friend struct FLookupSocketWithEvent_LOCKED;

SRT_TSA_NEEDS_NONLOCKED(m_GlobControlLock)
CUDTSocket* locateSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);
// This function does the same as locateSocket, except that:
// - lock on m_GlobControlLock is expected (so that you don't unlock between finding and using)
// - only return NULL if not found
SRT_TSA_NEEDS_LOCKED(m_GlobControlLock)
CUDTSocket* locateSocket_LOCKED(SRTSOCKET u);
CUDTSocket* locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn);

#if ENABLE_BONDING
SRT_TSA_NEEDS_NONLOCKED(m_GlobControlLock)
CUDTGroup* locateAcquireGroup(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);

SRT_TSA_NEEDS_NONLOCKED(m_GlobControlLock)
CUDTGroup* acquireSocketsGroup(CUDTSocket* s);

struct GroupKeeper
Expand Down Expand Up @@ -463,37 +482,38 @@ class CUDTUnited
const sockaddr_any& reqaddr, const CSrtMuxerConfig& cfgSocket);

private:
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_GUARDED_BY(m_GlobControlLock)
std::map<int, CMultiplexer> m_mMultiplexer; // UDP multiplexer

/// UDT network information cache.
/// Existence is guarded by m_GlobControlLock, but the cache itself is thread-safe.
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_PT_GUARDED_BY(m_GlobControlLock)
CCache<CInfoBlock>* const m_pCache;

private:
srt::sync::atomic<bool> m_bClosing;
sync::atomic<bool> m_bClosing;
sync::Mutex m_GCStopLock;
sync::Condition m_GCStopCond;

sync::Mutex m_InitLock;
SRT_ATTR_GUARDED_BY(m_InitLock)
SRT_TSA_GUARDED_BY(m_InitLock)
int m_iInstanceCount; // number of startup() called by application
SRT_ATTR_GUARDED_BY(m_InitLock)
bool m_bGCStatus; // if the GC thread is working (true)
sync::atomic<bool> m_bGCStatus; // if the GC thread is working (true)

SRT_ATTR_GUARDED_BY(m_InitLock)
SRT_TSA_GUARDED_BY(m_InitLock)
sync::CThread m_GCThread;
static void* garbageCollect(void*);

SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_GUARDED_BY(m_GlobControlLock)
sockets_t m_ClosedSockets; // temporarily store closed sockets
#if ENABLE_BONDING
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_GUARDED_BY(m_GlobControlLock)
groups_t m_ClosedGroups;
#endif

void checkBrokenSockets();

SRT_TSA_NEEDS_LOCKED(m_GlobControlLock)
void removeSocket(const SRTSOCKET u);

CEPoll m_EPoll; // handling epoll data structures and events
Expand Down
16 changes: 8 additions & 8 deletions srtcore/buffer_snd.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ class CSndBuffer
/// @param [in] data pointer to the user data block.
/// @param [in] len size of the block.
/// @param [inout] w_mctrl Message control data
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
void addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl);

/// Read a block of data from file and insert it into the sending list.
/// @param [in] ifs input file stream.
/// @param [in] len size of the block.
/// @return actual size of data added from the file.
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
int addBufferFromFile(std::fstream& ifs, int len);

// Special values that can be returned by readData.
Expand All @@ -126,12 +126,12 @@ class CSndBuffer
/// @param [in] kflags Odd|Even crypto key flag
/// @param [out] seqnoinc the number of packets skipped due to TTL, so that seqno should be incremented.
/// @return Actual length of data read.
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
int readData(CPacket& w_packet, time_point& w_origintime, int kflgs, int& w_seqnoinc);

/// Peek an information on the next original data packet to send.
/// @return origin time stamp of the next packet; epoch start time otherwise.
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
time_point peekNextOriginal() const;

struct DropRange
Expand All @@ -155,14 +155,14 @@ class CSndBuffer
/// @retval >0 Length of the data read.
/// @retval READ_NONE No data available or @a offset points out of the buffer occupied space.
/// @retval READ_DROP The call requested data drop due to TTL exceeded, to be handled first.
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
int readData(const int offset, CPacket& w_packet, time_point& w_origintime, DropRange& w_drop);

/// Get the time of the last retransmission (if any) of the DATA packet.
/// @param [in] offset offset from the last ACK point (backward sequence number difference)
///
/// @return Last time of the last retransmission event for the corresponding DATA packet.
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
time_point getPacketRexmitTime(const int offset);

/// Update the ACK point and may release/unmap/return the user data according to the flag.
Expand All @@ -175,7 +175,7 @@ class CSndBuffer
/// @return Current size of the data in the sending list.
int getCurrBufSize() const;

SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
int dropLateData(int& bytes, int32_t& w_first_msgno, const time_point& too_late_time);

void updAvgBufSize(const time_point& time);
Expand All @@ -199,7 +199,7 @@ class CSndBuffer

/// @brief Get the buffering delay of the oldest message in the buffer.
/// @return the delay value.
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
duration getBufferingDelay(const time_point& tnow) const;

uint64_t getInRatePeriod() const { return m_rateEstimator.getInRatePeriod(); }
Expand Down
Loading
Loading