Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ The on_retry_exceeded method allows you to specify a block that should execute w
....
end

#### #on_killed

The on_killed method allows you to specify a block that should execute whenever an event kills itself. The event object passed to the block is a **[QueueMessage]** object.

**Example**

worker.on_killed do |event|
....
#Do something with the failed event
....
end


#### #on_retry

The on_retry method allows you to specify a block that should execute whenever an event fails to process and is retried. The event object passed to the block is a **[QueueMessage]** object, and the abort arg is a Boolean that specifies if the message was aborted (true or false).
Expand Down
34 changes: 25 additions & 9 deletions lib/eventq/eventq_aws/aws_queue_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,9 @@ def process_message(msg, poller, queue, block)

def reject_message(queue, poller, msg, retry_attempts, message, args)
if !queue.allow_retry || retry_attempts >= queue.max_retry_attempts
EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected removing from queue. Message: #{serialize_message(message)}")

# remove the message from the queue so that it does not get retried again
poller.delete_message(msg)

if retry_attempts >= queue.max_retry_attempts
EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Retry attempt limit exceeded.")
context.call_on_retry_exceeded_block(message)
end
queue_will_not_retry_message(queue, poller, msg, message)
elsif args.kill
queue_will_kill_message(poller, msg, message)
elsif queue.allow_retry
retry_attempts += 1

Expand All @@ -135,6 +129,28 @@ def reject_message(queue, poller, msg, retry_attempts, message, args)
context.call_on_retry_block(message)
end
end

def queue_will_not_retry_message(queue, poller, msg, message)
EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected removing from queue. Message: #{serialize_message(message)}")

# remove the message from the queue so that it does not get retried again
poller.delete_message(msg)

if retry_attempts >= queue.max_retry_attempts
EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Retry attempt limit exceeded.")
context.call_on_retry_exceeded_block(message)
end
end

def queue_will_kill_message(poller, msg, message)
EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected without retry. Message: #{serialize_message(message)}")

# remove the message from the queue so that it does not get retried again
poller.delete_message(msg)

EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Message killed.")
context.call_on_killed_block(message)
end
end
end
end
2 changes: 2 additions & 0 deletions lib/eventq/eventq_base/message_args.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class MessageArgs
attr_reader :retry_attempts
attr_accessor :abort
attr_accessor :drop
attr_accessor :kill
attr_reader :context
attr_reader :id
attr_reader :sent
Expand All @@ -14,6 +15,7 @@ def initialize(type:, retry_attempts:, context: {}, content_type:, id: nil, sent
@retry_attempts = retry_attempts
@abort = false
@drop = false
@kill = false
@context = context
@content_type = content_type
@id = id
Expand Down
14 changes: 13 additions & 1 deletion lib/eventq/queue_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def start_thread(queue, options, block)
# @return [Symbol, MessageArgs] :accepted, :duplicate, :reject
def process_message(block, message, retry_attempts, acceptance_args)
abort = false
kill = false
error = false
status = nil

Expand Down Expand Up @@ -140,6 +141,9 @@ def process_message(block, message, retry_attempts, acceptance_args)
if message_args.abort == true
abort = true
EventQ.logger.debug("[#{self.class}] - Message aborted. Id: #{message.id}.")
elsif message_args.kill == true
kill = true
EventQ.logger.debug("[#{self.class}] - Message killed. Id: #{message.id}.")
else
# accept the message as processed
status = :accepted
Expand All @@ -156,7 +160,7 @@ def process_message(block, message, retry_attempts, acceptance_args)
call_on_error_block(error: e, message: message)
end

if error || abort
if error || abort || kill
EventQ::NonceManager.failed(message.id)
status = :reject
else
Expand Down Expand Up @@ -248,6 +252,10 @@ def on_retry_exceeded(&block)
@on_retry_exceeded_block = block
end

def on_killed(&block)
@on_killed_block = block
end

def on_retry(&block)
@on_retry_block = block
end
Expand All @@ -264,6 +272,10 @@ def call_on_retry_exceeded_block(message)
call_block(:on_retry_exceeded_block, message)
end

def call_on_killed_block(message)
call_block(:on_killed_block, message)
end

def call_on_retry_block(message)
call_block(:on_retry_block, message)
end
Expand Down
41 changes: 40 additions & 1 deletion spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@

received = false
received_count = 0
received_attribute = 0;
received_attribute = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)
Expand All @@ -200,6 +200,45 @@
expect(queue_worker.running?).to eq(false)
end

it 'should receive an event from the subscriber queue and not retry it (kill).' do

subscriber_queue.retry_delay = 1000
subscriber_queue.allow_retry = true

subscription_manager.subscribe(event_type, subscriber_queue)
eventq_client.raise_event(event_type, message)

received = false
received_count = 0
received_attribute = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, { worker_adapter: subject, wait: false, block_process: false, client: queue_client }) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
received = true
received_count += 1
received_attribute = args.retry_attempts
EventQ.logger.debug { "Message Received: #{event}" }
if received_count == 3
args.kill = true
else
args.abort = true
end
end

sleep(4)

queue_worker.stop

expect(received).to eq(true)
expect(received_count).to eq(3)
expect(received_attribute).to eq(2)
expect(queue_worker.running?).to eq(false)
end

it 'should receive multiple events from the subscriber queue' do

subscription_manager.subscribe(event_type2, subscriber_queue)
Expand Down
25 changes: 25 additions & 0 deletions spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,31 @@
end
end

describe '#call_on_killed_block' do
let(:message) { double }
context 'when a block is specified' do
let(:block) { Proc.new { } }
before do
queue_worker.on_killed &block
allow(block).to receive(:call)
end
it 'should execute the block' do
expect(block).to receive(:call).with(message).once
queue_worker.call_on_killed_block(message)
end
end
context 'when a block is NOT specified' do
let(:block) { nil }
before do
queue_worker.on_killed &block
end
it 'should NOT execute the block' do
expect(block).not_to receive(:call)
queue_worker.call_on_killed_block(message)
end
end
end

it 'should receive an event from the subscriber queue' do

event_type = 'queue.worker.event1'
Expand Down