Loading... 业务场景中,需要快速筛选出**消息上下文**命中**输入词组**的数据,便被迫学习了ES的一些基础。 <!--more --> **前置:** [狂神说](https://www.bilibili.com/video/BV17a4y1x7zq) 过于基础建议会安装的直接跳过,看[尚硅谷](https://www.bilibili.com/video/BV1hh411D7sb) +[官方文档](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-getting-started-initialization.html) 就够了(英文有难度可以看中文官方文档但不是基于最新版) ## 一、ElasticSearch概述 `ElaticSearch`,简称为 `ES`,是一个开源的高扩展的**分布式全文检索引擎**,它可以近乎**实时的存储、检索数据**。ES本身扩展性很好,可以扩展到上百台服务器,处理PB级别的数据。ES使用java开发并使用 `Lucene`作为其核心来实现所有**索引和搜索**的功能,但是它的目的是**通过简单的 `RESTful API`来隐藏Lucene的复杂性,从而让全文搜索变得简。** 总的来说,ElasticSearch是一个**实时分布式搜索和分析**引擎。通过**全文搜索**、**结构化搜索**、**分析**以及将这三者混合使用,让你以前所未有的速度处理大数据成为可能。 ## 二、下载安装 - `elasticsearch`: https://mirrors.huaweicloud.com/elasticsearch/?C=N&O=D - `logstash`: https://mirrors.huaweicloud.com/logstash/?C=N&O=D - `kibana`: https://mirrors.huaweicloud.com/kibana/?C=N&O=D - `elasticsearch-head`: https://github.com/mobz/elasticsearch-head - `cerebro`(head升级版,推荐): https://github.com/lmenezes/cerebro/releases 建议共同放在一个大的 `ES`目录下,其中 `elasticsearch-analysis-ik`需要放在 `elasticsearch/plugins`下。 这个 `elasticsearch-head`,我们只是把它当做可视化数据展示工具,之后所有的查询都在 `kibana`中进行,因为不支持json格式化,不方便。 **ES目录结构:** ```java ├─bin // 含elasticsearch.bat启动文件 ├─config// 配置文件 │ └─jvm.options // java虚拟机相关配置(内存大小已变更为自动分配) │ └─log4j2.properties // 日志配置 ├─data ├─jdk ├─lib ├─logs ├─modules // 功能模块 └─plugins // 插件 └─ik-7.15.1 // ik分词器,解压后丢这里 └─config // dic分词文件配置 └─IKAnalyzerr.cfg.xml // ik分词扩展配置,可指定自定义分词文件 └─xxx.dic // 分词文件 ``` **启动:** - `elasticsearch`: 双击 `bin/elasticsearch.bat` 端口:[http://127.0.0.1:9200](http://127.0.0.1:9200) - `logstash`: (暂未用到) - `kibana`: 双击 `kibana.bat` 端口:[http://127.0.0.1:5601](http://127.0.0.1:5601) - `elasticsearch-analysis-ik`: 放在 `ElasticSearch/plugins`下 - `elasticsearch-head`: `npm install` + `npm run start` 端口:[http://127.0.0.1:9100](http://127.0.0.1:9100) - `cerebro`: (暂未用到) **解决跨域:** `elasticsearch/config/elasticsearch.yml` ```yml # 开启跨域 http.cors.enabled: true # 所有人访问 http.cors.allow-origin: "*" ``` **kibana汉化:** `kibana/config/kibana.yml` ```yml i18n.locale: "zh-CN" ``` ### LInux集群部署 详见 [Elasticsearch学习笔记](https://blog.csdn.net/u011863024/article/details/115721328) **32-环境-Linux集群部署** ## 三、相关插件 #### (1)elasticsearch-analysis-ik **地址**:https://github.com/medcl/elasticsearch-analysis-ik/releases 下载完后解压到es对应的plugins目录下即可 **描述**:最常用的中文分词器,分为**最粗力度算法(`ik_smart`)** 和 **最细粒度算法(`ik_max_word`)** **使用**:创建的时候带上(下文)或者直接使用 ```JSON GET _analyze { "analyzer": "ik_max_word", "text": "你好⑦③⑤③啊[嘿嘿]" } ``` #### (2)analysis-icu **地址**:https://github.com/elastic/elasticsearch-analysis-icu **描述**:同ik分词器,但更好的支持亚洲语(泰语、老挝语、中文、日文、和韩文),更好避免了中文、日文的过度分词 #### (3)analysis-kuromoji **地址**:https://github.com/elastic/elasticsearch-analysis-kuromoji **描述**:最强日文分析器 #### (4)analysis-pinyin **地址**:https://github.com/medcl/elasticsearch-analysis-pinyin **描述**:分词,但是拼音 #### (5)analysis-smartcn **地址**:https://github.com/elastic/elasticsearch-analysis-smartcn **描述**:基于中文切割的分词(eg:**中国人** = **中国/人** != **中国/国人/人**) ### 查看已安装插件 ``` GET /_cat/plugins ``` ### 自定义分词 在 `elasticsearch-xxx/plugins/ik/config`目录下新增 `xx.dic`文件,作为用户个性化词典,而后在 `plugins/ik/config`目录的 `IKAnalyzer.cfg.xml`配置文件内新增自定义扩展词典 。  这种方式在每次新增个性化词典中词汇后,都需要重启es,一旦新增次数频繁会造成大量的时间浪费,且不利于es本身。 可以通过**配置远程自定义词典**或者**修改IK源码,使用mysql数据库实现热更新**。具体实现逻辑可以看:[ElasticSearch-IK分词使用踩坑总结](https://www.jianshu.com/p/f178e59ffaf2) ## 四、基础概念 对于入门的一些结构化/非结构化数据、结构化/非结构化数据搜索,以及顺序扫描、全文搜索、Lucene等概念直接看[这里](https://mp.weixin.qq.com/s/KMr7d7rh8N_FrqUbb9k9Kg)。 概念主要涉及到**集群,节点,索引,类型,文档,分片,映射**,初学的话可以把这些概念带入到传统的关系型数据库中。(**注意:** 在7.X之后ES移除了文档的概念,默认 `_doc`) | Relational DB | ElasticSearch | | ------------- | --------------- | | 数据库(db) | 索引(indices) | | 表(tables) | 类型(types) | | 行(rows) | 文档(documents) | | 列(columns) | 字段(fields) | **注意:** 两者只是客观上的对比,**ElasticSearch是面向文档,一切都是JSON**。 * **节点**:一个运行中的 Elasticsearch 实例称为一个节点。 * **集群**:由一个或者多个相同 `cluster.name` 配置的节点组成,它们共同承担数据和负载的压力。当有节点加入或从集群中移除时,集群将会重新平均分布所有的数据。 * **映射 `Mapping`**:是数据处理和规则方面的设置,如:某个字段的数据类型、默认值、分析器、是否被索引等等。 * **分片 `Shards`**:es提供了将索引划分成多份的能力,每一份就称之为分片(P0、P1、P2...)。创建索引时可以指定分片数,创建后不可修改。 每个分片本身也是一个功能完善并且独立的“索引”,会均分到集群中的节点上。 * **副本 `Replicas`**:分片的拷贝就是副本,是高可用的保障,分片与相对应的副本会分配到不同节点上(R0、R1、R2...)。副本数可以为0,但生产必须保障至少有1个副本存在;副本数在创建后仍可以修改。 * **故障转移**:单节点变成集群(至少2节点),防止数据丢失。 * **水平扩容**:往集群中加入新节点,重新分配分片来分散负载。 ## 五、基本使用 ### 0.查看集群健康状态 ```json GET /_cluster/health { "cluster_name" : "es-cn-v0h1brbhs000lh3cz", "status" : "green", "timed_out" : false, "number_of_nodes" : 3, "number_of_data_nodes" : 3, "active_primary_shards" : 1072, "active_shards" : 2133, "relocating_shards" : 0, "initializing_shards" : 0, "unassigned_shards" : 0, "delayed_unassigned_shards" : 0, "number_of_pending_tasks" : 0, "number_of_in_flight_fetch" : 0, "task_max_waiting_in_queue_millis" : 0, "active_shards_percent_as_number" : 100.0 } ``` ### 1.创建索引(分片副本+分析器+别名) ```json PUT xxx_2022 { "index": { "number_of_shards": "5", "number_of_replicas": "1" }, "analysis": { "analyzer": { "ik_max_word_analyzer": { "tokenizer": "ik_max_word" }, "ik_smart_analyzer": { "tokenizer": "ik_smart" } } } } POST xxx_2022/_doc/_mapping { "xxx": { "dynamic": "false", "_all": { "enabled": false }, "properties": { "uuid": { "type": "keyword" }, "content": { "type": "text", "analyzer": "ik_max_word_analyzer" } } } } POST _aliases { "actions": [ { "add": { "index": "xxx_202202", "alias": "xxx" } } ] } ``` ### 2.查看索引基本信息 ```shell GET /_cat/indices?v ```  ### 3.删除索引、文档 删除索引、文档和简单的查找没什么值得说的点,会了复杂查询就可以用 `_delete_by_query`、`_update_by_query`等高级API进行操作。 ```json POST xxx_2022/_delete_by_query { "query": { "bool": { "must": [ { "term": { "scene": { "value": "XXX" } } }, { "term": { "status": { "value": "0" } } }, { "range": { "createTime": { "gte": 1649671824, "lte": 1650535824 } } } ] } } } ``` ### 4.更新文档 更新分为**更新整个文档**(`PUT`)和**更新部分文档**(`POST`),因为文档不可被修改和替换,所以两者本质都是:**从旧文档构建 JSON—更改 JSON—删除旧文档—建新文档**,部分更新区别在于过程发生在分片内部,避免了多次请求的网络开销,也减少了其他进程的变更带来冲突的可能性。 部分更新是覆盖现有的字段,增加新的字段。也可以采用 `script`脚本的方式进行修改,但是默认5分钟只能修改75次需要手动修改配置: ```json PUT _cluster/settings { "persistent": { "script.max_compilations_rate": "75/1m" } } ``` ```json POST /website/pageviews/1/_update?retry_on_conflict=5 { "script" : "ctx._source.views+=1", "upsert": { "views": 0 } } ``` * `retry_on_conflict`:重试次数 * `upsert`:更新字段不存在则新增 ```json POST my-index-000001/_update/1 { "script": { "source": "if (ctx._source.tags.contains(params['tag'])) { ctx.op = 'delete' } else { ctx.op = 'none' }", "lang": "painless", "params": { "tag": "green" } } } ``` * `params`:作为临时变量到缓存中复用,避免硬编码重新构建脚本,弊端是没办法动态修改(即使修改了值也只会从缓存里读取旧值) * `ctx.op`:`delete`表示(逻辑)删除;`none/noop`表示不执行操作;其余报错 ```json POST xxx_2022/_update_by_query?conflicts=proceed&wait_for_completion=false { "script": { "source": "ctx._source.qualityTestingTime = 0; ctx._source.taskId = 0;", "lang": "painless" }, "query": { "bool": { "must": [ { "term": { "qualityTestingStatus": { "value": "0" } } }, { "range": { "createTime": { "gte": 1652976000, "lte": 1654185599 } } } ] } } } ``` * `conflicts=proceed`:查询或更新失败不终止,执行完毕后返回冲突数 * `wait_for_completion=false`:将执行命令打成task任务,防止超时终止;根据返回的taskId可以查看任务的执行情况和停止任务 ### 5.高级JAVA API 这里需要说明一下GitHub上有一个sql转es DML语句的插件:[elasticsearch-sql](https://github.com/NLPchina/elasticsearch-sql) 但是目前只维护到 `7.17`。后续可以使用官方推荐的 [x-pack-sql](https://github.com/elastic/elasticsearch/tree/master/x-pack/plugin/sql) 或 [OpenDistro for Elasticsearch SQL](https://github.com/opendistro-for-elasticsearch/sql) 。需要在集群中安装插件并且在项目中引入依赖: ```xml <dependency> <groupId>org.elasticsearch.plugin.nlpcn</groupId> <artifactId>elasticsearch-sql</artifactId> <version>xxx</version> </dependency> ``` 对于一般的查询来说已经够用了,但是对于复杂的组合查询构造得太过复杂,并且不支持复杂类型如 `nested in(xxx)`操作,还有一些聚合操作的支持也不是很友好。后续可以汇总为插件。 而 `Spring-Data`不仅与代码的耦合性过高,且复杂的查询语句这是需要手写代码,所以同样不推荐。 **组合查询** ```java public class QueryDoc { public static final ElasticsearchTask SEARCH_BY_BOOL_CONDITION = client -> { // 创建搜索请求对象 SearchRequest request = new SearchRequest(); request.indices("user"); // 构建查询的请求体 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); // 必须包含 boolQueryBuilder.must(QueryBuilders.matchQuery("age", "30")); // 一定不含 boolQueryBuilder.mustNot(QueryBuilders.matchQuery("name", "zhangsan")); // 可能包含 boolQueryBuilder.should(QueryBuilders.matchQuery("sex", "男")); sourceBuilder.query(boolQueryBuilder); request.source(sourceBuilder); SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 查询匹配 SearchHits hits = response.getHits(); // 输出 }; public static void main(String[] args) { ConnectElasticsearch.connect(SEARCH_BY_BOOL_CONDITION); } } ``` **范围查询** ```java public class QueryDoc { public static final ElasticsearchTask SEARCH_BY_RANGE = client -> { // 创建搜索请求对象 SearchRequest request = new SearchRequest(); request.indices("user"); // 构建查询的请求体 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("age"); // 大于等于 //rangeQuery.gte("30"); // 小于等于 rangeQuery.lte("40"); sourceBuilder.query(rangeQuery); request.source(sourceBuilder); SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 查询匹配 SearchHits hits = response.getHits(); //输出 }; public static void main(String[] args) { ConnectElasticsearch.connect(SEARCH_BY_RANGE); } } ``` **模糊查询** ```java public class QueryDoc { public static final ElasticsearchTask SEARCH_BY_FUZZY_CONDITION = client -> { // 创建搜索请求对象 SearchRequest request = new SearchRequest(); request.indices("user"); // 构建查询的请求体 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(QueryBuilders.fuzzyQuery("name","wangwu").fuzziness(Fuzziness.ONE)); request.source(sourceBuilder); SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 查询匹配 SearchHits hits = response.getHits(); //输出每条查询的结果信息 }; public static void main(String[] args) { ConnectElasticsearch.connect(SEARCH_BY_FUZZY_CONDITION); } } ``` **高亮查询** ```java public class QueryDoc { public static final ElasticsearchTask SEARCH_WITH_HIGHLIGHT = client -> { SearchRequest request = new SearchRequest().indices("user"); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery("name","zhangsan"); // 设置查询方式 sourceBuilder.query(termsQueryBuilder); // 构建高亮字段 HighlightBuilder highlightBuilder = new HighlightBuilder(); highlightBuilder.preTags("<font color='red'>");// 设置标签前缀 highlightBuilder.postTags("</font>");// 设置标签后缀 highlightBuilder.field("name");// 设置高亮字段 // 设置高亮构建对象 sourceBuilder.highlighter(highlightBuilder); // 设置请求体 request.source(sourceBuilder); // 客户端发送请求,获取响应对象 SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 打印响应结果 SearchHits hits = response.getHits(); }; public static void main(String[] args) { ConnectElasticsearch.connect(SEARCH_WITH_HIGHLIGHT); } } ``` **聚合—最大值查询** ```java public class QueryDoc { public static final ElasticsearchTask SEARCH_WITH_MAX = client -> { SearchRequest request = new SearchRequest().indices("user"); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.aggregation(AggregationBuilders.max("maxAge").field("age")); request.source(sourceBuilder); SearchResponse response = client.search(request, RequestOptions.DEFAULT); SearchHits hits = response.getHits(); System.out.println(response); }; public static void main(String[] args) { ConnectElasticsearch.connect(SEARCH_WITH_MAX); } } ``` **聚合—分组查询** ```java public class QueryDoc { public static final ElasticsearchTask SEARCH_WITH_GROUP = client -> { SearchRequest request = new SearchRequest().indices("user"); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.aggregation(AggregationBuilders.terms("age_groupby").field("age")); request.source(sourceBuilder); SearchResponse response = client.search(request, RequestOptions.DEFAULT); SearchHits hits = response.getHits(); System.out.println(response); }; public static void main(String[] args) { ConnectElasticsearch.connect(SEARCH_WITH_GROUP); } } ``` **批量删除** ```java public class BatchDeleteDoc { public static void main(String[] args) { ConnectElasticsearch.connect(client -> { //创建批量删除请求对象 BulkRequest request = new BulkRequest(); request.add(new DeleteRequest().index("user").id("1001")); request.add(new DeleteRequest().index("user").id("1002")); request.add(new DeleteRequest().index("user").id("1003")); //客户端发送请求,获取响应对象 BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT); //打印结果信息 System.out.println("took:" + responses.getTook()); System.out.println("items:" + responses.getItems()); }); } } ``` ## 六、ES数据读写流程 ### 数据写入流程(增删改)  1. 客户端请求集群中的任意节点,该节点称为协调节点(coordinate node) 2. 协调节点通过**路由计算(`shard = hash(routing) % number_of_primary_shards`)** 将请求转到指定节点 1. `shard`:具体分片Px,范围是 `[0, number_of_primary_shards-1]` 2. `routing`:默认为 `_id`,也可以自定义 3. `number_of_primary_shards`:主分片数 3. 主分片保存数据 4. 主分片将数据复制到对应副本(乐观锁) 5. 副本保存数据 6. 副本向主分片上报成功 7. 主分片向客户端上报成功 **consistency**: 一致性。在写操作之前,要求必须要有规定数量(`quorum)`的分片+副本处于活跃可用状态。这是为了避免在发生网络分区故障(network partition)的时候进行写操作,进而导致数据不一致。 * `quorum`:规定数量活跃, `quorum=int((primary + number_of_replicas) / 2) + 1 ` `primary`为主分片数量;`number_of_replicas`为副本分片数量;3分片1副本的6分片,需活跃数: `((3 + 1) / 2) + 1 = 3 one`:只要有一个主分片活跃即可 * `all`:所有分片都需要活跃 该判断只有当 `number_of_replicas > 1`时才生效。假如一个只有1分片1副本,此时 `quorum = (1 + 1 / 2) + 1 = 2`,但可能就主分片活跃,如果不特殊处理将导致单节点集群无法工作。 **timeout**: 超时时间。活跃数不够时默认等待1min,期望活跃数量增加,超过则抛出timeout异常;timeout可配置:100(ms)、30s #### 更新操作  上文提到更新的本质就是删除重建,在3的时候如果冲突会根据 `retry_on_conflict`配置的次数重试。而4中转发的是完整文档的新版本,而不是修改请求(update转index和delete),防止因异步同步部分时顺序错乱而导致脏数据。 #### 文档刷新、刷写、合并  1. 主分片写入数据 1. 数据同步: 1. 主分片将数据写到Memory Buffer中,此时不可被搜索 2. 每秒执行一次Refresh,将数据刷到文件缓存Filesystem Cache中,转成段Segment并打开,此时可被搜索 Memory Buffer被占满时也会触发Refresh,大小为JVM 内存的10% 3. 清空Memory Buffer 2. 数据保障: 1. 主分片将数据写到Translog中 同时Translog 也用来提供实时CRUD,当通过_id查询、更新、删除一个文档,从相应的段中检索之前,会先检查Translog最近的变更 2. Translog追加到磁盘 追加写入性能比较优越。当宕机未落盘时,启动ES服务时会读取Translog的信息,将中间的操作命令进行回放来回复数据 2. 每执行一次Refresh就会在Filesystem Cache中生成一份Segment,每次请求都需要访问到对应的Segment,这样会加重请求负担,需要将小Segment合并Merge成大的Segment来提高搜索性能 1. 合并部分大小相似的Segment(不管是否Commit)到新的大Segment中,过程中不影响搜索 2. 删除旧Segment 3. 将数据Flush刷到磁盘(每30min or Translog达到512M): 1. 数据同步 1. Memory Buffer所有文档写入新Segment中 2. 清空Memory Buff 3. 创建Commit Point,追加.del文件,写入硬盘 4. Segment 被 Fsync 刷新到磁盘 2. 数据保障 1. 内存中的Translog Flush 到磁盘中的Translog 2. 删除内存中旧Translog,创建新的 #### 优化参数 (一)如果是非近实时的大索引,可以降低刷新频率 ```json PUT _settings { "settings": { "refresh_interval": "1s" } } ``` (二)构建一个大索引时,可以先关闭刷新,构建完再开启 ```json # 关闭自动刷新 PUT /xxx_2022/_settings { "refresh_interval": "-1" } # 每一秒刷新 PUT /xxx_2022/_settings { "refresh_interval": "1s" } ``` (三)将 `index.translog.durability` 配置为 `async`,通过异步写入磁盘来提升性能。写入磁盘的频率可通过 `index.translog.sync_interval` 来控制。 ### 数据读取流程(查) es的查询流程分为 `GET`和 `SEARCH`两种模式。 #### GET查询  1. 客户端请求集群中的任意节点,该节点称为协调节点(coordinate node) 2. 协调节点通过路由计算出文档所在的分片,再通过集群状态中的内容路由表信息获知该分片的副本信息 3. 协调节点将客户端请求轮询发送到集群的所有副本来实现负载均衡(读取失败后会尝试从其它分片读取) 4. 收到请求的节点将文档返回给协调节点 5. 协调节点将文档返回给客户端 如果对是实时性要求不高,可以GET查询时不要刷新来提升性能。 #### SEARCH查询 Search查询分为**查询(query)** 和 **拉取(fetch)** 两个阶段。在查询时不知道文档位于哪个分片,因此索引的所有分片都要参与搜索,再由协调节点将结果合并。其中查询又分为以下两种: 1. `query_then_fetch`:默认。查询时只返回_id,拉取时再走GET查询拉取完整文档。 2. `query_and_fetch`:查询时直接返回完整文档。一般用于算分时用到全局指标,让结果更加精确。非算分必要,建议只在指定单分片的时候使用。其优化版为 `DFS_query_and_fetch`。  1. 客户端请求集群中的任意节点,该节点称为协调节点(coordinate node) 协调节点会创建一个大小为 from + size 的空优先队列 2. 将查询请求转发到索引的每个主分片或副本分片中 每个分片在本地执行查询并添加结果到大小为 from + size 的本地有序优先队列中 3. 每个分片返回各自优先队列中所有 **_id**和**排序值**给协调节点 4. 协调节点合并这些值到自己的优先队列中来产生一个全局排序后的结果列表 5. 协调节点通过GET查询方式获取到完整文档给客户端 search需要遍历相关的所有分片,每个分片可能有多个lucene段,每个段都需要遍历,因此ES的常见优化策略就是将段进行合并。 es的分页其实是重新搜索,即使是查后面几页,也会将前几页的数据聚合进行分页,因此非常耗费内存,对于这种有深度分页的需求可以用Scroll滚动查询。 聚会操作是在lucene检索完毕后es中实现的。 ## 七、ES相关优化 ### (0)配置参数优化 * 给每个文档指定有序的具有压缩良好的序列模式ID,避免随机的UUID-4 这样的 ID,这样的ID压缩比很低,会明显拖慢 Lucene。 * 对于那些不需要聚合和排序的索引字段禁用Doc values。Doc Values是有序的基于document => field value的映射列表; * 不需要做模糊检索的字段使用 keyword类型代替 text 类型,这样可以避免在建立索引前对这些文本进行分词。 * 如果你的搜索结果不需要近实时的准确度,考虑把每个索引的 index.refresh_interval 改到 30s 。如果你是在做大批量导入,导入期间你可以通过设置这个值为 -1 关掉刷新,还可以通过设置 index.number_of_replicas: 0关闭副本。别忘记在完工的时候重新开启它。 * 避免深度分页查询建议使用Scroll进行分页查询。普通分页查询时,会创建一个from + size的空优先队列,每个分片会返回from + size 条数据,默认只包含文档id和得分score给协调节点,如果有n个分片,则协调节点再对(from + size)× n 条数据进行二次排序,然后选择需要被取回的文档。当from很大时,排序过程会变得很沉重占用CPU资源严重。 * 减少映射字段,只提供需要检索,聚合或排序的字段。其他字段可存在其他存储设备上,例如Hbase,在ES中得到结果后再去Hbase查询这些字段。 * 创建索引和查询时指定路由routing值,这样可以精确到具体的分片查询,提升查询效率。路由的选择需要注意数据的分布均衡。 * 通过设置 gateway.recover_after_nodes、 gateway.expected_nodes、 gateway.recover_after_time 可以在集群重启的时候避免过多的分片交换,这可能会让数据恢复从数个小时缩短为几秒钟。 ### (1)硬件优化 阿里云选贵的买就完事了 ES的硬件优化本质就是磁盘I/O的优化,可以针对以下几个点做参考: * 使用 SSD。就像其他地方提过的, 他们比机械磁盘优秀多了。 * 使用 RAID 0。条带化 RAID 会提高磁盘 I/O,代价显然就是当一块硬盘故障时整个就故障了。不要使用镜像或者奇偶校验 RAID 因为副本已经提供了这个功能。 * 另外,使用多块硬盘,并允许 Elasticsearch 通过多个 path.data 目录配置把数据条带化分配到它们上面。 * 不要使用远程挂载的存储,比如 NFS 或者 SMB/CIFS。这个引入的延迟对性能来说完全是背道而驰的。 * 如果你用的是 EC2,当心 EBS。即便是基于 SSD 的 EBS,通常也比本地实例的存储要慢。 * 即使数据中心们近在咫尺,也要避免集群跨越多个数据中心。绝对要避免集群跨越大的地理距离。 ### (2)分片优化 **分片设置:** (1)每个分片不超过JVM堆空间配置。如:索引500G / JVM32G = 分片16 (2)总分片数 <= 节点数 * 3 (3)节点数 <= 主分片数 * (副本数 + 1) **延迟分片分配:** 有分片瞬时中断的时候可以先等一小段时间看看,避免触发重新分配带来的极大开销 ```json PUT /_all/_settings { "settings": { "index.unassigned.node_left.delayed_timeout": "5m" } } ``` ### (3)路由优化 走 `_id`查询就是最快的(先查Translog再查Segment),不用搜索再聚合(即使是精确查询),所以可以用业务uuid去构造_id。 查找的时候走 getById() 而不是 getByUuid() ### (4)写入速度优化 (1)对于实时性要求不高的索引,可以适当减少Refresh次数,配置上文有 (2)适当调大内存中Translog的Flush阈值(默认512M):`index.translog.flush_threshold_size ` (3)减少副本数,一份副本就够了 ### (5)内存优化 **不要超过物理内存的 50%**: Lucene 的设计目的是把底层 OS 里的数据缓存到内存中。Lucene 的段是分别存储到单个文件中的,这些文件都是不会变化的,所以很利于缓存,同时操作系统也会把这些段文件缓存起来,以便更快的访问。如果我们设置的堆内存过大, Lucene 可用的内存将会减少,就会严重影响降低 Lucene 的全文本查询性能。 **堆内存的大小最好不要超过 32GB**: 在 Java 中,所有对象都分配在堆上,然后有一个 Klass Pointer 指针指向它的类元数据。这个指针在 64 位的操作系统上为 64 位, 64 位的操作系统可以使用更多的内存(2^64)。在 32 位 的系统上为 32 位, 32 位的操作系统的最大寻址空间为 4GB(2^32)。 但是 64 位的指针意味着更大的浪费,因为你的指针本身大了。更糟糕的是,更大的指针在主内存和缓存器(例如 LLC, L1 等)之间移动数据的时候,会占用更多的带宽。 ```xml # 确保 Xmx 和 Xms 的大小是相同的,其目的是为了能够在 Java 垃圾回收机制清理完堆区后不需要重新分隔计算堆区的大小而浪费资源,可以减轻伸缩堆大小带来的压力。64G内存分配如下: # 堆的初始大小 -Xms 31g # 可分配的最大内存 -Xmx 31g ``` **GC 默认采用CMS的方式,并发但是有STW的问题,可以考虑使用G1收集器,但如果对这块不是特别熟悉****不要随意修改垃圾回收器(CMS)和各个线程池的大小。** ## 八、面试常问 ### 为什么要使用 Elasticsearch? 大多数的业务系统中,我们需要通过**模糊搜索**去查询数据,而模糊搜索会导致索引失效走全表扫描,容易把数据库打挂。对于日增在万级的表,即使通过时间去圈定范围,查询一周的量就会导致CPU飙到顶,妥妥的慢SQL,在网关层就超时了。 而ES采用倒排索引的方式更好的支持了全文检索,响应速度是近实时的。 ### Elasticsearch 的 master 选举流程? 1. Elasticsearch的选主是Zen Discovery模块负责的,主要包含Ping(节点之间通过这个RPC来发现彼此) 和Unicast(单播模块包含一个主机列表以控制哪些节点需要Ping通)这两部分。 2. 对所有可以成为master的节点(`node master: true`)根据 `nodeId`字典排序,每次选举每个节点都把自 己所知道节点排一次序,然后选出第一个(第0位)节点,暂且认为它是master节点。 3. 如果对某个节点的投票数达到一定的值(`可以成为master节点数n/2+1`)并且该节点自己也选举自己, 那这个节点就是master。否则重新选举一直到满足上述条件。 master节点的职责主要包括集群、节点和索引的管理,不负责文档级别的管理;data节点可以关闭http 功能。 ### 脑裂现象 同时如果由于网络或其他原因导致集群中选举出多个Master节点,使得数据更新时出现不一致,这种现象称之为**脑裂** ,即集群中不同的节点对于master的选择出现了分歧,出现了多个master竞争。 “脑裂”问题可能有以下几个原因造成: 1. **网络问题** :集群间的网络延迟导致一些节点访问不到master,认为master挂掉了从而选举出新的master,并对master上的分片和副本标红,分配新的主分片 2. **节点负载** :主节点的角色既为master又为data,访问量较大时可能会导致ES停止响应(假死状态)造成大面积延迟,此时其他节点得不到主节点的响应认为主节点挂掉了,会重新选取主节点。 3. **内存回收** :主节点的角色既为master又为data,当data节点上的ES进程占用的内存较大,引发JVM的大规模内存回收,造成ES进程失去响应。 为了避免脑裂现象的发生,我们可以从原因着手通过以下几个方面来做出优化措施: 1. **适当调大响应时间,减少误判**通过参数 `discovery.zen.ping_timeout`设置节点状态的响应时间,默认为3s,可以适当调大,如果master在该响应时间的范围内没有做出响应应答,判断该节点已经挂掉了。调大参数(如6s,discovery.zen.ping_timeout:6),可适当减少误判。 2. **选举触发**我们需要在候选集群中的节点的配置文件中设置参数 `discovery.zen.munimum_master_nodes`的值,这个参数表示在选举主节点时需要参与选举的候选主节点的节点数,默认值是1,官方建议取值 `(master_eligibel_nodes/2) + 1`,其中 `master_eligibel_nodes`为候选主节点的个数。这样做既能防止脑裂现象的发生,也能最大限度地提升集群的高可用性,因为只要不少于 `discovery.zen.munimum_master_nodes`个候选节点存活,选举工作就能正常进行。当小于这个值的时候,无法触发选举行为,集群无法使用,不会造成分片混乱的情况。 3. **角色分离**即候选主节点和数据节点进行角色分离,这样可以减轻主节点的负担,防止主节点的假死状态发生,减少对主节点“已死”的误判: 1. 从节点配置:`node master: false`、`node data: true` 2. 主节点配置:`node master: true`、`node data: false` ### GC 方面,在使用 Elasticsearch 时要注意什么? 倒排词典的索引需要常驻内存,无法 GC,需要监控 data node 上 segment memory 增长趋势。 各类缓存, field cache, filter cache, indexing cache, bulk queue 等等,要设置合理的大小,并且要应该根据最坏的情况来看 heap 是否够用,也就是各类缓存全部占满的时候,还有 heap 空间可以分配给其他任务吗?避免采用 clear cache 等“自欺欺人”的方式来释放内存。 避免返回大量结果集的搜索与聚合。确实需要大量拉取数据的场景,可以采用 scan & scroll api 来实现。 cluster stats 驻留内存并无法水平扩展,超大规模集群可以考虑分拆成多个集群通过 tribe node 连接。 想知道 heap 够不够,必须结合实际应用场景,并对集群的 heap 使用情况做持续的监控。 ### 在并发情况下, Elasticsearch 如果保证读写一致? ES可以通过版本号使用乐观并发控制,以确保新版本不会被旧版本覆盖,由应用层来处理具体的冲突; 另外对于写操作,一致性级别支持 quorum/one/all,默认为 quorum,即只有当大多数分片可用时才允许写操作。但即使大多数可用,也可能存在因为网络等原因导致写入副本失败,这样该副本被认为故障,分片将会在一个不同的节点上重建。 对于读操作,可以设置 replication 为 sync(默认),这使得操作在主分片和副本分片都完成后才会返回;如果设置 replication 为 async 时,也可以通过设置搜索请求参数_preference 为 primary 来查询主分片,确保文档是最新版本。 --- 还有就是字典树、倒排索引、es的复杂类型、各种名词字段的含义等等 Last modification:August 22, 2022 © Allow specification reprint Like 1 喵ฅฅ