parser/healthcareio/x12/utils.py.old

144 lines
5.0 KiB
Python
Raw Permalink Normal View History

2024-02-06 17:16:10 +00:00
import numpy as np
import os
"""
This file contains utilities that will be used across the x12 framework/platform
@TODO:
- Provisions with multiprocessing (locks/releases)
"""
class ContentHandler :
    """
    This class implements {x12} content handling: splitting raw text into
    rows/elements, separating the header rows from the 'HL' blocks, turning
    rows into dictionaries and assembling those dictionaries into a document.
    """

    def split (self,_stream) :
        """
        This function splits x12 content into rows and/or elements
        :_stream    raw text (or any object exposing a compatible split method)
        :return     a list of rows (each row being a list of elements) when a row
                    delimiter ('~\\n', '~' or '\\n') is found in the text, otherwise
                    the list of elements obtained by splitting on '*'
        """
        if isinstance(_stream,str) :
            #
            # The delimiters are tested from the most specific to the least specific,
            # the first one found is used for the entire text
            for _xchar in ('~\n','~','\n') :
                if _xchar in _stream :
                    _rows = _stream.split(_xchar)
                    return [_row.strip().split('*') for _row in _rows if _row.strip()]
        #
        # No row delimiter (or not a plain string): the input is treated as a single row
        return _stream.split('*')

    def classify(self,_content):
        """
        This function is designed to split claim information from the rest of the information (envelope header)
        :_content   The file content (already split by row and separator)
        :return     {'header':rows found before the first row containing 'HL',
                     'block':list of row groups, each group starting at a row containing 'HL'}
                    When no row contains 'HL' the entire content is returned as the
                    header and the list of blocks is empty
        """
        _starts = [_index for _index,_line in enumerate(_content) if 'HL' in _line]
        #
        # At this point we know how many blocks are in the file (log this somewhere)
        #
        if not _starts :
            return {'header':_content[:],'block':[]}
        _header = _content[:_starts[0]]
        #
        # A block ends where the next one begins, the last block ends with the content
        _ends = _starts[1:] + [len(_content)]
        _block = [_content[_beg:_end] for _beg,_end in zip(_starts,_ends)]
        return {'header':_header,'block':_block}

    def merge (self,_x,_y):
        """
        This function will merge two objects _x, _y into a new dictionary
        Keys found in both objects are resolved according to the type of the value in _x:
            - list, the items of _y's value are appended (to a new list)
            - dict, the two dictionaries are combined (_y's entries win)
            - otherwise _y's value replaces _x's value
        Neither _x nor the lists/dictionaries it holds are modified
        :_x     base object (dictionary)
        :_y     object (dictionary) merged on top of _x
        """
        _out = dict(_x)
        for _key,_value in _y.items() :
            if _key not in _out :
                _out[_key] = _value
            elif isinstance(_out[_key],list) :
                #
                # A new list is built so the list held by _x is left untouched
                _out[_key] = _out[_key] + list(_value)
            elif isinstance(_out[_key],dict) :
                _out[_key] = {**_out[_key],**_value}
            else:
                _out[_key] = _value
        return _out

    def _inspect_row(self,**_args):
        """
        This function makes sure the indexes actually exist in the row
        :row        row to be parsed (already split)
        :indexes    list of indexes
        :columns    columns to be used in the creation of the object
        :return     True if the largest index is a valid position in the row and
                    there are as many indexes as columns, False otherwise
        """
        _indexes = _args['indexes']
        _same_size = np.size(_indexes) == np.size(_args['columns'])
        if np.size(_indexes) == 0 :
            #
            # Without indexes there is no position to check against the row
            return bool(_same_size)
        _last = np.size(_args['row']) - 1
        return bool(np.max(_indexes) <= _last and _same_size)

    def _parse (self,**_args):
        """
        This function will parse an x12 element given
        :row        row of the x12 element (already split)
        :columns    attributes of the object to be returned
        :indexes    indexes of interest
        :return     dictionary mapping each column to the element found at the
                    matching index, missing elements are returned as ''
        """
        _row = list(_args['row'])
        _columns = _args['columns']
        _indexes = list(_args['indexes'])
        if not _indexes :
            return {}
        if not self._inspect_row (**_args) :
            #
            # Minimizing parsing errors by padding the line
            # The padding only applies when the row is actually too short
            _delta = 1 + max(_indexes) - len(_row)
            if _delta > 0 :
                _row = _row + [''] * _delta
                #
                # @TODO: Log that the rows were padded
                #
        return dict(zip(_columns,[_row[_index] for _index in _indexes]))

    def _buildObject (self,**_args):
        """
        This function stores a parsed row in the document being built
        :meta       data that is pulled from the decorator function
                    - neither 'field' nor 'container': the object is merged into the document
                    - 'field': the object is stored (or merged) under that key
                    - 'container': the object is appended to the list stored under that key
                    'field' takes precedence when both are provided
        :object     row parsed and stored as an object
        :document   existing document being parsed
        :return     the updated document
        """
        _meta = _args['meta']
        _document = _args['document']
        _object = _args['object']
        if 'field' not in _meta and 'container' not in _meta :
            _document = self.merge(_document,_object)
        elif 'field' in _meta :
            field = _meta['field']
            if field in _document :
                _document[field] = self.merge(_document[field],_object)
            else:
                _document[field] = _object
        else:
            #
            # Only 'container' is left at this point
            _label = _meta['container']
            if _label not in _document :
                _document[_label] = []
            _document[_label].append(_object)
        return _document

    def get_files(self,**_args):
        """
        This function returns the files found at a given location
        :folder     path of a folder (walked recursively) or of a single file
        :return     list of file paths, empty if the path does not exist
        """
        folder = _args['folder']
        if not os.path.exists(folder) :
            return []
        if not os.path.isdir(folder) :
            #
            # The path exists but is not a folder, it is returned as is
            return [folder]
        files = []
        for root,_dirs,names in os.walk(folder) :
            files += [os.path.join(root,name) for name in names]
        return [path for path in files if os.path.isfile(path)]