项目作者: iilunin

项目描述 :
WebSocket to Kafka connector
高级语言: JavaScript
项目地址: git://github.com/iilunin/node-ws-kafka-connector.git
创建时间: 2017-07-06T02:50:12Z
项目社区:https://github.com/iilunin/node-ws-kafka-connector

开源协议:Apache License 2.0

下载


License

WebSocket connector to Kafka

Server

WebSocket server which implements message protocol to work with some basic Kafka APIs,
such as create, list, subscribe to topics, push messages.

Based on SOHU-Co/kafka-node
and WebSockets

  1. conf_module.kafka_config = {
  2. //node-kafka options
  3. kafkaHost: getBrokerList(),
  4. clientId: 'test-kafka-client-2',
  5. connectTimeout: 1000,
  6. requestTimeout: 60000,
  7. autoConnect: true,
  8. //custom options
  9. no_zookeeper_client: true
  10. };
  11. conf_module.websocket_config ={
  12. port: getWebSocketPort()
  13. };
  14. conf_module.producer_config = {
  15. requireAcks: 1,
  16. ackTimeoutMs: 100,
  17. partitionerType: 2,
  18. // custom options
  19. mq_limit: 20000,
  20. mq_interval: 200 //if null, then messages published immediately
  21. };
  22. conf_module.consumer_config ={
  23. // host: 'zookeeper:2181', // zookeeper host omit if connecting directly to broker (see kafkaHost below)
  24. kafkaHost: getBrokerList(),
  25. ssl: true, // optional (defaults to false) or tls options hash
  26. groupId: 'kafka-node-group', //should be set by message to ws
  27. autoCommit: true,
  28. autoCommitIntervalMs: 500,
  29. // Fetch message config
  30. fetchMaxWaitMs: 100,
  31. paused: false,
  32. maxNumSegments: 1000,
  33. fetchMinBytes: 1,
  34. fetchMaxBytes: 1024 * 1024,
  35. maxTickMessages: 1000,
  36. fromOffset: 'latest',
  37. outOfRangeOffset: 'earliest',
  38. sessionTimeout: 30000,
  39. retries: 10,
  40. retryFactor: 1.8,
  41. retryMinTimeout: 1000,
  42. connectOnReady: true,
  43. migrateHLC: false,
  44. migrateRolling: true,
  45. protocol: ['roundrobin'],
  46. // custom options
  47. mq_limit: 5000,
  48. mq_interval: 50 //if null, then messages published immediately
  49. };
  50. const wsk = new WSKafka(conf_module);
  51. wsk.on('ws-connection', (ws, req) => debug('connection'))
  52. .on('ws-close', () => debug('ws-close'))
  53. .on('wss-ready', () => debug('wss-ready'))
  54. .on('producer-ready', () => debug('producer-ready'))
  55. .on('producer-error', (e) => console.log(`producer-error ${e}`))
  56. .on('consumer-ready', () => debug('consumer-ready'))
  57. .on('consumer-error', (e) => console.log(`consumer-error ${e}`))
  58. .on('consumer-message', () => {})
  59. .on('error', (e) => console.log(`error ${e}`));
  60. wsk.start();

Each WebSocket connection may have one Kafka producer and one consumer.
The producer is initialized with each WebSocket connection. The consumer is initialized after subscription message received.

Notes on configuration

producer_config and consumer_config objects both have two custom options
mq_limit: 5000 specifies the maximum message number to be stored in the buffer before sending them.
mq_interval: 50 specifies the number in ms how often should message from the buffer be sent to Kafka brokers or the WebSocket.

Message Structure

General

All messages are JSON based. Generic message structure looks like this:

  1. {
  2. "id":"any_id",
  3. "refid":"id of orignial message", //returned by server to keep track of messages
  4. "t":"message type",
  5. "a":"action",
  6. "s":"success", //0 or 1
  7. "p":"payload"
  8. }

Topics

Create

  1. {
  2. "id":1,
  3. "t":"topic",
  4. "a":"create",
  5. "p":["topic1", "topic2", "topicN"]
  6. }

NOTE: kafka-node does not support confiuration of number of paritions per particular topic.
You can only use Kafka broker setting for now KAFKA_NUM_PARTITIONS in docker or num.partitions in Kafka config.

Response message:

  1. {"id":1,"refid":1,"t":"topic","a":"create","s":0,"p":["topic1","topic2","topicN"]}

List

  1. {
  2. "id":23,
  3. "t":"topic",
  4. "a":"list"
  5. }

Response message:

  1. {
  2. "id":6,
  3. "refid":23,
  4. "t":"topic",
  5. "a":"list",
  6. "s":0,
  7. "p":
  8. [{"topic":"topic2","partition":0},{"topic":"topic2","partition":1},{"topic":"topic2","partition":2},{"topic":"topic1","partition":0},{"topic":"topic1","partition":1},{"topic":"topic1","partition":2},{"topic":"topicN","partition":0},{"topic":"topicN","partition":1},{"topic":"topicN","partition":2}]}

Payload is the list of topic-partitions structures.

Subscribe

Subscribe to topic and join consumer group

  1. {
  2. "id":1000,
  3. "t":"topic",
  4. "a":"subscribe",
  5. "p":
  6. {
  7. "t":["topic1","topic2"],
  8. "consumer_group":"ingestion_1"
  9. }
  10. }

Payload is the structure of t - list of topics and consumer_group to join.
if consumer_group is not specified then default groupId from conf_module.consumer_config
will be used.

Response message:

  1. {"id":0,"refid":1000,"t":"topic","a":"subscribe","p":["topic1","topic2"],"s":0}

All notifications from Kafka will be sent to the same WebSocket connection where the subscription was made.

Unsubscribe

Unsubscribe from topics and consumer group

  1. {
  2. "id":1300,
  3. "t":"topic",
  4. "a":"unsubscribe"
  5. }

Response message:

  1. {"id":567,"refid":1300,"t":"topic","a":"unsubscribe","s":0}

Notification

Send

  1. {"id":1320,"t":"notif","a":"create","p":{"t":"topic1", "m":"{custom_message:'msg'}"}}

Response message: - No response. TODO: Need to create an option to receive acknowledgement receipt.

Receive

Notifications are received automatically after subscription.

  1. [{"id":3,"t":"notif","p":"hello"}]

A list of notifications with the payload which was sent using send notificaiton message.

Healthcheck

  1. {
  2. "id":1500,
  3. "t":"health"
  4. }

Response message:

  1. {"id":5,"refid":1500,"t":"health","s":0,"p":{"consumers":[{"ws":1,"topics":["topic1","topic2"]}]}}

Payload contains a list of consumers and topics currently connected to web socket server.