
Fix SQLite duplicate telemetry race condition #2099

Merged
samson0v merged 2 commits into thingsboard:master from
lucas-souza-enerlab:fix/sqlite-duplicate-telemetry-race-condition
Mar 10, 2026

Conversation

@lucas-souza-enerlab
Contributor

Fix SQLite storage duplicate telemetry race condition

Summary

When using SQLite event storage ("type": "sqlite"), the gateway sends duplicate telemetry to ThingsBoard. Every reading appears twice with timestamps 1–10 ms apart. Switching to "type": "memory" eliminates the issue.
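For context, the storage backend is selected in the gateway's storage configuration. A sketch of the two variants discussed here (key names assumed from typical tb_gateway.json files; exact fields may differ by gateway version):

```json
{
  "storage": {
    "type": "sqlite",
    "data_file_path": "./data/data.db",
    "messages_ttl_check_in_hours": 1,
    "messages_ttl_in_days": 7
  }
}
```

Switching "type" to "memory" (with its own keys such as "read_records_count" and "max_records_count") is the workaround that avoids the duplicates described below.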

Root cause

The Database background thread pre-fetches the next batch of records into an in-memory cache (__next_batch). The pre-fetch is unlocked by can_prepare_new_batch(), which is called inside get_event_pack() before event_pack_processing_done() deletes the records from the database.

This creates a race window:

Thread A (gateway)                    Thread B (Database)
─────────────────                     ───────────────────
get_event_pack()
  → SELECT id, msg FROM ... LIMIT 10
  → returns batch1 [msg_0..msg_9]
  → can_prepare_new_batch()  ───────→ flag set, pre-fetch triggered
                                      → SELECT ... LIMIT 10
                                      → caches [msg_0..msg_9] (same records!)
  ... send via MQTT ...
event_pack_processing_done()
  → DELETE WHERE id <= 10
get_event_pack()
  → read_data() returns stale cache
  → returns [msg_0..msg_9] again  ←── DUPLICATE

Memory storage is not affected because Queue.get_nowait() is a destructive read — once consumed, data cannot be re-read.
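The difference between the two storage types can be shown in a few lines (illustrative only, not gateway code): a Queue read is destructive, while a cached pre-fetched batch can be served twice.

```python
from queue import Queue, Empty

# Memory-style storage: once consumed, records cannot be read again.
q = Queue()
for i in range(3):
    q.put(f"msg_{i}")

first = [q.get_nowait() for _ in range(3)]   # drains the queue
try:
    q.get_nowait()
    refetch = ["unexpected"]
except Empty:
    refetch = []                             # nothing left to re-read

# Cache-style storage: a stale pre-fetched batch is still there to serve.
cache = ["msg_0", "msg_1", "msg_2"]          # pre-fetched before deletion
batch1 = list(cache)                         # first read
batch2 = list(cache)                         # second read: SAME records again
```

Here `batch1 == batch2`, which is exactly the duplicate-telemetry symptom; the Queue version cannot reproduce it.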

Fix

Move can_prepare_new_batch() from get_event_pack() to event_pack_processing_done(), after delete_data(). This ensures the Database thread only pre-fetches the next batch after the current batch has been deleted.

Before (buggy):

# sqlite_event_storage.py

def get_event_pack(self):
    data_from_storage = self.read_data()
    # ... process data ...
    self.__read_database.can_prepare_new_batch()  # ← unlocks pre-fetch TOO EARLY
    return event_pack_messages

def event_pack_processing_done(self):
    if not self.stopped.is_set():
        self.delete_data(self.delete_time_point)  # ← records deleted AFTER pre-fetch

After (fixed):

# sqlite_event_storage.py

def get_event_pack(self):
    data_from_storage = self.read_data()
    # ... process data ...
    return event_pack_messages  # ← can_prepare_new_batch() removed

def event_pack_processing_done(self):
    if not self.stopped.is_set():
        self.delete_data(self.delete_time_point)
        self.__read_database.can_prepare_new_batch()  # ← moved HERE, after deletion

Commits

# Commit Description
1 2b057d8b Regression test that fails on the original code, proving the bug exists
2 ddb4ae76 The fix — test now passes

Reviewers can checkout commit 1 and run the test to reproduce the bug independently:

git checkout 2b057d8b
python -m pytest tests/unit/service/test_sqlite_duplicate_race_condition.py -v -s
# → SQLite test FAILS (duplicates detected), memory test PASSES

Then checkout commit 2 to verify the fix:

git checkout ddb4ae76
python -m pytest tests/unit/service/test_sqlite_duplicate_race_condition.py -v -s
# → Both tests PASS

Test plan

  • Regression test reproduces the race condition on original code (commit 1 fails)
  • Fix resolves the race condition (commit 2 passes)
  • Memory storage control test passes on both commits
  • Existing test_sqlite_storage in tests/unit/service/test_storage.py still passes
  • Manual validation with a real connector over extended runtime

Environment

  • ThingsBoard IoT Gateway version: 3.8.2
  • Python: 3.13
  • OS: Debian (Docker)

The test demonstrates a race condition in the SQLite event storage where
the Database background thread pre-fetches records into __next_batch cache
BEFORE event_pack_processing_done() deletes them from the database. This
causes get_event_pack() to return the same records twice — producing
duplicate telemetry on ThingsBoard.

The test inserts 20 messages, reads the first batch of 10, waits for the
pre-fetch, deletes the batch, then reads again. The second batch returns
the same 10 messages instead of the next 10 — confirming the bug.

A control test with MemoryEventStorage proves the issue is SQLite-specific
(Queue.get_nowait() is a destructive read, immune to this race condition).

Expected result: test_sqlite_prefetch_returns_stale_data_after_deletion FAILS
                 test_memory_storage_no_duplicates PASSES

Move can_prepare_new_batch() from get_event_pack() to
event_pack_processing_done(), after delete_data(). This ensures the
Database thread only pre-fetches the next batch AFTER the current
batch has been deleted from the database, preventing stale cache
from being served as duplicate telemetry.
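The ordering described above (pre-fetch unlocked only after deletion) can be sketched with a self-contained toy model. All names here are illustrative, not the gateway's actual classes:

```python
import sqlite3
import threading

class ToyStorage:
    """Toy model of batch read / delete / pre-fetch ordering."""

    def __init__(self):
        self.db = sqlite3.connect(":memory:", check_same_thread=False)
        self.db.execute("CREATE TABLE messages (id INTEGER PRIMARY KEY, msg TEXT)")
        self.db.executemany("INSERT INTO messages (msg) VALUES (?)",
                            [(f"msg_{i}",) for i in range(20)])
        self.lock = threading.Lock()
        self.prefetch_allowed = threading.Event()   # gate for the background thread
        self.next_batch = None
        self.last_id = 0

    def prefetch(self):
        # Background thread: caches the next batch only once it is allowed to.
        self.prefetch_allowed.wait()
        self.prefetch_allowed.clear()
        with self.lock:
            self.next_batch = self.db.execute(
                "SELECT id, msg FROM messages ORDER BY id LIMIT 10").fetchall()

    def get_event_pack(self):
        # Note: deliberately does NOT unlock the pre-fetch (that was the bug).
        with self.lock:
            if self.next_batch is not None:
                batch, self.next_batch = self.next_batch, None
            else:
                batch = self.db.execute(
                    "SELECT id, msg FROM messages ORDER BY id LIMIT 10").fetchall()
            self.last_id = batch[-1][0]
            return batch

    def event_pack_processing_done(self):
        with self.lock:
            self.db.execute("DELETE FROM messages WHERE id <= ?", (self.last_id,))
        self.prefetch_allowed.set()   # unlock pre-fetch only AFTER deletion

store = ToyStorage()
worker = threading.Thread(target=store.prefetch)
worker.start()

batch1 = store.get_event_pack()           # msg_0..msg_9
store.event_pack_processing_done()        # delete, then allow pre-fetch
worker.join()                             # pre-fetch now sees only fresh rows
batch2 = store.get_event_pack()           # msg_10..msg_19, no duplicates
```

With the old ordering (setting `prefetch_allowed` inside `get_event_pack`), the background thread could cache the first ten rows again before the DELETE ran, and `batch2` would repeat `batch1`.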
@CLAassistant

CLAassistant commented Mar 5, 2026

CLA assistant check
All committers have signed the CLA.

@harkatos

harkatos commented Mar 5, 2026

Good fix, very much needed when using SQLite

@lucasamoreira28

Thanks, I was facing the same problem and I tested your solution. It solved my issue.

@samson0v samson0v changed the title from "Fix/sqlite duplicate telemetry race condition" to "Fix SQLite duplicate telemetry race condition" Mar 10, 2026
@samson0v
Copy link
Contributor

@lucas-souza-enerlab, many thanks for your contribution!

@samson0v samson0v merged commit 2398f12 into thingsboard:master Mar 10, 2026
4 checks passed
