Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Idle tracking in gate way #1

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open

Conversation

RuiyuZhu
Copy link
Owner

@RuiyuZhu RuiyuZhu commented Jul 13, 2023

No description provided.

Ruiyu Zhu added 6 commits July 13, 2023 10:48
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
@akoshelev
Copy link

This is getting in the right direction. I would also like to see if we can make this functionality enabled only in debug builds (behind debug_assertions cfg flag)

@@ -40,6 +40,8 @@ struct State {
write_ready: Option<Waker>,
/// Another entity to wake when the buffer is read from.
stream_ready: Option<Waker>,
/// record whether the sender is idle recently
idle: bool,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is possible to generalize this by leveraging Rust's trait system

trait Observable {
    fn is_changed(&self) -> bool;
}

impl <K: Hash + Eq, V> Observable for DashMap<K, Watcher<V>> {
    fn is_changed(&self) -> bool {
        self.iter().any(|v| v.is_touched())
    }
}

/// Tracks whenever someone accesses `T` by a shared reference. If it happens, sets `accessed` to
/// `true` until someone calls `is_touched`.
struct Watcher<T> {
    inner: T,
    touched: Arc<AtomicBool>,
}

impl <T> Watcher<T> {
    fn watch(inner: T) -> Self {
        Self {
            inner,
            touched: Arc::new(AtomicBool::new(true)),
        }
    }
    
    fn is_touched(&self) -> bool {
        self.touched.swap(false, Ordering::Relaxed)
    }
}

impl <T> Deref for Watcher<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.touched.fetch_or(true, Ordering::Relaxed);
        &self.inner
    }
}

impl <T: Default> Default for Watcher<T> {
    fn default() -> Self {
        Self::watch(T::default())
    }
}

impl <T: Clone> Clone for Watcher<T> {
    fn clone(&self) -> Self {
        Self {
            inner: T::clone(&self.inner),
            touched: self.touched.clone(),
        }
    }
}

then you just implement it for senders and receivers

impl Observable for Watcher<GatewaySenders> {
    fn is_changed(&self) -> bool {
        self.is_touched() || self.inner.inner.is_changed()
    }
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that Deref allows us to track when someone calls get_channel or send/receive on that channel, but not the future poll like you implemented. However I think that may be enough because send and receive operations are cheap and happen many times per report/step.

self.idle
}

fn reset_idle(&mut self) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is only one client for this API, so is_idle can just reset the flag

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

@@ -40,8 +40,9 @@ pub type ReceivingEnd<M> = ReceivingEndBase<TransportImpl, M>;
pub struct Gateway<T: Transport = TransportImpl> {
config: GatewayConfig,
transport: RoleResolvingTransport<T>,
senders: GatewaySenders,
receivers: GatewayReceivers<T>,
senders: Arc<Mutex<GatewaySenders>>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both GatewaySenders and GatewayReceivers are Send + Sync, so they don't need to be protected by a mutex. You could just slap #derive(Clone) on them

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. That's much more elegant.

let senders_clone = Arc::clone(&senders);
let receivers = Arc::new(Mutex::new( GatewayReceivers::default()));
let receivers_clone = Arc::clone(&receivers);
thread::spawn(move|| {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason for spawning a thread instead of a tokio task?

thread::spawn(move|| {
// Perform some periodic work in the background
loop {
thread::sleep(Duration::from_secs(1));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that it is for debug purposes, timeout could be higher to avoid false positives (5-10 secs)

senders.reset_idle();
receivers.reset_idle();
}
match rx.try_recv()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should just keep the handle returned by spawn method inside the gateway. In this case we don't need a sender to terminate it, we just call abort inside Drop

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good

@@ -69,13 +71,24 @@ impl GatewaySender {
// TODO: make OrderingSender::send fallible
// TODO: test channel close
let i = usize::from(record_id);
self.pending_records.lock().unwrap().insert(i);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is actually harder than that. Protocols may hang even before they call send and this approach does not seem to let us detect that.

If we look at send buffer, it is essentially a contiguous region in memory that is capable of storing N homogenious messages [_, _, _, _, _, _]. There is a pending window of records we want to send at each step.

For example, if buffer size is 10 messages x 4 bytes each, OrderingSender will have a buffer of size 40 and first will accept records 0..9. Once it receives them all, it flushes it down to transport and moves the window to recrods 10..19, etc.

Our job here is to figure out the current window (start, end) and figure out the gaps inside that buffer. Gaps are records that are never attempted to be sent (not inside State/waiting) and then print them

Ideal output would be something like this:

step: A, waiting to send: 14, 17
step B, waiting to send: 3, 7, 8
step C: waiting to receive: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9

Ruiyu Zhu added 6 commits July 14, 2023 10:37
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
@@ -116,6 +122,33 @@ impl<T: Transport> Gateway<T> {
.get_or_create(channel_id, || self.transport.receive(channel_id)),
)
}

fn create_idle_tracker(senders: Arc<GatewaySenders>, receivers: Arc<GatewayReceivers<T>>) -> Option<tokio::task::JoinHandle<()>> {
if !cfg!(debug_assertions) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#[cfg(debug_assertions)] is better as you don't need to handle it in runtime

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good

state.check_idle_and_reset()
}

pub fn get_status(&self) ->(usize /*next*/,usize/* current written */, usize /* total buffer size */) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a new type would explain it better imo

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

@@ -143,6 +144,9 @@ where
/// end-to-end back pressure for tasks that do not involve sending at all.
overflow_wakers: Vec<Waker>,
_marker: PhantomData<C>,

idle: bool,
messages_waiting_for_receive: Arc<DashSet<usize>>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you derive it from wakers? whenever someone calls receive and polls it, we either resolve it immediately, or register a waker.

note that some wakers will land in overflow_wakers initially but it shouldn't be a problem as they all eventually move to wakers. we can do something about overflow_wakers later

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -40,8 +41,9 @@ pub type ReceivingEnd<M> = ReceivingEndBase<TransportImpl, M>;
pub struct Gateway<T: Transport = TransportImpl> {
config: GatewayConfig,
transport: RoleResolvingTransport<T>,
senders: GatewaySenders,
receivers: GatewayReceivers<T>,
senders: Arc<GatewaySenders>,
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think this is needed. We need to ensure that the copies owned by the idle tracking task can reflect the updates in the original ones. Without Arc, Clone doesn't seem to accomplish that.

@@ -201,9 +206,12 @@ where

/// Poll for the next record. This should only be invoked when
/// the future for the next message is polled.
fn poll_next<M: Message>(&mut self, cx: &mut Context<'_>) -> Poll<Result<M, Error>> {
fn poll_next<M: Message>(&mut self, i:usize, cx: &mut Context<'_>) -> Poll<Result<M, Error>> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this feels wrong because it should know the index (wake_next())

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I agree. Removed.

@@ -40,8 +41,9 @@ pub type ReceivingEnd<M> = ReceivingEndBase<TransportImpl, M>;
pub struct Gateway<T: Transport = TransportImpl> {
config: GatewayConfig,
transport: RoleResolvingTransport<T>,
senders: GatewaySenders,
receivers: GatewayReceivers<T>,
senders: Arc<GatewaySenders>,
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think this is needed. We need to ensure that the copies owned by the idle tracking task can reflect the updates in the original ones. Without Arc, Clone doesn't seem to accomplish that.

Ruiyu Zhu added 5 commits July 18, 2023 09:57
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
// TODO: print something
let sender_message =senders.get_all_missing_records();
if !sender_message.is_empty() {
println!("{:?}: Idle: waiting to send messages: {:?}", thread::current().id(), sender_message);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thread id does not give us much, so we can omit it. tokio tasks are run on a thread pool
also, tracing::warn! is better than println!

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it

@@ -36,18 +36,26 @@ pub(super) struct GatewaySender {
channel_id: ChannelId,
ordering_tx: OrderingSender,
total_records: TotalRecords,

message_size: usize,
pending_records: Mutex<HashSet<usize>>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you seem to track indices inside the ordering sender, do you still need this?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

let _ = tokio::time::sleep(Duration::from_secs(5)).await;
if senders.check_idle_and_reset() && receivers.check_idle_and_reset() {
// TODO: print something
let sender_message =senders.get_all_missing_records();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets delegate the responsibility of making the string here. i.e. the API should probably look like this

   let missing_records = senders.get_pending(); // fn get_pending(&self) -> impl Iterator<item = usize>
   let str = missing_records.map(...)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure


impl<T: Transport> Drop for Gateway<T> {
fn drop(&mut self) {
#[cfg(debug_assertions)]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moving it to line 151 will make it cleaner imo

@@ -116,6 +126,38 @@ impl<T: Transport> Gateway<T> {
.get_or_create(channel_id, || self.transport.receive(channel_id)),
)
}

fn create_idle_tracker(senders: Arc<GatewaySenders>, receivers: Arc<GatewayReceivers<T>>) -> Option<tokio::task::JoinHandle<()>> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should return JoinHandle and wrap it in Option in constructor

@@ -59,6 +61,13 @@ impl<T: Transport> Gateway<T> {
roles: RoleAssignment,
transport: T,
) -> Self {
let senders = Arc::new(GatewaySenders::default());
let receivers = Arc::new(GatewayReceivers::default());
let mut handle :Option<tokio::task::JoinHandle<()>> = None;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let handle = if cfg!(debug_assertions) { Some(Self::create(...)) } else { None }

can we consider working with GatewaySenders rather than Arc<GatewaySenders> if debug assertions are disabled?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Moved Arc to debug only

@@ -38,3 +38,49 @@ mod waiting {
}
}
}

pub trait GatherWaitingMessage {
fn compress_numbers(&self, numbers: &[usize]) -> String {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would something like this work here?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it works.

Ruiyu Zhu added 4 commits July 19, 2023 12:23
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
@@ -87,6 +87,7 @@ tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
typenum = "1.16"
# hpke is pinned to it
x25519-dalek = "2.0.0-pre.0"
itertools = "0.11.0"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what functionality is required from this crate? we typically avoid pulling such heavy crates into our deps

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's for the "group_by" function (to fold the indexes).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer if we don't pull it if possible

@@ -97,7 +100,7 @@ impl State {
if self.written > 0 && (self.written + self.spare.get() >= self.buf.len() || self.closed) {
let v = self.buf[..self.written].to_vec();
self.written = 0;

self.idle = false;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just noticed that it is not 100% accurate. if buffers are very large, take won't happen for quite some time while there will be many write calls indicating that buffer is not idle.

Having said that, have you explored the avenue of letting code outside observe the buffer state and decide whether it is idle or no? It feels a much more scalable approach that does not require "observable" state to be plugged in buffer's code. It can be done with Deref polymorphism (wrapper) or by exploring buffer's internal state and comparing it with the latest snapshot. For send buffer it could be as simple as counting messages written to it and keeping this number as the state.

@@ -197,6 +214,23 @@ impl Waiting {
fn wake(&self, i: usize) {
self.shard(i).wake(i);
}

fn get_waiting_messages(&self) -> Vec<usize> {
let mut waiting_messages = self.shards[0].lock().unwrap().get_waiting_messages();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand how it works now. This returns messages waiting to be sent, so from the protocol's standpoint everything is done correctly - they sent the message. What we are interested at is messages that infra hasn't received from protocols yet (gaps)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to figure out what are missing - I get that by subtracting the "not missing messages" from the set of all messages.

if senders.check_idle_and_reset() && receivers.check_idle_and_reset() {
let sender_missing_records =senders.get_all_missing_records();
let sender_message : HashMap<&ChannelId, String> = sender_missing_records.iter().map(|entry|{
let (SenderStatus {next, current_written , buf_size, message_size},missing_records) = entry.1;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where you use sender status struct fields, is it intentional or I am missing something?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are used in the next few lines. E.g. let chunk_head = next - current_written / message_size; uses next, etc

}
}

(entry.0, response)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't you print it right away instead of collecting?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I want to print some hint first (e.g. ""Idle: waiting to send messages:\n") before printing the actual messages to increase readability. Printing right away will make it not so straight-forward to decide whether to print the hint.

}
}
if chunk_response.len() > 0 {
response = response + &chunk_response_head + &Self::compress_numbers(&chunk_response) + " if not closed early.\n";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is a bit hard to read - would it make sense to do this

  • ask sender buffer to give us all records ids that are missing
  • convert Vec<usize> to Vec<MissingRange<usize>> using the contiguous range algo
  • spit out Vec<MissingRange<usize>> to logging system using the debug formatter `{:?}'

and the missing range is just a wrapper around standard Range struct to allow custom formatting

struct MissingRange(Range<usize>);

impl Debug for MissingRange {
   .. // format start..end as [start..end] 
      // if start == end format as [start]
}

the more stuff we pull out of this function, the easier it will be to understand what is going on here

type ReceiverType<T> = Arc<GatewayReceivers<T>>;

#[cfg(not(debug_assertions))]
type SenderType = Arc<GatewaySenders>;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is exactly the same as with debug assertions

Ruiyu Zhu added 6 commits July 20, 2023 14:27
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Copy link

@akoshelev akoshelev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are very close to publish this. Lets make sure we document this code, format it run ./pre-commit hook to see errors.

Encapsulation topic (see below) would be great to address

impl<S, C> IdleTrackOperatingState<S, C> where
S: Stream<Item = C> + Send,
C: AsRef<[u8]>, {
fn new(stream: Pin<Box<S>>, next: usize,spare: Spare,wakers: Vec<Option<Waker>>,overflow_wakers: Vec<Waker>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just implement From<S> for IdleTrackOperatingState so you don't need to pass stuff through the constructor

@@ -14,8 +14,9 @@ pub struct ReceivingEnd<T: Transport, M: Message> {
}

/// Receiving channels, indexed by (role, step).
#[derive(Clone)]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we wrap it into Arc, so no more need to clone I think

pub(super) struct GatewayReceivers<T: Transport> {
inner: DashMap<ChannelId, UR<T>>,
pub inner: DashMap<ChannelId, UR<T>>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you probably don't need it as well

self.state.lock().unwrap().check_idle_and_reset()
}

pub fn get_missing_messages(&self) -> Vec<LoggingRanges> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that you have a wrapper now, we could move all this logging helper methods to the wrapper itself. that will provide nice encapsulation as we would not need to change send/receive buffer code at all

Ruiyu Zhu added 6 commits July 21, 2023 10:28
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Copy link

@akoshelev akoshelev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are some things I think we need to address, but it should be good enough as first draft. We can get more feedback from other folks if we publish this to main repo

) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
// Perform some periodic work in the background
#[cfg(debug_assertions)]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[cfg(debug_assertions)]

Ruiyu Zhu added 3 commits July 24, 2023 21:52
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants