Easy to use pub-sub scheduler
A pub-sub system makes it easy for multiple processes to communicate with each other by sending events. Subscribed processes will receive the events at the appropriate time.
By creating and reacting to events you get loose coupling: small components that are easy to maintain, deploy and remove.
A storage-plugin system stores your events in the technology you already use, such as MongoDB, RethinkDB, Postgres or Redis.
Although it can be fairly fast, it is not built for performance; it is built for ease of use, flexibility and introspection.
Backend | Link | Status |
---|---|---|
MongoDB | https://github.com/whyhankee/dbwrkr-mongodb | |
RethinkDB | https://github.com/whyhankee/dbwrkr-rethinkdb | |
PostgreSQL | https://github.com/HPieters/dbwrkr-pg | |
npm install dbwrkr --save
npm install dbwrkr-<storage> --save
To publish events:
* `wrkr.connect()` to connect to the DBWorker storage
* `wrkr.publish()` an event
* `wrkr.disconnect()` when you are done and want to exit

To receive events:
* `wrkr.connect()` to connect to the DBWorker storage
* `wrkr.queue()`
* `queue.subscribe()`
* `queue.on('event', fn)` to set up your event handler (receives the events)
* `wrkr.listen()` to start receiving events
* `wrkr.disconnect()` when you are done and want to exit
var dbwrkr = require('dbwrkr');
var DBWrkrMongodb = require('dbwrkr-mongodb');

// Keep the module and the instance in separate variables
var wrkr = new dbwrkr.DBWrkr({
  storage: new DBWrkrMongodb({
    dbName: 'dbwrkr'
  })
});
wrkr.on('error', (errType, err, event) => {
// do something with the error
});
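The publishing steps listed above (connect, publish, disconnect) can be chained with plain callbacks. The following is a minimal sketch, assuming every method takes a Node-style error-first callback as the API descriptions in this README suggest. `publishAndExit` is a hypothetical helper, not part of dbwrkr, and it accepts any object that exposes those three methods.

```javascript
// Hypothetical helper: connect, publish the given events, then disconnect.
// `wrkr` is any object exposing connect/publish/disconnect with
// error-first callbacks (an assumption based on the API described here).
function publishAndExit(wrkr, events, done) {
  wrkr.connect((connectErr) => {
    if (connectErr) return done(connectErr);

    wrkr.publish(events, (publishErr, eventIds) => {
      // Always disconnect, even when publishing failed.
      wrkr.disconnect((disconnectErr) => {
        done(publishErr || disconnectErr || null, eventIds);
      });
    });
  });
}
```

With a configured `wrkr` instance this could be called as `publishAndExit(wrkr, [{ name: 'yourapp.user.signup' }], callback)`.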
The Wrkr API is the main interface. It is also an EventEmitter and emits errors using the `error` event.
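Because the interface is an EventEmitter, Node's usual rule applies: emitting `error` without a registered listener throws, so it is worth attaching a handler early. The sketch below uses a plain `EventEmitter` as a stand-in for a Wrkr instance, only to illustrate the `(errType, err, event)` listener signature. The error type `'storage'` and the emitted event are made up for the demonstration.

```javascript
const EventEmitter = require('events');

// Stand-in for a Wrkr instance; a plain EventEmitter is used here only
// to illustrate the listener signature (errType, err, event).
const fakeWrkr = new EventEmitter();

const seen = [];
fakeWrkr.on('error', (errType, err, event) => {
  // Record enough context to trace the failing event later.
  seen.push({ errType, message: err.message, eventName: event && event.name });
});

// Hypothetical error type and event, emitted by hand for the demonstration.
fakeWrkr.emit('error', 'storage', new Error('connection lost'), {
  name: 'yourapp.user.signup',
});
```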
Connect to the backend storage engine
wrkr.connect(callback);
Disconnect from the backend storage engine
wrkr.disconnect(callback);
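If you prefer promises, the callback-style `connect` and `disconnect` can be wrapped with Node's `util.promisify`. This is a sketch under the assumption that both methods call back with an error as the first argument; `withConnection` is a hypothetical helper and not part of dbwrkr.

```javascript
const util = require('util');

// Hypothetical helper: run `work` between connect() and disconnect().
// Assumes both methods accept a single error-first callback.
async function withConnection(wrkr, work) {
  const connect = util.promisify(wrkr.connect).bind(wrkr);
  const disconnect = util.promisify(wrkr.disconnect).bind(wrkr);

  await connect();
  try {
    return await work(wrkr);
  } finally {
    // Runs whether `work` resolved or threw.
    await disconnect();
  }
}
```

The `finally` block keeps the disconnect in one place, so a failing job does not leave the storage connection open.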
Get a queue object representing a queue in the system.
wrkr.queue('queueName', (err, q) => {
// q = the Queue interface
});
Get a list of queues that are subscribed to the event.
wrkr.subscriptions(eventName, (err, queues) => {
// queues is an array with the names of the queues
})
Publish new event(s). Events can be a single object or an array of objects.
Events will be created for each queue that is subscribed to the event.
Properties:
* `name` - name of the event (required)
* `tid` - target ID of the event (optional)
* `payload` - Object with additional info (optional)
* `when` - `Date` when the event should be processed (optional, default immediate)
var events = [{
name: 'yourapp.user.signup',
tid: user.id,
}];
wrkr.publish(events, (err, eventIds) => {
  // eventIds is an array with the ids of the created events
})
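Since only `name` is required, a small guard in front of `wrkr.publish()` can catch malformed events early. The helper below is a hypothetical sketch, not part of dbwrkr: it checks the documented properties and copies only those. Converting `tid` to a string is an assumption based on the event field list, and the event name `yourapp.user.reminder` is made up.

```javascript
// Hypothetical helper that checks an event against the documented
// properties before it is handed to wrkr.publish().
function buildEvent({ name, tid, payload, when } = {}) {
  if (typeof name !== 'string' || name.length === 0) {
    throw new TypeError('event.name is required');
  }
  if (when !== undefined && !(when instanceof Date)) {
    throw new TypeError('event.when must be a Date');
  }

  const event = { name };
  if (tid !== undefined) event.tid = String(tid); // assumption: tid is stored as a string
  if (payload !== undefined) event.payload = payload;
  if (when !== undefined) event.when = when;
  return event;
}

// An event scheduled one hour from now (made-up event name).
const reminder = buildEvent({
  name: 'yourapp.user.reminder',
  tid: 42,
  when: new Date(Date.now() + 60 * 60 * 1000),
});
```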
Follow up one event with another event. This publishes new event(s) with the parent set to the current event, which helps the introspection system.
var newEvent = {
name: 'yourapp.user.sentWelcomeMessage',
tid: user.id,
};
wrkr.followUp(event, newEvent, (err, eventIds) => {
// eventIds is an array with the created events
})
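To make the parent link concrete, here is the shape a follow-up event is expected to have according to the event field list (`parent` holds the id of the parent event). This is an illustration only: the linking itself is done by `wrkr.followUp()`, and `asFollowUp` and the ids used are hypothetical.

```javascript
// Illustration only: the shape a follow-up event is expected to have.
// The real linking is done by wrkr.followUp(); this helper is hypothetical.
function asFollowUp(parentEvent, newEvent) {
  return Object.assign({}, newEvent, { parent: parentEvent.id });
}

const signup = { id: 'evt-1', name: 'yourapp.user.signup', tid: 'user-7' };
const welcome = asFollowUp(signup, {
  name: 'yourapp.user.sentWelcomeMessage',
  tid: signup.tid,
});
// welcome.parent is now 'evt-1', so the chain signup -> welcome can be traced
```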
Create a new retry event with the data of the current event, also increasing the retryCount on the new event.
You can set the `when` property to specify when the event should be retried. By default, `when` follows an algorithm that should slowly increase to about 50 hours in 20 attempts.
When the retryCount becomes >= 20, an error will be returned.
wrkr.retry(event, when, (err, eventIds) => {
// eventIds is an array with the created events
})
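The default schedule is not spelled out here, so the following is only a sketch of one schedule with the described shape, not dbwrkr's actual algorithm. It assumes a quadratic backoff whose delay reaches 50 hours at a retry count of 20, and it refuses to schedule once the count reaches 20. The names `retryDelayMs`, `nextRetryDate`, and the step size are all hypothetical.

```javascript
const MAX_RETRIES = 20;
const STEP_MS = 7.5 * 60 * 1000; // 7.5 minutes, chosen so a count of 20 maps to 50 hours

// Hypothetical quadratic backoff: delay grows with the square of the count.
function retryDelayMs(retryCount) {
  return retryCount * retryCount * STEP_MS;
}

// Returns the Date for the next attempt, or throws once the limit is reached.
function nextRetryDate(event, now = new Date()) {
  const nextCount = (event.retryCount || 0) + 1;
  if (nextCount >= MAX_RETRIES) {
    throw new Error('retry limit reached for event ' + event.name);
  }
  return new Date(now.getTime() + retryDelayMs(nextCount));
}
```

The result of `nextRetryDate(event)` could be passed as the `when` argument of `wrkr.retry()` if a custom schedule is wanted.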
Find unprocessed events in the system.
var criteria = {
name: eventName,
tid: user.id,
};
wrkr.find(criteria, (err, events) => {
// events is an array with matched events
})
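The criteria object can be read as "every listed field must match". The in-memory sketch below illustrates that reading; it assumes "unprocessed" means the `done` date is not set, which follows from the event field list but is not confirmed by the text. `findUnprocessed` and the sample data are hypothetical, and a storage plugin would express the same criteria as a database query.

```javascript
// In-memory illustration of criteria matching; a storage plugin would
// translate the same criteria into a database query instead.
function findUnprocessed(events, criteria) {
  const keys = Object.keys(criteria);
  return events.filter((event) =>
    !event.done && keys.every((key) => event[key] === criteria[key])
  );
}

// Made-up sample data.
const stored = [
  { id: '1', name: 'yourapp.user.signup', tid: 'user-7' },
  { id: '2', name: 'yourapp.user.signup', tid: 'user-7', done: new Date() },
  { id: '3', name: 'yourapp.user.signup', tid: 'user-8' },
];
const matches = findUnprocessed(stored, { name: 'yourapp.user.signup', tid: 'user-7' });
// matches contains only the event with id '1'
```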
Remove events in the system.
var criteria = {
name: eventName,
tid: user.id,
};
wrkr.remove(criteria, (err, events) => {
// events is an array with matched events
})
You receive a queue interface from the `wrkr.queue(queueName)` method.
Events are emitted on the Queue interface. You can handle the events by listening for the name of the event. A listener for `*` receives the event when no specific event listener is available.
Listen to specific events
queue.on('eventName', function (event, callback) {
  console.log(event); // event is the processed event
  return callback(); // always call the callback to mark the event as done
})
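The `*` fallback described above can be pictured as a small dispatch rule: use the specific listener when one exists, otherwise hand the event to the `*` listener. The sketch below models that rule with a plain `EventEmitter` standing in for a queue. `dispatch` is hypothetical and is not how dbwrkr is necessarily implemented, and the event names are made up.

```javascript
const EventEmitter = require('events');

// Hypothetical dispatcher mirroring the described fallback:
// a specific listener wins, otherwise the '*' listener gets the event.
function dispatch(queue, event, callback) {
  const target = queue.listenerCount(event.name) > 0 ? event.name : '*';
  if (queue.listenerCount(target) === 0) {
    return callback(new Error('no listener for ' + event.name));
  }
  queue.emit(target, event, callback);
}

const queue = new EventEmitter(); // stand-in for a dbwrkr queue
const handled = [];
queue.on('yourapp.user.signup', (event, done) => { handled.push('specific'); done(); });
queue.on('*', (event, done) => { handled.push('fallback:' + event.name); done(); });

dispatch(queue, { name: 'yourapp.user.signup' }, () => {});
dispatch(queue, { name: 'yourapp.order.paid' }, () => {});
// handled is now ['specific', 'fallback:yourapp.order.paid']
```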
Subscribe an event to a queue.
The queue will emit the eventName
queue.subscribe(eventName, callback)
Unsubscribe an event from a queue.
New events will no longer be created for this queue, although previously created qitems could still be received.
queue.unsubscribe(eventName, callback)
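Subscriptions decide how a published event fans out: one item is created for each queue subscribed to the event name, and unsubscribing only affects events published afterwards. The class below is an in-memory model of that behaviour, written as a sketch. `SubscriptionTable` and its methods are hypothetical and do not reflect how a storage plugin keeps this data.

```javascript
// In-memory model of subscriptions (hypothetical, for illustration only).
class SubscriptionTable {
  constructor() {
    this.byEvent = new Map(); // eventName -> Set of queue names
  }

  subscribe(eventName, queueName) {
    if (!this.byEvent.has(eventName)) this.byEvent.set(eventName, new Set());
    this.byEvent.get(eventName).add(queueName);
  }

  unsubscribe(eventName, queueName) {
    const queues = this.byEvent.get(eventName);
    if (queues) queues.delete(queueName);
  }

  // One item per subscribed queue, as described for publish().
  fanOut(event) {
    const queues = this.byEvent.get(event.name) || new Set();
    return Array.from(queues, (queue) => Object.assign({}, event, { queue }));
  }
}
```

Items already produced by `fanOut` before an `unsubscribe` call are untouched, which matches the note that previously created items can still be received.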
Notes:
DEBUG=wrkr* node ./example/example-mongodb.js
Field | Description | Type |
---|---|---|
id | Unique id of the event | String (indexed) |
name | Name of the event | String (indexed) |
queue | Name of the queue | String (indexed) |
tid | Target id of the event (e.g. userId/orderId) | String |
payload | Object with extra properties for the event | |
parent | Id of the parent event (in case of a followUp) | String |
created | Date created | Date |
when | Date when the event should be processed | Date (sparse indexed) |
done | Date when the event was processed | Date (sparse indexed) |
retryCount | In case of an error followUp, the retryCount | Number |
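As an illustration of the fields listed above, here is an example event document with made-up values, plus a hypothetical `isDue` predicate. The predicate assumes an event is ready when `done` is unset and `when` is not in the future. That reading follows from the field descriptions but is not confirmed as dbwrkr's actual selection logic.

```javascript
// Example event document with made-up values for every documented field.
const exampleEvent = {
  id: 'evt-1',
  name: 'yourapp.user.signup',
  queue: 'mailer',
  tid: 'user-7',
  payload: { plan: 'free' },
  parent: undefined, // only set on follow-up events
  created: new Date('2020-01-01T10:00:00Z'),
  when: new Date('2020-01-01T10:05:00Z'),
  done: undefined, // set once the event has been processed
  retryCount: 0,
};

// Hypothetical predicate: not processed yet and scheduled at or before `now`.
function isDue(event, now = new Date()) {
  return !event.done && event.when instanceof Date && event.when <= now;
}
```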