plugin features and parsing
This commit is contained in:
parent
a9bed578b2
commit
c3f5759a6a
|
@ -0,0 +1,45 @@
|
|||
"""
|
||||
This class refactors the default parsing class (better & streamlined implementation)
|
||||
"""
|
||||
from multiprocessing import Process, RLock
|
||||
import os
|
||||
import json
|
||||
|
||||
|
||||
|
||||
class parser (Process) :
|
||||
_CONFIGURATION = {}
|
||||
def __init__(self,path=None) :
|
||||
if not parser._CONFIGURATION :
|
||||
_path = path if path else os.sep.join([os.environ['HOME'],'.healthcareio/config.json'])
|
||||
#
|
||||
# @TODO: Load custom configuration just in case we need to do further processing
|
||||
config = json.loads(open(path).read())
|
||||
parser._CONFIGURATION = config['parser']
|
||||
#
|
||||
# do we have a custom configuration in this location
|
||||
#
|
||||
_custompath = _path.replace('config.json','')
|
||||
_custompath = _custompath if not _custompath.endswith(os.sep) else _custompath[:-1]
|
||||
_custompath = os.sep.join([_custompath,'custom'])
|
||||
if os.exists(_custompath) :
|
||||
files = os.listdir(_custompath)
|
||||
if files :
|
||||
_filename = os.sep.join([_custompath,files[0]])
|
||||
_customconf = json.loads(open(_filename).read())
|
||||
#
|
||||
# merge with existing configuration
|
||||
|
||||
|
||||
else:
|
||||
pass
|
||||
|
||||
#
|
||||
#
|
||||
class getter :
|
||||
def value(self,) :
|
||||
pass
|
||||
class setter :
|
||||
def files(self,files):
|
||||
pass
|
||||
|
|
@ -0,0 +1,157 @@
|
|||
"""
|
||||
This function contains code to load plugins, the idea of a plugin is that it is designed to load, functions written outside of the scope of the parser.
|
||||
This enables better parsing, and the creation of a pipeline that can perform pre/post conditions given that {x12} is organized in loops
|
||||
|
||||
Contract:
|
||||
Functions will take in an object (the row of the claim they are intended to parse), and prior rows (parsed)
|
||||
"""
|
||||
import os
|
||||
import importlib as IL
|
||||
import sys
|
||||
|
||||
DEFAULT_PLUGINS='healthcareio.x12.plugins.default'
|
||||
class MODE :
|
||||
TRUST,CHECK,TEST,TEST_AND_CHECK= [0,1,2,3]
|
||||
def instance(**_args) :
|
||||
pass
|
||||
def has(**_args) :
|
||||
"""
|
||||
This function will inspect if a function is valid as a plugin or not
|
||||
name : function name for a given file
|
||||
path : python file to examine
|
||||
"""
|
||||
_pyfile = _args['path'] if 'path' in _args else ''
|
||||
_name = _args['name']
|
||||
# p = os.path.exists(_pyfile)
|
||||
_module = {}
|
||||
if os.path.exists(_pyfile):
|
||||
_info = IL.utils.spec_from_file_location(_name,_pyfile)
|
||||
if _info :
|
||||
_module = IL.utils.module_from_spec(_info)
|
||||
_info.load.exec(_module)
|
||||
|
||||
|
||||
else:
|
||||
_module = sys.modules[DEFAULT_PLUGINS]
|
||||
return hasattr(_module,_name)
|
||||
def get(**_args) :
|
||||
"""
|
||||
This function will inspect if a function is valid as a plugin or not
|
||||
name : function name for a given file
|
||||
path : python file to examine
|
||||
"""
|
||||
_pyfile = _args['path'] if 'path' in _args else ''
|
||||
_name = _args['name']
|
||||
# p = os.path.exists(_pyfile)
|
||||
_module = {}
|
||||
if os.path.exists(_pyfile):
|
||||
_info = IL.utils.spec_from_file_location(_name,_pyfile)
|
||||
if _info :
|
||||
_module = IL.utils.module_from_spec(_info)
|
||||
_info.load.exec(_module)
|
||||
|
||||
|
||||
else:
|
||||
_module = sys.modules[DEFAULT_PLUGINS]
|
||||
return getattr(_module,_name) if hasattr(_module,_name) else None
|
||||
|
||||
|
||||
def test (**_args):
|
||||
"""
|
||||
This function will test a plugin to insure the plugin conforms to the norm we are setting here
|
||||
:pointer function to call
|
||||
"""
|
||||
_params = {}
|
||||
try:
|
||||
if 'pointer' in _args :
|
||||
_caller = _args['pointer']
|
||||
else:
|
||||
_name = _args['name']
|
||||
_path = _args['path'] if 'path' in _args else None
|
||||
_caller = get(name=_name,path=_path)
|
||||
_params = _caller()
|
||||
#
|
||||
# the expected result is a list of field names [field_o,field_i]
|
||||
#
|
||||
return [_item for _item in _params if _item not in ['',None] and type(_item) == str]
|
||||
except Exception as e :
|
||||
return []
|
||||
pass
|
||||
def inspect(**_args):
|
||||
_mode = _args['mode']
|
||||
_name= _args['name']
|
||||
_path= _args['path']
|
||||
if _mode == MODE.CHECK :
|
||||
_doapply = [has]
|
||||
elif _mode == MODE.TEST :
|
||||
_doapply = [test]
|
||||
elif _mode == MODE.TEST_AND_CHECK :
|
||||
_doapply = [has,test]
|
||||
_status = True
|
||||
_plugin = {"name":_name}
|
||||
for _pointer in _doapply :
|
||||
_plugin[_pointer.__name__] = _pointer(name=_name,path=_path)
|
||||
if not _plugin[_pointer.__name__] :
|
||||
_status = False
|
||||
break
|
||||
_plugin['loaded'] = _status
|
||||
return _plugin
|
||||
def load(**_args):
|
||||
"""
|
||||
This function will load all the plugins given an set of arguments :
|
||||
path file
|
||||
name list of functions to export
|
||||
mode 1- CHECK ONLY, 2 - TEST ONLY, 3- TEST_AND_CHECK
|
||||
"""
|
||||
_path = _args ['path']
|
||||
_names= _args['names']
|
||||
_mode= _args ['mode'] if 'mode' in _args else MODE.TEST_AND_CHECK
|
||||
_doapply = []
|
||||
if _mode == MODE.CHECK :
|
||||
_doapply = [has]
|
||||
elif _mode == MODE.TEST :
|
||||
_doapply = [test]
|
||||
elif _mode == MODE.TEST_AND_CHECK :
|
||||
_doapply = [has,test]
|
||||
# _plugins = []
|
||||
_plugins = {}
|
||||
for _name in _names :
|
||||
_plugin = {"name":_name}
|
||||
if 'inspect' in _args and _args['inspect'] :
|
||||
_plugin = inspect(name=_name,mode=_mode,path=_path)
|
||||
else:
|
||||
_plugin["method"] = ""
|
||||
_status = True
|
||||
_plugin['loaded'] = _status
|
||||
if _plugin['loaded'] :
|
||||
_plugin['pointer'] = get(name=_name,path=_path)
|
||||
else:
|
||||
_plugin['pointer'] = None
|
||||
|
||||
# _plugins.append(_plugin)
|
||||
_plugins[_name] = _plugin
|
||||
return _plugins
|
||||
|
||||
|
||||
def parse(**_args):
|
||||
"""
|
||||
This function will apply a function against a given function, and data
|
||||
:row claim/remits pre-processed
|
||||
:plugins list of plugins
|
||||
:conifg configuration associated with
|
||||
"""
|
||||
_row = _args['row']
|
||||
_document = _args['document']
|
||||
_config = _args['config']
|
||||
"""
|
||||
"apply":"@path:name"
|
||||
"""
|
||||
|
||||
_info = _args['config']['apply']
|
||||
_plug_conf = _args['config']['plugin'] if 'plugin' in _args['config'] else {}
|
||||
if _info.startswith('@') :
|
||||
_path = '' #-- get this from general configuration
|
||||
elif _info.startswith('!'):
|
||||
_path = _info.split('!')[0][1:]
|
||||
_name = _info.split(':')[-1]
|
||||
_name = _args['config']['apply'].split(_path)
|
|
@ -0,0 +1,38 @@
|
|||
import datetime
|
||||
|
||||
def date(**_args):
|
||||
"""#
|
||||
This function will return a data as presented in the {x12} i.e it could be a date-range or a single date
|
||||
- In the case of a single data it is returned as a string
|
||||
- In the case of a range a complex object is returned with to,from keys
|
||||
NOTE: dates will be formatted as they
|
||||
"""
|
||||
if not _args :
|
||||
return ['from','to','type']
|
||||
_date = ""
|
||||
return _date
|
||||
def procedure (**_args):
|
||||
"""
|
||||
This function will parse SVC element and return given the following The return object is as follows :
|
||||
claim_id,charge_amount, payment_amount,patient_amount,patient_status,claim_status
|
||||
|
||||
"""
|
||||
cols = ['type','code','amount']
|
||||
if not _args :
|
||||
return cols
|
||||
_procedure = dict.fromkeys(cols,None)
|
||||
_row = _args['row']
|
||||
# _document = _args['document']
|
||||
if len(_row) == 3 :
|
||||
_procedure = dict(zip(cols,_row[1:4]))
|
||||
return _procedure
|
||||
|
||||
return _info
|
||||
def SV2(**_args):
|
||||
pass
|
||||
def SV3(**_args):
|
||||
pass
|
||||
def HL (**_args):
|
||||
pass
|
||||
def HI(**_args):
|
||||
pass
|
|
@ -0,0 +1,22 @@
|
|||
"""
|
||||
This file serves as the interface (so to speak) to the x12 parser, plugin interface
|
||||
@TODO:
|
||||
- How to write custom plugin
|
||||
- Provide interface for meta-data (expected)
|
||||
- Support configuration specification
|
||||
"""
|
||||
import os
|
||||
from . import common
|
||||
from . import header
|
||||
from . import body
|
||||
|
||||
EDI = body.BODY
|
||||
|
||||
def instance(**_args):
|
||||
pass
|
||||
|
||||
# class Parser :
|
||||
# def __init__(**_args):
|
||||
# folder = _args['path']
|
||||
# files = [ os.sep.join(_name,folder) for _name in os.listdir(folder)]
|
||||
# pass
|
|
@ -0,0 +1,194 @@
|
|||
from .common import X12DOCUMENT
|
||||
from .header import HEADER
|
||||
import numpy as np
|
||||
|
||||
|
||||
class BODY (HEADER):
|
||||
"""
|
||||
This class will contain functions that will parse elements associated with the claim body
|
||||
"""
|
||||
def BHT (self,**_args):
|
||||
"""
|
||||
Expected Element BHT
|
||||
This functiion will process header submitted (receiver,claim_type,)
|
||||
"""
|
||||
_columns= ['receiver_id','submitted_date','submitted_time','claim_type']
|
||||
return self.parse(_columns,[3,7],**_args)
|
||||
|
||||
def NM1 (self,**_args):
|
||||
"""
|
||||
Expected Element NM1
|
||||
ref IL,40,41,82,85,PR ...
|
||||
Information about entities (doctors, clearing house, provider). we should be mindful of the references
|
||||
"""
|
||||
# _CODE_INDEX = 1
|
||||
# _map = {_CODE_INDEX:{'41':'submitter','40':'receiver','PR':'payer'}}
|
||||
_columns = ['type','name','id']
|
||||
_indexes = [1,3,-1]
|
||||
# _info = [{'index':'40','field':'receiver'},{'index':'41','field':'submitter'},{'index':'PR','field':'payer'}]
|
||||
|
||||
_info = self.parse(_columns,_indexes,**_args)
|
||||
return _info
|
||||
def N3 (self,**_args):
|
||||
"""
|
||||
Expected Element N3
|
||||
"""
|
||||
|
||||
_columns = ['address_line_1']
|
||||
return self.parse(_columns,[1,2],**_args)
|
||||
|
||||
def N4(self,**_args):
|
||||
"""
|
||||
Expected Element N4
|
||||
"""
|
||||
_columns = ['city','state','zip']
|
||||
return self.parse(_columns,[1,2,3],**_args)
|
||||
def HI(self,**_args):
|
||||
"""
|
||||
Expected Element HI
|
||||
This function will parse diagnosis codes ICD 9/10
|
||||
"""
|
||||
_columns = ['code','type']
|
||||
return self.parse(_columns,[2,1],**_args)
|
||||
def AMT (self,**_args):
|
||||
"""
|
||||
Expected Element AMT
|
||||
"""
|
||||
_columns = ['amount','qualifier']
|
||||
return self.parse(_columns,[2,1],**_args)
|
||||
def SBR (self,**_args):
|
||||
"""
|
||||
Expected Element SBR
|
||||
"""
|
||||
_index = [9,1]
|
||||
_columns = ['vendor','individual_code','type']
|
||||
return self.parse(_columns,[9,2,1],**_args)
|
||||
def DMG (self,**_args):
|
||||
"""
|
||||
Expected Element DMG
|
||||
"""
|
||||
_columns = ['dob','gender','format']
|
||||
_info = self.parse(_columns,[2,3,1],**_args)
|
||||
|
||||
return _info
|
||||
def DTP (self,**_args):
|
||||
"""
|
||||
Expected Element DTP
|
||||
"""
|
||||
_columns = ['to','from','type']
|
||||
return self.parse(_columns,[3],**_args)
|
||||
def PER (self,**_args):
|
||||
"""
|
||||
Expected Element PER
|
||||
"""
|
||||
_CODE_INDEX = 1
|
||||
_map = {_CODE_INDEX:{'IC':'submitter'}} # attribute to store the data in
|
||||
_columns = ['contact_name','phone','email']
|
||||
_info = self.parse (_columns,[2,4,8],**_args)
|
||||
#
|
||||
# @TODO: Inspect the configuration file for the attribute information
|
||||
#
|
||||
return _info
|
||||
def CLM (self,**_args):
|
||||
"""
|
||||
Expected Element CLM
|
||||
"""
|
||||
_columns = ['claim_id','claim_amount','facility_code','facility_qualifier','frequency_code']
|
||||
return self.parse(_columns,[1,2,5,5,5],**_args)
|
||||
def REF (self,**_args):
|
||||
_columns = ['identifier','qualifier','']
|
||||
_CODE_INDEX = 1 # -- according to x12 standard
|
||||
_map = {_CODE_INDEX:{'EA':'patient','EI':'provider','6R':'','D9':''}}
|
||||
return self.parse(_columns,[2],**_args)
|
||||
|
||||
def HI (self,**_args):
|
||||
"""
|
||||
Expected Element HI
|
||||
"""
|
||||
_columns = ['code','type']
|
||||
return self.parse(_columns,[1,2],**_args)
|
||||
def SV1 (self,**_args):
|
||||
"""
|
||||
Expected Element SV1
|
||||
"""
|
||||
_row = _args['row'] if type(_args['row']) == list else _args['row'].split('*')
|
||||
_columns = ['type','code','amount','modifier_1','modifier_2','modifier_3','modifier_4','place_of_service','units','measurement']
|
||||
return self.parse(_columns,[1,1,2,1,1,1,1,5,4,3],**_args)
|
||||
|
||||
def SV3 (self,**_args):
|
||||
return self.SV1(**_args)
|
||||
def HL (self,**_args) :
|
||||
""",
|
||||
Expected Element HL
|
||||
The expected block is supposed to be unprocessed (to make things simple)
|
||||
"""
|
||||
_row = _args['row'] if type(_args['row']) == list else _args['row'].split('~')
|
||||
|
||||
# _attr = self.elements() #[_name for _name in dir() if not _name.islower() and not _name.startswith('_')]
|
||||
# _pointers = [getattr(self,_name) for _name in _attr]
|
||||
# _map = dict(zip(_attr,_pointers))
|
||||
_map = self.pointers()
|
||||
|
||||
#
|
||||
# The index here tells us what we are processing i.e index == 1 something about header
|
||||
#
|
||||
_columns = ['_index','parent','code','child']
|
||||
_args['row'] = _row[0]
|
||||
|
||||
_info = self.parse (_columns,[1,2,3,4],**_args)
|
||||
# _field = 'billing_provider' if _info['_index'] == '1' else 'patient'
|
||||
# _config ={'field':_field}
|
||||
return _info
|
||||
# _claim = {_field:_info}
|
||||
# for _element in _row[1:] :
|
||||
# _key = _element.split('*')[0]
|
||||
# if _key in _map and len(_element) > 0:
|
||||
# _document = _args['document']
|
||||
# _pointer = _map[_key]
|
||||
# if _key not in ['CLM','HI','SV3','SV2','SV1'] :
|
||||
# _claim = self.merge (_claim,_pointer(row=_element.strip().split('*'),document=_document,config=_config))
|
||||
# else:
|
||||
# _config = _args['config'] if 'config' in _args else {}
|
||||
# _claim = self.merge (_claim,_pointer(row=_element.strip().split('*'),document=_document,config=_config))
|
||||
# else:
|
||||
# print (['SKIPPING ',_key])
|
||||
# pass
|
||||
|
||||
# return _claim
|
||||
# def apply(self,_block):
|
||||
# """
|
||||
# :_block elements that do not belong to the header block
|
||||
# """
|
||||
# _apply = self.pointers()
|
||||
|
||||
# _header = {}
|
||||
# if _block :
|
||||
|
||||
# for content in _block :
|
||||
|
||||
# _KEY_ELEMENT = content.split('*')[0]
|
||||
# if _KEY_ELEMENT not in _apply :
|
||||
# #
|
||||
# # @TODO: Log elements that are skipped
|
||||
# # print ([_KEY_ELEMENT , 'NOT FOUND'])
|
||||
# continue
|
||||
|
||||
# _info = _apply[_KEY_ELEMENT](row=content,document=_header)
|
||||
|
||||
# if _info :
|
||||
# if not _header :
|
||||
# _header = _info
|
||||
|
||||
# else:
|
||||
|
||||
# _header = self.merge(_header,_info)
|
||||
# else:
|
||||
# #
|
||||
# # For some reason the parser failed by returning a null
|
||||
# # @TODO: Log this event ....
|
||||
# pass
|
||||
# else:
|
||||
# #
|
||||
# # @TODO: return the meta data for what is expected
|
||||
# pass
|
||||
# return _header
|
|
@ -0,0 +1,456 @@
|
|||
from typing import Any
|
||||
import numpy as np
|
||||
import json
|
||||
from multiprocessing import Process, RLock
|
||||
import os
|
||||
import io
|
||||
import queue
|
||||
import transport
|
||||
from transport import providers
|
||||
|
||||
class Store(Process):
|
||||
"""
|
||||
This is the data-store service that will handle read/writes
|
||||
"""
|
||||
dataStore = None
|
||||
@staticmethod
|
||||
def init(self,**_args):
|
||||
if Store.dataStore is None :
|
||||
_args = _args['store']
|
||||
|
||||
else:
|
||||
pass
|
||||
@staticmethod
|
||||
def reset():
|
||||
pass
|
||||
|
||||
class X12DOCUMENT (Process):
|
||||
"""
|
||||
X12DOCUMENT class encapsulates functions that will be used to format an x12 (835,837) claim into an object
|
||||
"""
|
||||
_queue = queue.Queue()
|
||||
|
||||
class MODE :
|
||||
#
|
||||
# The following allow us to handle raw content (stream) or a filename
|
||||
# The raw content will be wrapped into io.StringIO so that it is handled as if it were a file
|
||||
#
|
||||
NAMES,STREAM = 'NAMES','STREAM'
|
||||
class ConfigHandler :
|
||||
def format(self,**_args):
|
||||
"""
|
||||
This function formats variations of an element's parsing rules
|
||||
:info {index,field|label,map}
|
||||
"""
|
||||
|
||||
_info = _args['info']
|
||||
_ref = {}
|
||||
|
||||
for _item in _info :
|
||||
_index = str(_item['index'])
|
||||
_field = _item['field'] if 'field' in _item else None
|
||||
_label = _item['label'] if 'label' in _item else None
|
||||
if _field :
|
||||
_ref[_index] = {'field':_field}
|
||||
elif _label :
|
||||
_ref[_index] = {'label':_label}
|
||||
|
||||
return {'@ref':_ref}
|
||||
def _getColumnsIndexes(self,_columns,_indexes,_map):
|
||||
"""
|
||||
This function return columns and indexes related if a parsing map is passed
|
||||
:param _columns
|
||||
:param _indexes
|
||||
:param _map parsing map (field:index)
|
||||
"""
|
||||
# @TODO: insure the lengths are the same for adequate usage downstream ...
|
||||
_xcolumns,_xindexes = list(_map.keys()), list(_map.values())
|
||||
keys,values = _xcolumns + _columns,_xindexes + _indexes
|
||||
_config = dict(zip(keys,values))
|
||||
|
||||
_outColumns,_outIndexes = list(_config.keys()),list(_config.values())
|
||||
return _outColumns,_outIndexes
|
||||
def _getObjectAtributes(self,_config):
|
||||
_field = _config['field'] if 'field' in _config else {}
|
||||
_label = _config['label'] if 'label' in _config else {}
|
||||
return _field,_label
|
||||
def merge(self,**_args):
|
||||
#
|
||||
# This function overrides the old configuration with the new configuration specifications
|
||||
#
|
||||
|
||||
# _columns,_indexes = [],[]
|
||||
_columns,_indexes = _args['columns'],_args['index']
|
||||
_map = {}
|
||||
_config = _args['config'] if 'config' in _args else {}
|
||||
_field,_label = self._getObjectAtributes(_config)
|
||||
|
||||
if 'map' in _config :
|
||||
_map = _args['config']['map']
|
||||
_columns,_indexes = self._getColumnsIndexes(_columns,_indexes,_map)
|
||||
|
||||
if '@ref' in _config :
|
||||
# _columns,_indexes = [],[]
|
||||
_row = _args['row']
|
||||
|
||||
_ref = _config['@ref']
|
||||
|
||||
for _anchor in _ref:
|
||||
# print ([_anchor,_anchor == _row[1].strip()])
|
||||
if _anchor == _row[1].strip() :
|
||||
_field,_label = self._getObjectAtributes(_ref[_anchor])
|
||||
|
||||
_map = _ref[_anchor]['map'] if 'map' in _ref[_anchor] else {}
|
||||
|
||||
if _map :
|
||||
_columns,_indexes = self._getColumnsIndexes([],[],_map)
|
||||
|
||||
|
||||
break
|
||||
# _columns,_indexes = _columns + _map.keys()
|
||||
|
||||
return {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
|
||||
def legacy(self,**_args):
|
||||
#
|
||||
# This function returns the legacy configuration (default parsing)
|
||||
#
|
||||
|
||||
_config = _args['config'] if 'config' in _args else {}
|
||||
_field,_label = self._getObjectAtributes(_config)
|
||||
_columns,_indexes = [],[]
|
||||
if 'map' in _config :
|
||||
_columns = list(_config['map'].keys())
|
||||
_indexes = list(_config['map'].values())
|
||||
|
||||
return {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
|
||||
def override(self,**_args):
|
||||
return _args['columns'],_args['indexes']
|
||||
def __init__(self,**_args):
|
||||
super().__init__()
|
||||
self._mode = _args['mode'] if 'mode' in _args else 'NAMES'
|
||||
if 'files' in _args :
|
||||
self.files = _args['files']
|
||||
self._config = _args['config'] if 'config' in _args else {}
|
||||
self._document = []
|
||||
|
||||
self._x12FileType = None
|
||||
self._configHandler = X12DOCUMENT.ConfigHandler()
|
||||
|
||||
#
|
||||
#-- The files need to be classified, the files need to be either claims or remits
|
||||
#
|
||||
if 'store' not in self._config :
|
||||
self._store_args = _args['store'] if 'store' in _args else {'provider':providers.CONSOLE}
|
||||
else:
|
||||
self._store_args = self._config['store']
|
||||
|
||||
def init(self,_header):
|
||||
"""
|
||||
Expected Elements must include ST
|
||||
"""
|
||||
pass
|
||||
|
||||
def merge (self,_x,_y):
|
||||
"""
|
||||
This function will merge two objects _x, _y
|
||||
"""
|
||||
_zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns
|
||||
|
||||
if _zcols :
|
||||
_out = dict(_x,**{})
|
||||
for _key in _y.keys() :
|
||||
if not _key in _zcols :
|
||||
|
||||
_out[_key] = _y[_key]
|
||||
else:
|
||||
if type(_out[_key]) == list :
|
||||
_out[_key] += _y[_key]
|
||||
elif type(_out[_key]) == dict:
|
||||
_out[_key] = dict(_out[_key],**_y[_key])
|
||||
else:
|
||||
_out[_key] = _y[_key]
|
||||
|
||||
return _out
|
||||
else:
|
||||
|
||||
return dict(_x,**_y)
|
||||
|
||||
def split(self,content):
|
||||
"""
|
||||
This function will split the content of an X12 document into blocks and headers
|
||||
:content x12 document in raw format (text)
|
||||
"""
|
||||
#_content = content.split('~')
|
||||
_content = content.split('HL')
|
||||
_header = _content[:1][0].split('~')
|
||||
|
||||
_blocks = ['HL'+_item for _item in _content[1:]]
|
||||
_blocks = [_item.split('~') for _item in _blocks ]
|
||||
|
||||
# for row in _content :
|
||||
# if not _blocks and not row.startswith('HL') :
|
||||
# _header.append(row)
|
||||
# else:
|
||||
# _blocks.append(row)
|
||||
return {'header':_header,'blocks':_blocks}
|
||||
def parse (self,columns,index,**_args):
|
||||
"""
|
||||
This function encapulates how an x12 document element will be processed
|
||||
:columns list of attributes that make up the object
|
||||
:index indexes of the said items in the element
|
||||
:_args
|
||||
- row raw x12 element (string)
|
||||
- config configuration of the element. his should indicate functions to apply against function
|
||||
"""
|
||||
_ELEMENT = _args['row'][0]
|
||||
#
|
||||
# get the right configuration from the _config object
|
||||
_config = _args['config'][_ELEMENT] if _ELEMENT in _args['config'] else {}
|
||||
|
||||
# _field = _config['field'] if 'field' in _config else None
|
||||
# _label = _config['label'] if 'label' in _config else None
|
||||
_map = _config['map'] if 'map' in _config else {}
|
||||
#
|
||||
# Let's see if overriding the fields/labels isn't necessary
|
||||
|
||||
|
||||
# columns, index,_refField,_refLabel = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config)
|
||||
# _field = _field if not _refField else _refField
|
||||
# _label = _label if not _refLabel else _refLabel
|
||||
|
||||
_outInfo = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config)
|
||||
|
||||
_field,_label = _outInfo['field'],_outInfo['label']
|
||||
_columns,_index = _outInfo['columns'],_outInfo['index']
|
||||
|
||||
|
||||
if 'row' in _args:
|
||||
_row = _args['row'] if type(_args['row']) == list else _args['row'].split('*')
|
||||
|
||||
_index = np.array(_index)
|
||||
|
||||
#
|
||||
# Sometimes the _row doesn't have all expected indexes, we will compensate
|
||||
# This allows to minimize parsing errors as it may relate to disconnects between configuration and x12 element variations (shitty format)
|
||||
#
|
||||
if np.max(_index) > len(_row) -1 :
|
||||
_delta = 1 + np.max(_index) - len(_row)
|
||||
_row = _row + np.repeat('',_delta).tolist()
|
||||
_row = np.array(_row)
|
||||
|
||||
# _element = _row[0]
|
||||
|
||||
_configKeys = [] #list(self._config.keys())
|
||||
_configTree = [] #list(self._config.values())
|
||||
if 'config' in _args :
|
||||
_config = _args['config']
|
||||
_configKeys = list(_config.keys())
|
||||
_configTree = list(_config.values())
|
||||
else:
|
||||
_config = {}
|
||||
|
||||
_info = dict(zip(_columns,_row[_index].tolist()))
|
||||
_document = _args['document'] if 'document' in _args else {}
|
||||
#
|
||||
# Extracting configuration (minimal information)
|
||||
# _config = _args['config'] if 'config' in _args else {}
|
||||
# _config = self._config
|
||||
|
||||
|
||||
# if '@ref' in _config :
|
||||
# print (_config['@ref'])
|
||||
# _values = _config['@ref']
|
||||
# print (_values)
|
||||
|
||||
if _field :
|
||||
if not _field in _document :
|
||||
return {_field:_info}
|
||||
else:
|
||||
return self.merge(_document[_field],_info)
|
||||
elif _label :
|
||||
if not _label in _document :
|
||||
return {_label:[_info]}
|
||||
else:
|
||||
return _document[_label] + [_info]
|
||||
else:
|
||||
return _info
|
||||
|
||||
else:
|
||||
return columns
|
||||
def elements(self):
|
||||
"""
|
||||
This function returns elements that are supported as specified by X12 standard
|
||||
"""
|
||||
return [_name for _name in dir(self) if not _name.startswith('_') and not _name.islower() ]
|
||||
def pointers(self):
|
||||
"""
|
||||
This function returns pointers associated with each element ...
|
||||
:return Object of Element:Function
|
||||
"""
|
||||
_attr = self.elements()
|
||||
_pointers = [getattr(self,_name) for _name in _attr]
|
||||
return dict(zip(_attr,_pointers))
|
||||
|
||||
def set(self,_info,_document,_config):
|
||||
_attrName,_attrType = None,None
|
||||
|
||||
if 'label' in _config :
|
||||
_attrType = 'label'
|
||||
_attrName = _config['label']
|
||||
elif 'field' in _config :
|
||||
_attrType = 'field'
|
||||
_attrName = _config['field']
|
||||
|
||||
if _attrName :
|
||||
if _attrName not in _document :
|
||||
_document[_attrName] = [] if _attrType == 'label' else {}
|
||||
|
||||
#
|
||||
# @TODO: make sure we don't have a case of an attribute being overridden
|
||||
if type(_document[_attrName]) == list :
|
||||
_document[_attrName] += [_info]
|
||||
else:
|
||||
_document[_attrName] = dict(_document[_attrName],**_info)
|
||||
# _document[_attrName] += [_info] if _attrType == 'label' else dict(_document[_attrName],**_info)
|
||||
|
||||
return _document
|
||||
|
||||
return dict(_document,**_info)
|
||||
|
||||
pass
|
||||
def log (self,**_args):
|
||||
pass
|
||||
def run(self):
|
||||
"""
|
||||
This function will trigger the workflow associated with a particular file
|
||||
"""
|
||||
_getContent = {
|
||||
#
|
||||
# For the sake of testing, the following insures
|
||||
# that raw string content is handled as if it were a file
|
||||
#
|
||||
X12DOCUMENT.MODE.STREAM: (lambda stream : io.StringIO(stream)) ,
|
||||
X12DOCUMENT.MODE.NAMES: (lambda name: open(name))
|
||||
|
||||
|
||||
}
|
||||
_writer = transport.factory.instance(**self._store_args)
|
||||
for _filename in self.files :
|
||||
try:
|
||||
_documents = []
|
||||
_parts = []
|
||||
# _content = (open(_filename)).read()
|
||||
_reader = _getContent[self._mode]
|
||||
_content = _reader(_filename).read()
|
||||
_info = self.split(_content)
|
||||
_fileType=self.init(_content)
|
||||
_header = self.apply(_info['header'])
|
||||
|
||||
# print (json.dumps(_header))
|
||||
for _content in _info['blocks'] :
|
||||
|
||||
_body = self.apply(_content,header=_header)
|
||||
_doc = self.merge(_header,_body)
|
||||
|
||||
if _doc and 'claim_id' in _doc:
|
||||
# X12DOCUMENT._queue.put(_document)
|
||||
|
||||
_documents += [_doc]
|
||||
|
||||
|
||||
except Exception as e:
|
||||
#
|
||||
# @TODO: Log this issue for later analysis ...
|
||||
print (e)
|
||||
pass
|
||||
#
|
||||
# Let us post this to the documents we have, we should find a place to post it
|
||||
#
|
||||
if _documents :
|
||||
# print (_header['header'])
|
||||
|
||||
self.post(document=_documents,writer=_writer)
|
||||
break
|
||||
|
||||
def post(self,**_args):
|
||||
"""
|
||||
This function is intended to post content to a given location
|
||||
:param document
|
||||
:param writer
|
||||
"""
|
||||
_writer = _args['writer'] if 'writer' in _args else None
|
||||
_document = _args['document']
|
||||
if not _writer:
|
||||
X12DOCUMENT._queue.put(_document)
|
||||
else:
|
||||
|
||||
_writer.write(_document)
|
||||
def _getConfig(self,_chunk):
|
||||
#
|
||||
# Let us determine what kind of file we are dealing with, so we can extract the configuration
|
||||
# For this we need to look for the ST loop ...
|
||||
#
|
||||
|
||||
line = [line for line in _chunk if line and line[:2] == 'ST' ]
|
||||
|
||||
if line :
|
||||
#
|
||||
# We found the header of the block, so we can set the default configuration
|
||||
#
|
||||
self._x12FileType = line[0].split('*')[1].strip()
|
||||
_config = {}
|
||||
if self._x12FileType :
|
||||
_config = self._config[self._x12FileType]
|
||||
|
||||
return _config
|
||||
|
||||
def apply(self,_chunk, header = {}):
|
||||
"""
|
||||
_chunks are groups of elements split by HL, within each chunk are x12 loops HL,CLM,ISA
|
||||
"""
|
||||
|
||||
|
||||
_document,_cached = {},{}
|
||||
_pointers = self.pointers()
|
||||
_config = self._getConfig(_chunk)
|
||||
#
|
||||
# The configuration comes from the file, let's run this in merge mode
|
||||
# _config = self._configHandler.merge
|
||||
_pid = None
|
||||
for line in _chunk :
|
||||
|
||||
segments = line.split('*')
|
||||
|
||||
_ELEMENT = segments[0]
|
||||
|
||||
if _ELEMENT not in _pointers or not _ELEMENT:
|
||||
continue
|
||||
if _ELEMENT in ['HL','CLM','ISA'] or not _pid:
|
||||
_pid = _ELEMENT
|
||||
if _pid not in _cached :
|
||||
_cached [_pid] = {}
|
||||
|
||||
_pointer = _pointers[_ELEMENT]
|
||||
|
||||
_args = {'row':segments,'document':_document,'header':header,'config':(_config)}
|
||||
|
||||
|
||||
_parsedLine = _pointer(**_args)
|
||||
# print ([_pid,_ELEMENT,_parsedLine])
|
||||
|
||||
_cached[_pid] = self.merge(_cached[_pid],_parsedLine)
|
||||
|
||||
|
||||
#
|
||||
# Let's create the documents as we understand them to be
|
||||
# @TODO: Create a log so there can be visibility into the parser
|
||||
#
|
||||
_document = {}
|
||||
for _id in _cached :
|
||||
# print ('patient' in _cached[_id] )
|
||||
|
||||
_document = self.merge(_document,_cached[_id])
|
||||
|
||||
return _document
|
||||
|
||||
|
|
@ -0,0 +1,42 @@
|
|||
"""
|
||||
|
||||
This file contains parsing functions for a claim header, the functions are built within class (Object Orientation)
|
||||
The intent to leverage Object Orientated designs is to easily support multi-processing
|
||||
|
||||
"""
|
||||
import numpy as np
|
||||
import json
|
||||
# from . im common
|
||||
from .common import X12DOCUMENT
|
||||
|
||||
|
||||
class HEADER (X12DOCUMENT):
|
||||
# @staticmethod
|
||||
def ISA(self,**_args):
|
||||
|
||||
_columns = ['mode','version','date','time']
|
||||
_index = [15,12,9,10]
|
||||
return self.parse(_columns,_index,**_args)
|
||||
# @staticmethod
|
||||
def GS(self,**_args):
|
||||
"""
|
||||
Expected Element GS
|
||||
"""
|
||||
_columns = ['type','sender','receiver','date','time','version']
|
||||
return self.parse(_columns,[1,2,3,4,5,8],**_args)
|
||||
# @staticmethod
|
||||
def ST(self,**_args):
|
||||
"""
|
||||
Expected Element ST
|
||||
According to {x12} standard ST will take precedence over GS
|
||||
"""
|
||||
_columns = ['type','control_number','version']
|
||||
return self.parse(_columns,[1,2,3],**_args)
|
||||
# @staticmethod
|
||||
def BHT (self,**_args):
|
||||
"""
|
||||
Expected Element BHT
|
||||
This functiion will process header submitted (receiver,claim_type,)
|
||||
"""
|
||||
_columns= ['app_id','date','time','type']
|
||||
return self.parse(_columns,[3,4,5,6],**_args)
|
Loading…
Reference in New Issue