Limit spamming the MQTT server

This commit is contained in:
Adrian Jagielak 2025-07-23 22:31:26 +02:00
parent a0c3c67926
commit 8af915cebc
No known key found for this signature in database
GPG Key ID: 0818CF7AF6C62BFB
2 changed files with 24 additions and 6 deletions

View File

@ -1,4 +1,4 @@
import { connectHub, connectHA } from "./client";
import { connectHub, connectHA, RetainedMessage } from "./client";
import { log } from "./logger";
import { FimpResponse, sendFimpMsg, setFimp } from "./fimp/fimp";
import { haCommandHandlers, setHa, setHaCommandHandlers } from "./ha/globals";
@ -25,10 +25,18 @@ import { haUpdateAvailability } from "./ha/update_availability";
log.info("Connected to HA broker");
if (!demoMode && (!hubUsername || !hubPassword)) {
log.info("Empty username or password in non-demo mode. Removing all Futurehome devices from Home Assistant...");
retainedMessages.forEach((retainedMessage) => {
ha?.publish(retainedMessage.topic, '', { retain: true, qos: 2 });
});
const delay = 50; // milliseconds between each publish
const publishWithDelay = (messages: RetainedMessage[], index = 0) => {
if (index >= messages.length) return;
const msg = messages[index];
ha?.publish(msg.topic, '', { retain: true, qos: 2 });
setTimeout(() => publishWithDelay(messages, index + 1), delay);
};
publishWithDelay(retainedMessages);
return;
}
@ -77,16 +85,19 @@ import { haUpdateAvailability } from "./ha/update_availability";
if (!basicDeviceData || !firstServiceAddr) {
log.debug('Device was removed, removing from HA.');
ha?.publish(haDevice.topic, '', { retain: true, qos: 2 });
await delay(50);
}
} else if (deviceId.toLowerCase() === "hub") {
// Hub admin tools, ignore
} else {
log.debug('Invalid format, removing.');
ha?.publish(haDevice.topic, '', { retain: true, qos: 2 });
await delay(50);
}
} else {
log.debug('Invalid format, removing.');
ha?.publish(haDevice.topic, '', { retain: true, qos: 2 });
await delay(50);
}
}
@ -107,12 +118,14 @@ import { haUpdateAvailability } from "./ha/update_availability";
const deviceInclusionReport = undefined;
const result = haPublishDevice({ hubId, vinculumDeviceData, deviceInclusionReport });
await delay(50);
Object.assign(commandHandlers, result.commandHandlers);
if (!retainedMessages.some(msg => msg.topic === `homeassistant/device/futurehome_${hubId}_${deviceId}/availability`)) {
// Set initial availability
haUpdateAvailability({ hubId, deviceAvailability: { address: deviceId, status: 'UP' } });
await delay(50);
}
} catch (e) {
log.error('Failed publishing device', device, e);
@ -123,7 +136,7 @@ import { haUpdateAvailability } from "./ha/update_availability";
// todo
// exposeSmarthubTools();
fimp.on("message", (topic, buf) => {
fimp.on("message", async (topic, buf) => {
try {
const msg: FimpResponse = JSON.parse(buf.toString());
log.debug(`Received FIMP message on topic "${topic}":\n${JSON.stringify(msg, null, 0)}`);
@ -134,6 +147,7 @@ import { haUpdateAvailability } from "./ha/update_availability";
if (!devicesState) { return; }
for (const deviceState of devicesState) {
haUpdateState({ hubId, deviceState });
await delay(50);
}
break;
}
@ -152,6 +166,7 @@ import { haUpdateAvailability } from "./ha/update_availability";
if (!devicesAvailability) { return; }
for (const deviceAvailability of devicesAvailability) {
haUpdateAvailability({ hubId, deviceAvailability });
await delay(50);
}
break;
}

3
futurehome/src/utils.ts Normal file
View File

@ -0,0 +1,3 @@
function delay(ms: number) {
return new Promise(resolve => setTimeout(resolve, ms));
}