Repo created

This commit is contained in:
Fr4nz D13trich 2025-11-22 14:04:28 +01:00
parent 81b91f4139
commit f8c34fa5ee
22732 changed files with 4815320 additions and 2 deletions

View file

@ -0,0 +1,62 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_
#define NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_
#include <cstdint>
#include <vector>
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "api/units/timestamp.h"
#include "net/dcsctp/tx/send_queue.h"
#include "test/gmock.h"
namespace dcsctp {
class MockSendQueue : public SendQueue {
public:
MockSendQueue() {
ON_CALL(*this, Produce)
.WillByDefault([](webrtc::Timestamp now, size_t max_size) {
return absl::nullopt;
});
}
MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
Produce,
(webrtc::Timestamp now, size_t max_size),
(override));
MOCK_METHOD(bool,
Discard,
(StreamID stream_id, OutgoingMessageId message_id),
(override));
MOCK_METHOD(void, PrepareResetStream, (StreamID stream_id), (override));
MOCK_METHOD(bool, HasStreamsReadyToBeReset, (), (const, override));
MOCK_METHOD(std::vector<StreamID>, GetStreamsReadyToBeReset, (), (override));
MOCK_METHOD(void, CommitResetStreams, (), (override));
MOCK_METHOD(void, RollbackResetStreams, (), (override));
MOCK_METHOD(void, Reset, (), (override));
MOCK_METHOD(size_t, buffered_amount, (StreamID stream_id), (const, override));
MOCK_METHOD(size_t, total_buffered_amount, (), (const, override));
MOCK_METHOD(size_t,
buffered_amount_low_threshold,
(StreamID stream_id),
(const, override));
MOCK_METHOD(void,
SetBufferedAmountLowThreshold,
(StreamID stream_id, size_t bytes),
(override));
MOCK_METHOD(void, EnableMessageInterleaving, (bool enabled), (override));
};
} // namespace dcsctp
#endif // NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_

View file

@ -0,0 +1,577 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/tx/outstanding_data.h"
#include <algorithm>
#include <set>
#include <utility>
#include <vector>
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "net/dcsctp/common/math.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/public/types.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
namespace dcsctp {
using ::webrtc::Timestamp;
// The number of times a packet must be NACKed before it's retransmitted.
// See https://tools.ietf.org/html/rfc4960#section-7.2.4
constexpr uint8_t kNumberOfNacksForRetransmission = 3;
// Returns how large a chunk will be, serialized, carrying the data
size_t OutstandingData::GetSerializedChunkSize(const Data& data) const {
return RoundUpTo4(data_chunk_header_size_ + data.size());
}
void OutstandingData::Item::Ack() {
if (lifecycle_ != Lifecycle::kAbandoned) {
lifecycle_ = Lifecycle::kActive;
}
ack_state_ = AckState::kAcked;
}
OutstandingData::Item::NackAction OutstandingData::Item::Nack(
bool retransmit_now) {
ack_state_ = AckState::kNacked;
++nack_count_;
if (!should_be_retransmitted() && !is_abandoned() &&
(retransmit_now || nack_count_ >= kNumberOfNacksForRetransmission)) {
// Nacked enough times - it's considered lost.
if (num_retransmissions_ < *max_retransmissions_) {
lifecycle_ = Lifecycle::kToBeRetransmitted;
return NackAction::kRetransmit;
}
Abandon();
return NackAction::kAbandon;
}
return NackAction::kNothing;
}
void OutstandingData::Item::MarkAsRetransmitted() {
lifecycle_ = Lifecycle::kActive;
ack_state_ = AckState::kUnacked;
nack_count_ = 0;
++num_retransmissions_;
}
void OutstandingData::Item::Abandon() {
RTC_DCHECK(!expires_at_.IsPlusInfinity() ||
max_retransmissions_ != MaxRetransmits::NoLimit());
lifecycle_ = Lifecycle::kAbandoned;
}
bool OutstandingData::Item::has_expired(Timestamp now) const {
return expires_at_ <= now;
}
bool OutstandingData::IsConsistent() const {
size_t actual_unacked_bytes = 0;
size_t actual_unacked_items = 0;
std::set<UnwrappedTSN> combined_to_be_retransmitted;
combined_to_be_retransmitted.insert(to_be_retransmitted_.begin(),
to_be_retransmitted_.end());
combined_to_be_retransmitted.insert(to_be_fast_retransmitted_.begin(),
to_be_fast_retransmitted_.end());
std::set<UnwrappedTSN> actual_combined_to_be_retransmitted;
UnwrappedTSN tsn = last_cumulative_tsn_ack_;
for (const Item& item : outstanding_data_) {
tsn.Increment();
if (item.is_outstanding()) {
actual_unacked_bytes += GetSerializedChunkSize(item.data());
++actual_unacked_items;
}
if (item.should_be_retransmitted()) {
actual_combined_to_be_retransmitted.insert(tsn);
}
}
return actual_unacked_bytes == unacked_bytes_ &&
actual_unacked_items == unacked_items_ &&
actual_combined_to_be_retransmitted == combined_to_be_retransmitted;
}
void OutstandingData::AckChunk(AckInfo& ack_info,
UnwrappedTSN tsn,
Item& item) {
if (!item.is_acked()) {
size_t serialized_size = GetSerializedChunkSize(item.data());
ack_info.bytes_acked += serialized_size;
if (item.is_outstanding()) {
unacked_bytes_ -= serialized_size;
--unacked_items_;
}
if (item.should_be_retransmitted()) {
RTC_DCHECK(to_be_fast_retransmitted_.find(tsn) ==
to_be_fast_retransmitted_.end());
to_be_retransmitted_.erase(tsn);
}
item.Ack();
ack_info.highest_tsn_acked = std::max(ack_info.highest_tsn_acked, tsn);
}
}
OutstandingData::AckInfo OutstandingData::HandleSack(
UnwrappedTSN cumulative_tsn_ack,
rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
bool is_in_fast_recovery) {
OutstandingData::AckInfo ack_info(cumulative_tsn_ack);
// Erase all items up to cumulative_tsn_ack.
RemoveAcked(cumulative_tsn_ack, ack_info);
// ACK packets reported in the gap ack blocks
AckGapBlocks(cumulative_tsn_ack, gap_ack_blocks, ack_info);
// NACK and possibly mark for retransmit chunks that weren't acked.
NackBetweenAckBlocks(cumulative_tsn_ack, gap_ack_blocks, is_in_fast_recovery,
ack_info);
RTC_DCHECK(IsConsistent());
return ack_info;
}
OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) {
RTC_DCHECK(tsn > last_cumulative_tsn_ack_);
RTC_DCHECK(tsn < next_tsn());
int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1;
RTC_DCHECK(index >= 0);
RTC_DCHECK(index < static_cast<int>(outstanding_data_.size()));
return outstanding_data_[index];
}
const OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) const {
RTC_DCHECK(tsn > last_cumulative_tsn_ack_);
RTC_DCHECK(tsn < next_tsn());
int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1;
RTC_DCHECK(index >= 0);
RTC_DCHECK(index < static_cast<int>(outstanding_data_.size()));
return outstanding_data_[index];
}
void OutstandingData::RemoveAcked(UnwrappedTSN cumulative_tsn_ack,
AckInfo& ack_info) {
while (!outstanding_data_.empty() &&
last_cumulative_tsn_ack_ < cumulative_tsn_ack) {
UnwrappedTSN tsn = last_cumulative_tsn_ack_.next_value();
Item& item = outstanding_data_.front();
AckChunk(ack_info, tsn, item);
if (item.lifecycle_id().IsSet()) {
RTC_DCHECK(item.data().is_end);
if (item.is_abandoned()) {
ack_info.abandoned_lifecycle_ids.push_back(item.lifecycle_id());
} else {
ack_info.acked_lifecycle_ids.push_back(item.lifecycle_id());
}
}
outstanding_data_.pop_front();
last_cumulative_tsn_ack_.Increment();
}
stream_reset_breakpoint_tsns_.erase(stream_reset_breakpoint_tsns_.begin(),
stream_reset_breakpoint_tsns_.upper_bound(
cumulative_tsn_ack.next_value()));
}
void OutstandingData::AckGapBlocks(
UnwrappedTSN cumulative_tsn_ack,
rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
AckInfo& ack_info) {
// Mark all non-gaps as ACKED (but they can't be removed) as (from RFC)
// "SCTP considers the information carried in the Gap Ack Blocks in the
// SACK chunk as advisory.". Note that when NR-SACK is supported, this can be
// handled differently.
for (auto& block : gap_ack_blocks) {
UnwrappedTSN start = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start);
UnwrappedTSN end = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end);
for (UnwrappedTSN tsn = start; tsn <= end; tsn = tsn.next_value()) {
if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) {
Item& item = GetItem(tsn);
AckChunk(ack_info, tsn, item);
}
}
}
}
void OutstandingData::NackBetweenAckBlocks(
UnwrappedTSN cumulative_tsn_ack,
rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
bool is_in_fast_recovery,
OutstandingData::AckInfo& ack_info) {
// Mark everything between the blocks as NACKED/TO_BE_RETRANSMITTED.
// https://tools.ietf.org/html/rfc4960#section-7.2.4
// "Mark the DATA chunk(s) with three miss indications for retransmission."
// "For each incoming SACK, miss indications are incremented only for
// missing TSNs prior to the highest TSN newly acknowledged in the SACK."
//
// What this means is that only when there is a increasing stream of data
// received and there are new packets seen (since last time), packets that are
// in-flight and between gaps should be nacked. This means that SCTP relies on
// the T3-RTX-timer to re-send packets otherwise.
UnwrappedTSN max_tsn_to_nack = ack_info.highest_tsn_acked;
if (is_in_fast_recovery && cumulative_tsn_ack > last_cumulative_tsn_ack_) {
// https://tools.ietf.org/html/rfc4960#section-7.2.4
// "If an endpoint is in Fast Recovery and a SACK arrives that advances
// the Cumulative TSN Ack Point, the miss indications are incremented for
// all TSNs reported missing in the SACK."
max_tsn_to_nack = UnwrappedTSN::AddTo(
cumulative_tsn_ack,
gap_ack_blocks.empty() ? 0 : gap_ack_blocks.rbegin()->end);
}
UnwrappedTSN prev_block_last_acked = cumulative_tsn_ack;
for (auto& block : gap_ack_blocks) {
UnwrappedTSN cur_block_first_acked =
UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start);
for (UnwrappedTSN tsn = prev_block_last_acked.next_value();
tsn < cur_block_first_acked && tsn <= max_tsn_to_nack;
tsn = tsn.next_value()) {
ack_info.has_packet_loss |=
NackItem(tsn, /*retransmit_now=*/false,
/*do_fast_retransmit=*/!is_in_fast_recovery);
}
prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end);
}
// Note that packets are not NACKED which are above the highest gap-ack-block
// (or above the cumulative ack TSN if no gap-ack-blocks) as only packets
// up until the highest_tsn_acked (see above) should be considered when
// NACKing.
}
bool OutstandingData::NackItem(UnwrappedTSN tsn,
bool retransmit_now,
bool do_fast_retransmit) {
Item& item = GetItem(tsn);
if (item.is_outstanding()) {
unacked_bytes_ -= GetSerializedChunkSize(item.data());
--unacked_items_;
}
switch (item.Nack(retransmit_now)) {
case Item::NackAction::kNothing:
return false;
case Item::NackAction::kRetransmit:
if (do_fast_retransmit) {
to_be_fast_retransmitted_.insert(tsn);
} else {
to_be_retransmitted_.insert(tsn);
}
RTC_DLOG(LS_VERBOSE) << *tsn.Wrap() << " marked for retransmission";
break;
case Item::NackAction::kAbandon:
RTC_DLOG(LS_VERBOSE) << *tsn.Wrap() << " Nacked, resulted in abandoning";
AbandonAllFor(item);
break;
}
return true;
}
void OutstandingData::AbandonAllFor(const Item& item) {
// Erase all remaining chunks from the producer, if any.
if (discard_from_send_queue_(item.data().stream_id, item.message_id())) {
// There were remaining chunks to be produced for this message. Since the
// receiver may have already received all chunks (up till now) for this
// message, we can't just FORWARD-TSN to the last fragment in this
// (abandoned) message and start sending a new message, as the receiver will
// then see a new message before the end of the previous one was seen (or
// skipped over). So create a new fragment, representing the end, that the
// received will never see as it is abandoned immediately and used as cum
// TSN in the sent FORWARD-TSN.
Data message_end(item.data().stream_id, item.data().ssn, item.data().mid,
item.data().fsn, item.data().ppid, std::vector<uint8_t>(),
Data::IsBeginning(false), Data::IsEnd(true),
item.data().is_unordered);
UnwrappedTSN tsn = next_tsn();
Item& added_item = outstanding_data_.emplace_back(
item.message_id(), std::move(message_end), Timestamp::Zero(),
MaxRetransmits(0), Timestamp::PlusInfinity(), LifecycleId::NotSet());
// The added chunk shouldn't be included in `unacked_bytes`, so set it
// as acked.
added_item.Ack();
RTC_DLOG(LS_VERBOSE) << "Adding unsent end placeholder for message at tsn="
<< *tsn.Wrap();
}
UnwrappedTSN tsn = last_cumulative_tsn_ack_;
for (Item& other : outstanding_data_) {
tsn.Increment();
if (!other.is_abandoned() &&
other.data().stream_id == item.data().stream_id &&
other.message_id() == item.message_id()) {
RTC_DLOG(LS_VERBOSE) << "Marking chunk " << *tsn.Wrap()
<< " as abandoned";
if (other.should_be_retransmitted()) {
to_be_fast_retransmitted_.erase(tsn);
to_be_retransmitted_.erase(tsn);
}
other.Abandon();
}
}
}
std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit(
std::set<UnwrappedTSN>& chunks,
size_t max_size) {
std::vector<std::pair<TSN, Data>> result;
for (auto it = chunks.begin(); it != chunks.end();) {
UnwrappedTSN tsn = *it;
Item& item = GetItem(tsn);
RTC_DCHECK(item.should_be_retransmitted());
RTC_DCHECK(!item.is_outstanding());
RTC_DCHECK(!item.is_abandoned());
RTC_DCHECK(!item.is_acked());
size_t serialized_size = GetSerializedChunkSize(item.data());
if (serialized_size <= max_size) {
item.MarkAsRetransmitted();
result.emplace_back(tsn.Wrap(), item.data().Clone());
max_size -= serialized_size;
unacked_bytes_ += serialized_size;
++unacked_items_;
it = chunks.erase(it);
} else {
++it;
}
// No point in continuing if the packet is full.
if (max_size <= data_chunk_header_size_) {
break;
}
}
return result;
}
std::vector<std::pair<TSN, Data>>
OutstandingData::GetChunksToBeFastRetransmitted(size_t max_size) {
std::vector<std::pair<TSN, Data>> result =
ExtractChunksThatCanFit(to_be_fast_retransmitted_, max_size);
// https://datatracker.ietf.org/doc/html/rfc4960#section-7.2.4
// "Those TSNs marked for retransmission due to the Fast-Retransmit algorithm
// that did not fit in the sent datagram carrying K other TSNs are also marked
// as ineligible for a subsequent Fast Retransmit. However, as they are
// marked for retransmission they will be retransmitted later on as soon as
// cwnd allows."
if (!to_be_fast_retransmitted_.empty()) {
to_be_retransmitted_.insert(to_be_fast_retransmitted_.begin(),
to_be_fast_retransmitted_.end());
to_be_fast_retransmitted_.clear();
}
RTC_DCHECK(IsConsistent());
return result;
}
std::vector<std::pair<TSN, Data>> OutstandingData::GetChunksToBeRetransmitted(
size_t max_size) {
// Chunks scheduled for fast retransmission must be sent first.
RTC_DCHECK(to_be_fast_retransmitted_.empty());
return ExtractChunksThatCanFit(to_be_retransmitted_, max_size);
}
void OutstandingData::ExpireOutstandingChunks(Timestamp now) {
UnwrappedTSN tsn = last_cumulative_tsn_ack_;
for (const Item& item : outstanding_data_) {
tsn.Increment();
// Chunks that are nacked can be expired. Care should be taken not to expire
// unacked (in-flight) chunks as they might have been received, but the SACK
// is either delayed or in-flight and may be received later.
if (item.is_abandoned()) {
// Already abandoned.
} else if (item.is_nacked() && item.has_expired(now)) {
RTC_DLOG(LS_VERBOSE) << "Marking nacked chunk " << *tsn.Wrap()
<< " and message " << *item.data().mid
<< " as expired";
AbandonAllFor(item);
} else {
// A non-expired chunk. No need to iterate any further.
break;
}
}
RTC_DCHECK(IsConsistent());
}
UnwrappedTSN OutstandingData::highest_outstanding_tsn() const {
return UnwrappedTSN::AddTo(last_cumulative_tsn_ack_,
outstanding_data_.size());
}
absl::optional<UnwrappedTSN> OutstandingData::Insert(
OutgoingMessageId message_id,
const Data& data,
Timestamp time_sent,
MaxRetransmits max_retransmissions,
Timestamp expires_at,
LifecycleId lifecycle_id) {
// All chunks are always padded to be even divisible by 4.
size_t chunk_size = GetSerializedChunkSize(data);
unacked_bytes_ += chunk_size;
++unacked_items_;
UnwrappedTSN tsn = next_tsn();
Item& item = outstanding_data_.emplace_back(message_id, data.Clone(),
time_sent, max_retransmissions,
expires_at, lifecycle_id);
if (item.has_expired(time_sent)) {
// No need to send it - it was expired when it was in the send
// queue.
RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk " << *tsn.Wrap()
<< " and message " << *item.data().mid
<< " as expired";
AbandonAllFor(item);
RTC_DCHECK(IsConsistent());
return absl::nullopt;
}
RTC_DCHECK(IsConsistent());
return tsn;
}
void OutstandingData::NackAll() {
UnwrappedTSN tsn = last_cumulative_tsn_ack_;
// A two-pass algorithm is needed, as NackItem will invalidate iterators.
std::vector<UnwrappedTSN> tsns_to_nack;
for (Item& item : outstanding_data_) {
tsn.Increment();
if (!item.is_acked()) {
tsns_to_nack.push_back(tsn);
}
}
for (UnwrappedTSN tsn : tsns_to_nack) {
NackItem(tsn, /*retransmit_now=*/true,
/*do_fast_retransmit=*/false);
}
RTC_DCHECK(IsConsistent());
}
webrtc::TimeDelta OutstandingData::MeasureRTT(Timestamp now,
UnwrappedTSN tsn) const {
if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) {
const Item& item = GetItem(tsn);
if (!item.has_been_retransmitted()) {
// https://tools.ietf.org/html/rfc4960#section-6.3.1
// "Karn's algorithm: RTT measurements MUST NOT be made using
// packets that were retransmitted (and thus for which it is ambiguous
// whether the reply was for the first instance of the chunk or for a
// later instance)"
return now - item.time_sent();
}
}
return webrtc::TimeDelta::PlusInfinity();
}
std::vector<std::pair<TSN, OutstandingData::State>>
OutstandingData::GetChunkStatesForTesting() const {
std::vector<std::pair<TSN, State>> states;
states.emplace_back(last_cumulative_tsn_ack_.Wrap(), State::kAcked);
UnwrappedTSN tsn = last_cumulative_tsn_ack_;
for (const Item& item : outstanding_data_) {
tsn.Increment();
State state;
if (item.is_abandoned()) {
state = State::kAbandoned;
} else if (item.should_be_retransmitted()) {
state = State::kToBeRetransmitted;
} else if (item.is_acked()) {
state = State::kAcked;
} else if (item.is_outstanding()) {
state = State::kInFlight;
} else {
state = State::kNacked;
}
states.emplace_back(tsn.Wrap(), state);
}
return states;
}
bool OutstandingData::ShouldSendForwardTsn() const {
if (!outstanding_data_.empty()) {
return outstanding_data_.front().is_abandoned();
}
return false;
}
ForwardTsnChunk OutstandingData::CreateForwardTsn() const {
std::map<StreamID, SSN> skipped_per_ordered_stream;
UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_;
UnwrappedTSN tsn = last_cumulative_tsn_ack_;
for (const Item& item : outstanding_data_) {
tsn.Increment();
if (stream_reset_breakpoint_tsns_.contains(tsn) ||
(tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) {
break;
}
new_cumulative_ack = tsn;
if (!item.data().is_unordered &&
item.data().ssn > skipped_per_ordered_stream[item.data().stream_id]) {
skipped_per_ordered_stream[item.data().stream_id] = item.data().ssn;
}
}
std::vector<ForwardTsnChunk::SkippedStream> skipped_streams;
skipped_streams.reserve(skipped_per_ordered_stream.size());
for (const auto& [stream_id, ssn] : skipped_per_ordered_stream) {
skipped_streams.emplace_back(stream_id, ssn);
}
return ForwardTsnChunk(new_cumulative_ack.Wrap(), std::move(skipped_streams));
}
IForwardTsnChunk OutstandingData::CreateIForwardTsn() const {
std::map<std::pair<IsUnordered, StreamID>, MID> skipped_per_stream;
UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_;
UnwrappedTSN tsn = last_cumulative_tsn_ack_;
for (const Item& item : outstanding_data_) {
tsn.Increment();
if (stream_reset_breakpoint_tsns_.contains(tsn) ||
(tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) {
break;
}
new_cumulative_ack = tsn;
std::pair<IsUnordered, StreamID> stream_id =
std::make_pair(item.data().is_unordered, item.data().stream_id);
if (item.data().mid > skipped_per_stream[stream_id]) {
skipped_per_stream[stream_id] = item.data().mid;
}
}
std::vector<IForwardTsnChunk::SkippedStream> skipped_streams;
skipped_streams.reserve(skipped_per_stream.size());
for (const auto& [stream, mid] : skipped_per_stream) {
skipped_streams.emplace_back(stream.first, stream.second, mid);
}
return IForwardTsnChunk(new_cumulative_ack.Wrap(),
std::move(skipped_streams));
}
void OutstandingData::ResetSequenceNumbers(UnwrappedTSN last_cumulative_tsn) {
RTC_DCHECK(outstanding_data_.empty());
last_cumulative_tsn_ack_ = last_cumulative_tsn;
}
void OutstandingData::BeginResetStreams() {
stream_reset_breakpoint_tsns_.insert(next_tsn());
}
} // namespace dcsctp

View file

@ -0,0 +1,376 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_TX_OUTSTANDING_DATA_H_
#define NET_DCSCTP_TX_OUTSTANDING_DATA_H_
#include <deque>
#include <map>
#include <set>
#include <utility>
#include <vector>
#include "absl/types/optional.h"
#include "api/units/timestamp.h"
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
#include "net/dcsctp/packet/chunk/sack_chunk.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/types.h"
#include "rtc_base/containers/flat_set.h"
namespace dcsctp {
// This class keeps track of outstanding data chunks (sent, not yet acked) and
// handles acking, nacking, rescheduling and abandoning.
//
// Items are added to this queue as they are sent and will be removed when the
// peer acks them using the cumulative TSN ack.
class OutstandingData {
public:
// State for DATA chunks (message fragments) in the queue - used in tests.
enum class State {
// The chunk has been sent but not received yet (from the sender's point of
// view, as no SACK has been received yet that reference this chunk).
kInFlight,
// A SACK has been received which explicitly marked this chunk as missing -
// it's now NACKED and may be retransmitted if NACKED enough times.
kNacked,
// A chunk that will be retransmitted when possible.
kToBeRetransmitted,
// A SACK has been received which explicitly marked this chunk as received.
kAcked,
// A chunk whose message has expired or has been retransmitted too many
// times (RFC3758). It will not be retransmitted anymore.
kAbandoned,
};
// Contains variables scoped to a processing of an incoming SACK.
struct AckInfo {
explicit AckInfo(UnwrappedTSN cumulative_tsn_ack)
: highest_tsn_acked(cumulative_tsn_ack) {}
// Bytes acked by increasing cumulative_tsn_ack and gap_ack_blocks.
size_t bytes_acked = 0;
// Indicates if this SACK indicates that packet loss has occurred. Just
// because a packet is missing in the SACK doesn't necessarily mean that
// there is packet loss as that packet might be in-flight and received
// out-of-order. But when it has been reported missing consecutive times, it
// will eventually be considered "lost" and this will be set.
bool has_packet_loss = false;
// Highest TSN Newly Acknowledged, an SCTP variable.
UnwrappedTSN highest_tsn_acked;
// The set of lifecycle IDs that were acked using cumulative_tsn_ack.
std::vector<LifecycleId> acked_lifecycle_ids;
// The set of lifecycle IDs that were acked, but had been abandoned.
std::vector<LifecycleId> abandoned_lifecycle_ids;
};
OutstandingData(
size_t data_chunk_header_size,
UnwrappedTSN last_cumulative_tsn_ack,
std::function<bool(StreamID, OutgoingMessageId)> discard_from_send_queue)
: data_chunk_header_size_(data_chunk_header_size),
last_cumulative_tsn_ack_(last_cumulative_tsn_ack),
discard_from_send_queue_(std::move(discard_from_send_queue)) {}
AckInfo HandleSack(
UnwrappedTSN cumulative_tsn_ack,
rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
bool is_in_fast_recovery);
// Returns as many of the chunks that are eligible for fast retransmissions
// and that would fit in a single packet of `max_size`. The eligible chunks
// that didn't fit will be marked for (normal) retransmission and will not be
// returned if this method is called again.
std::vector<std::pair<TSN, Data>> GetChunksToBeFastRetransmitted(
size_t max_size);
// Given `max_size` of space left in a packet, which chunks can be added to
// it?
std::vector<std::pair<TSN, Data>> GetChunksToBeRetransmitted(size_t max_size);
size_t unacked_bytes() const { return unacked_bytes_; }
// Returns the number of DATA chunks that are in-flight (not acked or nacked).
size_t unacked_items() const { return unacked_items_; }
// Given the current time `now_ms`, expire and abandon outstanding (sent at
// least once) chunks that have a limited lifetime.
void ExpireOutstandingChunks(webrtc::Timestamp now);
bool empty() const { return outstanding_data_.empty(); }
bool has_data_to_be_fast_retransmitted() const {
return !to_be_fast_retransmitted_.empty();
}
bool has_data_to_be_retransmitted() const {
return !to_be_retransmitted_.empty() || !to_be_fast_retransmitted_.empty();
}
UnwrappedTSN last_cumulative_tsn_ack() const {
return last_cumulative_tsn_ack_;
}
UnwrappedTSN next_tsn() const {
return highest_outstanding_tsn().next_value();
}
UnwrappedTSN highest_outstanding_tsn() const;
// Schedules `data` to be sent, with the provided partial reliability
// parameters. Returns the TSN if the item was actually added and scheduled to
// be sent, and absl::nullopt if it shouldn't be sent.
absl::optional<UnwrappedTSN> Insert(
OutgoingMessageId message_id,
const Data& data,
webrtc::Timestamp time_sent,
MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit(),
webrtc::Timestamp expires_at = webrtc::Timestamp::PlusInfinity(),
LifecycleId lifecycle_id = LifecycleId::NotSet());
// Nacks all outstanding data.
void NackAll();
// Creates a FORWARD-TSN chunk.
ForwardTsnChunk CreateForwardTsn() const;
// Creates an I-FORWARD-TSN chunk.
IForwardTsnChunk CreateIForwardTsn() const;
// Given the current time and a TSN, it returns the measured RTT between when
// the chunk was sent and now. It takes into acccount Karn's algorithm, so if
// the chunk has ever been retransmitted, it will return `PlusInfinity()`.
webrtc::TimeDelta MeasureRTT(webrtc::Timestamp now, UnwrappedTSN tsn) const;
// Returns the internal state of all queued chunks. This is only used in
// unit-tests.
std::vector<std::pair<TSN, State>> GetChunkStatesForTesting() const;
// Returns true if the next chunk that is not acked by the peer has been
// abandoned, which means that a FORWARD-TSN should be sent.
bool ShouldSendForwardTsn() const;
// Sets the next TSN to be used. This is used in handover.
void ResetSequenceNumbers(UnwrappedTSN last_cumulative_tsn);
// Called when an outgoing stream reset is sent, marking the last assigned TSN
// as a breakpoint that a FORWARD-TSN shouldn't cross.
void BeginResetStreams();
private:
// A fragmented message's DATA chunk while in the retransmission queue, and
// its associated metadata.
class Item {
public:
enum class NackAction {
kNothing,
kRetransmit,
kAbandon,
};
Item(OutgoingMessageId message_id,
Data data,
webrtc::Timestamp time_sent,
MaxRetransmits max_retransmissions,
webrtc::Timestamp expires_at,
LifecycleId lifecycle_id)
: message_id_(message_id),
time_sent_(time_sent),
max_retransmissions_(max_retransmissions),
expires_at_(expires_at),
lifecycle_id_(lifecycle_id),
data_(std::move(data)) {}
Item(const Item&) = delete;
Item& operator=(const Item&) = delete;
OutgoingMessageId message_id() const { return message_id_; }
webrtc::Timestamp time_sent() const { return time_sent_; }
const Data& data() const { return data_; }
// Acks an item.
void Ack();
// Nacks an item. If it has been nacked enough times, or if `retransmit_now`
// is set, it might be marked for retransmission. If the item has reached
// its max retransmission value, it will instead be abandoned. The action
// performed is indicated as return value.
NackAction Nack(bool retransmit_now);
// Prepares the item to be retransmitted. Sets it as outstanding and
// clears all nack counters.
void MarkAsRetransmitted();
// Marks this item as abandoned.
void Abandon();
bool is_outstanding() const { return ack_state_ == AckState::kUnacked; }
bool is_acked() const { return ack_state_ == AckState::kAcked; }
bool is_nacked() const { return ack_state_ == AckState::kNacked; }
bool is_abandoned() const { return lifecycle_ == Lifecycle::kAbandoned; }
// Indicates if this chunk should be retransmitted.
bool should_be_retransmitted() const {
return lifecycle_ == Lifecycle::kToBeRetransmitted;
}
// Indicates if this chunk has ever been retransmitted.
bool has_been_retransmitted() const { return num_retransmissions_ > 0; }
// Given the current time, and the current state of this DATA chunk, it will
// indicate if it has expired (SCTP Partial Reliability Extension).
bool has_expired(webrtc::Timestamp now) const;
LifecycleId lifecycle_id() const { return lifecycle_id_; }
private:
enum class Lifecycle : uint8_t {
// The chunk is alive (sent, received, etc)
kActive,
// The chunk is scheduled to be retransmitted, and will then transition to
// become active.
kToBeRetransmitted,
// The chunk has been abandoned. This is a terminal state.
kAbandoned
};
enum class AckState : uint8_t {
// The chunk is in-flight.
kUnacked,
// The chunk has been received and acknowledged.
kAcked,
// The chunk has been nacked and is possibly lost.
kNacked
};
// NOTE: This data structure has been optimized for size, by ordering fields
// to avoid unnecessary padding.
const OutgoingMessageId message_id_;
// When the packet was sent, and placed in this queue.
const webrtc::Timestamp time_sent_;
// If the message was sent with a maximum number of retransmissions, this is
// set to that number. The value zero (0) means that it will never be
// retransmitted.
const MaxRetransmits max_retransmissions_;
// Indicates the life cycle status of this chunk.
Lifecycle lifecycle_ = Lifecycle::kActive;
// Indicates the presence of this chunk, if it's in flight (Unacked), has
// been received (Acked) or is possibly lost (Nacked).
AckState ack_state_ = AckState::kUnacked;
// The number of times the DATA chunk has been nacked (by having received a
// SACK which doesn't include it). Will be cleared on retransmissions.
uint8_t nack_count_ = 0;
// The number of times the DATA chunk has been retransmitted.
uint16_t num_retransmissions_ = 0;
// At this exact millisecond, the item is considered expired. If the message
// is not to be expired, this is set to the infinite future.
const webrtc::Timestamp expires_at_;
// An optional lifecycle id, which may only be set for the last fragment.
const LifecycleId lifecycle_id_;
// The actual data to send/retransmit.
const Data data_;
};
// Returns how large a chunk will be, serialized, carrying the data
size_t GetSerializedChunkSize(const Data& data) const;
Item& GetItem(UnwrappedTSN tsn);
const Item& GetItem(UnwrappedTSN tsn) const;
// Given a `cumulative_tsn_ack` from an incoming SACK, will remove those items
// in the retransmission queue up until this value and will update `ack_info`
// by setting `bytes_acked_by_cumulative_tsn_ack`.
void RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info);
// Will mark the chunks covered by the `gap_ack_blocks` from an incoming SACK
// as "acked" and update `ack_info` by adding new TSNs to `added_tsns`.
void AckGapBlocks(UnwrappedTSN cumulative_tsn_ack,
rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
AckInfo& ack_info);
// Mark chunks reported as "missing", as "nacked" or "to be retransmitted"
// depending how many times this has happened. Only packets up until
// `ack_info.highest_tsn_acked` (highest TSN newly acknowledged) are
// nacked/retransmitted. The method will set `ack_info.has_packet_loss`.
void NackBetweenAckBlocks(
UnwrappedTSN cumulative_tsn_ack,
rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
bool is_in_fast_recovery,
OutstandingData::AckInfo& ack_info);
// Process the acknowledgement of the chunk referenced by `iter` and updates
// state in `ack_info` and the object's state.
void AckChunk(AckInfo& ack_info, UnwrappedTSN tsn, Item& item);
// Helper method to process an incoming nack of an item and perform the
// correct operations given the action indicated when nacking an item (e.g.
// retransmitting or abandoning). The return value indicate if an action was
// performed, meaning that packet loss was detected and acted upon. If
// `do_fast_retransmit` is set and if the item has been nacked sufficiently
// many times so that it should be retransmitted, this will schedule it to be
// "fast retransmitted". This is only done just before going into fast
// recovery.
//
// Note that since nacking an item may result in it becoming abandoned, which
// in turn could alter `outstanding_data_`, any iterators are invalidated
// after having called this method.
bool NackItem(UnwrappedTSN tsn, bool retransmit_now, bool do_fast_retransmit);
// Given that a message fragment, `item` has been abandoned, abandon all other
// fragments that share the same message - both never-before-sent fragments
// that are still in the SendQueue and outstanding chunks.
void AbandonAllFor(const OutstandingData::Item& item);
std::vector<std::pair<TSN, Data>> ExtractChunksThatCanFit(
std::set<UnwrappedTSN>& chunks,
size_t max_size);
bool IsConsistent() const;
// The size of the data chunk (DATA/I-DATA) header that is used.
const size_t data_chunk_header_size_;
// The last cumulative TSN ack number.
UnwrappedTSN last_cumulative_tsn_ack_;
// Callback when to discard items from the send queue.
std::function<bool(StreamID, OutgoingMessageId)> discard_from_send_queue_;
// Outstanding items. If non-empty, the first element has
// `TSN=last_cumulative_tsn_ack_ + 1` and the following items are in strict
// increasing TSN order. The last item has `TSN=highest_outstanding_tsn()`.
std::deque<Item> outstanding_data_;
// The number of bytes that are in-flight (sent but not yet acked or nacked).
size_t unacked_bytes_ = 0;
// The number of DATA chunks that are in-flight (sent but not yet acked or
// nacked).
size_t unacked_items_ = 0;
// Data chunks that are eligible for fast retransmission.
std::set<UnwrappedTSN> to_be_fast_retransmitted_;
// Data chunks that are to be retransmitted.
std::set<UnwrappedTSN> to_be_retransmitted_;
// Wben a stream reset has begun, the "next TSN to assign" is added to this
// set, and removed when the cum-ack TSN reaches it. This is used to limit a
// FORWARD-TSN to reset streams past a "stream reset last assigned TSN".
webrtc::flat_set<UnwrappedTSN> stream_reset_breakpoint_tsns_;
};
} // namespace dcsctp
#endif // NET_DCSCTP_TX_OUTSTANDING_DATA_H_

View file

@ -0,0 +1,37 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/tx/retransmission_error_counter.h"
#include "absl/strings/string_view.h"
#include "rtc_base/logging.h"
namespace dcsctp {
bool RetransmissionErrorCounter::Increment(absl::string_view reason) {
++counter_;
if (limit_.has_value() && counter_ > limit_.value()) {
RTC_DLOG(LS_INFO) << log_prefix_ << reason
<< ", too many retransmissions, counter=" << counter_;
return false;
}
RTC_DLOG(LS_VERBOSE) << log_prefix_ << reason << ", new counter=" << counter_
<< ", max=" << limit_.value_or(-1);
return true;
}
void RetransmissionErrorCounter::Clear() {
if (counter_ > 0) {
RTC_DLOG(LS_VERBOSE) << log_prefix_
<< "recovered from counter=" << counter_;
counter_ = 0;
}
}
} // namespace dcsctp

View file

@ -0,0 +1,50 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_TX_RETRANSMISSION_ERROR_COUNTER_H_
#define NET_DCSCTP_TX_RETRANSMISSION_ERROR_COUNTER_H_
#include <functional>
#include <string>
#include <utility>
#include "absl/strings/string_view.h"
#include "net/dcsctp/public/dcsctp_options.h"
namespace dcsctp {
// The RetransmissionErrorCounter is a simple counter with a limit, and when
// the limit is exceeded, the counter is exhausted and the connection will
// be closed. It's incremented on retransmission errors, such as the T3-RTX
// timer expiring, but also missing heartbeats and stream reset requests.
class RetransmissionErrorCounter {
public:
RetransmissionErrorCounter(absl::string_view log_prefix,
const DcSctpOptions& options)
: log_prefix_(log_prefix), limit_(options.max_retransmissions) {}
// Increments the retransmission timer. If the maximum error count has been
// reached, `false` will be returned.
bool Increment(absl::string_view reason);
bool IsExhausted() const { return limit_.has_value() && counter_ > *limit_; }
// Clears the retransmission errors.
void Clear();
// Returns its current value
int value() const { return counter_; }
private:
const absl::string_view log_prefix_;
const absl::optional<int> limit_;
int counter_ = 0;
};
} // namespace dcsctp
#endif // NET_DCSCTP_TX_RETRANSMISSION_ERROR_COUNTER_H_

View file

@ -0,0 +1,624 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/tx/retransmission_queue.h"
#include <algorithm>
#include <cstdint>
#include <functional>
#include <iterator>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "absl/algorithm/container.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/math.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/data_chunk.h"
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/chunk/idata_chunk.h"
#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
#include "net/dcsctp/packet/chunk/sack_chunk.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_options.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/timer/timer.h"
#include "net/dcsctp/tx/outstanding_data.h"
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/strings/str_join.h"
#include "rtc_base/strings/string_builder.h"
namespace dcsctp {
namespace {
using ::webrtc::TimeDelta;
using ::webrtc::Timestamp;
// Allow sending only slightly less than an MTU, to account for headers.
constexpr float kMinBytesRequiredToSendFactor = 0.9;
} // namespace
RetransmissionQueue::RetransmissionQueue(
absl::string_view log_prefix,
DcSctpSocketCallbacks* callbacks,
TSN my_initial_tsn,
size_t a_rwnd,
SendQueue& send_queue,
std::function<void(TimeDelta rtt)> on_new_rtt,
std::function<void()> on_clear_retransmission_counter,
Timer& t3_rtx,
const DcSctpOptions& options,
bool supports_partial_reliability,
bool use_message_interleaving)
: callbacks_(*callbacks),
options_(options),
min_bytes_required_to_send_(options.mtu * kMinBytesRequiredToSendFactor),
partial_reliability_(supports_partial_reliability),
log_prefix_(log_prefix),
data_chunk_header_size_(use_message_interleaving
? IDataChunk::kHeaderSize
: DataChunk::kHeaderSize),
on_new_rtt_(std::move(on_new_rtt)),
on_clear_retransmission_counter_(
std::move(on_clear_retransmission_counter)),
t3_rtx_(t3_rtx),
cwnd_(options_.cwnd_mtus_initial * options_.mtu),
rwnd_(a_rwnd),
// https://tools.ietf.org/html/rfc4960#section-7.2.1
// "The initial value of ssthresh MAY be arbitrarily high (for
// example, implementations MAY use the size of the receiver advertised
// window).""
ssthresh_(rwnd_),
partial_bytes_acked_(0),
send_queue_(send_queue),
outstanding_data_(
data_chunk_header_size_,
tsn_unwrapper_.Unwrap(TSN(*my_initial_tsn - 1)),
[this](StreamID stream_id, OutgoingMessageId message_id) {
return send_queue_.Discard(stream_id, message_id);
}) {}
bool RetransmissionQueue::IsConsistent() const {
return true;
}
// Returns how large a chunk will be, serialized, carrying the data
size_t RetransmissionQueue::GetSerializedChunkSize(const Data& data) const {
return RoundUpTo4(data_chunk_header_size_ + data.size());
}
void RetransmissionQueue::MaybeExitFastRecovery(
UnwrappedTSN cumulative_tsn_ack) {
// https://tools.ietf.org/html/rfc4960#section-7.2.4
// "When a SACK acknowledges all TSNs up to and including this [fast
// recovery] exit point, Fast Recovery is exited."
if (fast_recovery_exit_tsn_.has_value() &&
cumulative_tsn_ack >= *fast_recovery_exit_tsn_) {
RTC_DLOG(LS_VERBOSE) << log_prefix_
<< "exit_point=" << *fast_recovery_exit_tsn_->Wrap()
<< " reached - exiting fast recovery";
fast_recovery_exit_tsn_ = absl::nullopt;
}
}
void RetransmissionQueue::HandleIncreasedCumulativeTsnAck(
size_t unacked_bytes,
size_t total_bytes_acked) {
// Allow some margin for classifying as fully utilized, due to e.g. that too
// small packets (less than kMinimumFragmentedPayload) are not sent +
// overhead.
bool is_fully_utilized = unacked_bytes + options_.mtu >= cwnd_;
size_t old_cwnd = cwnd_;
if (phase() == CongestionAlgorithmPhase::kSlowStart) {
if (is_fully_utilized && !is_in_fast_recovery()) {
// https://tools.ietf.org/html/rfc4960#section-7.2.1
// "Only when these three conditions are met can the cwnd be
// increased; otherwise, the cwnd MUST not be increased. If these
// conditions are met, then cwnd MUST be increased by, at most, the
// lesser of 1) the total size of the previously outstanding DATA
// chunk(s) acknowledged, and 2) the destination's path MTU."
cwnd_ += std::min(total_bytes_acked, options_.mtu);
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "SS increase cwnd=" << cwnd_
<< " (" << old_cwnd << ")";
}
} else if (phase() == CongestionAlgorithmPhase::kCongestionAvoidance) {
// https://tools.ietf.org/html/rfc4960#section-7.2.2
// "Whenever cwnd is greater than ssthresh, upon each SACK arrival
// that advances the Cumulative TSN Ack Point, increase
// partial_bytes_acked by the total number of bytes of all new chunks
// acknowledged in that SACK including chunks acknowledged by the new
// Cumulative TSN Ack and by Gap Ack Blocks."
size_t old_pba = partial_bytes_acked_;
partial_bytes_acked_ += total_bytes_acked;
if (partial_bytes_acked_ >= cwnd_ && is_fully_utilized) {
// https://tools.ietf.org/html/rfc4960#section-7.2.2
// "When partial_bytes_acked is equal to or greater than cwnd and
// before the arrival of the SACK the sender had cwnd or more bytes of
// data outstanding (i.e., before arrival of the SACK, flightsize was
// greater than or equal to cwnd), increase cwnd by MTU, and reset
// partial_bytes_acked to (partial_bytes_acked - cwnd)."
// Errata: https://datatracker.ietf.org/doc/html/rfc8540#section-3.12
partial_bytes_acked_ -= cwnd_;
cwnd_ += options_.mtu;
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "CA increase cwnd=" << cwnd_
<< " (" << old_cwnd << ") ssthresh=" << ssthresh_
<< ", pba=" << partial_bytes_acked_ << " ("
<< old_pba << ")";
} else {
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "CA unchanged cwnd=" << cwnd_
<< " (" << old_cwnd << ") ssthresh=" << ssthresh_
<< ", pba=" << partial_bytes_acked_ << " ("
<< old_pba << ")";
}
}
}
void RetransmissionQueue::HandlePacketLoss(UnwrappedTSN highest_tsn_acked) {
if (!is_in_fast_recovery()) {
// https://tools.ietf.org/html/rfc4960#section-7.2.4
// "If not in Fast Recovery, adjust the ssthresh and cwnd of the
// destination address(es) to which the missing DATA chunks were last
// sent, according to the formula described in Section 7.2.3."
size_t old_cwnd = cwnd_;
size_t old_pba = partial_bytes_acked_;
ssthresh_ = std::max(cwnd_ / 2, options_.cwnd_mtus_min * options_.mtu);
cwnd_ = ssthresh_;
partial_bytes_acked_ = 0;
RTC_DLOG(LS_VERBOSE) << log_prefix_
<< "packet loss detected (not fast recovery). cwnd="
<< cwnd_ << " (" << old_cwnd
<< "), ssthresh=" << ssthresh_
<< ", pba=" << partial_bytes_acked_ << " (" << old_pba
<< ")";
// https://tools.ietf.org/html/rfc4960#section-7.2.4
// "If not in Fast Recovery, enter Fast Recovery and mark the highest
// outstanding TSN as the Fast Recovery exit point."
fast_recovery_exit_tsn_ = outstanding_data_.highest_outstanding_tsn();
RTC_DLOG(LS_VERBOSE) << log_prefix_
<< "fast recovery initiated with exit_point="
<< *fast_recovery_exit_tsn_->Wrap();
} else {
// https://tools.ietf.org/html/rfc4960#section-7.2.4
// "While in Fast Recovery, the ssthresh and cwnd SHOULD NOT change for
// any destinations due to a subsequent Fast Recovery event (i.e., one
// SHOULD NOT reduce the cwnd further due to a subsequent Fast Retransmit)."
RTC_DLOG(LS_VERBOSE) << log_prefix_
<< "packet loss detected (fast recovery). No changes.";
}
}
void RetransmissionQueue::UpdateReceiverWindow(uint32_t a_rwnd) {
rwnd_ = outstanding_data_.unacked_bytes() >= a_rwnd
? 0
: a_rwnd - outstanding_data_.unacked_bytes();
}
void RetransmissionQueue::StartT3RtxTimerIfOutstandingData() {
// Note: Can't use `unacked_bytes()` as that one doesn't count chunks to
// be retransmitted.
if (outstanding_data_.empty()) {
// https://tools.ietf.org/html/rfc4960#section-6.3.2
// "Whenever all outstanding data sent to an address have been
// acknowledged, turn off the T3-rtx timer of that address.
// Note: Already stopped in `StopT3RtxTimerOnIncreasedCumulativeTsnAck`."
} else {
// https://tools.ietf.org/html/rfc4960#section-6.3.2
// "Whenever a SACK is received that acknowledges the DATA chunk
// with the earliest outstanding TSN for that address, restart the T3-rtx
// timer for that address with its current RTO (if there is still
// outstanding data on that address)."
// "Whenever a SACK is received missing a TSN that was previously
// acknowledged via a Gap Ack Block, start the T3-rtx for the destination
// address to which the DATA chunk was originally transmitted if it is not
// already running."
if (!t3_rtx_.is_running()) {
t3_rtx_.Start();
}
}
}
bool RetransmissionQueue::IsSackValid(const SackChunk& sack) const {
// https://tools.ietf.org/html/rfc4960#section-6.2.1
// "If Cumulative TSN Ack is less than the Cumulative TSN Ack Point,
// then drop the SACK. Since Cumulative TSN Ack is monotonically increasing,
// a SACK whose Cumulative TSN Ack is less than the Cumulative TSN Ack Point
// indicates an out-of- order SACK."
//
// Note: Important not to drop SACKs with identical TSN to that previously
// received, as the gap ack blocks or dup tsn fields may have changed.
UnwrappedTSN cumulative_tsn_ack =
tsn_unwrapper_.PeekUnwrap(sack.cumulative_tsn_ack());
if (cumulative_tsn_ack < outstanding_data_.last_cumulative_tsn_ack()) {
// https://tools.ietf.org/html/rfc4960#section-6.2.1
// "If Cumulative TSN Ack is less than the Cumulative TSN Ack Point,
// then drop the SACK. Since Cumulative TSN Ack is monotonically
// increasing, a SACK whose Cumulative TSN Ack is less than the Cumulative
// TSN Ack Point indicates an out-of- order SACK."
return false;
} else if (cumulative_tsn_ack > outstanding_data_.highest_outstanding_tsn()) {
return false;
}
return true;
}
bool RetransmissionQueue::HandleSack(Timestamp now, const SackChunk& sack) {
if (!IsSackValid(sack)) {
return false;
}
UnwrappedTSN old_last_cumulative_tsn_ack =
outstanding_data_.last_cumulative_tsn_ack();
size_t old_unacked_bytes = outstanding_data_.unacked_bytes();
size_t old_rwnd = rwnd_;
UnwrappedTSN cumulative_tsn_ack =
tsn_unwrapper_.Unwrap(sack.cumulative_tsn_ack());
if (sack.gap_ack_blocks().empty()) {
UpdateRTT(now, cumulative_tsn_ack);
}
// Exit fast recovery before continuing processing, in case it needs to go
// into fast recovery again due to new reported packet loss.
MaybeExitFastRecovery(cumulative_tsn_ack);
OutstandingData::AckInfo ack_info = outstanding_data_.HandleSack(
cumulative_tsn_ack, sack.gap_ack_blocks(), is_in_fast_recovery());
// Add lifecycle events for delivered messages.
for (LifecycleId lifecycle_id : ack_info.acked_lifecycle_ids) {
RTC_DLOG(LS_VERBOSE) << "Triggering OnLifecycleMessageDelivered("
<< lifecycle_id.value() << ")";
callbacks_.OnLifecycleMessageDelivered(lifecycle_id);
callbacks_.OnLifecycleEnd(lifecycle_id);
}
for (LifecycleId lifecycle_id : ack_info.abandoned_lifecycle_ids) {
RTC_DLOG(LS_VERBOSE) << "Triggering OnLifecycleMessageExpired("
<< lifecycle_id.value() << ", true)";
callbacks_.OnLifecycleMessageExpired(lifecycle_id,
/*maybe_delivered=*/true);
callbacks_.OnLifecycleEnd(lifecycle_id);
}
// Update of outstanding_data_ is now done. Congestion control remains.
UpdateReceiverWindow(sack.a_rwnd());
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Received SACK, cum_tsn_ack="
<< *cumulative_tsn_ack.Wrap() << " ("
<< *old_last_cumulative_tsn_ack.Wrap()
<< "), unacked_bytes="
<< outstanding_data_.unacked_bytes() << " ("
<< old_unacked_bytes << "), rwnd=" << rwnd_ << " ("
<< old_rwnd << ")";
if (cumulative_tsn_ack > old_last_cumulative_tsn_ack) {
// https://tools.ietf.org/html/rfc4960#section-6.3.2
// "Whenever a SACK is received that acknowledges the DATA chunk
// with the earliest outstanding TSN for that address, restart the T3-rtx
// timer for that address with its current RTO (if there is still
// outstanding data on that address)."
// Note: It may be started again in a bit further down.
t3_rtx_.Stop();
HandleIncreasedCumulativeTsnAck(old_unacked_bytes, ack_info.bytes_acked);
}
if (ack_info.has_packet_loss) {
HandlePacketLoss(ack_info.highest_tsn_acked);
}
// https://tools.ietf.org/html/rfc4960#section-8.2
// "When an outstanding TSN is acknowledged [...] the endpoint shall clear
// the error counter ..."
if (ack_info.bytes_acked > 0) {
on_clear_retransmission_counter_();
}
StartT3RtxTimerIfOutstandingData();
RTC_DCHECK(IsConsistent());
return true;
}
void RetransmissionQueue::UpdateRTT(Timestamp now,
UnwrappedTSN cumulative_tsn_ack) {
// RTT updating is flawed in SCTP, as explained in e.g. Pedersen J, Griwodz C,
// Halvorsen P (2006) Considerations of SCTP retransmission delays for thin
// streams.
// Due to delayed acknowledgement, the SACK may be sent much later which
// increases the calculated RTT.
// TODO(boivie): Consider occasionally sending DATA chunks with I-bit set and
// use only those packets for measurement.
TimeDelta rtt = outstanding_data_.MeasureRTT(now, cumulative_tsn_ack);
if (rtt.IsFinite()) {
on_new_rtt_(rtt);
}
}
void RetransmissionQueue::HandleT3RtxTimerExpiry() {
size_t old_cwnd = cwnd_;
size_t old_unacked_bytes = unacked_bytes();
// https://tools.ietf.org/html/rfc4960#section-6.3.3
// "For the destination address for which the timer expires, adjust
// its ssthresh with rules defined in Section 7.2.3 and set the cwnd <- MTU."
ssthresh_ = std::max(cwnd_ / 2, 4 * options_.mtu);
cwnd_ = 1 * options_.mtu;
// Errata: https://datatracker.ietf.org/doc/html/rfc8540#section-3.11
partial_bytes_acked_ = 0;
// https://tools.ietf.org/html/rfc4960#section-6.3.3
// "For the destination address for which the timer expires, set RTO
// <- RTO * 2 ("back off the timer"). The maximum value discussed in rule C7
// above (RTO.max) may be used to provide an upper bound to this doubling
// operation."
// Already done by the Timer implementation.
// https://tools.ietf.org/html/rfc4960#section-6.3.3
// "Determine how many of the earliest (i.e., lowest TSN) outstanding
// DATA chunks for the address for which the T3-rtx has expired will fit into
// a single packet"
// https://tools.ietf.org/html/rfc4960#section-6.3.3
// "Note: Any DATA chunks that were sent to the address for which the
// T3-rtx timer expired but did not fit in one MTU (rule E3 above) should be
// marked for retransmission and sent as soon as cwnd allows (normally, when a
// SACK arrives)."
outstanding_data_.NackAll();
// https://tools.ietf.org/html/rfc4960#section-6.3.3
// "Start the retransmission timer T3-rtx on the destination address
// to which the retransmission is sent, if rule R1 above indicates to do so."
// Already done by the Timer implementation.
RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_
<< " (" << old_cwnd << "), ssthresh=" << ssthresh_
<< ", unacked_bytes " << unacked_bytes() << " ("
<< old_unacked_bytes << ")";
RTC_DCHECK(IsConsistent());
}
std::vector<std::pair<TSN, Data>>
RetransmissionQueue::GetChunksForFastRetransmit(size_t bytes_in_packet) {
RTC_DCHECK(outstanding_data_.has_data_to_be_fast_retransmitted());
RTC_DCHECK(IsDivisibleBy4(bytes_in_packet));
std::vector<std::pair<TSN, Data>> to_be_sent;
size_t old_unacked_bytes = unacked_bytes();
to_be_sent =
outstanding_data_.GetChunksToBeFastRetransmitted(bytes_in_packet);
RTC_DCHECK(!to_be_sent.empty());
// https://tools.ietf.org/html/rfc4960#section-7.2.4
// "4) Restart the T3-rtx timer only if ... the endpoint is retransmitting
// the first outstanding DATA chunk sent to that address."
if (to_be_sent[0].first ==
outstanding_data_.last_cumulative_tsn_ack().next_value().Wrap()) {
RTC_DLOG(LS_VERBOSE)
<< log_prefix_
<< "First outstanding DATA to be retransmitted - restarting T3-RTX";
t3_rtx_.Stop();
}
// https://tools.ietf.org/html/rfc4960#section-6.3.2
// "Every time a DATA chunk is sent to any address (including a
// retransmission), if the T3-rtx timer of that address is not running,
// start it running so that it will expire after the RTO of that address."
if (!t3_rtx_.is_running()) {
t3_rtx_.Start();
}
size_t bytes_retransmitted = absl::c_accumulate(
to_be_sent, 0, [&](size_t r, const std::pair<TSN, Data>& d) {
return r + GetSerializedChunkSize(d.second);
});
++rtx_packets_count_;
rtx_bytes_count_ += bytes_retransmitted;
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Fast-retransmitting TSN "
<< StrJoin(to_be_sent, ",",
[&](rtc::StringBuilder& sb,
const std::pair<TSN, Data>& c) {
sb << *c.first;
})
<< " - " << bytes_retransmitted
<< " bytes. unacked_bytes=" << unacked_bytes() << " ("
<< old_unacked_bytes << ")";
RTC_DCHECK(IsConsistent());
return to_be_sent;
}
std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
Timestamp now,
size_t bytes_remaining_in_packet) {
// Chunks are always padded to even divisible by four.
RTC_DCHECK(IsDivisibleBy4(bytes_remaining_in_packet));
std::vector<std::pair<TSN, Data>> to_be_sent;
size_t old_unacked_bytes = unacked_bytes();
size_t old_rwnd = rwnd_;
// Calculate the bandwidth budget (how many bytes that is
// allowed to be sent), and fill that up first with chunks that are
// scheduled to be retransmitted. If there is still budget, send new chunks
// (which will have their TSN assigned here.)
size_t max_bytes =
RoundDownTo4(std::min(max_bytes_to_send(), bytes_remaining_in_packet));
to_be_sent = outstanding_data_.GetChunksToBeRetransmitted(max_bytes);
size_t bytes_retransmitted = absl::c_accumulate(
to_be_sent, 0, [&](size_t r, const std::pair<TSN, Data>& d) {
return r + GetSerializedChunkSize(d.second);
});
max_bytes -= bytes_retransmitted;
if (!to_be_sent.empty()) {
++rtx_packets_count_;
rtx_bytes_count_ += bytes_retransmitted;
}
while (max_bytes > data_chunk_header_size_) {
RTC_DCHECK(IsDivisibleBy4(max_bytes));
absl::optional<SendQueue::DataToSend> chunk_opt =
send_queue_.Produce(now, max_bytes - data_chunk_header_size_);
if (!chunk_opt.has_value()) {
break;
}
size_t chunk_size = GetSerializedChunkSize(chunk_opt->data);
max_bytes -= chunk_size;
rwnd_ -= chunk_size;
absl::optional<UnwrappedTSN> tsn = outstanding_data_.Insert(
chunk_opt->message_id, chunk_opt->data, now,
partial_reliability_ ? chunk_opt->max_retransmissions
: MaxRetransmits::NoLimit(),
partial_reliability_ ? chunk_opt->expires_at
: Timestamp::PlusInfinity(),
chunk_opt->lifecycle_id);
if (tsn.has_value()) {
if (chunk_opt->lifecycle_id.IsSet()) {
RTC_DCHECK(chunk_opt->data.is_end);
callbacks_.OnLifecycleMessageFullySent(chunk_opt->lifecycle_id);
}
to_be_sent.emplace_back(tsn->Wrap(), std::move(chunk_opt->data));
}
}
if (!to_be_sent.empty()) {
// https://tools.ietf.org/html/rfc4960#section-6.3.2
// "Every time a DATA chunk is sent to any address (including a
// retransmission), if the T3-rtx timer of that address is not running,
// start it running so that it will expire after the RTO of that address."
if (!t3_rtx_.is_running()) {
t3_rtx_.Start();
}
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Sending TSN "
<< StrJoin(to_be_sent, ",",
[&](rtc::StringBuilder& sb,
const std::pair<TSN, Data>& c) {
sb << *c.first;
})
<< " - "
<< absl::c_accumulate(
to_be_sent, 0,
[&](size_t r, const std::pair<TSN, Data>& d) {
return r + GetSerializedChunkSize(d.second);
})
<< " bytes. unacked_bytes=" << unacked_bytes() << " ("
<< old_unacked_bytes << "), cwnd=" << cwnd_
<< ", rwnd=" << rwnd_ << " (" << old_rwnd << ")";
}
RTC_DCHECK(IsConsistent());
return to_be_sent;
}
bool RetransmissionQueue::can_send_data() const {
return cwnd_ < options_.avoid_fragmentation_cwnd_mtus * options_.mtu ||
max_bytes_to_send() >= min_bytes_required_to_send_;
}
bool RetransmissionQueue::ShouldSendForwardTsn(Timestamp now) {
if (!partial_reliability_) {
return false;
}
outstanding_data_.ExpireOutstandingChunks(now);
bool ret = outstanding_data_.ShouldSendForwardTsn();
RTC_DCHECK(IsConsistent());
return ret;
}
size_t RetransmissionQueue::max_bytes_to_send() const {
size_t left = unacked_bytes() >= cwnd_ ? 0 : cwnd_ - unacked_bytes();
if (unacked_bytes() == 0) {
// https://datatracker.ietf.org/doc/html/rfc4960#section-6.1
// ... However, regardless of the value of rwnd (including if it is 0), the
// data sender can always have one DATA chunk in flight to the receiver if
// allowed by cwnd (see rule B, below).
return left;
}
return std::min(rwnd(), left);
}
void RetransmissionQueue::PrepareResetStream(StreamID stream_id) {
// TODO(boivie): These calls are now only affecting the send queue. The
// packet buffer can also change behavior - for example draining the chunk
// producer and eagerly assign TSNs so that an "Outgoing SSN Reset Request"
// can be sent quickly, with a known `sender_last_assigned_tsn`.
send_queue_.PrepareResetStream(stream_id);
}
bool RetransmissionQueue::HasStreamsReadyToBeReset() const {
return send_queue_.HasStreamsReadyToBeReset();
}
std::vector<StreamID> RetransmissionQueue::BeginResetStreams() {
outstanding_data_.BeginResetStreams();
return send_queue_.GetStreamsReadyToBeReset();
}
void RetransmissionQueue::CommitResetStreams() {
send_queue_.CommitResetStreams();
}
void RetransmissionQueue::RollbackResetStreams() {
send_queue_.RollbackResetStreams();
}
HandoverReadinessStatus RetransmissionQueue::GetHandoverReadiness() const {
HandoverReadinessStatus status;
if (!outstanding_data_.empty()) {
status.Add(HandoverUnreadinessReason::kRetransmissionQueueOutstandingData);
}
if (fast_recovery_exit_tsn_.has_value()) {
status.Add(HandoverUnreadinessReason::kRetransmissionQueueFastRecovery);
}
if (outstanding_data_.has_data_to_be_retransmitted()) {
status.Add(HandoverUnreadinessReason::kRetransmissionQueueNotEmpty);
}
return status;
}
void RetransmissionQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
state.tx.next_tsn = next_tsn().value();
state.tx.rwnd = rwnd_;
state.tx.cwnd = cwnd_;
state.tx.ssthresh = ssthresh_;
state.tx.partial_bytes_acked = partial_bytes_acked_;
}
void RetransmissionQueue::RestoreFromState(
const DcSctpSocketHandoverState& state) {
// Validate that the component is in pristine state.
RTC_DCHECK(outstanding_data_.empty());
RTC_DCHECK(!t3_rtx_.is_running());
RTC_DCHECK(partial_bytes_acked_ == 0);
cwnd_ = state.tx.cwnd;
rwnd_ = state.tx.rwnd;
ssthresh_ = state.tx.ssthresh;
partial_bytes_acked_ = state.tx.partial_bytes_acked;
outstanding_data_.ResetSequenceNumbers(
tsn_unwrapper_.Unwrap(TSN(state.tx.next_tsn - 1)));
}
} // namespace dcsctp

View file

@ -0,0 +1,263 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_
#define NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_
#include <cstdint>
#include <functional>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
#include "net/dcsctp/packet/chunk/sack_chunk.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_handover_state.h"
#include "net/dcsctp/public/dcsctp_options.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/timer/timer.h"
#include "net/dcsctp/tx/outstanding_data.h"
#include "net/dcsctp/tx/retransmission_timeout.h"
#include "net/dcsctp/tx/send_queue.h"
namespace dcsctp {
// The RetransmissionQueue manages all DATA/I-DATA chunks that are in-flight and
// schedules them to be retransmitted if necessary. Chunks are retransmitted
// when they have been lost for a number of consecutive SACKs, or when the
// retransmission timer, `t3_rtx` expires.
//
// As congestion control is tightly connected with the state of transmitted
// packets, that's also managed here to limit the amount of data that is
// in-flight (sent, but not yet acknowledged).
class RetransmissionQueue {
public:
static constexpr size_t kMinimumFragmentedPayload = 10;
using State = OutstandingData::State;
// Creates a RetransmissionQueue which will send data using `my_initial_tsn`
// (or a value from `DcSctpSocketHandoverState` if given) as the first TSN
// to use for sent fragments. It will poll data from `send_queue`. When SACKs
// are received, it will estimate the RTT, and call `on_new_rtt`. When an
// outstanding chunk has been ACKed, it will call
// `on_clear_retransmission_counter` and will also use `t3_rtx`, which is the
// SCTP retransmission timer to manage retransmissions.
RetransmissionQueue(absl::string_view log_prefix,
DcSctpSocketCallbacks* callbacks,
TSN my_initial_tsn,
size_t a_rwnd,
SendQueue& send_queue,
std::function<void(webrtc::TimeDelta rtt)> on_new_rtt,
std::function<void()> on_clear_retransmission_counter,
Timer& t3_rtx,
const DcSctpOptions& options,
bool supports_partial_reliability = true,
bool use_message_interleaving = false);
// Handles a received SACK. Returns true if the `sack` was processed and
// false if it was discarded due to received out-of-order and not relevant.
bool HandleSack(webrtc::Timestamp now, const SackChunk& sack);
// Handles an expired retransmission timer.
void HandleT3RtxTimerExpiry();
bool has_data_to_be_fast_retransmitted() const {
return outstanding_data_.has_data_to_be_fast_retransmitted();
}
// Returns a list of chunks to "fast retransmit" that would fit in one SCTP
// packet with `bytes_in_packet` bytes available. The current value
// of `cwnd` is ignored.
std::vector<std::pair<TSN, Data>> GetChunksForFastRetransmit(
size_t bytes_in_packet);
// Returns a list of chunks to send that would fit in one SCTP packet with
// `bytes_remaining_in_packet` bytes available. This may be further limited by
// the congestion control windows. Note that `ShouldSendForwardTSN` must be
// called prior to this method, to abandon expired chunks, as this method will
// not expire any chunks.
std::vector<std::pair<TSN, Data>> GetChunksToSend(
webrtc::Timestamp now,
size_t bytes_remaining_in_packet);
// Returns the internal state of all queued chunks. This is only used in
// unit-tests.
std::vector<std::pair<TSN, OutstandingData::State>> GetChunkStatesForTesting()
const {
return outstanding_data_.GetChunkStatesForTesting();
}
// Returns the next TSN that will be allocated for sent DATA chunks.
TSN next_tsn() const { return outstanding_data_.next_tsn().Wrap(); }
TSN last_assigned_tsn() const {
return UnwrappedTSN::AddTo(outstanding_data_.next_tsn(), -1).Wrap();
}
// Returns the size of the congestion window, in bytes. This is the number of
// bytes that may be in-flight.
size_t cwnd() const { return cwnd_; }
// Overrides the current congestion window size.
void set_cwnd(size_t cwnd) { cwnd_ = cwnd; }
// Returns the current receiver window size.
size_t rwnd() const { return rwnd_; }
size_t rtx_packets_count() const { return rtx_packets_count_; }
uint64_t rtx_bytes_count() const { return rtx_bytes_count_; }
// Returns the number of bytes of packets that are in-flight.
size_t unacked_bytes() const { return outstanding_data_.unacked_bytes(); }
// Returns the number of DATA chunks that are in-flight.
size_t unacked_items() const { return outstanding_data_.unacked_items(); }
// Indicates if the congestion control algorithm allows data to be sent.
bool can_send_data() const;
// Given the current time `now`, it will evaluate if there are chunks that
// have expired and that need to be discarded. It returns true if a
// FORWARD-TSN should be sent.
bool ShouldSendForwardTsn(webrtc::Timestamp now);
// Creates a FORWARD-TSN chunk.
ForwardTsnChunk CreateForwardTsn() const {
return outstanding_data_.CreateForwardTsn();
}
// Creates an I-FORWARD-TSN chunk.
IForwardTsnChunk CreateIForwardTsn() const {
return outstanding_data_.CreateIForwardTsn();
}
// See the SendQueue for a longer description of these methods related
// to stream resetting.
void PrepareResetStream(StreamID stream_id);
bool HasStreamsReadyToBeReset() const;
std::vector<StreamID> BeginResetStreams();
void CommitResetStreams();
void RollbackResetStreams();
HandoverReadinessStatus GetHandoverReadiness() const;
void AddHandoverState(DcSctpSocketHandoverState& state);
void RestoreFromState(const DcSctpSocketHandoverState& state);
private:
enum class CongestionAlgorithmPhase {
kSlowStart,
kCongestionAvoidance,
};
bool IsConsistent() const;
// Returns how large a chunk will be, serialized, carrying the data
size_t GetSerializedChunkSize(const Data& data) const;
// Indicates if the congestion control algorithm is in "fast recovery".
bool is_in_fast_recovery() const {
return fast_recovery_exit_tsn_.has_value();
}
// Indicates if the provided SACK is valid given what has previously been
// received. If it returns false, the SACK is most likely a duplicate of
// something already seen, so this returning false doesn't necessarily mean
// that the SACK is illegal.
bool IsSackValid(const SackChunk& sack) const;
// When a SACK chunk is received, this method will be called which _may_ call
// into the `RetransmissionTimeout` to update the RTO.
void UpdateRTT(webrtc::Timestamp now, UnwrappedTSN cumulative_tsn_ack);
// If the congestion control is in "fast recovery mode", this may be exited
// now.
void MaybeExitFastRecovery(UnwrappedTSN cumulative_tsn_ack);
// If chunks have been ACKed, stop the retransmission timer.
void StopT3RtxTimerOnIncreasedCumulativeTsnAck(
UnwrappedTSN cumulative_tsn_ack);
// Update the congestion control algorithm given as the cumulative ack TSN
// value has increased, as reported in an incoming SACK chunk.
void HandleIncreasedCumulativeTsnAck(size_t unacked_bytes,
size_t total_bytes_acked);
// Update the congestion control algorithm, given as packet loss has been
// detected, as reported in an incoming SACK chunk.
void HandlePacketLoss(UnwrappedTSN highest_tsn_acked);
// Update the view of the receiver window size.
void UpdateReceiverWindow(uint32_t a_rwnd);
// If there is data sent and not ACKED, ensure that the retransmission timer
// is running.
void StartT3RtxTimerIfOutstandingData();
// Returns the current congestion control algorithm phase.
CongestionAlgorithmPhase phase() const {
return (cwnd_ <= ssthresh_)
? CongestionAlgorithmPhase::kSlowStart
: CongestionAlgorithmPhase::kCongestionAvoidance;
}
// Returns the number of bytes that may be sent in a single packet according
// to the congestion control algorithm.
size_t max_bytes_to_send() const;
DcSctpSocketCallbacks& callbacks_;
const DcSctpOptions options_;
// The minimum bytes required to be available in the congestion window to
// allow packets to be sent - to avoid sending too small packets.
const size_t min_bytes_required_to_send_;
// If the peer supports RFC3758 - SCTP Partial Reliability Extension.
const bool partial_reliability_;
const absl::string_view log_prefix_;
// The size of the data chunk (DATA/I-DATA) header that is used.
const size_t data_chunk_header_size_;
// Called when a new RTT measurement has been done
const std::function<void(webrtc::TimeDelta rtt)> on_new_rtt_;
// Called when a SACK has been seen that cleared the retransmission counter.
const std::function<void()> on_clear_retransmission_counter_;
// The retransmission counter.
Timer& t3_rtx_;
// Unwraps TSNs
UnwrappedTSN::Unwrapper tsn_unwrapper_;
// Congestion Window. Number of bytes that may be in-flight (sent, not acked).
size_t cwnd_;
// Receive Window. Number of bytes available in the receiver's RX buffer.
size_t rwnd_;
// Slow Start Threshold. See RFC4960.
size_t ssthresh_;
// Partial Bytes Acked. See RFC4960.
size_t partial_bytes_acked_;
// See `dcsctp::Metrics`.
size_t rtx_packets_count_ = 0;
uint64_t rtx_bytes_count_ = 0;
// If set, fast recovery is enabled until this TSN has been cumulative
// acked.
absl::optional<UnwrappedTSN> fast_recovery_exit_tsn_ = absl::nullopt;
// The send queue.
SendQueue& send_queue_;
// All the outstanding data chunks that are in-flight and that have not been
// cumulative acked. Note that it also contains chunks that have been acked in
// gap ack blocks.
OutstandingData outstanding_data_;
};
} // namespace dcsctp
#endif // NET_DCSCTP_TX_RETRANSMISSION_QUEUE_H_

View file

@ -0,0 +1,64 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/tx/retransmission_timeout.h"
#include <algorithm>
#include <cstdint>
#include "api/units/time_delta.h"
#include "net/dcsctp/public/dcsctp_options.h"
namespace dcsctp {
RetransmissionTimeout::RetransmissionTimeout(const DcSctpOptions& options)
: min_rto_(options.rto_min.ToTimeDelta()),
max_rto_(options.rto_max.ToTimeDelta()),
max_rtt_(options.rtt_max.ToTimeDelta()),
min_rtt_variance_(*options.min_rtt_variance),
scaled_srtt_(*options.rto_initial << kRttShift),
rto_(*options.rto_initial) {}
void RetransmissionTimeout::ObserveRTT(webrtc::TimeDelta measured_rtt) {
// Unrealistic values will be skipped. If a wrongly measured (or otherwise
// corrupt) value was processed, it could change the state in a way that would
// take a very long time to recover.
if (measured_rtt < webrtc::TimeDelta::Zero() || measured_rtt > max_rtt_) {
return;
}
const int64_t rtt = measured_rtt.ms();
// From https://tools.ietf.org/html/rfc4960#section-6.3.1, but avoiding
// floating point math by implementing algorithm from "V. Jacobson: Congestion
// avoidance and control", but adapted for SCTP.
if (first_measurement_) {
scaled_srtt_ = rtt << kRttShift;
scaled_rtt_var_ = (rtt / 2) << kRttVarShift;
first_measurement_ = false;
} else {
int64_t rtt_diff = rtt - (scaled_srtt_ >> kRttShift);
scaled_srtt_ += rtt_diff;
if (rtt_diff < 0) {
rtt_diff = -rtt_diff;
}
rtt_diff -= (scaled_rtt_var_ >> kRttVarShift);
scaled_rtt_var_ += rtt_diff;
}
if (scaled_rtt_var_ < min_rtt_variance_) {
scaled_rtt_var_ = min_rtt_variance_;
}
rto_ = (scaled_srtt_ >> kRttShift) + scaled_rtt_var_;
// Clamp RTO between min and max.
rto_ = std::min(std::max(rto_, min_rto_.ms()), max_rto_.ms());
}
} // namespace dcsctp

View file

@ -0,0 +1,61 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_TX_RETRANSMISSION_TIMEOUT_H_
#define NET_DCSCTP_TX_RETRANSMISSION_TIMEOUT_H_
#include <cstdint>
#include <functional>
#include "net/dcsctp/public/dcsctp_options.h"
namespace dcsctp {
// Manages updating of the Retransmission Timeout (RTO) SCTP variable, which is
// used directly as the base timeout for T3-RTX and for other timers, such as
// delayed ack.
//
// When a round-trip-time (RTT) is calculated (outside this class), `Observe`
// is called, which calculates the retransmission timeout (RTO) value. The RTO
// value will become larger if the RTT is high and/or the RTT values are varying
// a lot, which is an indicator of a bad connection.
class RetransmissionTimeout {
public:
static constexpr int kRttShift = 3;
static constexpr int kRttVarShift = 2;
explicit RetransmissionTimeout(const DcSctpOptions& options);
// To be called when a RTT has been measured, to update the RTO value.
void ObserveRTT(webrtc::TimeDelta measured_rtt);
// Returns the Retransmission Timeout (RTO) value, in milliseconds.
webrtc::TimeDelta rto() const { return webrtc::TimeDelta::Millis(rto_); }
// Returns the smoothed RTT value, in milliseconds.
webrtc::TimeDelta srtt() const {
return webrtc::TimeDelta::Millis(scaled_srtt_ >> kRttShift);
}
private:
const webrtc::TimeDelta min_rto_;
const webrtc::TimeDelta max_rto_;
const webrtc::TimeDelta max_rtt_;
const int64_t min_rtt_variance_;
// If this is the first measurement
bool first_measurement_ = true;
// Smoothed Round-Trip Time, shifted by kRttShift
int64_t scaled_srtt_;
// Round-Trip Time Variation, shifted by kRttVarShift
int64_t scaled_rtt_var_ = 0;
// Retransmission Timeout
int64_t rto_;
};
} // namespace dcsctp
#endif // NET_DCSCTP_TX_RETRANSMISSION_TIMEOUT_H_

View file

@ -0,0 +1,545 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/tx/rr_send_queue.h"
#include <cstdint>
#include <deque>
#include <limits>
#include <map>
#include <set>
#include <utility>
#include <vector>
#include "absl/algorithm/container.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/logging.h"
#include "rtc_base/strings/str_join.h"
namespace dcsctp {
using ::webrtc::TimeDelta;
using ::webrtc::Timestamp;
RRSendQueue::RRSendQueue(absl::string_view log_prefix,
DcSctpSocketCallbacks* callbacks,
size_t buffer_size,
size_t mtu,
StreamPriority default_priority,
size_t total_buffered_amount_low_threshold)
: log_prefix_(log_prefix),
callbacks_(*callbacks),
buffer_size_(buffer_size),
default_priority_(default_priority),
scheduler_(log_prefix_, mtu),
total_buffered_amount_(
[this]() { callbacks_.OnTotalBufferedAmountLow(); }) {
total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
}
size_t RRSendQueue::OutgoingStream::bytes_to_send_in_next_message() const {
if (pause_state_ == PauseState::kPaused ||
pause_state_ == PauseState::kResetting) {
// The stream has paused (and there is no partially sent message).
return 0;
}
if (items_.empty()) {
return 0;
}
return items_.front().remaining_size;
}
void RRSendQueue::OutgoingStream::AddHandoverState(
DcSctpSocketHandoverState::OutgoingStream& state) const {
state.next_ssn = next_ssn_.value();
state.next_ordered_mid = next_ordered_mid_.value();
state.next_unordered_mid = next_unordered_mid_.value();
state.priority = *scheduler_stream_->priority();
}
bool RRSendQueue::IsConsistent() const {
std::set<StreamID> expected_active_streams;
std::set<StreamID> actual_active_streams =
scheduler_.ActiveStreamsForTesting();
size_t total_buffered_amount = 0;
for (const auto& [stream_id, stream] : streams_) {
total_buffered_amount += stream.buffered_amount().value();
if (stream.bytes_to_send_in_next_message() > 0) {
expected_active_streams.emplace(stream_id);
}
}
if (expected_active_streams != actual_active_streams) {
auto fn = [&](rtc::StringBuilder& sb, const auto& p) { sb << *p; };
RTC_DLOG(LS_ERROR) << "Active streams mismatch, is=["
<< StrJoin(actual_active_streams, ",", fn)
<< "], expected=["
<< StrJoin(expected_active_streams, ",", fn) << "]";
return false;
}
return total_buffered_amount == total_buffered_amount_.value();
}
bool RRSendQueue::OutgoingStream::IsConsistent() const {
size_t bytes = 0;
for (const auto& item : items_) {
bytes += item.remaining_size;
}
return bytes == buffered_amount_.value();
}
void RRSendQueue::ThresholdWatcher::Decrease(size_t bytes) {
RTC_DCHECK(bytes <= value_);
size_t old_value = value_;
value_ -= bytes;
if (old_value > low_threshold_ && value_ <= low_threshold_) {
on_threshold_reached_();
}
}
void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) {
// Betting on https://github.com/w3c/webrtc-pc/issues/2654 being accepted.
if (low_threshold_ < value_ && low_threshold >= value_) {
on_threshold_reached_();
}
low_threshold_ = low_threshold;
}
void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
MessageAttributes attributes) {
bool was_active = bytes_to_send_in_next_message() > 0;
buffered_amount_.Increase(message.payload().size());
parent_.total_buffered_amount_.Increase(message.payload().size());
OutgoingMessageId message_id = parent_.current_message_id;
parent_.current_message_id =
OutgoingMessageId(*parent_.current_message_id + 1);
items_.emplace_back(message_id, std::move(message), std::move(attributes));
if (!was_active) {
scheduler_stream_->MaybeMakeActive();
}
RTC_DCHECK(IsConsistent());
}
absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
Timestamp now,
size_t max_size) {
RTC_DCHECK(pause_state_ != PauseState::kPaused &&
pause_state_ != PauseState::kResetting);
while (!items_.empty()) {
Item& item = items_.front();
DcSctpMessage& message = item.message;
// Allocate Message ID and SSN when the first fragment is sent.
if (!item.mid.has_value()) {
// Oops, this entire message has already expired. Try the next one.
if (item.attributes.expires_at <= now) {
HandleMessageExpired(item);
items_.pop_front();
continue;
}
MID& mid =
item.attributes.unordered ? next_unordered_mid_ : next_ordered_mid_;
item.mid = mid;
mid = MID(*mid + 1);
}
if (!item.attributes.unordered && !item.ssn.has_value()) {
item.ssn = next_ssn_;
next_ssn_ = SSN(*next_ssn_ + 1);
}
// Grab the next `max_size` fragment from this message and calculate flags.
rtc::ArrayView<const uint8_t> chunk_payload =
item.message.payload().subview(item.remaining_offset, max_size);
rtc::ArrayView<const uint8_t> message_payload = message.payload();
Data::IsBeginning is_beginning(chunk_payload.data() ==
message_payload.data());
Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) ==
(message_payload.data() + message_payload.size()));
StreamID stream_id = message.stream_id();
PPID ppid = message.ppid();
// Zero-copy the payload if the message fits in a single chunk.
std::vector<uint8_t> payload =
is_beginning && is_end
? std::move(message).ReleasePayload()
: std::vector<uint8_t>(chunk_payload.begin(), chunk_payload.end());
FSN fsn(item.current_fsn);
item.current_fsn = FSN(*item.current_fsn + 1);
buffered_amount_.Decrease(payload.size());
parent_.total_buffered_amount_.Decrease(payload.size());
SendQueue::DataToSend chunk(
item.message_id, Data(stream_id, item.ssn.value_or(SSN(0)), *item.mid,
fsn, ppid, std::move(payload), is_beginning,
is_end, item.attributes.unordered));
chunk.max_retransmissions = item.attributes.max_retransmissions;
chunk.expires_at = item.attributes.expires_at;
chunk.lifecycle_id =
is_end ? item.attributes.lifecycle_id : LifecycleId::NotSet();
if (is_end) {
// The entire message has been sent, and its last data copied to `chunk`,
// so it can safely be discarded.
items_.pop_front();
if (pause_state_ == PauseState::kPending) {
RTC_DLOG(LS_VERBOSE) << "Pause state on " << *stream_id
<< " is moving from pending to paused";
pause_state_ = PauseState::kPaused;
}
} else {
item.remaining_offset += chunk_payload.size();
item.remaining_size -= chunk_payload.size();
RTC_DCHECK(item.remaining_offset + item.remaining_size ==
item.message.payload().size());
RTC_DCHECK(item.remaining_size > 0);
}
RTC_DCHECK(IsConsistent());
return chunk;
}
RTC_DCHECK(IsConsistent());
return absl::nullopt;
}
void RRSendQueue::OutgoingStream::HandleMessageExpired(
OutgoingStream::Item& item) {
buffered_amount_.Decrease(item.remaining_size);
parent_.total_buffered_amount_.Decrease(item.remaining_size);
if (item.attributes.lifecycle_id.IsSet()) {
RTC_DLOG(LS_VERBOSE) << "Triggering OnLifecycleMessageExpired("
<< item.attributes.lifecycle_id.value() << ", false)";
parent_.callbacks_.OnLifecycleMessageExpired(item.attributes.lifecycle_id,
/*maybe_delivered=*/false);
parent_.callbacks_.OnLifecycleEnd(item.attributes.lifecycle_id);
}
}
bool RRSendQueue::OutgoingStream::Discard(OutgoingMessageId message_id) {
bool result = false;
if (!items_.empty()) {
Item& item = items_.front();
if (item.message_id == message_id) {
HandleMessageExpired(item);
items_.pop_front();
// Only partially sent messages are discarded, so if a message was
// discarded, then it was the currently sent message.
scheduler_stream_->ForceReschedule();
if (pause_state_ == PauseState::kPending) {
pause_state_ = PauseState::kPaused;
scheduler_stream_->MakeInactive();
} else if (bytes_to_send_in_next_message() == 0) {
scheduler_stream_->MakeInactive();
}
// As the item still existed, it had unsent data.
result = true;
}
}
RTC_DCHECK(IsConsistent());
return result;
}
void RRSendQueue::OutgoingStream::Pause() {
if (pause_state_ != PauseState::kNotPaused) {
// Already in progress.
return;
}
bool had_pending_items = !items_.empty();
// https://datatracker.ietf.org/doc/html/rfc8831#section-6.7
// "Closing of a data channel MUST be signaled by resetting the corresponding
// outgoing streams [RFC6525]. This means that if one side decides to close
// the data channel, it resets the corresponding outgoing stream."
// ... "[RFC6525] also guarantees that all the messages are delivered (or
// abandoned) before the stream is reset."
// A stream is paused when it's about to be reset. In this implementation,
// it will throw away all non-partially send messages - they will be abandoned
// as noted above. This is subject to change. It will however not discard any
// partially sent messages - only whole messages. Partially delivered messages
// (at the time of receiving a Stream Reset command) will always deliver all
// the fragments before actually resetting the stream.
for (auto it = items_.begin(); it != items_.end();) {
if (it->remaining_offset == 0) {
HandleMessageExpired(*it);
it = items_.erase(it);
} else {
++it;
}
}
pause_state_ = (items_.empty() || items_.front().remaining_offset == 0)
? PauseState::kPaused
: PauseState::kPending;
if (had_pending_items && pause_state_ == PauseState::kPaused) {
RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
<< " was previously active, but is now paused.";
scheduler_stream_->MakeInactive();
}
RTC_DCHECK(IsConsistent());
}
void RRSendQueue::OutgoingStream::Resume() {
RTC_DCHECK(pause_state_ == PauseState::kResetting);
pause_state_ = PauseState::kNotPaused;
scheduler_stream_->MaybeMakeActive();
RTC_DCHECK(IsConsistent());
}
void RRSendQueue::OutgoingStream::Reset() {
// This can be called both when an outgoing stream reset has been responded
// to, or when the entire SendQueue is reset due to detecting the peer having
// restarted. The stream may be in any state at this time.
PauseState old_pause_state = pause_state_;
pause_state_ = PauseState::kNotPaused;
next_ordered_mid_ = MID(0);
next_unordered_mid_ = MID(0);
next_ssn_ = SSN(0);
if (!items_.empty()) {
// If this message has been partially sent, reset it so that it will be
// re-sent.
auto& item = items_.front();
buffered_amount_.Increase(item.message.payload().size() -
item.remaining_size);
parent_.total_buffered_amount_.Increase(item.message.payload().size() -
item.remaining_size);
item.remaining_offset = 0;
item.remaining_size = item.message.payload().size();
item.mid = absl::nullopt;
item.ssn = absl::nullopt;
item.current_fsn = FSN(0);
if (old_pause_state == PauseState::kPaused ||
old_pause_state == PauseState::kResetting) {
scheduler_stream_->MaybeMakeActive();
}
}
RTC_DCHECK(IsConsistent());
}
bool RRSendQueue::OutgoingStream::has_partially_sent_message() const {
if (items_.empty()) {
return false;
}
return items_.front().mid.has_value();
}
void RRSendQueue::Add(Timestamp now,
DcSctpMessage message,
const SendOptions& send_options) {
RTC_DCHECK(!message.payload().empty());
// Any limited lifetime should start counting from now - when the message
// has been added to the queue.
// `expires_at` is the time when it expires. Which is slightly larger than the
// message's lifetime, as the message is alive during its entire lifetime
// (which may be zero).
MessageAttributes attributes = {
.unordered = send_options.unordered,
.max_retransmissions =
send_options.max_retransmissions.has_value()
? MaxRetransmits(send_options.max_retransmissions.value())
: MaxRetransmits::NoLimit(),
.expires_at = send_options.lifetime.has_value()
? now + send_options.lifetime->ToTimeDelta() +
TimeDelta::Millis(1)
: Timestamp::PlusInfinity(),
.lifecycle_id = send_options.lifecycle_id,
};
StreamID stream_id = message.stream_id();
GetOrCreateStreamInfo(stream_id).Add(std::move(message),
std::move(attributes));
RTC_DCHECK(IsConsistent());
}
bool RRSendQueue::IsFull() const {
return total_buffered_amount() >= buffer_size_;
}
bool RRSendQueue::IsEmpty() const {
return total_buffered_amount() == 0;
}
absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(Timestamp now,
size_t max_size) {
return scheduler_.Produce(now, max_size);
}
bool RRSendQueue::Discard(StreamID stream_id, OutgoingMessageId message_id) {
bool has_discarded = GetOrCreateStreamInfo(stream_id).Discard(message_id);
RTC_DCHECK(IsConsistent());
return has_discarded;
}
void RRSendQueue::PrepareResetStream(StreamID stream_id) {
GetOrCreateStreamInfo(stream_id).Pause();
RTC_DCHECK(IsConsistent());
}
bool RRSendQueue::HasStreamsReadyToBeReset() const {
for (auto& [unused, stream] : streams_) {
if (stream.IsReadyToBeReset()) {
return true;
}
}
return false;
}
std::vector<StreamID> RRSendQueue::GetStreamsReadyToBeReset() {
RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) {
return p.second.IsResetting();
}) == 0);
std::vector<StreamID> ready;
for (auto& [stream_id, stream] : streams_) {
if (stream.IsReadyToBeReset()) {
stream.SetAsResetting();
ready.push_back(stream_id);
}
}
return ready;
}
void RRSendQueue::CommitResetStreams() {
RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) {
return p.second.IsResetting();
}) > 0);
for (auto& [unused, stream] : streams_) {
if (stream.IsResetting()) {
stream.Reset();
}
}
RTC_DCHECK(IsConsistent());
}
void RRSendQueue::RollbackResetStreams() {
RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) {
return p.second.IsResetting();
}) > 0);
for (auto& [unused, stream] : streams_) {
if (stream.IsResetting()) {
stream.Resume();
}
}
RTC_DCHECK(IsConsistent());
}
void RRSendQueue::Reset() {
// Recalculate buffered amount, as partially sent messages may have been put
// fully back in the queue.
for (auto& [unused, stream] : streams_) {
stream.Reset();
}
scheduler_.ForceReschedule();
}
size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
auto it = streams_.find(stream_id);
if (it == streams_.end()) {
return 0;
}
return it->second.buffered_amount().value();
}
size_t RRSendQueue::buffered_amount_low_threshold(StreamID stream_id) const {
auto it = streams_.find(stream_id);
if (it == streams_.end()) {
return 0;
}
return it->second.buffered_amount().low_threshold();
}
void RRSendQueue::SetBufferedAmountLowThreshold(StreamID stream_id,
size_t bytes) {
GetOrCreateStreamInfo(stream_id).buffered_amount().SetLowThreshold(bytes);
}
RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
StreamID stream_id) {
auto it = streams_.find(stream_id);
if (it != streams_.end()) {
return it->second;
}
return streams_
.emplace(
std::piecewise_construct, std::forward_as_tuple(stream_id),
std::forward_as_tuple(this, &scheduler_, stream_id, default_priority_,
[this, stream_id]() {
callbacks_.OnBufferedAmountLow(stream_id);
}))
.first->second;
}
void RRSendQueue::SetStreamPriority(StreamID stream_id,
StreamPriority priority) {
OutgoingStream& stream = GetOrCreateStreamInfo(stream_id);
stream.SetPriority(priority);
RTC_DCHECK(IsConsistent());
}
StreamPriority RRSendQueue::GetStreamPriority(StreamID stream_id) const {
auto stream_it = streams_.find(stream_id);
if (stream_it == streams_.end()) {
return default_priority_;
}
return stream_it->second.priority();
}
HandoverReadinessStatus RRSendQueue::GetHandoverReadiness() const {
HandoverReadinessStatus status;
if (!IsEmpty()) {
status.Add(HandoverUnreadinessReason::kSendQueueNotEmpty);
}
return status;
}
void RRSendQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
for (const auto& [stream_id, stream] : streams_) {
DcSctpSocketHandoverState::OutgoingStream state_stream;
state_stream.id = stream_id.value();
stream.AddHandoverState(state_stream);
state.tx.streams.push_back(std::move(state_stream));
}
}
void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
for (const DcSctpSocketHandoverState::OutgoingStream& state_stream :
state.tx.streams) {
StreamID stream_id(state_stream.id);
streams_.emplace(
std::piecewise_construct, std::forward_as_tuple(stream_id),
std::forward_as_tuple(
this, &scheduler_, stream_id, StreamPriority(state_stream.priority),
[this, stream_id]() { callbacks_.OnBufferedAmountLow(stream_id); },
&state_stream));
}
}
} // namespace dcsctp

View file

@ -0,0 +1,287 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_TX_RR_SEND_QUEUE_H_
#define NET_DCSCTP_TX_RR_SEND_QUEUE_H_
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/algorithm/container.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
#include "net/dcsctp/tx/stream_scheduler.h"
namespace dcsctp {
// The Round Robin SendQueue holds all messages that the client wants to send,
// but that haven't yet been split into chunks and fully sent on the wire.
//
// As defined in https://datatracker.ietf.org/doc/html/rfc8260#section-3.2,
// it will cycle to send messages from different streams. It will send all
// fragments from one message before continuing with a different message on
// possibly a different stream, until support for message interleaving has been
// implemented.
//
// As messages can be (requested to be) sent before the connection is properly
// established, this send queue is always present - even for closed connections.
//
// The send queue may trigger callbacks:
// * `OnBufferedAmountLow`, `OnTotalBufferedAmountLow`
// These will be triggered as defined in their documentation.
// * `OnLifecycleMessageExpired(/*maybe_delivered=*/false)`, `OnLifecycleEnd`
// These will be triggered when messages have been expired, abandoned or
// discarded from the send queue. If a message is fully produced, meaning
// that the last fragment has been produced, the responsibility to send
// lifecycle events is then transferred to the retransmission queue, which
// is the one asking to produce the message.
class RRSendQueue : public SendQueue {
public:
RRSendQueue(absl::string_view log_prefix,
DcSctpSocketCallbacks* callbacks,
size_t buffer_size,
size_t mtu,
StreamPriority default_priority,
size_t total_buffered_amount_low_threshold);
// Indicates if the buffer is full. Note that it's up to the caller to ensure
// that the buffer is not full prior to adding new items to it.
bool IsFull() const;
// Indicates if the buffer is empty.
bool IsEmpty() const;
// Adds the message to be sent using the `send_options` provided. The current
// time should be in `now`. Note that it's the responsibility of the caller to
// ensure that the buffer is not full (by calling `IsFull`) before adding
// messages to it.
void Add(webrtc::Timestamp now,
DcSctpMessage message,
const SendOptions& send_options = {});
// Implementation of `SendQueue`.
absl::optional<DataToSend> Produce(webrtc::Timestamp now,
size_t max_size) override;
bool Discard(StreamID stream_id, OutgoingMessageId message_id) override;
void PrepareResetStream(StreamID streams) override;
bool HasStreamsReadyToBeReset() const override;
std::vector<StreamID> GetStreamsReadyToBeReset() override;
void CommitResetStreams() override;
void RollbackResetStreams() override;
void Reset() override;
size_t buffered_amount(StreamID stream_id) const override;
size_t total_buffered_amount() const override {
return total_buffered_amount_.value();
}
size_t buffered_amount_low_threshold(StreamID stream_id) const override;
void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;
void EnableMessageInterleaving(bool enabled) override {
scheduler_.EnableMessageInterleaving(enabled);
}
void SetStreamPriority(StreamID stream_id, StreamPriority priority);
StreamPriority GetStreamPriority(StreamID stream_id) const;
HandoverReadinessStatus GetHandoverReadiness() const;
void AddHandoverState(DcSctpSocketHandoverState& state);
void RestoreFromState(const DcSctpSocketHandoverState& state);
private:
struct MessageAttributes {
IsUnordered unordered;
MaxRetransmits max_retransmissions;
webrtc::Timestamp expires_at;
LifecycleId lifecycle_id;
};
// Represents a value and a "low threshold" that when the value reaches or
// goes under the "low threshold", will trigger `on_threshold_reached`
// callback.
class ThresholdWatcher {
public:
explicit ThresholdWatcher(std::function<void()> on_threshold_reached)
: on_threshold_reached_(std::move(on_threshold_reached)) {}
// Increases the value.
void Increase(size_t bytes) { value_ += bytes; }
// Decreases the value and triggers `on_threshold_reached` if it's at or
// below `low_threshold()`.
void Decrease(size_t bytes);
size_t value() const { return value_; }
size_t low_threshold() const { return low_threshold_; }
void SetLowThreshold(size_t low_threshold);
private:
const std::function<void()> on_threshold_reached_;
size_t value_ = 0;
size_t low_threshold_ = 0;
};
// Per-stream information.
class OutgoingStream : public StreamScheduler::StreamProducer {
public:
OutgoingStream(
RRSendQueue* parent,
StreamScheduler* scheduler,
StreamID stream_id,
StreamPriority priority,
std::function<void()> on_buffered_amount_low,
const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
: parent_(*parent),
scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
next_ssn_(SSN(state ? state->next_ssn : 0)),
buffered_amount_(std::move(on_buffered_amount_low)) {}
StreamID stream_id() const { return scheduler_stream_->stream_id(); }
// Enqueues a message to this stream.
void Add(DcSctpMessage message, MessageAttributes attributes);
// Implementing `StreamScheduler::StreamProducer`.
absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now,
size_t max_size) override;
size_t bytes_to_send_in_next_message() const override;
const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
ThresholdWatcher& buffered_amount() { return buffered_amount_; }
// Discards a partially sent message, see `SendQueue::Discard`.
bool Discard(OutgoingMessageId message_id);
// Pauses this stream, which is used before resetting it.
void Pause();
// Resumes a paused stream.
void Resume();
bool IsReadyToBeReset() const {
return pause_state_ == PauseState::kPaused;
}
bool IsResetting() const { return pause_state_ == PauseState::kResetting; }
void SetAsResetting() {
RTC_DCHECK(pause_state_ == PauseState::kPaused);
pause_state_ = PauseState::kResetting;
}
// Resets this stream, meaning MIDs and SSNs are set to zero.
void Reset();
// Indicates if this stream has a partially sent message in it.
bool has_partially_sent_message() const;
StreamPriority priority() const { return scheduler_stream_->priority(); }
void SetPriority(StreamPriority priority) {
scheduler_stream_->SetPriority(priority);
}
void AddHandoverState(
DcSctpSocketHandoverState::OutgoingStream& state) const;
private:
// Streams are paused before they can be reset. To reset a stream, the
// socket sends an outgoing stream reset command with the TSN of the last
// fragment of the last message, so that receivers and senders can agree on
// when it stopped. And if the send queue is in the middle of sending a
// message, and without fragments not yet sent and without TSNs allocated to
// them, it will keep sending data until that message has ended.
enum class PauseState {
// The stream is not paused, and not scheduled to be reset.
kNotPaused,
// The stream has requested to be reset/paused but is still producing
// fragments of a message that hasn't ended yet. When it does, it will
// transition to the `kPaused` state.
kPending,
// The stream is fully paused and can be reset.
kPaused,
// The stream has been added to an outgoing stream reset request and a
// response from the peer hasn't been received yet.
kResetting,
};
// An enqueued message and metadata.
struct Item {
explicit Item(OutgoingMessageId message_id,
DcSctpMessage msg,
MessageAttributes attributes)
: message_id(message_id),
message(std::move(msg)),
attributes(std::move(attributes)),
remaining_offset(0),
remaining_size(message.payload().size()) {}
OutgoingMessageId message_id;
DcSctpMessage message;
MessageAttributes attributes;
// The remaining payload (offset and size) to be sent, when it has been
// fragmented.
size_t remaining_offset;
size_t remaining_size;
// If set, an allocated Message ID and SSN. Will be allocated when the
// first fragment is sent.
absl::optional<MID> mid = absl::nullopt;
absl::optional<SSN> ssn = absl::nullopt;
// The current Fragment Sequence Number, incremented for each fragment.
FSN current_fsn = FSN(0);
};
bool IsConsistent() const;
void HandleMessageExpired(OutgoingStream::Item& item);
RRSendQueue& parent_;
const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_;
PauseState pause_state_ = PauseState::kNotPaused;
// MIDs are different for unordered and ordered messages sent on a stream.
MID next_unordered_mid_;
MID next_ordered_mid_;
SSN next_ssn_;
// Enqueued messages, and metadata.
std::deque<Item> items_;
// The current amount of buffered data.
ThresholdWatcher buffered_amount_;
};
bool IsConsistent() const;
OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id);
absl::optional<DataToSend> Produce(
std::map<StreamID, OutgoingStream>::iterator it,
webrtc::Timestamp now,
size_t max_size);
const absl::string_view log_prefix_;
DcSctpSocketCallbacks& callbacks_;
const size_t buffer_size_;
const StreamPriority default_priority_;
OutgoingMessageId current_message_id = OutgoingMessageId(0);
StreamScheduler scheduler_;
// The total amount of buffer data, for all streams.
ThresholdWatcher total_buffered_amount_;
// All streams, and messages added to those.
std::map<StreamID, OutgoingStream> streams_;
};
} // namespace dcsctp
#endif // NET_DCSCTP_TX_RR_SEND_QUEUE_H_

View file

@ -0,0 +1,146 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_TX_SEND_QUEUE_H_
#define NET_DCSCTP_TX_SEND_QUEUE_H_
#include <cstdint>
#include <limits>
#include <utility>
#include <vector>
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "api/units/timestamp.h"
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/types.h"
namespace dcsctp {
class SendQueue {
public:
// Container for a data chunk that is produced by the SendQueue
struct DataToSend {
DataToSend(OutgoingMessageId message_id, Data data)
: message_id(message_id), data(std::move(data)) {}
OutgoingMessageId message_id;
// The data to send, including all parameters.
Data data;
// Partial reliability - RFC3758
MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit();
webrtc::Timestamp expires_at = webrtc::Timestamp::PlusInfinity();
// Lifecycle - set for the last fragment, and `LifecycleId::NotSet()` for
// all other fragments.
LifecycleId lifecycle_id = LifecycleId::NotSet();
};
virtual ~SendQueue() = default;
// TODO(boivie): This interface is obviously missing an "Add" function, but
// that is postponed a bit until the story around how to model message
// prioritization, which is important for any advanced stream scheduler, is
// further clarified.
// Produce a chunk to be sent.
//
// `max_size` refers to how many payload bytes that may be produced, not
// including any headers.
virtual absl::optional<DataToSend> Produce(webrtc::Timestamp now,
size_t max_size) = 0;
// Discards a partially sent message identified by the parameters
// `stream_id` and `message_id`. The `message_id` comes from the returned
// information when having called `Produce`. A partially sent message means
// that it has had at least one fragment of it returned when `Produce` was
// called prior to calling this method).
//
// This is used when a message has been found to be expired (by the partial
// reliability extension), and the retransmission queue will signal the
// receiver that any partially received message fragments should be skipped.
// This means that any remaining fragments in the Send Queue must be removed
// as well so that they are not sent.
//
// This function returns true if this message had unsent fragments still in
// the queue that were discarded, and false if there were no such fragments.
virtual bool Discard(StreamID stream_id, OutgoingMessageId message_id) = 0;
// Prepares the stream to be reset. This is used to close a WebRTC data
// channel and will be signaled to the other side.
//
// Concretely, it discards all whole (not partly sent) messages in the given
// stream and pauses that stream so that future added messages aren't
// produced until `ResumeStreams` is called.
//
// TODO(boivie): Investigate if it really should discard any message at all.
// RFC8831 only mentions that "[RFC6525] also guarantees that all the messages
// are delivered (or abandoned) before the stream is reset."
//
// This method can be called multiple times to add more streams to be
// reset, and paused while they are resetting. This is the first part of the
// two-phase commit protocol to reset streams, where the caller completes the
// procedure by either calling `CommitResetStreams` or `RollbackResetStreams`.
virtual void PrepareResetStream(StreamID stream_id) = 0;
// Indicates if there are any streams that are ready to be reset.
virtual bool HasStreamsReadyToBeReset() const = 0;
// Returns a list of streams that are ready to be included in an outgoing
// stream reset request. Any streams that are returned here must be included
// in an outgoing stream reset request, and there must not be concurrent
// requests. Before calling this method again, you must have called
virtual std::vector<StreamID> GetStreamsReadyToBeReset() = 0;
// Called to commit to reset the streams returned by
// `GetStreamsReadyToBeReset`. It will reset the stream sequence numbers
// (SSNs) and message identifiers (MIDs) and resume the paused streams.
virtual void CommitResetStreams() = 0;
// Called to abort the resetting of streams returned by
// `GetStreamsReadyToBeReset`. Will resume the paused streams without
// resetting the stream sequence numbers (SSNs) or message identifiers (MIDs).
// Note that the non-partial messages that were discarded when calling
// `PrepareResetStreams` will not be recovered, to better match the intention
// from the sender to "close the channel".
virtual void RollbackResetStreams() = 0;
// Resets all message identifier counters (MID, SSN) and makes all partially
// messages be ready to be re-sent in full. This is used when the peer has
// been detected to have restarted and is used to try to minimize the amount
// of data loss. However, data loss cannot be completely guaranteed when a
// peer restarts.
virtual void Reset() = 0;
// Returns the amount of buffered data. This doesn't include packets that are
// e.g. inflight.
virtual size_t buffered_amount(StreamID stream_id) const = 0;
// Returns the total amount of buffer data, for all streams.
virtual size_t total_buffered_amount() const = 0;
// Returns the limit for the `OnBufferedAmountLow` event. Default value is 0.
virtual size_t buffered_amount_low_threshold(StreamID stream_id) const = 0;
// Sets a limit for the `OnBufferedAmountLow` event.
virtual void SetBufferedAmountLowThreshold(StreamID stream_id,
size_t bytes) = 0;
// Configures the send queue to support interleaved message sending as
// described in RFC8260. Every send queue starts with this value set as
// disabled, but can later change it when the capabilities of the connection
// have been negotiated. This affects the behavior of the `Produce` method.
virtual void EnableMessageInterleaving(bool enabled) = 0;
};
} // namespace dcsctp
#endif // NET_DCSCTP_TX_SEND_QUEUE_H_

View file

@ -0,0 +1,205 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/tx/stream_scheduler.h"
#include <algorithm>
#include "absl/algorithm/container.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/strings/str_join.h"
namespace dcsctp {
void StreamScheduler::Stream::SetPriority(StreamPriority priority) {
priority_ = priority;
inverse_weight_ = InverseWeight(priority);
}
absl::optional<SendQueue::DataToSend> StreamScheduler::Produce(
webrtc::Timestamp now,
size_t max_size) {
// For non-interleaved streams, avoid rescheduling while still sending a
// message as it needs to be sent in full. For interleaved messaging,
// reschedule for every I-DATA chunk sent.
bool rescheduling =
enable_message_interleaving_ || !currently_sending_a_message_;
RTC_DLOG(LS_VERBOSE) << log_prefix_
<< "Producing data, rescheduling=" << rescheduling
<< ", active="
<< StrJoin(active_streams_, ", ",
[&](rtc::StringBuilder& sb, const auto& p) {
sb << *p->stream_id() << "@"
<< *p->next_finish_time();
});
RTC_DCHECK(rescheduling || current_stream_ != nullptr);
absl::optional<SendQueue::DataToSend> data;
while (!data.has_value() && !active_streams_.empty()) {
if (rescheduling) {
auto it = active_streams_.begin();
current_stream_ = *it;
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Rescheduling to stream "
<< *current_stream_->stream_id();
active_streams_.erase(it);
current_stream_->ForceMarkInactive();
} else {
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing from previous stream: "
<< *current_stream_->stream_id();
RTC_DCHECK(absl::c_any_of(active_streams_, [this](const auto* p) {
return p == current_stream_;
}));
}
data = current_stream_->Produce(now, max_size);
}
if (!data.has_value()) {
RTC_DLOG(LS_VERBOSE)
<< log_prefix_
<< "There is no stream with data; Can't produce any data.";
RTC_DCHECK(IsConsistent());
return absl::nullopt;
}
RTC_DCHECK(data->data.stream_id == current_stream_->stream_id());
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
<< (data->data.is_unordered ? "unordered" : "ordered")
<< "::"
<< (*data->data.is_beginning && *data->data.is_end
? "complete"
: *data->data.is_beginning ? "first"
: *data->data.is_end ? "last"
: "middle")
<< ", stream_id=" << *current_stream_->stream_id()
<< ", ppid=" << *data->data.ppid
<< ", length=" << data->data.payload.size();
currently_sending_a_message_ = !*data->data.is_end;
virtual_time_ = current_stream_->current_time();
// One side-effect of rescheduling is that the new stream will not be present
// in `active_streams`.
size_t bytes_to_send_next = current_stream_->bytes_to_send_in_next_message();
if (rescheduling && bytes_to_send_next > 0) {
current_stream_->MakeActive(bytes_to_send_next);
} else if (!rescheduling && bytes_to_send_next == 0) {
current_stream_->MakeInactive();
}
RTC_DCHECK(IsConsistent());
return data;
}
StreamScheduler::VirtualTime StreamScheduler::Stream::CalculateFinishTime(
size_t bytes_to_send_next) const {
if (parent_.enable_message_interleaving_) {
// Perform weighted fair queuing scheduling.
return VirtualTime(*current_virtual_time_ +
bytes_to_send_next * *inverse_weight_);
}
// Perform round-robin scheduling by letting the stream have its next virtual
// finish time in the future. It doesn't matter how far into the future, just
// any positive number so that any other stream that has the same virtual
// finish time as this stream gets to produce their data before revisiting
// this stream.
return VirtualTime(*current_virtual_time_ + 1);
}
absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce(
webrtc::Timestamp now,
size_t max_size) {
absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size);
if (data.has_value()) {
VirtualTime new_current = CalculateFinishTime(data->data.payload.size());
RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_
<< "Virtual time changed: " << *current_virtual_time_
<< " -> " << *new_current;
current_virtual_time_ = new_current;
}
return data;
}
bool StreamScheduler::IsConsistent() const {
for (Stream* stream : active_streams_) {
if (stream->next_finish_time_ == VirtualTime::Zero()) {
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Stream " << *stream->stream_id()
<< " is active, but has no next-finish-time";
return false;
}
}
return true;
}
void StreamScheduler::Stream::MaybeMakeActive() {
RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "MaybeMakeActive("
<< *stream_id() << ")";
RTC_DCHECK(next_finish_time_ == VirtualTime::Zero());
size_t bytes_to_send_next = bytes_to_send_in_next_message();
if (bytes_to_send_next == 0) {
return;
}
MakeActive(bytes_to_send_next);
}
void StreamScheduler::Stream::MakeActive(size_t bytes_to_send_next) {
current_virtual_time_ = parent_.virtual_time_;
RTC_DCHECK_GT(bytes_to_send_next, 0);
VirtualTime next_finish_time = CalculateFinishTime(
std::min(bytes_to_send_next, parent_.max_payload_bytes_));
RTC_DCHECK_GT(*next_finish_time, 0);
RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "Making stream "
<< *stream_id() << " active, expiring at "
<< *next_finish_time;
RTC_DCHECK(next_finish_time_ == VirtualTime::Zero());
next_finish_time_ = next_finish_time;
RTC_DCHECK(!absl::c_any_of(parent_.active_streams_,
[this](const auto* p) { return p == this; }));
parent_.active_streams_.emplace(this);
}
void StreamScheduler::Stream::ForceMarkInactive() {
RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "Making stream "
<< *stream_id() << " inactive";
RTC_DCHECK(next_finish_time_ != VirtualTime::Zero());
next_finish_time_ = VirtualTime::Zero();
}
void StreamScheduler::Stream::MakeInactive() {
ForceMarkInactive();
webrtc::EraseIf(parent_.active_streams_,
[&](const auto* s) { return s == this; });
}
std::set<StreamID> StreamScheduler::ActiveStreamsForTesting() const {
std::set<StreamID> stream_ids;
for (const auto& stream : active_streams_) {
stream_ids.insert(stream->stream_id());
}
return stream_ids;
}
} // namespace dcsctp

View file

@ -0,0 +1,226 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_TX_STREAM_SCHEDULER_H_
#define NET_DCSCTP_TX_STREAM_SCHEDULER_H_
#include <algorithm>
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <string>
#include <utility>
#include "absl/algorithm/container.h"
#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/packet/chunk/idata_chunk.h"
#include "net/dcsctp/packet/sctp_packet.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/containers/flat_set.h"
#include "rtc_base/strong_alias.h"
namespace dcsctp {
// A parameterized stream scheduler. Currently, it implements the round robin
// scheduling algorithm using virtual finish time. It is to be used as a part of
// a send queue and will track all active streams (streams that have any data
// that can be sent).
//
// The stream scheduler works with the concept of associating active streams
// with a "virtual finish time", which is the time when a stream is allowed to
// produce data. Streams are ordered by their virtual finish time, and the
// "current virtual time" will advance to the next following virtual finish time
// whenever a chunk is to be produced.
//
// When message interleaving is enabled, the WFQ - Weighted Fair Queueing -
// scheduling algorithm will be used. And when it's not, round-robin scheduling
// will be used instead.
//
// In the round robin scheduling algorithm, a stream's virtual finish time will
// just increment by one (1) after having produced a chunk, which results in a
// round-robin scheduling.
//
// In WFQ scheduling algorithm, a stream's virtual finish time will be defined
// as the number of bytes in the next fragment to be sent, multiplied by the
// inverse of the stream's priority, meaning that a high priority - or a smaller
// fragment - results in a closer virtual finish time, compared to a stream with
// either a lower priority or a larger fragment to be sent.
class StreamScheduler {
private:
class VirtualTime : public webrtc::StrongAlias<class VirtualTimeTag, double> {
public:
constexpr explicit VirtualTime(const UnderlyingType& v)
: webrtc::StrongAlias<class VirtualTimeTag, double>(v) {}
static constexpr VirtualTime Zero() { return VirtualTime(0); }
};
class InverseWeight
: public webrtc::StrongAlias<class InverseWeightTag, double> {
public:
constexpr explicit InverseWeight(StreamPriority priority)
: webrtc::StrongAlias<class InverseWeightTag, double>(
1.0 / std::max(static_cast<double>(*priority), 0.000001)) {}
};
public:
class StreamProducer {
public:
virtual ~StreamProducer() = default;
// Produces a fragment of data to send. The current wall time is specified
// as `now` and should be used to skip chunks with expired limited lifetime.
// The parameter `max_size` specifies the maximum amount of actual payload
// that may be returned. If these constraints prevents the stream from
// sending some data, `absl::nullopt` should be returned.
virtual absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now,
size_t max_size) = 0;
// Returns the number of payload bytes that is scheduled to be sent in the
// next enqueued message, or zero if there are no enqueued messages or if
// the stream has been actively paused.
virtual size_t bytes_to_send_in_next_message() const = 0;
};
class Stream {
public:
StreamID stream_id() const { return stream_id_; }
StreamPriority priority() const { return priority_; }
void SetPriority(StreamPriority priority);
// Will activate the stream _if_ it has any data to send. That is, if the
// callback to `bytes_to_send_in_next_message` returns non-zero. If the
// callback returns zero, the stream will not be made active.
void MaybeMakeActive();
// Will remove the stream from the list of active streams, and will not try
// to produce data from it. To make it active again, call `MaybeMakeActive`.
void MakeInactive();
// Make the scheduler move to another message, or another stream. This is
// used to abort the scheduler from continuing producing fragments for the
// current message in case it's deleted.
void ForceReschedule() { parent_.ForceReschedule(); }
private:
friend class StreamScheduler;
Stream(StreamScheduler* parent,
StreamProducer* producer,
StreamID stream_id,
StreamPriority priority)
: parent_(*parent),
producer_(*producer),
stream_id_(stream_id),
priority_(priority),
inverse_weight_(priority) {}
// Produces a message from this stream. This will only be called on streams
// that have data.
absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now,
size_t max_size);
void MakeActive(size_t bytes_to_send_next);
void ForceMarkInactive();
VirtualTime current_time() const { return current_virtual_time_; }
VirtualTime next_finish_time() const { return next_finish_time_; }
size_t bytes_to_send_in_next_message() const {
return producer_.bytes_to_send_in_next_message();
}
VirtualTime CalculateFinishTime(size_t bytes_to_send_next) const;
StreamScheduler& parent_;
StreamProducer& producer_;
const StreamID stream_id_;
StreamPriority priority_;
InverseWeight inverse_weight_;
// This outgoing stream's "current" virtual_time.
VirtualTime current_virtual_time_ = VirtualTime::Zero();
VirtualTime next_finish_time_ = VirtualTime::Zero();
};
// The `mtu` parameter represents the maximum SCTP packet size, which should
// be the same as `DcSctpOptions::mtu`.
StreamScheduler(absl::string_view log_prefix, size_t mtu)
: log_prefix_(log_prefix),
max_payload_bytes_(mtu - SctpPacket::kHeaderSize -
IDataChunk::kHeaderSize) {}
std::unique_ptr<Stream> CreateStream(StreamProducer* producer,
StreamID stream_id,
StreamPriority priority) {
return absl::WrapUnique(new Stream(this, producer, stream_id, priority));
}
void EnableMessageInterleaving(bool enabled) {
enable_message_interleaving_ = enabled;
}
// Makes the scheduler stop producing message from the current stream and
// re-evaluates which stream to produce from.
void ForceReschedule() { currently_sending_a_message_ = false; }
// Produces a fragment of data to send. The current wall time is specified as
// `now` and will be used to skip chunks with expired limited lifetime. The
// parameter `max_size` specifies the maximum amount of actual payload that
// may be returned. If no data can be produced, `absl::nullopt` is returned.
absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now,
size_t max_size);
std::set<StreamID> ActiveStreamsForTesting() const;
private:
struct ActiveStreamComparator {
// Ordered by virtual finish time (primary), stream-id (secondary).
bool operator()(Stream* a, Stream* b) const {
VirtualTime a_vft = a->next_finish_time();
VirtualTime b_vft = b->next_finish_time();
if (a_vft == b_vft) {
return a->stream_id() < b->stream_id();
}
return a_vft < b_vft;
}
};
bool IsConsistent() const;
const absl::string_view log_prefix_;
const size_t max_payload_bytes_;
// The current virtual time, as defined in the WFQ algorithm.
VirtualTime virtual_time_ = VirtualTime::Zero();
// The current stream to send chunks from.
Stream* current_stream_ = nullptr;
bool enable_message_interleaving_ = false;
// Indicates if the streams is currently sending a message, and should then
// - if message interleaving is not enabled - continue sending from this
// stream until that message has been sent in full.
bool currently_sending_a_message_ = false;
// The currently active streams, ordered by virtual finish time.
webrtc::flat_set<Stream*, ActiveStreamComparator> active_streams_;
};
} // namespace dcsctp
#endif // NET_DCSCTP_TX_STREAM_SCHEDULER_H_