ElasticSearch是广受欢迎的NoSQL数据库,其分布式架构提供了极佳的数据空间的水平扩展能力,同时保障了数据的可靠性;反向索引技术使得数据检索和查询速度非常快。更多功能参见官网介绍
https://www.elastic.co/cn/elasticsearch/
下面简单罗列了通过Python访问ES的方法。
注:本文不是Elasticsearch的入门介绍,需要有ES基本知识。
Elastic提供的Python ElasticSearch原生接口,源代码托管在Github上。项目链接和文档链接如下:
https://github.com/elastic/elasticsearch-py
https://www.elastic.co/guide/en/elasticsearch/client/python-api/7.17/examples.html#examples
下面是常见操作示例:
建立ES连接
from elasticsearch import Elasticsearch, helpers
from elasticsearch.exceptions import ConnectionError, ConnectionTimeout, TransportError
...
try :
#es = Elasticsearch(es_server, retry_on_timeout=True)
es = Elasticsearch(es_server, http_auth=(es_user, es_pass), timeout=30, max_retries=10, retry_on_timeout=True)
print("Connection failed, exit ...")
sys.exit(1)
创建ES数据
doc = {
'author': 'author_name',
'text': 'Interesting content...',
'timestamp': datetime.now(),
}
res = es.index(index="test-index", id=1, body=doc)
获取ES数据
res = es.get(index="test-index", id=1)
通过查询获取ES数据
query={"match_all":{}}
try :
result = es.search(index=index, query=query, size=10000)
except([ConnectionError, ConnectionTimeout, TransportError]):
print("Connection failed, exit ...")
sys.exit(1)
data=[]
for item in result['hits']['hits'] :
data.push(item['_source'])
更新ES数据
doc = {
'author': 'author_name',
'text': 'Interesting modified content...',
'timestamp': datetime.now(),
}
res = es.update(index="test-index", id=1, body=doc)
删除ES数据
es.delete(index="test-index", id=1)
原生ES python接口在查询时需要编写复杂的DSL查询语句,Elastic提供的ElasticSearch-DSL库极大地简化了查询语法,方便编写查询语句。相关项目和文档的URL:
https://github.com/elastic/elasticsearch-dsl-py
https://elasticsearch-dsl.readthedocs.io/en/latest/
示例代码如下:
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
client = Elasticsearch()
s = Search(using=client, index="my-index") \
.filter("term", category="search") \
.query("match", title="python") \
.exclude("match", description="beta")
s.aggs.bucket('per_tag', 'terms', field='tags') \
.metric('max_lines', 'max', field='lines')
response = s.execute()
for hit in response:
print(hit.meta.score, hit.title)
for tag in response.aggregations.per_tag.buckets:
print(tag.key, tag.max_lines.value)
Pandas是流行的大数据处理Python库,Elastic提供了Pandas DataFrame的接口 ,可以直接将索引(数据表)中的数据放到 pandas 的 dataframe 中,非常方便。相关项目和文档URL如下:
https://github.com/elastic/eland
https://eland.readthedocs.io/en/latest/reference/dataframe.html
注意:返回的并不是原生Pandas DataFrame,而是Elastic自己的实现,但并没有实现所有DataFrame的功能。
示例代码如下:
import eland as ed
# Connecting to an Elasticsearch instance running on 'localhost:9200'
df = ed.DataFrame("localhost:9200", es_index_pattern="flights")
也可以先建立 ES 连接
# Connecting to an Elastic Cloud instance
from elasticsearch import Elasticsearches = Elasticsearch(
"localhost:9200",
http_auth=("elastic", "<password>")
)
df = ed.DataFrame(es, es_index_pattern="flights")
?
eland虽然可以方便将 Elastic 中的数据转换为 dataframe,但没有提供将 dataframe 保存到 Elastic的接口。这时我们需要使用第三方的接口。es_pandas是开源的 ES Pandas接口,可以直接将ES查询得到的数据以Pandas DataFrame的方式返回,也可将 dataframe 保存到 Elastic 中。
https://github.com/fuyb1992/es_pandas
初始化与ES的连接
import pandas as pd
from es_pandas import es_pandas
...
epcon = None
try :
epcon = es_pandas(esurl)
except Exception as e:
logger.error("Initializa DB connection failed! Error[{}]".format(str(e)))
从ES表中获取数据,返回格式为Pandas DataFrame
# 从ES表中获取数据返回DataFrame
try:
if query is None:
data = epcon.to_pandas(dbname, infer_dtype=True, show_progress=False)
else:
data = epcon.to_pandas(dbname, infer_dtype=True, show_progress=False, query_rule=query)
except exceptions.NotFoundError:
logger.debug("Not found data. Params: dbname[{}] query[{}]".format(dbname, query))
将Pandas DataFrame中的数据写入ES表中
# 将DataFrame中的数据写入ES表中
ret = True
try:
epcon.to_es(df, dbname, use_index=True, _op_type='create', thread_count=2, chunk_size=10000, show_progress=False)
except ConnectionError:
ret = False
logger.error("Save data failed! Params: dbname[{}] data[{}],, connection error!".format(dbname, df))
将Pandas DataFrame中的数据更新到ES表中
# 将DataFrame中的更新到ES表中
ret = True
try:
epcon.to_es(df, dbname, use_index=True, _op_type='update', thread_count=2, chunk_size=10000, show_progress=False)
except ConnectionError:
ret = False
logger.error("Update data failed! Params: dbname[{}] data[{}],, connection error!".format(dbname, df))
将Pandas DataFrame中的数据从ES表中删除
# 将DataFrame中的数据从ES表中删除
ret = True
try:
epcon.to_es(df, dbname, use_index=True, _op_type='delete', thread_count=2, chunk_size=10000, show_progress=False)
except ConnectionError:
ret = False
logger.error("Delete data failed! Params: dbname[{}] data[{}],, connection error!".format(dbname, df))