Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
yutiansut committed Dec 5, 2019
1 parent ccc9316 commit 9635c16
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 64 deletions.
92 changes: 40 additions & 52 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod qamongo;
pub mod qawebsocket;
pub mod qaeventmq;

extern crate ndarray;

extern crate chrono;
Expand All @@ -9,43 +10,49 @@ use ndarray::array;

use std::time::Duration;
use std::thread;
use qaeventmq::Subscriber;
use crate::qaeventmq::{Publisher, Callback};

use crate::qaeventmq::{Publisher,Subscriber, Callback};

fn main() {

qamongo::query_account("192.168.2.24".to_string(), "288870".to_string());
let mut puber = qaeventmq::QAEventMQ{
amqp: "amqp:https://admin:[email protected]:5672/".to_string(),
exchange: "tick".to_string(),
model: "direct".to_string(),
routing_key: "rb2001".to_string()
};


let mut i = 1;
thread::spawn(move|| {
while i<1000 {
puber.publish_routing("s".to_string());
i+=1;
thread::sleep(Duration::from_secs(1));
}
});
// qamongo::query_account("192.168.2.24".to_string(), "288870".to_string());
// let mut puber = qaeventmq::QAEventMQ{
// amqp: "amqp:https://admin:[email protected]:5672/".to_string(),
// exchange: "tick".to_string(),
// model: "direct".to_string(),
// routing_key: "rb2001".to_string()
// };
//
//
// let mut i = 1;
// thread::spawn(move|| {
// while i<1000 {
// puber.publish_routing("s".to_string());
// i+=1;
// thread::sleep(Duration::from_secs(1));
// }
// });
//
// impl Callback for qaeventmq::QAEventMQ{
// fn callback(&mut self, message:String) -> Option<i32>{
// println!("receive x! {}",message);
// Some(1)
// }
//}
// let mut client = qaeventmq::QAEventMQ{
// amqp: "amqp:https://admin:[email protected]:5672/".to_string(),
// exchange: "tick".to_string(),
// model: "direct".to_string(),
// routing_key: "rb2001".to_string()
// };
// client.subscribe_routing();
// println!("12212");


qawebsocket::QAtradeR(
"test1".to_string(),"test1".to_string(), "QUANTAXIS".to_string(), "192.168.2.24".to_string(),
"ws:https://192.168.2.118:7988".to_string(), "amqp:https://admin:[email protected]:5672/".to_string());

impl Callback for qaeventmq::QAEventMQ{
fn callback(&mut self, message:String) -> Option<i32>{
println!("receive x! {}",message);
Some(1)
}
}
let mut client = qaeventmq::QAEventMQ{
amqp: "amqp:https://admin:[email protected]:5672/".to_string(),
exchange: "tick".to_string(),
model: "direct".to_string(),
routing_key: "rb2001".to_string()
};
client.subscribe_routing();
println!("12212");
// thread::sleep(Duration::from_secs(200));
// thread::spawn(move || {
// client.subscribe_routing();
Expand Down Expand Up @@ -78,25 +85,6 @@ fn main() {



fn qatrader(account_cookie:String, password:String, broker:String, wsuri:String){


let mut client = qaeventmq::QAEventMQ{
amqp: "amqp:https://admin:[email protected]:5672/".to_string(),
exchange: "QAORDER_ROUTER".to_string(),
model: "direct".to_string(),
routing_key: account_cookie
};

thread::spawn(move || {
client.subscribe_routing();

});



}


fn test_ndarray() {
let a3 = array![[[1, 2], [3, 4]],
Expand Down
100 changes: 88 additions & 12 deletions src/qawebsocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,88 @@ pub fn wsmain(wsuri:String, user_name:String, password:String) {
let _ = sys.run();
}

struct ChatClient<T>(SinkWrite<SplitSink<Framed<T, Codec>>>)
use crate::qaeventmq;
use crate::qaeventmq::{Publisher,Subscriber, Callback};

pub fn QAtradeR(user_name : String, password: String, broker: String,
mongo_uri:String, ws_uri: String, eventmq_uri: String,) {


let sys = actix::System::new("qatrader");


Arbiter::spawn(lazy(|| {
Client::new()
.ws(ws_uri)
.connect()
.map_err(|e| {
println!("QAConnection Error: {}", e);
})
.map(|(response, framed)| {
println!("{:?}", response);
let (sink, stream) = framed.split();
let addr = ChatClient::create(|ctx| {
ChatClient::add_stream(stream, ctx);
ChatClient(SinkWrite::new(sink, ctx))
});

impl Callback for qaeventmq::QAEventMQ {
fn callback(&mut self, message: String) -> Option<i32> {

Some(1)

}
}
let mut client = qaeventmq::QAEventMQ{
amqp: eventmq_uri,
exchange: "QAORDER_ROUTER".to_string(),
model: "direct".to_string(),
routing_key: user_name.clone(),
};
// start console loop loop
thread::spawn(move || {
let login = ReqLogin {
aid: "req_login".to_string(),
bid: broker.clone(),
user_name: user_name.clone(),
password: password.clone(),};

// if io::stdin().read_line(&mut cmd).is_err() {
// println!("error");
// return;
// }
let b = serde_json::to_string(&login).unwrap();
addr.do_send(ClientCommand(b));




thread::spawn(move || {
client.subscribe_routing();
});





});
})
}));

let _ = sys.run();


}




pub struct ChatClient<T>(SinkWrite<SplitSink<Framed<T, Codec>>>)
where
T: AsyncRead + AsyncWrite;

#[derive(Message)]
struct ClientCommand(String);
pub struct ClientCommand(String);

impl<T: 'static> Actor for ChatClient<T>
where
Expand Down Expand Up @@ -155,20 +231,20 @@ impl<T: 'static> StreamHandler<Frame, WsProtocolError> for ChatClient<T>


#[derive(Serialize, Deserialize, Debug)]
struct Peek {
pub struct Peek {
aid: String,
}


#[derive(Serialize, Deserialize, Debug)]
struct Broker {
pub struct Broker {
aid: String,
brokers: Vec<String>,

}

#[derive(Serialize, Deserialize, Debug)]
struct ReqLogin {
pub struct ReqLogin {
aid: String,
bid: String,
user_name: String,
Expand All @@ -177,7 +253,7 @@ struct ReqLogin {


#[derive(Serialize, Deserialize, Debug)]
struct ReqOrder {
pub struct ReqOrder {
aid: String,
user_id:String,
order_id: String,
Expand All @@ -193,14 +269,14 @@ struct ReqOrder {
}

#[derive(Serialize, Deserialize, Debug)]
struct ReqCancel {
pub struct ReqCancel {
aid: String,
user_id:String,
order_id: String
}

#[derive(Serialize, Deserialize, Debug)]
struct ReqQueryBank {
pub struct ReqQueryBank {
aid: String,
bank_id: String,
future_account: String,
Expand All @@ -210,22 +286,22 @@ struct ReqQueryBank {
}

#[derive(Serialize, Deserialize, Debug)]
struct ReqQuerySettlement {
pub struct ReqQuerySettlement {
aid: String,
trading_day: i32
}


#[derive(Serialize, Deserialize, Debug)]
struct ReqChangePassword {
pub struct ReqChangePassword {
aid: String,
old_password: String,
new_password: String
}


#[derive(Serialize, Deserialize, Debug)]
struct ReqTransfer {
pub struct ReqTransfer {
aid: String,
bank_id: String,
future_account: String,
Expand All @@ -238,7 +314,7 @@ struct ReqTransfer {


#[derive(Serialize, Deserialize, Debug)]
struct RtnData {
pub struct RtnData {
aid: String,
data: Vec<String>
}
Expand Down

0 comments on commit 9635c16

Please sign in to comment.