Background
The company uses a big data cluster provided by Huawei to manage company data.
The cluster components use Kerberos authentication, so both our own programs and StreamSets must authenticate before they can access the cluster's Kafka.
Huawei's Kafka includes Huawei-specific security extensions, so any access to it must go through the Kafka client packages that Huawei provides.
For external components and programs, Huawei supplies the artifacts needed for authentication: a keytab file, a krb5.conf, and a principal.
Note: to reach the Huawei cluster from outside it, opening network connectivity is not enough; the authentication-related ports must be opened as well. Look in krb5.conf: each realm section contains a "kdc = xxx" entry, and the port in that address is a UDP port, which must be opened.
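The ports to open can be read straight out of krb5.conf. Below is a small illustrative sketch (the class name, realm, host names, and port are all hypothetical, not taken from a real cluster) that scans krb5.conf text for "kdc = host:port" entries and prints the UDP ports that need to be opened:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KdcPorts {

    // Matches entries like "kdc = kdc01.example.com:21732" inside the [realms] section.
    private static final Pattern KDC_LINE = Pattern.compile("kdc\\s*=\\s*(\\S+)");

    public static List<String> extractKdcs(String krb5Conf) {
        List<String> kdcs = new ArrayList<>();
        Matcher m = KDC_LINE.matcher(krb5Conf);
        while (m.find()) {
            kdcs.add(m.group(1));
        }
        return kdcs;
    }

    public static void main(String[] args) {
        // Hypothetical krb5.conf fragment; the real file comes from the Huawei cluster.
        String sample = "[realms]\n"
                + "  EXAMPLE.COM = {\n"
                + "    kdc = kdc01.example.com:21732\n"
                + "    kdc = kdc02.example.com:21732\n"
                + "  }\n";
        for (String kdc : extractKdcs(sample)) {
            System.out.println("open UDP port for: " + kdc);
        }
    }
}
```

Run against the cluster's real krb5.conf (for example, after reading the file into a string), this gives the full list of KDC addresses to hand to the network team.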
Java code authentication
Huawei provides official sample code for this authentication flow, which you can follow directly: https://github.com/huaweicloud/huaweicloud-mrs-example
You can also write your own test code following the generic Kerberos authentication pattern, as long as the Kafka client packages used are the ones provided by Huawei.
Sample code:
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class App {

    private static final Logger logger = LoggerFactory.getLogger(App.class);

    public static void setBootstrapServer(Properties props) {
        props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092");
    }

    public static void main(String[] args) {
        setSystemProperty();        // point the JVM at the authentication config files
        Properties props = new Properties();
        setBootstrapServer(props); // Kafka broker addresses
        setCommonKafkaProps(props); // common Kafka producer settings
        securityAuth(props);        // Kerberos-related settings

        System.out.println("begin send message to kafka!");
        Producer<String, String> producer;
        try {
            producer = new KafkaProducer<>(props);
        } catch (Exception e) {
            logger.error("failed to create producer:", e);
            for (StackTraceElement element : e.getStackTrace()) {
                System.out.println(element);
            }
            Throwable cause = e.getCause();
            while (cause != null) {
                System.out.println("Caused by: " + cause);
                for (StackTraceElement element : cause.getStackTrace()) {
                    System.out.println(element);
                }
                cause = cause.getCause();
            }
            return;
        }
        System.out.println("producer created successfully!");

        long timeBegin = System.currentTimeMillis();
        String topic = "test_tpc";
        for (int i = 0; i < 10; i++) {
            String msg = "send one message: " + i;
            logger.info("about to send message: {}", msg);
            producer.send(new ProducerRecord<String, String>(topic, msg), new SendCallBack());
            logger.info("send to kafka finish, message is: {}", msg);
        }
        logger.info("send message success, cost time: {}",
                (System.currentTimeMillis() - timeBegin) / 1000);
        producer.flush();
        producer.close();
        logger.info("kafka send msg to topic finished!");
    }

    public static void setSystemProperty() {
        System.setProperty("zookeeper.server.principal", "zookeeper/hadoop.hadoop.com");
        System.setProperty("sun.security.krb5.debug", "true");
        System.setProperty("java.security.auth.login.config", "/xxx/jaas.conf");
        System.setProperty("java.security.krb5.conf", "/xxx/krb5.conf");
    }

    public static void setCommonKafkaProps(Properties props) {
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static void securityAuth(Properties props) {
        // Kerberos authentication settings
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");
        // props.put("kerberos.domain.name", "hadoop.hadoop.com");
    }

    // The original SendCallBack class was not shown; this is one plausible
    // minimal implementation that just logs the result of each send.
    static class SendCallBack implements Callback {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                logger.error("send failed:", exception);
            } else {
                logger.info("send ok, partition={}, offset={}",
                        metadata.partition(), metadata.offset());
            }
        }
    }
}
In pom.xml, reference the Huawei packages; this can be copied from the sample code, so the individual dependencies are not listed here. The version properties look like this:
<properties>
    <zookeeper.version>3.6.3-h0.cbu.mrs.320.r33</zookeeper.version>
    <kafka.version>2.4.0-h0.cbu.mrs.320.r33</kafka.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
The jaas.conf referenced by the code above:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    principal="yourprincipal@HADOOP.COM"
    keyTab="/xxx/your.keytab"
    storeKey=true
    useTicketCache=false
    debug=true;
};
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    principal="yourprincipal@HADOOP.COM"
    keyTab="/xxx/your.keytab"
    storeKey=true
    useTicketCache=false
    debug=true;
};
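As an aside, the stock Apache Kafka client (0.10.2 and later, so including the 2.4.0 base version here) also accepts the same login-module entry inline through the sasl.jaas.config producer property, instead of a separate jaas.conf file plus the java.security.auth.login.config system property. Whether Huawei's patched client honors this, and note that it covers only the KafkaClient entry (not the Client/ZooKeeper one), is something you would need to verify; the helper name below is hypothetical. A minimal sketch:

```java
import java.util.Properties;

public class InlineJaas {

    // Builds the same Krb5LoginModule entry as the KafkaClient section of jaas.conf,
    // as a single sasl.jaas.config value (the trailing semicolon is required).
    public static String jaasLine(String principal, String keytab) {
        return "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true storeKey=true useTicketCache=false "
                + "keyTab=\"" + keytab + "\" "
                + "principal=\"" + principal + "\";";
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("sasl.jaas.config",
                jaasLine("yourprincipal@HADOOP.COM", "/xxx/your.keytab"));
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

This keeps the credentials configuration per-client rather than per-JVM, which matters when one process talks to more than one secured cluster.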
StreamSets authentication
Modify the StreamSets configuration: find the configuration file streamsets/etc/sdc.properties and change the following three settings.
# Runs the Data Collector within a Kerberos session which is propagated to all stages.
# This is useful for stages that require Kerberos authentication with the services they interact with
# kerberos.client.enabled=false
kerberos.client.enabled=true

# The Kerberos principal to use for the Kerberos session.
# It should be a service principal. If the hostname part of the service principal is '_HOST' or '0.0.0.0',
# the hostname will be replaced with the actual complete hostname of Data Collector as advertised by the
# unix command 'hostname -f'
# kerberos.client.principal=sdc/_HOST@EXAMPLE.COM
kerberos.client.principal=yourprincipal@HADOOP.COM

# The location of the keytab file for the specified principal. If the path is relative, the keytab file will be
# looked for in the Data Collector configuration directory
# kerberos.client.keytab=sdc.keytab
kerberos.client.keytab=/xxx/your.keytab
At startup, add a JVM option pointing at the jaas configuration, the same file used in the code above:
SDC_JAVA_OPTS="-Djava.security.auth.login.config=/xxx/jaas.conf" ./streamsets dc
When a pipeline accesses Kafka, because the Huawei packages are required, upload the Huawei Kafka client libraries in the Kafka stage configuration
and add the authentication-related settings:
security.protocol = SASL_PLAINTEXT
sasl.mechanism = GSSAPI
sasl.kerberos.service.name = kafka