From 4a2c039c380db18ed845be69358d423c0458666e Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Tue, 6 Feb 2024 14:58:31 -0600
Subject: [PATCH] bug fix: exports

---
 healthcareio/__main__.py    |  7 ++++++-
 healthcareio/x12/publish.py | 24 ++++++++++++++++++++----
 2 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/healthcareio/__main__.py b/healthcareio/__main__.py
index 38e1235..d93c318 100644
--- a/healthcareio/__main__.py
+++ b/healthcareio/__main__.py
@@ -143,6 +143,8 @@ def parse (claim_folder:str,plugin_folder:str = None,config_path:str = None):
     #     pass
     # else:
     #     pass
+    print ()
+    print (" PARSED ")
     print ("...................... FINISHED .........................")
     #
     #
@@ -188,7 +190,6 @@ def publish (file_type:str,path:str):
         _type = 'remits'
         _x12 = '835'
     if _type :
-        print ([f"Exporting {_type}"])
         _store = {'source':os.sep.join([CONFIG_FOLDER,'config.json']),'target':path}
         for _key in _store :
             f = open(_store[_key])
@@ -200,6 +201,10 @@
             x12.publish.init(plugins=_plugins,x12=_x12,store=_store)
     else:
         print ("Can not determine type, (837 or 835)")
+
+    print ()
+    print (" EXPORT ")
+    print ("...................... FINISHED .........................")
 
 
 if __name__ == '__main__' :
diff --git a/healthcareio/x12/publish.py b/healthcareio/x12/publish.py
index 3f6e289..5f0265e 100644
--- a/healthcareio/x12/publish.py
+++ b/healthcareio/x12/publish.py
@@ -6,7 +6,7 @@ import time
 import pandas as pd
 from multiprocessing import Process
 import json
-
+from healthcareio.logger import X12Logger
 def build (**_args):
     """
     This function will build SQL statements to create a table (perhaps not needed)
@@ -111,25 +111,40 @@ def init(**_args):
     _plugins = _args['plugins']
     _store = _args['store']
     _default = build(plugins=_plugins,x12=_file_type)
-
+    _logger = X12Logger(store = _store['source'])
+
     _df = read(store = _store['source'],x12=_file_type)
+    #
+    # @LOG :
+    if _logger :
+        _logger.log(module='init',action='export-init',data={'rows':_df.shape[0],'attributes':list(_df.columns)})
+
     _pkey = util.getPrimaryKey(plugins = _plugins, x12=_file_type)
     SEGMENTS = 4 # arbitrary choice
     _indexes = np.array_split(np.arange(_df.shape[0]),SEGMENTS)
     jobs = []
+    _tables = {}
     for _ii in _indexes :
         try:
             _data = format(rows= _df.iloc[_ii].to_dict(orient='records'),x12=_file_type,primary_key=_pkey)
             _thread = Process(target=post,args=({'store':_store['target'],'data':_data,'default':_default,'x12':_file_type},))
+            _thread.start()
             jobs.append(_thread)
+            _tables = list(_data.keys())
         except Exception as e:
             #
             # Log: sigment,
             print (e)
             pass
+
+    #
+    # @LOG :
+    if _logger :
+        _logger.log(module='init',action='export-wait',data={'jobs':len(jobs),'tables':_tables})
+
     if jobs :
-        jobs[0].start()
-        jobs[0].join()
+        # jobs[0].start()
+        # jobs[0].join()
         while jobs :
             jobs = [thread for thread in jobs if thread.is_alive()]
             time.sleep(1)
@@ -160,6 +175,7 @@ def post(_args):
         _tablename = _prefix+_name
         _store['table'] = _tablename if _name not in ['remits','claims'] else _name
         _store['context']='write'
+        _store['lock'] = True
         writer = transport.factory.instance(**_store)
         if len(_data[_name]) == 0 and _name in _default and not writer.has(table=_tablename):
             _rows = [_default[_name]]