| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- #include <QCoreApplication>
- #include <QDateTime>
- #include <iostream>
- #include <vector>
- #include <iostream>
- #include <memory>
- #include <string>
- #include <yaml-cpp/yaml.h>
- #include <grpcpp/grpcpp.h>
- #include <grpcpp/health_check_service_interface.h>
- #include <grpcpp/ext/proto_server_reflection_plugin.h>
- #include "ivgrpc.grpc.pb.h"
- #include "rpcmsgunit.h"
- #include "modulecomm.h"
- using grpc::Server;
- using grpc::ServerBuilder;
- using grpc::ServerContext;
- using grpc::Status;
- #include <QDateTime>
- #include <QMutex>
- QMutex gMutexMsg;
- QMutex gMutexCtrl;
- std::string gstrserverport;
- std::vector<iv::rpcmsgunit> gvectorquerymsgunit;
- std::vector<iv::rpcmsgunit> gvectorctrlmsgunit;
- qint64 gnqueryindex = 0;
- void dec_yaml(const char * stryamlpath)
- {
- YAML::Node config;
- try
- {
- config = YAML::LoadFile(stryamlpath);
- }
- catch(YAML::BadFile e)
- {
- qDebug("load error.");
- return;
- }
- std::vector<std::string> vecmodulename;
- if(config["port"])
- {
- gstrserverport = config["port"].as<std::string>();
- }
- std::string strmsgname;
- if(config["querymessage"])
- {
- for(YAML::const_iterator it= config["querymessage"].begin(); it != config["querymessage"].end();++it)
- {
- std::string strtitle = it->first.as<std::string>();
- std::cout<<strtitle<<std::endl;
- if(config["querymessage"][strtitle]["msgname"]&&config["querymessage"][strtitle]["buffersize"]&&config["querymessage"][strtitle]["buffercount"])
- {
- iv::rpcmsgunit xmu;
- strmsgname = config["querymessage"][strtitle]["msgname"].as<std::string>();
- strncpy(xmu.mstrmsgname,strmsgname.data(),255);
- xmu.mnBufferSize = config["querymessage"][strtitle]["buffersize"].as<int>();
- xmu.mnBufferCount = config["querymessage"][strtitle]["buffercount"].as<int>();
- if(config["querymessage"][strtitle]["keeptime"])
- {
- std::string strkeep = config["querymessage"][strtitle]["keeptime"].as<std::string>();
- xmu.mnkeeptime = atoi(strkeep.data());
- }
- gvectorquerymsgunit.push_back(xmu);
- }
- }
- }
- else
- {
- }
- if(config["ctrlmessage"])
- {
- std::string strnodename = "ctrlmessage";
- for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
- {
- std::string strtitle = it->first.as<std::string>();
- std::cout<<strtitle<<std::endl;
- if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
- {
- iv::rpcmsgunit xmu;
- strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
- strncpy(xmu.mstrmsgname,strmsgname.data(),255);
- xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
- xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
- gvectorctrlmsgunit.push_back(xmu);
- }
- }
- }
- else
- {
- }
- return;
- }
- // Logic and data behind the server's behavior.
- class ivgrpcServiceImpl final : public iv::ivgrpc::Service {
- Status query(ServerContext* context, const iv::queryreq* request,
- iv::queryReply* reply) override {
- (void)context;
- qint64 nCurTime = QDateTime::currentMSecsSinceEpoch();
- gMutexMsg.lock();
- int i;
- for(i=0;i<request->strmsgname_size();i++)
- {
- int j;
- int nsize = gvectorquerymsgunit.size();
- for(j=0;j<nsize;j++)
- {
- if(strncmp(request->strmsgname(i).data(),gvectorquerymsgunit[j].mstrmsgname,256) == 0)
- {
- if(((nCurTime - gvectorquerymsgunit[j].mRecvTime)<gvectorquerymsgunit[j].mnkeeptime) &&((request->nindex()<gvectorquerymsgunit[j].mnIndex)||(request->nindex()>gnqueryindex)))
- {
- iv::ModuleMsg xmsg;
- xmsg.set_index(gvectorquerymsgunit[j].mnIndex);
- xmsg.set_msgname(gvectorquerymsgunit[j].mstrmsgname);
- xmsg.set_xdata(gvectorquerymsgunit[j].mpstrmsgdata.get(),gvectorquerymsgunit[j].mndatasize);
- xmsg.set_nlen(gvectorquerymsgunit[j].mndatasize);
- iv::ModuleMsg * pmsg = reply->add_msg();
- pmsg->CopyFrom(xmsg);
- }
- }
- }
- }
- gMutexMsg.unlock();
- reply->set_nlastindex(gnqueryindex);
- return Status::OK;
- }
- Status ctrl(ServerContext* context, const iv::ctrlreq* request,
- iv::ctrlReply * reply) override {
- (void)context;
- int i;
- for(i=0;i<request->msg_size();i++)
- {
- gMutexCtrl.lock();
- int j;
- int nsize = gvectorctrlmsgunit.size();
- for(j=0;j<nsize;j++)
- {
- if(strncmp(request->msg(i).msgname().data(),gvectorctrlmsgunit[j].mstrmsgname,256) == 0)
- {
- std::cout<<"ctrl message"<<std::endl;
- iv::modulecomm::ModuleSendMsg(gvectorctrlmsgunit[j].mpa,request->msg(i).xdata().data(),
- request->msg(i).xdata().size());
- }
- }
- gMutexCtrl.unlock();
- }
- reply->set_nid(request->nid());
- return Status::OK;
- }
- };
- void RunServer() {
- std::string server_address("0.0.0.0:30051");
- ivgrpcServiceImpl service;
- grpc::EnableDefaultHealthCheckService(true);
- // grpc::reflection::InitProtoReflectionServerBuilderPlugin();
- ServerBuilder builder;
- // Listen on the given address without any authentication mechanism.
- builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
- builder.SetMaxReceiveMessageSize(300000000);
- // builder.SetMaxMessageSize(100000000);
- // builder.SetMaxSendMessageSize(100000000);
- // Register "service" as the instance through which we'll communicate with
- // clients. In this case it corresponds to an *synchronous* service.
- builder.RegisterService(&service);
- // Finally assemble the server.
- std::unique_ptr<Server> server(builder.BuildAndStart());
- std::cout << "Server listening on " << server_address << std::endl;
- // Wait for the server to shutdown. Note that some other thread must be
- // responsible for shutting down the server for this call to ever return.
- server->Wait();
- }
- void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
- {
- (void)&index;
- (void)dt;
- int nsize = gvectorquerymsgunit.size();
- int i;
- for(i=0;i<nsize;i++)
- {
- if(strncmp(strmemname,gvectorquerymsgunit[i].mstrmsgname,255) == 0)
- {
- gMutexMsg.lock();
- char * strtem = new char[nSize];
- memcpy(strtem,strdata,nSize);
- gvectorquerymsgunit[i].mpstrmsgdata.reset(strtem);
- gvectorquerymsgunit[i].mndatasize = nSize;
- // std::cout<<" nsize is "<<nSize<<std::endl;
- gvectorquerymsgunit[i].mbRefresh = true;
- gvectorquerymsgunit[i].mRecvTime = QDateTime::currentMSecsSinceEpoch();
- gnqueryindex++;gvectorquerymsgunit[i].mnIndex = gnqueryindex;
- gMutexMsg.unlock();
- break;
- }
- }
- }
- void Init()
- {
- unsigned int i;
- for(i=0;i<gvectorctrlmsgunit.size();i++)
- {
- gvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(gvectorctrlmsgunit[i].mstrmsgname,gvectorctrlmsgunit[i].mnBufferSize,
- gvectorctrlmsgunit[i].mnBufferCount);
- }
- for(i=0;i<gvectorquerymsgunit.size();i++)
- {
- gvectorquerymsgunit[i].mpa = iv::modulecomm::RegisterRecv(gvectorquerymsgunit[i].mstrmsgname,ListenData);
- }
- }
- int main(int argc, char *argv[])
- {
- QCoreApplication a(argc, argv);
- if(argc<2)
- {
- dec_yaml("./driver_grpc_server.yaml");
- }
- else
- {
- dec_yaml(argv[1]);
- }
- Init();
- RunServer();
- return a.exec();
- }
|