about summary refs log tree commit diff
path: root/wqflask/utility/elasticsearch_tools.py
diff options
context:
space:
mode:
authoracenteno2020-04-21 17:35:34 -0500
committerGitHub2020-04-21 17:35:34 -0500
commit660589b9c2a507529e8e51ca6ce66ca97ad982c5 (patch)
tree27f63957278581bc2fce2b88744bfe20c8a81558 /wqflask/utility/elasticsearch_tools.py
parentd97fdc18359233f07c1a1c7b83fe7e88eb225043 (diff)
parentf2a3ae13231a7d270a5bb6911c248aa713f1ef91 (diff)
downloadgenenetwork2-660589b9c2a507529e8e51ca6ce66ca97ad982c5.tar.gz
Merge pull request #1 from genenetwork/testing
Updating my testing branch
Diffstat (limited to 'wqflask/utility/elasticsearch_tools.py')
-rw-r--r--wqflask/utility/elasticsearch_tools.py112
1 files changed, 112 insertions, 0 deletions
diff --git a/wqflask/utility/elasticsearch_tools.py b/wqflask/utility/elasticsearch_tools.py
new file mode 100644
index 00000000..15cdd0bc
--- /dev/null
+++ b/wqflask/utility/elasticsearch_tools.py
@@ -0,0 +1,112 @@
+# Elasticsearch support
+#
+# Some helpful commands to view the database:
+#
+# You can test the server being up with
+#
+#   curl -H 'Content-Type: application/json' http://localhost:9200
+#
+# List all indices
+#
+#   curl -H 'Content-Type: application/json' 'localhost:9200/_cat/indices?v'
+#
+# To see the users index 'table'
+#
+#   curl http://localhost:9200/users
+#
+# To list all user ids
+#
+# curl -H 'Content-Type: application/json' http://localhost:9200/users/local/_search?pretty=true -d '
+# {
+#     "query" : {
+#         "match_all" : {}
+#     },
+#     "stored_fields": []
+# }'
+#
+# To view a record
+#
+#   curl -H 'Content-Type: application/json' http://localhost:9200/users/local/_search?pretty=true -d '
+#   {
+#     "query" : {
+#       "match" : { "email_address": "pjotr2017@thebird.nl"}
+#     }
+#   }'
+#
+#
+# To delete the users index and data (dangerous!)
+#
+#   curl -XDELETE -H 'Content-Type: application/json' 'localhost:9200/users'
+
+
+from elasticsearch import Elasticsearch, TransportError
+import logging
+
+from utility.logger import getLogger
+logger = getLogger(__name__)
+
+from utility.tools import ELASTICSEARCH_HOST, ELASTICSEARCH_PORT
+
+def test_elasticsearch_connection():
+    es = Elasticsearch(['http://'+ELASTICSEARCH_HOST+":"+str(ELASTICSEARCH_PORT)+'/'], verify_certs=True)
+    if not es.ping():
+        logger.warning("Elasticsearch is DOWN")
+
+def get_elasticsearch_connection(for_user=True):
+    """Return a connection to ES. Returns None on failure"""
+    logger.info("get_elasticsearch_connection")
+    es = None
+    try:
+        assert(ELASTICSEARCH_HOST)
+        assert(ELASTICSEARCH_PORT)
+        logger.info("ES HOST",ELASTICSEARCH_HOST)
+
+        es = Elasticsearch([{
+            "host": ELASTICSEARCH_HOST, "port": ELASTICSEARCH_PORT
+        }], timeout=30, retry_on_timeout=True) if (ELASTICSEARCH_HOST and ELASTICSEARCH_PORT) else None
+
+        if for_user:
+            setup_users_index(es)
+
+        es_logger = logging.getLogger("elasticsearch")
+        es_logger.setLevel(logging.INFO)
+        es_logger.addHandler(logging.NullHandler())
+    except Exception as e:
+        logger.error("Failed to get elasticsearch connection", e)
+        es = None
+
+    return es
+
+def setup_users_index(es_connection):
+    if es_connection:
+        index_settings = {
+            "properties": {
+                "email_address": {
+                    "type": "keyword"}}}
+
+        es_connection.indices.create(index='users', ignore=400)
+        es_connection.indices.put_mapping(body=index_settings, index="users", doc_type="local")
+
+def get_user_by_unique_column(es, column_name, column_value, index="users", doc_type="local"):
+    return get_item_by_unique_column(es, column_name, column_value, index=index, doc_type=doc_type)
+
+def save_user(es, user, user_id):
+    es_save_data(es, "users", "local", user, user_id)
+
+def get_item_by_unique_column(es, column_name, column_value, index, doc_type):
+    item_details = None
+    try:
+        response = es.search(
+            index = index, doc_type = doc_type, body = {
+                "query": { "match": { column_name: column_value } }
+            })
+        if len(response["hits"]["hits"]) > 0:
+            item_details = response["hits"]["hits"][0]["_source"]
+    except TransportError as te:
+        pass
+    return item_details
+
+def es_save_data(es, index, doc_type, data_item, data_id,):
+    from time import sleep
+    es.create(index, doc_type, body=data_item, id=data_id)
+    sleep(1) # Delay 1 second to allow indexing