144 lines
5.0 KiB
Python
144 lines
5.0 KiB
Python
import numpy as np
|
|
import os
|
|
|
|
"""
|
|
This file contains utilities that will be used accross the x12 framework/platform
|
|
@TODO:
|
|
- Provisions with multiprocessing (locks/releases)
|
|
"""
|
|
class ContentHandler :
|
|
"""
|
|
This class implements {x12} content handling
|
|
"""
|
|
def split (self,_stream) :
|
|
if type(_stream) == str :
|
|
_xchar = '~\n' if '~\n' in _stream else ('~' if '~' in _stream else ('\n' if '\n' in _stream else None))
|
|
|
|
if _xchar :
|
|
_xchar = ''.join(_xchar)
|
|
_rows = _stream.split(_xchar)
|
|
|
|
return [row.strip().split('*') for row in _rows if row.strip()]
|
|
else:
|
|
return _stream.split('*')
|
|
|
|
def classify(self,_content):
|
|
"""
|
|
This function is designed to split claim information from the rest of the information (envelope header)
|
|
:_content The file content (already split by row and seperator)
|
|
"""
|
|
_indexes = [1 if 'HL' in line else 0 for line in _content]
|
|
_indexes = [_index for _index,_value in enumerate(_indexes) if _value == 1]
|
|
|
|
#
|
|
# At this point we know how many claims are in the file (log this somewhere)
|
|
#
|
|
_beg = 0
|
|
_end = _indexes[0]
|
|
_header = _content[_beg:_end]
|
|
_block = []
|
|
for _index,_beg in enumerate(_indexes) :
|
|
if _index + 1 == len(_indexes) :
|
|
_end = len(_content)
|
|
else:
|
|
_end = _indexes[_index + 1]
|
|
_block.append(_content[_beg:_end])
|
|
|
|
return {'header':_header,'block':_block}
|
|
|
|
def merge (self,_x,_y):
|
|
"""
|
|
This function will merge two objects _x, _y
|
|
"""
|
|
_zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns
|
|
|
|
if _zcols :
|
|
_out = dict(_x,**{})
|
|
for _key in _y.keys() :
|
|
if not _key in _zcols :
|
|
|
|
_out[_key] = _y[_key]
|
|
else:
|
|
if type(_out[_key]) == list :
|
|
_out[_key] += _y[_key]
|
|
elif type(_out[_key]) == dict:
|
|
_out[_key] = dict(_out[_key],**_y[_key])
|
|
else:
|
|
_out[_key] = _y[_key]
|
|
|
|
return _out
|
|
else:
|
|
|
|
return dict(_x,**_y)
|
|
def _inspect_row(self,**_args):
|
|
"""
|
|
This function makes sure the indexes actually exist in the row
|
|
:row row to be parsed (already split)
|
|
:indexes list of indexes
|
|
:columns columns to be used in the creation of the object
|
|
"""
|
|
_max = np.max(_args['indexes'])
|
|
_len = np.size(_args['row']) -1
|
|
return _max > _len and np.size(_args['indexes']) == np.size(_args['columns'])
|
|
|
|
def _parse (self,**_args):
|
|
"""
|
|
This function will parse an x12 element given
|
|
:row row of the x12 element
|
|
:_columns attributes of the object to be returned
|
|
:_indexes indexes of interest
|
|
"""
|
|
pass
|
|
_row = _args['row']
|
|
_meta = _args['meta']
|
|
_columns = _args['columns']
|
|
_indexes = np.array(_args['indexes'])
|
|
if not self._inspect_row (_args) :
|
|
#
|
|
# Minimizing parsing errors by padding the line
|
|
_delta = 1+ np.max(_indexes) - np.size(_row)
|
|
_row = _row + np.repeat('',_delta).tolist()
|
|
#
|
|
# @TODO: Log that the rows were padded
|
|
#
|
|
_row = np.array(_row)
|
|
return dict(zip(_columns,_row[_indexes].tolist()))
|
|
|
|
def _buildObject (self,**_args):
|
|
"""
|
|
:meta data that is pulled from the decorator function
|
|
:object row parsed and stored as an object
|
|
:document existing document being parsed
|
|
"""
|
|
_meta = _args['meta']
|
|
_document = _args['document']
|
|
_object = _args['object']
|
|
if 'field' not in _meta and 'container' not in _meta :
|
|
_document = self.merge(_document,_object)
|
|
elif 'field' :
|
|
field = _meta['field']
|
|
if field in _document :
|
|
_document[field] = self.merge(_document[field],_object)
|
|
else:
|
|
_document[field] = _object
|
|
elif 'container' in _meta :
|
|
_label = _meta['container']
|
|
if _label not in _document :
|
|
_document[_label] = []
|
|
|
|
_document[_label].append(_object)
|
|
return _document
|
|
def get_files(self,**_args):
|
|
folder = _args['folder']
|
|
files = []
|
|
if not os.path.exists(folder) :
|
|
return []
|
|
elif os.path.isdir(folder):
|
|
|
|
for root,_dir,f in os.walk(folder) :
|
|
if f :
|
|
files += [os.sep.join([root,name]) for name in f]
|
|
files = [path for path in files if os.path.isfile(path)]
|
|
else:
|
|
files = [folder]
|
|
return files |