欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > Nodejs 第八十章(Kafka高级)

Nodejs 第八十章(Kafka高级)

2024/11/30 6:47:04 来源:https://blog.csdn.net/qq1195566313/article/details/140349752  浏览:    关键词:Nodejs 第八十章(Kafka高级)

在这里插入图片描述

kafka前置知识在前几章章讲过了 不再复述

Kafka集群操作

1.创建多个kafka服务

拷贝一份kafka完整目录改名为kafka2

修改配置文件 kafka2/config/server.properties 这个文件

broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源

启动zooKeeper和kafka和kafka2

.\bin\windows\kafka-server-start.bat .\config\server.properties

2.客户端管理

查看集群信息和客户端对象

import { Kafka, CompressionTypes } from 'kafkajs'const kafka = new Kafka({clientId: 'my-app', //客户端标识brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息

返回值 可以查看连接集群的信息比如端口id等

{brokers: [{ nodeId: 0, host: '26.26.26.1', port: 9092 },{ nodeId: 1, host: '26.26.26.1', port: 9093 }],controller: 0,clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}

创建主题createTopics将解析true主题是否已成功创建或false是否已存在。如果发生错误,该方法将抛出异常

删除主题admin.deleteTopics 传入删除的主题

查看主题列表listTopics列出所有现有主题的名称,并返回一个字符串数组。如果发生错误,该方法将抛出异常`

//创建主题
await admin.createTopics({topics: [{ topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },{ topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {console.log('topics', topics)
})
3.事务

KafkaJS 提供了对 Kafka 事务的支持,可以使用它来执行具有事务特性的操作。Kafka 事务用于确保一组相关的消息要么全部成功提交要么全部回滚,从而保持数据的一致性

import { Kafka, CompressionTypes } from 'kafkajs'const kafka = new Kafka({clientId: 'my-app', //客户端标识brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})//生产者
const producer = kafka.producer({transactionalId: '填写事务ID',maxInFlightRequests: 1, //最大同时发送请求数idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()const transaction = await producer.transaction()
try {await transaction.send({topic: 'xiaoman',messages: [{ value: '100元' }],})await transaction.commit() // 事务提交
}
catch (e) {console.log(e)await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com