""" This file encapsulates the functions needed to build a document """ import numpy as np import copy class Builder: __doc__ = """ This class is intended to create and manipulate objects :merge The class merges two objects and accounts for attributes that are lists :parent returns the parent for a given object """ def __init__(self,**_args): self._last = {} self._plugins = copy.deepcopy(_args['plugins']) self._parents = copy.deepcopy(_args['parents']) self._loop = {} def reset (self): self._last = {} self._loop = {} def parent(self,**_args): """ This function returns the parent item of an object :meta meta data of a decorated/annotated function """ _meta = _args['meta'] # _item = None if _meta['parent'] : _id = _meta['parent'] if _id : return self._last[_id] if _id in self._last else None return None # if _id in self._parents : # self._last[_id] = # if 'parent' in _meta : #hasattr(_meta,'parent'): # _hasField = 'field' in _meta # _hasParent= _meta['element'] in self._parents # if _hasField and _hasParent: #_meta.element in self._parents and hasattr(_meta,'field'): # self._last = _item # pass # else: # for key in self._parents : # if _meta['element'] in self._parents[key] : # _ikey = list(self._last.keys())[0] # _oldinfo = self._last[_ikey] # if type(_oldinfo) != dict : # # # # Only applicable against a dictionary not a list (sorry) # pass # else: # _item = {_ikey: self.merge(_oldinfo,_item)} # break # pass # return _item def count(self,_element): if _element not in self._loop : self._loop[_element] = 0 self._loop[_element] += 1 def pointer(self,**_args): """ This function returns a pointer associated with a row element @TODO: Make sure we know what kind of file we are processing (it would help suppress the loop) """ _id = _args['row'][0] if 'row' in _args else _args['element'] _filetype = _args['x12'] _pointer = None if _id in self._plugins[_filetype] : _pointer = self._plugins[_filetype][_id] else: for _x12 in self._plugins : if _id in self._plugins[_x12] : _pointer = self._plugins[_x12][_id] break return _pointer def field(self,**_args) : _row = _args['row'] _meta= _args['meta'] _field = None if _meta['parent'] : _field = self.parent(meta=_meta)['field'] if 'field' in _meta or 'container' in _meta : _field = _meta['field'] if 'field' in _meta else _meta['container'] if 'anchor' in _meta : _anchor = _meta['anchor'] for key in _anchor : if key == _row[1].strip() : _field = _anchor[key] break return _field def merge (self,_x,_y): """ This function will merge two objects _x, _y """ _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns if _zcols : _out = dict(_x,**{}) for _key in list(_y.keys()) : if _key not in _zcols and _key: _out[_key] = _y[_key] else: if type(_out[_key]) == list : for value in _y[_key] : if value not in _out[_key] : _out[_key].append(value) # _out[_key] += _y[_key] elif type(_out[_key]) == dict: _out[_key] = dict(_out[_key],**_y[_key]) else: _out[_key] = _y[_key] return _out else: return dict(_x,**_y) def parse (self,**_args): """ This function will perform parsing on behalf of the plugin by relying on map function :row raw x12 row :meta meta data of the plugin function """ #-- Loop Markers _row = _args['row'] _map = _args['meta']['map'] # _map = self.pointer(row=_row).meta['map'] _index = list(_map.keys()) _columns = [] #[_map[_id] for _id in _index ] for _id in _index : _name = _map[_id] if type(_name) == list : _columns += _name _i = _index.index(_id) _index = (_index[:_i] + np.repeat(_index[_i], len(_name)).tolist()+_index[_i+1:]) else: _columns.append(_name) _info = {} _index = np.array(_index).astype(int) # _document = _args['document'] if np.max(_index) > len(_row) -1 : _delta = 1 + np.max(_index) - len(_row) _row = _row + np.repeat('',_delta).tolist() _row = np.array(_row) try: _info = dict(zip(_columns,_row[_index].tolist())) except Exception as e: # print (_row) # print ( e) pass return _info def meta (self,**_args): _row = _args['row'] _id = _row[0] _meta = None for key in self._plugins : _items = self._plugins[key] if _id in _items : _meta = (_items[_id].meta) break return _meta def update(self,**_args): _element = _args['row'][0] if _element in self._parents : _meta = self.meta(row=_args['row']) if 'field' not in _meta : _field = self.field(row=_args['row'],meta=_meta) else: _field = _meta['field'] self._last[_element] = {'data':_args['data'],'field':_field} def bind(self,**_args): """ This function is intended to make an object out of an element :row raw row of x12 :document object that is the document """ _row = _args['row'] _filetype = _args['x12'] _id = _row[0] self.count(_id) _pointer = self.pointer(row=_row,x12=_filetype) _parent = None _data = {} # _document = _args['document'] if not _pointer : return None,None # # Should we use the built-in parser or not if _pointer and 'map' in _pointer.meta : _data = self.parse(row=_row,meta=_pointer.meta) # # This function will be used as formatter (at least) # We will also insure that the current element is not the last one _out = _pointer(row=_row,data=_data, meta=_pointer.meta) _data = _data if _out is None else _out self.update(row = _row, data=_data) #-- If this element is considered a parent, we store it return _data, _pointer.meta def build (self,**_args): """ This function attemps to place a piece of data within a document """ _meta = _args['meta'] _data = _args['data'] _row = _args['row'] _document = _args['document'] # if _meta['parent'] : # _field = self.parent(meta=_meta)['field'] # elif 'field' in _meta : # _field = _meta['field'] # elif 'container' in _meta : # _field = _meta['container'] # if type(_document[_field]) != list : # _data = self.merge(_document[_field],_data) # _document[_field] = [] # elif 'anchor' in _meta: # _field = self.field(row=_row,meta=_meta) # else: # _field = None _field = self.field(meta=_meta,row=_row) if _field : if 'container' in _meta and type(_document[_field]) != list : _document[_field] = [] if _field and _document: if _field not in _document : _document[_field] =_data else: if 'container' in _meta : _document[_field].append(_data) else: _document[_field] = self.merge(_document[_field],_data) else: if not _field and 'anchor' in _meta : # # This is an unusual situation ... pass _document = self.merge(_document,_data) return _document