parser/healthcareio/x12/publish.py

160 lines
5.0 KiB
Python

import copy
from . import util
import transport
import numpy as np
import time
import pandas as pd
from multiprocessing import Process
def build (**_args):
"""
This function will build SQL statements to create a table (perhaps not needed)
:plugins loaded plugins
:x12 837|835 file types
"""
_plugins=_args['plugins']
_x12 = _args['x12']
_template = util.template(plugins=_plugins)[_x12]
_primaryKey = util.getPrimaryKey(plugins=_plugins,x12=_x12)
_tables = []
_main = {}
for _name in _template :
_item = _template[_name] #copy.deepcopy(_template[_name])
if _primaryKey not in _item and type(_item) == dict:
_item[_primaryKey] = ''
_tables.append({_name:_item})
else:
_main[_name] = ''
_name = getContext(_x12)
_tables += [{_name:_main}]
_template[_name] = _main
return _template #_tables
def getContext(_x12) :
return 'claims' if _x12 == '837' else 'remits'
def format(**_args) :
"""
:rows rows for the
:primary_key primary_key field name
:x12 file format
"""
# _name = _args['table']
_rows = _args['rows']
_primary_key = _args['primary_key']
_x12 = _args['x12']
_mainTableName = getContext(_x12)
_tables = {_mainTableName:[]}
for _claim in _rows :
# #
# # Turn the claim into a relational model ...
# #
_main = {}
_pkvalue = None
if _primary_key in _claim :
_pkvalue = _claim[_primary_key]
for _attrName in _claim :
_item = _claim[_attrName]
_item = update(_item,_primary_key,_pkvalue)
if _attrName not in _tables and type(_item) in [dict,list]:
_tables[_attrName] = []
if type(_item) in [dict,list] :
_tables[_attrName] += _item if type(_item) == list else [_item]
else:
#
# This section suggests we found a main table attribute
_main[_attrName] = _item
_tables[_mainTableName].append(_main)
return _tables
def update (_item,key,value):
if type(_item) not in [dict,list] :
return _item
if type(_item) == dict :
_item[key] = value
else:
#
# List, we will go through every item and update accordingly
_index = 0
for _row in _item :
if type(_row) == dict :
_row['_index'] = _index
_row[key] = value
return _item
def init(**_args):
"""
This function will kick off the export process provided claims/remits and the loaded plugins (not sure why)
It requires the data it is pulling to be consistently formatted (otherwise nothing can be done)
:plugins
:store data store information i.e {source,target} specifications for data-transport
:x12 file type i.e 837|835
"""
_file_type = _args['x12']
_plugins = _args['plugins']
_store = _args['store']
_default = build(plugins=_plugins,x12=_file_type)
_df = read(store = _store['source'],x12=_file_type)
_pkey = util.getPrimaryKey(plugins = _plugins, x12=_file_type)
SEGMENTS = 4 # arbitrary choice
_indexes = np.array_split(np.arange(_df.shape[0]),SEGMENTS)
jobs = []
for _ii in _indexes :
try:
_data = format(rows= _df.iloc[_ii].to_dict(orient='records'),x12=_file_type,primary_key=_pkey)
_thread = Process(target=post,args=({'store':_store['target'],'data':_data,'default':_default,'x12':_file_type},))
jobs.append(_thread)
except Exception as e:
#
# Log: sigment,
pass
if jobs :
jobs[0].start()
jobs[0].join()
while jobs :
jobs = [thread for thread in jobs if thread.is_alive()]
time.sleep(1)
def read (**_args):
_store = copy.copy(_args['store'])
_x12 = _args['x12']
_store['table'] = getContext(_x12) #'claims' if _x12 == '837' else 'remits'
reader = transport.factory.instance(**_store)
#
# @TODO: reading should support streaming (for scalability)
_df = reader.read()
return _df
def post(_args):
_data = _args['data']
_store = _args['store']
_default = _args['default']
_prefix = 'clm_' if _args['x12'] == '837' else 'rem_'
for _name in _data :
_tablename = _prefix+_name
_store['table'] = _tablename if _name not in ['remits','claims'] else _name
_store['context']='write'
writer = transport.factory.instance(**_store)
if len(_data[_name]) == 0 and _name in _default and not writer.has(table=_tablename):
_rows = [_default[_name]]
else:
_rows = _data[_name]
writer.write(pd.DataFrame(_rows).fillna(''))
if hasattr(writer,'close') :
writer.close()
# _xwriter = trasnport.factory.instance(**_store)
# _xwriter.write(_df)
# _info = format()
pass