03.08 Bulk-writing data with a Python singleton


Elasticsearch - Bulk-writing data with a Python singleton

1. Bulk writes

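  • With a large amount of data, inserting one document at a time is far too slow. Instead, import `helpers` from the elasticsearch package and call `helpers.bulk` to write the data in batches. First define every document as a dictionary (a bulk "action") with the following fields: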
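<code>from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(**es_settings)  # es_settings: your connection parameters (hosts, timeout, ...)

# List of bulk actions, one dict per document
actions = [
    {
        '_index': index,        # target index
        '_type': doc_type,      # mapping type; deprecated in Elasticsearch 7.0, removed in 8.0
        '_source': item_dict0,  # the document itself: a dict of field names and values
    },
    {'_index': index, '_type': doc_type, '_source': item_dict1},
    {'_index': index, '_type': doc_type, '_source': item_dict2},
    # ...
]

# Write everything with helpers.bulk.
# stats_only=True returns only (success_count, error_count);
# raise_on_error/raise_on_exception=False report failures instead of raising.
success, failed = helpers.bulk(es, actions, stats_only=True,
                               raise_on_error=False, raise_on_exception=False)</code>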
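Writing the action list out by hand does not scale. Because `helpers.bulk` accepts any iterable of actions, including a generator, and itself splits it into requests of `chunk_size` actions (500 by default), the actions can be generated from the source data instead. The following is a minimal sketch; `build_actions` is a hypothetical helper, and it adds `_type` only when a `doc_type` is passed, since mapping types were deprecated in Elasticsearch 7.0 and removed in 8.0.

```python
from typing import Any, Dict, Iterable, Iterator, Optional


def build_actions(index: str, docs: Iterable[Dict[str, Any]],
                  doc_type: Optional[str] = None) -> Iterator[Dict[str, Any]]:
    """Hypothetical helper: yield one bulk action per document."""
    for doc in docs:
        action: Dict[str, Any] = {'_index': index, '_source': doc}
        if doc_type is not None:
            # Only for clusters that still use mapping types (removed in 8.0)
            action['_type'] = doc_type
        yield action


if __name__ == '__main__':
    docs = [{'name': 'zhang', 'age': 123}, {'name': 'li', 'age': 45}]
    actions = list(build_actions('people', docs))
    print(actions[0])  # {'_index': 'people', '_source': {'name': 'zhang', 'age': 123}}
    # Against a real cluster, the generator can be passed directly:
    # helpers.bulk(es, build_actions('people', docs), stats_only=True)
```

Passing the generator rather than a list means the whole data set never has to sit in memory as action dicts at once.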

2. Writing through a singleton

<code>import logging

from elasticsearch import Elasticsearch, helpers


class SingleES(object):
    _instance = None

    def __init__(self):
        self.client = Elasticsearch(hosts=['10.10.10.165:9200'], timeout=60)
        self.actions = []          # buffered bulk actions
        self.actions_length = 50   # send a bulk request once this many are buffered

    @staticmethod
    def instance():
        # No instance has been created yet
        if SingleES._instance is None:
            # Create the instance and store it in the _instance attribute
            SingleES._instance = SingleES()
        return SingleES._instance

    def storage(self, item_dict):
        action = {
            '_index': 'index',      # placeholder: your index name
            '_type': 'doc_type',    # placeholder: mapping type (removed in 8.0)
            '_source': item_dict,
        }
        self.actions.append(action)
        if len(self.actions) >= self.actions_length:
            try:
                helpers.bulk(self.client, self.actions, stats_only=True,
                             raise_on_error=False, raise_on_exception=False)
            except Exception:
                logging.error("ElasticSearch bulk data fail!")
            finally:
                # Clear the buffer whether or not the request succeeded
                self.actions = []</code>
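The class above only sends a batch once 50 actions have accumulated, so when the total number of documents is not a multiple of 50, the last few are never written. Below is a minimal sketch of the same buffering logic with an explicit `flush()` for the remainder. To keep it runnable without a cluster, a hypothetical `send` callable stands in for `helpers.bulk`; `BufferedWriter` and `flush` are illustrative names, not part of elasticsearch-py, and the actions omit `_type`.

```python
from typing import Any, Callable, Dict, List, Optional

Action = Dict[str, Any]


class BufferedWriter:
    """Singleton buffer that hands full batches to `send` (stand-in for helpers.bulk)."""

    _instance: Optional["BufferedWriter"] = None

    def __init__(self, send: Callable[[List[Action]], None], batch_size: int = 50):
        self.send = send              # hypothetical: receives one list of actions
        self.batch_size = batch_size
        self.actions: List[Action] = []

    @classmethod
    def instance(cls, send: Callable[[List[Action]], None],
                 batch_size: int = 50) -> "BufferedWriter":
        # Arguments only matter on the first call; later calls return the same object
        if cls._instance is None:
            cls._instance = cls(send, batch_size)
        return cls._instance

    def storage(self, item_dict: Dict[str, Any]) -> None:
        self.actions.append({"_index": "index", "_source": item_dict})
        if len(self.actions) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        # Send whatever is buffered, including a final partial batch
        if not self.actions:
            return
        batch, self.actions = self.actions, []
        self.send(batch)


if __name__ == "__main__":
    batches: List[List[Action]] = []
    writer = BufferedWriter.instance(batches.append, batch_size=50)
    for i in range(120):
        writer.storage({"name": "zhang", "age": i})
    print([len(b) for b in batches])  # [50, 50]
    writer.flush()
    print([len(b) for b in batches])  # [50, 50, 20]
```

Swapping the buffer out before calling `send` mirrors the original `finally: self.actions = []`: a failed batch is dropped, not retried.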

3. Multi-threaded writes with a thread lock

<code>import logging
import threading

from elasticsearch import Elasticsearch, helpers


class SingleES(object):
    _instance = None
    _instance_lock = threading.Lock()  # thread lock

    def __init__(self):
        self.client = Elasticsearch(hosts=['10.10.10.165:9200'], timeout=60)
        self.actions = []
        self.actions_length = 50

    @staticmethod
    def instance():
        # Double-checked locking: only one thread can create the instance
        if SingleES._instance is None:
            with SingleES._instance_lock:
                if SingleES._instance is None:
                    SingleES._instance = SingleES()
        return SingleES._instance

    def storage(self, item_dict):
        action = {
            '_index': 'index',
            '_type': 'doc_type',
            '_source': item_dict,
        }
        # Append and send under the same lock, so no action can be appended
        # to a batch that is being sent and then lost when the buffer is reset
        with SingleES._instance_lock:
            self.actions.append(action)
            if len(self.actions) >= self.actions_length:
                try:
                    helpers.bulk(self.client, self.actions, stats_only=True,
                                 raise_on_error=False, raise_on_exception=False)
                except Exception:
                    logging.error("ElasticSearch bulk data fail!")
                finally:
                    self.actions = []


if __name__ == '__main__':
    es_db = SingleES.instance()
    for i in range(10000):
        es_db.storage({'name': 'zhang', 'age': 123})</code>
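The locked version holds the lock during the whole bulk request, so every other writer thread waits on the network call. One common alternative, sketched here under the same assumptions as before (a hypothetical `send` standing in for `helpers.bulk`, an illustrative class name `ThreadSafeWriter`), is to swap the full buffer out while holding the lock and send it after releasing it. It also uses a separate lock for the buffer, double-checked locking in `instance()`, and a `flush()` for the final partial batch; the `__main__` block drives it from four threads.

```python
import threading
from typing import Any, Callable, Dict, List, Optional

Action = Dict[str, Any]


class ThreadSafeWriter:
    """Thread-safe singleton buffer; `send` is a stand-in for helpers.bulk."""

    _instance: Optional["ThreadSafeWriter"] = None
    _instance_lock = threading.Lock()  # guards creation of the singleton

    def __init__(self, send: Callable[[List[Action]], None], batch_size: int = 50):
        self.send = send
        self.batch_size = batch_size
        self.actions: List[Action] = []
        self._buffer_lock = threading.Lock()  # guards self.actions

    @classmethod
    def instance(cls, send: Callable[[List[Action]], None],
                 batch_size: int = 50) -> "ThreadSafeWriter":
        # Double-checked locking: the lock is only taken while no instance exists
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    cls._instance = cls(send, batch_size)
        return cls._instance

    def storage(self, item_dict: Dict[str, Any]) -> None:
        batch: List[Action] = []
        with self._buffer_lock:
            self.actions.append({"_index": "index", "_source": item_dict})
            if len(self.actions) >= self.batch_size:
                # Swap out the full buffer while holding the lock...
                batch, self.actions = self.actions, []
        if batch:
            # ...then send it without the lock, so other threads keep buffering
            self.send(batch)

    def flush(self) -> None:
        # Send the final partial batch, if any
        with self._buffer_lock:
            batch, self.actions = self.actions, []
        if batch:
            self.send(batch)


if __name__ == "__main__":
    sent: List[List[Action]] = []
    sent_lock = threading.Lock()

    def fake_bulk(batch: List[Action]) -> None:
        # Records each batch instead of calling Elasticsearch
        with sent_lock:
            sent.append(batch)

    writer = ThreadSafeWriter.instance(fake_bulk, batch_size=50)

    def worker(start: int) -> None:
        for i in range(start, start + 2500):
            writer.storage({"name": "zhang", "age": i})

    threads = [threading.Thread(target=worker, args=(n * 2500,)) for n in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    writer.flush()
    print(sum(len(b) for b in sent))  # 10000
```

Because the append and the swap happen in the same critical section, every batch except the one sent by `flush()` holds exactly `batch_size` actions. The trade-off is that batches may reach `send` in a different order from the one in which they were filled.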

