Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fixes and rework concerning TSA reports #2986

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ if (ENABLE_THREAD_CHECK)
endif()

if (ENABLE_CLANG_TSA)
list(APPEND SRT_EXTRA_CFLAGS "-Wthread-safety")
list(APPEND SRT_EXTRA_CFLAGS "-Wthread-safety -Wthread-safety-beta")
message(STATUS "Clang TSA: Enabled")
endif()

Expand Down
22 changes: 7 additions & 15 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ srt::CUDTSocket::~CUDTSocket()
releaseMutex(m_ControlLock);
}

SRT_TSA_DISABLED // Uses m_Status that should be guarded, but for reading it is enough to be atomic
SRT_SOCKSTATUS srt::CUDTSocket::getStatus()
{
// TTL in CRendezvousQueue::updateConnStatus() will set m_bConnecting to false.
Expand Down Expand Up @@ -128,6 +129,7 @@ void srt::CUDTSocket::breakSocket_LOCKED()
setClosed();
}

SRT_TSA_DISABLED // Uses m_Status that should be guarded, but for reading it is enough to be atomic
void srt::CUDTSocket::setClosed()
{
m_Status = SRTS_CLOSED;
Expand Down Expand Up @@ -2340,7 +2342,7 @@ int srt::CUDTUnited::selectEx(const vector<SRTSOCKET>& fds,

if (readfds)
{
if ((s->core().m_bConnected && s->core().m_pRcvBuffer->isRcvDataReady()) ||
if ((s->core().m_bConnected && s->core().isRcvBufferReady()) ||
(s->core().m_bListening && (s->m_QueuedSockets.size() > 0)))
{
readfds->push_back(s->m_SocketID);
Expand Down Expand Up @@ -3508,12 +3510,12 @@ srt::CUDT::APIError::APIError(CodeMajor mj, CodeMinor mn, int syserr)
// This doesn't have argument of GroupType due to header file conflicts.

// [[using locked(s_UDTUnited.m_GlobControlLock)]]
srt::CUDTGroup& srt::CUDT::newGroup(const int type)
srt::CUDTGroup& srt::CUDTUnited::newGroup(const int type)
{
const SRTSOCKET id = uglobal().generateSocketID(true);
const SRTSOCKET id = generateSocketID(true);

// Now map the group
return uglobal().addGroup(id, SRT_GROUP_TYPE(type)).set_id(id);
return addGroup(id, SRT_GROUP_TYPE(type)).set_id(id);
}

SRTSOCKET srt::CUDT::createGroup(SRT_GROUP_TYPE gt)
Expand All @@ -3524,7 +3526,7 @@ SRTSOCKET srt::CUDT::createGroup(SRT_GROUP_TYPE gt)
try
{
srt::sync::ScopedLock globlock(uglobal().m_GlobControlLock);
return newGroup(gt).id();
return uglobal().newGroup(gt).id();
// Note: potentially, after this function exits, the group
// could be deleted, immediately, from a separate thread (tho
// unlikely because the other thread would need some handle to
Expand Down Expand Up @@ -4391,16 +4393,6 @@ srt::CUDT* srt::CUDT::getUDTHandle(SRTSOCKET u)
}
}

vector<SRTSOCKET> srt::CUDT::existingSockets()
{
vector<SRTSOCKET> out;
for (CUDTUnited::sockets_t::iterator i = uglobal().m_Sockets.begin(); i != uglobal().m_Sockets.end(); ++i)
{
out.push_back(i->first);
}
return out;
}

SRT_SOCKSTATUS srt::CUDT::getsockstate(SRTSOCKET u)
{
try
Expand Down
55 changes: 40 additions & 15 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ class CUDTSocket
}


SRT_ATTR_GUARDED_BY(m_ControlLock)
// Controversial whether it should stand. This lock is mainly
// for API things connected to this socket, while status is also
// set as atomic to allow multi-thread access.
// SRT_TSA_GUARDED_BY(m_ControlLock)
sync::atomic<SRT_SOCKSTATUS> m_Status; //< current socket state

/// Time when the socket is closed.
Expand Down Expand Up @@ -269,6 +272,13 @@ class CUDTUnited
/// @return The new UDT socket ID, or INVALID_SOCK.
SRTSOCKET newSocket(CUDTSocket** pps = NULL);

#if ENABLE_BONDING
// This is an internal function; 'type' should be pre-checked if it has a correct value.
// This doesn't have argument of GroupType due to header file conflicts.

SRT_TSA_NEEDS_LOCKED(m_GlobControlLock)
srt::CUDTGroup& newGroup(const int type);
#endif
/// Create (listener-side) a new socket associated with the incoming connection request.
/// @param [in] listen the listening socket ID.
/// @param [in] peer peer address.
Expand Down Expand Up @@ -337,7 +347,8 @@ class CUDTUnited
int epoll_release(const int eid);

#if ENABLE_BONDING
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_GlobControlLock)
SRT_ATR_NODISCARD
SRT_TSA_NEEDS_LOCKED(m_GlobControlLock)
CUDTGroup& addGroup(SRTSOCKET id, SRT_GROUP_TYPE type)
{
// This only ensures that the element exists.
Expand All @@ -356,10 +367,14 @@ class CUDTUnited
return *g;
}

SRT_TSA_NEEDS_NONLOCKED(m_GlobControlLock)
void deleteGroup(CUDTGroup* g);

SRT_TSA_NEEDS_LOCKED(m_GlobControlLock)
void deleteGroup_LOCKED(CUDTGroup* g);

SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_GlobControlLock)
SRT_ATR_NODISCARD
SRT_TSA_NEEDS_LOCKED(m_GlobControlLock)
CUDTGroup* findPeerGroup_LOCKED(SRTSOCKET peergroup)
{
for (groups_t::iterator i = m_Groups.begin(); i != m_Groups.end(); ++i)
Expand Down Expand Up @@ -398,38 +413,47 @@ class CUDTUnited

private:
typedef std::map<SRTSOCKET, CUDTSocket*> sockets_t; // stores all the socket structures
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_GUARDED_BY(m_GlobControlLock)
sockets_t m_Sockets;

#if ENABLE_BONDING
typedef std::map<SRTSOCKET, CUDTGroup*> groups_t;
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_GUARDED_BY(m_GlobControlLock)
groups_t m_Groups;
#endif

// XXX Desired, but blocked because the older clang compilers
// do not handle this declaration correctly. Unblock in devel builds
// for checking.
// SRT_TSA_LOCK_ORDERS_AFTER(CUDT::m_ConnectionLock)
sync::Mutex m_GlobControlLock; // used to synchronize UDT API

sync::Mutex m_IDLock; // used to synchronize ID generation

SRTSOCKET m_SocketIDGenerator; // seed to generate a new unique socket ID
SRTSOCKET m_SocketIDGenerator_init; // Keeps track of the very first one

SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_GUARDED_BY(m_GlobControlLock)
std::map<int64_t, std::set<SRTSOCKET> >
m_PeerRec; // record sockets from peers to avoid repeated connection request, int64_t = (socker_id << 30) + isn

private:
friend struct FLookupSocketWithEvent_LOCKED;

SRT_TSA_NEEDS_NONLOCKED(m_GlobControlLock)
CUDTSocket* locateSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);
// This function does the same as locateSocket, except that:
// - lock on m_GlobControlLock is expected (so that you don't unlock between finding and using)
// - only return NULL if not found
SRT_TSA_NEEDS_LOCKED(m_GlobControlLock)
CUDTSocket* locateSocket_LOCKED(SRTSOCKET u);
CUDTSocket* locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn);

#if ENABLE_BONDING
SRT_TSA_NEEDS_NONLOCKED(m_GlobControlLock)
CUDTGroup* locateAcquireGroup(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);

SRT_TSA_NEEDS_NONLOCKED(m_GlobControlLock)
CUDTGroup* acquireSocketsGroup(CUDTSocket* s);

struct GroupKeeper
Expand Down Expand Up @@ -520,37 +544,38 @@ class CUDTUnited
const sockaddr_any& reqaddr, const CSrtMuxerConfig& cfgSocket);

private:
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_GUARDED_BY(m_GlobControlLock)
std::map<int, CMultiplexer> m_mMultiplexer; // UDP multiplexer

/// UDT network information cache.
/// Existence is guarded by m_GlobControlLock, but the cache itself is thread-safe.
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_PT_GUARDED_BY(m_GlobControlLock)
CCache<CInfoBlock>* const m_pCache;

private:
srt::sync::atomic<bool> m_bClosing;
sync::atomic<bool> m_bClosing;
sync::Mutex m_GCStopLock;
sync::Condition m_GCStopCond;

sync::Mutex m_InitLock;
SRT_ATTR_GUARDED_BY(m_InitLock)
SRT_TSA_GUARDED_BY(m_InitLock)
int m_iInstanceCount; // number of startup() called by application
SRT_ATTR_GUARDED_BY(m_InitLock)
bool m_bGCStatus; // if the GC thread is working (true)
sync::atomic<bool> m_bGCStatus; // if the GC thread is working (true)

SRT_ATTR_GUARDED_BY(m_InitLock)
SRT_TSA_GUARDED_BY(m_InitLock)
sync::CThread m_GCThread;
static void* garbageCollect(void*);

SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_GUARDED_BY(m_GlobControlLock)
sockets_t m_ClosedSockets; // temporarily store closed sockets
#if ENABLE_BONDING
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
SRT_TSA_GUARDED_BY(m_GlobControlLock)
groups_t m_ClosedGroups;
#endif

void checkBrokenSockets();

SRT_TSA_NEEDS_LOCKED(m_GlobControlLock)
void removeSocket(const SRTSOCKET u);

CEPoll m_EPoll; // handling epoll data structures and events
Expand Down
9 changes: 5 additions & 4 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,10 @@ std::pair<int, int> CRcvBuffer::dropUpTo(int32_t seqno)
return std::make_pair(0, 0);
}

m_iMaxPosOff -= len;
if (m_iMaxPosOff < 0)
m_iMaxPosOff = 0;
int newmax = m_iMaxPosOff - len;
if (newmax < 0)
newmax = 0;
m_iMaxPosOff = newmax;

int iNumDropped = 0; // Number of dropped packets that were missing.
int iNumDiscarded = 0; // The number of dropped packets that existed in the buffer.
Expand Down Expand Up @@ -776,7 +777,7 @@ void CRcvBuffer::countBytes(int pkts, int bytes)
if (!m_uAvgPayloadSz)
m_uAvgPayloadSz = bytes;
else
m_uAvgPayloadSz = avg_iir<100>(m_uAvgPayloadSz, (unsigned) bytes);
m_uAvgPayloadSz = avg_iir<100, unsigned>(m_uAvgPayloadSz, bytes);
}
}

Expand Down
11 changes: 8 additions & 3 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,15 @@ class CRcvBuffer
const size_t m_szSize; // size of the array of units (buffer)
CUnitQueue* m_pUnitQueue; // the shared unit queue

int m_iStartSeqNo;
// ATOMIC because getStartSeqNo() may be called from other thread
// than CUDT's receiver worker thread. Even if it's not a problem
// if this value is a bit outdated, it must be read solid.
sync::atomic<int> m_iStartSeqNo;
int m_iStartPos; // the head position for I/O (inclusive)
int m_iFirstNonreadPos; // First position that can't be read (<= m_iLastAckPos)
int m_iMaxPosOff; // the furthest data position

// ATOMIC: sometimes this value is checked for buffer emptiness
sync::atomic<int> m_iMaxPosOff; // the furthest data position
int m_iNotch; // the starting read point of the first unit

size_t m_numOutOfOrderPackets; // The number of stored packets with "inorder" flag set to false
Expand Down Expand Up @@ -408,7 +413,7 @@ class CRcvBuffer
mutable sync::Mutex m_BytesCountLock; // used to protect counters operations
int m_iBytesCount; // Number of payload bytes in the buffer
int m_iPktsCount; // Number of payload bytes in the buffer
unsigned m_uAvgPayloadSz; // Average payload size for dropped bytes estimation
sync::atomic<unsigned> m_uAvgPayloadSz; // Average payload size for dropped bytes estimation
};

} // namespace srt
Expand Down
16 changes: 8 additions & 8 deletions srtcore/buffer_snd.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ class CSndBuffer
/// @param [in] data pointer to the user data block.
/// @param [in] len size of the block.
/// @param [inout] w_mctrl Message control data
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
void addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl);

/// Read a block of data from file and insert it into the sending list.
/// @param [in] ifs input file stream.
/// @param [in] len size of the block.
/// @return actual size of data added from the file.
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
int addBufferFromFile(std::fstream& ifs, int len);

// Special values that can be returned by readData.
Expand All @@ -126,12 +126,12 @@ class CSndBuffer
/// @param [in] kflags Odd|Even crypto key flag
/// @param [out] seqnoinc the number of packets skipped due to TTL, so that seqno should be incremented.
/// @return Actual length of data read.
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
int readData(CPacket& w_packet, time_point& w_origintime, int kflgs, int& w_seqnoinc);

/// Peek an information on the next original data packet to send.
/// @return origin time stamp of the next packet; epoch start time otherwise.
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
time_point peekNextOriginal() const;

struct DropRange
Expand All @@ -155,14 +155,14 @@ class CSndBuffer
/// @retval >0 Length of the data read.
/// @retval READ_NONE No data available or @a offset points out of the buffer occupied space.
/// @retval READ_DROP The call requested data drop due to TTL exceeded, to be handled first.
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
int readData(const int offset, CPacket& w_packet, time_point& w_origintime, DropRange& w_drop);

/// Get the time of the last retransmission (if any) of the DATA packet.
/// @param [in] offset offset from the last ACK point (backward sequence number difference)
///
/// @return Last time of the last retransmission event for the corresponding DATA packet.
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
time_point getPacketRexmitTime(const int offset);

/// Update the ACK point and may release/unmap/return the user data according to the flag.
Expand All @@ -175,7 +175,7 @@ class CSndBuffer
/// @return Current size of the data in the sending list.
int getCurrBufSize() const;

SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
int dropLateData(int& bytes, int32_t& w_first_msgno, const time_point& too_late_time);

void updAvgBufSize(const time_point& time);
Expand All @@ -199,7 +199,7 @@ class CSndBuffer

/// @brief Get the buffering delay of the oldest message in the buffer.
/// @return the delay value.
SRT_ATTR_EXCLUDES(m_BufLock)
SRT_TSA_NEEDS_NONLOCKED(m_BufLock)
duration getBufferingDelay(const time_point& tnow) const;

uint64_t getInRatePeriod() const { return m_rateEstimator.getInRatePeriod(); }
Expand Down
Loading
Loading