Skip to content

Commit

Permalink
work around mqtt memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
the-butcher committed May 15, 2024
1 parent 088b355 commit 39a80c0
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 19 deletions.
1 change: 1 addition & 0 deletions moth_core/moth_core_p22/platformio.ini
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ board_build.f_cpu = 80000000L
board_build.f_flash = 80000000L
board_build.flash_mode = qio
board_build.partitions = min_spiffs.csv
monitor_speed = 115200
lib_deps =
adafruit/Adafruit NeoPixel@^1.12.0
adafruit/Adafruit BusIO@^1.15.0
Expand Down
4 changes: 2 additions & 2 deletions moth_core/moth_core_p22/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ uint16_t actionNum = 0;
* OK reimplement OTA update, TODO :: test
* -- MQTT:
* -- enable MQTT publishing of non-published data from file?
* -- how can a file that has been fully published be tagged appropriately?
* -- can not reconnect MQTT after a number of connections, mosquitto or device problem, TODO :: analyze SSL error from mosquitto log?
* -- files still having publishable records need to be identifiable (could be by file ending, dat(a) | arc(hive) )
* -- mqtt update after MQTT permanent failure freezes the device action cycle (wifi is still responsive though)
* -- SENSOR: scd041 does not properly reconfigure after i.e. temperatureOffset update through upload
* -- WIFI: wifi sometimes turns off for unknown reasons (more likely when multiple requests are pending), and sometimes ends up in a state where it can not be turned back on
* -- MISC: be sure data is written every 60 minutes (not 59 or 61)
Expand Down
2 changes: 1 addition & 1 deletion moth_core/moth_core_p22/src/modules/DatCsvResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ DatCsvResponse::DatCsvResponse(String path) : AsyncAbstractResponse() {

lastModified = SensorTime::getDateTimeLastModString(_content);

String dayPath = SensorTime::getFile32Def(SensorTime::getSecondstime(), "dat").name; // slash in first char pos
String dayPath = SensorTime::getFile32Def(SensorTime::getSecondstime() - SECONDS_PER_____________HOUR, "dat").name; // slash in first char pos
if (dayPath.indexOf(path) < 0) {
addHeader("Cache-Control", "max-age=31536000");
} else {
Expand Down
3 changes: 2 additions & 1 deletion moth_core/moth_core_p22/src/modules/File32Response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ File32Response::File32Response(String path, String contentType) : AsyncAbstractR
lastModified = SensorTime::getDateTimeLastModString(_content);

if (path.indexOf(".dat") > 0) {
String dayPath = SensorTime::getFile32Def(SensorTime::getSecondstime(), "dat").name; // slash in first char pos
// the last write on the previous day file may be up to one hour delayed into the next day
String dayPath = SensorTime::getFile32Def(SensorTime::getSecondstime() - SECONDS_PER_____________HOUR, "dat").name; // slash in first char pos
if (dayPath.indexOf(path) < 0) {
addHeader("Cache-Control", "max-age=31536000");
} else {
Expand Down
1 change: 0 additions & 1 deletion moth_core/moth_core_p22/src/modules/ModuleCard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ void ModuleCard::historyValues(config_t& config, values_all_t history[HISTORY___
if (abs(secondstimeDiff) <= 30) {
historyIndexMax = historyIndex;
history[historyIndex] = readValue;
} else {
}
if (datFile.available() <= 1) {
break;
Expand Down
5 changes: 4 additions & 1 deletion moth_core/moth_core_p22/src/modules/ModuleHttp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,14 @@ void ModuleHttp::handleApiValCsv(AsyncWebServerRequest *request) {
}

void ModuleHttp::handleApiDatCsv(AsyncWebServerRequest *request) {

ModuleWifi::access();
if (request->hasParam("file")) {
// TODO :: dat|arc
String datFileName = "/" + request->getParam("file")->value();
String csvLine;
if (ModuleCard::existsPath(datFileName)) {
DatCsvResponse *response = new DatCsvResponse(datFileName); // cache headers in ValcsvResponse
DatCsvResponse *response = new DatCsvResponse(datFileName);
if (request->hasHeader("If-Modified-Since")) {
String ifModifiedSince = request->getHeader("If-Modified-Since")->value();
if (!response->wasModifiedSince(ifModifiedSince)) {
Expand Down Expand Up @@ -324,6 +326,7 @@ void ModuleHttp::handleApiValOut(AsyncWebServerRequest *request) {
void ModuleHttp::handleApiDatOut(AsyncWebServerRequest *request) {
ModuleWifi::access();
if (request->hasParam("file")) {
// TODO :: dat|arc
ModuleHttp::serveFile32(request, request->getParam("file")->value());
} else {
ModuleHttp::serve400Json(request, "file required");
Expand Down
86 changes: 73 additions & 13 deletions moth_core/moth_core_p22/src/modules/ModuleMqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ mqtt____stat__e ModuleMqtt::checkCliStat(PubSubClient* mqttClient) {

void ModuleMqtt::publish(config_t& config) {

#ifdef USE___SERIAL
Serial.printf("before mqtt publish, heap: %u\n", ESP.getFreeHeap());
#endif

ModuleCard::begin();

if (!ModuleCard::existsPath(MQTT_CONFIG__DAT)) {
Expand All @@ -123,15 +127,19 @@ void ModuleMqtt::publish(config_t& config) {
if (datStat == MQTT______________OK) { // a set of config worth trying

WiFiClient* wifiClient;
char* certFileData = NULL;
if (mqtt.crt != "" && ModuleCard::existsPath(mqtt.crt)) {
wifiClient = new WiFiClientSecure();
File32 certFile;
certFile.open(mqtt.crt, O_RDONLY);
((WiFiClientSecure*)wifiClient)->loadCACert(certFile, certFile.size());
certFileData = (char*)malloc(certFile.size() + 1);
certFile.readBytes(certFileData, certFile.size());
((WiFiClientSecure*)wifiClient)->setCACert(certFileData);
certFile.close();
} else {
wifiClient = new WiFiClient();
}

PubSubClient* mqttClient;
mqttClient = new PubSubClient(*wifiClient);
mqttClient->setServer(mqtt.srv, mqtt.prt);
Expand All @@ -140,24 +148,69 @@ void ModuleMqtt::publish(config_t& config) {
} else {
mqttClient->connect(mqtt.cli); // connect without credentials
}

if (mqttClient->connected()) {

config.mqtt.mqttStatus = MQTT______________OK;
config.mqtt.mqttPublishMinutes = mqtt.min; // set to configured interval
config.mqtt.mqttFailureCount = 0;

// max publishable features
uint32_t lineLimit = min((uint32_t)Values::values->nextMeasureIndex, (uint32_t)MEASUREMENT_BUFFER_SIZE);
values_all_t datValue;
uint16_t year = 2024;
File32 yearFolder;
File32 mnthFolder;
File32 dataFile;
values_all_t readValue;

while (year > 0) {
String folderYearName = String(year);
if (ModuleCard::existsPath(folderYearName)) {
if (yearFolder) {
yearFolder.close();
}
yearFolder.open(folderYearName.c_str(), O_RDONLY);
while (mnthFolder.openNext(&yearFolder, O_RDONLY)) {
if (mnthFolder.isDirectory()) {
while (dataFile.openNext(&mnthFolder, O_RDONLY)) {
char fileDataNameBuffer[32];
dataFile.getName(fileDataNameBuffer, 32);
String fileDataName = String(fileDataNameBuffer);
// TODO :: check for file extension, then open and read data
if (fileDataName.endsWith("dat")) {
while (dataFile.available() > 1) {
dataFile.read((byte*)&readValue, sizeof(readValue)); // read one measurement from the file
// TODO :: if publishable -> publish
}
// TODO :: rename file to pub (instead of pnd, or a better fitting file extension)
}
dataFile.close();
}
}
mnthFolder.close();
}
year++; // continue with next year
} else {
year = 0; // no further year search
}
}
if (dataFile) {
dataFile.close();
}
if (mnthFolder) {
mnthFolder.close();
}
if (yearFolder) {
yearFolder.close();
}

uint32_t lineLimit = min((uint32_t)Values::values->nextMeasureIndex, (uint32_t)MEASUREMENT_BUFFER_SIZE);
uint32_t dataIndex;
for (uint32_t lineIndex = 0; lineIndex < lineLimit; lineIndex++) { // similar code in ValcsvResponse
dataIndex = lineIndex + Values::values->nextMeasureIndex - lineLimit;
datValue = Values::values->measurements[dataIndex % MEASUREMENT_BUFFER_SIZE];
if (datValue.publishable) {

// char mqttClidCO2[mqttClid.length() + 5];
// sprintf(mqttClidCO2, "%s/%s", mqttClid, "CO2");

DynamicJsonBuffer jsonBuffer;
JsonObject& root = jsonBuffer.createObject();
root[FIELD_NAME____TIME] = SensorTime::getDateTimeSecondsString(datValue.secondstime);
Expand Down Expand Up @@ -206,16 +259,23 @@ void ModuleMqtt::publish(config_t& config) {
// if publishing measurements already stored in file, those files would have to be rewritten with publishable = false flags
// how could the file be tagged as completely published

mqttClient->disconnect();
delete mqttClient;
delete wifiClient;
mqttClient = NULL;
wifiClient = NULL;
mqttClient->disconnect(); // calls stop() on wificlient

} else {
config.mqtt.mqttStatus = ModuleMqtt::checkCliStat(mqttClient);
}

wifiClient->stop(); // explicit stop to be sure it happened (before resetting certFileData)
if (certFileData != NULL) {
free(const_cast<char*>(certFileData)); // there seemed to be a memory issue with _CA_cert not being released when closing/destroying the WifiClient
certFileData = NULL;
}

delete mqttClient; // releases some memory buffer
delete wifiClient; // calls stop (again) and deletes an internal sslclient instance
mqttClient = NULL;
wifiClient = NULL;

} else { // datStat other than MQTT______________OK
config.mqtt.mqttStatus = datStat;
config.mqtt.mqttFailureCount = 5; // treat as non-recoverable
Expand All @@ -239,18 +299,18 @@ void ModuleMqtt::publish(config_t& config) {
config.mqtt.mqttFailureCount++;
if (config.mqtt.mqttFailureCount > 3) {
#ifdef USE___SERIAL
Serial.printf("before returning from mqtt failure (non-recoverable), stat: %d\n", config.mqtt.mqttStatus);
Serial.printf("returning from mqtt failure (non-recoverable), stat: %d, heap: %u\n", config.mqtt.mqttStatus, ESP.getFreeHeap());
#endif
config.mqtt.mqttPublishMinutes = MQTT_PUBLISH___NEVER; // no update
} else {
#ifdef USE___SERIAL
Serial.printf("before returning from mqtt failure (recoverable), stat: %d\n", config.mqtt.mqttStatus);
Serial.printf("returning from mqtt failure (recoverable), stat: %d, heap: %u\n", config.mqtt.mqttStatus, ESP.getFreeHeap());
#endif
}
} else {
// do nothing the correct interval must have been set when the status was set to OK, failure count reset there as well
#ifdef USE___SERIAL
Serial.println("before returning from mqtt success");
Serial.printf("returning from mqtt success, heap: %u\n", ESP.getFreeHeap());
#endif
}
}

0 comments on commit 39a80c0

Please sign in to comment.