diff --git a/README.md b/README.md index b74d2b3..e49c889 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,19 @@ The on_retry_exceeded method allows you to specify a block that should execute w .... end +#### #on_killed + +The on_killed method allows you to specify a block that should execute whenever an event kills itself. The event object passed to the block is a **[QueueMessage]** object. + +**Example** + + worker.on_killed do |event| + .... + #Do something with the failed event + .... + end + + #### #on_retry The on_retry method allows you to specify a block that should execute whenever an event fails to process and is retried. The event object passed to the block is a **[QueueMessage]** object, and the abort arg is a Boolean that specifies if the message was aborted (true or false). diff --git a/lib/eventq/eventq_aws/aws_queue_worker.rb b/lib/eventq/eventq_aws/aws_queue_worker.rb index 5e9c4d2..a8c53fd 100644 --- a/lib/eventq/eventq_aws/aws_queue_worker.rb +++ b/lib/eventq/eventq_aws/aws_queue_worker.rb @@ -104,15 +104,9 @@ def process_message(msg, poller, queue, block) def reject_message(queue, poller, msg, retry_attempts, message, args) if !queue.allow_retry || retry_attempts >= queue.max_retry_attempts - EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected removing from queue. Message: #{serialize_message(message)}") - - # remove the message from the queue so that it does not get retried again - poller.delete_message(msg) - - if retry_attempts >= queue.max_retry_attempts - EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Retry attempt limit exceeded.") - context.call_on_retry_exceeded_block(message) - end + queue_will_not_retry_message(queue, poller, msg, message) + elsif args.kill + queue_will_kill_message(poller, msg, message) elsif queue.allow_retry retry_attempts += 1 @@ -135,6 +129,28 @@ def reject_message(queue, poller, msg, retry_attempts, message, args) context.call_on_retry_block(message) end end + + def queue_will_not_retry_message(queue, poller, msg, message) + EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected removing from queue. Message: #{serialize_message(message)}") + + # remove the message from the queue so that it does not get retried again + poller.delete_message(msg) + + if retry_attempts >= queue.max_retry_attempts + EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Retry attempt limit exceeded.") + context.call_on_retry_exceeded_block(message) + end + end + + def queue_will_kill_message(poller, msg, message) + EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected without retry. Message: #{serialize_message(message)}") + + # remove the message from the queue so that it does not get retried again + poller.delete_message(msg) + + EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Message killed.") + context.call_on_killed_block(message) + end end end end diff --git a/lib/eventq/eventq_base/message_args.rb b/lib/eventq/eventq_base/message_args.rb index 854743a..40374ff 100644 --- a/lib/eventq/eventq_base/message_args.rb +++ b/lib/eventq/eventq_base/message_args.rb @@ -5,6 +5,7 @@ class MessageArgs attr_reader :retry_attempts attr_accessor :abort attr_accessor :drop + attr_accessor :kill attr_reader :context attr_reader :id attr_reader :sent @@ -14,6 +15,7 @@ def initialize(type:, retry_attempts:, context: {}, content_type:, id: nil, sent @retry_attempts = retry_attempts @abort = false @drop = false + @kill = false @context = context @content_type = content_type @id = id diff --git a/lib/eventq/queue_worker.rb b/lib/eventq/queue_worker.rb index a61b730..874fbc5 100644 --- a/lib/eventq/queue_worker.rb +++ b/lib/eventq/queue_worker.rb @@ -113,6 +113,7 @@ def start_thread(queue, options, block) # @return [Symbol, MessageArgs] :accepted, :duplicate, :reject def process_message(block, message, retry_attempts, acceptance_args) abort = false + kill = false error = false status = nil @@ -140,6 +141,9 @@ def process_message(block, message, retry_attempts, acceptance_args) if message_args.abort == true abort = true EventQ.logger.debug("[#{self.class}] - Message aborted. Id: #{message.id}.") + elsif message_args.kill == true + kill = true + EventQ.logger.debug("[#{self.class}] - Message killed. Id: #{message.id}.") else # accept the message as processed status = :accepted @@ -156,7 +160,7 @@ def process_message(block, message, retry_attempts, acceptance_args) call_on_error_block(error: e, message: message) end - if error || abort + if error || abort || kill EventQ::NonceManager.failed(message.id) status = :reject else @@ -248,6 +252,10 @@ def on_retry_exceeded(&block) @on_retry_exceeded_block = block end + def on_killed(&block) + @on_killed_block = block + end + def on_retry(&block) @on_retry_block = block end @@ -264,6 +272,10 @@ def call_on_retry_exceeded_block(message) call_block(:on_retry_exceeded_block, message) end + def call_on_killed_block(message) + call_block(:on_killed_block, message) + end + def call_on_retry_block(message) call_block(:on_retry_block, message) end diff --git a/spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb b/spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb index a17d1c5..86e7a4d 100644 --- a/spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb +++ b/spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb @@ -173,7 +173,7 @@ received = false received_count = 0 - received_attribute = 0; + received_attribute = 0 # wait 1 second to allow the message to be sent and broadcast to the queue sleep(1) @@ -200,6 +200,45 @@ expect(queue_worker.running?).to eq(false) end + it 'should receive an event from the subscriber queue and not retry it (kill).' do + + subscriber_queue.retry_delay = 1000 + subscriber_queue.allow_retry = true + + subscription_manager.subscribe(event_type, subscriber_queue) + eventq_client.raise_event(event_type, message) + + received = false + received_count = 0 + received_attribute = 0 + + # wait 1 second to allow the message to be sent and broadcast to the queue + sleep(1) + + queue_worker.start(subscriber_queue, { worker_adapter: subject, wait: false, block_process: false, client: queue_client }) do |event, args| + expect(event).to eq(message) + expect(args).to be_a(EventQ::MessageArgs) + received = true + received_count += 1 + received_attribute = args.retry_attempts + EventQ.logger.debug { "Message Received: #{event}" } + if received_count == 3 + args.kill = true + else + args.abort = true + end + end + + sleep(4) + + queue_worker.stop + + expect(received).to eq(true) + expect(received_count).to eq(3) + expect(received_attribute).to eq(2) + expect(queue_worker.running?).to eq(false) + end + it 'should receive multiple events from the subscriber queue' do subscription_manager.subscribe(event_type2, subscriber_queue) diff --git a/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb b/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb index e518dfe..278ee4c 100644 --- a/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb +++ b/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb @@ -235,6 +235,31 @@ end end + describe '#call_on_killed_block' do + let(:message) { double } + context 'when a block is specified' do + let(:block) { Proc.new { } } + before do + queue_worker.on_killed &block + allow(block).to receive(:call) + end + it 'should execute the block' do + expect(block).to receive(:call).with(message).once + queue_worker.call_on_killed_block(message) + end + end + context 'when a block is NOT specified' do + let(:block) { nil } + before do + queue_worker.on_killed &block + end + it 'should NOT execute the block' do + expect(block).not_to receive(:call) + queue_worker.call_on_killed_block(message) + end + end + end + it 'should receive an event from the subscriber queue' do event_type = 'queue.worker.event1'