||
- #include "grpcpc.h"
- #include <memory>
- static grpcpc * ggrpcpc;
- grpcpc::grpcpc(std::string stryamlpath)
- {
- mstrpicmsgname[0] = "picfront";
- mstrpicmsgname[1] = "picrear";
- mstrpicmsgname[2] = "picleft";
- mstrpicmsgname[3] = "picright";
- ggrpcpc = this;
- unsigned int i;
- for(i=0;i<NUM_CAM;i++)
- {
- mnPicUpLatency[i] = 1000;
- mnFrameRate[i] = 0;
- mnPicDownLatency[i] = 1000;
- }
- for(i=0;i<NUM_CAM;i++)
- {
- unsigned int j;
- for(j=0;j<NUM_THREAD_PERCAM;j++)
- {
- mpThread[i*NUM_THREAD_PERCAM + j] = new std::thread(&grpcpc::threadpicdownload,this,i);
- }
- }
- }
- void grpcpc::run()
- {
- int nsize = mvectormsgunit.size();
- int nctrlsize = mvectorctrlmsgunit.size();
- int i;
- qint64 nlasttime = 0;
- int ninterval = atoi(gstruploadinterval.data());
- if(ninterval<=0)ninterval = 100;
- QTime xTime;
- xTime.start();
- int nlastsend = xTime.elapsed();
- std::string target_str = gstrserverip+":";
- target_str = target_str + gstrserverport ;//std::to_string()
- auto cargs = grpc::ChannelArguments();
- cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
- cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
- std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
- target_str, grpc::InsecureChannelCredentials(),cargs);
- std::unique_ptr<iv::UploadThread::Stub> stub_ = iv::UploadThread::NewStub(channel);
- iv::queryReqThread request;
- iv::queryReplyThread xreply;
- int nid = 0;
- int nctrlid = 0;
- // Container for the data we expect from the server.
- // iv::queryReply reply;
- gpr_timespec timespec;
- timespec.tv_sec = 30;//设置阻塞时间为2秒
- timespec.tv_nsec = 0;
- timespec.clock_type = GPR_TIMESPAN;
- qint64 nLastCtrlTime = 0;
- while(!QThread::isInterruptionRequested())
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- if(abs(xTime.elapsed()-nlastsend)<ninterval)
- {
- continue;
- }
- bool bImportant = false;
- int nkeeptime = 0;
- iv::cloud::cloudmsg xmsg;
- xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
- nlastsend = xTime.elapsed();
- {
- ClientContext context ;
- context.set_deadline(timespec);
- // qint64 time1 = QDateTime::currentMSecsSinceEpoch();
- request.set_strquerymd5(gstrqueryMD5);
- request.set_strvin(gstrVIN);
- request.set_nlasttime(nlasttime);
- request.set_id(nctrlid);nctrlid++;
- request.set_strctrlmd5(gstrctrlMD5);
- request.set_strvin(gstrVIN);
- request.set_ntime(QDateTime::currentMSecsSinceEpoch());
- request.set_bimportant(bImportant);
- request.set_kepptime(nkeeptime);
- if(nLastCtrlTime != mnmsgsendupdatetime)
- {
- mMutexmsgsend.lock();
- nLastCtrlTime = mnmsgsendupdatetime;
- xmsg.CopyFrom(mmsgsend);
- mMutexmsgsend.unlock();
- if(xmsg.xclouddata_size()>0)
- {
- int nbytesize = xmsg.ByteSize();
- std::vector<char> pvectordata;
- pvectordata.resize(nbytesize);
- if(xmsg.SerializeToArray(pvectordata.data(),nbytesize))
- {
- request.set_xdata(pvectordata.data(),pvectordata.size());
- }
- }
- }
- QDateTime xTime;
- xTime.fromMSecsSinceEpoch(1607905685318); //1607914763641
- // qDebug("time:%s",xTime.toString("yyyy-MM-dd:hh:mm:ss:zzz").toLatin1().data());
- // qDebug("nlasttime is %ld",nlasttime);//1607905685318
- nid++;
- // The actual RPC.
- Status status = stub_->queryctrl(&context, request, &xreply);
- if (status.ok()) {
- // std::cout<<nid<<" query successfully, res is "<<xreply.nres()<<std::endl;
- if(xreply.nres() == 1)
- {
- if(nlasttime != xmsg.xtime())
- {
- iv::cloud::cloudmsg xmsg;
- if(xmsg.ParseFromArray(xreply.xdata().data(),xreply.xdata().size()))
- {
- mMutexmsgrecv.lock();
- mmsgrecv.CopyFrom(xmsg);
- mnmsgrecvupdatetime = QDateTime::currentMSecsSinceEpoch();
- mMutexmsgrecv.unlock();
- }
- }
- nlasttime = xreply.ntime();
- }
- else
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(30));
- }
- } else {
- std::cout << status.error_code() << ": " << status.error_message()
- << std::endl;
- std::cout<<"RPC failed"<<std::endl;
- if(status.error_code() == 4)
- {
- std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
- channel = grpc::CreateCustomChannel(
- target_str, grpc::InsecureChannelCredentials(),cargs);
- stub_ = iv::UploadThread::NewStub(channel);
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(900));
- }
- }
- }
- }
- std::string grpcpc::GetVIN()
- {
- return gstrVIN;
- }
- //int gnPicNum[NUM_CAM];
- //QMutex gMutexPic[NUM_CAM];
- //qint64 gnTimeSecond[NUM_CAM];
- void grpcpc::threadpicdownload(int nCamPos)
- {
- std::cout<<"thread cam "<<nCamPos<<"run"<<std::endl;
- int nsize = mvectormsgunit.size();
- int i;
- std::string strcclientid = "civetweb";//ServiceRCIni.GetClientID();
- int ninterval = atoi(gstruploadinterval.data());
- if(ninterval<=0)ninterval = 100;
- QTime xTime;
- xTime.start();
- int nlastsend = xTime.elapsed();
- std::string target_str = gstrserverip+":";
- target_str = target_str + gstrserverport ;//std::to_string()
- auto cargs = grpc::ChannelArguments();
- cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
- cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
- std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
- target_str, grpc::InsecureChannelCredentials(),cargs);
- std::unique_ptr<iv::UploadThread::Stub> stub_ = iv::UploadThread::NewStub(channel);
- iv::PicDownReqThread request;
- int nid = 0;
- // Container for the data we expect from the server.
- iv::PicDownReplyThread reply;
- gpr_timespec timespec;
- timespec.tv_sec = 30;//设置阻塞时间为2秒
- timespec.tv_nsec = 0;
- timespec.clock_type = GPR_TIMESPAN;
- // ClientContext context;
- while(true)
- {
- std::shared_ptr<char> pstr_ptr;
- if((nCamPos<0)||(nCamPos >= NUM_CAM))
- {
- std::cout<<"Cam Pos Error. "<<"Pos: "<<nCamPos<<" TOTAL:"<<NUM_CAM<<std::endl;
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- continue;
- }
- request.set_strclientid(strcclientid);
- request.set_ncampos(nCamPos);
- request.set_strquerymd5(gstrqueryMD5);
- request.set_strvin(gstrVIN);
- ClientContext context ;
- context.set_deadline(timespec);
- qint64 time1 = QDateTime::currentMSecsSinceEpoch();
- //If extend 10 seconds not request picture, pause get picture from server.
- if((time1 - mnLastGetPicTime)>10000)
- {
- std::cout<<"not need updata"<<std::endl;
- std::this_thread::sleep_for(std::chrono::milliseconds(50));
- continue;
- }
- nlastsend = xTime.elapsed();
- // The actual RPC.
- Status status = stub_->querypic(&context, request, &reply);
- if (status.ok()) {
- if(reply.nres() == 1)
- {
- std::cout<<nCamPos<<":pic time is "<<reply.npictime()<<std::endl;
- mnPicUpLatency[nCamPos] = reply.npicuplatency();
- mnFrameRate[nCamPos] = reply.npicframerate();
- mnPicDownLatency[nCamPos] = QDateTime::currentMSecsSinceEpoch() - time1;
- iv::vision::rawpic xrawpic;
- if(xrawpic.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
- {
- mMutexPic[nCamPos].lock();
- mRawPic[nCamPos].CopyFrom(xrawpic);
- mnPicUpdateTime[nCamPos] = QDateTime::currentMSecsSinceEpoch();
- xrawpic.CopyFrom(mRawPic[nCamPos]);
- // iv::modulecomm::ModuleSendMsg(mpaPic[nCamPos],reply.xdata().data(),reply.xdata().size());
- mMutexPic[nCamPos].unlock();
- }
- // iv::cloud::cloudmsg xmsg;
- // if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
- // {
- // sharectrlmsg(&xmsg);
- // }
- }
- else
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(10*NUM_THREAD_PERCAM));
- }
- } else {
- std::cout << status.error_code() << ": " << status.error_message()
- << std::endl;
- std::cout<<"camera dowm"<<nCamPos<<" RPC failed"<<std::endl;
- if(status.error_code() == 4)
- {
- std::cout<<nCamPos<<" RPC Exceed Time, Create New stub_"<<std::endl;
- channel = grpc::CreateCustomChannel(
- target_str, grpc::InsecureChannelCredentials(),cargs);
- stub_ = iv::UploadThread::NewStub(channel);
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(900));
- }
- }
- }
- qint64 grpcpc::GetPicLatency(int nCamPos)
- {
- if((nCamPos < 0)||(nCamPos >= NUM_CAM))return -1;
- return mnPicUpLatency[nCamPos];
- }
- int grpcpc::GetFrameRate(int nCamPos)
- {
- if((nCamPos < 0)||(nCamPos >= NUM_CAM))return -1;
- return mnFrameRate[nCamPos];
- }
- qint64 grpcpc::GetPicDownLatency(int nCamPos)
- {
- if((nCamPos < 0)||(nCamPos >= NUM_CAM))return -1;
- return mnPicDownLatency[nCamPos];
- }
- void grpcpc::setserverip(std::string strip)
- {
- gstrserverip = strip;
- }
- void grpcpc::setserverport(std::string strport)
- {
- gstrserverport = strport;
- }
- void grpcpc::setqueryinterval(std::string strinterval)
- {
- gstruploadinterval = strinterval;
- }
- void grpcpc::setVIN(std::string strVIN)
- {
- gstrVIN = strVIN;
- }
- void grpcpc::setqueryMD5(std::string strmd5)
- {
- gstrqueryMD5 = strmd5;
- }
- void grpcpc::setctrlMD5(std::string strmd5)
- {
- gstrctrlMD5 = strmd5;
- }
- int grpcpc::GetRawPic(unsigned int camindex,iv::vision::rawpic & xrawpic)
- {
- mnLastGetPicTime = QDateTime::currentMSecsSinceEpoch();
- if((camindex >= NUM_CAM))return -1;
- qint64 now = QDateTime::currentMSecsSinceEpoch();
- if((now - mnPicUpdateTime[camindex])>1000)
- {
- return 0;
- }
- mMutexPic[camindex].lock();
- xrawpic.CopyFrom(mRawPic[camindex]);
- mMutexPic[camindex].unlock();
- return 1;
- }
- int grpcpc::GetRecvMsg(iv::cloud::cloudmsg & xmsg)
- {
- mnLastGetmsgTime = QDateTime::currentMSecsSinceEpoch();
- qint64 now = QDateTime::currentMSecsSinceEpoch();
- if((now - mnmsgrecvupdatetime) > 1000)
- {
- return 0;
- }
- mMutexmsgrecv.lock();
- xmsg.CopyFrom(mmsgrecv);
- mMutexmsgrecv.unlock();
- return 1;
- }
- int grpcpc::SetSendMsg(iv::cloud::cloudmsg & xmsg)
- {
- mMutexmsgsend.lock();
- mmsgsend.CopyFrom(xmsg);
- mnmsgsendupdatetime = QDateTime::currentMSecsSinceEpoch();
- mMutexmsgsend.unlock();
- return 0;
- }
|