Why Your MQTT Client Is Silently Losing Messages (And How I Fixed It)
I learned this the hard way.
I was building a sensor system for a field deployment — Raspberry Pi units publishing temperature and humidity data over 4G cellular to an MQTT broker. The dashboard looked fine. The graphs looked fine. Then one day I compared the raw sensor logs against what actually made it to the broker.
Thousands of readings. Gone. No errors. No warnings. Just gone.
The culprit? paho-mqtt's default behaviour when the broker is unreachable: it silently drops your message and moves on.
After losing enough data I wrote a library to fix it. It's now on PyPI as robmqtt.
pip install robmqtt
But before I show you how it works, let me show you exactly what the problem is — because it's subtler than most people realise.
The Problem with Standard MQTT Clients
When you call client.publish() in paho-mqtt and the broker is unreachable, one of two things happens:
- The message is silently discarded (QoS 0)
- The message is queued in memory for QoS 1/2 — but that queue is lost on process restart, and there's a second gap that even QoS 1 doesn't close
That second gap is the sneaky one. Here's what happens with QoS 1:
The message was "sent" from your perspective. It was never confirmed from the broker's perspective. And paho has no mechanism to track this gap across reconnections.
On a stable data centre network, this almost never matters. On a Raspberry Pi running on 4G cellular in a field cabinet, it happens constantly.
What a Resilient Edge Client Actually Needs
After losing enough data, I sat down and wrote out what a proper edge MQTT client needs to do:
1. Persist offline messages to disk
If the broker is unreachable when publish() is called, the message should be written to disk and replayed later. Not held in memory — memory is lost on restart.
2. Track in-flight messages separately
Messages that have been handed to the broker but not yet ACK'd need to be tracked. On reconnect, they must be re-sent before any queued messages start draining.
3. Priority-based eviction
When the queue fills up, not all messages are equal. A critical alarm should survive. A routine telemetry reading from 6 hours ago should not block it.
4. Exponential backoff on reconnect
A fleet of 50 devices coming back online after a broker restart should not all hammer the broker at the same second.
5. Thread-safe storage
The MQTT network thread and your application thread are both touching message state. This needs to be safe without forcing the caller to think about locking.
None of this is exotic. All of it is missing from the standard paho-mqtt client when used out of the box.
How robmqtt Solves It
Here's the architecture:
The Offline Queue
When the client detects it's disconnected, publish() routes to an OfflineQueue backed by SQLite:
# Simplified from offline_queue.py
class OfflineQueue:
def enqueue(self, topic, payload, qos, priority):
with self._lock:
self._db.execute("""
INSERT INTO queue (topic, payload, qos, priority, timestamp)
VALUES (?, ?, ?, ?, ?)
""", (topic, payload, qos, priority, time.time()))
def dequeue_batch(self, batch_size=10):
with self._lock:
# Highest priority first, then oldest first within same priority
return self._db.execute("""
SELECT id, topic, payload, qos FROM queue
ORDER BY priority DESC, timestamp ASC
LIMIT ?
""", (batch_size,)).fetchall()
SQLite gives you durability without a separate process. It survives power cycles. The threading lock means your application thread and the drain thread never step on each other.
The Inflight Tracker
This closes the gap QoS 1 leaves open:
# Simplified from inflight_tracker.py
class InflightTracker:
def track(self, mid, topic, payload, qos):
"""Call this when you hand a message to paho."""
with self._lock:
self._db.execute("""
INSERT OR REPLACE INTO inflight (mid, topic, payload, qos)
VALUES (?, ?, ?, ?)
""", (mid, topic, payload, qos))
def acknowledge(self, mid):
"""Call this in on_publish callback."""
with self._lock:
self._db.execute("DELETE FROM inflight WHERE mid = ?", (mid,))
def get_all_pending(self):
"""Call this on reconnect — re-send everything unacknowledged."""
with self._lock:
return self._db.execute(
"SELECT topic, payload, qos FROM inflight"
).fetchall()
On reconnect, the client replays all inflight messages first, then starts draining the offline queue. Delivery order is preserved.
Priority Eviction
Each message gets a priority from 1 (lowest) to 10 (highest):
# Routine telemetry — can be evicted when queue is full
client.publish(
topic="sensors/temperature",
payload='{"value": 23.5}',
qos=1,
priority=3
)
# Critical alert — survives eviction, displaces old telemetry
client.publish(
topic="alerts/critical",
payload='{"type": "over_temp", "value": 87.2}',
qos=2,
priority=9
)
When the queue hits capacity, the lowest-priority messages are evicted first. Your critical alerts are never blocked by a backlog of stale routine data.
Using robmqtt
Install:
pip install robmqtt
Basic usage — this is everything you need:
from robmqtt import ProductionMQTTClient
import json
client = ProductionMQTTClient(
client_id="field_device_001",
broker_host="mqtt.yourdomain.com",
broker_port=1883,
max_queue_size=5000, # holds ~5000 messages during outages
min_backoff=2, # start retrying after 2s
max_backoff=60, # cap retry interval at 60s
db_path="./device.db", # SQLite lives here — survives reboots
)
client.connect()
client.start()
# From here just call publish() — routing is handled internally.
# Connected: sends directly and tracks inflight.
# Disconnected: writes to SQLite, drains automatically on reconnect.
while True:
reading = read_sensor()
client.publish(
topic="sensors/temperature",
payload=json.dumps(reading),
qos=1,
priority=5,
)
time.sleep(30)
The application code doesn't need to know whether the broker is reachable. That's the point.
Check what's happening at runtime:
stats = client.get_statistics()
print(stats)
# {
# 'is_connected': True,
# 'offline_queue_size': 0,
# 'inflight_count': 2,
# 'reconnect_count': 4,
# ...
# }
TLS is supported if your broker requires it:
client = ProductionMQTTClient(
client_id="secure_device_001",
broker_host="mqtt.yourdomain.com",
broker_port=8883,
use_tls=True,
ca_certs="/etc/ssl/certs/broker-ca.crt",
username="device001",
password="your_password",
)
Seeing It in Action
The repo includes test_13.py, a simulation designed specifically to demo the offline behaviour:
# Terminal 1 — run the simulation (publishes every 5 seconds)
python test_13.py
# Terminal 2 — simulate a network outage
sudo systemctl stop mosquitto
# Watch messages queue up in Terminal 1
# Queue stats print every 10 readings
# Restore connectivity
sudo systemctl start mosquitto
# Watch the offline queue drain automatically — zero messages lost
The queue drain happens in a background daemon thread. Your application code does nothing. It just works.
Real-World Context
I've deployed this pattern on:
- Battery management systems — monitoring cell voltages and temperatures in production energy storage systems. A 10-minute broker outage during a network switch should not cause a gap in the battery health record.
- Robotics telemetry — ROS2 robots publishing sensor and status data. Process restarts during OTA updates should not lose the last known state.
- MQTT edge gateways — aggregating data from downstream sensors over serial or CAN and forwarding to a cloud broker over 4G. The gateway may reconnect dozens of times per day.
In all of these cases, the pattern is the same: treat disconnection as normal, not exceptional. Design the client to buffer, not to fail.
Who This Is For
robmqtt is specifically for edge device deployments where:
- Network connectivity is unreliable (cellular, Wi-Fi roaming, VPNs)
- Process restarts happen (watchdog resets, power cycles, OTA updates)
- Message loss has real consequences (industrial monitoring, remote sensors, fleet telemetry)
- You don't want to build and maintain this infrastructure yourself
If you're running MQTT on a stable cloud-to-cloud connection, paho-mqtt alone is probably fine. If you're deploying on Raspberry Pi, industrial gateways, field sensors, or anything running on 4G/LTE — this is for you.
What's Next
The Prometheus metrics endpoint is on the roadmap. The structured logging already writes .jsonl metrics files — exposing them via HTTP is a small step and would make robmqtt slot naturally into standard observability stacks.
If you try it and hit an issue, open a GitHub issue. If you want a feature, open a discussion.
Have you run into MQTT message loss on edge devices? How did you handle it — drop a comment below.














