-
Notifications
You must be signed in to change notification settings - Fork 222
/
retransmit_stage.rs
118 lines (107 loc) · 3.8 KB
/
retransmit_stage.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use counter::Counter;
use blockthread::BlockThread;
use entry::Entry;
use log::Level;
use result::{Error, Result};
use service::Service;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::BlobReceiver;
use window::SharedWindow;
use window_service::{window_service, WindowServiceReturnType};
/// Outcome reported by `RetransmitStage` once its threads have been joined.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RetransmitStageReturnType {
    /// Mirrors `WindowServiceReturnType::LeaderRotation`; the payload is the
    /// entry height forwarded unchanged from the window service.
    LeaderRotation(u64),
}
/// Forwards one batch of blobs received on `r`.
///
/// Waits up to one second for the first batch, then drains whatever else is
/// already queued on the channel without blocking, and finally hands every
/// blob to `BlockThread::retransmit` together with `sock`.
///
/// # Errors
/// Propagates the `recv_timeout` failure (timeout or disconnected channel) and
/// the first error returned by `BlockThread::retransmit`. Blobs that follow a
/// failing one in the batch are not attempted.
fn retransmit(blockthread: &Arc<RwLock<BlockThread>>, r: &BlobReceiver, sock: &UdpSocket) -> Result<()> {
    let mut batch = r.recv_timeout(Duration::from_secs(1))?;

    // Fold in everything that is already waiting so it goes out in this pass.
    while let Ok(mut queued) = r.try_recv() {
        batch.append(&mut queued);
    }

    for blob in &mut batch {
        BlockThread::retransmit(blockthread, blob, sock)?;
    }
    Ok(())
}
/// Service to retransmit messages from the leader to layer 1 nodes.
/// See `blockthread` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1.
/// * `exit` - Boolean to signal system exit.
/// * `blockthread` - This structure needs to be updated and populated by the transaction_processor and via gossip.
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
fn retransmitter(sock: Arc<UdpSocket>, blockthread: Arc<RwLock<BlockThread>>, r: BlobReceiver) -> JoinHandle<()> {
Builder::new()
.name("hypercube-retransmitter".to_string())
.spawn(move || {
trace!("retransmitter started");
loop {
if let Err(e) = retransmit(&blockthread, &r, &sock) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter_info!("streamer-retransmit-error", 1, 1);
}
}
}
}
trace!("exiting retransmitter");
}).unwrap()
}
/// Handles to the two threads that make up the retransmit stage.
///
/// Built by `RetransmitStage::new`; both threads are joined through the
/// `Service` implementation.
pub struct RetransmitStage {
/// Handle of the thread started by `retransmitter`.
t_retransmit: JoinHandle<()>,
/// Handle of the thread started by `window_service`; joining it yields an
/// optional `WindowServiceReturnType`.
t_window: JoinHandle<Option<WindowServiceReturnType>>,
}
impl RetransmitStage {
    /// Wires up the retransmit stage and starts its two threads.
    ///
    /// Two channels are created: one carries blobs from the window service to
    /// the retransmitter thread, the other carries entries out to the caller.
    /// The retransmitter thread is started first, then the window service.
    ///
    /// # Arguments
    /// * `blockthread` - Shared `BlockThread` state; a clone of the `Arc` goes to each thread.
    /// * `window` - Shared window passed through to `window_service`.
    /// * `entry_height` - Passed through to `window_service`.
    /// * `retransmit_socket` - Socket given to the retransmitter thread.
    /// * `repair_socket` - Socket given to `window_service`.
    /// * `fetch_stage_receiver` - Blob channel that `window_service` reads from.
    ///
    /// # Returns
    /// The stage (owning both join handles) and the receiving end of the
    /// entry channel.
    pub fn new(
        blockthread: &Arc<RwLock<BlockThread>>,
        window: SharedWindow,
        entry_height: u64,
        retransmit_socket: Arc<UdpSocket>,
        repair_socket: Arc<UdpSocket>,
        fetch_stage_receiver: BlobReceiver,
    ) -> (Self, Receiver<Vec<Entry>>) {
        let (retransmit_sender, retransmit_receiver) = channel();
        let (entry_sender, entry_receiver) = channel();

        let t_retransmit = retransmitter(
            retransmit_socket,
            Arc::clone(blockthread),
            retransmit_receiver,
        );

        // This flag starts out false and is only handed to the window service;
        // nothing in this function touches it again.
        let window_done = Arc::new(AtomicBool::new(false));
        let t_window = window_service(
            Arc::clone(blockthread),
            window,
            entry_height,
            // NOTE(review): meaning of this literal is defined by
            // `window_service`'s fourth parameter, which is not visible here.
            0,
            fetch_stage_receiver,
            entry_sender,
            retransmit_sender,
            repair_socket,
            window_done,
        );

        let stage = Self {
            t_retransmit,
            t_window,
        };
        (stage, entry_receiver)
    }
}
impl Service for RetransmitStage {
    type JoinReturnType = Option<RetransmitStageReturnType>;

    /// Joins the retransmitter thread, then the window thread.
    ///
    /// Returns `Some(RetransmitStageReturnType::LeaderRotation(height))` when
    /// the window thread finished with `WindowServiceReturnType::LeaderRotation`,
    /// and `None` for every other outcome. If the retransmitter thread
    /// panicked, its error is returned right away and the window thread is
    /// not joined.
    fn join(self) -> thread::Result<Option<RetransmitStageReturnType>> {
        self.t_retransmit.join()?;

        let window_outcome = self.t_window.join()?;
        let rotation =
            if let Some(WindowServiceReturnType::LeaderRotation(entry_height)) = window_outcome {
                Some(RetransmitStageReturnType::LeaderRotation(entry_height))
            } else {
                None
            };
        Ok(rotation)
    }
}