// NOTE(review): several definitions are collapsed onto this one physical line, and
// text that presumably sat between '<' and '>' is missing from it (bare `#include`
// directives, `std::vector` with no element type, `.as()` with no type argument,
// `std::cout<();`). It cannot compile as shown; recover the original text before
// changing any logic. The notes below describe only what is still visible.
//
// File-scope state:
//   gMutexMsg, gMutexCtrl - QMutex objects; later in the file query() locks
//                           gMutexMsg and ctrl() locks gMutexCtrl.
//   gstrserverport        - std::string assigned from the YAML "port" node.
//   gvectorquerymsgunit   - vector that dec_yaml() appends to from "querymessage".
//   gvectorctrlmsgunit    - vector that dec_yaml() appends to from "ctrlmessage".
//   gnqueryindex          - qint64 initialised to 0; not written on this line.
//
// dec_yaml(stryamlpath)
//   Loads the YAML file at `stryamlpath` with YAML::LoadFile. If that throws
//   YAML::BadFile, logs "load error." through qDebug and returns before any
//   global is modified. Otherwise:
//     * "port" present -> stored in gstrserverport.
//     * each child of "querymessage" -> `xmu` gets mstrmsgname (strncpy, at most
//       255 bytes), mnBufferSize ("buffersize"), mnBufferCount ("buffercount")
//       and, only when "keeptime" is present, mnkeeptime = atoi of that value;
//       `xmu` is then appended to gvectorquerymsgunit.
//     * each child of "ctrlmessage" -> same fields except keeptime, appended to
//       gvectorctrlmsgunit.
//   The declaration of `xmu` and the assignment of `strmsgname` are not visible,
//   and the extra closing brace after each push_back suggests the fill-and-push
//   sits inside a condition that is also missing - TODO confirm against the
//   original file.
//   `vecmodulename` is declared but not used in the visible text.
//   NOTE(review): the exception is caught by value and only YAML::BadFile is
//   handled; check whether other yaml-cpp exceptions (malformed content, failed
//   conversion) can escape to the caller.
//   NOTE(review): strncpy with a limit of 255 leaves the destination without a
//   terminator when the source is 255 bytes or longer; the size and initial
//   contents of mstrmsgname are not visible here (presumably declared in
//   rpcmsgunit.h) - verify.
#include #include #include #include #include #include #include #include #include #include #include #include "ivgrpc.grpc.pb.h" #include "rpcmsgunit.h" #include "modulecomm.h" using grpc::Server; using grpc::ServerBuilder; using grpc::ServerContext; using grpc::Status; #include #include QMutex gMutexMsg; QMutex gMutexCtrl; std::string gstrserverport; std::vector gvectorquerymsgunit; std::vector gvectorctrlmsgunit; qint64 gnqueryindex = 0; void dec_yaml(const char * stryamlpath) { YAML::Node config; try { config = YAML::LoadFile(stryamlpath); } catch(YAML::BadFile e) { qDebug("load error."); return; } std::vector vecmodulename; if(config["port"]) { gstrserverport = config["port"].as(); } std::string strmsgname; if(config["querymessage"]) { for(YAML::const_iterator it= config["querymessage"].begin(); it != config["querymessage"].end();++it) { std::string strtitle = it->first.as(); std::cout<(); strncpy(xmu.mstrmsgname,strmsgname.data(),255); xmu.mnBufferSize = config["querymessage"][strtitle]["buffersize"].as(); xmu.mnBufferCount = config["querymessage"][strtitle]["buffercount"].as(); if(config["querymessage"][strtitle]["keeptime"]) { std::string strkeep = config["querymessage"][strtitle]["keeptime"].as(); xmu.mnkeeptime = atoi(strkeep.data()); } gvectorquerymsgunit.push_back(xmu); } } } else { } if(config["ctrlmessage"]) { std::string strnodename = "ctrlmessage"; for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it) { std::string strtitle = it->first.as(); std::cout<(); strncpy(xmu.mstrmsgname,strmsgname.data(),255); xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as(); xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as(); gvectorctrlmsgunit.push_back(xmu); } } } else { } return; } // Logic and data behind the server's behavior. 
// NOTE(review): the two physical lines below hold a whole class and a function
// with their newlines removed, and text that presumably sat between '<' and '>'
// is missing (e.g. `for(i=0;istrmsgname_size();i++)`, `std::unique_ptr server`).
// Because the newlines are gone, everything after the first `//` on each line is
// lexically part of a comment. Recover the original text before changing logic;
// the notes below describe only what is still visible.
//
// class ivgrpcServiceImpl (final, derives from iv::ivgrpc::Service)
//
//   query(context, request, reply) -> Status
//     `context` is unused. Reads the current time in milliseconds with
//     QDateTime::currentMSecsSinceEpoch(), locks gMutexMsg, and for each name
//     index up to strmsgname_size() scans gvectorquerymsgunit for an entry whose
//     mstrmsgname compares equal (a comparison limited to 256 bytes; the name of
//     the comparison call is missing - presumably strncmp). For a matching entry
//     that also passes a filter involving (nCurTime - mRecvTime), nindex() and
//     gnqueryindex - the full condition is not recoverable from this text - it
//     builds an iv::ModuleMsg from mnIndex, mstrmsgname, mpstrmsgdata/mndatasize
//     and copies it into a message added to `reply` with add_msg(). After
//     unlocking gMutexMsg it sets reply's nlastindex to gnqueryindex and returns
//     Status::OK.
//     NOTE(review): gnqueryindex is read after gMutexMsg is released; whether
//     that mutex is meant to guard it is not visible here - confirm.
//     NOTE(review): lock()/unlock() are paired by hand; there is no early
//     return between them in the visible text.
//
//   ctrl(context, request, reply) -> Status
//     `context` is unused. For each message index up to msg_size(), locks
//     gMutexCtrl, scans gvectorctrlmsgunit for an entry whose mstrmsgname
//     matches msg(i).msgname() (again limited to 256 bytes), prints
//     "ctrl message" and passes msg(i).xdata().data() and its size to a call
//     whose name is missing from this text, then unlocks. The mutex is taken and
//     released once per request message. Finally copies request->nid() into the
//     reply and returns Status::OK.
//
// RunServer()
//   Creates one ivgrpcServiceImpl, enables the default gRPC health-check
//   service, and configures a ServerBuilder: listening port "0.0.0.0:30051" with
//   insecure credentials, maximum receive message size 300000000, and the
//   service registered. It then calls BuildAndStart(), prints
//   "Server listening on <address>", and calls server->Wait(); per the original
//   comment that call only returns when another thread shuts the server down.
//   The listening address is the hard-coded literal; gstrserverport is not
//   referenced in the visible text of this function.
//
// The second physical line ends with the beginning of ListenData(), which is cut
// off in this view and is therefore left undocumented.
class ivgrpcServiceImpl final : public iv::ivgrpc::Service { Status query(ServerContext* context, const iv::queryreq* request, iv::queryReply* reply) override { (void)context; qint64 nCurTime = QDateTime::currentMSecsSinceEpoch(); gMutexMsg.lock(); int i; for(i=0;istrmsgname_size();i++) { int j; int nsize = gvectorquerymsgunit.size(); for(j=0;jstrmsgname(i).data(),gvectorquerymsgunit[j].mstrmsgname,256) == 0) { if(((nCurTime - gvectorquerymsgunit[j].mRecvTime)nindex()nindex()>gnqueryindex))) { iv::ModuleMsg xmsg; xmsg.set_index(gvectorquerymsgunit[j].mnIndex); xmsg.set_msgname(gvectorquerymsgunit[j].mstrmsgname); xmsg.set_xdata(gvectorquerymsgunit[j].mpstrmsgdata.get(),gvectorquerymsgunit[j].mndatasize); xmsg.set_nlen(gvectorquerymsgunit[j].mndatasize); iv::ModuleMsg * pmsg = reply->add_msg(); pmsg->CopyFrom(xmsg); } } } } gMutexMsg.unlock(); reply->set_nlastindex(gnqueryindex); return Status::OK; } Status ctrl(ServerContext* context, const iv::ctrlreq* request, iv::ctrlReply * reply) override { (void)context; int i; for(i=0;imsg_size();i++) { gMutexCtrl.lock(); int j; int nsize = gvectorctrlmsgunit.size(); for(j=0;jmsg(i).msgname().data(),gvectorctrlmsgunit[j].mstrmsgname,256) == 0) { std::cout<<"ctrl message"<msg(i).xdata().data(), request->msg(i).xdata().size()); } } gMutexCtrl.unlock(); } reply->set_nid(request->nid()); return Status::OK; } }; void RunServer() { std::string server_address("0.0.0.0:30051"); ivgrpcServiceImpl service; grpc::EnableDefaultHealthCheckService(true); // grpc::reflection::InitProtoReflectionServerBuilderPlugin(); ServerBuilder builder; // Listen on the given address without any authentication mechanism. builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.SetMaxReceiveMessageSize(300000000); // builder.SetMaxMessageSize(100000000); // builder.SetMaxSendMessageSize(100000000); // Register "service" as the instance through which we'll communicate with // clients. 
In this case it corresponds to an *synchronous* service. builder.RegisterService(&service); // Finally assemble the server. std::unique_ptr server(builder.BuildAndStart()); std::cout << "Server listening on " << server_address << std::endl; // Wait for the server to shutdown. Note that some other thread must be // responsible for shutting down the server for this call to ever return. server->Wait(); } void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname) { (void)&index; (void)dt; int nsize = gvectorquerymsgunit.size(); int i; for(i=0;i