parser/healthcareio/x12/util/file.py

172 lines
6.9 KiB
Python

import os
import numpy as np
from io import StringIO
# from .common import Common
class Content :
"""
This class implements functions that will manipulate content of a file
:split splits the content
:read reads the content of a file given a filename
:parse parses the content of a file given a map {index:field_name}
"""
def __init__(self,**_args):
self._parents = {}
self._lastelement = {}
def split(self,_content):
if type(_content) == str :
_xchar = '~\n' if '~\n' in _content else ('~' if '~' in _content else ('\n' if '\n' in _content else None))
_x12 = '837' if 'CLM*' in _content else ('835' if 'CLP*' in _content else None)
_map = {'835':'CLP','837':'CLM'}
_claim_mark = _map[_x12]
_content = _content.split(_claim_mark)
_xchar = ''.join(_xchar)
_chunks = []
for _block in _content :
if len(_chunks) > 0 :
_block = _claim_mark+ _block
_splitblocks = [row.strip().split('*') for row in _block.split(_xchar) if row.strip()]
_chunks.append(_splitblocks)
return _chunks,_x12
# if _xchar :
# _xchar = ''.join(_xchar)
# _rows = _content.split(_xchar)
# return [row.strip().split('*') for row in _rows if row.strip()]
# else:
# return _content.split('*')
return [],None
def read(self,**_args):
"""
This function will read and clean-up the content of a file
"""
_filename = _args['filename']
if type(_filename) == StringIO :
return _filename.read()
else:
f = open(_filename)
_content = f.read()
f.close()
return _content
def _ix_parse (self,columns,index,**_args):
"""
This function encapulates how an x12 document element will be processed
:columns list of attributes that make up the object
:index indexes of the said items in the element
:_args
- row raw x12 element (string)
- pointer decorated function
- document
"""
_ELEMENT = _args['row'][0]
_pointer = _args['pointer']
_document = _args['document']
if 'map' in _pointer.meta :
_map = _pointer.meta['map']
_index = list(_map.keys())
_columns = [_map[_id] for _id in _index ]
_info = {}
_row = _args['row'] if type(_args['row']) == list else _args['row'].split('*')
_index = np.array(_index)
#
# Sometimes the _row doesn't have all expected indexes, we will compensate
# This allows to minimize parsing errors as it may relate to disconnects between configuration and x12 element variations (shitty format)
#
if np.max(_index) > len(_row) -1 :
_delta = 1 + np.max(_index) - len(_row)
_row = _row + np.repeat('',_delta).tolist()
_row = np.array(_row)
_info = dict(zip(_columns,_row[_index].tolist()))
else:
#
# We should call the function that is intended to perform the parsing
#
_info = _pointer(row=_args['row'],document=_document,meta=_pointer.meta)
#
# @TODO: We should look into the object created and enforce the specifications are met
#
return _info
# def consolidate(self,**_args):
# """
# This function takes an object and addit to the document given meta data
# :document document associated associated with a claim (processing the loops)
# :object
# :caller attributes within the decorator
# """
# _document = _args['document'] if 'document' in _args else {}
# _info = _args['object']
# _meta = _args['meta']
# #
# # @TODO:
# # Apply parsing/casting function to the object retrieved
# # _apply(_info) #-- the object will be processed accordingly
# #
# #
# # @TODO:
# # The objects parsed must be augmented against the appropriate ones e.g: NM1 <- N1,N2,N3,N4
# # - Find a way to drive this from a configuration ...
# #
# if 'field' in _meta : #hasattr(_meta,'field') :
# _field = _meta['field']
# if not _field in _document :
# _item = {_field:_info}
# else:
# _item = self.merge(_document[_field],_info)
# elif 'container' in _meta: # hasattr(_meta,'container') :
# _label = _meta.container
# if not _label in _document :
# _item = {_label:[_info]}
# else:
# _item = _document[_label] + [_info]
# else:
# _item = _info
# if 'parent' in _meta : #hasattr(_meta,'parent'):
# _hasField = 'field' in _meta
# _hasParent= _meta['element'] in self._parents
# if _hasField and _hasParent: #_meta.element in self._parents and hasattr(_meta,'field'):
# self_last = _item
# pass
# else:
# for key in self._parents :
# if _meta.element in self._parents[key] :
# _ikey = list(self_last.keys())[0]
# _oldinfo = self_last[_ikey]
# if type(_oldinfo) != dict :
# #
# # Only applicable against a dictionary not a list (sorry)
# pass
# else:
# _item = {_ikey: self.merge(_oldinfo,_item)}
# break
# pass
# return _item
class Location :
@staticmethod
def get(**_args):
_path = _args['path']
files = []
if os.path.isdir(_path):
for root,_dir,f in os.walk(_path) :
if f :
files += [os.sep.join([root,name]) for name in f]
files = [path for path in files if os.path.isfile(path)]
else:
files = [_path]
_chunks = 0 if 'chunks' not in _args else int(_args['chunks'])
return files if not _chunks else np.array_split(files,_chunks)