Thread-Safe Message Publishing to Prevent 505 UNEXPECTED_FRAME Errors #121

@emmanuelgonz

Description

Applications intermittently fail with the RabbitMQ error "505 UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead". The error forces a connection reset followed by a 15-second reconnection delay.

The root cause is that send_message() in application.py calls channel.basic_publish() directly from whichever thread invokes it. Pika's SelectConnection and its channels are not thread-safe, so concurrent calls from multiple threads (e.g., the simulator thread firing TimeStatusPublisher and ScenarioTimeIntervalCallback at the same time) interleave AMQP frames on the wire. Each basic_publish writes a method frame (Basic.Publish), then a content header frame, then one or more content body frames. When two threads publish at once, a frame from the second publish can land between the first publish's method frame and its content header. The broker expects a content header immediately after a Basic.Publish (class 60 is the AMQP Basic class), so it raises 505 and closes the connection.
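The frame-ordering failure can be reproduced with a toy model. The sketch below is plain Python, not pika: it represents each small publish as three frames and runs them through a simplified, hypothetical version of the broker's per-channel sequencing check. It assumes exactly one body frame per message, whereas real messages larger than the negotiated frame size are split across several body frames.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

METHOD, HEADER, BODY = "method", "header", "body"

@dataclass(frozen=True)
class Frame:
    kind: str       # METHOD, HEADER or BODY
    publisher: str  # which thread produced the frame (illustration only)

def publish_frames(publisher: str) -> List[Frame]:
    """Frames for one small basic_publish in this toy model: method, header, one body."""
    return [Frame(METHOD, publisher), Frame(HEADER, publisher), Frame(BODY, publisher)]

# Simplified per-channel state machine: what the broker expects after each frame kind.
NEXT_STATE = {METHOD: "expect_header", HEADER: "expect_body", BODY: "idle"}

def first_violation(frames: List[Frame]) -> Optional[Tuple[int, str]]:
    """Return (index, reason) for the first out-of-sequence frame, or None if all is well."""
    state = "idle"
    for i, frame in enumerate(frames):
        if state == "expect_header" and frame.kind != HEADER:
            return i, "expected content header for class 60, got non content header frame"
        if state == "expect_body" and frame.kind != BODY:
            return i, "expected content body"
        if state == "idle" and frame.kind != METHOD:
            return i, "unexpected content frame"
        state = NEXT_STATE[frame.kind]
    return None

a, b = publish_frames("A"), publish_frames("B")
serialized = a + b                                   # one publish after the other
interleaved = [a[0], b[0], a[1], a[2], b[1], b[2]]   # B's method frame lands too early

print(first_violation(serialized))   # None
print(first_violation(interleaved))  # (1, 'expected content header for class 60, got non content header frame')
```

Serializing the two publishes passes the check; interleaving them fails at the second frame, which matches the shape of the 505 error above.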

This issue is most likely to affect applications with frequent observer callbacks, such as the Simulator, but can occur in any application where send_message() is called from multiple threads.

Proposed Changes

Route all channel.basic_publish() calls through connection.ioloop.add_callback_threadsafe() to serialize publishes onto pika's single IO thread, eliminating frame interleaving.
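The property this fix relies on can be sketched without a broker. StandInIOLoop below is a hypothetical substitute for connection.ioloop, not pika's implementation. It only mimics the behavior that matters here: callbacks submitted from any thread through add_callback_threadsafe() run one at a time on a single loop thread. The producer names loosely echo TimeStatusPublisher and ScenarioTimeIntervalCallback; they are illustrative only.

```python
import functools
import queue
import threading

class StandInIOLoop:
    """Hypothetical stand-in for connection.ioloop (NOT pika's implementation).

    add_callback_threadsafe() may be called from any thread; it only enqueues the
    callback. A single loop thread runs the callbacks one at a time, in FIFO order.
    """

    def __init__(self):
        self._callbacks = queue.Queue()
        self._thread = threading.Thread(target=self._run, name="io-loop", daemon=True)
        self._thread.start()

    def add_callback_threadsafe(self, callback):
        self._callbacks.put(callback)

    def _run(self):
        while True:
            callback = self._callbacks.get()
            if callback is None:  # sentinel enqueued by stop()
                return
            callback()

    def stop(self):
        # The sentinel runs after everything already enqueued, then the loop thread exits.
        self._callbacks.put(None)
        self._thread.join()

wire = []            # frames in the order they would hit the socket
loop_threads = set()

def do_publish(message):
    # Only ever executes on the loop thread, so one message's frames stay contiguous.
    loop_threads.add(threading.current_thread().name)
    wire.extend([("method", message), ("header", message), ("body", message)])

loop = StandInIOLoop()

def producer(name, count=100):
    # Plays the role of a callback thread calling send_message().
    for i in range(count):
        loop.add_callback_threadsafe(functools.partial(do_publish, f"{name}-{i}"))

producers = [threading.Thread(target=producer, args=(n,)) for n in ("time-status", "interval")]
for t in producers:
    t.start()
for t in producers:
    t.join()
loop.stop()

contiguous = all(wire[i][1] == wire[i + 1][1] == wire[i + 2][1] for i in range(0, len(wire), 3))
print(len(wire), contiguous, loop_threads)  # 600 True {'io-loop'}
```

Two producer threads submit 200 publishes between them, yet every publish occupies three consecutive slots on the simulated wire because only the loop thread ever writes frames.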

Specific changes in application.py:

  • Initialize _message_queue and _queue_lock in __init__: Replace the lazy hasattr-based initialization with eager initialization, and add a threading.Lock that protects the queue when it is accessed from multiple threads.
  • Add a _do_publish() method: Performs the actual basic_publish call and is designed to run only on the IO thread. If the connection dropped between scheduling and execution, it re-queues the message instead of publishing.
  • Refactor send_message(): Replace direct basic_publish calls with connection.ioloop.add_callback_threadsafe(functools.partial(self._do_publish, ...)) so that publishes are scheduled on the IO thread.
  • Add lock protection to _process_message_queue(): Wrap all reads and writes of _message_queue in _queue_lock for thread safety.
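The changes listed above can be condensed into a rough sketch. ApplicationSketch is a hypothetical, trimmed-down stand-in for the class in application.py. The exchange name, the f"{app_name}.{topic}" routing-key format, accepting either a string or a list for app_topics, the is_open check, and the fake connection and channel objects are all assumptions made so the example runs without a broker. Only the division of labor follows the issue: send_message() schedules, _do_publish() publishes on the IO thread or re-queues, and _process_message_queue() drains the queue under _queue_lock.

```python
import functools
import threading
from collections import deque
from types import SimpleNamespace

class ApplicationSketch:
    """Hypothetical, trimmed-down version of the proposed changes (not the real application.py).

    Assumes connection.ioloop.add_callback_threadsafe() and
    channel.basic_publish(exchange=..., routing_key=..., body=...) behave like pika's.
    The exchange name and routing-key format are placeholders.
    """

    def __init__(self, connection, channel, exchange="hypothetical.exchange"):
        self.connection = connection
        self.channel = channel
        self.exchange = exchange
        # Eager initialization replaces the lazy hasattr() checks.
        self._message_queue = deque()
        self._queue_lock = threading.Lock()

    def _channel_open(self):
        return self.channel is not None and self.channel.is_open

    def _enqueue(self, routing_key, payload):
        with self._queue_lock:
            self._message_queue.append((routing_key, payload))

    def send_message(self, app_name, app_topics, payload):
        # Safe to call from any thread: it never touches the channel directly.
        topics = [app_topics] if isinstance(app_topics, str) else app_topics
        for topic in topics:
            routing_key = f"{app_name}.{topic}"  # placeholder routing-key scheme
            if not self._channel_open():
                self._enqueue(routing_key, payload)  # best-effort check; _do_publish re-checks
                continue
            self.connection.ioloop.add_callback_threadsafe(
                functools.partial(self._do_publish, routing_key, payload))

    def _do_publish(self, routing_key, payload):
        # Runs only on the IO thread, so publishes are serialized on the wire.
        if not self._channel_open():
            # Connection dropped between scheduling and execution: keep the message.
            self._enqueue(routing_key, payload)
            return
        self.channel.basic_publish(exchange=self.exchange, routing_key=routing_key, body=payload)

    def _process_message_queue(self):
        # Meant to run on the IO thread after (re)connecting.
        with self._queue_lock:
            pending = list(self._message_queue)
            self._message_queue.clear()
        for routing_key, payload in pending:
            self._do_publish(routing_key, payload)  # re-queues again if still closed


# --- Minimal fakes so the sketch runs without a broker (also hypothetical) ---
class FakeChannel:
    def __init__(self):
        self.is_open = True
        self.published = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((routing_key, body))

class FakeIOLoop:
    def __init__(self):
        self._pending, self._lock = [], threading.Lock()

    def add_callback_threadsafe(self, callback):
        with self._lock:
            self._pending.append(callback)

    def run_pending(self):
        # Stands in for one turn of the IO loop.
        with self._lock:
            callbacks, self._pending = self._pending, []
        for callback in callbacks:
            callback()

channel, ioloop = FakeChannel(), FakeIOLoop()
app = ApplicationSketch(SimpleNamespace(ioloop=ioloop), channel)
app.send_message("simulator", "status.time", "t=1")  # scheduled, not yet published
channel.is_open = False                               # connection drops before the IO loop runs
ioloop.run_pending()                                  # _do_publish re-queues instead of publishing
print(channel.published, len(app._message_queue))     # [] 1
channel.is_open = True                                # reconnected
app._process_message_queue()
print(channel.published, len(app._message_queue))     # [('simulator.status.time', 't=1')] 0
```

One design point in this sketch: _process_message_queue() copies and clears the queue while holding the lock, then publishes after releasing it. A slow basic_publish therefore never blocks threads that are trying to enqueue, and _do_publish() can safely take the same (non-reentrant) lock when it re-queues.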

The signature of send_message(app_name, app_topics, payload) is unchanged. No caller modifications are required. All existing applications (Manager, ManagedApplication, and unmanaged Application) inherit the fix automatically.

Metadata

Labels

enhancement (New feature or request)