Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refax: smaller fixes and added utilities #2504

Merged
merged 16 commits into from
Sep 19, 2023
6 changes: 5 additions & 1 deletion srtcore/buffer_snd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,11 @@ int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime,
continue;
}

HLOGC(bslog.Debug, log << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send");
HLOGC(bslog.Debug, log << CONID() << "CSndBuffer: picked up packet to send: size=" << readlen
<< " #" << w_packet.getMsgSeq()
<< " %" << w_packet.m_iSeqNo
<< " !" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));

break;
}

Expand Down
17 changes: 17 additions & 0 deletions srtcore/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -1430,6 +1430,23 @@ inline bool checkMappedIPv4(const sockaddr_in6& sa)
return checkMappedIPv4(addr);
}

inline std::string FormatLossArray(const std::vector< std::pair<int32_t, int32_t> >& lra)
{
std::ostringstream os;

os << "[ ";
for (std::vector< std::pair<int32_t, int32_t> >::const_iterator i = lra.begin(); i != lra.end(); ++i)
{
int len = CSeqNo::seqoff(i->first, i->second);
os << "%" << i->first;
if (len > 1)
os << "+" << len;
os << " ";
}

os << "]";
return os.str();
}

} // namespace srt

Expand Down
3 changes: 2 additions & 1 deletion srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11338,7 +11338,7 @@ bool srt::CUDT::checkExpTimer(const steady_clock::time_point& currtime, int chec
// Application will detect this when it calls any UDT methods next time.
//
HLOGC(xtlog.Debug,
log << CONID() << "CONNECTION EXPIRED after " << count_milliseconds(currtime - last_rsp_time) << "ms");
log << CONID() << "CONNECTION EXPIRED after " << FormatDuration<DUNIT_MS>(currtime - last_rsp_time) << " - BREAKING");
m_bClosing = true;
m_bBroken = true;
m_iBrokenCounter = 30;
Expand Down Expand Up @@ -11566,6 +11566,7 @@ void srt::CUDT::completeBrokenConnectionDependencies(int errorcode)
{
// XXX This somehow can cause a deadlock
// uglobal()->close(m_parent);
LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
m_parent->setBrokenClosed();
}
#endif
Expand Down
5 changes: 5 additions & 0 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,11 @@ class CUDT
return (int32_t) sync::count_microseconds(from_time - tsStartTime);
}

static void setPacketTS(CPacket& p, const time_point& start_time, const time_point& ts)
{
p.m_iTimeStamp = makeTS(ts, start_time);
}

/// @brief Set the timestamp field of the packet using the provided value (no check)
/// @param p the packet structure to set the timestamp on.
/// @param ts timestamp to use as a source for packet timestamp.
Expand Down
6 changes: 3 additions & 3 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
, m_iLastSchedMsgNo(SRT_MSGNO_NONE)
{
setupMutex(m_GroupLock, "Group");
setupMutex(m_RcvDataLock, "RcvData");
setupCond(m_RcvDataCond, "RcvData");
setupMutex(m_RcvDataLock, "G/RcvData");
setupCond(m_RcvDataCond, "G/RcvData");
m_RcvEID = m_Global.m_EPoll.create(&m_RcvEpolld);
m_SndEID = m_Global.m_EPoll.create(&m_SndEpolld);

Expand Down Expand Up @@ -861,7 +861,7 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side)
// Get the latency (possibly fixed against the opposite side)
// from the first socket (core.m_iTsbPdDelay_ms),
// and set it on the current socket.
set_latency(core.m_iTsbPdDelay_ms * int64_t(1000));
set_latency_us(core.m_iTsbPdDelay_ms * int64_t(1000));
}

void CUDTGroup::close()
Expand Down
4 changes: 2 additions & 2 deletions srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ class CUDTGroup

private:
// Fields required for SRT_GTYPE_BACKUP groups.
senderBuffer_t m_SenderBuffer;
senderBuffer_t m_SenderBuffer; // This mechanism is to be removed on group-common sndbuf
int32_t m_iSndOldestMsgNo; // oldest position in the sender buffer
sync::atomic<int32_t> m_iSndAckedMsgNo;
uint32_t m_uOPT_MinStabilityTimeout_us;
Expand Down Expand Up @@ -800,7 +800,7 @@ class CUDTGroup
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRT_GROUP_TYPE, type, m_type);
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentSchedSequence, m_iLastSchedSeqNo);
SRTU_PROPERTY_RRW(std::set<int>&, epollset, m_sPollID);
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency, m_iTsbPdDelay_us);
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency_us, m_iTsbPdDelay_us);
SRTU_PROPERTY_RO(bool, closing, m_bClosing);
};

Expand Down
2 changes: 2 additions & 0 deletions srtcore/handshake.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ class CHandShake
// Applicable only when m_iVersion == HS_VERSION_SRT1
int32_t flags() { return m_iType; }

bool v5orHigher() { return m_iVersion > 4; }

public:
int32_t m_iVersion; // UDT version (HS_VERSION_* symbols)
int32_t m_iType; // UDT4: socket type (only UDT_DGRAM is valid); SRT1: extension flags
Expand Down
15 changes: 5 additions & 10 deletions srtcore/list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,7 @@ srt::CSndLossList::~CSndLossList()

void srt::CSndLossList::traceState() const
{
int pos = m_iHead;
while (pos != SRT_SEQNO_NONE)
{
std::cout << pos << ":[" << m_caSeq[pos].seqstart;
if (m_caSeq[pos].seqend != SRT_SEQNO_NONE)
std::cout << ", " << m_caSeq[pos].seqend;
std::cout << "], ";
pos = m_caSeq[pos].inext;
}
std::cout << "\n";
traceState(std::cout) << "\n";
}

int srt::CSndLossList::insert(int32_t seqno1, int32_t seqno2)
Expand Down Expand Up @@ -508,6 +499,10 @@ srt::CRcvLossList::~CRcvLossList()

int srt::CRcvLossList::insert(int32_t seqno1, int32_t seqno2)
{
SRT_ASSERT(seqno1 != SRT_SEQNO_NONE && seqno2 != SRT_SEQNO_NONE);
// Make sure that seqno2 isn't earlier than seqno1.
SRT_ASSERT(CSeqNo::seqcmp(seqno1, seqno2) <= 0);

// Data to be inserted must be larger than all those in the list
if (m_iLargestSeq != SRT_SEQNO_NONE && CSeqNo::seqcmp(seqno1, m_iLargestSeq) <= 0)
{
Expand Down
19 changes: 19 additions & 0 deletions srtcore/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ class CSndLossList
/// @return The seq. no. or -1 if the list is empty.
int32_t popLostSeq();

template <class Stream>
Stream& traceState(Stream& sout) const
{
int pos = m_iHead;
while (pos != SRT_SEQNO_NONE)
{
sout << "[" << pos << "]:" << m_caSeq[pos].seqstart;
if (m_caSeq[pos].seqend != SRT_SEQNO_NONE)
sout << ":" << m_caSeq[pos].seqend;
if (m_caSeq[pos].inext == -1)
sout << "=|";
else
sout << "->[" << m_caSeq[pos].inext << "]";
sout << ", ";
pos = m_caSeq[pos].inext;
}
sout << " {len:" << m_iLength << " head:" << m_iHead << " last:" << m_iLastInsertPos << "}";
return sout;
}
void traceState() const;

// Debug/unittest support.
Expand Down
2 changes: 1 addition & 1 deletion srtcore/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ written by
#define HLOGP LOGP
#define HLOGF LOGF

#define IF_HEAVY_LOGGING(instr) instr
#define IF_HEAVY_LOGGING(instr,...) instr,##__VA_ARGS__

#else

Expand Down
33 changes: 22 additions & 11 deletions srtcore/socketconfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,27 @@ written by
#include "srt.h"
#include "socketconfig.h"

namespace srt
{
int RcvBufferSizeOptionToValue(int val, int flightflag, int mss)
{
// Mimimum recv buffer size is 32 packets
const int mssin_size = mss - CPacket::UDP_HDR_SIZE;

int bufsize;
if (val > mssin_size * CSrtConfig::DEF_MIN_FLIGHT_PKT)
bufsize = val / mssin_size;
else
bufsize = CSrtConfig::DEF_MIN_FLIGHT_PKT;

// recv buffer MUST not be greater than FC size
if (bufsize > flightflag)
bufsize = flightflag;

return bufsize;
}
}

using namespace srt;
extern const int32_t SRT_DEF_VERSION = SrtParseVersion(SRT_VERSION);

Expand Down Expand Up @@ -122,17 +143,7 @@ struct CSrtConfigSetter<SRTO_RCVBUF>
if (val <= 0)
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);

// Mimimum recv buffer size is 32 packets
const int mssin_size = co.iMSS - CPacket::UDP_HDR_SIZE;

if (val > mssin_size * co.DEF_MIN_FLIGHT_PKT)
co.iRcvBufSize = val / mssin_size;
else
co.iRcvBufSize = co.DEF_MIN_FLIGHT_PKT;

// recv buffer MUST not be greater than FC size
if (co.iRcvBufSize > co.iFlightFlagSize)
co.iRcvBufSize = co.iFlightFlagSize;
co.iRcvBufSize = srt::RcvBufferSizeOptionToValue(val, co.iFlightFlagSize, co.iMSS);
}
};

Expand Down
3 changes: 3 additions & 0 deletions srtcore/socketconfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ inline bool cast_optval(const void* optval, int optlen)
return false;
}


int RcvBufferSizeOptionToValue(int optval, int flightflag, int mss);

} // namespace srt

struct SRT_SocketOptionObject
Expand Down
2 changes: 1 addition & 1 deletion srtcore/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ struct DurationUnitName<DUNIT_S>
template<eDurationUnit UNIT>
inline std::string FormatDuration(const steady_clock::duration& dur)
{
return Sprint(DurationUnitName<UNIT>::count(dur)) + DurationUnitName<UNIT>::name();
return Sprint(std::fixed, DurationUnitName<UNIT>::count(dur)) + DurationUnitName<UNIT>::name();
}

inline std::string FormatDuration(const steady_clock::duration& dur)
Expand Down
11 changes: 10 additions & 1 deletion srtcore/tsbpd_time.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,16 @@ CTsbpdTime::time_point CTsbpdTime::getTsbPdTimeBase(uint32_t timestamp_us) const

CTsbpdTime::time_point CTsbpdTime::getPktTsbPdTime(uint32_t usPktTimestamp) const
{
return getPktTsbPdBaseTime(usPktTimestamp) + m_tdTsbPdDelay + microseconds_from(m_DriftTracer.drift());
time_point value = getPktTsbPdBaseTime(usPktTimestamp) + m_tdTsbPdDelay + microseconds_from(m_DriftTracer.drift());

/*
HLOGC(brlog.Debug, log << "getPktTsbPdTime:"
<< " BASE=" << FormatTime(m_tsTsbPdTimeBase)
<< " TS=" << usPktTimestamp << "us, lat=" << FormatDuration<DUNIT_US>(m_tdTsbPdDelay)
<< " DRF=" << m_DriftTracer.drift() << "us = " << FormatTime(value));
*/

return value;
}

CTsbpdTime::time_point CTsbpdTime::getPktTsbPdBaseTime(uint32_t usPktTimestamp) const
Expand Down
Loading