Высокое использование ЦП после некоторого времени

У меня есть следующий код, который работает как служба на устройстве на основе Linux.

У него есть один ответ mqtt, который получает сообщение, когда кто-то публикует его в подписке. Один поток предназначен для обработки входящих сообщений в очереди. Как только они обрабатываются, сообщение результата будет перенесено в очередь out_message. Еще один поток - обработка очереди исходящих сообщений. Я использовал условие_variable для совместного использования ресурсов между потоками.

Проблема возникает через некоторое время (в случайное время), использование процессора в этом приложении достигает 100%. Любые проблемы в данном коде для исправления моего процесса. Пожалуйста, помогите мне !! Заранее большое спасибо.

void pushMessage(std::string rData) {
    in_mutex.lock();
    in_queue.push(rData);
    in_mutex.unlock();
    in_cv.notify_all();
}

void pushOutGoingMessage(Json::Value data) {
    out_mutex.lock();
    out_queue.push(data);
    out_mutex.unlock();
    out_cv.notify_all();
}
void processOutGoingMessages() {
    while (true) {
        Json::Value data;
        {
            std::unique_lock<std::mutex> lock(out_mutex);
            while (out_queue.empty()) {
                out_cv.wait(lock);
            }
            data = out_queue.front();
            out_queue.pop();
            lock.unlock();
        }
        if (!data.isNull()) {
            parseOutGoingMessages(data);
        }
    }
}

void processMessage() {
    while (true) {
        std::string data = "NO_DATA";
        {
            std::unique_lock<std::mutex> lock(in_mutex, std::try_to_lock);
            if (!lock.owns_lock()) {

            } else {
                while (in_queue.empty()) {
                    in_cv.wait(lock);
                }
                data = in_queue.front();
                in_queue.pop();
                lock.unlock();
            }
        }
        if (data.compare("NO_DATA") != 0) {
            parseMessage(data);
        }
    }
}

void parseOutGoingMessages(Json::Value rJsonMessage) {
    // mqtt client send method
    mqtt_client.push_message(rJsonMessage.toStyledString(),
            rJsonMessage["destination"].asString());
}

void parseMessage(std::string rMessage) {
    try {
        debug(rMessage);
        // application logic
    } catch (std::exception &e) {
        debug("ERRO HANDLED IN PARSING ::" + std::string(e.what()));
    }
}

void connectMQTT() {
    // connection params
}

void OnConnectionLost(void *context, char *cause) {
    // retry logic
    connectMQTT();
}

void delivered(void *context, MQTTClient_deliveryToken dt) {

}
int OnMessageArrived(void *context, char *topicName, int topicLen,
        MQTTClient_message *message) {
    if (!message->retained) {
        std::string msg((char *) message->payload, message->payloadlen);
        pushMessage(msg);
    }
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}
void send(Json::Value rData,std::string rDestination) {
    Json::Value jsonNotification;
    jsonNotification["destination"] = rDestination;
    jsonNotification["data"] = rData;
    pushOutGoingMessage(jsonNotification);
}
int main(int argc, char **argv) {
    connectMQTT();
    std::thread procInMessage(processMessage);
    std::thread procOutMessage(processOutGoingMessages);
    procInMessage.join();
    procOutMessage.join();
}
1
задан 13 August 2018 в 15:48

1 ответ

Спасибо @MatthewFisher и @Mike Vine. Я просто модифицирую методы push для обеих очередей.

void pushMessage(std::string rData) {
//  in_mutex.lock();
//  in_queue.push(rData);
//  in_mutex.unlock();
//  in_cv.notify_all();
    std::unique_lock<std::mutex> lock(in_mutex);
    in_queue.push(rData);
    lock.unlock();
    in_cv.notify_all();
}

void pushOutGoingMessage(Json::Value data) {
//  out_mutex.lock();
//  out_queue.push(data);
//  out_mutex.unlock();
//  out_cv.notify_all();
    std::unique_lock<std::mutex> lock(out_mutex);
    out_queue.push(data);
    lock.unlock();
    out_cv.notify_all();
}

И проблема решена, я думаю. Это было purley из-за (! Lock.owns_lock ()) {} в методе push для обеих очередей.

1
ответ дан 15 August 2018 в 17:01

Другие вопросы по тегам:

Похожие вопросы: