blob: bcb1642faab27b21e8de758530c4e0d8ad2cd020 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
import sys
import unittest
import requests
import logging
from elasticsearch import Elasticsearch, TransportError
#from utility.tools import ELASTICSEARCH_HOST, ELASTICSEARCH_PORT
GN2_SERVER = None
ES_SERVER = None
class TestRegistration(unittest.TestCase):
def setUp(self):
self.url = GN2_SERVER+"/n/register"
self.es = Elasticsearch([ES_SERVER])
self.es_cleanup = []
es_logger = logging.getLogger("elasticsearch")
es_logger.addHandler(
logging.FileHandler("/tmp/es_TestRegistrationInfo.log"))
es_trace_logger = logging.getLogger("elasticsearch.trace")
es_trace_logger.addHandler(
logging.FileHandler("/tmp/es_TestRegistrationTrace.log"))
def tearDown(self):
for item in self.es_cleanup:
self.es.delete(index="users", doc_type="local", id=item["_id"])
def testRegistrationPage(self):
if self.es.ping():
data = {
"email_address": "test@user.com",
"full_name": "Test User",
"organization": "Test Organisation",
"password": "test_password",
"password_confirm": "test_password"
}
requests.post(self.url, data)
response = self.es.search(
index="users"
, doc_type="local"
, body={
"query": {"match": {"email_address": "test@user.com"}}})
self.assertEqual(len(response["hits"]["hits"]), 1)
self.es_cleanup.append(response["hits"]["hits"][0])
else:
self.skipTest("The elasticsearch server is down")
def main():
suite = unittest.TestSuite()
suite.addTest(TestRegistration("testRegistrationPage"))
runner = unittest.TextTestRunner()
runner.run(suite)
if __name__ == "__main__":
GN2_SERVER = sys.argv[1]
ES_SERVER = sys.argv[2]
main()
|