Elasticsearch and Python

Dec 3, 2020 12:11 · 1868 words · 9 minute read elasticsearch python search nosql json

Now that we have explored the fundamentals of Elasticsearch in our previous blog post, we’ll have a look at the Elasticsearch Python client. Two clients are made available by Elasticsearch:

  • Low level client: it has all the functionalities and sets the basis for the Python clients.
  • High level library: Elasticsearch dsl is a higher level library that is built on top of the low level client.

Both libraries have advantages.

The DSL library has a more concise and pythonic syntax but also has two main limits:

  1. it depends on the low level client
  2. it is usually updated with delay compared to the low level client

We’ll then stick with the low level client that gets all the latest functionalities first and still has a reasonably straightforward syntax. We’ll look at basic operations on indexes and documents.

Initiating the Elasticsearch client

Elasticsearch recently introduced support for asyncio in the Python low level client. For now, we’ll stick with the synchronous client to get a basic understanding of how the client works. We’ll see later that the async client has a very similar syntax and that most of what you’ll need for using it is a basic understanding of asyncio itself.

The first thing we’ll do is to initialise the Python client. For that, you’ll need to create a virtual environment and install the Python client. We’ll first create a test folder and then setup the virtual env. You can do it like follows

# create a test folder and change directory
mkdir test && cd test

# create and activate the virtual environment, this assumes Python >= 3.6
virtualenv venv && source venv/bin/activate

# create a requirements file, add elasticsearch python requirement and install it
touch requirements.txt
echo "elasticsearch==7.10.0" > requirements.txt
pip install -r requirements.txt

You now have the Python client installed in your virtual environment. The next step is to actually have an Elasticsearch cluster running. You can do that with Docker running the instructions in the blog post mentioned in introduction. Once done you’ll be all good to go for starting to write Python code with the Elasticsearch Python client.

You can either run your code in the Python interpreter or create a Python script and run it from the command line. We’ll use the latter option to keep track of our progress.

Let’s create our script file

touch __init__.py

You can then open the __init__.py file in your text editor and start iterating by copying code to the file and running the script.

import json
from typing import List, Optional

from elasticsearch import Elasticsearch
from elasticsearch.client import CatClient, ClusterClient, IndicesClient
from elasticsearch.exceptions import NotFoundError, RequestError
from elasticsearch.helpers import bulk


ES_HOSTS = ["localhost:9200"]
es_client = Elasticsearch(ES_HOSTS)

This intial step adds the basic imports we’ll need and instantiates the base Elasticsearch client.

Cat and Cluster clients

We’ll start with a couple of basic operations to confirm that our cluster is healthy.

# init the cat and cluster clients
cat_client = CatClient(es_client)
cluster_client = ClusterClient(es_client)


# check the Elasticsearch cluster health
cluster_health = {
    k: v
    for k, v in cluster_client.health().items()
    if k in ("status", "number_of_nodes")
}
print("Cluster health:")
print(json.dumps(cluster_health, indent=4))

If you have successfully setup your Elasticsearch cluster and your virtual environment, you should get a response similar to this one

Cluster health:
{
    "status": "yellow",
    "number_of_nodes": 1
}

Note that your cluster status is yellow (and not green) because you only have one node and data can’t be replicated across more than one node.

Let’s try the CAT API now. We’ve already setup the cat_client above and we just have to run the following

# check the Elasticsearch indexes available
print("List indices:")
print(cat_client.indices())

If you have followed our onboarding this should show one index, similar to the following

List indices:
yellow open my-index-0                   -vsGlUbKQEW3DuTqEWgdnQ 1 1  1   0   3.5kb   3.5kb

OK we have confirmed our cluster is correctly setup and we have one index available. We are ready to move on.

Indices client

We’ll look at basic operations on indexes, namely creation and deletion. While looking at this, we’ll also check how to verify that an index exists and how to catch a couple of elasticsearch exceptions.

# init the indices client
indices_client = IndicesClient(es_client)

First, I’ll show you how to delete an index.

# delete index
print("Deleting index 'my-index-0'")
try:
    indices_client.delete(index='my-index-0')
except NotFoundError:
    print("'my-index-0' does not exist")

Nothing very fancy. Please just note that you need to handle the NotFoundError as an index delete will raise if the index does not exist.

Creating an index is as simple

# create index
print("Creating index 'my-index-1'")
try:
    indices_client.create(index='my-index-1')
except RequestError as err:
    if err.error == 'resource_already_exists_exception':
        print("'my-index-1' already exists")
    else:
        raise

Afer running these two operations, you should now have a my-index-1 index and no my-index-0 index.

print(f"'my-index-0' exists: {indices_client.exists(index='my-index-0')}")
print(f"'my-index-1' exists: {indices_client.exists(index='my-index-1')}")

That was pretty straightforward, right?

CRUD operations

Ok, let’s now have a look at the CRUD operations on documents. We’ll also explore how to bulk these operations. We won’t mention the full list of endpoints available but that should give you a pretty good starting point.

We’ll also create some helpers but you don’t have to. These are just there to abstract out some of the internals of calling the client and handling responses and exceptions.

Create

To create a document, you can use the index method.

def create(*, index: str, body: dict, id: Optional[str] = None,
           refresh: Optional[str] = "wait_for") -> None:
    es_client.index(index=index, id=id, body=body, refresh=refresh)

You can either specify an id if you want the doc ID to be deterministic, you can omit it if you are fine leaving it to Elasticsearch.

create(index="my-index-1", id="1", body={"user": "test"})

Make sure that the index my-index-1 does exist before sending the create request. If you visit localhost:9200my-index-1/_doc/1 you should see your freshly create doc.

Read

You can use get to retrieve a single document

def get(*, index: str, id: str) -> dict:
    r = es_client.get(index=index, id=id)
    return r['_source'] if r.get('found') else None
doc = get(index="my-index-1", id="1")

This should return your document if you have followed along.

You can use msearch for retrieving more than one ID.

If you don’t know the document(s) ID, you can build your query and perform a search.

def search(*, index: str, body: dict) -> List[dict]:
    response = es_client.search(index=index, body=body)

    return [
        {
            "_id": hit["_id"],
            **hit["_source"],
        }
        for hit in response["hits"]["hits"]
    ]

And you can call it as follows

# should return one result
body = {
    "query": {
        "bool": {
            "filter": {
                "term": {"user": "test"}
            }
        }
    }
}
results = search(index="my-index-1", body=body)

# should return no result
body = {
    "query": {
        "bool": {
            "filter": {
                "term": {"user": "nothing"}
            }
        }
    }
}
results = search(index="my-index-1", body=body)

If you are not familiar with Elasticsearch Query DSL, you can learn more about in Elasticsearch reference docs and the dedicated reference page about term queries.

Update

OK, let’s have a look at the update operation.

def update(*, index: str, id: str, doc: dict, refresh: Optional[str] = "wait_for") -> None:
    body = {
        "doc": doc
    }
    es_client.update(index=index, id=id, body=body, refresh=refresh)
doc = {
    "user": "prod"
}
update(index="my-index-1", id="1", doc=doc)

We do not handle any initial check or any exception here but you might want to investigate that in a production environment.

Delete

And the delete operation

def delete(*, index: str, id: str, refresh: Optional[str] = "wait_for") -> None:
    es_client.delete(index=index, id=id, ignore=[404], refresh=refresh)
delete(index="my-index-1", id="1")

Note that we use the ignore named parameter here and ignore 404 responses, i.e. the requests on not found documents.

Ok, that’s it for our basic CRUD operations. We’ll just have a quick look at the bulk operations as they can seriously improve the performance of your operations.

Bulk

Bulk is handy for performing multiple index/update/delete operations at once.

The bulk api accepts index, create, delete, and update actions. You can use the _op_type field to specify an action (default value is index):

def bulk_create(*, es_client: Elasticsearch, index: str, sources: List[dict],
                refresh: Optional[str] = "wait_for") -> None:
    actions = [
        {
            "_op_type": "index",
            "_index": index,
            "_source": source
        }
        for source in sources
    ]

    if actions:
        bulk(client=es_client, actions=actions, refresh=refresh)
sources = [
    {"user": "John"},
    {"user": "Malcom"},
]
bulk_create(es_client=es_client, index="my-index-1", sources=sources)

The actions parameter can also be a generator and that’s recommended if your list of sources is reasonably large.

Conclusion

I’ve copied the full script here for your convenience (it includes quite a few print statements to mark the different steps)

import json
from typing import List, Optional

from elasticsearch import Elasticsearch
from elasticsearch.client import CatClient, ClusterClient, IndicesClient
from elasticsearch.exceptions import NotFoundError, RequestError
from elasticsearch.helpers import bulk


ES_HOSTS = ["localhost:9200"]
es_client = Elasticsearch(ES_HOSTS)

# init the cat and cluster clients
cat_client = CatClient(es_client)
cluster_client = ClusterClient(es_client)


# check the Elasticsearch cluster health
cluster_health = {
    k: v
    for k, v in cluster_client.health().items()
    if k in ("status", "number_of_nodes")
}
print("Cluster health:")
print(json.dumps(cluster_health, indent=4))


# check the Elasticsearch indexes available
print("List indices:")
print(cat_client.indices())

# init the indices client
indices_client = IndicesClient(es_client)

# delete index
print("Deleting index 'my-index-0'")
try:
    indices_client.delete(index='my-index-0')
except NotFoundError:
    print("'my-index-0' does not exist")


# create index
print("Creating index 'my-index-1'")
try:
    indices_client.create(index='my-index-1')
except RequestError as err:
    if err.error == 'resource_already_exists_exception':
        print("'my-index-1' already exists")
    else:
        raise


print(f"'my-index-0' exists: {indices_client.exists(index='my-index-0')}")
print(f"'my-index-1' exists: {indices_client.exists(index='my-index-1')}")


def create(*, index: str, body: dict, id: Optional[str] = None,
           refresh: Optional[str] = "wait_for") -> None:
    es_client.index(index=index, id=id, body=body, refresh=refresh)


create(index="my-index-1", id="1", body={"user": "test"})


def get(*, index: str, id: str) -> dict:
    r = es_client.get(index=index, id=id)
    return r['_source'] if r.get('found') else None


doc = get(index="my-index-1", id="1")
print(f"Doc with index 1: {doc}")


def search(*, index: str, body: dict) -> List[dict]:
    response = es_client.search(index=index, body=body)

    return [
        {
            "_id": hit["_id"],
            **hit["_source"],
        }
        for hit in response["hits"]["hits"]
    ]


# should return one result
body = {
    "query": {
        "bool": {
            "filter": {
                "term": {"user": "test"}
            }
        }
    }
}
results = search(index="my-index-1", body=body)
print(f"Search for user 'test': {results}")

# should return no result
body = {
    "query": {
        "bool": {
            "filter": {
                "term": {"user": "nothing"}
            }
        }
    }
}
results = search(index="my-index-1", body=body)
print(f"Search for user 'nothing': {results}")


def update(*, index: str, id: str, doc: dict, refresh: Optional[str] = "wait_for") -> None:
    body = {
        "doc": doc
    }
    es_client.update(index=index, id=id, body=body, refresh=refresh)


doc = {
    "user": "prod"
}
update(index="my-index-1", id="1", doc=doc)
doc = get(index="my-index-1", id="1")
print(f"Doc with index 1: {doc}")


def delete(*, index: str, id: str, refresh: Optional[str] = "wait_for") -> None:
    es_client.delete(index=index, id=id, ignore=[404], refresh=refresh)


delete(index="my-index-1", id="1")

# should return no result
body = {'query': {'match_all': {}}}
results = search(index="my-index-1", body=body)
print(f"Math all query: {results}")


def bulk_create(*, es_client: Elasticsearch, index: str, sources: List[dict],
                refresh: Optional[str] = "wait_for") -> None:
    actions = [
        {
            "_op_type": "index",
            "_index": index,
            "_source": source
        }
        for source in sources
    ]

    if actions:
        bulk(client=es_client, actions=actions, refresh=refresh)


sources = [
    {"user": "John"},
    {"user": "Malcom"},
]
bulk_create(es_client=es_client, index="my-index-1", sources=sources)

# should return two results
body = {"query": {"match_all": {}}}
results = search(index="my-index-1", body=body)
print(f"Math all query: {results}")


# final cleaning for the index
body = {"query": {"match_all": {}}}
es_client.delete_by_query(index="my-index-1", body=body)
print("All done!")

And… all done! It was a lot of code and not that many comments. I hope you managed to follow along and that helped you get a first idea on how to use the Elasticsearch Python client (low level) to perform the basic CRUD operations on your Elasticsearch cluster.

References

tweet Share