Skip to content

Commit

Permalink
#
Browse files Browse the repository at this point in the history
  • Loading branch information
yutiansut committed Dec 5, 2019
1 parent 94416b9 commit 6e979ca
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 42 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ Cargo.lock

/target
#**/*.rs.bk
.idea/
2 changes: 0 additions & 2 deletions .idea/.gitignore

This file was deleted.

6 changes: 0 additions & 6 deletions .idea/misc.xml

This file was deleted.

8 changes: 0 additions & 8 deletions .idea/modules.xml

This file was deleted.

14 changes: 0 additions & 14 deletions .idea/qatrader-rs.iml

This file was deleted.

6 changes: 0 additions & 6 deletions .idea/vcs.xml

This file was deleted.

28 changes: 23 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
pub mod qamongo;

pub mod qawebsocket;
pub mod qaeventmq;
extern crate ndarray;

extern crate chrono;
use chrono::prelude::*;
use ndarray::array;

use std::time::Duration;
use std::thread;
use qaeventmq::Subscriber;
use crate::qaeventmq::Publisher;

Expand All @@ -19,16 +21,32 @@ fn main() {
model: "direct".to_string(),
routing_key: "rb2001".to_string()
};
puber.publish_routing("s".to_string());
let mut i = 1;
thread::spawn(move|| {
while i<1000 {
puber.publish_routing("s".to_string());
i+=1;
thread::sleep(Duration::from_secs(5));
}


});

let mut client = qaeventmq::QAEventMQ{
amqp: "amqp:https://admin:[email protected]:5672/".to_string(),
exchange: "tick".to_string(),
model: "direct".to_string(),
routing_key: "rb2001".to_string()
};
client.subscribe_routing();
// qawebsockets::websocketclient::wsmain(
// "ws:https://101.132.37.31:7988".to_string());

thread::spawn(move || {
client.subscribe_routing();

});


qawebsocket::wsmain(
"ws:https://101.132.37.31:7988".to_string());

test_ndarray();
test_datetime();
Expand Down
53 changes: 52 additions & 1 deletion src/qaeventmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct QAEventMQ{pub amqp : String,

pub trait Subscriber {
fn subscribe_routing(&mut self) -> Result<()>;
fn subscribe_fanout(&mut self) -> Result<()>;
}

pub trait Publisher {
Expand All @@ -24,7 +25,7 @@ pub trait Callback {
}
impl Callback for QAEventMQ{
fn callback(&mut self, message:String) -> Option<i32>{
println!("{}",message);
println!("receive !! {}",message);
Some(1)
}
}
Expand Down Expand Up @@ -70,6 +71,56 @@ impl Subscriber for QAEventMQ{
}
connection.close()
}
fn subscribe_fanout(&mut self) -> Result<()> {
// Open connection.
let mut connection = Connection::insecure_open(&self.amqp)?;

// Open a channel - None says let the library choose the channel ID.
let channel = connection.open_channel(None)?;

// Declare the fanout exchange we will bind to.
let exchange = channel.exchange_declare(
ExchangeType::Fanout,
&self.exchange,
ExchangeDeclareOptions::default(),
)?;

// Declare the exclusive, server-named queue we will use to consume.
let queue = channel.queue_declare(
"",
QueueDeclareOptions {
exclusive: true,
..QueueDeclareOptions::default()
},
)?;
println!("created exclusive queue {}", queue.name());
// Bind our queue to the logs exchange.
queue.bind(&exchange, "", FieldTable::new())?;

// Start a consumer. Use no_ack: true so the server doesn't wait for us to ack
// the messages it sends us.
let consumer = queue.consume(ConsumerOptions {
no_ack: true,
..ConsumerOptions::default()
})?;
println!("Waiting for logs. Press Ctrl-C to exit.");

for (i, message) in consumer.receiver().iter().enumerate() {
match message {
ConsumerMessage::Delivery(delivery) => {
let body = String::from_utf8_lossy(&delivery.body);

self.callback(body.to_string());
println!("({:>3}) {}", i, body);
}
other => {
println!("Consumer ended: {:?}", other);
break;
}
}
}
connection.close()
}
}

impl Publisher for QAEventMQ{
Expand Down

0 comments on commit 6e979ca

Please sign in to comment.