欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > IT业 > Java通过calcite实时读取kafka中的数据

Java通过calcite实时读取kafka中的数据

2025/4/29 3:30:03 来源:https://blog.csdn.net/u010479989/article/details/143849934  浏览:    关键词:Java通过calcite实时读取kafka中的数据

引入maven依赖

        <dependency>

            <groupId>org.apache.calcite</groupId>

            <artifactId>calcite-kafka</artifactId>

            <version>1.28.0</version>

        </dependency>

测试代码

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.ResultSetMetaData;

import java.sql.SQLException;

import java.util.Properties;

public class CalciteDemo {

    public static void main(String[] args) throws SQLException {

        String model = "inline:" +

                "{\n" +

                "  \"version\": \"1.0\",\n" +

                "  \"defaultSchema\": \"KAFKA\",\n" +

                "  \"schemas\": [\n" +

                "    {\n" +

                "    \"name\": \"KAFKA\",\n" +

                "    \"tables\": [\n" +

                "      {\n" +

                "        \"name\": \"TEST_TABLE\",\n" +

                "        \"factory\": \"org.apache.calcite.adapter.kafka.KafkaTableFactory\",\n" +

                "        \"stream\": { \"stream\": true },\n" +

                "        \"operand\": {\n" +

                "          \"bootstrap.servers\": \"192.168.x.xx:9092\",\n" +

                "          \"topic.name\": \"my-cloud-events\",\n" +

                "          \"consumer.params\": {\n" +

                "            \"group.id\": \"calcite-ut-consumer\",\n" +

                "            \"key.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\",\n" +

                "            \"value.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\"\n" +

                "          }\n" +

                "        }\n" +

                "      }\n" +

                "    ]\n" +

                "    }\n" +

                "  ]\n" +

                "}";

        Properties info = new Properties();

        info.put("model", model);

        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);

        final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        final String sql7 = "SELECT STREAM * FROM \"KAFKA\".\"TEST_TABLE\"";

        print(calciteConnection,sql7);

        connection.close();

        calciteConnection.close();

    }

    public static void print(CalciteConnection calciteConnection, String sql7) throws SQLException {

        final PreparedStatement statement = calciteConnection.prepareStatement(sql7);

        final ResultSet resultSet = statement.executeQuery();

        ResultSetMetaData metadata = resultSet.getMetaData();

        while (resultSet.next()) {

            for (int i = 1; i <= metadata.getColumnCount(); i++) {

                System.out.print(metadata.getColumnLabel(i) + "=" + resultSet.getString(i) + ",");

            }

            System.out.println();

        }

    }

}

发送测试数据

运行结果

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词