diff options
Diffstat (limited to 'wqflask/utility/elasticsearch_tools.py')
-rw-r--r-- | wqflask/utility/elasticsearch_tools.py | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/wqflask/utility/elasticsearch_tools.py b/wqflask/utility/elasticsearch_tools.py new file mode 100644 index 00000000..15cdd0bc --- /dev/null +++ b/wqflask/utility/elasticsearch_tools.py @@ -0,0 +1,112 @@ +# Elasticsearch support +# +# Some helpful commands to view the database: +# +# You can test the server being up with +# +# curl -H 'Content-Type: application/json' http://localhost:9200 +# +# List all indices +# +# curl -H 'Content-Type: application/json' 'localhost:9200/_cat/indices?v' +# +# To see the users index 'table' +# +# curl http://localhost:9200/users +# +# To list all user ids +# +# curl -H 'Content-Type: application/json' http://localhost:9200/users/local/_search?pretty=true -d ' +# { +# "query" : { +# "match_all" : {} +# }, +# "stored_fields": [] +# }' +# +# To view a record +# +# curl -H 'Content-Type: application/json' http://localhost:9200/users/local/_search?pretty=true -d ' +# { +# "query" : { +# "match" : { "email_address": "pjotr2017@thebird.nl"} +# } +# }' +# +# +# To delete the users index and data (dangerous!) +# +# curl -XDELETE -H 'Content-Type: application/json' 'localhost:9200/users' + + +from elasticsearch import Elasticsearch, TransportError +import logging + +from utility.logger import getLogger +logger = getLogger(__name__) + +from utility.tools import ELASTICSEARCH_HOST, ELASTICSEARCH_PORT + +def test_elasticsearch_connection(): + es = Elasticsearch(['http://'+ELASTICSEARCH_HOST+":"+str(ELASTICSEARCH_PORT)+'/'], verify_certs=True) + if not es.ping(): + logger.warning("Elasticsearch is DOWN") + +def get_elasticsearch_connection(for_user=True): + """Return a connection to ES. Returns None on failure""" + logger.info("get_elasticsearch_connection") + es = None + try: + assert(ELASTICSEARCH_HOST) + assert(ELASTICSEARCH_PORT) + logger.info("ES HOST",ELASTICSEARCH_HOST) + + es = Elasticsearch([{ + "host": ELASTICSEARCH_HOST, "port": ELASTICSEARCH_PORT + }], timeout=30, retry_on_timeout=True) if (ELASTICSEARCH_HOST and ELASTICSEARCH_PORT) else None + + if for_user: + setup_users_index(es) + + es_logger = logging.getLogger("elasticsearch") + es_logger.setLevel(logging.INFO) + es_logger.addHandler(logging.NullHandler()) + except Exception as e: + logger.error("Failed to get elasticsearch connection", e) + es = None + + return es + +def setup_users_index(es_connection): + if es_connection: + index_settings = { + "properties": { + "email_address": { + "type": "keyword"}}} + + es_connection.indices.create(index='users', ignore=400) + es_connection.indices.put_mapping(body=index_settings, index="users", doc_type="local") + +def get_user_by_unique_column(es, column_name, column_value, index="users", doc_type="local"): + return get_item_by_unique_column(es, column_name, column_value, index=index, doc_type=doc_type) + +def save_user(es, user, user_id): + es_save_data(es, "users", "local", user, user_id) + +def get_item_by_unique_column(es, column_name, column_value, index, doc_type): + item_details = None + try: + response = es.search( + index = index, doc_type = doc_type, body = { + "query": { "match": { column_name: column_value } } + }) + if len(response["hits"]["hits"]) > 0: + item_details = response["hits"]["hits"][0]["_source"] + except TransportError as te: + pass + return item_details + +def es_save_data(es, index, doc_type, data_item, data_id,): + from time import sleep + es.create(index, doc_type, body=data_item, id=data_id) + sleep(1) # Delay 1 second to allow indexing |