main.cpp 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. #include <QCoreApplication>
  2. #include <QDateTime>
  3. #include <iostream>
  4. #include <vector>
  5. #include <iostream>
  6. #include <memory>
  7. #include <string>
  8. #include <yaml-cpp/yaml.h>
  9. #include <grpcpp/grpcpp.h>
  10. #include <grpcpp/health_check_service_interface.h>
  11. #include <grpcpp/ext/proto_server_reflection_plugin.h>
  12. #include "ivgrpc.grpc.pb.h"
  13. #include "rpcmsgunit.h"
  14. #include "modulecomm.h"
  15. using grpc::Server;
  16. using grpc::ServerBuilder;
  17. using grpc::ServerContext;
  18. using grpc::Status;
  19. #include <QDateTime>
  20. #include <QMutex>
  21. QMutex gMutexMsg;
  22. QMutex gMutexCtrl;
  23. std::string gstrserverport;
  24. std::vector<iv::rpcmsgunit> gvectorquerymsgunit;
  25. std::vector<iv::rpcmsgunit> gvectorctrlmsgunit;
  26. qint64 gnqueryindex = 0;
  27. void dec_yaml(const char * stryamlpath)
  28. {
  29. YAML::Node config;
  30. try
  31. {
  32. config = YAML::LoadFile(stryamlpath);
  33. }
  34. catch(YAML::BadFile e)
  35. {
  36. qDebug("load error.");
  37. return;
  38. }
  39. std::vector<std::string> vecmodulename;
  40. if(config["port"])
  41. {
  42. gstrserverport = config["port"].as<std::string>();
  43. }
  44. std::string strmsgname;
  45. if(config["querymessage"])
  46. {
  47. for(YAML::const_iterator it= config["querymessage"].begin(); it != config["querymessage"].end();++it)
  48. {
  49. std::string strtitle = it->first.as<std::string>();
  50. std::cout<<strtitle<<std::endl;
  51. if(config["querymessage"][strtitle]["msgname"]&&config["querymessage"][strtitle]["buffersize"]&&config["querymessage"][strtitle]["buffercount"])
  52. {
  53. iv::rpcmsgunit xmu;
  54. strmsgname = config["querymessage"][strtitle]["msgname"].as<std::string>();
  55. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  56. xmu.mnBufferSize = config["querymessage"][strtitle]["buffersize"].as<int>();
  57. xmu.mnBufferCount = config["querymessage"][strtitle]["buffercount"].as<int>();
  58. if(config["querymessage"][strtitle]["keeptime"])
  59. {
  60. std::string strkeep = config["querymessage"][strtitle]["keeptime"].as<std::string>();
  61. xmu.mnkeeptime = atoi(strkeep.data());
  62. }
  63. gvectorquerymsgunit.push_back(xmu);
  64. }
  65. }
  66. }
  67. else
  68. {
  69. }
  70. if(config["ctrlmessage"])
  71. {
  72. std::string strnodename = "ctrlmessage";
  73. for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
  74. {
  75. std::string strtitle = it->first.as<std::string>();
  76. std::cout<<strtitle<<std::endl;
  77. if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
  78. {
  79. iv::rpcmsgunit xmu;
  80. strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
  81. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  82. xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
  83. xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
  84. gvectorctrlmsgunit.push_back(xmu);
  85. }
  86. }
  87. }
  88. else
  89. {
  90. }
  91. return;
  92. }
  93. // Logic and data behind the server's behavior.
  94. class ivgrpcServiceImpl final : public iv::ivgrpc::Service {
  95. Status query(ServerContext* context, const iv::queryreq* request,
  96. iv::queryReply* reply) override {
  97. (void)context;
  98. qint64 nCurTime = QDateTime::currentMSecsSinceEpoch();
  99. gMutexMsg.lock();
  100. int i;
  101. for(i=0;i<request->strmsgname_size();i++)
  102. {
  103. int j;
  104. int nsize = gvectorquerymsgunit.size();
  105. for(j=0;j<nsize;j++)
  106. {
  107. if(strncmp(request->strmsgname(i).data(),gvectorquerymsgunit[j].mstrmsgname,256) == 0)
  108. {
  109. if(((nCurTime - gvectorquerymsgunit[j].mRecvTime)<gvectorquerymsgunit[j].mnkeeptime) &&((request->nindex()<gvectorquerymsgunit[j].mnIndex)||(request->nindex()>gnqueryindex)))
  110. {
  111. iv::ModuleMsg xmsg;
  112. xmsg.set_index(gvectorquerymsgunit[j].mnIndex);
  113. xmsg.set_msgname(gvectorquerymsgunit[j].mstrmsgname);
  114. xmsg.set_xdata(gvectorquerymsgunit[j].mpstrmsgdata.get(),gvectorquerymsgunit[j].mndatasize);
  115. xmsg.set_nlen(gvectorquerymsgunit[j].mndatasize);
  116. iv::ModuleMsg * pmsg = reply->add_msg();
  117. pmsg->CopyFrom(xmsg);
  118. }
  119. }
  120. }
  121. }
  122. gMutexMsg.unlock();
  123. reply->set_nlastindex(gnqueryindex);
  124. return Status::OK;
  125. }
  126. Status ctrl(ServerContext* context, const iv::ctrlreq* request,
  127. iv::ctrlReply * reply) override {
  128. (void)context;
  129. int i;
  130. for(i=0;i<request->msg_size();i++)
  131. {
  132. gMutexCtrl.lock();
  133. int j;
  134. int nsize = gvectorctrlmsgunit.size();
  135. for(j=0;j<nsize;j++)
  136. {
  137. if(strncmp(request->msg(i).msgname().data(),gvectorctrlmsgunit[j].mstrmsgname,256) == 0)
  138. {
  139. std::cout<<"ctrl message"<<std::endl;
  140. iv::modulecomm::ModuleSendMsg(gvectorctrlmsgunit[j].mpa,request->msg(i).xdata().data(),
  141. request->msg(i).xdata().size());
  142. }
  143. }
  144. gMutexCtrl.unlock();
  145. }
  146. reply->set_nid(request->nid());
  147. return Status::OK;
  148. }
  149. };
  150. void RunServer() {
  151. std::string server_address("0.0.0.0:30051");
  152. ivgrpcServiceImpl service;
  153. grpc::EnableDefaultHealthCheckService(true);
  154. // grpc::reflection::InitProtoReflectionServerBuilderPlugin();
  155. ServerBuilder builder;
  156. // Listen on the given address without any authentication mechanism.
  157. builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  158. builder.SetMaxReceiveMessageSize(300000000);
  159. // builder.SetMaxMessageSize(100000000);
  160. // builder.SetMaxSendMessageSize(100000000);
  161. // Register "service" as the instance through which we'll communicate with
  162. // clients. In this case it corresponds to an *synchronous* service.
  163. builder.RegisterService(&service);
  164. // Finally assemble the server.
  165. std::unique_ptr<Server> server(builder.BuildAndStart());
  166. std::cout << "Server listening on " << server_address << std::endl;
  167. // Wait for the server to shutdown. Note that some other thread must be
  168. // responsible for shutting down the server for this call to ever return.
  169. server->Wait();
  170. }
  171. void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
  172. {
  173. (void)&index;
  174. (void)dt;
  175. int nsize = gvectorquerymsgunit.size();
  176. int i;
  177. for(i=0;i<nsize;i++)
  178. {
  179. if(strncmp(strmemname,gvectorquerymsgunit[i].mstrmsgname,255) == 0)
  180. {
  181. gMutexMsg.lock();
  182. char * strtem = new char[nSize];
  183. memcpy(strtem,strdata,nSize);
  184. gvectorquerymsgunit[i].mpstrmsgdata.reset(strtem);
  185. gvectorquerymsgunit[i].mndatasize = nSize;
  186. // std::cout<<" nsize is "<<nSize<<std::endl;
  187. gvectorquerymsgunit[i].mbRefresh = true;
  188. gvectorquerymsgunit[i].mRecvTime = QDateTime::currentMSecsSinceEpoch();
  189. gnqueryindex++;gvectorquerymsgunit[i].mnIndex = gnqueryindex;
  190. gMutexMsg.unlock();
  191. break;
  192. }
  193. }
  194. }
  195. void Init()
  196. {
  197. unsigned int i;
  198. for(i=0;i<gvectorctrlmsgunit.size();i++)
  199. {
  200. gvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(gvectorctrlmsgunit[i].mstrmsgname,gvectorctrlmsgunit[i].mnBufferSize,
  201. gvectorctrlmsgunit[i].mnBufferCount);
  202. }
  203. for(i=0;i<gvectorquerymsgunit.size();i++)
  204. {
  205. gvectorquerymsgunit[i].mpa = iv::modulecomm::RegisterRecv(gvectorquerymsgunit[i].mstrmsgname,ListenData);
  206. }
  207. }
  208. int main(int argc, char *argv[])
  209. {
  210. QCoreApplication a(argc, argv);
  211. if(argc<2)
  212. {
  213. dec_yaml("./driver_grpc_server.yaml");
  214. }
  215. else
  216. {
  217. dec_yaml(argv[1]);
  218. }
  219. Init();
  220. RunServer();
  221. return a.exec();
  222. }