From 8af915cebc150cd349d02e5d51e252f70496dc90 Mon Sep 17 00:00:00 2001 From: Adrian Jagielak Date: Wed, 23 Jul 2025 22:31:26 +0200 Subject: [PATCH] Limit spamming the MQTT server --- futurehome/src/index.ts | 27 +++++++++++++++++++++------ futurehome/src/utils.ts | 3 +++ 2 files changed, 24 insertions(+), 6 deletions(-) create mode 100644 futurehome/src/utils.ts diff --git a/futurehome/src/index.ts b/futurehome/src/index.ts index a4405a6..a8ba736 100644 --- a/futurehome/src/index.ts +++ b/futurehome/src/index.ts @@ -1,4 +1,4 @@ -import { connectHub, connectHA } from "./client"; +import { connectHub, connectHA, RetainedMessage } from "./client"; import { log } from "./logger"; import { FimpResponse, sendFimpMsg, setFimp } from "./fimp/fimp"; import { haCommandHandlers, setHa, setHaCommandHandlers } from "./ha/globals"; @@ -25,10 +25,18 @@ import { haUpdateAvailability } from "./ha/update_availability"; log.info("Connected to HA broker"); if (!demoMode && (!hubUsername || !hubPassword)) { - log.info("Empty username or password in non-demo mode. Removing all Futurehome devices from Home Assistant..."); - retainedMessages.forEach((retainedMessage) => { - ha?.publish(retainedMessage.topic, '', { retain: true, qos: 2 }); - }); + const delay = 50; // milliseconds between each publish + + const publishWithDelay = (messages: RetainedMessage[], index = 0) => { + if (index >= messages.length) return; + + const msg = messages[index]; + ha?.publish(msg.topic, '', { retain: true, qos: 2 }); + + setTimeout(() => publishWithDelay(messages, index + 1), delay); + }; + + publishWithDelay(retainedMessages); return; } @@ -77,16 +85,19 @@ import { haUpdateAvailability } from "./ha/update_availability"; if (!basicDeviceData || !firstServiceAddr) { log.debug('Device was removed, removing from HA.'); ha?.publish(haDevice.topic, '', { retain: true, qos: 2 }); + await delay(50); } } else if (deviceId.toLowerCase() === "hub") { // Hub admin tools, ignore } else { log.debug('Invalid format, removing.'); ha?.publish(haDevice.topic, '', { retain: true, qos: 2 }); + await delay(50); } } else { log.debug('Invalid format, removing.'); ha?.publish(haDevice.topic, '', { retain: true, qos: 2 }); + await delay(50); } } @@ -107,12 +118,14 @@ import { haUpdateAvailability } from "./ha/update_availability"; const deviceInclusionReport = undefined; const result = haPublishDevice({ hubId, vinculumDeviceData, deviceInclusionReport }); + await delay(50); Object.assign(commandHandlers, result.commandHandlers); if (!retainedMessages.some(msg => msg.topic === `homeassistant/device/futurehome_${hubId}_${deviceId}/availability`)) { // Set initial availability haUpdateAvailability({ hubId, deviceAvailability: { address: deviceId, status: 'UP' } }); + await delay(50); } } catch (e) { log.error('Failed publishing device', device, e); @@ -123,7 +136,7 @@ import { haUpdateAvailability } from "./ha/update_availability"; // todo // exposeSmarthubTools(); - fimp.on("message", (topic, buf) => { + fimp.on("message", async (topic, buf) => { try { const msg: FimpResponse = JSON.parse(buf.toString()); log.debug(`Received FIMP message on topic "${topic}":\n${JSON.stringify(msg, null, 0)}`); @@ -134,6 +147,7 @@ import { haUpdateAvailability } from "./ha/update_availability"; if (!devicesState) { return; } for (const deviceState of devicesState) { haUpdateState({ hubId, deviceState }); + await delay(50); } break; } @@ -152,6 +166,7 @@ import { haUpdateAvailability } from "./ha/update_availability"; if (!devicesAvailability) { return; } for (const deviceAvailability of devicesAvailability) { haUpdateAvailability({ hubId, deviceAvailability }); + await delay(50); } break; } diff --git a/futurehome/src/utils.ts b/futurehome/src/utils.ts new file mode 100644 index 0000000..3db253c --- /dev/null +++ b/futurehome/src/utils.ts @@ -0,0 +1,3 @@ +function delay(ms: number) { + return new Promise(resolve => setTimeout(resolve, ms)); +}