diff --git a/example/bullmq.js b/example/bullmq.js index 8b358b5b..64e323f1 100644 --- a/example/bullmq.js +++ b/example/bullmq.js @@ -37,7 +37,7 @@ async function main() { } }, { - concurrency: 3, + concurrency: 10, connection: {port: REDIS_SERVER_PORT}, } ); diff --git a/public/dashboard.js b/public/dashboard.js index 0afdea96..952456b0 100644 --- a/public/dashboard.js +++ b/public/dashboard.js @@ -286,6 +286,48 @@ $(document).ready(() => { } }); + $('.js-toggle-update-queue-meta').on('click', function () { + const updateMetaText = $('.js-toggle-update-queue-meta').text(); + const shouldNotHide = updateMetaText === 'Update'; + const newUpdateMetaText = shouldNotHide ? 'Cancel' : 'Update'; + $('.meta-config-editor').toggleClass('hide', !shouldNotHide); + $('.js-toggle-update-queue-meta').text(newUpdateMetaText); + }); + + $('.js-update-queue-meta').on('click', function () { + const {queueHost, queueName} = window.arenaInitialPayload; + const concurrency = $('input.js-update-meta-concurrency').val() || null; + const max = $('input.js-update-meta-rl-max').val() || null; + const duration = $('input.js-update-meta-rl-duration').val() || null; + + const stringifiedData = JSON.stringify({ + concurrency: concurrency ? parseInt(concurrency, 10) : null, + max: max ? parseInt(max, 10) : null, + duration: duration ? parseInt(duration, 10) : null, + }); + + const response = window.confirm( + `Are you sure you want to update the queue "${queueHost}/${queueName}" configuration?` + ); + if (response) { + $.ajax({ + url: `${basePath}/api/queue/${encodeURIComponent( + queueHost + )}/${encodeURIComponent(queueName)}/update-meta`, + type: 'PUT', + data: stringifiedData, + contentType: 'application/json', + }) + .done(() => { + window.location.reload(); + }) + .fail((jqXHR) => { + window.alert(`Request failed, check console for error.`); + console.error(jqXHR.responseText); + }); + } + }); + $('.js-toggle-add-flow-editor').on('click', function () { const addFlowText = $('.js-toggle-add-flow-editor').text(); const shouldNotHide = addFlowText === 'Add Flow'; diff --git a/src/server/views/api/bulkAction.js b/src/server/views/api/bulkAction.js index 850c20b5..97ca1281 100644 --- a/src/server/views/api/bulkAction.js +++ b/src/server/views/api/bulkAction.js @@ -40,7 +40,12 @@ function bulkAction(action) { await Promise.all(actionPromises); return res.sendStatus(200); } else if (action === 'clean') { - await queue.clean(1000, queueState); + if (queue.IS_BULLMQ) { + await queue.clean(0, 1000, queueState); + } else { + await queue.clean(1000, queueState); + } + return res.sendStatus(200); } } catch (e) { diff --git a/src/server/views/api/index.js b/src/server/views/api/index.js index ef4b6350..d340ec21 100644 --- a/src/server/views/api/index.js +++ b/src/server/views/api/index.js @@ -14,6 +14,7 @@ const bulkJobsRemove = require('./bulkJobsRemove'); const bulkJobsRetry = require('./bulkJobsRetry'); const queuePause = require('./queuePause'); const queueResume = require('./queueResume'); +const queueUpdateMeta = require('./queueUpdateMeta'); router.post('/queue/:queueHost/:queueName/job', jobAdd); router.post('/flow/:flowHost/:connectionName/flow', addFlow); @@ -30,6 +31,7 @@ router.put('/queue/:queueHost/:queueName/job/:id/data', jobDataUpdate); router.patch('/queue/:queueHost/:queueName/job/:id', jobRetry); router.put('/queue/:queueHost/:queueName/pause', queuePause); router.put('/queue/:queueHost/:queueName/resume', queueResume); +router.put('/queue/:queueHost/:queueName/update-meta', queueUpdateMeta); router.delete('/queue/:queueHost/:queueName/job/:id', jobRemove); router.delete('/queue/:queueHost/:queueName/jobs/bulk', bulkJobsClean); diff --git a/src/server/views/api/queueUpdateMeta.js b/src/server/views/api/queueUpdateMeta.js new file mode 100644 index 00000000..d553e87b --- /dev/null +++ b/src/server/views/api/queueUpdateMeta.js @@ -0,0 +1,37 @@ +async function handler(req, res) { + const {queueName, queueHost} = req.params; + const data = req.body; + + const {Queues} = req.app.locals; + + const queue = await Queues.get(queueName, queueHost); + + if (!queue) return res.status(404).json({error: 'queue not found'}); + + try { + if (queue.setGlobalConcurrency && queue.setGlobalRateLimit) { + if (data.concurrency !== null && data.concurrency !== undefined) { + await queue.setGlobalConcurrency(data.concurrency); + } else { + await queue.removeGlobalConcurrency(); + } + + if ( + data.max !== null && + data.max !== undefined && + data.duration !== null && + data.duration !== undefined + ) { + await queue.setGlobalRateLimit(data.max, data.duration); + } else { + await queue.removeGlobalRateLimit(); + } + } + } catch (err) { + console.log('err', err); + return res.status(500).json({error: err.message}); + } + return res.sendStatus(200); +} + +module.exports = handler; diff --git a/src/server/views/dashboard/templates/queueDetails.hbs b/src/server/views/dashboard/templates/queueDetails.hbs index 703109df..19b5c773 100644 --- a/src/server/views/dashboard/templates/queueDetails.hbs +++ b/src/server/views/dashboard/templates/queueDetails.hbs @@ -89,10 +89,40 @@ {{#if globalConfig}}
+
Update

Global Configuration

- +
+
+ +
+ +
+
+
+ +
+ +
+
+
+ +
+ +
+
+
+ Update +
+
+
{{#each globalConfig}}
{{ @key }}