parser/healthcareio/x12/util/document.py

259 lines
8.7 KiB
Python
Raw Normal View History

2023-11-21 18:56:35 +00:00
"""
This file encapsulates the functions needed to build a document
"""
import numpy as np
import copy
class Builder:
__doc__ = """
This class is intended to create and manipulate objects
:merge The class merges two objects and accounts for attributes that are lists
:parent returns the parent for a given object
"""
def __init__(self,**_args):
self._last = {}
self._plugins = copy.deepcopy(_args['plugins'])
self._parents = copy.deepcopy(_args['parents'])
self._loop = {}
2024-02-06 03:56:46 +00:00
self._logger = None if 'logger' not in _args else _args['logger']
2023-11-21 18:56:35 +00:00
def reset (self):
self._last = {}
self._loop = {}
def parent(self,**_args):
"""
This function returns the parent item of an object
:meta meta data of a decorated/annotated function
"""
_meta = _args['meta']
# _item = None
if _meta['parent'] :
_id = _meta['parent']
if _id :
return self._last[_id] if _id in self._last else None
return None
2024-02-06 03:56:46 +00:00
2023-11-21 18:56:35 +00:00
def count(self,_element):
if _element not in self._loop :
self._loop[_element] = 0
self._loop[_element] += 1
def pointer(self,**_args):
"""
This function returns a pointer associated with a row element
@TODO: Make sure we know what kind of file we are processing (it would help suppress the loop)
"""
_id = _args['row'][0] if 'row' in _args else _args['element']
_filetype = _args['x12']
_pointer = None
if _id in self._plugins[_filetype] :
_pointer = self._plugins[_filetype][_id]
else:
for _x12 in self._plugins :
if _id in self._plugins[_x12] :
_pointer = self._plugins[_x12][_id]
break
return _pointer
def field(self,**_args) :
_row = _args['row']
_meta= _args['meta']
_field = None
if _meta['parent'] :
_field = self.parent(meta=_meta)['field']
if 'field' in _meta or 'container' in _meta :
_field = _meta['field'] if 'field' in _meta else _meta['container']
if 'anchor' in _meta :
_anchor = _meta['anchor']
for key in _anchor :
if key == _row[1].strip() :
_field = _anchor[key]
break
return _field
def merge (self,_x,_y):
"""
This function will merge two objects _x, _y
"""
_zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns
if _zcols :
_out = dict(_x,**{})
for _key in list(_y.keys()) :
if _key not in _zcols and _key:
_out[_key] = _y[_key]
else:
if type(_out[_key]) == list :
for value in _y[_key] :
if value not in _out[_key] :
_out[_key].append(value)
# _out[_key] += _y[_key]
elif type(_out[_key]) == dict:
_out[_key] = dict(_out[_key],**_y[_key])
else:
_out[_key] = _y[_key]
return _out
else:
return dict(_x,**_y)
def parse (self,**_args):
"""
This function will perform parsing on behalf of the plugin by relying on map function
:row raw x12 row
:meta meta data of the plugin function
"""
#-- Loop Markers
_row = _args['row']
_map = _args['meta']['map']
# _map = self.pointer(row=_row).meta['map']
_index = list(_map.keys())
_columns = [] #[_map[_id] for _id in _index ]
for _id in _index :
_name = _map[_id]
if type(_name) == list :
_columns += _name
_i = _index.index(_id)
_index = (_index[:_i] + np.repeat(_index[_i], len(_name)).tolist()+_index[_i+1:])
else:
_columns.append(_name)
_info = {}
_index = np.array(_index).astype(int)
# _document = _args['document']
if np.max(_index) > len(_row) -1 :
_delta = 1 + np.max(_index) - len(_row)
_row = _row + np.repeat('',_delta).tolist()
_row = np.array(_row)
try:
_info = dict(zip(_columns,_row[_index].tolist()))
except Exception as e:
# print (_row)
# print ( e)
pass
return _info
def meta (self,**_args):
_row = _args['row']
_id = _row[0]
_meta = None
for key in self._plugins :
_items = self._plugins[key]
if _id in _items :
_meta = (_items[_id].meta)
break
return _meta
def update(self,**_args):
_element = _args['row'][0]
if _element in self._parents :
_meta = self.meta(row=_args['row'])
if 'field' not in _meta :
_field = self.field(row=_args['row'],meta=_meta)
else:
_field = _meta['field']
self._last[_element] = {'data':_args['data'],'field':_field}
def bind(self,**_args):
"""
This function is intended to make an object out of an element
:row raw row of x12
:document object that is the document
"""
_row = _args['row']
_filetype = _args['x12']
_id = _row[0]
self.count(_id)
_pointer = self.pointer(row=_row,x12=_filetype)
_parent = None
_data = {}
if not _pointer :
return None,None
#
# Should we use the built-in parser or not
if _pointer and 'map' in _pointer.meta :
_data = self.parse(row=_row,meta=_pointer.meta)
#
# This function will be used as formatter (at least)
# We will also insure that the current element is not the last one
_out = _pointer(row=_row,data=_data, meta=_pointer.meta)
_data = _data if _out is None else _out
self.update(row = _row, data=_data) #-- If this element is considered a parent, we store it
return _data, _pointer.meta
def build (self,**_args):
"""
This function attemps to place a piece of data within a document
"""
_meta = _args['meta']
_data = _args['data']
_row = _args['row']
_document = _args['document']
# if _meta['parent'] :
# _field = self.parent(meta=_meta)['field']
# elif 'field' in _meta :
# _field = _meta['field']
# elif 'container' in _meta :
# _field = _meta['container']
# if type(_document[_field]) != list :
# _data = self.merge(_document[_field],_data)
# _document[_field] = []
# elif 'anchor' in _meta:
# _field = self.field(row=_row,meta=_meta)
# else:
# _field = None
_field = self.field(meta=_meta,row=_row)
if _field :
if 'container' in _meta and type(_document[_field]) != list :
_document[_field] = []
2024-02-06 03:56:46 +00:00
if _field and _document :
2023-11-21 18:56:35 +00:00
if _field not in _document :
_document[_field] =_data
2024-02-06 03:56:46 +00:00
pass
2023-11-21 18:56:35 +00:00
else:
if 'container' in _meta :
_document[_field].append(_data)
else:
_document[_field] = self.merge(_document[_field],_data)
else:
if not _field and 'anchor' in _meta :
#
2024-02-06 03:56:46 +00:00
# We should determine if the element is either a parent or has a parent
# This would allow us to avoid having runaway attributes and undermine structural integrity
#
#
# The element has NOT been specified by the plugin (alas)
# For this case we would advise writing a user-defined plugin to handle this case
#
if self._logger :
self._logger.log(action='missing-plugin',module='build',data={'element':_row[0],'anchor':_row[1]})
return _document
2023-11-21 18:56:35 +00:00
pass
2024-02-06 03:56:46 +00:00
# print ([_row[0],set(_data) - set(_document.keys())])
2023-11-21 18:56:35 +00:00
_document = self.merge(_document,_data)
return _document