Description
Applications intermittently fail with the RabbitMQ error "505 UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead" (class 60 is AMQP's basic class). Each occurrence forces a connection reset followed by a 15-second reconnection delay.
The root cause is that send_message() in application.py calls channel.basic_publish() directly from whichever thread invokes it. pika's SelectConnection and its channels are not thread-safe, so concurrent calls from multiple threads (e.g., when the simulator thread fires TimeStatusPublisher and ScenarioTimeIntervalCallback simultaneously) interleave AMQP frames on the wire. Each basic_publish writes a multi-frame sequence on its channel: a Basic.Publish method frame, then a content header frame, then the content body frame(s). When two threads publish at the same time, the broker can receive the second message's method frame where it expected the first message's content header, which is exactly the condition the 505 error reports, and it closes the connection.
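To make the failure concrete, the sketch below models, in simplified form, the per-channel ordering rule the broker enforces: after a Basic.Publish method frame it must see that message's content header, then its body frames. check_channel_frames() and publish_frames() are hypothetical illustrative helpers, not pika or RabbitMQ APIs, and frames are reduced to (kind, size) tuples.

```python
# Hypothetical, simplified model of the per-channel frame ordering the broker
# checks for basic.publish. Not a pika or RabbitMQ API.
METHOD, HEADER, BODY = "method", "header", "body"


def check_channel_frames(frames):
    """Return None if the sequence is valid, else an error string.

    Each frame is (kind, size): for HEADER, size is the declared body size;
    for BODY, it is the number of body bytes carried by that frame.
    """
    state, remaining = "idle", 0
    for i, (kind, size) in enumerate(frames):
        if state == "expect_header":
            if kind != HEADER:
                return f"frame {i}: 505 UNEXPECTED_FRAME - expected content header, got {kind}"
            remaining = size
            # An empty body has no body frames at all.
            state = "expect_body" if remaining > 0 else "idle"
        elif state == "expect_body":
            if kind != BODY:
                return f"frame {i}: 505 UNEXPECTED_FRAME - expected content body, got {kind}"
            remaining -= size
            if remaining <= 0:
                state = "idle"
        elif kind == METHOD:
            state = "expect_header"
        else:
            return f"frame {i}: 505 UNEXPECTED_FRAME - unexpected {kind}"
    return None


def publish_frames(body):
    # One publish whose body fits in a single body frame.
    return [(METHOD, 0), (HEADER, len(body)), (BODY, len(body))]


a, b = publish_frames(b"tick"), publish_frames(b"interval")
serial = a + b                                        # one thread at a time
interleaved = [a[0], b[0], a[1], b[1], a[2], b[2]]    # two threads racing

print(check_channel_frames(serial))       # None
print(check_channel_frames(interleaved))  # frame 1: 505 UNEXPECTED_FRAME - expected content header, got method
```

The same six frames pass when written back to back and fail at the second method frame when interleaved, which mirrors the "expected content header ... got non content header frame" error above.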
This issue is most likely to affect applications with frequent observer callbacks, such as the Simulator, but can occur in any application where send_message() is called from multiple threads.
Proposed Changes
Route all channel.basic_publish() calls through connection.ioloop.add_callback_threadsafe() to serialize publishes onto pika's single IO thread, eliminating frame interleaving.
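The idea behind this change can be sketched without a broker. FakeIOLoop below is a hypothetical stand-in for pika's IOLoop that mimics only the property that matters here: add_callback_threadsafe() may be called from any thread, but the callback always runs on the single loop thread. basic_publish here is a hypothetical function that appends frames to a list, not pika's channel method.

```python
import functools
import queue
import threading


class FakeIOLoop:
    """Hypothetical stand-in for pika's IOLoop: one thread runs every callback.

    add_callback_threadsafe() only enqueues; the callback itself always
    executes on the loop thread.
    """

    def __init__(self):
        self._callbacks = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def add_callback_threadsafe(self, callback):
        self._callbacks.put(callback)

    def _run(self):
        while True:
            callback = self._callbacks.get()
            if callback is None:  # sentinel pushed by stop()
                return
            callback()

    def stop(self):
        self._callbacks.put(None)
        self._thread.join()


wire = []  # frames in the order the broker would receive them


def basic_publish(msg_id):
    # Only ever called on the loop thread, so one message's frames stay contiguous.
    wire.extend([("method", msg_id), ("header", msg_id), ("body", msg_id)])


loop = FakeIOLoop()


def producer(prefix, count):
    # Simulates an observer callback publishing from its own thread.
    for n in range(count):
        loop.add_callback_threadsafe(functools.partial(basic_publish, f"{prefix}-{n}"))


producers = [threading.Thread(target=producer, args=(p, 100)) for p in ("time", "interval")]
for t in producers:
    t.start()
for t in producers:
    t.join()
loop.stop()  # every callback was enqueued before the sentinel, so all have run

triples = [wire[i:i + 3] for i in range(0, len(wire), 3)]
ok = all(
    [kind for kind, _ in triple] == ["method", "header", "body"]
    and len({mid for _, mid in triple}) == 1
    for triple in triples
)
print(len(triples), ok)  # 200 True
```

Because only the loop thread writes frames, each message's method, header, and body frames stay contiguous however the producer threads interleave their calls; the relative order of messages from different threads is still arbitrary.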
Specific changes in application.py:
- Initialize _message_queue and _queue_lock in __init__: Replace lazy hasattr-based initialization with eager initialization, and add a threading.Lock to protect queue access from multiple threads.
- Add _do_publish() method: Performs the actual basic_publish call and is designed to run exclusively on the IO thread. Re-queues the message if the connection dropped between scheduling and execution.
- Refactor send_message(): Replace direct basic_publish calls with connection.ioloop.add_callback_threadsafe(functools.partial(self._do_publish, ...)) to schedule publishes on the IO thread.
- Add lock protection to _process_message_queue(): Wrap all _message_queue reads and writes with _queue_lock for thread safety.
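Taken together, the four changes might look roughly like the sketch below. It is not the actual application.py: only the names _message_queue, _queue_lock, _do_publish, send_message, and _process_message_queue come from the list above. FakeConnection, FakeChannel, DeferredIOLoop, the exchange name "example", and the f"{app_name}.{topic}" routing-key scheme are hypothetical stand-ins so the example runs without a broker. DeferredIOLoop holds callbacks until run_pending() is called, which plays the role of pika's IO thread.

```python
import functools
import queue
import threading


class FakeChannel:
    """Hypothetical stand-in for a pika channel that records publishes."""

    def __init__(self):
        self.is_open = True
        self.published = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


class DeferredIOLoop:
    """Hypothetical IOLoop: callbacks wait until run_pending() plays the IO thread."""

    def __init__(self):
        self._pending = queue.SimpleQueue()

    def add_callback_threadsafe(self, callback):
        self._pending.put(callback)

    def run_pending(self):
        while True:
            try:
                callback = self._pending.get_nowait()
            except queue.Empty:
                return
            callback()


class FakeConnection:
    def __init__(self):
        self.is_open = True
        self.ioloop = DeferredIOLoop()


class Application:
    def __init__(self, connection, channel, exchange="example"):
        self.connection = connection
        self.channel = channel
        self.exchange = exchange  # hypothetical exchange name
        # Eager initialization: no hasattr() check racing on first use.
        self._message_queue = []
        self._queue_lock = threading.Lock()

    def send_message(self, app_name, app_topics, payload):
        # Safe from any thread: it only schedules work on the IO thread.
        topics = [app_topics] if isinstance(app_topics, str) else list(app_topics)
        for topic in topics:
            routing_key = f"{app_name}.{topic}"  # hypothetical routing-key scheme
            self.connection.ioloop.add_callback_threadsafe(
                functools.partial(self._do_publish, routing_key, payload)
            )

    def _do_publish(self, routing_key, payload):
        # Runs only on the IO thread.
        if not (self.connection.is_open and self.channel.is_open):
            # Connection dropped between scheduling and execution: keep the message.
            with self._queue_lock:
                self._message_queue.append((routing_key, payload))
            return
        self.channel.basic_publish(exchange=self.exchange, routing_key=routing_key, body=payload)

    def _process_message_queue(self):
        # Intended to run on the IO thread after a reconnect.
        with self._queue_lock:
            pending, self._message_queue = self._message_queue, []
        for routing_key, payload in pending:
            self._do_publish(routing_key, payload)  # re-queues again if still closed


conn, chan = FakeConnection(), FakeChannel()
app = Application(conn, chan)

app.send_message("simulator", ["status.time", "status.interval"], b"{}")
print(len(chan.published))  # 0: both publishes wait for the IO thread
conn.ioloop.run_pending()
print(len(chan.published))  # 2

app.send_message("simulator", "status.time", b"{}")
conn.is_open = False  # connection drops before the IO thread runs the callback
conn.ioloop.run_pending()
print(len(app._message_queue))  # 1

conn.is_open = True
app._process_message_queue()
print(len(chan.published), len(app._message_queue))  # 3 0
```

The central design choice is that send_message() never touches the channel; it only hands a functools.partial to the IO loop. _process_message_queue() swaps the queue out under the lock and releases it before calling _do_publish(), so a re-queue during a flush cannot deadlock on the same lock.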
The signature of send_message(app_name, app_topics, payload) is unchanged. No caller modifications are required. All existing applications (Manager, ManagedApplication, and unmanaged Application) inherit the fix automatically.