157 lines
5.0 KiB
Python
157 lines
5.0 KiB
Python
"""
|
|
This module contains the code that loads plugins. A plugin is a function written outside of the scope of the parser that the parser can load and call.
|
|
This enables better parsing and the creation of a pipeline that can apply pre/post conditions, given that {x12} is organized in loops.
|
|
|
|
Contract:
|
|
Functions will take in an object (the row of the claim they are intended to parse), and prior rows (parsed)
|
|
"""
|
|
import os
|
|
import importlib as IL
|
|
import sys
|
|
|
|
# Dotted name of the module searched (through sys.modules) by has/get when
# no plugin file exists at the requested path.
DEFAULT_PLUGINS = 'healthcareio.x12.plugins.default'
|
|
class MODE:
    """
    Validation levels that can be requested when plugins are loaded.

    The values are plain integers so they can be given directly in a
    configuration (see the `mode` argument of load/inspect).
    """
    TRUST = 0            # no validation function is selected for this mode
    CHECK = 1            # apply `has` only
    TEST = 2             # apply `test` only
    TEST_AND_CHECK = 3   # apply `has`, then `test`
|
|
def instance(**_args):
    """
    Placeholder: accepts any keyword arguments, performs no work and returns None.
    """
    return None
|
|
def has(**_args):
    """
    This function will inspect if a function is valid as a plugin or not,
    i.e. whether it can be found in the selected module.

    name : function name for a given file (required)
    path : python file to examine; when it is missing, empty, None or does
           not exist on disk, the default plugins module (DEFAULT_PLUGINS)
           is examined instead

    Returns True when the module exposes an attribute called `name`, False
    otherwise (including when the file exists but cannot be loaded as a
    python module).

    Raises KeyError when `name` is not provided, or when the default plugins
    module is needed but is not present in sys.modules.
    """
    # `import importlib` alone does not guarantee the `util` submodule is bound
    from importlib import util as _il_util

    # an explicit path=None is treated like a missing path
    _pyfile = _args.get('path') or ''
    _name = _args['name']

    # None (rather than {}) so that names such as 'get' or 'keys' are not
    # reported as found on an empty placeholder
    _module = None
    if os.path.exists(_pyfile):
        _info = _il_util.spec_from_file_location(_name, _pyfile)
        if _info and _info.loader:
            _module = _il_util.module_from_spec(_info)
            # NOTE: this executes the top-level code of the plugin file
            _info.loader.exec_module(_module)
    else:
        _module = sys.modules[DEFAULT_PLUGINS]
    return _module is not None and hasattr(_module, _name)
|
|
def get(**_args):
    """
    This function will return the plugin function found in the selected module.

    name : function name for a given file (required)
    path : python file to examine; when it is missing, empty, None or does
           not exist on disk, the default plugins module (DEFAULT_PLUGINS)
           is examined instead

    Returns the attribute called `name` of the module, or None when the
    module has no such attribute (including when the file exists but cannot
    be loaded as a python module).

    Raises KeyError when `name` is not provided, or when the default plugins
    module is needed but is not present in sys.modules.
    """
    # `import importlib` alone does not guarantee the `util` submodule is bound
    from importlib import util as _il_util

    # an explicit path=None is treated like a missing path
    _pyfile = _args.get('path') or ''
    _name = _args['name']

    # None (rather than {}) so that names such as 'get' or 'keys' do not
    # resolve to the methods of an empty placeholder dict
    _module = None
    if os.path.exists(_pyfile):
        _info = _il_util.spec_from_file_location(_name, _pyfile)
        if _info and _info.loader:
            _module = _il_util.module_from_spec(_info)
            # NOTE: this executes the top-level code of the plugin file
            _info.loader.exec_module(_module)
    else:
        _module = sys.modules[DEFAULT_PLUGINS]
    if _module is None:
        return None
    return getattr(_module, _name, None)
|
|
|
|
|
|
def test(**_args):
    """
    This function will test a plugin to ensure the plugin conforms to the norm we are setting here

    :pointer    function to call (takes precedence over name/path)
    :name       name of the function to look up with get() when no pointer is given
    :path       optional python file that holds the function

    The plugin is called without arguments; the expected result is a list of
    field names [field_o,field_i].

    Returns the non-empty strings found in that result. Any failure (missing
    arguments, plugin not found, plugin raising, result not iterable) yields
    an empty list: this is a deliberate best-effort check and never raises.
    """
    try:
        if 'pointer' in _args:
            _caller = _args['pointer']
        else:
            _name = _args['name']
            # get() probes the path with os.path.exists, which rejects None,
            # so a missing path is passed as an empty string
            _path = _args.get('path') or ''
            _caller = get(name=_name, path=_path)
        if not callable(_caller):
            # plugin not found (or pointer is not a function)
            return []
        _params = _caller()
        #
        # the expected result is a list of field names [field_o,field_i]
        #
        return [_item for _item in _params if isinstance(_item, str) and _item]
    except Exception:
        # plugins are external code: whatever goes wrong means "does not conform"
        return []
|
|
def inspect(**_args):
    """
    This function will validate a single plugin according to the requested mode

    :mode   one of the MODE values (required)
                MODE.CHECK          apply has()
                MODE.TEST           apply test()
                MODE.TEST_AND_CHECK apply has() then test()
                any other value (e.g. MODE.TRUST) applies no validation
    :name   name of the plugin function (required)
    :path   python file that holds the function (required)

    Returns a dictionary with the plugin "name", the outcome of every
    validation that ran (keyed by the validator's name, i.e. "has"/"test")
    and "loaded", which is False as soon as one validation has a falsy outcome.

    Raises KeyError if mode, name or path is not provided.
    """
    _mode = _args['mode']
    _name = _args['name']
    _path = _args['path']

    # default to no validation so that MODE.TRUST (or an unknown mode) does
    # not leave the list undefined
    _doapply = []
    if _mode == MODE.CHECK:
        _doapply = [has]
    elif _mode == MODE.TEST:
        _doapply = [test]
    elif _mode == MODE.TEST_AND_CHECK:
        _doapply = [has, test]

    _status = True
    _plugin = {"name": _name}
    for _pointer in _doapply:
        _outcome = _pointer(name=_name, path=_path)
        _plugin[_pointer.__name__] = _outcome
        if not _outcome:
            # stop at the first failed validation
            _status = False
            break
    _plugin['loaded'] = _status
    return _plugin
|
|
def load(**_args):
    """
    This function will load all the plugins given a set of arguments :

    path    file that holds the plugin functions (required)
    names   list of functions to export (required)
    mode    1- CHECK ONLY, 2 - TEST ONLY, 3- TEST_AND_CHECK
            (defaults to TEST_AND_CHECK; only used when inspect is set)
    inspect when truthy every plugin is validated with inspect() before it is
            loaded, otherwise the plugin is considered loaded without validation

    Returns a dictionary indexed by function name; each entry holds "name",
    "loaded" and "pointer" (the function returned by get(), or None when the
    plugin failed validation) plus either the validation outcomes or an
    empty "method".

    Raises KeyError if path or names is not provided.
    """
    _path = _args['path']
    _names = _args['names']
    _mode = _args['mode'] if 'mode' in _args else MODE.TEST_AND_CHECK
    _validate = _args.get('inspect')

    _plugins = {}
    for _name in _names:
        if _validate:
            _plugin = inspect(name=_name, mode=_mode, path=_path)
        else:
            # no validation requested: the plugin is accepted as-is
            _plugin = {"name": _name, "method": "", "loaded": True}

        _plugin['pointer'] = get(name=_name, path=_path) if _plugin['loaded'] else None
        _plugins[_name] = _plugin
    return _plugins
|
|
|
|
|
|
def parse(**_args):
|
|
"""
|
|
This function will apply a function against a given function, and data
|
|
:row claim/remits pre-processed
|
|
:plugins list of plugins
|
|
:conifg configuration associated with
|
|
"""
|
|
_row = _args['row']
|
|
_document = _args['document']
|
|
_config = _args['config']
|
|
"""
|
|
"apply":"@path:name"
|
|
"""
|
|
|
|
_info = _args['config']['apply']
|
|
_plug_conf = _args['config']['plugin'] if 'plugin' in _args['config'] else {}
|
|
if _info.startswith('@') :
|
|
_path = '' #-- get this from general configuration
|
|
elif _info.startswith('!'):
|
|
_path = _info.split('!')[0][1:]
|
|
_name = _info.split(':')[-1]
|
|
_name = _args['config']['apply'].split(_path) |