import os

import numpy as np

"""
This file contains utilities that will be used across the x12 framework/platform

@TODO:
    - Provisions with multiprocessing (locks/releases)
"""


class ContentHandler:
    """
    This class implements {x12} content handling
    """
    def split(self, _stream):
        if type(_stream) == str:
            _xchar = '~\n' if '~\n' in _stream else ('~' if '~' in _stream else ('\n' if '\n' in _stream else None))
            if _xchar:
                _rows = _stream.split(_xchar)
                return [row.strip().split('*') for row in _rows if row.strip()]
            else:
                return _stream.split('*')

    def classify(self, _content):
        """
        This function is designed to split claim information from the rest of the information (envelope header)
        :_content   The file content (already split by row and separator)
        """
        _indexes = [1 if 'HL' in line else 0 for line in _content]
        _indexes = [_index for _index, _value in enumerate(_indexes) if _value == 1]
        if not _indexes:
            #
            # No HL segment was found i.e. the content holds no claims, everything is header
            #
            return {'header': _content, 'block': []}
        #
        # At this point we know how many claims are in the file (log this somewhere)
        #
        _beg = 0
        _end = _indexes[0]
        _header = _content[_beg:_end]
        _block = []
        for _index, _beg in enumerate(_indexes):
            if _index + 1 == len(_indexes):
                _end = len(_content)
            else:
                _end = _indexes[_index + 1]
            _block.append(_content[_beg:_end])
        return {'header': _header, 'block': _block}

    def merge(self, _x, _y):
        """
        This function will merge two objects _x, _y
        """
        _zcols = list(set(_x.keys()) & set(_y.keys()))  # -- common columns
        if _zcols:
            _out = dict(_x)
            for _key in _y.keys():
                if _key not in _zcols:
                    _out[_key] = _y[_key]
                else:
                    if type(_out[_key]) == list:
                        _out[_key] += _y[_key]
                    elif type(_out[_key]) == dict:
                        _out[_key] = dict(_out[_key], **_y[_key])
                    else:
                        _out[_key] = _y[_key]
            return _out
        else:
            return dict(_x, **_y)

    def _inspect_row(self, **_args):
        """
        This function makes sure the indexes actually exist in the row
        :row        row to be parsed (already split)
        :indexes    list of indexes
        :columns    columns to be used in the creation of the object
        """
        _max = np.max(_args['indexes'])
        _len = np.size(_args['row']) - 1
        #
        # The row is usable when the largest index fits within the row and
        # indexes/columns line up one-to-one
        #
        return _max <= _len and np.size(_args['indexes']) == np.size(_args['columns'])

    def _parse(self, **_args):
        """
        This function will parse an x12 element given
        :row        row of the x12 element
        :columns    attributes of the object to be returned
        :indexes    indexes of interest
        """
        _row = _args['row']
        _meta = _args['meta']
        _columns = _args['columns']
        _indexes = np.array(_args['indexes'])
        if not self._inspect_row(**_args):
            #
            # Minimizing parsing errors by padding the line
            #
            _delta = 1 + np.max(_indexes) - np.size(_row)
            _row = _row + np.repeat('', _delta).tolist()
            #
            # @TODO: Log that the rows were padded
            #
        _row = np.array(_row)
        return dict(zip(_columns, _row[_indexes].tolist()))

    def _buildObject(self, **_args):
        """
        :meta       data that is pulled from the decorator function
        :object     row parsed and stored as an object
        :document   existing document being parsed
        """
        _meta = _args['meta']
        _document = _args['document']
        _object = _args['object']
        if 'field' not in _meta and 'container' not in _meta:
            _document = self.merge(_document, _object)
        elif 'field' in _meta:
            field = _meta['field']
            if field in _document:
                _document[field] = self.merge(_document[field], _object)
            else:
                _document[field] = _object
        elif 'container' in _meta:
            _label = _meta['container']
            if _label not in _document:
                _document[_label] = []
            _document[_label].append(_object)
        return _document

    def get_files(self, **_args):
        folder = _args['folder']
        files = []
        if not os.path.exists(folder):
            return []
        elif os.path.isdir(folder):
            for root, _dir, f in os.walk(folder):
                if f:
                    files += [os.sep.join([root, name]) for name in f]
            files = [path for path in files if os.path.isfile(path)]
        else:
            files = [folder]
        return files
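#
# Minimal usage sketch (illustrative only, not part of the framework).
# The sample segments below are a made-up 837-style fragment; real x12 files
# carry many more segments. It shows how split() and classify() fit together:
# split() breaks the stream into rows/fields, classify() separates the
# envelope header from the HL-delimited claim blocks.
#
if __name__ == '__main__':
    _handler = ContentHandler()
    _sample = 'ISA*00*SENDER~GS*HC*SENDER*RECEIVER~HL*1**20*1~NM1*85*2*PROVIDER~HL*2*1*22*0~CLM*12345*150~'
    _content = _handler.split(_sample)      # -> list of segments, each split on '*'
    _claims = _handler.classify(_content)   # -> {'header': [...], 'block': [claim_1, claim_2, ...]}
    print(len(_claims['block']), 'claim block(s) found')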