-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Idle tracking in gate way #1
base: main
Are you sure you want to change the base?
Conversation
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
a30e7c7
to
6765812
Compare
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
6765812
to
ed22220
Compare
This is getting in the right direction. I would also like to see if we can make this functionality enabled only in debug builds (behind debug_assertions cfg flag) |
@@ -40,6 +40,8 @@ struct State { | |||
write_ready: Option<Waker>, | |||
/// Another entity to wake when the buffer is read from. | |||
stream_ready: Option<Waker>, | |||
/// record whether the sender is idle recently | |||
idle: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is possible to generalize this by leveraging Rust's trait system
trait Observable {
fn is_changed(&self) -> bool;
}
impl <K: Hash + Eq, V> Observable for DashMap<K, Watcher<V>> {
fn is_changed(&self) -> bool {
self.iter().any(|v| v.is_touched())
}
}
/// Tracks whenever someone accesses `T` by a shared reference. If it happens, sets `accessed` to
/// `true` until someone calls `is_touched`.
struct Watcher<T> {
inner: T,
touched: Arc<AtomicBool>,
}
impl <T> Watcher<T> {
fn watch(inner: T) -> Self {
Self {
inner,
touched: Arc::new(AtomicBool::new(true)),
}
}
fn is_touched(&self) -> bool {
self.touched.swap(false, Ordering::Relaxed)
}
}
impl <T> Deref for Watcher<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.touched.fetch_or(true, Ordering::Relaxed);
&self.inner
}
}
impl <T: Default> Default for Watcher<T> {
fn default() -> Self {
Self::watch(T::default())
}
}
impl <T: Clone> Clone for Watcher<T> {
fn clone(&self) -> Self {
Self {
inner: T::clone(&self.inner),
touched: self.touched.clone(),
}
}
}
then you just implement it for senders and receivers
impl Observable for Watcher<GatewaySenders> {
fn is_changed(&self) -> bool {
self.is_touched() || self.inner.inner.is_changed()
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that Deref
allows us to track when someone calls get_channel
or send/receive
on that channel, but not the future poll like you implemented. However I think that may be enough because send and receive operations are cheap and happen many times per report/step.
self.idle | ||
} | ||
|
||
fn reset_idle(&mut self) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is only one client for this API, so is_idle
can just reset the flag
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point!
src/helpers/gateway/mod.rs
Outdated
@@ -40,8 +40,9 @@ pub type ReceivingEnd<M> = ReceivingEndBase<TransportImpl, M>; | |||
pub struct Gateway<T: Transport = TransportImpl> { | |||
config: GatewayConfig, | |||
transport: RoleResolvingTransport<T>, | |||
senders: GatewaySenders, | |||
receivers: GatewayReceivers<T>, | |||
senders: Arc<Mutex<GatewaySenders>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
both GatewaySenders
and GatewayReceivers
are Send + Sync, so they don't need to be protected by a mutex. You could just slap #derive(Clone)
on them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. That's much more elegant.
src/helpers/gateway/mod.rs
Outdated
let senders_clone = Arc::clone(&senders); | ||
let receivers = Arc::new(Mutex::new( GatewayReceivers::default())); | ||
let receivers_clone = Arc::clone(&receivers); | ||
thread::spawn(move|| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason for spawning a thread instead of a tokio task?
src/helpers/gateway/mod.rs
Outdated
thread::spawn(move|| { | ||
// Perform some periodic work in the background | ||
loop { | ||
thread::sleep(Duration::from_secs(1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given that it is for debug purposes, timeout could be higher to avoid false positives (5-10 secs)
src/helpers/gateway/mod.rs
Outdated
senders.reset_idle(); | ||
receivers.reset_idle(); | ||
} | ||
match rx.try_recv() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should just keep the handle returned by spawn method inside the gateway. In this case we don't need a sender to terminate it, we just call abort inside Drop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good
src/helpers/gateway/send.rs
Outdated
@@ -69,13 +71,24 @@ impl GatewaySender { | |||
// TODO: make OrderingSender::send fallible | |||
// TODO: test channel close | |||
let i = usize::from(record_id); | |||
self.pending_records.lock().unwrap().insert(i); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is actually harder than that. Protocols may hang even before they call send and this approach does not seem to let us detect that.
If we look at send buffer, it is essentially a contiguous region in memory that is capable of storing N
homogenious messages [_, _, _, _, _, _]. There is a pending window of records we want to send at each step.
For example, if buffer size is 10 messages x 4 bytes each, OrderingSender will have a buffer of size 40 and first will accept records 0..9. Once it receives them all, it flushes it down to transport and moves the window to recrods 10..19, etc.
Our job here is to figure out the current window (start, end) and figure out the gaps inside that buffer. Gaps are records that are never attempted to be sent (not inside State
/waiting
) and then print them
Ideal output would be something like this:
step: A, waiting to send: 14, 17
step B, waiting to send: 3, 7, 8
step C: waiting to receive: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
src/helpers/gateway/mod.rs
Outdated
@@ -116,6 +122,33 @@ impl<T: Transport> Gateway<T> { | |||
.get_or_create(channel_id, || self.transport.receive(channel_id)), | |||
) | |||
} | |||
|
|||
fn create_idle_tracker(senders: Arc<GatewaySenders>, receivers: Arc<GatewayReceivers<T>>) -> Option<tokio::task::JoinHandle<()>> { | |||
if !cfg!(debug_assertions) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#[cfg(debug_assertions)]
is better as you don't need to handle it in runtime
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good
state.check_idle_and_reset() | ||
} | ||
|
||
pub fn get_status(&self) ->(usize /*next*/,usize/* current written */, usize /* total buffer size */) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a new type would explain it better imo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
@@ -143,6 +144,9 @@ where | |||
/// end-to-end back pressure for tasks that do not involve sending at all. | |||
overflow_wakers: Vec<Waker>, | |||
_marker: PhantomData<C>, | |||
|
|||
idle: bool, | |||
messages_waiting_for_receive: Arc<DashSet<usize>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you derive it from wakers
? whenever someone calls receive
and polls it, we either resolve it immediately, or register a waker.
note that some wakers will land in overflow_wakers
initially but it shouldn't be a problem as they all eventually move to wakers
. we can do something about overflow_wakers
later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/helpers/gateway/mod.rs
Outdated
@@ -40,8 +41,9 @@ pub type ReceivingEnd<M> = ReceivingEndBase<TransportImpl, M>; | |||
pub struct Gateway<T: Transport = TransportImpl> { | |||
config: GatewayConfig, | |||
transport: RoleResolvingTransport<T>, | |||
senders: GatewaySenders, | |||
receivers: GatewayReceivers<T>, | |||
senders: Arc<GatewaySenders>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I think this is needed. We need to ensure that the copies owned by the idle tracking task can reflect the updates in the original ones. Without Arc
, Clone
doesn't seem to accomplish that.
@@ -201,9 +206,12 @@ where | |||
|
|||
/// Poll for the next record. This should only be invoked when | |||
/// the future for the next message is polled. | |||
fn poll_next<M: Message>(&mut self, cx: &mut Context<'_>) -> Poll<Result<M, Error>> { | |||
fn poll_next<M: Message>(&mut self, i:usize, cx: &mut Context<'_>) -> Poll<Result<M, Error>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this feels wrong because it should know the index (wake_next())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah I agree. Removed.
src/helpers/gateway/mod.rs
Outdated
@@ -40,8 +41,9 @@ pub type ReceivingEnd<M> = ReceivingEndBase<TransportImpl, M>; | |||
pub struct Gateway<T: Transport = TransportImpl> { | |||
config: GatewayConfig, | |||
transport: RoleResolvingTransport<T>, | |||
senders: GatewaySenders, | |||
receivers: GatewayReceivers<T>, | |||
senders: Arc<GatewaySenders>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I think this is needed. We need to ensure that the copies owned by the idle tracking task can reflect the updates in the original ones. Without Arc
, Clone
doesn't seem to accomplish that.
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
src/helpers/gateway/mod.rs
Outdated
// TODO: print something | ||
let sender_message =senders.get_all_missing_records(); | ||
if !sender_message.is_empty() { | ||
println!("{:?}: Idle: waiting to send messages: {:?}", thread::current().id(), sender_message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thread id does not give us much, so we can omit it. tokio tasks are run on a thread pool
also, tracing::warn!
is better than println!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it
src/helpers/gateway/send.rs
Outdated
@@ -36,18 +36,26 @@ pub(super) struct GatewaySender { | |||
channel_id: ChannelId, | |||
ordering_tx: OrderingSender, | |||
total_records: TotalRecords, | |||
|
|||
message_size: usize, | |||
pending_records: Mutex<HashSet<usize>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you seem to track indices inside the ordering sender, do you still need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
src/helpers/gateway/mod.rs
Outdated
let _ = tokio::time::sleep(Duration::from_secs(5)).await; | ||
if senders.check_idle_and_reset() && receivers.check_idle_and_reset() { | ||
// TODO: print something | ||
let sender_message =senders.get_all_missing_records(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets delegate the responsibility of making the string here. i.e. the API should probably look like this
let missing_records = senders.get_pending(); // fn get_pending(&self) -> impl Iterator<item = usize>
let str = missing_records.map(...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
src/helpers/gateway/mod.rs
Outdated
|
||
impl<T: Transport> Drop for Gateway<T> { | ||
fn drop(&mut self) { | ||
#[cfg(debug_assertions)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moving it to line 151 will make it cleaner imo
src/helpers/gateway/mod.rs
Outdated
@@ -116,6 +126,38 @@ impl<T: Transport> Gateway<T> { | |||
.get_or_create(channel_id, || self.transport.receive(channel_id)), | |||
) | |||
} | |||
|
|||
fn create_idle_tracker(senders: Arc<GatewaySenders>, receivers: Arc<GatewayReceivers<T>>) -> Option<tokio::task::JoinHandle<()>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should return JoinHandle
and wrap it in Option in constructor
src/helpers/gateway/mod.rs
Outdated
@@ -59,6 +61,13 @@ impl<T: Transport> Gateway<T> { | |||
roles: RoleAssignment, | |||
transport: T, | |||
) -> Self { | |||
let senders = Arc::new(GatewaySenders::default()); | |||
let receivers = Arc::new(GatewayReceivers::default()); | |||
let mut handle :Option<tokio::task::JoinHandle<()>> = None; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let handle = if cfg!(debug_assertions) { Some(Self::create(...)) } else { None }
can we consider working with GatewaySenders
rather than Arc<GatewaySenders>
if debug assertions are disabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Moved Arc to debug only
src/helpers/buffers/mod.rs
Outdated
@@ -38,3 +38,49 @@ mod waiting { | |||
} | |||
} | |||
} | |||
|
|||
pub trait GatherWaitingMessage { | |||
fn compress_numbers(&self, numbers: &[usize]) -> String { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would something like this work here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah it works.
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
@@ -87,6 +87,7 @@ tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } | |||
typenum = "1.16" | |||
# hpke is pinned to it | |||
x25519-dalek = "2.0.0-pre.0" | |||
itertools = "0.11.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what functionality is required from this crate? we typically avoid pulling such heavy crates into our deps
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's for the "group_by" function (to fold the indexes).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer if we don't pull it if possible
@@ -97,7 +100,7 @@ impl State { | |||
if self.written > 0 && (self.written + self.spare.get() >= self.buf.len() || self.closed) { | |||
let v = self.buf[..self.written].to_vec(); | |||
self.written = 0; | |||
|
|||
self.idle = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just noticed that it is not 100% accurate. if buffers are very large, take
won't happen for quite some time while there will be many write
calls indicating that buffer is not idle.
Having said that, have you explored the avenue of letting code outside observe the buffer state and decide whether it is idle or no? It feels a much more scalable approach that does not require "observable" state to be plugged in buffer's code. It can be done with Deref polymorphism (wrapper) or by exploring buffer's internal state and comparing it with the latest snapshot. For send buffer it could be as simple as counting messages written to it and keeping this number as the state.
@@ -197,6 +214,23 @@ impl Waiting { | |||
fn wake(&self, i: usize) { | |||
self.shard(i).wake(i); | |||
} | |||
|
|||
fn get_waiting_messages(&self) -> Vec<usize> { | |||
let mut waiting_messages = self.shards[0].lock().unwrap().get_waiting_messages(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure I understand how it works now. This returns messages waiting to be sent, so from the protocol's standpoint everything is done correctly - they sent the message. What we are interested at is messages that infra hasn't received from protocols yet (gaps)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want to figure out what are missing - I get that by subtracting the "not missing messages" from the set of all messages.
src/helpers/gateway/mod.rs
Outdated
if senders.check_idle_and_reset() && receivers.check_idle_and_reset() { | ||
let sender_missing_records =senders.get_all_missing_records(); | ||
let sender_message : HashMap<&ChannelId, String> = sender_missing_records.iter().map(|entry|{ | ||
let (SenderStatus {next, current_written , buf_size, message_size},missing_records) = entry.1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see where you use sender status struct fields, is it intentional or I am missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are used in the next few lines. E.g. let chunk_head = next - current_written / message_size;
uses next
, etc
src/helpers/gateway/mod.rs
Outdated
} | ||
} | ||
|
||
(entry.0, response) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why can't you print it right away instead of collecting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I want to print some hint first (e.g. ""Idle: waiting to send messages:\n") before printing the actual messages to increase readability. Printing right away will make it not so straight-forward to decide whether to print the hint.
src/helpers/gateway/mod.rs
Outdated
} | ||
} | ||
if chunk_response.len() > 0 { | ||
response = response + &chunk_response_head + &Self::compress_numbers(&chunk_response) + " if not closed early.\n"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is a bit hard to read - would it make sense to do this
- ask sender buffer to give us all records ids that are missing
- convert
Vec<usize>
toVec<MissingRange<usize>>
using the contiguous range algo - spit out
Vec<MissingRange<usize>>
to logging system using the debug formatter `{:?}'
and the missing range is just a wrapper around standard Range
struct to allow custom formatting
struct MissingRange(Range<usize>);
impl Debug for MissingRange {
.. // format start..end as [start..end]
// if start == end format as [start]
}
the more stuff we pull out of this function, the easier it will be to understand what is going on here
src/helpers/gateway/mod.rs
Outdated
type ReceiverType<T> = Arc<GatewayReceivers<T>>; | ||
|
||
#[cfg(not(debug_assertions))] | ||
type SenderType = Arc<GatewaySenders>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is exactly the same as with debug assertions
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we are very close to publish this. Lets make sure we document this code, format it run ./pre-commit
hook to see errors.
Encapsulation topic (see below) would be great to address
impl<S, C> IdleTrackOperatingState<S, C> where | ||
S: Stream<Item = C> + Send, | ||
C: AsRef<[u8]>, { | ||
fn new(stream: Pin<Box<S>>, next: usize,spare: Spare,wakers: Vec<Option<Waker>>,overflow_wakers: Vec<Waker>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just implement From<S> for IdleTrackOperatingState
so you don't need to pass stuff through the constructor
src/helpers/gateway/receive.rs
Outdated
@@ -14,8 +14,9 @@ pub struct ReceivingEnd<T: Transport, M: Message> { | |||
} | |||
|
|||
/// Receiving channels, indexed by (role, step). | |||
#[derive(Clone)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we wrap it into Arc
, so no more need to clone I think
src/helpers/gateway/receive.rs
Outdated
pub(super) struct GatewayReceivers<T: Transport> { | ||
inner: DashMap<ChannelId, UR<T>>, | ||
pub inner: DashMap<ChannelId, UR<T>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you probably don't need it as well
self.state.lock().unwrap().check_idle_and_reset() | ||
} | ||
|
||
pub fn get_missing_messages(&self) -> Vec<LoggingRanges> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given that you have a wrapper now, we could move all this logging helper methods to the wrapper itself. that will provide nice encapsulation as we would not need to change send/receive buffer code at all
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
Summary: Test Plan: Reviewers: Subscribers: Tasks: Tags:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are some things I think we need to address, but it should be good enough as first draft. We can get more feedback from other folks if we publish this to main repo
src/helpers/gateway/mod.rs
Outdated
) -> tokio::task::JoinHandle<()> { | ||
tokio::task::spawn(async move { | ||
// Perform some periodic work in the background | ||
#[cfg(debug_assertions)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#[cfg(debug_assertions)] |
No description provided.