export function and code clean up

This commit is contained in:
Steve Nyemba 2023-11-30 21:38:06 -06:00
parent 3fc6778802
commit 3975643686
5 changed files with 80 additions and 60 deletions

View File

@ -19,6 +19,7 @@ Usage :
# from healthcareio import server
# from healthcareio import export
import healthcareio.x12 as x12
from healthcareio.x12 import publish, plugins
import healthcareio.params as params
# from healthcareio import server

View File

@ -1,31 +1,36 @@
""""
This module is designed to perform exports to a relational data stores
__doc__ = """"
This module is designed to perform exports to a relational data stores, We have 2 methods to infer structure possible :
1. learn structure from the data
2. use template: fast, but may not generalize
Note that the There are two possible methods to perform relational exports
"""
import transport
from transport import providers
import healthcareio.x12.plugins
# import transport
# from transport import providers
# # from healthcareio.x12 import plugins, util
# print ('hello world')
# class Mode :
# TEMPLATE,LEARN = [0,1]
# def build ():
# pass
# class Template :
# """
# This class is intended to generate a set of SQL Instructions to to create the tables
# """
# @staticmethod
# def build (**_args):
# """
# This function will build SQL statements to create a table (perhaps not needed)
# :plugins loaded plugins
# :x12 837|835 file types
# """
# _plugins=_args['plugins']
# _x12 = _args['x12']
# _template = util.template(plugins=_plugins)[_x12]
# _primaryKey = util.getPrimaryKey(plugins=_plugins,x12=_x12)
# _tables = []
# for _item in _template :
# if _primaryKey not in _item :
# _item[_primaryKey] = ''
# _tables.append(_item)
# return _tables
#
# We start by loading all the plugins
def primary_key (**_args) :
_plugins = _args['plugins']
for key in _plugins :
# _lpointers =
def init (**_args):
if 'path' in _args :
_path = _args['path']
_plugins,_parents = healthcareio.x12.plugins.instance(path=_path)
else:
_plugins,_parents = healthcareio.x12.plugins.instance()
for key in _plugins :
_lpointers = _plugins[key]
_foreign = {}
_table = {}
for _pointer in _lpointers :
_meta = _pointer.meta
if 'map' in _meta :
_attr = list(_meta['map'].values())
if 'field' in _meta :
_name = _meta['field']
_foreign[_name] = _attr

View File

@ -19,13 +19,13 @@ class BasicParser (Process) :
super().__init__()
self._plugins = _args['plugins']
self._parents = _args['parents']
self._files = _args['files']
self._files = _args['files']
self._store = _args['store']
def apply(self,**_args):
_content = _args['content']
_filetype = _args['x12']
_doc = _args['document'] #{}
_content = _args['content']
_filetype = _args['x12']
_doc = _args['document'] #{}
_documentHandler = x12.util.document.Builder(plugins = self._plugins,parents=self._parents)
try:
@ -116,30 +116,30 @@ class X12Parser(BasicParser):
_writer.close()
def instance (**_args):
"""
:path
"""
# _files = x12.util.Files.get(_args['file'])
# def instance (**_args):
# """
# :path
# """
# # _files = x12.util.Files.get(_args['file'])
# #
# # We can split these files (multi-processing)
# #
# _jobCount = 1 if 'jobs' not in _args else int (_args['jobs'])
# _files = np.array_split(_files,_jobCount)
# PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
# if 'config' in _args :
# PATH = _args['config']
# f = open(PATH)
# _config = json.loads(f.read())
# f.close()
# jobs = []
# for _batch in _files :
# pthread = Parser(files=_batch,config=_config)
# pthread.start()
# jobs.append(pthread)
# time.sleep(1)
pass
# # #
# # # We can split these files (multi-processing)
# # #
# # _jobCount = 1 if 'jobs' not in _args else int (_args['jobs'])
# # _files = np.array_split(_files,_jobCount)
# # PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
# # if 'config' in _args :
# # PATH = _args['config']
# # f = open(PATH)
# # _config = json.loads(f.read())
# # f.close()
# # jobs = []
# # for _batch in _files :
# # pthread = Parser(files=_batch,config=_config)
# # pthread.start()
# # jobs.append(pthread)
# # time.sleep(1)
# pass
# class parser (Process) :

View File

@ -92,7 +92,7 @@ def DMG (**_args):
# return {'patient':_info}
pass
@parser(element='DTP', x12='837', field='date',map={3:['to','from']})
@parser(element='DTP', x12='837', field='date',map={3:['end_date','start_date']})
def DTP (**_args):
"""
Expected Element DTP
@ -131,11 +131,7 @@ def CLM (**_args):
pass
# @parser(element='REF', field='ref',map={2:'id'})
def REF (**_args):
# print (_args)
_columns = ['identifier','qualifier','']
# _CODE_INDEX = 1 # -- according to x12 standard
# _map = {_CODE_INDEX:{'EA':'patient','EI':'provider','6R':'','D9':''}}
# return self.parse(_columns,[2],**_args)
pass
@parser(element='HI',x12='837',container='diagnosis',map={1:'code',2:'type'})
def HI (**_args):

View File

@ -102,6 +102,24 @@ def template(**_args) :
else:
_object[_x12] = merge(_object[_x12],_info)
return _object
def getPrimaryKey(**_args):
"""
:plugins plugins that are loaded
:x12 837|835
"""
_plugins=_args['plugins']
_pointer = x12.plugins.filter(elements=['CLM'],plugins=_plugins)
_keys = {}
for _element in ['CLM','CLP'] :
_pointer = x12.plugins.filter(elements=[_element],plugins=_plugins)
if not _pointer :
continue
_pointer = list(_pointer.values())[0]
_meta = _pointer[_element].meta
_name = _meta['map'][1] if 'map' in _meta else _meta['columns'][0]
_id = '837' if _element == 'CLM' else '835'
_keys[_id] = _name
return _keys[_args['x12']]
# def template(**_args) :
# """
# This function generates an object template to be used in object assignment and export functionalities