Skip to content

Commit

Permalink
simple processors are working.
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhar-kotekar committed Jun 16, 2024
1 parent 2671e30 commit 85c3e19
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 68 deletions.
37 changes: 37 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ rough idea about how to create and manage processors and connections between the
- remove rx from the vector of rx in receving processor
- if no other tx and rx using the channel then flush, close, delete the channel
- questions
- who will monitor list of channels and connctions?
- who will monitorg list of channels and connctions?

## References:
- NiFi docs : https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#intro
1 change: 1 addition & 0 deletions commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub fn enable_tracing() {
.with_line_number(true)
.with_target(false)
.with_span_events(FmtSpan::ENTER | FmtSpan::CLOSE)
.with_thread_ids(true)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
tracing::info!("Tracing enabled!");
Expand Down
3 changes: 2 additions & 1 deletion main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ commons = { path = "../commons" }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
tokio = { version = "1.38.0", features = ["full"] }
uuid = {version = "1.8.0", features = ["v4"]}
uuid = {version = "1.8.0", features = ["v4"]}
rand = "0.8.5"
95 changes: 29 additions & 66 deletions main/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,81 +1,44 @@
use std::collections::HashMap;
use processors::{
add_one_processor,
base_processor::{Packet, Processor},
random_number_generator::RandomNumberGeneratorProcessor,
};
use tokio::sync::mpsc;

use uuid::Uuid;
mod processors;

const PROCESSOR_DEFAULT_QUEUE_LENGTH: u16 = 50;
const PROCESSOR_DEFAULT_QUEUE_LENGTH: usize = 100;

#[tokio::main]
async fn main() {
commons::enable_tracing();
let (tx, rx) = mpsc::channel::<Packet>(PROCESSOR_DEFAULT_QUEUE_LENGTH);
let mut random_number_generator_processor =
RandomNumberGeneratorProcessor::new("R1".to_string());

let mut flow_graph = FlowGraph::new();
random_number_generator_processor
.tx
.insert("AddOne".to_string(), tx.clone());

let first_processor = flow_graph.create_processor("Processor A".to_string(), Some(100));
tracing::info!("{:?}", first_processor);
first_processor.process().await;
let add_one_processor = add_one_processor::AddOneProcessor::new("Add One".to_string());

let processor_b = flow_graph.create_processor("Processor B".to_string(), None);
tracing::info!("{:?}", processor_b);
tokio::spawn(async move {
random_number_generator_processor.process(None).await;
});

flow_graph.add_edge(&first_processor, &processor_b);
flow_graph.print_edges();
let mut r2 = RandomNumberGeneratorProcessor::new("R2".to_string());
r2.tx.insert("AddOne".to_string(), tx.clone());

first_processor.stop().await;
processor_b.stop().await;
}

#[derive(Debug)]
struct Processor {
name: String,
uuid: Uuid,
queue_length: u16,
}

impl Processor {
fn new(name: String, queue_length: u16) -> Self {
Processor {
name,
uuid: Uuid::new_v4(),
queue_length: queue_length,
}
}
tokio::spawn(async move {
r2.process(None).await;
});

async fn process(&self) {
tracing::info!(
"Processing with name: {} and uuid: {}",
self.name,
self.uuid
);
}

async fn stop(&self) {
tracing::info!("Stopping processor with name: {}", self.name);
}
}

struct FlowGraph {
edges: HashMap<String, String>,
}

impl FlowGraph {
fn new() -> Self {
FlowGraph {
edges: HashMap::new(),
}
}

fn create_processor(&self, name: String, queue_length: Option<u16>) -> Processor {
Processor::new(name, queue_length.unwrap_or(PROCESSOR_DEFAULT_QUEUE_LENGTH))
}

fn add_edge(&mut self, from: &Processor, to: &Processor) {
self.edges
.insert(from.uuid.to_string(), to.uuid.to_string());
}
tokio::spawn(async move {
add_one_processor.process(Some(rx)).await;
});
tracing::info!("task spawned for AddOneProcessor");

fn print_edges(&self) {
for (from, to) in self.edges.iter() {
tracing::info!("Edge from: {} to: {}", from, to);
}
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
50 changes: 50 additions & 0 deletions main/src/processors/add_one_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::collections::HashMap;

use tokio::sync::mpsc;
use uuid::Uuid;

use super::base_processor::{Packet, Processor, ProcessorStatus};

pub struct AddOneProcessor {
pub name: String,
pub uuid: Uuid,
pub tx: Vec<mpsc::Sender<Packet>>,
pub status: ProcessorStatus,
}

impl Processor for AddOneProcessor {
fn new(name: String) -> Self {
AddOneProcessor {
name,
uuid: Uuid::new_v4(),
tx: Vec::new(),
status: ProcessorStatus::Running,
}
}

async fn process(&self, receiver: Option<mpsc::Receiver<Packet>>) {
let mut rx = receiver.unwrap();
while let Some(packet) = rx.recv().await {
tracing::info!("Received: {:?}", packet);

let new_data = packet.data.map(|x| x + 1);
let packet = Packet::new(new_data, HashMap::new());

tracing::info!("Processed : {:?}", packet);

for tx in &self.tx {
tx.send(packet.clone()).await.unwrap();
}
}
}

async fn start(&mut self) {
tracing::info!("Starting {} processor", self.name);
self.status = ProcessorStatus::Running;
}

async fn stop(&mut self) {
tracing::info!("Stopping {} processor", self.name);
self.status = ProcessorStatus::Stopped;
}
}
43 changes: 43 additions & 0 deletions main/src/processors/base_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use tokio::sync::mpsc;
use uuid::Uuid;

use std::collections::HashMap;

const MAX_DATA_PER_PACKET_BYTES: usize = 10;

pub trait Processor {
fn new(name: String) -> Self;
async fn process(&self, receiver: Option<mpsc::Receiver<Packet>>);
async fn start(&mut self);
async fn stop(&mut self);
}

#[derive(Debug)]
pub struct Packet {
pub data: [u16; MAX_DATA_PER_PACKET_BYTES],
pub atributes: HashMap<String, String>,
pub uuid: Uuid,
}

impl Packet {
pub fn new(data: [u16; MAX_DATA_PER_PACKET_BYTES], atributes: HashMap<String, String>) -> Self {
Packet {
data,
atributes,
uuid: Uuid::new_v4(),
}
}
pub fn clone(&self) -> Self {
Packet {
data: self.data,
atributes: self.atributes.clone(),
uuid: self.uuid,
}
}
}

#[derive(Debug)]
pub enum ProcessorStatus {
Running,
Stopped,
}
3 changes: 3 additions & 0 deletions main/src/processors/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod add_one_processor;
pub mod base_processor;
pub mod random_number_generator;
51 changes: 51 additions & 0 deletions main/src/processors/random_number_generator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::collections::HashMap;

use tokio::sync::mpsc;
use uuid::Uuid;

use super::base_processor::{Packet, Processor, ProcessorStatus};

#[derive(Debug)]
pub struct RandomNumberGeneratorProcessor {
pub name: String,
pub uuid: Uuid,
pub tx: HashMap<String, mpsc::Sender<Packet>>,
pub status: ProcessorStatus,
pub sleep_time_milliseconds: u16,
}

impl Processor for RandomNumberGeneratorProcessor {
fn new(name: String) -> Self {
RandomNumberGeneratorProcessor {
name,
uuid: Uuid::new_v4(),
tx: HashMap::new(),
status: ProcessorStatus::Running,
sleep_time_milliseconds: 1000,
}
}

async fn process(&self, _receiver: Option<mpsc::Receiver<Packet>>) {
loop {
let random_number = rand::random::<u16>();
let packet = Packet::new([random_number; 10], Default::default());
tracing::info!("{} generated: {:?}", self.name, packet);

for (key, sender) in self.tx.iter() {
sender.send(packet.clone()).await.unwrap();
tracing::info!("Sent packet to {}", key);
}
tokio::task::yield_now().await;
}
}

async fn start(&mut self) {
tracing::info!("Starting {} processor", self.name);
self.status = ProcessorStatus::Running;
}

async fn stop(&mut self) {
tracing::info!("Stopping {} processor", self.name);
self.status = ProcessorStatus::Stopped;
}
}

0 comments on commit 85c3e19

Please sign in to comment.