Skip to content

Commit

Permalink
20240324
Browse files Browse the repository at this point in the history
  • Loading branch information
hansolo12334 committed Mar 24, 2024
1 parent 6a77595 commit cb1b4d4
Show file tree
Hide file tree
Showing 16 changed files with 329 additions and 103 deletions.
58 changes: 0 additions & 58 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,67 +1,9 @@
cmake_minimum_required(VERSION 3.20)
project(hansolo_thread_rebuild)

# set(PROTOBUF_ROOT "/usr/local/include/protobuf25_1")
# set(CMAKE_PREFIX_PATH
# ${CMAKE_PREFIX_PATH}
# ${PROTOBUF_ROOT}/lib/cmake/
# )
# option(protobuf_MODULE_COMPATIBLE TRUE)
# set(_GRPC_GRPCPP gRPC::grpc++)


# find_package(Protobuf CONFIG REQUIRED )
# message(STATUS "Using protobuf ${Protobuf_VERSION}")
# set(_PROTOBUF_LIBPROTOBUF protobuf::libprotobuf)
# set(_REFLECTION gRPC::grpc++_reflection)
# set(_PROTOBUF_PROTOC $<TARGET_FILE:protobuf::protoc>)

# find_package(gRPC CONFIG REQUIRED)
# message(STATUS "Using gRPC ${gRPC_VERSION}")
# set(_GRPC_GRPCPP gRPC::grpc++)
# set(_GRPC_CPP_PLUGIN_EXECUTABLE $<TARGET_FILE:gRPC::grpc_cpp_plugin>)



# get_filename_component(hw_proto "./msgs/coreConnection.proto" ABSOLUTE)
# get_filename_component(hw_proto_path "${hw_proto}" PATH)


# set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/coreConnection.pb.cc")
# set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/coreConnection.pb.h")
# set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/coreConnection.grpc.pb.cc")
# set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/coreConnection.grpc.pb.h")


# get_filename_component(hw_proto1 "./msgs/stdMsg.proto" ABSOLUTE)
# get_filename_component(hw_proto_path1 "${hw_proto1}" PATH)


# set(hw_proto_srcs1 "${CMAKE_CURRENT_BINARY_DIR}/stdMsg.pb.cc")
# set(hw_proto_hdrs1 "${CMAKE_CURRENT_BINARY_DIR}/stdMsg.pb.h")



# add_custom_command(
# OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}" "${hw_proto_srcs1}" "${hw_proto_hdrs1}"
# COMMAND ${_PROTOBUF_PROTOC}
# ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
# --cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
# -I "${hw_proto_path}"
# -I "${hw_proto_path1}"
# --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
# "${hw_proto}" "${hw_proto1}"
# DEPENDS "${hw_proto}" "${hw_proto1}")

include(./msgs/protoGen.cmake)






#

file(GLOB_RECURSE all_include_dirs RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} ./include/*)
foreach(dir ${all_include_dirs})
get_filename_component(dir_path ${dir} PATH)
Expand Down
3 changes: 3 additions & 0 deletions hansolotopic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ int main(int argc,char **argv)
if(posArgv.size()==2 && std::string(posArgv[1],strlen(posArgv[1])) =="list"){
topic.GetTopics();
}
if(posArgv.size()==3 && std::string(posArgv[1],strlen(posArgv[1])) =="echo"){
topic.EchoTopic(posArgv[2]);
}

return 0;
}
18 changes: 17 additions & 1 deletion include/core/hansolo_master_new.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,18 @@ using hansolo::singleTopic;
using hansolo::replyTopics;
using hansolo::requestTopics;

using hansolo::replyEchoTopic;
using hansolo::requestEchoTopic;

using hansolo::replyStopEchoTopic;
using hansolo::requestStopEchoTopic;

class hansolo_master final : public Register::CallbackService
{

private:
int m_port{4321};
std::unordered_map<std::string, int> m_node_names{};



struct node_publish_item
Expand Down Expand Up @@ -88,6 +94,8 @@ class hansolo_master final : public Register::CallbackService
};
std::vector<node_items> m_all_nodes{};

std::vector<std::string> all_hansoloTopic_nodes{};
int hansoloTopicNums{0};

public:
ServerUnaryReactor *SayRegist(CallbackServerContext *context,
Expand All @@ -110,6 +118,14 @@ class hansolo_master final : public Register::CallbackService
ServerUnaryReactor *GetTopics(CallbackServerContext *context,
const requestTopics *request,
replyTopics *reply) override;

ServerUnaryReactor *EchoTopic(CallbackServerContext *context,
const requestEchoTopic *request,
replyEchoTopic *reply) override;

ServerUnaryReactor *StopEchoTopic(CallbackServerContext *context,
const requestStopEchoTopic *request,
replyStopEchoTopic *reply) override;
};

#endif
9 changes: 8 additions & 1 deletion include/core/hansolo_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,21 @@ class HansoloPublisher
}
~HansoloPublisher(){}

google::protobuf::Any any;

void publish(T &msg)
{
if(!my_tcp->already){
return;
}
std::string temp;
msg.get_msg().SerializeToString(&temp);
//测试 any

any.PackFrom(msg.get_msg());
// msg.get_msg().SerializeToString(&temp);
any.SerializeToString(&temp);
my_tcp->sendData = temp;
any.Clear();
}
};

Expand Down
13 changes: 13 additions & 0 deletions include/msg/hansolo_msg_base.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#ifndef HANSOLO_MSG_BASE_H
#define HANSOLO_MSG_BASE_H


class hansolo_msg_base
{
public:
int x = 0;

virtual void printMessage() const = 0;
};

#endif
13 changes: 10 additions & 3 deletions include/msg/hansolo_std_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@
#include <unistd.h>

#include"hansolo_time.h"
#include"colormod.h"

using Color::Code;

class hansolo_stdMsg : public hansoloTime
{
public:
hansolo_stdMsg(){}
~hansolo_stdMsg(){}


hansolo_std::std_msg ss{};


int data{};
hansolo_std::std_msg msg{};

Expand All @@ -34,8 +39,10 @@ class hansolo_stdMsg : public hansoloTime
data = msg.data();
seconds = msg.timestamp().seconds();
}


void printMessage() const
{
hDebug(Color::FG_DEFAULT) << data;
}
};

#endif
3 changes: 2 additions & 1 deletion include/msg/hansolo_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

#include <ctime>
#include <unistd.h>
#include"hansolo_msg_base.h"

class hansoloTime
class hansoloTime : public hansolo_msg_base
{
public:
time_t seconds{};
Expand Down
86 changes: 82 additions & 4 deletions include/tcp/hansolo_tcp_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@
#include<future>

#include"colormod.h"
#include"hansolo_msg_base.h"

#include"baseMsg.pb.h"

#include"msg_type_map.h"

// #include"protobuf/any.pb.h"

class hansolo_tcp_thread
{
Expand All @@ -29,6 +35,9 @@ class hansolo_tcp_thread
int m_port{};
std::string m_node_name;
std::string m_topic_name;
google::protobuf::Any any;
bool findAnyRealType = false;
std::function<void(google::protobuf::Any &)> func;

public:
hansolo_tcp_thread(int port,std::string node_name,std::string topic_name);
Expand All @@ -39,6 +48,69 @@ class hansolo_tcp_thread
void server_update(int port);
void server_start();

template<typename T>
void topic_echo_start(void(*fp)(const T&))
{
int sucess= my_tcp->init_client_tcp(m_port);
hDebug(Color::FG_BLUE) << m_node_name <<' '<<m_topic_name << " hansoloTopic启动";
while (1)
{
if(!m_stopE)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
std::unique_lock<std::mutex> lock(mu);
m_cv.wait(lock, [this] { return !m_pause; });
}
if(m_stopE)
{
break;
}

char buf[100] = {'h'};
int send_len=send(my_tcp->serverfd,(uint8_t *)buf, strlen(buf),0);
if (send_len <= 0)
{
Close(my_tcp->serverfd);
int sucess= my_tcp->init_client_tcp(m_port);
continue;
// printf("tcp_send error!\n");
// close(my_tcp->serverfd);
// exit(EXIT_FAILURE);
}
bzero(buf, sizeof(buf));
// std::cout << sizeof(buf) << ' ' << strlen(buf) << std::endl;
int recv_len = recv(my_tcp->serverfd, buf, sizeof(buf), 0);
if(recv_len<=0){
Close(my_tcp->serverfd);
int sucess= my_tcp->init_client_tcp(m_port);
continue;
// printf("tcp_receive error!\n");
// close(my_tcp->serverfd);
// exit(EXIT_FAILURE);
}
std::string temp = std::string(buf, recv_len);



// data_msg.msg.ParseFromString(temp);
// 从字节流中恢复数据结构 存入类中
// data_msg.write_msg();

// call_back(data_msg);
any.ParseFromString(temp);
if(!findAnyRealType){
hDebug(Color::BG_DEFAULT) << any.type_url();
findAnyRealType = true;
func=topic_type_map[any.type_url()];
}

if(findAnyRealType){
func(any);
}
}
}


template<typename M>
void client_update(std::function<void(const M&)> call_back)
{
Expand Down Expand Up @@ -83,13 +155,19 @@ class hansolo_tcp_thread
// exit(EXIT_FAILURE);
}
std::string temp = std::string(buf, recv_len);



any.ParseFromString(temp);

M data_msg;
data_msg.msg.ParseFromString(temp);
//从字节流中恢复数据结构 存入类中
any.UnpackTo(&data_msg.msg);
any.Clear();
// data_msg.msg.ParseFromString(temp);
// 从字节流中恢复数据结构 存入类中
data_msg.write_msg();

// hDebug(Color::FG_BLUE) << any.type_url();
call_back(data_msg);

}
}

Expand Down
Loading

0 comments on commit cb1b4d4

Please sign in to comment.