扫码阅读
手机扫码阅读

Odoo 集成 ElasticSearch

80 2024-03-14

本期作者

王泽宇

数据开发工程师

擅长 java 开发

Step 1 安装分词器

找到 ElasticSearch 安装目录的 bin 目录,在该目录下执行以下命令,安装中文 ik 分词器。

这里注意需要 切换 ElasticSearch 启动用户

注意:如果是集群安装 ElasticSearch,则要给每个节点的 ElasticSearch 都安装 ik 分词器。

su es./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.6.0/elasticsearch-analysis-ik-7.6.0.zip

安装好后需要 kill -9 杀死 ElasticSearch 进程,再进行重启。

Step 2 测试分词器

首先创建一个测试索引,测试分词器到底安装成功了没有。

PUT test POST test/_analyze{ "analyzer": "ik_smart", "text": "测试语句分词,测一测ik分词器到底行不行"}

最终输出的分词结果应该是:

{ "tokens" : [ { "token" : "测试", "start_offset" : 0, "end_offset" : 2, "type" : "CN_WORD", "position" : 0 }, { "token" : "语句", "start_offset" : 2, "end_offset" : 4, "type" : "CN_WORD", "position" : 1 }, { "token" : "分词", "start_offset" : 4, "end_offset" : 6, "type" : "CN_WORD", "position" : 2 }, { "token" : "测", "start_offset" : 7, "end_offset" : 8, "type" : "CN_CHAR", "position" : 3 }, { "token" : "一测", "start_offset" : 8, "end_offset" : 10, "type" : "CN_WORD", "position" : 4 }, { "token" : "ik", "start_offset" : 10, "end_offset" : 12, "type" : "ENGLISH", "position" : 5 }, { "token" : "分词器", "start_offset" : 12, "end_offset" : 15, "type" : "CN_WORD", "position" : 6 }, { "token" : "到底", "start_offset" : 15, "end_offset" : 17, "type" : "CN_WORD", "position" : 7 }, { "token" : "行不行", "start_offset" : 17, "end_offset" : 20, "type" : "CN_WORD", "position" : 8 } ]}

ElasticSearch 安装好了以后,

剩下的任务就是,

如何将 Odoo 的数据

和 ElasticSearch 进行同步

选择操作包

首先我们要 选择操作包,python 操作 ElasticSearch 的包是 elasticsearch。

因为我安装的版本是 elasticsear7.6,所以选择 elasticsear7 这个包。

pip3 install elasticsearch7

字段映射

在业务设计中,经常出现这样的情形,两个不同的模型出现同名的字段。

如果要将这些字段全部放在同一个 ElasticSearch 索引下,就会导致 字段重名的问题,进而产生各种错误。

为了解决这个问题,可以利用 字段映射来解决。

用模型名和字段名做 key,去映射ElasticSearch 里面的一个唯一字段。

index_filed_list = { ("product", "product_name"): "product_name", ("product", "comment"): "product_comment", ("product", "solution"): "product_solution",  ("company", "company_name"): "company_name", ("company", "founder"): "company_founder", ("company", "comment"): "company_comment",  ("sales", "name"): "sale_names",}

获取配置信息

我们在 odoo.conf 中配置好 es 的 ip、端口和索引,再在代码中去获取这个配置信息。

import odoo.tools host = odoo.tools.config['es_host']port = odoo.tools.config['es_port']index = odoo.tools.config['index_name']

实现 ES 工具类

接下来就要去实现 ElasticSearch 的工具类了。

首先创建一个 ElasticSearch 的相关模型,并且禁止它去 PostgreSQL 中创建表。

# -*- coding: utf-8 -*-from odoo import models, apifrom elasticsearch7 import Elasticsearchimport odoo.tools host = odoo.tools.config['es_host']port = odoo.tools.config['es_port']index = odoo.tools.config['index_name'] index_filed_list = { ("product", "product_name"): "product_name", ("product", "comment"): "product_comment", ("product", "solution"): "product_solution",  ("company", "company_name"): "company_name", ("company", "founder"): "company_founder", ("company", "comment"): "company_comment",  ("sales", "name"): "sale_names",}  class ElasticSearchUtil(models.AbstractModel): _description = 'es工具类' _name = 'prod.util.elasticsearch' _auto = False

实现一个静态的获取 ElasticSearch 实例的方法。

@staticmethoddef get_es_instance(): return Elasticsearch([{'host': host, 'port': port}], http_auth=())

考虑到索引有可能是未创建的状态,因此需要先检查一下索引是否存在,如果不存在则创建一个新索引

@api.model def create_index(self, es): # 判断索引是否存在,不存在就创建一个 if not es.indices.exists(index=index): es.indices.create(index=index)

在某些特殊情况,我们需要删掉索引强制重建

@api.modeldef delete_index(self): # 存在就给他删了 es = self.get_es_instance() if es.indices.exists(index=index): es.indices.delete(index=index)

ElasticSearch 默认的分词器是不支持中文的。

因此在遇到新的字段时,必须显式的创建 mapping 并支持中文分词。

这里我们使用 ik 分词器。

@api.model def get_fileds_mapping_or_create(self, es, fileds): # 判断这些字段的mapping是否存在,不存在就创建 mapping_dict = es.indices.get_mapping(index=index).get(index).get('mappings') if mapping_dict.get('properties'): mapping_list = mapping_dict.get('properties').keys() else: mapping_list = [] body = { "properties": { } } update_mapping = False for filed in fileds: if filed not in mapping_list: update_mapping = True body['properties'][filed] = { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart" } if update_mapping: es.indices.put_mapping(index=index, body=body)

在实际的业务过程中,新插入一条数据,可以直接在 ElasticSearch 中创建一个文档。

而如果有对数据的修改,ElasticSearch 的包可不支持直接的修改,需要写一个修改的 json。

我们这里把修改字段内容的方法封装一下。

@api.model def update_filed_by_id(self, es, id, fileds: dict): # 更新一条数据 update_str = ';'.join([f"ctx._source.{key}='{value}'" for key, value in fileds.items()]) body = { "script": { "source": update_str } } es.update(index=index, id=id, body=body)

对于数据的删除,有两种情况:

一种情况是该数据是一对多存储的,删除这条数据,只会删除索引中的其中一部分,属于文档数据修改。

另一种情况是该数据是一对一存储,应该直接删除该字段。

@api.model def delete_filed(self, ids, model_name): # 删除某些字段 fileds = [] for key in index_filed_list.keys(): if key[0] == model_name: fileds.append(index_filed_list.get(key)) es = self.get_es_instance() delete_str = ';'.join([f"ctx._source.remove('{item}')" for item in fileds]) for id in ids: body = { "query": { "term": { "_id": f"{id}" } }, "script": { "source": delete_str } } es.update_by_query(index=index, body=body) es.close()

前置的方法已经基本完成,现在开始实现同步PostgreSQL 字段和 ElasticSearch 索引的方法。

@api.model def insert_or_update_filed(self, ids, fileds: dict, model_name): # 检查索引是否存在 es = self.get_es_instance() self.create_index(es) body = {} update_value = False for key, value in fileds.items(): # 只有字符类型的才建索引 if (model_name, key) in index_filed_list.keys(): update_value = True body[index_filed_list.get((model_name, key))] = value if update_value: self.get_fileds_mapping_or_create(es, body.keys()) # 检查该id的记录是否存在 for id in ids: if not es.exists(index=index, id=id): es.index(index=index, doc_type='_doc', id=id, refresh=True, body=body) else: self.update_filed_by_id(es, id, body) es.close()

至此,ElasticSearch 工具类基本实现。

在其他的模型中,我们需要重写 Odoo 的增删改方法,进而实现在对模型进行增删改操作时,ElasticSearch 的索引能够同步进行修改。

# 重写模型创建方法,创建模型时同步创建es相关文档@api.modeldef create(self, vals): result = super().write(vals) self.env['prod.util.elasticsearch'].insert_or_update_filed(result.ids, vals, "company") return result # 重写模型修改方法,创建模型时同步修改es相关文档def write(self, vals): self.env['prod.util.elasticsearch'].insert_or_update_filed(self.ids, vals, "company") return super().write(vals) # 重写模型删除方法,创建模型时同步删除es相关文档def unlink(self): self.env['prod.util.elasticsearch'].delete_filed(self.ids, "company") return super().unlink()

注意,这样操作之后,每次修改都必须使用 Odoo 的 self.create/write/unlink 方法。

如果直接执行 PostgreSQL 的话,依旧会跳过 ElasticSearch 的索引同步,导致 ElasticSearch 索引和 PostgreSQL 中索引不一致。

面对这种情况,我们需要 重写一个强制完全重建索引的方法,来保证索引的一致性。

这里强制重建索引的代码和业务相关性比较大,需要根据自己的业务需求自行调整。

@api.model def rebuild_elasticsearch_index(self): # 强制重建索引方法 self.env['prod.util.elasticsearch'].delete_index() prod_ids = self.search([('id', '!=', None)]).ids for prod_id in prod_ids: prod = self.search_read([('id', '=', prod_id)])[0] # 判断是否有选择公司 if prod.get('company'): company = self.env['prod.company'].search_read([('id', '=', prod.get('company')[0])]) self.env['prod.util.elasticsearch'].insert_or_update_filed([prod.get('id')], company[0], "company") # 判断是否选择了销售 if prod.get('sales_emp'): sale_ids = prod.get('sales_emp') sales = self.env['prod.sales'].search([('id', 'in', sale_ids)]) sale_names = [] for s in sales: sale_names.append(s.name) self.env['prod.util.elasticsearch'].insert_or_update_filed([prod.get('id')],{'name': ','.join(sale_names)}, "sales") # 更新本体的索引字段 self.env['prod.util.elasticsearch'].insert_or_update_filed([prod.get('id')], prod, "product")

千万不要忘了,我们使用 ElasticSearch 做第三方的索引,目的就是进行全文搜索,现在来实现一个全文搜索的方法。

需要对 ElasticSearch 的语法有一定的了解。

使用 bool:{should:{}} 来进行多个条件的或运算

使用 match 的方法,可以对字段进行分词索引。

如果不需要分词的字段,可以使用 term:{value:filed.keyword} 的方式进行匹配。

匹配的结果记得要加上 highlight,确定具体匹配到了哪些词语。

最后再从 PostgreSQL 中取出不在索引内的字段,把整体返回结果封装一下就可以了。

@api.model def search_info_by_keyword(self, keyword, size, page): es = self.get_es_instance() values = index_filed_list.values() match_query_list = [{"match": {value: keyword}} for value in values]  highlight_query_dict = {value: {} for value in values}  body = { "query": { "bool": { "should": match_query_list } }, "highlight": { "fields": highlight_query_dict }, "size": size, "from": (page - 1) * size } result = { "info": [], 'total': 0 } hits = es.search(index=index, body=body)['hits'] search_result = hits['hits'] for info in search_result: fileds = info['_source'] info_highlight = info['highlight'] if "company_name" in info_highlight.keys(): company_name = info_highlight['company_name'][0] del info_highlight['company_name'] else: company_name = fileds.get('company_name')  if "product_name" in info_highlight.keys(): product_name = info_highlight['product_name'][0] del info_highlight['product_name'] else: product_name = fileds.get('product_name')  if len(info_highlight) > 0: highlight = '...'.join(value[0] for value in info_highlight.values()) else: if "product_comment" in fileds: highlight = fileds.get('product_comment') elif "company_name" in fileds: highlight = fileds.get('company_name') elif "product_solution" in fileds: highlight = fileds.get('product_solution') else: highlight = None prod = self.env['prod.product'].search([('id', '=', info['_id'])]) logo = prod.img logo_name = prod.file_name  logo = self.env['prod.picture'].picture_add_base64_head({'logo': logo, 'file_name': logo_name}, {'logo': 'file_name'}) result['info'].append({ "id": info['_id'], "company_name": company_name, "product_name": product_name, "highlight": highlight, "logo": logo.get('logo'), }) result['total'] = hits['total']['value'] return result

看一下搜索的效果如何,

是否符合预期呢?

/ 搜索结果

原文链接: http://mp.weixin.qq.com/s?__biz=Mzg5MzUyOTgwMQ==&mid=2247489296&idx=1&sn=cf27527e59de71a1cf1581f1a7a5b263&chksm=c02c2cb6f75ba5a04c9ff970fb8826b2c0d7742db977ee4ebd3a47276a57d6add24e7ddc2101#rd