modulecomm_impl.cpp 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. #include "modulecomm_impl.h"
  2. #include <thread>
  3. #include <iostream>
  4. #include <QDateTime>
  5. #include <QMutex>
  6. #include <QFile>
  7. namespace iv {
  8. namespace modulecomm {
  9. QMutex gmodulecomm_dds_Mutex;
  10. int createcount = 0;
  11. }
  12. }
  13. void modulecomm_impl::callbackTopic(const TopicSample::Message& message) {
  14. // static int counter = 0;
  15. // std::cout << "Message: counter = " << message.counter << std::endl
  16. // << " message = " << message.msgname.in() << std::endl;
  17. // std::cout<<"xlen is "<<message.xdata.length()<<std::endl;
  18. QDateTime dt = QDateTime::fromMSecsSinceEpoch(message.sendtime);
  19. // std::cout<<" recv time is "<<QDateTime::currentDateTime().toString("yyyy-MM-dd-hh-mm-ss-zzz").toLatin1().data()<<std::endl;
  20. if(mbFunPlus)
  21. {
  22. mFun((char *)message.xdata.get_buffer(),message.xdata.length(),message.counter,&dt,message.msgname.in());
  23. }
  24. else
  25. {
  26. (*mpCall)((char *)message.xdata.get_buffer(),message.xdata.length(),message.counter,&dt,message.msgname.in());
  27. }
  28. }
  29. int modulecomm_impl::GetTempConfPath(char *strpath)
  30. {
  31. char strtmppath[256];
  32. QDateTime dt = QDateTime::currentDateTime();
  33. snprintf(strtmppath,256,"/tmp/adc_modulecomm_conf_%04d%02d%02d%02d%02d.ini",dt.date().year(),
  34. dt.date().month(),dt.date().day(),dt.time().hour(),dt.time().minute());
  35. QFile xFile;
  36. xFile.setFileName(strtmppath);
  37. char strtem[256];
  38. char strdata[10000];
  39. snprintf(strdata,10000,"");
  40. if(!xFile.exists())
  41. {
  42. if(xFile.open(QIODevice::ReadWrite))
  43. {
  44. snprintf(strtem,256,"[common]\n");strncat(strdata,strtem,10000);
  45. snprintf(strtem,256,"DCPSDefaultDiscovery=TheRTPSConfig\n");strncat(strdata,strtem,10000);
  46. #ifdef dds_use_shm
  47. snprintf(strtem,256,"DCPSGlobalTransportConfig=myconfig\n");strncat(strdata,strtem,10000);
  48. snprintf(strtem,256,"[config/myconfig]\n");strncat(strdata,strtem,10000);
  49. snprintf(strtem,256,"transports=share\n");strncat(strdata,strtem,10000);
  50. snprintf(strtem,256,"[transport/share]\n");strncat(strdata,strtem,10000);
  51. snprintf(strtem,256,"transport_type=shmem\n");strncat(strdata,strtem,10000);
  52. snprintf(strtem,256,"pool_size=100000000\n");strncat(strdata,strtem,10000);
  53. #endif
  54. snprintf(strtem,256,"[rtps_discovery/TheRTPSConfig]\n");strncat(strdata,strtem,10000);
  55. snprintf(strtem,256,"ResendPeriod=5\n");strncat(strdata,strtem,10000);
  56. xFile.write(strdata,strnlen(strdata,10000));
  57. xFile.close();
  58. }
  59. }
  60. strncpy(strpath,strtmppath,255);
  61. return 0;
  62. }
  63. modulecomm_impl::modulecomm_impl(const char * strcommname,int ntype )
  64. {
  65. int xargc = 3;
  66. char * xargv[3];
  67. xargv[0] = "pub";
  68. xargv[1]= "-DCPSConfigFile";
  69. xargv[2] = "configuration.ini";
  70. char strtmppath[256];
  71. QFile xFile;
  72. xFile.setFileName(xargv[2]);
  73. if(!xFile.exists())
  74. {
  75. GetTempConfPath(strtmppath);
  76. xargv[2] = strtmppath;
  77. }
  78. strncpy(mstrtopic,strcommname,255);
  79. iv::modulecomm::gmodulecomm_dds_Mutex.lock();
  80. if(ntype == type_recv)
  81. {
  82. mpSub = new Subscriber(xargc,xargv,strcommname);
  83. // std::this_thread::sleep_for(std::chrono::milliseconds(10));
  84. mnType = type_recv;
  85. }
  86. else
  87. {
  88. mpPub = new Publisher(xargc,xargv,strcommname);
  89. // std::this_thread::sleep_for(std::chrono::milliseconds(10));
  90. mnType = type_send;
  91. }
  92. iv::modulecomm::createcount++;
  93. std::cout<<"count is "<<iv::modulecomm::createcount<<std::endl;
  94. iv::modulecomm::gmodulecomm_dds_Mutex.unlock();
  95. }
  96. int modulecomm_impl::listenmsg(ModuleFun xFun)
  97. {
  98. if(mnType == type_send)
  99. {
  100. std::cout<<"send not listen."<<std::endl;;
  101. return -1;
  102. }
  103. mbFunPlus = true;
  104. mFun = xFun;
  105. std::function<void (const TopicSample::Message&)> topicFunction = std::bind(&modulecomm_impl::callbackTopic,this,std::placeholders::_1);
  106. mpSub->setReceivedTopicFunction(topicFunction);
  107. return 0;
  108. }
  109. int modulecomm_impl::listenmsg(SMCallBack pCall)
  110. {
  111. if(mnType == type_send)
  112. {
  113. std::cout<<"send not listen."<<std::endl;
  114. return -1;
  115. }
  116. mbFunPlus = false;
  117. mpCall = pCall;
  118. std::function<void (const TopicSample::Message&)> topicFunction = std::bind(&modulecomm_impl::callbackTopic,this,std::placeholders::_1);
  119. mpSub->setReceivedTopicFunction(topicFunction);
  120. return 0;
  121. }
  122. void modulecomm_impl::writemsg(const char *str, int nlen)
  123. {
  124. if(mnType == type_recv)
  125. {
  126. std::cout<<"recv not send."<<std::endl;
  127. return ;
  128. }
  129. mpPub->sendMessage(mstrtopic,str,nlen);
  130. }