From 81c2dfa856479203c2a392de82865b4964e422e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alessandro=20Vigano=CC=80?= Date: Mon, 30 Nov 2015 08:52:40 +0100 Subject: [PATCH] Implemented call function. Improved Log. Improved c++ api for msgunpack. --- CMakeLists.txt | 2 +- source/AppDesktop.cpp | 15 +++- source/CMakeLists.txt | 2 +- source/LogConfig.h | 8 ++ source/MsgPackCPP.cpp | 2 +- source/MsgUnpack.cpp | 52 ++++++++++++- source/MsgUnpack.h | 40 +++++++++- source/SocketMBED.cpp | 3 +- source/SocketOSX.cpp | 4 +- source/{WampMBED.cpp => Wamp.cpp} | 125 +++++++++++++++++++++++------- source/{WampMBED.h => Wamp.h} | 21 +++-- source/WampMBEDMain.cpp | 2 +- source/WampTransportRaw.cpp | 5 +- source/WampTransportWS.cpp | 6 +- source/mpacktest.cpp | 12 ++- 15 files changed, 243 insertions(+), 56 deletions(-) create mode 100644 source/LogConfig.h rename source/{WampMBED.cpp => Wamp.cpp} (63%) rename source/{WampMBED.h => Wamp.h} (64%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1ed21ea..7c8edb3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,7 +3,7 @@ if(YOTTA_CFG_MBED) else(YOTTA_CFG_MBED) cmake_minimum_required(VERSION 3.3) project(wamp_mbed) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Werror -Wall -Wextra -pedantic") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -Wextra -pedantic") endif(YOTTA_CFG_MBED) diff --git a/source/AppDesktop.cpp b/source/AppDesktop.cpp index 1e0c56e..0874d0e 100644 --- a/source/AppDesktop.cpp +++ b/source/AppDesktop.cpp @@ -1,5 +1,5 @@ #include -#include "WampMBED.h" +#include "Wamp.h" #include "logger.h" #include "MpackPrinter.h" #include "WampTransportRaw.h" @@ -34,10 +34,19 @@ int main() { wamp->publish("test", MsgPackArr {"hello"}, MsgPackMap {}); - wamp->subscribe("com.example.oncounter", [](mpack_node_t &args, mpack_node_t &kwargs) { + wamp->subscribe("com.example.oncounter", [](MPNode args, MPNode kwargs) { (void) kwargs; - LOG("Received event: " << MpackPrinter(args).toJSON()); + LOG("Received event: " << args.toJson()); }); + + wamp->call("com.example.add", MsgPackArr {20,3}, MsgPackMap{}, + [](WampError *err, MPNode args, MPNode kwargs) { + if (!err) { + LOG("Received result:" << args.toJson()); + } + + }); + }); diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt index acd3619..0951f57 100644 --- a/source/CMakeLists.txt +++ b/source/CMakeLists.txt @@ -2,7 +2,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") include_directories(../mpack/src) LIST(APPEND SOURCES MsgPackCPP.cpp MsgUnpack.cpp MpackPrinter.cpp - WampMBED.cpp + Wamp.cpp WampTransportWS.cpp WampTransportRaw.cpp SocketTypes.cpp) diff --git a/source/LogConfig.h b/source/LogConfig.h new file mode 100644 index 0000000..9cf548c --- /dev/null +++ b/source/LogConfig.h @@ -0,0 +1,8 @@ +#ifndef LOGCONFIG_H +#define LOGCONFIG_H + +//#define DEBUG_WAMP_TRANSPORT //Rawsocket/Websocket Layer transport +//#define DEBUG_WAMP_SOCKET //Low level socket layer +#define DEBUG_WAMP //Wamp Protocol Layer + +#endif //LOGCONFIG_H diff --git a/source/MsgPackCPP.cpp b/source/MsgPackCPP.cpp index 2459075..67d3b80 100644 --- a/source/MsgPackCPP.cpp +++ b/source/MsgPackCPP.cpp @@ -56,6 +56,6 @@ void MsgPack::pack(MsgPack mp) { std::string MsgPack::getJson() { MsgUnpack unp {getData(), getUsedBuffer()}; - return unp.toJson(); + return unp.getRoot().toJson(); } diff --git a/source/MsgUnpack.cpp b/source/MsgUnpack.cpp index 7ed00d0..5bc5787 100644 --- a/source/MsgUnpack.cpp +++ b/source/MsgUnpack.cpp @@ -5,6 +5,8 @@ #include "MsgUnpack.h" #include "MpackPrinter.h" #include +#include +#include MsgUnpack::MsgUnpack(char *buffer, size_t size) { mpack_tree_init(&tree, buffer, size); @@ -14,16 +16,58 @@ MsgUnpack::~MsgUnpack() { mpack_tree_destroy(&tree); } -mpack_node_t MsgUnpack::getRoot() { - return mpack_tree_root(&tree); +MPNode MsgUnpack::getRoot() { + return MPNode(mpack_tree_root(&tree)); } mpack_error_t MsgUnpack::getError() { return tree.error; } -std::string MsgUnpack::toJson() { - MpackPrinter printer(getRoot()); +std::string MPNode::toJson() { + MpackPrinter printer(node); return printer.toJSON(); +} + +MPNode MPNode::operator[](const u_int16_t &index) { + return at(index); +} + +MPNode MPNode::at(const u_int16_t &index, bool ignore_errors) { + + if (mpack_node_error(node) != mpack_ok) + return MPNode(mpack_tree_nil_node(node.tree)); + + if (node.data->type != mpack_type_array) { + if (!ignore_errors) + mpack_node_flag_error(node, mpack_error_type); + + return MPNode(mpack_tree_nil_node(node.tree)); + } + + if (index >= node.data->value.content.n) { + if (!ignore_errors) + mpack_node_flag_error(node, mpack_error_data); + return MPNode(mpack_tree_nil_node(node.tree)); + } + + return MPNode(mpack_node(node.tree, mpack_node_child(node, index))); +} + + +size_t MPNode::arrayLength() { + return mpack_node_array_length(node); +} + +MPNode::operator std::string() { + if (mpack_node_error(node) != mpack_ok) { + return std::string(); + } + + if (node.data->type != mpack_type_str) { + mpack_node_flag_error(node, mpack_error_type); + return std::string(); + } + return std::string(mpack_node_data(node), mpack_node_strlen(node)); } diff --git a/source/MsgUnpack.h b/source/MsgUnpack.h index 5c81660..9ded2b0 100644 --- a/source/MsgUnpack.h +++ b/source/MsgUnpack.h @@ -10,6 +10,35 @@ #include "mpack/mpack-node.h" #include +class MPNode { +private: + +public: + mpack_node_t node; + MPNode(mpack_node_t node):node(node){}; + MPNode(); + + MPNode operator[](const u_int16_t &index); + MPNode at(const u_int16_t &index, bool ignore_errors=false); + + size_t arrayLength (); + std::string toJson(); + + operator std::string(); + + operator int() { + return mpack_node_i16(node); + } + + operator u_int16_t () { + return mpack_node_u16(node); + } + + operator unsigned long long int() { + return mpack_node_u64(node); + } +}; + class MsgUnpack { private: mpack_tree_t tree; @@ -17,12 +46,17 @@ class MsgUnpack { MsgUnpack(char* buffer, size_t size); ~MsgUnpack(); - mpack_node_t getRoot(); + MPNode getRoot(); + MPNode nil() { + return MPNode(mpack_node(&tree, &tree.nil_node)); + }; + mpack_error_t getError(); - std::string toJson(); - static void nodeToJson(mpack_node_t node, std::stringstream &s); + }; + + #endif //WAMP_MBED_MSGUNPACK_H diff --git a/source/SocketMBED.cpp b/source/SocketMBED.cpp index 643c2c1..9069352 100644 --- a/source/SocketMBED.cpp +++ b/source/SocketMBED.cpp @@ -2,8 +2,7 @@ #include "SocketMBED.h" #include "sal-stack-lwip/lwipv4_init.h" -#define DEBUG_WAMP_SOCKET - +#include "LogConfig.h" #ifdef DEBUG_WAMP_SOCKET #include "logger.h" #else diff --git a/source/SocketOSX.cpp b/source/SocketOSX.cpp index f8333e8..60dbeb7 100644 --- a/source/SocketOSX.cpp +++ b/source/SocketOSX.cpp @@ -6,9 +6,9 @@ #include "SocketOSX.h" #include #include +#include -#define DEBUG_WAMP_SOCKET - +#include "LogConfig.h" #ifdef DEBUG_WAMP_SOCKET #include "logger.h" #else diff --git a/source/WampMBED.cpp b/source/Wamp.cpp similarity index 63% rename from source/WampMBED.cpp rename to source/Wamp.cpp index c1ed491..0bf7e3d 100644 --- a/source/WampMBED.cpp +++ b/source/Wamp.cpp @@ -2,17 +2,17 @@ // Created by Alessandro ViganĂ² on 16/11/15. // -#define DEBUGWAMP - #include #include -#include "WampMBED.h" +#include "Wamp.h" #include "WampTransport.h" #include "wampConstants.h" #include "MsgUnpack.h" -#ifdef DEBUGWAMP +#include "LogConfig.h" + +#ifdef DEBUG_WAMP #include "logger.h" #else #define LOG(X) @@ -132,15 +132,38 @@ void WampMBED::publish(string const &topic, const MsgPack& arguments, const MsgP this->transport.sendMessage(mp.getData(), mp.getUsedBuffer()); } +void WampMBED::call(string const &procedure, const MsgPack &arguments, const MsgPack &argumentsKW, TCallCallback cb) { + if (!connected) + return; + + callRequests[requestCount] = cb; + + mp.clear(); + mp.pack_array(6); + mp.pack((int) WAMP_MSG_CALL); + mp.pack(requestCount); + mp.pack_map(0); + mp.pack(procedure); + mp.pack(arguments); + mp.pack(argumentsKW); + + LOG ("Calling " << procedure << "- " << mp.getJson()); + + requestCount++; + + this->transport.sendMessage(mp.getData(), mp.getUsedBuffer()); +} + void WampMBED::parseMessage(char* buffer, size_t size) { int msgType = 0; MsgUnpack munp(buffer,size); - mpack_node_t root = munp.getRoot(); - LOG ("Received msg "<< munp.toJson()); + MPNode root = munp.getRoot(); + LOG ("Received msg "<< root.toJson()); - msgType = mpack_node_u16(mpack_node_array_at(root,0)); + //msgType = mpack_node_u16(mpack_node_array_at(root,0)); + msgType = root[0]; if (munp.getError() != mpack_ok) { LOG ("Unknown msg"); @@ -150,15 +173,16 @@ void WampMBED::parseMessage(char* buffer, size_t size) { LOG ("Msg type:" << msgType); switch (msgType) { case WAMP_MSG_WELCOME: { - sessionID = mpack_node_u64(mpack_node_array_at(root,1)); + //sessionID = mpack_node_u64(mpack_node_array_at(root,1)); + sessionID = root[1]; LOG ("Received welcome message with SessionID " << sessionID); connected = true; onJoin(); break; } case WAMP_MSG_SUBSCRIBED: { - unsigned long long int requestID = mpack_node_u64(mpack_node_array_at(root,1)); - unsigned long long int subscriptionID = mpack_node_u64(mpack_node_array_at(root,2)); + WampID_t requestID = root[1]; + WampID_t subscriptionID = root[2]; if (munp.getError() != mpack_ok) { LOG ("Bad Subscribe Message"); @@ -182,31 +206,15 @@ void WampMBED::parseMessage(char* buffer, size_t size) { case WAMP_MSG_EVENT: { - unsigned long long int subscriptionID = mpack_node_u64(mpack_node_array_at(root,1)); + WampID_t subscriptionID = root[1]; if (munp.getError() != mpack_ok) { LOG ("Bad EVENT Message"); return; } - mpack_node_t args; - mpack_node_t kwargs; - - if (mpack_node_array_length(root) >4) { - args = mpack_node_array_at(root, 4); - } - else { - args.data = nullptr; - args.tree = nullptr; - } - - if (mpack_node_array_length(root) >5) { - kwargs = mpack_node_array_at(root, 5); - } - else { - kwargs.data = nullptr; - kwargs.tree = nullptr; - } + MPNode args = root.at(4, true); + MPNode kwargs = root.at(5, true); LOG ("Received EVENT message with SubscriptionID " << subscriptionID); @@ -222,6 +230,63 @@ void WampMBED::parseMessage(char* buffer, size_t size) { break; } + case WAMP_MSG_RESULT: + { + WampID_t requestID = root[1]; + if (munp.getError() != mpack_ok) { + LOG ("Bad RESULT Message"); + return; + } + + MPNode args = root.at(3, true); + MPNode kwargs = root.at(4, true); + + + LOG ("Received RESULT message with requestID " << requestID); + + if (callRequests.find(requestID) == callRequests.end()) { + LOG ("RequestID not found - # Subscriptions" << subscriptions.size()); + return; + } + + TCallCallback cb = callRequests.at(requestID); + cb(nullptr, args, kwargs); + + break; + } + + case WAMP_MSG_ERROR: { + u_int16_t errMsg = root[1]; + + switch (errMsg) { + case WAMP_MSG_CALL: { + std::string err = root[4]; + WampID_t callRequest = root[2]; + + if (munp.getError() != mpack_ok) { + LOG ("Bad ERROR Message"); + return; + } + + if (callRequests.find(callRequest) == callRequests.end()) { + LOG ("RequestID not found - # CallRequests" << callRequests.size()); + return; + } + + LOG("Received error for CALL request "<< callRequest); + TCallCallback cb = callRequests.at(callRequest); + callRequests.erase(callRequest); + WampError e {err}; + cb(&e, munp.nil(), munp.nil()); + break; + + } + default: + LOG("Uknown erorr"); + } + } + + } } @@ -233,3 +298,5 @@ void WampMBED::close() { if (onClose) onClose(); } + + diff --git a/source/WampMBED.h b/source/Wamp.h similarity index 64% rename from source/WampMBED.h rename to source/Wamp.h index dbd7223..c513127 100644 --- a/source/WampMBED.h +++ b/source/Wamp.h @@ -10,9 +10,16 @@ #include "WampTransport.h" #include "MsgPackCPP.h" #include "mpack/mpack.h" +#include "MsgUnpack.h" + +struct WampError { + string URI; +}; + +typedef unsigned long long int WampID_t; +typedef function TSubscriptionCallback; +typedef function TCallCallback; -typedef unsigned long long int WampID; -typedef function TSubscriptionCallback; class WampMBED { private: @@ -20,8 +27,9 @@ class WampMBED { std::random_device rd; std::mt19937_64 gen; MsgPack mp; - unordered_map subscriptionsRequests; - unordered_map subscriptions; + unordered_map subscriptionsRequests; + unordered_map subscriptions; + unordered_map callRequests; unsigned long long int requestCount = 0; std::function onJoin {nullptr}; @@ -29,7 +37,7 @@ class WampMBED { public: - WampID sessionID; + WampID_t sessionID; bool connected {false}; std::function onClose {nullptr}; @@ -43,7 +51,8 @@ class WampMBED { void subscribe(string topic, TSubscriptionCallback callback); void publish(string const &topic); void publish(string const &topic, const MsgPack& arguments, const MsgPack& argumentsKW); - + void call(string const &procedure, const MsgPack& arguments, + const MsgPack& argumentsKW, TCallCallback cb); void parseMessage(char *buffer, size_t size); diff --git a/source/WampMBEDMain.cpp b/source/WampMBEDMain.cpp index 1b241e7..eda27b9 100644 --- a/source/WampMBEDMain.cpp +++ b/source/WampMBEDMain.cpp @@ -1,5 +1,5 @@ #include -#include "WampMBED.h" +#include "Wamp.h" #include "logger.h" #include "MpackPrinter.h" #include "mbed-drivers/mbed.h" diff --git a/source/WampTransportRaw.cpp b/source/WampTransportRaw.cpp index 77030c4..3c5b824 100644 --- a/source/WampTransportRaw.cpp +++ b/source/WampTransportRaw.cpp @@ -5,7 +5,7 @@ #include #include "WampTransportRaw.h" -#define DEBUG_WAMP_TRANSPORT +#include "LogConfig.h" #ifdef DEBUG_WAMP_TRANSPORT #include "logger.h" @@ -202,6 +202,9 @@ void WampTransportRaw::onDisconnect() { } void WampTransportRaw::onError(spal::error serr) { +#ifndef DEBUG_WAMP_TRANSPORT + (void) serr; +#endif LOG("ERROR "<< spal::getError(serr)); onDisconnect(); } diff --git a/source/WampTransportWS.cpp b/source/WampTransportWS.cpp index 2d1a389..f068504 100644 --- a/source/WampTransportWS.cpp +++ b/source/WampTransportWS.cpp @@ -7,7 +7,7 @@ #include #include -#define DEBUG_WAMP_TRANSPORT +#include "LogConfig.h" #ifdef DEBUG_WAMP_TRANSPORT #include "logger.h" @@ -373,7 +373,11 @@ WampTransportWS::WampTransportWS(const std::string &url, const string &origin):u } void WampTransportWS::onError(spal::error serr) { +#ifndef DEBUG_WAMP_TRANSPORT + (void) serr; +#endif LOG("ERROR "<< spal::getError(serr)); + onDisconnect(); } diff --git a/source/mpacktest.cpp b/source/mpacktest.cpp index a635a7b..d027548 100644 --- a/source/mpacktest.cpp +++ b/source/mpacktest.cpp @@ -1,6 +1,8 @@ #include "mpack/mpack.h" #include "MsgPackCPP.h" +#include "MsgUnpack.h" #include "logger.h" +#include int main() { MsgPack mp; @@ -25,9 +27,17 @@ int main() { mp2.print(); - MsgPackArr mp3 {"gattina"}; + MsgPackArr mp3 {"gatto","cane","pesce"}; mp3.print(); MsgPackMap mp4 {"colore","rosso","pesce","spigola"}; mp4.print(); + + MsgUnpack munp(mp3.getData(),mp3.getUsedBuffer()); + MPNode root = munp.getRoot(); + + + std::cout << root.toJson() << ":" << (std::string) root[1]; + + } \ No newline at end of file