Skip to content

Commit

Permalink
QUIC_TLS_SESSION and Perspective
Browse files Browse the repository at this point in the history
SaveStreamData
  • Loading branch information
huangyuanbing committed Dec 24, 2023
1 parent 64e9422 commit f2b1b65
Show file tree
Hide file tree
Showing 13 changed files with 43 additions and 167 deletions.
14 changes: 9 additions & 5 deletions quiche/quic/core/quic_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2274,10 +2274,6 @@ void QuicConnection::OnPacketComplete() {
return;
}

if (IsCurrentPacketConnectivityProbing()) {
QUICHE_DCHECK(!version().HasIetfQuicFrames());
++stats_.num_connectivity_probing_received;
}

QUIC_DVLOG(1) << ENDPOINT << "Got"
<< (SupportsMultiplePacketNumberSpaces()
Expand Down Expand Up @@ -2806,8 +2802,9 @@ void QuicConnection::ProcessUdpPacket(const QuicSocketAddress& self_address,
<< last_received_packet_info_.header.packet_number;
current_packet_data_ = nullptr;
is_current_packet_connectivity_probing_ = false;

#if QUIC_TLS_SESSION
MaybeProcessCoalescedPackets();
#endif
return;
}

Expand All @@ -2829,8 +2826,10 @@ void QuicConnection::ProcessUdpPacket(const QuicSocketAddress& self_address,
}
}

#if QUIC_TLS_SESSION
if (!received_coalesced_packets_.empty())
MaybeProcessCoalescedPackets();
#endif
if (!undecryptable_packets_.empty())
MaybeProcessUndecryptablePackets();

Expand Down Expand Up @@ -5593,6 +5592,11 @@ bool QuicConnection::UpdatePacketContent(QuicFrameType type) {
<< last_received_packet_info_.destination_address
<< ", default path self_address :" << default_path_.self_address;
}

if (IsCurrentPacketConnectivityProbing()) {
QUICHE_DCHECK(!version().HasIetfQuicFrames());
++stats_.num_connectivity_probing_received;
}
return connected_;
}

Expand Down
2 changes: 1 addition & 1 deletion quiche/quic/core/quic_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -2274,7 +2274,7 @@ class QUIC_EXPORT_PRIVATE QuicConnection final
bool should_proactively_validate_peer_address_on_path_challenge_ = false;

// Enable this via reloadable flag once this feature is complete.
#if !QUIC_TLS_SESSION
#if QUIC_TLS_SESSION == 0
constexpr static
#endif
bool connection_migration_use_new_cid_ = false;
Expand Down
2 changes: 1 addition & 1 deletion quiche/quic/core/quic_crypto_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void QuicCryptoStream::WriteCryptoData(EncryptionLevel level,
}

// Append |data| to the send buffer for this encryption level.
send_buffer->SaveStreamDatav(data);
send_buffer->SaveStreamData(data);
if (kMaxStreamLength - offset < data.length()) {
QUIC_BUG(quic_bug_10322_2) << "Writing too much crypto handshake data";
OnUnrecoverableError(QUIC_INTERNAL_ERROR,
Expand Down
2 changes: 1 addition & 1 deletion quiche/quic/core/quic_flow_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class QUIC_EXPORT_PRIVATE QuicFlowController
bool is_connection_flow_controller_;

// Tracks if this is owned by a server or a client.
Perspective perspective_;
const Perspective perspective_;

// Tracks number of bytes sent to the peer.
QuicByteCount bytes_sent_;
Expand Down
4 changes: 2 additions & 2 deletions quiche/quic/core/quic_idle_network_detector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,15 @@ void QuicIdleNetworkDetector::OnPacketSent(QuicTime now,
MaybeSetAlarmOnSentPacket(pto_delay);
return;
}
// if (!alarm_->IsSet()) //hybchanged
// if (!alarm_->IsSet()) //TODO2 hybchanged
// SetAlarm();
}

void QuicIdleNetworkDetector::OnPacketReceived(QuicTime now) {
QUICHE_DCHECK(time_of_last_received_packet_ <= now);
time_of_last_received_packet_ = now;
//MaybeSetAlarmOnSentPacket(kAlarmGranularity * 1000);
SetAlarm(); //TODO hybchanged
SetAlarm(); //TODO2 hybchanged
}

void QuicIdleNetworkDetector::SetAlarm() {
Expand Down
5 changes: 4 additions & 1 deletion quiche/quic/core/quic_ping_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ QuicPingManager::QuicPingManager(Perspective perspective, Delegate* delegate,
QuicConnectionArena* arena,
QuicAlarmFactory* alarm_factory,
QuicConnectionContext* context)
: perspective_(perspective),
:
#if QUIC_SERVER_SESSION == 1
perspective_(perspective),
#endif
delegate_(delegate),
alarm_(alarm_factory->CreateAlarm(
arena->New<AlarmDelegate>(this, context), arena)) {}
Expand Down
6 changes: 6 additions & 0 deletions quiche/quic/core/quic_ping_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,13 @@ class QUIC_EXPORT_PRIVATE QuicPingManager {
// |keep_alive_deadline_|. Returns 0 if both deadlines are not initialized.
QuicTime GetEarliestDeadline() const;

#if QUIC_SERVER_SESSION == 0
constexpr static Perspective perspective_ = Perspective::IS_CLIENT;
#elif QUIC_SERVER_SESSION == 2
constexpr static Perspective perspective_ = Perspective::IS_SERVER;
#else
const Perspective perspective_;
#endif

Delegate* delegate_; // Not owned.

Expand Down
16 changes: 9 additions & 7 deletions quiche/quic/core/quic_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,24 +243,26 @@ void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
return;
}

QuicStream* stream = GetOrCreateStream(stream_id);

QuicStream* stream = stream_map_.at(stream_id);//TODO2 hybchanged.
if (!stream) {
if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
connection_->CloseConnection(
QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
return;
connection_->CloseConnection(
QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
return;
}

stream = GetOrCreateStream(stream_id);

// The stream no longer exists, but we may still be interested in the
// final stream byte offset sent by the peer. A frame with a FIN can give
// us this offset.
if (frame.fin) {
QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
OnFinalByteOffsetReceived(stream_id, final_byte_offset);
}
return;
if (!stream)
return;
}
stream->OnStreamFrame(frame);
}
Expand Down
2 changes: 1 addition & 1 deletion quiche/quic/core/quic_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ class QUIC_EXPORT_PRIVATE QuicSession
using PendingStreamMap =
std::map<QuicStreamId, std::unique_ptr<PendingStream>>;

using ClosedStreams = absl::InlinedVector<QuicStream*, 1>;
using ClosedStreams = std::vector<QuicStream*>;

//using ZombieStreamMap =
// std::map<QuicStreamId, std::unique_ptr<QuicStream>>;
Expand Down
2 changes: 1 addition & 1 deletion quiche/quic/core/quic_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1258,7 +1258,7 @@ bool QuicStream::WriteStreamData(QuicStreamOffset offset,
QUICHE_DCHECK_LT(0u, data_length);
QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
<< offset << " length " << data_length;
return send_buffer_.WriteStreamDatav(offset, data_length, writer);
return send_buffer_.WriteStreamData(offset, data_length, writer);
}

void QuicStream::WriteBufferedData(EncryptionLevel level) {
Expand Down
145 changes: 7 additions & 138 deletions quiche/quic/core/quic_stream_send_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ bool StreamPendingRetransmission::operator==(
return offset == other.offset && length == other.length;
}

#define OPT_WBUFF 1
QuicStreamSendBuffer::QuicStreamSendBuffer(
quiche::QuicheBufferAllocator* allocator)
: current_end_offset_(0),
Expand All @@ -71,29 +70,8 @@ QuicStreamSendBuffer::~QuicStreamSendBuffer() {
blocks_.clear();
}

#if 0
void QuicStreamSendBuffer::SaveStreamData(absl::string_view data) {
QUICHE_DCHECK(!data.empty());
#if OPT_WBUFF
SaveStreamDatav(data);
#else
// Latch the maximum data slice size.
const QuicByteCount max_data_slice_size =
GetQuicFlag(quic_send_buffer_max_data_slice_size);
while (!data.empty()) {
auto slice_len = std::min<absl::string_view::size_type>(
data.length(), max_data_slice_size);
auto buffer =
quiche::QuicheBuffer::Copy(allocator_, data.substr(0, slice_len));
SaveMemSlice(quiche::QuicheMemSlice(std::move(buffer)));

data = data.substr(slice_len);
}
#endif
}
#endif

void QuicStreamSendBuffer::SaveStreamDatav(std::string_view data) {
void QuicStreamSendBuffer::SaveStreamData(std::string_view data) {
QUICHE_DCHECK(!data.empty());

// Latch the maximum data slice size.
Expand Down Expand Up @@ -124,26 +102,8 @@ void QuicStreamSendBuffer::SaveStreamDatav(std::string_view data) {
void QuicStreamSendBuffer::SaveMemSlice(quiche::QuicheMemSlice slice) {
QUIC_DVLOG(2) << "Save slice offset " << stream_offset_ << " length "
<< slice.length();
#if 0
if (slice.empty()) {
QUIC_BUG(quic_bug_10853_1) << "Try to save empty MemSlice to send buffer.";
return;
}
#endif

#if OPT_WBUFF
SaveStreamDatav(std::string_view(slice.data(), slice.length()));
#else
size_t length = slice.length();
// Need to start the offsets at the right interval.
if (interval_deque_.Empty()) {
const QuicStreamOffset end = stream_offset_ + length;
current_end_offset_ = std::max(current_end_offset_, end);
}
BufferedSlice bs = BufferedSlice(std::move(slice), stream_offset_);
interval_deque_.PushBack(std::move(bs));
stream_offset_ += length;
#endif

SaveStreamData(std::string_view(slice.data(), slice.length()));
}

QuicByteCount QuicStreamSendBuffer::SaveMemSliceSpan(
Expand All @@ -166,7 +126,7 @@ void QuicStreamSendBuffer::OnStreamDataConsumed(size_t bytes_consumed) {
stream_bytes_outstanding_ += bytes_consumed;
}

bool QuicStreamSendBuffer::WriteStreamDatav(QuicStreamOffset stream_offset,
bool QuicStreamSendBuffer::WriteStreamData(QuicStreamOffset stream_offset,
QuicByteCount data_length,
QuicDataWriter* writer) {
QUIC_BUG_IF(quic_bug_12823_1, current_end_offset_ < stream_offset)
Expand Down Expand Up @@ -205,41 +165,6 @@ bool QuicStreamSendBuffer::WriteStreamDatav(QuicStreamOffset stream_offset,
return false;
}

bool QuicStreamSendBuffer::WriteStreamData(QuicStreamOffset offset,
QuicByteCount data_length,
QuicDataWriter* writer) {
QUIC_BUG_IF(quic_bug_12823_1, current_end_offset_ < offset)
<< "Tried to write data out of sequence. last_offset_end:"
<< current_end_offset_ << ", offset:" << offset;
// The iterator returned from |interval_deque_| will automatically advance
// the internal write index for the QuicIntervalDeque. The incrementing is
// done in operator++.
QUICHE_DCHECK(data_length);
#if OPT_WBUFF
return WriteStreamDatav(offset, data_length, writer);
#else
for (auto slice_it = interval_deque_.DataAt(offset);
slice_it != interval_deque_.DataEnd(); ++slice_it) {

QuicByteCount slice_offset = offset - slice_it->offset;
QUICHE_DCHECK((int64_t)slice_offset >= 0);

QuicByteCount available_bytes_in_slice =
slice_it->slice.length() - slice_offset;
QuicByteCount copy_length = std::min(data_length, available_bytes_in_slice);
writer->WriteBytes(slice_it->slice.data() + slice_offset, copy_length);
offset += copy_length;
data_length -= copy_length;
const QuicStreamOffset new_end =
slice_it->offset + slice_it->slice.length();
current_end_offset_ = std::max(current_end_offset_, new_end);
if (data_length == 0)
return true;
}
return data_length == 0;
#endif
}

bool QuicStreamSendBuffer::OnStreamDataAcked(
QuicStreamOffset offset, QuicByteCount data_length,
QuicByteCount* newly_acked_length) {
Expand All @@ -260,7 +185,7 @@ bool QuicStreamSendBuffer::OnStreamDataAcked(
// QUICHE_DCHECK(pending_retransmissions_.Empty() || !pending_retransmissions_.SpanningInterval().Intersects(off));
if (!pending_retransmissions_.Empty())
pending_retransmissions_.Difference(off);
return FreeMemSlicesv(offset, ending_offset);
return FreeMemSlices(offset, ending_offset);
}
else if (offset > lmax) {
// Optimization for the typical case, hole happend.
Expand All @@ -283,7 +208,7 @@ bool QuicStreamSendBuffer::OnStreamDataAcked(
stream_bytes_outstanding_ -= data_length;
if (!pending_retransmissions_.Empty())
pending_retransmissions_.Difference(off);
return FreeMemSlicesv(offset, ending_offset);
return FreeMemSlices(offset, ending_offset);
}
// Exit if dupliacted
else if (bytes_acked_.Contains(off)) {
Expand Down Expand Up @@ -356,7 +281,7 @@ StreamPendingRetransmission QuicStreamSendBuffer::NextPendingRetransmission()
return {0, 0};
}

bool QuicStreamSendBuffer::FreeMemSlicesv(QuicStreamOffset start, QuicStreamOffset end) {
bool QuicStreamSendBuffer::FreeMemSlices(QuicStreamOffset start, QuicStreamOffset end) {
if (end < stream_bytes_start_ + kBlockSizeBytes)
return true;

Expand All @@ -376,67 +301,11 @@ bool QuicStreamSendBuffer::FreeMemSlicesv(QuicStreamOffset start, QuicStreamOffs
return true;
}

bool QuicStreamSendBuffer::FreeMemSlices(QuicStreamOffset start,
QuicStreamOffset end) {
#if OPT_WBUFF == 0
auto it = interval_deque_.DataBegin();
if (it == interval_deque_.DataEnd() || it->slice.empty()) {
QUIC_BUG(quic_bug_10853_4)
<< "Trying to ack stream data [" << start << ", " << end << "), "
<< (it == interval_deque_.DataEnd()
? "and there is no outstanding data."
: "and the first slice is empty.");
return false;
}
if (!it->interval().Contains(start)) {
// Slow path that not the earliest outstanding data gets acked.
it = std::lower_bound(interval_deque_.DataBegin(),
interval_deque_.DataEnd(), start, CompareOffset());
}
if (it == interval_deque_.DataEnd() || it->slice.empty()) {
QUIC_BUG(quic_bug_10853_5)
<< "Offset " << start << " with iterator offset: " << it->offset
<< (it == interval_deque_.DataEnd() ? " does not exist."
: " has already been acked.");
return false;
}
for (; it != interval_deque_.DataEnd(); ++it) {
if (it->offset >= end) {
break;
}
if (!it->slice.empty() &&
bytes_acked_.Contains(it->offset, it->offset + it->slice.length())) {
it->slice.Reset();
}
}
CleanUpBufferedSlices();
#endif
return true;
}

void QuicStreamSendBuffer::CleanUpBufferedSlices() {
#if OPT_WBUFF == 0
while (!interval_deque_.Empty() &&
interval_deque_.DataBegin()->slice.empty()) {
QUIC_BUG_IF(quic_bug_12823_2,
interval_deque_.DataBegin()->offset > current_end_offset_)
<< "Fail to pop front from interval_deque_. Front element contained "
"a slice whose data has not all be written. Front offset "
<< interval_deque_.DataBegin()->offset << " length "
<< interval_deque_.DataBegin()->slice.length();
interval_deque_.PopFront();
}
#endif
}

bool QuicStreamSendBuffer::IsStreamDataOutstanding(
QuicStreamOffset offset, QuicByteCount data_length) const {
QUICHE_DCHECK(data_length);
return //data_length > 0 &&
!bytes_acked_.Contains(offset, offset + data_length);
}

#if OPT_WBUFF == 0
size_t QuicStreamSendBuffer::size() const { return interval_deque_.Size(); }
#endif
} // namespace quic
Loading

0 comments on commit f2b1b65

Please sign in to comment.