Node.js使用Kafka-node模块实现生产者与消费者

更新日期: 2019-07-10阅读: 5.3k标签: 模块

基于Node.js,采用Kafka-node模块实现生产者与消费者,我正在做一个的项目,我在自己的服务器上有一个Kafka生产者,并使用Kafka-Node作为我的应用程序的消费者。


生产者代码

/**
 * 生产者
 */

const kafka = require('kafka-node');

let conn = {'kafkaHost':'127.0.0.1:9092'};

var MQ = function (){
    this.mq_producers = {};
    this.client = {};
}

MQ.prototype.AddProducer = function (conn, handler){
    console.log('增加生产者',conn, this);
    this.client = new kafka.KafkaClient(conn);
    let producer = new kafka.Producer(this.client);

    producer.on('ready', function(){
        if(!!handler){
            handler(producer);
        }
    });

    producer.on('error', function(err){
        console.error('producer error ',err.stack);
    });

    this.mq_producers['common'] = producer;
    return producer;
}
console.log(MQ);
var mq = new MQ();

mq.AddProducer(conn, function (producer){
    producer.createTopics(['broadcast'], function (){
        setInterval(function(){

            var _msg = {
                topic:['broadcast'], 
                messages:[JSON.stringify({"cmd":"testRpc","value":"Hello World"})],
                partition:0
            }


            //console.log('clientId : ',mq.client.clientId);
            //console.log('topicMetadata ',mq.client.topicMetadata);
            //console.log('brokerMetadata ',mq.client.brokerMetadata);
            //console.log('clusterMetadata ',mq.client.clusterMetadata);
            //console.log('brokerMetadataLastUpdate ',mq.client.brokerMetadataLastUpdate);

            mq.mq_producers['common'].send([_msg], function (err, data){
                console.log("..... ",data);
            })
        }, 2000);
    })
});


消费者代码

/**
 * 消费者
 */

const kafka = require('kafka-node');

let conn = {'kafkaHost':'127.0.0.1:9092'};
let consumers = [
    {
        'type': 'consumer',
        'options': {'autoCommit': true},
        'name':'common',
        'topic':[
            {'topic': 'broadcast', 'partition': 0}
        ]
    }
];

let MQ = function(){
    this.client = {};
    this.mq_consumers = {};
}

MQ.prototype.AddConsumer = function (conn, topics, options, handler){
    this.client = new kafka.KafkaClient(conn);
    let consumer = new kafka.Consumer(this.client, topics, options);

    if(!!handler){
        consumer.on('message', handler);
    }

    consumer.on('error', function(err){
        console.error('consumer error ',err.stack);
    });
    this.mq_consumers['common'] = consumer;
}

var mq = new MQ();


mq.AddConsumer(conn, consumers[0].topic, consumers[0].options, function (message){
    //console.log('clientId : ',mq.client.clientId);
    //console.log('topicMetadata ',mq.client.topicMetadata);
    //console.log('correlationId ',mq.client.correlationId);
    //console.log('brokerMetadata ',mq.client.brokerMetadata);
    //console.log('clusterMetadata ',mq.client.clusterMetadata);
    let _consumer = mq.mq_consumers['common'];

    //console.log("----------consumer");
    //console.log('topicMetadata ',_consumer.client.topicMetadata);
    //console.log('brokerMetadata ',_consumer.client.brokerMetadata);
    //console.log('clusterMetadata ',_consumer.client.clusterMetadata);
    //console.log(_consumer.payloads);
    console.log(message.value);
});

链接: https://www.fly63.com/article/detial/4592

ES6模块功能:export和import的加载方式

ES6之前已经出现了js模块加载的方案,最主要的是CommonJS和AMD规范。commonjs主要应用于服务器,实现同步加载,如nodejs。AMD规范应用于浏览器,如requirejs,为异步加载。

Node的https模块_创建HTTPS服务器

Node的https模块:HTTPS服务器使用HTTPS协议,需要证书授权,SSL安全加密后传输,使用443端口

如何让 node 运行 es6 模块文件,及其原理

最新版的 node 支持最新版 ECMAScript 几乎所有特性,但有一个特性却一直到现在都还没有支持,那就是从 ES2015 开始定义的模块化机制。而现在我们很多项目都是用 es6 的模块化规范来写代码的,包括 node 项目

module、export、require、import的使用

module每个文件就是一个模块。文件内定义的变量、函数等等都是在自己的作用域内,都是自身所私有的,对其它文件不可见。在module中有一个属性exports,即:module.exports。它是该模块对外的输出值,是一个对象。

Node.js - 模块系统

模块是Node.js 应用程序的基本组成部分,文件和模块是一一对应的。换言之,一个 Node.js 文件就是一个模块,这个文件可能是JavaScript 代码、JSON 或者编译过的C/C++ 扩展。Node.js 提供了 exports 和 require 两个对象

ES模块基础用法及常见使用问题

ES6中引入了模块(Modules)的概念,相信大家都已经挺熟悉的了,在日常的工作中应该也都有使用。本文会简单介绍一下ES模块的优点、基本用法以及常见问题。

ES6 export 和 export default的区别

ES6中 export 和 export default 与 import使用的区别,使用 react native 代码详解,现在流行的前端框架,angular+ 主要使用 export 导出模块,react native 中使用 export default 导出模块,如今编辑器非常强大,安装插件会自动弹出模块名称,知道其导出怎么使用就可以了

export和export default的区别

export与export default均可用于导出常量、函数、文件、模块;你可以在其它文件或模块中通过import+(常量 | 函数 | 文件 | 模块)名的方式,将其导入,以便能够对其进行使用;

关于export和export default你不知道的事

网上有很多关于export和export default的文章,他们大部门都是只讲了用法,但是没有提到性能,打包等关键的东西。大家应该应该能理解import * from xxx会把文件中export default的内容都打包到文件中,而import {func} from xxx只会把文件中的func导入

最全的前端模块化方案

模块化主要是用来抽离公共代码,隔离作用域,避免变量冲突等。将一个复杂的系统分解为多个模块以方便编码。会讲述以下内容:CommonJS、AMD 及 核心原理实现、CMD 及 核心原理实现

点击更多...

内容以共享、参考、研究为目的,不存在任何商业目的。其版权属原作者所有,如有侵权或违规,请与小编联系!情况属实本人将予以删除!