
Accessing a Kerberos-Authenticated Huawei Kafka Cluster

2024/10/25  Source: https://blog.csdn.net/xufwind/article/details/143038658
Background
Our company manages its data on a Huawei-provided big data cluster.
The cluster components use Kerberos authentication, so any program, or StreamSets, that wants to access the cluster's Kafka must authenticate first.
Huawei's Kafka includes Huawei's own security extensions, so you must use the Kafka client libraries Huawei provides to access it.
For external components and programs, Huawei supplies everything needed for authentication: a keytab file, krb5.conf, and a principal.

Note: to access the Huawei cluster from outside it, you must open the authentication-related ports in addition to establishing network connectivity.
In krb5.conf, find the kdc = xxx entry under each realm; the port that follows the host in that entry is a UDP port and must be opened.
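To know which ports to ask the network team to open, you can scan krb5.conf for the kdc entries programmatically. The sketch below is a hypothetical helper, not part of the original post; it assumes the standard krb5.conf syntax (kdc = host:port, with UDP port 88 as the Kerberos default when no port is given):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KdcPortScanner {
    // Matches lines like "kdc = kdc1.example.com:21732"; the port is optional.
    private static final Pattern KDC_LINE =
        Pattern.compile("^\\s*kdc\\s*=\\s*([^:\\s]+)(?::(\\d+))?\\s*$", Pattern.MULTILINE);

    // Returns every KDC endpoint as "host:port", defaulting to port 88.
    public static List<String> kdcEndpoints(String krb5Conf) {
        List<String> endpoints = new ArrayList<>();
        Matcher m = KDC_LINE.matcher(krb5Conf);
        while (m.find()) {
            String host = m.group(1);
            String port = m.group(2) != null ? m.group(2) : "88";
            endpoints.add(host + ":" + port);
        }
        return endpoints;
    }

    public static void main(String[] args) {
        String sample = "[realms]\nHADOOP.COM = {\n"
            + "  kdc = kdc1.example.com:21732\n"
            + "  kdc = kdc2.example.com:21732\n}\n";
        // prints [kdc1.example.com:21732, kdc2.example.com:21732]
        System.out.println(kdcEndpoints(sample));
    }
}
```

The hostnames and port 21732 above are placeholders; read them from the krb5.conf Huawei actually gave you.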

Java code authentication

Huawei provides official code-authentication examples you can follow; the sample code is at: https://github.com/huaweicloud/huaweicloud-mrs-example
You can also write test code following the generic Kerberos authentication pattern, as long as the Kafka client libraries you use are the ones Huawei provides.
Sample code:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class App {

    private static final Logger logger = LoggerFactory.getLogger(App.class);

    public static void setBootstrapServer(Properties props) {
        props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092");
    }

    public static void main(String[] args) {
        // Point the JVM at the authentication config files
        setSystemProperty();

        Properties props = new Properties();
        setBootstrapServer(props);  // Kafka broker addresses
        setCommonKafkaProps(props); // common producer settings
        securityAuth(props);        // Kerberos authentication settings

        System.out.println("begin send message to kafka!");
        Producer<String, String> producer;
        try {
            producer = new KafkaProducer<>(props);
        } catch (Exception e) {
            logger.error("failed to create producer:", e);
            for (StackTraceElement element : e.getStackTrace()) {
                System.out.println(element);
            }
            Throwable cause = e.getCause();
            while (cause != null) {
                System.out.println("Caused by: " + cause);
                for (StackTraceElement element : cause.getStackTrace()) {
                    System.out.println(element);
                }
                cause = cause.getCause();
            }
            return;
        }
        System.out.println("producer created successfully!");

        long timeBegin = System.currentTimeMillis();
        String topic = "test_tpc";
        for (int i = 0; i < 10; i++) {
            String msg = "send one message: " + i;
            logger.info("about to send message: {}", msg);
            producer.send(new ProducerRecord<String, String>(topic, msg), new SendCallBack());
            logger.info("send to kafka finish, message is: {}", msg);
        }
        logger.info("send message success, cost time: {}", (System.currentTimeMillis() - timeBegin) / 1000);
        producer.flush();
        producer.close();
        logger.info("kafka send msg to topic finished!");
    }

    public static void setSystemProperty() {
        System.setProperty("zookeeper.server.principal", "zookeeper/hadoop.hadoop.com");
        System.setProperty("sun.security.krb5.debug", "true");
        System.setProperty("java.security.auth.login.config", "/xxx/jaas.conf");
        System.setProperty("java.security.krb5.conf", "/xxx/krb5.conf");
    }

    public static void setCommonKafkaProps(Properties props) {
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static void securityAuth(Properties props) {
        // Kerberos authentication
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");
        // props.put("kerberos.domain.name", "hadoop.hadoop.com");
    }

    // Logs the outcome of each send (the original post uses this class without showing it).
    static class SendCallBack implements Callback {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                logger.error("send failed:", exception);
            } else {
                logger.info("sent to {}-{}@{}", metadata.topic(), metadata.partition(), metadata.offset());
            }
        }
    }
}

pom.xml references the Huawei packages; you can copy this from the sample code, so the individual dependencies are not listed here.

<properties>
    <zookeeper.version>3.6.3-h0.cbu.mrs.320.r33</zookeeper.version>
    <kafka.version>2.4.0-h0.cbu.mrs.320.r33</kafka.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
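Since the post leaves the dependency list out, here is a sketch of how the kafka-clients dependency would pick up the MRS version property above. The coordinates are an assumption; copy the exact dependency and repository declarations from the huaweicloud-mrs-example project rather than trusting these:

```xml
<!-- Hypothetical sketch: take the real coordinates and the Huawei MRS
     Maven repository from the sample project's pom.xml. -->
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>
```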

The jaas.conf used by the code is as follows:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    principal="yourprincipal@HADOOP.COM"
    keyTab="/xxx/your.keytab"
    storeKey=true
    useTicketCache=false
    debug=true;
};
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    principal="yourprincipal@HADOOP.COM"
    keyTab="/xxx/your.keytab"
    storeKey=true
    useTicketCache=false
    debug=true;
};
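A malformed jaas.conf (a missing semicolon or brace) makes the JVM's login configuration throw at startup, so it is worth checking that the file parses and contains the KafkaClient entry before wiring it into the producer. This is a hypothetical check using only the JDK's own JAAS loader, not part of the original post:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class JaasCheck {
    // Loads the given JAAS file as the JVM's login config and reports whether
    // it defines an entry with the given name. Throws if the file cannot be parsed.
    public static boolean hasEntry(Path jaasFile, String name) {
        System.setProperty("java.security.auth.login.config", jaasFile.toString());
        Configuration.setConfiguration(null); // drop any cached config so it is re-read
        AppConfigurationEntry[] entries =
            Configuration.getConfiguration().getAppConfigurationEntry(name);
        return entries != null && entries.length > 0;
    }

    public static void main(String[] args) throws Exception {
        Path f = Files.createTempFile("jaas", ".conf");
        Files.writeString(f,
            "KafkaClient {\n"
            + "  com.sun.security.auth.module.Krb5LoginModule required\n"
            + "  useKeyTab=true;\n"
            + "};\n");
        System.out.println(hasEntry(f, "KafkaClient")); // prints true
    }
}
```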
StreamSets authentication

Modify the StreamSets configuration:
Find the StreamSets configuration file streamsets/etc/sdc.properties and change the following three settings:

# Runs the Data Collector within a Kerberos session which is propagated to all stages.
# This is useful for stages that require Kerberos authentication with the services they interact with
# kerberos.client.enabled=false
kerberos.client.enabled=true
# The Kerberos principal to use for the Kerberos session.
# It should be a service principal. If the hostname part of the service principal is '_HOST' or '0.0.0.0',
# the hostname will be replaced with the actual complete hostname of Data Collector as advertised by the
# unix command 'hostname -f'
# kerberos.client.principal=sdc/_HOST@EXAMPLE.COM
kerberos.client.principal=yourprincipal@HADOOP.COM
# The location of the keytab file for the specified principal. If the path is relative, the keytab file will be
# looked for in the Data Collector configuration directory
# kerberos.client.keytab=sdc.keytab
kerberos.client.keytab=/xxx/your.keytab

When starting StreamSets, add a startup parameter pointing at the JAAS configuration, the same file used in the code above:

SDC_JAVA_OPTS="-Djava.security.auth.login.config=/xxx/jaas.conf" ./streamsets dc

When accessing Kafka, because the Huawei client packages are required, you must upload the Huawei Kafka jars in the Kafka stage configuration,
and add the authentication-related properties:

security.protocol = SASL_PLAINTEXT
sasl.mechanism = GSSAPI
sasl.kerberos.service.name = kafka
