#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Provide elastic search helpers."""
import hashlib
import json
import logging
import time

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch.exceptions import AuthorizationException, \
    AuthenticationException, ConflictError, ConnectionError, \
    ConnectionTimeout, ElasticsearchException

from common.env import check_environment as ce


class ElasticSearch():
    """Provide methods for uploading documents to Elasticsearch.

    Mixin: expects the host class to supply ``self.logger`` (a
    logging.Logger) and ``self.connection_info`` (a dict with keys
    ``dbHost``, ``dbUser``, ``dbPassword`` and ``dbPort``).
    """

    SCROLL_SIZE = int(ce('ES_SCROLL_SIZE', 100))
    ES_SCHEME = ce("ES_SCHEME", 'http')
    INDEX = ce('ES_INDEX', '')

    def open(self, index=INDEX, scroll_size=SCROLL_SIZE,
             scheme=ES_SCHEME, **kwargs):
        """Open an ElasticSearch Database Connection.

        :param index: the index to connect to
        :param scroll_size: the default scroll size (stored for later use)
        :param scheme: transport scheme, e.g. 'http' or 'https'
        :return boolean: True or False depending upon successful connection
        """
        self.index = index
        self.scroll_size = scroll_size
        self.current_milli_time = lambda: int(round(time.time() * 1000))
        self.logger.debug('Opening ElasticSearch Connection.')
        try:
            # NOTE(review): the client is kwarg-sensitive across
            # elasticsearch-py versions; `host=` with a list is assumed to
            # match the pinned client version -- confirm against it.
            self.connection = Elasticsearch(
                host=self.connection_info['dbHost'].split(','),
                http_auth=(
                    self.connection_info['dbUser'],
                    self.connection_info['dbPassword']),
                scheme=scheme,
                port=self.connection_info['dbPort'],
                **kwargs)
            self.response = None
            self.doc_list = []
            return True
        except (AuthorizationException, AuthenticationException,
                ConnectionError, ConnectionTimeout) as error:
            self.logger.critical(f'Unable to connect to cluster: {error}')
            return False
        except ElasticsearchException as error:
            self.logger.critical(f'Unable to connect to cluster: {error}')
            return False
        finally:
            # BUG FIX: the original had `return False` here. A `return`
            # inside `finally` overrides the try/except returns, so open()
            # could NEVER return True. Also avoid logging the full
            # connection_info dict, which contains the password.
            self.logger.info(
                'Attempted connection with host '
                f"{self.connection_info.get('dbHost')}")

    def create_index(self, index_name):
        """Create ``index_name`` on the connected cluster."""
        self.connection.indices.create(index=index_name)

    def create_doc_list(self, dct):
        """Create the doc_list to be uploaded.

        Stamps each document with creation time and default flags, then
        appends it to ``self.doc_list``. Mutates the documents in place.
        """
        for key in dct:
            # Documents default to upsert (merge) rather than overwrite.
            dct[key].setdefault('overwrite', False)
            dct[key]['pscreated'] = self.current_milli_time()
            dct[key]['psupsert'] = False
            self.doc_list.append(dct[key])

    def upsert(self, dct, index=None):
        """Perform an upsert on the es client.

        :param dct: mapping of doc-id -> document dict; every document
            must contain a 'url' key ('overwrite' is defaulted to False)
        :param index: optional per-call index override
        :return: JSON string of the bulk-index response
        :raises ValueError: when a document is missing required keys
        """
        if index is None:
            index = self.index
        self.create_doc_list(dct)
        # BUG FIX: was `assert self.__validate(dct)` -- asserts are
        # stripped under `python -O`, silently skipping validation.
        if not self.__validate(dct):
            raise ValueError(
                'Validation failure: every document must contain '
                "'url' and 'overwrite' keys")
        if index != self.INDEX:
            self.index = index
        upsert_dicts = {}
        overwrite_dicts = {}
        for doc_id, doc in dct.items():
            if doc['overwrite'] is False:
                doc['psupsert'] = True
                upsert_dicts[doc_id] = doc
            else:
                overwrite_dicts[doc_id] = doc
        upsert_dicts = self.__prepare_upsert_docs(upsert_dicts)
        resp = self.__index_data({**overwrite_dicts, **upsert_dicts})
        return json.dumps(resp, indent=4)

    def __prepare_upsert_docs(self, upsert_docs):
        """Merge each new document with any stored doc sharing its url hash."""
        index_this = {}
        for doc_id, new_doc in upsert_docs.items():
            old_doc = self.__get_doc(self.get_md5(new_doc['url']))
            if old_doc:  # non-empty dict -> overlay new fields on old
                index_this[doc_id] = self.__merge_dict(old_doc, new_doc)
            else:
                index_this[doc_id] = new_doc
        return index_this

    def __index_data(self, data):
        """Bulk-index ``data`` ({doc_id: source}) into ``self.index``.

        :return: the helpers.bulk response tuple, or None on failure
        """
        resp = None
        bulk_data = [
            {
                '_op_type': 'index',
                '_index': self.index,
                '_id': doc_id,
                '_source': source,
            }
            for doc_id, source in data.items()
        ]
        try:
            # BUG FIX: was `client=self.client`; the client is stored as
            # `self.connection` in open(), so this always raised
            # AttributeError before any data was sent.
            resp = helpers.bulk(client=self.connection, actions=bulk_data)
        except Exception as e:
            # Deliberate best-effort: log and return None rather than crash.
            self.logger.error(f'ElasticSearch failed to upload: {e}')
        return resp

    def __get_doc(self, doc_id):
        """Fetch a single document's _source by _id.

        :return: the stored ``_source`` dict, or {} when absent or on error
        """
        result = {}
        query = {
            'query': {
                'term': {
                    '_id': doc_id
                }
            }
        }
        try:
            # BUG FIX: the search() call was OUTSIDE the try, so a failing
            # lookup propagated instead of falling back to overwrite as the
            # warning below promises. Also dropped `finally: return`, which
            # swallowed exceptions raised in the except handler.
            response = self.connection.search(index=self.index, body=query)
            hits = response['hits']['hits']
            if hits:
                result = hits[0]['_source']
        except Exception as e:
            self.logger.warning(
                f'There is an issue pulling doc id: {doc_id} with error: ' +
                f'{e}. Defaulting to overwriting doc')
        return result

    @staticmethod
    def __merge_dict(old_dict, new_dict):
        """Overlay new_dict on old_dict, flagging the result as an upsert."""
        new_dict['psupsert'] = True
        return {**old_dict, **new_dict}

    @staticmethod
    def get_md5(url):
        """Return the hex md5 of ``url`` (used as the document _id)."""
        return hashlib.md5(url.encode('utf-8')).hexdigest()

    @staticmethod
    def __validate(dct):
        """Return True when every document carries 'url' and 'overwrite'."""
        for doc in dct.values():
            if not all(key in doc for key in ('url', 'overwrite')):
                # BUG FIX: `logging` was referenced without being imported
                # (NameError on the failure path); now imported at top.
                logging.getLogger(__name__).error(
                    'Validation failure: ' +
                    'Required dictionary key is missing, dict must contain url, overwrite and psupsert')
                return False
        return True
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Provide elastic search helpers."""
import hashlib
import json
import logging
import time

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch.exceptions import AuthorizationException, \
    AuthenticationException, ConflictError, ConnectionError, \
    ConnectionTimeout, ElasticsearchException

from common.env import check_environment as ce


class ElasticSearch():
    """Provide methods for uploading documents to Elasticsearch.

    Mixin: expects the host class to supply ``self.logger`` (a
    logging.Logger) and ``self.connection_info`` (a dict with keys
    ``dbHost``, ``dbUser``, ``dbPassword`` and ``dbPort``).
    """

    SCROLL_SIZE = int(ce('ES_SCROLL_SIZE', 100))
    ES_SCHEME = ce("ES_SCHEME", 'http')
    INDEX = ce('ES_INDEX', '')

    def open(self, index=INDEX, scroll_size=SCROLL_SIZE,
             scheme=ES_SCHEME, **kwargs):
        """Open an ElasticSearch Database Connection.

        :param index: the index to connect to
        :param scroll_size: the default scroll size (stored for later use)
        :param scheme: transport scheme, e.g. 'http' or 'https'
        :return boolean: True or False depending upon successful connection
        """
        self.index = index
        self.scroll_size = scroll_size
        self.current_milli_time = lambda: int(round(time.time() * 1000))
        self.logger.debug('Opening ElasticSearch Connection.')
        try:
            # NOTE(review): the client is kwarg-sensitive across
            # elasticsearch-py versions; `host=` with a list is assumed to
            # match the pinned client version -- confirm against it.
            self.connection = Elasticsearch(
                host=self.connection_info['dbHost'].split(','),
                http_auth=(
                    self.connection_info['dbUser'],
                    self.connection_info['dbPassword']),
                scheme=scheme,
                port=self.connection_info['dbPort'],
                **kwargs)
            self.response = None
            self.doc_list = []
            return True
        except (AuthorizationException, AuthenticationException,
                ConnectionError, ConnectionTimeout) as error:
            self.logger.critical(f'Unable to connect to cluster: {error}')
            return False
        except ElasticsearchException as error:
            self.logger.critical(f'Unable to connect to cluster: {error}')
            return False
        finally:
            # BUG FIX: the original had `return False` here. A `return`
            # inside `finally` overrides the try/except returns, so open()
            # could NEVER return True. Also avoid logging the full
            # connection_info dict, which contains the password.
            self.logger.info(
                'Attempted connection with host '
                f"{self.connection_info.get('dbHost')}")

    def create_index(self, index_name):
        """Create ``index_name`` on the connected cluster."""
        self.connection.indices.create(index=index_name)

    def create_doc_list(self, dct):
        """Create the doc_list to be uploaded.

        Stamps each document with creation time and default flags, then
        appends it to ``self.doc_list``. Mutates the documents in place.
        """
        for key in dct:
            # Documents default to upsert (merge) rather than overwrite.
            dct[key].setdefault('overwrite', False)
            dct[key]['pscreated'] = self.current_milli_time()
            dct[key]['psupsert'] = False
            self.doc_list.append(dct[key])

    def upsert(self, dct, index=None):
        """Perform an upsert on the es client.

        :param dct: mapping of doc-id -> document dict; every document
            must contain a 'url' key ('overwrite' is defaulted to False)
        :param index: optional per-call index override
        :return: JSON string of the bulk-index response
        :raises ValueError: when a document is missing required keys
        """
        if index is None:
            index = self.index
        self.create_doc_list(dct)
        # BUG FIX: was `assert self.__validate(dct)` -- asserts are
        # stripped under `python -O`, silently skipping validation.
        if not self.__validate(dct):
            raise ValueError(
                'Validation failure: every document must contain '
                "'url' and 'overwrite' keys")
        if index != self.INDEX:
            self.index = index
        upsert_dicts = {}
        overwrite_dicts = {}
        for doc_id, doc in dct.items():
            if doc['overwrite'] is False:
                doc['psupsert'] = True
                upsert_dicts[doc_id] = doc
            else:
                overwrite_dicts[doc_id] = doc
        upsert_dicts = self.__prepare_upsert_docs(upsert_dicts)
        resp = self.__index_data({**overwrite_dicts, **upsert_dicts})
        return json.dumps(resp, indent=4)

    def __prepare_upsert_docs(self, upsert_docs):
        """Merge each new document with any stored doc sharing its url hash."""
        index_this = {}
        for doc_id, new_doc in upsert_docs.items():
            old_doc = self.__get_doc(self.get_md5(new_doc['url']))
            if old_doc:  # non-empty dict -> overlay new fields on old
                index_this[doc_id] = self.__merge_dict(old_doc, new_doc)
            else:
                index_this[doc_id] = new_doc
        return index_this

    def __index_data(self, data):
        """Bulk-index ``data`` ({doc_id: source}) into ``self.index``.

        :return: the helpers.bulk response tuple, or None on failure
        """
        resp = None
        bulk_data = [
            {
                '_op_type': 'index',
                '_index': self.index,
                '_id': doc_id,
                '_source': source,
            }
            for doc_id, source in data.items()
        ]
        try:
            # BUG FIX: was `client=self.client`; the client is stored as
            # `self.connection` in open(), so this always raised
            # AttributeError before any data was sent.
            resp = helpers.bulk(client=self.connection, actions=bulk_data)
        except Exception as e:
            # Deliberate best-effort: log and return None rather than crash.
            self.logger.error(f'ElasticSearch failed to upload: {e}')
        return resp

    def __get_doc(self, doc_id):
        """Fetch a single document's _source by _id.

        :return: the stored ``_source`` dict, or {} when absent or on error
        """
        result = {}
        query = {
            'query': {
                'term': {
                    '_id': doc_id
                }
            }
        }
        try:
            # BUG FIX: the search() call was OUTSIDE the try, so a failing
            # lookup propagated instead of falling back to overwrite as the
            # warning below promises. Also dropped `finally: return`, which
            # swallowed exceptions raised in the except handler.
            response = self.connection.search(index=self.index, body=query)
            hits = response['hits']['hits']
            if hits:
                result = hits[0]['_source']
        except Exception as e:
            self.logger.warning(
                f'There is an issue pulling doc id: {doc_id} with error: ' +
                f'{e}. Defaulting to overwriting doc')
        return result

    @staticmethod
    def __merge_dict(old_dict, new_dict):
        """Overlay new_dict on old_dict, flagging the result as an upsert."""
        new_dict['psupsert'] = True
        return {**old_dict, **new_dict}

    @staticmethod
    def get_md5(url):
        """Return the hex md5 of ``url`` (used as the document _id)."""
        return hashlib.md5(url.encode('utf-8')).hexdigest()

    @staticmethod
    def __validate(dct):
        """Return True when every document carries 'url' and 'overwrite'."""
        for doc in dct.values():
            if not all(key in doc for key in ('url', 'overwrite')):
                # BUG FIX: `logging` was referenced without being imported
                # (NameError on the failure path); now imported at top.
                logging.getLogger(__name__).error(
                    'Validation failure: ' +
                    'Required dictionary key is missing, dict must contain url, overwrite and psupsert')
                return False
        return True