Prechádzať zdrojové kódy

add driver_cloud_grpc_pc_stream and driver_cloud_grpc_server_stream

yuchuli 4 rokov pred
rodič
commit
1c2e074f50
22 zmenil súbory, kde vykonal 2084 pridanie a 5 odobranie
  1. 115 0
      deploywithfind.sh
  2. 1 1
      src/driver/driver_cloud_grpc_client_stream/prototocpp.txt
  3. 2 2
      src/driver/driver_cloud_grpc_client_stream/uploadstreammsg.proto
  4. 60 0
      src/driver/driver_cloud_grpc_pc_stream/driver_cloud_grpc_pc_stream.pro
  5. 33 0
      src/driver/driver_cloud_grpc_pc_stream/driver_cloud_grpc_pc_stream.yaml
  6. 339 0
      src/driver/driver_cloud_grpc_pc_stream/grpcpc.cpp
  7. 93 0
      src/driver/driver_cloud_grpc_pc_stream/grpcpc.h
  8. 14 0
      src/driver/driver_cloud_grpc_pc_stream/main.cpp
  9. 443 0
      src/driver/driver_cloud_grpc_pc_stream/mainwindow.cpp
  10. 25 0
      src/driver/driver_cloud_grpc_pc_stream/mainwindow.h
  11. 22 0
      src/driver/driver_cloud_grpc_pc_stream/mainwindow.ui
  12. 3 0
      src/driver/driver_cloud_grpc_pc_stream/prototocpp.txt
  13. 73 0
      src/driver/driver_cloud_grpc_pc_stream/uploadstreammsg.proto
  14. 109 0
      src/driver/driver_cloud_grpc_server_stream/cumsgbuffer.cpp
  15. 53 0
      src/driver/driver_cloud_grpc_server_stream/cumsgbuffer.h
  16. 54 0
      src/driver/driver_cloud_grpc_server_stream/driver_cloud_grpc_server_stream.pro
  17. 330 0
      src/driver/driver_cloud_grpc_server_stream/main.cpp
  18. 171 0
      src/driver/driver_cloud_grpc_server_stream/pcmsgbuffer.cpp
  19. 66 0
      src/driver/driver_cloud_grpc_server_stream/pcmsgbuffer.h
  20. 3 0
      src/driver/driver_cloud_grpc_server_stream/prototocpp.txt
  21. 73 0
      src/driver/driver_cloud_grpc_server_stream/uploadstreammsg.proto
  22. 2 2
      src/include/proto3/uploadstreammsg.proto

+ 115 - 0
deploywithfind.sh

@@ -0,0 +1,115 @@
+#! /bin/bash
+
+Qtgccdir=''
+if [ ${#Qtgccdir} -lt 6 ]; then
+  echo "Because not set gcc_64 , so auto find gcc_64 "
+  optfiles=`find /opt -name 'gcc_64'` 
+  for entry in $optfiles
+  do
+     if [ ${entry:0-6:6} == "gcc_64" ];  then
+       if [ -d $entry ];  then
+          Qtgccdir="$entry"
+	  echo -e "\033[32m""  -----found gccdir:"$Qtgccdir"\033[0m"
+       fi
+     fi
+  done
+fi
+
+if [ ${#Qtgccdir} -lt 6 ]; then
+  if [ -d '/usr/lib/aarch64-linux-gnu/qt5' ]; then
+    Qtgccdir='/usr/lib/aarch64-linux-gnu/qt5'
+  else 
+    echo "if NVIDIA,please sudo apt install qt"
+  fi
+fi
+
+if [ ${#Qtgccdir} -lt 6 ]; then
+   echo -e "\033[31m""  -----not found gccdir: so exit""\033[0m"
+   exit 1
+fi
+
+
+ignore_lib_name=(
+#libstdc++.so.*
+libm.so.*
+#libgcc_s.so.*
+#libc.so.*
+libpthread.so.*
+libGL.so.*
+libz.so.*
+libgthread*
+libglib*
+libexpat*
+libxcb*
+libdl.so.*
+libxshmfence*
+libglapi.so.*
+libXext.so.*
+libXdamage.so.*
+libXfixes.so.*
+libX11*
+libXxf86vm.so.*
+libdrm.so.*
+libpcre.so.*
+libXau.so.*
+libXdmcp.so.*
+)
+
+EXE="$1"
+PWD=`pwd`
+rm -rf app
+mkdir app
+cd app
+mkdir lib
+cd ..
+mkdir commonlib
+cd commonlib
+mkdir platforms
+cp $QtPlatformdir/libqxcb.so platforms
+cd platforms
+mkdir lib
+libfiles=`ldd libqxcb.so | awk '{ if(match($3,"^/"))printf("%s "),$3 }'` 
+cp $libfiles $PWD/lib 
+cd ..
+mkdir lib
+cp $QtLibDir/libQt5DBus.* $PWD/lib
+cp $QtLibDir/libQt5XcbQpa.* $PWD/lib
+rm -rf $PWD/platforms/lib
+cd platforms
+
+cd ..
+cd ..
+
+files=`ldd $EXE | awk '{ if(match($3,"^/"))printf("%s "),$3 }'`
+cp $files $PWD/app/lib
+cp $PWD/commonlib/lib/* $PWD/app/lib
+cp -r  $PWD/commonlib/platforms $PWD/app
+cp $EXE $PWD/app
+
+for x in ${ignore_lib_name[@]}
+do
+rm -f $PWD/app/lib/${x}
+done
+
+rm -rf commonlib
+
+cd app
+patchelf --set-rpath '$ORIGIN/lib/' $EXE
+if [ "$?" != 0 ];then
+	echo -e "\e[31m deploy.sh: patchelf $EXE faile, Ensure patchelf tool installed\e[0m"
+	exit 1
+fi
+cd platforms
+patchelf --set-rpath '$ORIGIN/../lib/' libqxcb.so
+if [ "$?" != 0 ];then
+	echo -e "\e[31m deploy.sh: patchelf $EXE faile, Ensure patchelf tool installed\e[0m"
+#	exit 1
+fi
+cd ..
+cd ..
+
+cp -r app $PWD/deploy/
+
+rm -rf app
+
+

+ 1 - 1
src/driver/driver_cloud_grpc_client_stream/prototocpp.txt

@@ -1,3 +1,3 @@
-protoc -I . --plugin=protoc-gen-grpc=/home/yuchuli/git/grpc-framework/build2/grpc_cpp_plugin --grpc_out=. uploadmsg.proto
+protoc -I . --plugin=protoc-gen-grpc=/home/yuchuli/git/grpc-framework/build2/grpc_cpp_plugin --grpc_out=. uploadstreammsg.proto
 
 protoc -I . --cpp_out=. uploadmsg.proto 

+ 2 - 2
src/driver/driver_cloud_grpc_client_stream/uploadstreammsg.proto

@@ -57,7 +57,7 @@ message queryReqStream {
   string strqueryMD5 = 2;
   int64 ntime = 3;  
   string strctrlMD5 = 4;
-  bytes data = 5;
+  bytes xdata = 5;
   bool bimportant = 6;  //if 1, is important.
   int32 kepptime = 7;   //If important keep this data before ctrl ms.  if -1 must send.
   int32 ntype = 8;  //0 only query  1 ctrl.
@@ -67,7 +67,7 @@ message queryReplyStream {
     int32 nres = 1;  //0 not online  1 online  -1 querMD5 error  -2 ctrlMD5 error 
     int32 id = 2;
     int64 ntime = 3;
-    bytes data = 4;
+    bytes xdata = 4;
 }
 
 

+ 60 - 0
src/driver/driver_cloud_grpc_pc_stream/driver_cloud_grpc_pc_stream.pro

@@ -0,0 +1,60 @@
+QT       += core gui
+
+greaterThan(QT_MAJOR_VERSION, 4): QT += widgets
+
+CONFIG += c++11
+
+# The following define makes your compiler emit warnings if you use
+# any Qt feature that has been marked deprecated (the exact warnings
+# depend on your compiler). Please consult the documentation of the
+# deprecated API in order to know how to port your code away from it.
+DEFINES += QT_DEPRECATED_WARNINGS
+
+# You can also make your code fail to compile if it uses deprecated APIs.
+# In order to do so, uncomment the following line.
+# You can also select to disable deprecated APIs only up to a certain version of Qt.
+#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000    # disables all the APIs deprecated before Qt 6.0.0
+
+SOURCES += \
+    ../../include/msgtype/cloud.pb.cc \
+    ../../include/msgtype/uploadstreammsg.pb.cc \
+    main.cpp \
+    mainwindow.cpp \
+    grpcpc.cpp \
+    uploadstreammsg.grpc.pb.cc
+
+HEADERS += \
+    ../../include/msgtype/cloud.pb.h \
+    ../../include/msgtype/uploadstreammsg.pb.h \
+    mainwindow.h \
+    grpcpc.h
+
+FORMS += \
+    mainwindow.ui
+
+# Default rules for deployment.
+qnx: target.path = /tmp/$${TARGET}/bin
+else: unix:!android: target.path = /opt/$${TARGET}/bin
+!isEmpty(target.path): INSTALLS += target
+
+
+!include(../../../include/common.pri ) {
+    error( "Couldn't find the common.pri file!" )
+}
+
+!include(../../../include/ivprotobuf.pri ) {
+    error( "Couldn't find the ivprotobuf.pri file!" )
+}
+
+!include(../../../include/ivboost.pri ) {
+    error( "Couldn't find the ivboost.pri file!" )
+}
+
+!include(../../../include/ivgrpc.pri ) {
+    error( "Couldn't find the ivgrpc.pri file!" )
+}
+
+!include(../../../include/ivyaml-cpp.pri ) {
+    error( "Couldn't find the ivyaml-cpp.pri file!" )
+}
+

+ 33 - 0
src/driver/driver_cloud_grpc_pc_stream/driver_cloud_grpc_pc_stream.yaml

@@ -0,0 +1,33 @@
+server : 192.168.50.62
+port : 9000
+
+VIN : AAAAAAAAAAAAAAAAA
+queryMD5 : 5d41402abc4b2a76b9719d911017c592
+ctrlMD5  : 5d41402abc4b2a76b9719d911017c592
+
+querymessage:
+  usbpic:
+    msgname: usbpic
+    buffersize: 10000000
+    buffercount: 1
+  canrecv0:
+    msgname: canrecv0
+    buffersize: 10000
+    buffercount: 3
+  tracemap:
+    msgname: tracemap
+    buffersize: 10000000
+    buffercount: 1
+
+ctrlmessage:
+  xodrsrc:
+    msgname: xodrsrc
+    buffersize: 1000
+    buffercount: 1
+  tracemap:
+    msgname: xodrreq
+    buffersize: 1000
+    buffercount: 1
+
+
+

+ 339 - 0
src/driver/driver_cloud_grpc_pc_stream/grpcpc.cpp

@@ -0,0 +1,339 @@
+#include "grpcpc.h"
+
+
+static grpcpc * ggrpcpc;
+
+void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
+{
+    ggrpcpc->UpdateData(strdata,nSize,strmemname);
+}
+
+
+grpcpc::grpcpc(std::string stryamlpath)
+{
+    ggrpcpc = this;
+    dec_yaml(stryamlpath.data());
+    int i;
+    for(i=0;i<mvectormsgunit.size();i++)
+    {
+        mvectormsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectormsgunit[i].mstrmsgname,mvectormsgunit[i].mnBufferSize,mvectormsgunit[i].mnBufferCount);
+    }
+
+    for(i=0;i<mvectorctrlmsgunit.size();i++)
+    {
+        mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectorctrlmsgunit[i].mstrmsgname,ListenData);
+    }
+}
+
+
+void grpcpc::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::queryReqStream, iv::queryReplyStream> > writer, bool *pbrun)
+{
+    int nctrlsize = mvectorctrlmsgunit.size();
+    int i;
+
+    int ninterval = atoi(gstruploadinterval.data());
+    if(ninterval<=0)ninterval = 100;
+
+    QTime xTime;
+    xTime.start();
+    int nlastsend = xTime.elapsed();
+
+    int nid= 0;
+
+
+
+    while(*pbrun)
+    {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        if((xTime.elapsed()-nlastsend)<ninterval)
+        {
+            continue;
+        }
+
+        iv::queryReqStream request;
+
+        bool bImportant = false;
+        int nkeeptime = 0;
+
+            iv::cloud::cloudmsg xmsg;
+            xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
+            nlastsend = xTime.elapsed();
+            gMutexMsg.lock();
+
+  //          std::vector<iv::msgunit> xv = mvectorctrlmsgunit;
+            for(i=0;i<nctrlsize;i++)
+            {
+                if(mvectorctrlmsgunit[i].mbRefresh)
+                {
+                    mvectorctrlmsgunit[i].mbRefresh = false;
+
+                    if(mvectorctrlmsgunit[i].mbImportant)
+                    {
+                        bImportant = true;
+                    }
+                    if(mvectorctrlmsgunit[i].mnkeeptime > nkeeptime)
+                    {
+                        nkeeptime = mvectorctrlmsgunit[i].mnkeeptime;
+                    }
+                    iv::cloud::cloudunit xcloudunit;
+                    xcloudunit.set_msgname(mvectorctrlmsgunit[i].mstrmsgname);
+                    xcloudunit.set_data(mvectorctrlmsgunit[i].mpstrmsgdata.get(),mvectorctrlmsgunit[i].mndatasize);
+                    iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
+                    pcu->CopyFrom(xcloudunit);
+                }
+
+            }
+            gMutexMsg.unlock();
+
+            request.set_strquerymd5(gstrqueryMD5);
+            request.set_strvin(gstrVIN);
+//            request.set_ntime(QDateTime::currentMSecsSinceEpoch());
+            request.set_ntype(0);
+
+            if(xmsg.xclouddata_size()>0)
+            {
+                int nbytesize = xmsg.ByteSize();
+                std::vector<char> pvectordata;
+                pvectordata.resize(nbytesize);
+                if(xmsg.SerializeToArray(pvectordata.data(),nbytesize))
+                {
+
+//                    request.set_id(nctrlid);nctrlid++;
+                    request.set_strctrlmd5(gstrctrlMD5);
+
+                    request.set_xdata(pvectordata.data(),pvectordata.size());
+                    request.set_bimportant(bImportant);
+                    request.set_kepptime(nkeeptime);
+                    request.set_ntype(1);
+                }
+            }
+            nlastsend = xTime.elapsed();
+
+            bool bsend = writer->Write(request);
+            std::cout<<"send msg. rtn is "<<bsend<<std::endl;
+
+    }
+}
+
+void grpcpc::run()
+{
+    int nsize = mvectormsgunit.size();
+    int nctrlsize = mvectorctrlmsgunit.size();
+    int i;
+
+    qint64 nlasttime = 0;
+
+    int ninterval = atoi(gstruploadinterval.data());
+    if(ninterval<=0)ninterval = 100;
+
+    QTime xTime;
+    xTime.start();
+    int nlastsend = xTime.elapsed();
+
+    std::string target_str = gstrserverip+":";
+    target_str = target_str + gstrserverport ;//std::to_string()
+    auto cargs = grpc::ChannelArguments();
+    cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
+    cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
+
+    std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
+             target_str, grpc::InsecureChannelCredentials(),cargs);
+
+    std::unique_ptr<iv::UploadStream::Stub> stub_ = iv::UploadStream::NewStub(channel);
+
+
+    int nfail = 0;
+
+    while(!QThread::isInterruptionRequested())
+    {
+        ClientContext context ;
+
+        std::shared_ptr<::grpc::ClientReaderWriter<iv::queryReqStream, iv::queryReplyStream> > writerRead(stub_->queryctrl(&context));
+
+        bool bRun = true;
+        std::thread * pthread = new std::thread(&grpcpc::threadsend,this,writerRead,&bRun);
+        (void )pthread;
+        iv::queryReplyStream reply;
+        while (writerRead->Read(&reply)) {
+            nfail = 0;
+    //        std::cout << "接收到回复:" << reply.remsg()<<"--\n" << std::endl;
+            if(reply.nres() == 1)
+            {
+                iv::cloud::cloudmsg xmsg;
+                if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
+                {
+                    sharequerymsg(&xmsg);
+                }
+            }
+            std::cout<<"read data from server."<<std::endl;
+        }
+        bRun = false;
+        pthread->join();
+        nfail++;
+        if(nfail > 100)std::this_thread::sleep_for(std::chrono::milliseconds(3000));
+        else std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        std::cout<<"reconnnect to server. nfail is "<<nfail<<std::endl;
+    }
+
+}
+
+
+void grpcpc::dec_yaml(const char *stryamlpath)
+{
+    YAML::Node config;
+    try
+    {
+        config = YAML::LoadFile(stryamlpath);
+    }
+    catch(YAML::BadFile e)
+    {
+        qDebug("load error.");
+        return;
+    }
+
+    std::vector<std::string> vecmodulename;
+
+
+    if(config["server"])
+    {
+        gstrserverip = config["server"].as<std::string>();
+    }
+    if(config["port"])
+    {
+        gstrserverport = config["port"].as<std::string>();
+    }
+    if(config["uploadinterval"])
+    {
+        gstruploadinterval = config["uploadinterval"].as<std::string>();
+    }
+
+
+    if(config["VIN"])
+    {
+        gstrVIN = config["VIN"].as<std::string>();
+    }
+
+    if(config["queryMD5"])
+    {
+        gstrqueryMD5 = config["queryMD5"].as<std::string>();
+    }
+
+    if(config["ctrlMD5"])
+    {
+        gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
+    }
+
+
+    std::string strmsgname;
+
+    if(config["querymessage"])
+    {
+
+        for(YAML::const_iterator it= config["querymessage"].begin(); it != config["querymessage"].end();++it)
+        {
+            std::string strtitle = it->first.as<std::string>();
+            std::cout<<strtitle<<std::endl;
+
+            if(config["querymessage"][strtitle]["msgname"]&&config["querymessage"][strtitle]["buffersize"]&&config["querymessage"][strtitle]["buffercount"])
+            {
+                iv::msgunit xmu;
+                strmsgname = config["querymessage"][strtitle]["msgname"].as<std::string>();
+                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+                xmu.mnBufferSize = config["querymessage"][strtitle]["buffersize"].as<int>();
+                xmu.mnBufferCount = config["querymessage"][strtitle]["buffercount"].as<int>();
+                mvectormsgunit.push_back(xmu);
+
+
+            }
+        }
+    }
+    else
+    {
+
+    }
+
+    if(config["ctrlmessage"])
+    {
+        std::string strnodename = "ctrlmessage";
+        for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
+        {
+            std::string strtitle = it->first.as<std::string>();
+            std::cout<<strtitle<<std::endl;
+
+            if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
+            {
+                iv::msgunit xmu;
+                strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
+                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+                xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
+                xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
+
+                if(config[strnodename][strtitle]["bimportant"])
+                {
+                   std::string strimportant =    config[strnodename][strtitle]["bimportant"].as<std::string>();
+                   if(strimportant == "true")
+                   {
+                       xmu.mbImportant = true;
+                   }
+                }
+                if(config[strnodename][strtitle]["keeptime"])
+                {
+                   std::string strkeep =    config[strnodename][strtitle]["keeptime"].as<std::string>();
+                   xmu.mnkeeptime = atoi(strkeep.data());
+                }
+                mvectorctrlmsgunit.push_back(xmu);
+            }
+        }
+    }
+    else
+    {
+
+    }
+
+    return;
+}
+
+void grpcpc::sharequerymsg(iv::cloud::cloudmsg *pxmsg)
+{
+    int i;
+    int nsize = pxmsg->xclouddata_size();
+    for(i=0;i<nsize;i++)
+    {
+        int j;
+        int nquerysize = mvectormsgunit.size();
+        for(j=0;j<nquerysize;j++)
+        {
+            if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectormsgunit[j].mstrmsgname,255) == 0)
+            {
+                qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
+                iv::modulecomm::ModuleSendMsg(mvectormsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
+                break;
+            }
+        }
+    }
+}
+
+void grpcpc::UpdateData(const char *strdata, const unsigned int nSize,const char * strmemname)
+{
+    int nsize = mvectorctrlmsgunit.size();
+    int i;
+    for(i=0;i<nsize;i++)
+    {
+        if(strncmp(strmemname,mvectorctrlmsgunit[i].mstrmsgname,255) == 0)
+        {
+            gMutexMsg.lock();
+            char * strtem = new char[nSize];
+            memcpy(strtem,strdata,nSize);
+            mvectorctrlmsgunit[i].mpstrmsgdata.reset(strtem);
+            mvectorctrlmsgunit[i].mndatasize = nSize;
+            mvectorctrlmsgunit[i].mbRefresh = true;
+            gMutexMsg.unlock();
+            break;
+        }
+    }
+}
+
+std::string grpcpc::GetVIN()
+{
+    return gstrVIN;
+}

+ 93 - 0
src/driver/driver_cloud_grpc_pc_stream/grpcpc.h

@@ -0,0 +1,93 @@
+#ifndef GRPCPC_H
+#define GRPCPC_H
+
+#include <QThread>
+
+#include <yaml-cpp/yaml.h>
+
+#include <QDateTime>
+
+#include <iostream>
+
+#include <vector>
+
+#include <memory>
+
+#include <QMutex>
+
+#include <thread>
+
+#include "modulecomm.h"
+
+
+#include "cloud.pb.h"
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include <grpcpp/grpcpp.h>
+
+#include "uploadstreammsg.grpc.pb.h"
+
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::Status;
+
+namespace iv {
+struct msgunit
+{
+    char mstrmsgname[256];
+    int mnBufferSize = 10000;
+    int mnBufferCount = 1;
+    void * mpa;
+    std::shared_ptr<char> mpstrmsgdata;
+    int mndatasize = 0;
+    bool mbRefresh = false;
+    bool mbImportant = false;
+    int mnkeeptime = 100;
+};
+}
+
+class grpcpc : public QThread
+{
+public:
+    grpcpc(std::string stryamlpath);
+
+private:
+    void run();
+
+private:
+    std::string gstrserverip =  "127.0.0.1";
+    std::string gstrserverport = "50051";
+    std::string gstruploadinterval = "100";
+    void * gpa;
+    QMutex gMutexMsg;
+    std::vector<iv::msgunit> mvectormsgunit;
+
+    std::vector<iv::msgunit> mvectorctrlmsgunit;
+
+
+    std::string gstrVIN = "AAAAAAAAAAAAAAAAA";
+    std::string gstrqueryMD5 = "5d41402abc4b2a76b9719d911017c592";
+    std::string gstrctrlMD5 = "5d41402abc4b2a76b9719d911017c592";
+
+
+
+
+    int gindex = 0;
+
+
+private:
+    void dec_yaml(const char * stryamlpath);
+    void sharequerymsg(iv::cloud::cloudmsg * pxmsg);
+
+public:
+    void UpdateData(const char * strdata,const unsigned int nSize,const char * strmemname);
+    std::string GetVIN();
+    void threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::queryReqStream, iv::queryReplyStream> > writer,bool * pbrun);
+
+
+};
+
+#endif // GRPCPC_H

+ 14 - 0
src/driver/driver_cloud_grpc_pc_stream/main.cpp

@@ -0,0 +1,14 @@
+#include "mainwindow.h"
+
+#include <QApplication>
+
+#include "ivversion.h"
+
+int main(int argc, char *argv[])
+{
+    showversion("driver_cloud_grpc_pc");
+    QApplication a(argc, argv);
+    MainWindow w;
+    w.show();
+    return a.exec();
+}

+ 443 - 0
src/driver/driver_cloud_grpc_pc_stream/mainwindow.cpp

@@ -0,0 +1,443 @@
+#include "mainwindow.h"
+#include "ui_mainwindow.h"
+/*
+#include <yaml-cpp/yaml.h>
+
+#include <QDateTime>
+
+#include <iostream>
+
+#include <vector>
+
+#include <memory>
+
+#include <QMutex>
+
+#include <thread>
+
+#include "modulecomm.h"
+
+
+#include "cloud.pb.h"
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include <grpcpp/grpcpp.h>
+
+#include "uploadmsg.grpc.pb.h"
+
+
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::Status;
+
+std::string gstrserverip =  "140.143.237.38";
+std::string gstrserverport = "9000";
+std::string gstruploadinterval = "100";
+void * gpa;
+QMutex gMutexMsg;
+std::thread * guploadthread;
+
+
+
+
+namespace iv {
+struct msgunit
+{
+    char mstrmsgname[256];
+    int mnBufferSize = 10000;
+    int mnBufferCount = 1;
+    void * mpa;
+    std::shared_ptr<char> mpstrmsgdata;
+    int mndatasize = 0;
+    bool mbRefresh = false;
+    bool mbImportant = false;
+    int mnkeeptime = 100;
+};
+}
+
+
+
+std::vector<iv::msgunit> mvectormsgunit;
+
+std::vector<iv::msgunit> mvectorctrlmsgunit;
+
+
+std::string gstrVIN = "AAAAAAAAAAAAAAAAA";
+std::string gstrqueryMD5 = "5d41402abc4b2a76b9719d911017c592";
+std::string gstrctrlMD5 = "5d41402abc4b2a76b9719d911017c592";
+
+
+
+
+int gindex = 0;
+
+void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
+{
+
+    int nsize = mvectorctrlmsgunit.size();
+    int i;
+    for(i=0;i<nsize;i++)
+    {
+        if(strncmp(strmemname,mvectorctrlmsgunit[i].mstrmsgname,255) == 0)
+        {
+            gMutexMsg.lock();
+            char * strtem = new char[nSize];
+            memcpy(strtem,strdata,nSize);
+            mvectorctrlmsgunit[i].mpstrmsgdata.reset(strtem);
+            mvectorctrlmsgunit[i].mndatasize = nSize;
+            mvectorctrlmsgunit[i].mbRefresh = true;
+            gMutexMsg.unlock();
+            break;
+        }
+    }
+}
+
+void sharequerymsg(iv::cloud::cloudmsg * pxmsg)
+{
+    int i;
+    int nsize = pxmsg->xclouddata_size();
+    for(i=0;i<nsize;i++)
+    {
+        int j;
+        int nquerysize = mvectormsgunit.size();
+        for(j=0;j<nquerysize;j++)
+        {
+            if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectormsgunit[j].mstrmsgname,255) == 0)
+            {
+                qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
+                iv::modulecomm::ModuleSendMsg(mvectormsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
+                break;
+            }
+        }
+    }
+}
+
+void threadquery()
+{
+
+    int nsize = mvectormsgunit.size();
+    int nctrlsize = mvectorctrlmsgunit.size();
+    int i;
+
+    qint64 nlasttime = 0;
+
+    int ninterval = atoi(gstruploadinterval.data());
+    if(ninterval<=0)ninterval = 100;
+
+    QTime xTime;
+    xTime.start();
+    int nlastsend = xTime.elapsed();
+
+    std::string target_str = gstrserverip+":";
+    target_str = target_str + gstrserverport ;//std::to_string()
+    auto cargs = grpc::ChannelArguments();
+    cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
+    cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
+
+    std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
+             target_str, grpc::InsecureChannelCredentials(),cargs);
+
+    std::unique_ptr<iv::Upload::Stub> stub_ = iv::Upload::NewStub(channel);
+
+
+    iv::queryreq request;
+    iv::ctrlreq ctrreq;
+    iv::ctrlReply ctrreply;
+
+    int nid = 0;
+    int nctrlid = 0;
+
+    // Container for the data we expect from the server.
+    iv::queryReply reply;
+
+
+    while(true)
+    {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        if(abs(xTime.elapsed()-nlastsend)<ninterval)
+        {
+            continue;
+        }
+
+        bool bImportant = false;
+        int nkeeptime = 0;
+
+            iv::cloud::cloudmsg xmsg;
+            xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
+            nlastsend = xTime.elapsed();
+            gMutexMsg.lock();
+
+  //          std::vector<iv::msgunit> xv = mvectorctrlmsgunit;
+            for(i=0;i<nctrlsize;i++)
+            {
+                if(mvectorctrlmsgunit[i].mbRefresh)
+                {
+                    mvectorctrlmsgunit[i].mbRefresh = false;
+
+                    if(mvectorctrlmsgunit[i].mbImportant)
+                    {
+                        bImportant = true;
+                    }
+                    if(mvectorctrlmsgunit[i].mnkeeptime > nkeeptime)
+                    {
+                        nkeeptime = mvectorctrlmsgunit[i].mnkeeptime;
+                    }
+                    iv::cloud::cloudunit xcloudunit;
+                    xcloudunit.set_msgname(mvectorctrlmsgunit[i].mstrmsgname);
+                    xcloudunit.set_data(mvectorctrlmsgunit[i].mpstrmsgdata.get(),mvectorctrlmsgunit[i].mndatasize);
+                    iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
+                    pcu->CopyFrom(xcloudunit);
+                }
+
+            }
+            gMutexMsg.unlock();
+
+
+            {
+
+                ClientContext context ;
+ //               qint64 time1 = QDateTime::currentMSecsSinceEpoch();
+
+                request.set_strquerymd5(gstrqueryMD5);
+                request.set_strvin(gstrVIN);
+
+                request.set_nlasttime(nlasttime);
+                QDateTime xTime;
+                xTime.fromMSecsSinceEpoch(1607905685318);  //1607914763641
+//                qDebug("time:%s",xTime.toString("yyyy-MM-dd:hh:mm:ss:zzz").toLatin1().data());
+                qDebug("nlasttime is %ld",nlasttime);//1607905685318
+                nid++;
+                // The actual RPC.
+                Status status = stub_->query(&context, request, &reply);
+                if (status.ok()) {
+                    std::cout<<nid<<" query successfully, res is "<<reply.nres()<<std::endl;
+                    if(reply.nres() == 1)
+                    {
+
+                        if(nlasttime != xmsg.xtime())
+                        {
+                            iv::cloud::cloudmsg xmsg;
+                            if(xmsg.ParseFromArray(reply.data().data(),reply.data().size()))
+                            {
+                                sharequerymsg(&xmsg);
+                            }
+                        }
+
+                        nlasttime = reply.ntime();
+                    }
+                } else {
+                  std::cout << status.error_code() << ": " << status.error_message()
+                            << std::endl;
+                  std::cout<<"RPC failed"<<std::endl;
+                  std::this_thread::sleep_for(std::chrono::milliseconds(900));
+
+                }
+
+            }
+
+
+            if(xmsg.xclouddata_size()>0)
+            {
+                int nbytesize = xmsg.ByteSize();
+                std::vector<char> pvectordata;
+                pvectordata.resize(nbytesize);
+                if(xmsg.SerializeToArray(pvectordata.data(),nbytesize))
+                {
+
+                    ClientContext context ;
+
+                    qint64 time1 = QDateTime::currentMSecsSinceEpoch();
+
+                    ctrreq.set_id(nctrlid);nctrlid++;
+                    ctrreq.set_strctrlmd5(gstrctrlMD5);
+                    ctrreq.set_strvin(gstrVIN);
+                    ctrreq.set_ntime(time1);
+                    ctrreq.set_data(pvectordata.data(),pvectordata.size());
+                    ctrreq.set_bimportant(bImportant);
+                    ctrreq.set_kepptime(nkeeptime);
+
+                    Status status = stub_->ctrl(&context, ctrreq, &ctrreply);
+                    if (status.ok()) {
+
+                        std::cout<<"send id "<<ctrreply.nsendid()<<std::endl;
+                    } else {
+                      std::cout << status.error_code() << ": " << status.error_message()
+                                << std::endl;
+                      std::cout<<"RPC failed"<<std::endl;
+                      std::this_thread::sleep_for(std::chrono::milliseconds(900));
+
+                    }
+
+                }
+                pvectordata.clear();
+            }
+
+
+    }
+
+
+
+}
+
+
+void dec_yaml(const char * stryamlpath)
+{
+
+    YAML::Node config;
+    try
+    {
+        config = YAML::LoadFile(stryamlpath);
+    }
+    catch(YAML::BadFile e)
+    {
+        qDebug("load error.");
+        return;
+    }
+
+    std::vector<std::string> vecmodulename;
+
+
+    if(config["server"])
+    {
+        gstrserverip = config["server"].as<std::string>();
+    }
+    if(config["port"])
+    {
+        gstrserverport = config["port"].as<std::string>();
+    }
+    if(config["uploadinterval"])
+    {
+        gstruploadinterval = config["uploadinterval"].as<std::string>();
+    }
+
+    if(config["VIN"])
+    {
+        gstrVIN = config["VIN"].as<std::string>();
+    }
+
+    if(config["VIN"])
+    {
+        gstrVIN = config["VIN"].as<std::string>();
+    }
+
+    if(config["queryMD5"])
+    {
+        gstrqueryMD5 = config["queryMD5"].as<std::string>();
+    }
+
+    if(config["ctrlMD5"])
+    {
+        gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
+    }
+
+
+    std::string strmsgname;
+
+    if(config["querymessage"])
+    {
+
+        for(YAML::const_iterator it= config["querymessage"].begin(); it != config["querymessage"].end();++it)
+        {
+            std::string strtitle = it->first.as<std::string>();
+            std::cout<<strtitle<<std::endl;
+
+            if(config["querymessage"][strtitle]["msgname"]&&config["querymessage"][strtitle]["buffersize"]&&config["querymessage"][strtitle]["buffercount"])
+            {
+                iv::msgunit xmu;
+                strmsgname = config["querymessage"][strtitle]["msgname"].as<std::string>();
+                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+                xmu.mnBufferSize = config["querymessage"][strtitle]["buffersize"].as<int>();
+                xmu.mnBufferCount = config["querymessage"][strtitle]["buffercount"].as<int>();
+                mvectormsgunit.push_back(xmu);
+
+
+            }
+        }
+    }
+    else
+    {
+
+    }
+
+    if(config["ctrlmessage"])
+    {
+        std::string strnodename = "ctrlmessage";
+        for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
+        {
+            std::string strtitle = it->first.as<std::string>();
+            std::cout<<strtitle<<std::endl;
+
+            if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
+            {
+                iv::msgunit xmu;
+                strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
+                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+                xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
+                xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
+
+                if(config[strnodename][strtitle]["bimportant"])
+                {
+                   std::string strimportant =    config[strnodename][strtitle]["bimportant"].as<std::string>();
+                   if(strimportant == "true")
+                   {
+                       xmu.mbImportant = true;
+                   }
+                }
+                if(config[strnodename][strtitle]["keeptime"])
+                {
+                   std::string strkeep =    config[strnodename][strtitle]["keeptime"].as<std::string>();
+                   xmu.mnkeeptime = atoi(strkeep.data());
+                }
+                mvectorctrlmsgunit.push_back(xmu);
+            }
+        }
+    }
+    else
+    {
+
+    }
+
+    return;
+
+}
+
+*/
+
+
+
+MainWindow::MainWindow(QWidget *parent)
+    : QMainWindow(parent)
+    , ui(new Ui::MainWindow)
+{
+    ui->setupUi(this);
+
+    std::string stryamlpath = "./driver_cloud_grpc_pc_steam.yaml";
+//    dec_yaml(stryamlpath.data());
+
+//    int i;
+//    for(i=0;i<mvectormsgunit.size();i++)
+//    {
+//        mvectormsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectormsgunit[i].mstrmsgname,mvectormsgunit[i].mnBufferSize,mvectormsgunit[i].mnBufferCount);
+//    }
+
+//    for(i=0;i<mvectorctrlmsgunit.size();i++)
+//    {
+//        mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectorctrlmsgunit[i].mstrmsgname,ListenData);
+//    }
+
+//    guploadthread = new std::thread(threadquery);
+
+    mgrpcpc = new grpcpc(stryamlpath);
+    mgrpcpc->start();
+}
+
+MainWindow::~MainWindow()
+{
+    mgrpcpc->requestInterruption();
+    while(mgrpcpc->isFinished());
+    delete ui;
+}
+

+ 25 - 0
src/driver/driver_cloud_grpc_pc_stream/mainwindow.h

@@ -0,0 +1,25 @@
+#ifndef MAINWINDOW_H
+#define MAINWINDOW_H
+
+#include <QMainWindow>
+
+#include "grpcpc.h"
+
+QT_BEGIN_NAMESPACE
+namespace Ui { class MainWindow; }
+QT_END_NAMESPACE
+
+class MainWindow : public QMainWindow
+{
+    Q_OBJECT
+
+public:
+    MainWindow(QWidget *parent = nullptr);
+    ~MainWindow();
+
+private:
+    Ui::MainWindow *ui;
+
+    grpcpc * mgrpcpc;
+};
+#endif // MAINWINDOW_H

+ 22 - 0
src/driver/driver_cloud_grpc_pc_stream/mainwindow.ui

@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<ui version="4.0">
+ <class>MainWindow</class>
+ <widget class="QMainWindow" name="MainWindow">
+  <property name="geometry">
+   <rect>
+    <x>0</x>
+    <y>0</y>
+    <width>800</width>
+    <height>600</height>
+   </rect>
+  </property>
+  <property name="windowTitle">
+   <string>MainWindow</string>
+  </property>
+  <widget class="QWidget" name="centralwidget"/>
+  <widget class="QMenuBar" name="menubar"/>
+  <widget class="QStatusBar" name="statusbar"/>
+ </widget>
+ <resources/>
+ <connections/>
+</ui>

+ 3 - 0
src/driver/driver_cloud_grpc_pc_stream/prototocpp.txt

@@ -0,0 +1,3 @@
+protoc -I . --plugin=protoc-gen-grpc=/home/yuchuli/git/grpc-framework/build2/grpc_cpp_plugin --grpc_out=. uploadstreammsg.proto
+
+protoc -I . --cpp_out=. uploadmsg.proto 

+ 73 - 0
src/driver/driver_cloud_grpc_pc_stream/uploadstreammsg.proto

@@ -0,0 +1,73 @@
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "io.grpc.adc.uploadmsg";
+option java_outer_classname = "UploadMsgProto";
+option objc_class_prefix = "HLW";
+
+package iv;
+
+// The Upload service definition.
+service UploadStream {
+  // Sends a Upload
+  rpc upload (stream UploadRequestStream) returns (stream UploadReplyStream) {}
+  rpc queryctrl (stream queryReqStream) returns (stream queryReplyStream) {}
+  
+}
+
+// The request message containing the user's name.
+message UploadRequestStream {
+ // string name = 1;
+
+ //   int nres; //0 no message 1 have message
+    int32 id = 1;
+    int64 ntime = 2;
+    string strVIN = 3;
+    string strqueryMD5 = 4;
+    string strctrlMD5 = 5;
+    bytes xdata = 6;
+    bool bimportant = 7;  //if 1, is important.
+    int32 kepptime = 8;   //If important keep this data before query ms.
+}
+
+// The response message containing the greetings
+message UploadReplyStream {
+  int32 nres = 1;  //0 no message 1 have ctrl message
+  bytes xdata = 2;
+//  string message = 1;
+}
+
+
+message queryReqStream {
+  string strvin = 1;
+  string strqueryMD5 = 2;
+  int64 ntime = 3;  
+  string strctrlMD5 = 4;
+  bytes xdata = 5;
+  bool bimportant = 6;  //if 1, is important.
+  int32 kepptime = 7;   //If important keep this data before ctrl ms.  if -1 must send.
+  int32 ntype = 8;  //0 only query  1 ctrl.
+}
+
+message queryReplyStream {
+    int32 nres = 1;  //0 not online  1 online  -1 querMD5 error  -2 ctrlMD5 error 
+    int32 id = 2;
+    int64 ntime = 3;
+    bytes xdata = 4;
+}
+
+

+ 109 - 0
src/driver/driver_cloud_grpc_server_stream/cumsgbuffer.cpp

@@ -0,0 +1,109 @@
+#include "cumsgbuffer.h"
+
+cumsgbuffer::cumsgbuffer()
+{
+
+}
+
+void cumsgbuffer::addmsg(int id, qint64 ntime, std::string strVIN, std::string strqueryMD5, std::string strctrlMD5, std::vector<char> *pxdata,bool bImportant,int nkeeptime)
+{
+    qDebug("ntime is %ld",ntime);
+    mMutex.lock();
+    iv::cumsg * pmsg = 0;
+    int nsize = mvectormsg.size();
+    int i;
+    for(i=0;i<nsize;i++)
+    {
+        if(strncmp(mvectormsg[i].strVIN.data(),strVIN.data(),255) == 0)
+        {
+            pmsg = &mvectormsg[i];
+            break;
+        }
+    }
+    if(pmsg == 0)
+    {
+        iv::cumsg cmsg;
+        cmsg.id = id;
+        cmsg.ntime = ntime;
+        cmsg.strVIN = strVIN;
+        cmsg.strqueryMD5 = strqueryMD5;
+        cmsg.strctrlMD5 = strctrlMD5;
+        if(pxdata->size() > 0)
+        {
+            cmsg.xdata.resize(pxdata->size());
+            memcpy(cmsg.xdata.data(),pxdata->data(),pxdata->size());
+        }
+        cmsg.mlastuptime = QDateTime::currentMSecsSinceEpoch();
+        cmsg.mbImportant = bImportant;
+        cmsg.mkeeptime = nkeeptime;
+        cmsg.mbhavequery = false;
+        mvectormsg.push_back(cmsg);
+        mMutex.unlock();
+        return;
+    }
+
+    if((pmsg->mbImportant != true)||(pmsg->mbhavequery)||((QDateTime::currentMSecsSinceEpoch() - pmsg->mlastuptime)>=pmsg->mkeeptime))
+    {
+        pmsg->id = id;
+        pmsg->ntime = ntime;
+        pmsg->mbImportant = bImportant;
+        pmsg->mbhavequery = false;
+        pmsg->mlastuptime = QDateTime::currentMSecsSinceEpoch();
+        pmsg->mkeeptime = nkeeptime;
+        pmsg->strqueryMD5 = strqueryMD5;
+        pmsg->strctrlMD5 = strctrlMD5;
+        pmsg->xdata.clear();
+        if(pxdata->size()>0)
+        {
+            pmsg->xdata.resize(pxdata->size());
+            memcpy(pmsg->xdata.data(),pxdata->data(),pxdata->size());
+        }
+    }
+
+    mMutex.unlock();
+}
+
+
+int cumsgbuffer::getmsg(std::string strVIN,std::string strqueryMD5, qint64 nlasttime, int &id, qint64 &ntime,   std::vector<char> *pxdata)
+{
+    mMutex.lock();
+    iv::cumsg * pmsg = 0;
+    int nsize = mvectormsg.size();
+    int i;
+    for(i=0;i<nsize;i++)
+    {
+        if(strncmp(mvectormsg[i].strVIN.data(),strVIN.data(),255) == 0)
+        {
+            pmsg = &mvectormsg[i];
+            break;
+        }
+    }
+
+    if(pmsg == 0)
+    {
+        std::cout<<" no this vin data"<<std::endl;;
+        mMutex.unlock();
+        return -1;
+    }
+
+    if(strqueryMD5 != pmsg->strqueryMD5)
+    {
+        std::cout<<" query error."<<std::endl;
+        mMutex.unlock();
+        return -2;
+    }
+    pmsg->mbhavequery = true;
+    if(nlasttime == pmsg->ntime)
+    {
+        mMutex.unlock();
+        return 0;
+    }
+    id = pmsg->id;
+    ntime = pmsg->ntime;
+    pxdata->clear();
+    int ndatasize = pmsg->xdata.size();
+    pxdata->resize(ndatasize);
+    memcpy(pxdata->data(),pmsg->xdata.data(),ndatasize);
+    mMutex.unlock();
+    return 1;
+}

+ 53 - 0
src/driver/driver_cloud_grpc_server_stream/cumsgbuffer.h

@@ -0,0 +1,53 @@
+#ifndef CUMSGBUFFER_H
+#define CUMSGBUFFER_H
+
+#include <QDateTime>
+#include <QMutex>
+#include <string>
+#include <iostream>
+#include <vector>
+
+#include <memory>
+
+namespace iv {
+struct cumsg
+{
+    int id;
+    qint64 ntime;
+    std::string strVIN;
+    std::string strqueryMD5;
+    std::string strctrlMD5;
+    std::vector<char> xdata;
+    qint64 mlastuptime;  //更新时间
+    bool mbImportant = false;
+    int mkeeptime;
+    bool mbhavequery = false;
+};
+}
+
+class cumsgbuffer
+{
+public:
+    cumsgbuffer();
+
+private:
+    std::vector<iv::cumsg> mvectormsg;
+
+    QMutex mMutex;
+
+public:
+    void addmsg(int id,qint64 ntime,std::string strVIN,std::string strqueryMD5,
+                std::string strctrlMD5,std::vector<char> * pxdata,bool bImportant,int nkeeptime);
+
+
+    //if no new msg return 0
+    // -1 no this vin
+    // -2 queryMD5 error
+    int getmsg(std::string strVIN,std::string strqueryMD5,qint64 nlasttime, int & id,qint64 & ntime,
+                std::vector<char> * pxdata);
+
+
+
+};
+
+#endif // CUMSGBUFFER_H

+ 54 - 0
src/driver/driver_cloud_grpc_server_stream/driver_cloud_grpc_server_stream.pro

@@ -0,0 +1,54 @@
+QT -= gui
+
+CONFIG += c++11 console
+CONFIG -= app_bundle
+
+QMAKE_LFLAGS += -no-pie
+
+
+# The following define makes your compiler emit warnings if you use
+# any Qt feature that has been marked deprecated (the exact warnings
+# depend on your compiler). Please consult the documentation of the
+# deprecated API in order to know how to port your code away from it.
+DEFINES += QT_DEPRECATED_WARNINGS
+
+# You can also make your code fail to compile if it uses deprecated APIs.
+# In order to do so, uncomment the following line.
+# You can also select to disable deprecated APIs only up to a certain version of Qt.
+#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000    # disables all the APIs deprecated before Qt 6.0.0
+
+SOURCES += \
+    ../../include/msgtype/uploadstreammsg.pb.cc \
+        cumsgbuffer.cpp \
+        main.cpp \
+        pcmsgbuffer.cpp \
+    uploadstreammsg.grpc.pb.cc
+
+# Default rules for deployment.
+qnx: target.path = /tmp/$${TARGET}/bin
+else: unix:!android: target.path = /opt/$${TARGET}/bin
+!isEmpty(target.path): INSTALLS += target
+
+!include(../../../include/common.pri ) {
+    error( "Couldn't find the common.pri file!" )
+}
+
+!include(../../../include/ivprotobuf.pri ) {
+    error( "Couldn't find the ivprotobuf.pri file!" )
+}
+
+!include(../../../include/ivboost.pri ) {
+    error( "Couldn't find the ivboost.pri file!" )
+}
+
+!include(../../../include/ivgrpc.pri ) {
+    error( "Couldn't find the ivgrpc.pri file!" )
+}
+
+
+HEADERS += \
+    ../../include/msgtype/uploadstreammsg.pb.h \
+    cumsgbuffer.h \
+    pcmsgbuffer.h \
+    uploadstreammsg.grpc.pb.h
+

+ 330 - 0
src/driver/driver_cloud_grpc_server_stream/main.cpp

@@ -0,0 +1,330 @@
+#include <QCoreApplication>
+#include <QDateTime>
+#include <iostream>
+#include <vector>
+
+#include "cumsgbuffer.h"
+#include "pcmsgbuffer.h"
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/health_check_service_interface.h>
+#include <grpcpp/ext/proto_server_reflection_plugin.h>
+
+#include "uploadstreammsg.grpc.pb.h"
+
+using grpc::Server;
+using grpc::ServerBuilder;
+using grpc::ServerContext;
+using grpc::Status;
+
+
+
+#include <QDateTime>
+
+
+
+
+
+
+static cumsgbuffer gcumsgbuf;
+static pcmsgbuffer gpcmsgbuf;
+
+
+void uploadsend(::grpc::ServerReaderWriter<iv::UploadReplyStream, iv::UploadRequestStream>* stream,bool * pbrun,
+                std::string * pstrvin,std::string * pstrmd5,bool *pbUpdatemd4orvin,QMutex * pmutex)
+{
+    std::string strvin;
+    std::string strmd5;
+    pmutex->lock();
+    strvin = *pstrmd5;
+    strmd5 = *pstrvin;
+    pmutex->unlock();
+
+    QTime xTime;
+    xTime.start();
+    int nlastsend = xTime.elapsed();
+    while(*pbrun)
+    {
+        if(*pbUpdatemd4orvin)
+        {
+            pmutex->lock();
+            strvin = *pstrmd5;
+            strmd5 = *pstrvin;
+            *pbUpdatemd4orvin = false;
+            pmutex->unlock();
+        }
+
+        int id;
+        qint64 ntime;
+
+        std::vector<char > xvectorctrldata;
+
+        int nres = gpcmsgbuf.getmsg(strvin,strmd5,id,ntime,&xvectorctrldata);
+
+        iv::UploadReplyStream reply;
+        if(nres == 1)
+        {
+            reply.set_nres(nres);
+            reply.set_xdata(xvectorctrldata.data(),xvectorctrldata.size());
+            stream->Write(reply);
+            nlastsend = xTime.elapsed();
+        }
+        else
+        {
+            if(abs(xTime.elapsed() - nlastsend)>1000)
+            {
+                reply.set_nres(nres);
+                stream->Write(reply);
+                nlastsend = xTime.elapsed();
+            }
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+
+
+    }
+}
+
+void queryctrlsend(::grpc::ServerReaderWriter<iv::queryReplyStream, iv::queryReqStream>* stream,bool * pbrun,
+                std::string * pstrvin,std::string * pstrmd5,bool *pbUpdatemd4orvin,QMutex * pmutex)
+{
+    std::string strvin;
+    std::string strmd5;
+    pmutex->lock();
+    strvin = *pstrmd5;
+    strmd5 = *pstrvin;
+    pmutex->unlock();
+
+    QTime xTime;
+    xTime.start();
+    int nlastsend = xTime.elapsed();
+    int nlastdatatime = 0;
+    while(*pbrun)
+    {
+        if(*pbUpdatemd4orvin)
+        {
+            pmutex->lock();
+            strvin = *pstrmd5;
+            strmd5 = *pstrvin;
+            *pbUpdatemd4orvin = false;
+            pmutex->unlock();
+        }
+
+        int id;
+        qint64 ntime;
+        std::vector<char > xvectorquerydata;
+
+        int nres = gcumsgbuf.getmsg(strvin,strmd5,nlastdatatime,id,ntime,&xvectorquerydata);
+
+        nlastdatatime = ntime;
+
+        iv::queryReplyStream reply;
+        reply.set_nres(nres);
+        if(nres > 0)
+        {
+            reply.set_xdata(xvectorquerydata.data(),xvectorquerydata.size());
+            reply.set_id(id);
+            reply.set_ntime(ntime);
+            stream->Write(reply);
+            nlastsend = xTime.elapsed();
+        }
+        else
+        {
+            if(abs(xTime.elapsed() - nlastsend)>1000)
+            {
+                stream->Write(reply);
+                nlastsend = xTime.elapsed();
+            }
+        }
+
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+
+
+    }
+}
+
+
+// Logic and data behind the server's behavior.
+class UploadServiceImpl final : public iv::UploadStream::Service {
+  Status upload(ServerContext* context, ::grpc::ServerReaderWriter<iv::UploadReplyStream, iv::UploadRequestStream>* stream) override {
+
+      iv::UploadRequestStream request;
+      bool brun = true;
+      std::string strctrlmd5 = "md5";
+      std::string strvin = "aaa";
+      bool bUpdatemd4orvin = false;
+      QMutex uploadmutex;
+      std::thread * pthread = new std::thread(uploadsend,stream,&brun,&strvin,&strctrlmd5,&bUpdatemd4orvin,&uploadmutex);
+      std::cout<<"new connect."<<std::endl;
+      while (stream->Read(&request))
+      {
+          std::cout<<" rec req."<<std::endl;
+          std::vector<char> xvectordata;
+          qDebug("size is %d",request.xdata().size());
+          if((strctrlmd5 != request.strctrlmd5())||(strvin != request.strvin()))
+          {
+              uploadmutex.lock();
+              strctrlmd5 = request.strctrlmd5();
+              strvin = request.strvin();
+              bUpdatemd4orvin = true;
+              uploadmutex.unlock();
+          }
+          if(request.xdata().size()>0)
+          {
+
+              xvectordata.resize(request.xdata().size());
+              memcpy(xvectordata.data(),request.xdata().data(),request.xdata().size());
+          }
+
+          gcumsgbuf.addmsg(request.id(),request.ntime(),request.strvin(),request.strquerymd5(),request.strctrlmd5(),
+                           &xvectordata,request.bimportant(),request.kepptime());
+//          std::cout << "收到请求,类型为" << request.askmsg() <<"\n"<<std::endl;
+      }
+      std::cout<<" no conn"<<std::endl;
+      brun = false;
+      pthread->join();
+      std::cout<<"dis connect."<<std::endl;
+
+    return Status::OK;
+  }
+
+  Status queryctrl(ServerContext* context, ::grpc::ServerReaderWriter<iv::queryReplyStream, iv::queryReqStream>* stream) override {
+
+      iv::queryReqStream request;
+      bool brun = true;
+      std::string strctrlmd5 = "md5";
+      std::string strquerymd5 = "md5";
+      std::string strvin = "aaa";
+      bool bUpdatemd4orvin = false;
+      QMutex uploadmutex;
+      std::thread * pthread = new std::thread(queryctrlsend,stream,&brun,&strvin,&strquerymd5,&bUpdatemd4orvin,&uploadmutex);
+      std::cout<<"new connect."<<std::endl;
+      while (stream->Read(&request))
+      {
+          std::cout<<" rec req."<<std::endl;
+          std::vector<char> xvectordata;
+          qDebug("size is %d",request.xdata().size());
+          if((strquerymd5 != request.strquerymd5())||(strvin != request.strvin()))
+          {
+              uploadmutex.lock();
+              strquerymd5 = request.strquerymd5();
+              strvin = request.strvin();
+              bUpdatemd4orvin = true;
+              uploadmutex.unlock();
+          }
+          if(request.xdata().size()>0)
+          {
+
+              xvectordata.resize(request.xdata().size());
+              memcpy(xvectordata.data(),request.xdata().data(),request.xdata().size());
+          }
+          static int tempid = 0;
+          tempid++;
+          int nid = gpcmsgbuf.addmsg(tempid,request.ntime(),request.strvin(),request.strctrlmd5(),&xvectordata,
+                                     request.bimportant(),request.kepptime());
+
+          (void)&nid;
+
+//          std::cout << "收到请求,类型为" << request.askmsg() <<"\n"<<std::endl;
+      }
+      std::cout<<" no conn"<<std::endl;
+      brun = false;
+      pthread->join();
+      std::cout<<"dis connect."<<std::endl;
+    return Status::OK;
+  }
+
+//  Status query(ServerContext* context, const iv::queryreq* request,
+//                  iv::queryReply* reply) override {
+
+//      int id;
+//      qint64 ntime;
+//      std::vector<char > xvectorquerydata;
+
+//      int nres = gcumsgbuf.getmsg(request->strvin(),request->strquerymd5(),request->nlasttime(),id,ntime,&xvectorquerydata);
+
+//      reply->set_nres(nres);
+//      if(nres > 0)
+//      {
+//          reply->set_data(xvectorquerydata.data(),xvectorquerydata.size());
+//          reply->set_id(id);
+//          reply->set_ntime(ntime);
+//      }
+
+//      return Status::OK;
+
+
+//  }
+
+//  Status ctrl(ServerContext* context, const iv::ctrlreq* request,
+//                  iv::ctrlReply * reply) override {
+
+//      std::vector<char> xvectordata;
+//      if(request->data().size()>0)
+//      {
+//          xvectordata.resize(request->data().size());
+//          memcpy(xvectordata.data(),request->data().data(),request->data().size());
+//      }
+
+//      int nid = gpcmsgbuf.addmsg(request->id(),request->ntime(),request->strvin(),request->strctrlmd5(),&xvectordata,
+//                                 request->bimportant(),request->kepptime());
+
+//      reply->set_nsendid(nid);
+
+
+//      return Status::OK;
+
+
+//  }
+
+
+};
+
+void RunServer() {
+  std::string server_address("0.0.0.0:50051");
+  UploadServiceImpl service;
+
+  grpc::EnableDefaultHealthCheckService(true);
+//  grpc::reflection::InitProtoReflectionServerBuilderPlugin();
+  ServerBuilder builder;
+  // Listen on the given address without any authentication mechanism.
+  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
+
+  builder.SetMaxReceiveMessageSize(300000000);
+ // builder.SetMaxMessageSize(100000000);
+
+
+//  builder.SetMaxSendMessageSize(100000000);
+  // Register "service" as the instance through which we'll communicate with
+  // clients. In this case it corresponds to an *synchronous* service.
+  builder.RegisterService(&service);
+  // Finally assemble the server.
+  std::unique_ptr<Server> server(builder.BuildAndStart());
+  std::cout << "Server listening on " << server_address << std::endl;
+
+  // Wait for the server to shutdown. Note that some other thread must be
+  // responsible for shutting down the server for this call to ever return.
+  server->Wait();
+}
+
+
+
+
+
+int main(int argc, char *argv[])
+{
+    QCoreApplication a(argc, argv);
+
+    gpcmsgbuf.start();
+
+    RunServer();
+
+
+
+
+    return a.exec();
+}

+ 171 - 0
src/driver/driver_cloud_grpc_server_stream/pcmsgbuffer.cpp

@@ -0,0 +1,171 @@
+#include "pcmsgbuffer.h"
+
+pcmsgbuffer::pcmsgbuffer()
+{
+
+}
+
+void pcmsgbuffer::run()
+{
+    while(!QThread::isInterruptionRequested())
+    {
+        mMutex.lock();
+        qint64 nowtime = QDateTime::currentMSecsSinceEpoch();
+        int i;
+        for(i=0;i<mvectorpcmsgstate.size();i++)
+        {
+            if((nowtime - mvectorpcmsgstate[i].nsendtime) > OVERTIME)
+            {
+                mvectorpcmsgstate.erase(mvectorpcmsgstate.begin() + i);
+                i--;
+            }
+        }
+        mMutex.unlock();
+        msleep(100);
+    }
+}
+
+unsigned int pcmsgbuffer::addmsg(int id, qint64 ntime, std::string strVIN, std::string strctrlMD5, std::vector<char> *pxdata,bool bImportant,int nkeeptime)
+{
+    unsigned int nrtn;
+    mMutex.lock();
+    iv::pcmsg * pmsg = 0;
+    int nsize = mvectormsg.size();
+    int i;
+    for(i=0;i<nsize;i++)
+    {
+        if(strncmp(mvectormsg[i].strVIN.data(),strVIN.data(),255) == 0)
+        {
+            pmsg = &mvectormsg[i];
+            break;
+        }
+    }
+    if(pmsg == 0)
+    {
+        iv::pcmsg cmsg;
+        cmsg.id = id;
+        cmsg.ntime = ntime;
+        cmsg.strVIN = strVIN;
+        cmsg.strctrlMD5 = strctrlMD5;
+        if(pxdata->size() > 0)
+        {
+            cmsg.xdata.resize(pxdata->size());
+            memcpy(cmsg.xdata.data(),pxdata->data(),pxdata->size());
+        }
+        cmsg.mlastuptime = QDateTime::currentMSecsSinceEpoch();
+        cmsg.nsendid = muid;
+        cmsg.mbImportant = bImportant;
+        cmsg.mkeeptime = nkeeptime;
+        nrtn = muid;
+        muid++;
+        mvectormsg.push_back(cmsg);
+        iv::pcmsgstate xpms;xpms.nsendid = cmsg.nsendid;
+        xpms.nsendtime = QDateTime::currentMSecsSinceEpoch();
+        mvectorpcmsgstate.push_back(xpms);
+        mMutex.unlock();
+        return nrtn;
+    }
+
+    if((pmsg->mbImportant != true)||((QDateTime::currentMSecsSinceEpoch() - pmsg->mlastuptime)>=pmsg->mkeeptime))
+    {
+        changesendstate(pmsg->nsendid,-2);
+        pmsg->mbImportant = bImportant;
+        pmsg->mkeeptime = nkeeptime;
+        pmsg->id = id;
+        pmsg->ntime = ntime;
+        pmsg->strctrlMD5 = strctrlMD5;
+        pmsg->xdata.clear();
+        pmsg->nsendid = muid;
+ //       cmsg.mlastuptime = QDateTime::currentMSecsSinceEpoch();
+        nrtn = muid;
+        iv::pcmsgstate xpms;xpms.nsendid = pmsg->nsendid;
+        pmsg->mlastuptime = QDateTime::currentMSecsSinceEpoch();
+        mvectorpcmsgstate.push_back(xpms);
+        muid++;
+        if(pxdata->size()>0)
+        {
+            pmsg->xdata.resize(pxdata->size());
+            memcpy(pmsg->xdata.data(),pxdata->data(),pxdata->size());
+        }
+    }
+
+
+    mMutex.unlock();
+    return nrtn;
+}
+
+int pcmsgbuffer::getmsg(std::string strVIN, std::string strctrlMD5, int &id, qint64 &ntime, std::vector<char> *pxdata)
+{
+    mMutex.lock();
+    iv::pcmsg * pmsg = 0;
+    int nsize = mvectormsg.size();
+    int i;
+    int npos;
+    for(i=0;i<nsize;i++)
+    {
+        if(strncmp(mvectormsg[i].strVIN.data(),strVIN.data(),255) == 0)
+        {
+            pmsg = &mvectormsg[i];
+            npos = i;
+            break;
+        }
+    }
+
+    if(pmsg == 0)
+    {
+//        std::cout<<" no this vin data"<<std::endl;;
+        mMutex.unlock();
+        return 0;
+    }
+    if(strctrlMD5 != pmsg->strctrlMD5)
+    {
+        std::cout<<" ctrl error."<<std::endl;
+        changesendstate(pmsg->nsendid,-1);
+        mMutex.unlock();
+        return -2;
+    }
+
+    id = pmsg->id;
+    ntime = pmsg->ntime;
+    pxdata->clear();
+    int ndatasize = pmsg->xdata.size();
+    pxdata->resize(ndatasize);
+    memcpy(pxdata->data(),pmsg->xdata.data(),ndatasize);
+    changesendstate(pmsg->nsendid,1);
+
+    mvectormsg.erase(mvectormsg.begin()+npos);
+    mMutex.unlock();
+    return 1;
+}
+
+void pcmsgbuffer::changesendstate(int nsendid, int nstate)
+{
+    int nstatesize = mvectorpcmsgstate.size();
+    int i;
+    for(i=0;i<nstatesize;i++)
+    {
+        if(nsendid == mvectorpcmsgstate[i].nsendid)
+        {
+            mvectorpcmsgstate[i].nstate = nstate; //Send OK
+            break;
+        }
+    }
+}
+
+int pcmsgbuffer::querysendstate(int nsendid)
+{
+    int nrtn = -2;
+    mMutex.lock();
+    int nstatesize = mvectorpcmsgstate.size();
+    int i;
+    for(i=0;i<nstatesize;i++)
+    {
+        if(nsendid == mvectorpcmsgstate[i].nsendid)
+        {
+            nrtn = mvectorpcmsgstate[i].nstate;
+            break;
+        }
+    }
+    mMutex.unlock();
+    return nrtn;
+}

+ 66 - 0
src/driver/driver_cloud_grpc_server_stream/pcmsgbuffer.h

@@ -0,0 +1,66 @@
+#ifndef PCMSGBUFFER_H
+#define PCMSGBUFFER_H
+
+#include <QDateTime>
+#include <QMutex>
+#include <string>
+#include <iostream>
+
+#include <QThread>
+
+namespace iv {
+struct pcmsg
+{
+    int id;
+    qint64 ntime;
+    std::string strVIN;
+    std::string strctrlMD5;
+    std::vector<char> xdata;
+    qint64 mlastuptime;  //更新时间
+    unsigned int nsendid; //send id
+    bool mbImportant = false;
+    int mkeeptime;
+    bool mbhavequery = false;
+};
+}
+
+namespace  iv {
+struct pcmsgstate
+{
+    qint64 nsendtime;
+    unsigned int nsendid;
+    int nstate = 0; //0 wait send 1 send ok -1 md5 error -2 timeout.
+};
+}
+
+class pcmsgbuffer:public QThread
+{
+public:
+    pcmsgbuffer();
+
+private:
+    std::vector<iv::pcmsg> mvectormsg;
+    std::vector<iv::pcmsgstate> mvectorpcmsgstate;
+    QMutex mMutex;
+    const int OVERTIME = 60000; //60 seconds
+    unsigned int muid = 0;
+
+public:
+
+    unsigned int addmsg(int id,qint64 ntime,std::string strVIN,
+                std::string strctrlMD5,std::vector<char> * pxdata,bool bImportant,int nkeeptime);
+
+    //if no new msg return 0
+    // -2 queryMD5 error
+    int getmsg(std::string strVIN,std::string strctrlMD5,int & id,qint64 & ntime,
+                std::vector<char> * pxdata);
+
+    int querysendstate(int nsendid);
+private:
+    void changesendstate(int nsendid,int nstate);
+
+private:
+    void run();
+};
+
+#endif // PCMSGBUFFER_H

+ 3 - 0
src/driver/driver_cloud_grpc_server_stream/prototocpp.txt

@@ -0,0 +1,3 @@
+protoc -I . --plugin=protoc-gen-grpc=/home/yuchuli/git/grpc-framework/build2/grpc_cpp_plugin --grpc_out=. uploadstreammsg.proto
+
+protoc -I . --cpp_out=. uploadmsg.proto 

+ 73 - 0
src/driver/driver_cloud_grpc_server_stream/uploadstreammsg.proto

@@ -0,0 +1,73 @@
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "io.grpc.adc.uploadmsg";
+option java_outer_classname = "UploadMsgProto";
+option objc_class_prefix = "HLW";
+
+package iv;
+
+// The Upload service definition.
+service UploadStream {
+  // Sends a Upload
+  rpc upload (stream UploadRequestStream) returns (stream UploadReplyStream) {}
+  rpc queryctrl (stream queryReqStream) returns (stream queryReplyStream) {}
+  
+}
+
+// The request message containing the user's name.
+message UploadRequestStream {
+ // string name = 1;
+
+ //   int nres; //0 no message 1 have message
+    int32 id = 1;
+    int64 ntime = 2;
+    string strVIN = 3;
+    string strqueryMD5 = 4;
+    string strctrlMD5 = 5;
+    bytes xdata = 6;
+    bool bimportant = 7;  //if 1, is important.
+    int32 kepptime = 8;   //If important keep this data before query ms.
+}
+
+// The response message containing the greetings
+message UploadReplyStream {
+  int32 nres = 1;  //0 no message 1 have ctrl message
+  bytes xdata = 2;
+//  string message = 1;
+}
+
+
+message queryReqStream {
+  string strvin = 1;
+  string strqueryMD5 = 2;
+  int64 ntime = 3;  
+  string strctrlMD5 = 4;
+  bytes xdata = 5;
+  bool bimportant = 6;  //if 1, is important.
+  int32 kepptime = 7;   //If important keep this data before ctrl ms.  if -1 must send.
+  int32 ntype = 8;  //0 only query  1 ctrl.
+}
+
+message queryReplyStream {
+    int32 nres = 1;  //0 not online  1 online  -1 querMD5 error  -2 ctrlMD5 error 
+    int32 id = 2;
+    int64 ntime = 3;
+    bytes xdata = 4;
+}
+
+

+ 2 - 2
src/include/proto3/uploadstreammsg.proto

@@ -57,7 +57,7 @@ message queryReqStream {
   string strqueryMD5 = 2;
   int64 ntime = 3;  
   string strctrlMD5 = 4;
-  bytes data = 5;
+  bytes xdata = 5;
   bool bimportant = 6;  //if 1, is important.
   int32 kepptime = 7;   //If important keep this data before ctrl ms.  if -1 must send.
   int32 ntype = 8;  //0 only query  1 ctrl.
@@ -67,7 +67,7 @@ message queryReplyStream {
     int32 nres = 1;  //0 not online  1 online  -1 querMD5 error  -2 ctrlMD5 error 
     int32 id = 2;
     int64 ntime = 3;
-    bytes data = 4;
+    bytes xdata = 4;
 }