欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > 5.Elasticsearch - Spring Data 框架

5.Elasticsearch - Spring Data 框架

2025/4/16 23:58:02 来源:https://blog.csdn.net/weixin_51351637/article/details/147088588  浏览:    关键词:5.Elasticsearch - Spring Data 框架

一、Kibana 介绍

Kibana 是一个免费且开放的用户界面,能够让你对 Elasticsearch 数据进行可视化,并让你在 Elastic Stack 中进行导航。你可以进行各种操作,从跟踪查询负载,到理解请求如何流经你的整个应用,都能轻松完成。
下载地址:https://artifacts.elastic.co/downloads/kibana/kibana-7.8.0-windows-x86_64.zip

1.1 配置

  1. 修改 config/kibana.yml 文件
# 默认端口
server.port: 5601
# ES 服务器的地址
elasticsearch.hosts: ["http://localhost:9200"]
# 索引名
kibana.index: ".kibana"
# 支持中文
i18n.locale: "zh-CN"
  1. Windows 环境下执行 bin/kibana.bat 文件
  2. 通过浏览器访问 : http://localhost:5601
    在这里插入图片描述
    在这里插入图片描述

二、Spring Data框架

2.1 介绍

Spring Data Elasticsearch 基于 spring data API 简化 Elasticsearch 操作,将原始操作Elasticsearch 的客户端 API 进行封装 。

Spring Data 为 Elasticsearch 项目提供集成搜索引擎。

Spring Data Elasticsearch POJO 的关键功能区域为中心的模型与 Elastichsearch 交互文档和轻松地编写一个存储索引库数据访问层。

官方网站: https://spring.io/projects/spring-data-elasticsearch

2.2 Maven坐标

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>springdata</artifactId><version>1.0-SNAPSHOT</version><name>springdata</name><packaging>war</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.6.RELEASE</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-test</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId></dependency></dependencies>
</project>

2.3 相关类

2.3.1 实体类Product

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Product {private Long id;//商品唯一标识private String title;//商品名称private String category;//分类名称private Double price;//商品价格private String images;//图片地址
}

2.3.2 application.properties 文件

# es 服务地址
elasticsearch.host=127.0.0.1
# es 服务端口
elasticsearch.port=9200
# 配置日志级别,开启 debug 日志
logging.level.com.atguigu.es=debug

2.3.3 配置类

  • ElasticsearchRestTemplate 是 spring-data-elasticsearch 项目中的一个类,和其他 spring 项目中的 template
    类似。
  • 在新版的 spring-data-elasticsearch 中,ElasticsearchRestTemplate 代替了原来的 ElasticsearchTemplate。
  • 原因是 ElasticsearchTemplate 基于 TransportClient,TransportClient 即将在 8.x 以后的版本中移除。所以,我们推荐使用 ElasticsearchRestTemplate。
  • ElasticsearchRestTemplate 基 于 RestHighLevelClient 客户端的。需要自定义配置类,继承
    AbstractElasticsearchConfiguration,并实现 elasticsearchClient()抽象方法,创建RestHighLevelClient 对象。
@ConfigurationProperties(prefix = "elasticsearch")
@Configuration
@Data
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {private String host;private Integer port;//重写父类方法@Overridepublic RestHighLevelClient elasticsearchClient() {RestClientBuilder builder = RestClient.builder(new HttpHost(host, port));RestHighLevelClient restHighLevelClient = newRestHighLevelClient(builder);return restHighLevelClient;}
}

2.3.4 DAO 数据访问对象

@Repository
public interface ProductDao  extends ElasticsearchRepository<Product,Long>{
}

2.3.5 实体类映射操作

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Document(indexName = "shopping", shards = 3, replicas = 1)
public class Product {//必须有 id,这里的 id 是全局唯一的标识,等同于 es 中的"_id"@Idprivate Long id;//商品唯一标识/*** type : 字段数据类型* analyzer : 分词器类型 , analyzer = "ik_max_word"* index : 是否索引(默认:true)* Keyword : 短语,不进行分词*/@Field(type = FieldType.Text)private String title;//商品名称@Field(type = FieldType.Keyword)private String category;//分类名称@Field(type = FieldType.Double)private Double price;//商品价格@Field(type = FieldType.Keyword, index = false)private String images;//图片地址
}

2.5.6 索引创建及删除测试

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESIndexTest {//注入 ElasticsearchRestTemplate@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;/*** 当我们执行这一步后,会自动读取我们环境中的实体类,自动创建索引* 创建索引并增加映射配置*/@Testpublic void createIndex() {//创建索引,系统初始化会自动创建索引System.out.println("创建索引");}@Testpublic void deleteIndex() {//创建索引,系统初始化会自动创建索引boolean flg = elasticsearchRestTemplate.deleteIndex(Product.class);System.out.println("删除索引 = " + flg);}
}

2.5.7 文档操作

    /*** 新增*/@Testpublic void save() {Product product = new Product();product.setId(2L);product.setTitle("华为手机");product.setCategory("手机");product.setPrice(2999.0);product.setImages("http://www.atguigu/hw.jpg");productDao.save(product);}//修改 下面的api还是save@Testpublic void update() {Product product = new Product();product.setId(1L);product.setTitle("小米 2 手机");product.setCategory("手机");product.setPrice(9999.0);product.setImages("http://www.atguigu/xm.jpg");productDao.save(product);}//根据 id 查询@Testpublic void findById() {Product product = productDao.findById(2L).get();System.out.println(product);}//查询所有@Testpublic void findAll(){Iterable<Product> products = productDao.findAll();for (Product product : products) {System.out.println(product);}}//删除@Testpublic void delete(){Product product = new Product();product.setId(1L);productDao.delete(product);}//批量新增@Testpublic void saveAll(){List<Product> productList = new ArrayList<>();for (int i = 0; i < 10; i++) {Product product = new Product();product.setId(Long.valueOf(i));product.setTitle("["+i+"]小米手机");product.setCategory("手机");product.setPrice(1999.0+i);product.setImages("http://www.atguigu/xm.jpg");productList.add(product);}productDao.saveAll(productList);}//分页查询@Testpublic void findByPageable(){//设置排序(排序方式,正序还是倒序,排序的 id)Sort sort = Sort.by(Sort.Direction.DESC,"id");int currentPage=0;//当前页,第一页从 0 开始,1 表示第二页int pageSize = 5;//每页显示多少条//设置查询分页PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);//分页查询Page<Product> productPage = productDao.findAll(pageRequest);for (Product Product : productPage.getContent()) {System.out.println(Product);}}

2.5.8 文档搜索

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESSearchTest {@Autowiredprivate ProductDao productDao;/*** term 查询* search(termQueryBuilder) 调用搜索方法,参数查询构建器对象*/@Testpublic void termQuery() {TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");Iterable<Product> products = productDao.search(termQueryBuilder);for (Product product : products) {System.out.println(product);}}/*** term 查询加分页*/@Testpublic void termQueryByPage() {int currentPage = 0;int pageSize = 5;//设置查询分页PageRequest pageRequest = PageRequest.of(currentPage, pageSize);TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");Iterable<Product> products =productDao.search(termQueryBuilder, pageRequest);for (Product product : products) {System.out.println(product);}}}

三、Flink 框架集成

3.1 框架介绍

Apache Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
Apache Spark 掀开了内存计算的先河,以内存作为赌注,赢得了内存计算的飞速发展。
但是在其火热的同时,开发人员发现,在 Spark 中,计算框架普遍存在的缺点和不足依然没
有完全解决,而这些问题随着 5G 时代的来临以及决策者对实时数据分析结果的迫切需要而
凸显的更加明显:

  • 数据精准一次性处理(Exactly-Once)

  • 乱序数据,迟到数据

  • 低延迟,高吞吐,准确性

  • 容错性

Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。在
Spark 火热的同时,也默默地发展自己,并尝试着解决其他计算框架的问题。

慢慢地,随着这些问题的解决,Flink 慢慢被绝大数程序员所熟知并进行大力推广,阿里公
司在 2015 年改进 Flink,并创建了内部分支 Blink,目前服务于阿里集团内部搜索、推荐、广告和蚂蚁等大量核心实时业务。

3.2 集成框架

3.2.1 Maven坐标

    <dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_2.11</artifactId><version>1.12.0</version></dependency><!-- jackson --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.11.1</version></dependency></dependencies>

3.2.2 功能实现

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
importorg.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class FlinkElasticsearchSinkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//Source:数据的输入DataStreamSource<String> source = env.socketTextStream("localhost", 9999);//下面完成数据的采集和数据的输出//httpHosts要连接的数据库的地址集合List<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));//httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));// 使用ES的Builder来构建输出 这个地方ElasticsearchSinkFunction就是用来构建输出的ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<String>() {//匿名内部类 重写方法@Overridepublic void process(String element, RuntimeContext ctx,RequestIndexer indexer) {indexer.add(createIndexRequest(element));}public IndexRequest createIndexRequest(String element) {Map<String, String> json = new HashMap<>();json.put("data", element);//构建索引请求return Requests.indexRequest().index("my-index")//.type("my-type").source(json);}});esSinkBuilder.setBulkFlushMaxActions(1);//为了演示加的这一行,实际去除掉就行//Sink:数据的输出source.addSink(esSinkBuilder.build());//执行的操作env.execute("flink-es");}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词