У меня есть следующий код, который работает как служба на устройстве на основе Linux.
У него есть один ответ mqtt, который получает сообщение, когда кто-то публикует его в подписке. Один поток предназначен для обработки входящих сообщений в очереди. Как только они обрабатываются, сообщение результата будет перенесено в очередь out_message. Еще один поток - обработка очереди исходящих сообщений. Я использовал условие_variable для совместного использования ресурсов между потоками.Проблема возникает через некоторое время (в случайное время), использование процессора в этом приложении достигает 100%. Любые проблемы в данном коде для исправления моего процесса. Пожалуйста, помогите мне !! Заранее большое спасибо.
void pushMessage(std::string rData) {
in_mutex.lock();
in_queue.push(rData);
in_mutex.unlock();
in_cv.notify_all();
}
void pushOutGoingMessage(Json::Value data) {
out_mutex.lock();
out_queue.push(data);
out_mutex.unlock();
out_cv.notify_all();
}
void processOutGoingMessages() {
while (true) {
Json::Value data;
{
std::unique_lock<std::mutex> lock(out_mutex);
while (out_queue.empty()) {
out_cv.wait(lock);
}
data = out_queue.front();
out_queue.pop();
lock.unlock();
}
if (!data.isNull()) {
parseOutGoingMessages(data);
}
}
}
void processMessage() {
while (true) {
std::string data = "NO_DATA";
{
std::unique_lock<std::mutex> lock(in_mutex, std::try_to_lock);
if (!lock.owns_lock()) {
} else {
while (in_queue.empty()) {
in_cv.wait(lock);
}
data = in_queue.front();
in_queue.pop();
lock.unlock();
}
}
if (data.compare("NO_DATA") != 0) {
parseMessage(data);
}
}
}
void parseOutGoingMessages(Json::Value rJsonMessage) {
// mqtt client send method
mqtt_client.push_message(rJsonMessage.toStyledString(),
rJsonMessage["destination"].asString());
}
void parseMessage(std::string rMessage) {
try {
debug(rMessage);
// application logic
} catch (std::exception &e) {
debug("ERRO HANDLED IN PARSING ::" + std::string(e.what()));
}
}
void connectMQTT() {
// connection params
}
void OnConnectionLost(void *context, char *cause) {
// retry logic
connectMQTT();
}
void delivered(void *context, MQTTClient_deliveryToken dt) {
}
int OnMessageArrived(void *context, char *topicName, int topicLen,
MQTTClient_message *message) {
if (!message->retained) {
std::string msg((char *) message->payload, message->payloadlen);
pushMessage(msg);
}
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void send(Json::Value rData,std::string rDestination) {
Json::Value jsonNotification;
jsonNotification["destination"] = rDestination;
jsonNotification["data"] = rData;
pushOutGoingMessage(jsonNotification);
}
int main(int argc, char **argv) {
connectMQTT();
std::thread procInMessage(processMessage);
std::thread procOutMessage(processOutGoingMessages);
procInMessage.join();
procOutMessage.join();
}
Спасибо @MatthewFisher и @Mike Vine. Я просто модифицирую методы push для обеих очередей.
void pushMessage(std::string rData) {
// in_mutex.lock();
// in_queue.push(rData);
// in_mutex.unlock();
// in_cv.notify_all();
std::unique_lock<std::mutex> lock(in_mutex);
in_queue.push(rData);
lock.unlock();
in_cv.notify_all();
}
void pushOutGoingMessage(Json::Value data) {
// out_mutex.lock();
// out_queue.push(data);
// out_mutex.unlock();
// out_cv.notify_all();
std::unique_lock<std::mutex> lock(out_mutex);
out_queue.push(data);
lock.unlock();
out_cv.notify_all();
}
И проблема решена, я думаю. Это было purley из-за (! Lock.owns_lock ()) {} в методе push для обеих очередей.