diff --git a/healthcareio/version.py b/healthcareio/version.py new file mode 100644 index 0000000..c14d3e9 --- /dev/null +++ b/healthcareio/version.py @@ -0,0 +1,17 @@ +__author__ = 'The Phi Technology LLC' +__version__ = '1.0' +__license__ = """ +(c) 2019 EDI Parser Toolkit, +Health Information Privacy Lab, Vanderbilt University Medical Center & The Phi Technology + +Steve L. Nyemba +Khanhly Nguyen + + +This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format. +The claims/outpout can be forwarded to a NoSQL Data store like couchdb and mongodb +Usage : + Commandline : + python xreader.py --parse claims|remits --config + Embedded : +""" \ No newline at end of file diff --git a/healthcareio/x12/export/__init___.py b/healthcareio/x12/export/__init___.py new file mode 100644 index 0000000..c2166e5 --- /dev/null +++ b/healthcareio/x12/export/__init___.py @@ -0,0 +1,31 @@ +"""" +This module is designed to perform exports to a relational data stores +Note that the There are two possible methods to perform relational exports +""" +import transport +from transport import providers +import healthcareio.x12.plugins + +# +# We start by loading all the plugins +def primary_key (**_args) : + _plugins = _args['plugins'] + for key in _plugins : + _lpointers = +def init (**_args): + if 'path' in _args : + _path = _args['path'] + _plugins,_parents = healthcareio.x12.plugins.instance(path=_path) + else: + _plugins,_parents = healthcareio.x12.plugins.instance() + for key in _plugins : + _lpointers = _plugins[key] + _foreign = {} + _table = {} + for _pointer in _lpointers : + _meta = _pointer.meta + if 'map' in _meta : + _attr = list(_meta['map'].values()) + if 'field' in _meta : + _name = _meta['field'] + _foreign[_name] = _attr \ No newline at end of file diff --git a/healthcareio/x12/plugins/default.py b/healthcareio/x12/plugins/default.py deleted file mode 100644 index d3a89bc..0000000 --- a/healthcareio/x12/plugins/default.py +++ /dev/null @@ -1,38 +0,0 @@ -import datetime - -def date(**_args): - """# - This function will return a data as presented in the {x12} i.e it could be a date-range or a single date - - In the case of a single data it is returned as a string - - In the case of a range a complex object is returned with to,from keys - NOTE: dates will be formatted as they - """ - if not _args : - return ['from','to','type'] - _date = "" - return _date -def procedure (**_args): - """ - This function will parse SVC element and return given the following The return object is as follows : - claim_id,charge_amount, payment_amount,patient_amount,patient_status,claim_status - - """ - cols = ['type','code','amount'] - if not _args : - return cols - _procedure = dict.fromkeys(cols,None) - _row = _args['row'] - # _document = _args['document'] - if len(_row) == 3 : - _procedure = dict(zip(cols,_row[1:4])) - return _procedure - - return _info -def SV2(**_args): - pass -def SV3(**_args): - pass -def HL (**_args): - pass -def HI(**_args): - pass \ No newline at end of file diff --git a/healthcareio/x12/plugins/default/__init__.py b/healthcareio/x12/plugins/default/__init__.py index f4c69bb..114367e 100644 --- a/healthcareio/x12/plugins/default/__init__.py +++ b/healthcareio/x12/plugins/default/__init__.py @@ -7,20 +7,49 @@ In addition to the allow custom plugins to be written/loaded and these will be g - Support configuration specification """ import os -from . import common -from . import header -from . import body +import sys +# from . import common +# from . import header +# from . import body +import importlib as IL +# import imp +from .. import parser +# from .claims import * +# from .remits import * +# EDI = body.BODY +# X12Handler = body.BODY +from healthcareio.x12.plugins.default import claims +from healthcareio.x12.plugins.default import remits +# import .remits -EDI = body.BODY -__version__ = '0.01' -__author__ = 'The Phi Technology' -def instance(**_args): +@parser(element='ISA',x12='837',field='header', map={15:'mode',12:'version',9:'date',10:'time'}) +def ISA(**_args): + """ + :row raw {x12} row + :data parsed data + :meta elements containing map {index:field_name} + """ + pass + +@parser(element='GS', map={1:'type',2:'sender',3:'receiver',4:'date',5:'time',8:'version'},field='receiver') +def GS(**_args): + pass + +@parser(element='ST', x12='837', field='header', map={1:'x12',2:'control_number'}) +def ST(**_args): + """ + :row raw {x12} row + :data parsed data + :meta elements containing map {index:field_name} + """ + pass + + +@parser(element='BHT',field='header',map={3:'app_id',4:'date',5:'time',6:'type'}) +def BHT (**_args): + """ + :row raw {x12} row + :data parsed data + :meta elements containing map {index:field_name} + """ pass -# -# defining commong functions that can/should be used accross the board -# -# # class Parser : -# def __init__(**_args): -# folder = _args['path'] -# files = [ os.sep.join(_name,folder) for _name in os.listdir(folder)] -# pass \ No newline at end of file diff --git a/healthcareio/x12/plugins/default/_common.py b/healthcareio/x12/plugins/default/_common.py new file mode 100644 index 0000000..4ad687d --- /dev/null +++ b/healthcareio/x12/plugins/default/_common.py @@ -0,0 +1,456 @@ +from typing import Any +import numpy as np +import json +from multiprocessing import Process, RLock +import os +import io +import queue +import transport +from transport import providers + +class Store(Process): + """ + This is the data-store service that will handle read/writes + """ + dataStore = None + @staticmethod + def init(self,**_args): + if Store.dataStore is None : + _args = _args['store'] + + else: + pass + @staticmethod + def reset(): + pass + +class X12DOCUMENT (Process): + """ + X12DOCUMENT class encapsulates functions that will be used to format an x12 (835,837) claim into an object + """ + _queue = queue.Queue() + + class MODE : + # + # The following allow us to handle raw content (stream) or a filename + # The raw content will be wrapped into io.StringIO so that it is handled as if it were a file + # + NAMES,STREAM = 'NAMES','STREAM' + class ConfigHandler : + def format(self,**_args): + """ + This function formats variations of an element's parsing rules + :info {index,field|label,map} + """ + + _info = _args['info'] + _ref = {} + + for _item in _info : + _index = str(_item['index']) + _field = _item['field'] if 'field' in _item else None + _label = _item['label'] if 'label' in _item else None + if _field : + _ref[_index] = {'field':_field} + elif _label : + _ref[_index] = {'label':_label} + + return {'@ref':_ref} + def _getColumnsIndexes(self,_columns,_indexes,_map): + """ + This function return columns and indexes related if a parsing map is passed + :param _columns + :param _indexes + :param _map parsing map (field:index) + """ + # @TODO: insure the lengths are the same for adequate usage downstream ... + _xcolumns,_xindexes = list(_map.keys()), list(_map.values()) + keys,values = _xcolumns + _columns,_xindexes + _indexes + _config = dict(zip(keys,values)) + + _outColumns,_outIndexes = list(_config.keys()),list(_config.values()) + return _outColumns,_outIndexes + def _getObjectAtributes(self,_config): + _field = _config['field'] if 'field' in _config else {} + _label = _config['label'] if 'label' in _config else {} + return _field,_label + def merge(self,**_args): + # + # This function overrides the old configuration with the new configuration specifications + # + + # _columns,_indexes = [],[] + _columns,_indexes = _args['columns'],_args['index'] + _map = {} + _config = _args['config'] if 'config' in _args else {} + _field,_label = self._getObjectAtributes(_config) + + if 'map' in _config : + _map = _args['config']['map'] + _columns,_indexes = self._getColumnsIndexes(_columns,_indexes,_map) + + if '@ref' in _config : + # _columns,_indexes = [],[] + _row = _args['row'] + + _ref = _config['@ref'] + + for _anchor in _ref: + # print ([_anchor,_anchor == _row[1].strip()]) + if _anchor == _row[1].strip() : + _field,_label = self._getObjectAtributes(_ref[_anchor]) + + _map = _ref[_anchor]['map'] if 'map' in _ref[_anchor] else {} + + if _map : + _columns,_indexes = self._getColumnsIndexes([],[],_map) + + + break + # _columns,_indexes = _columns + _map.keys() + + return {'columns':_columns,'index':_indexes,'field':_field,'label':_label} + def legacy(self,**_args): + # + # This function returns the legacy configuration (default parsing) + # + + _config = _args['config'] if 'config' in _args else {} + _field,_label = self._getObjectAtributes(_config) + _columns,_indexes = [],[] + if 'map' in _config : + _columns = list(_config['map'].keys()) + _indexes = list(_config['map'].values()) + + return {'columns':_columns,'index':_indexes,'field':_field,'label':_label} + def override(self,**_args): + return _args['columns'],_args['indexes'] + def __init__(self,**_args): + super().__init__() + self._mode = _args['mode'] if 'mode' in _args else 'NAMES' + if 'files' in _args : + self.files = _args['files'] + self._config = _args['config'] if 'config' in _args else {} + self._document = [] + + self._x12FileType = None + self._configHandler = X12DOCUMENT.ConfigHandler() + + # + #-- The files need to be classified, the files need to be either claims or remits + # + if 'store' not in self._config : + self._store_args = _args['store'] if 'store' in _args else {'provider':providers.CONSOLE} + else: + self._store_args = self._config['store'] + + def init(self,_header): + """ + Expected Elements must include ST + """ + pass + + def merge (self,_x,_y): + """ + This function will merge two objects _x, _y + """ + _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns + + if _zcols : + _out = dict(_x,**{}) + for _key in _y.keys() : + if not _key in _zcols : + + _out[_key] = _y[_key] + else: + if type(_out[_key]) == list : + _out[_key] += _y[_key] + elif type(_out[_key]) == dict: + _out[_key] = dict(_out[_key],**_y[_key]) + else: + _out[_key] = _y[_key] + + return _out + else: + + return dict(_x,**_y) + + def split(self,content): + """ + This function will split the content of an X12 document into blocks and headers + :content x12 document in raw format (text) + """ + #_content = content.split('~') + _content = content.split('HL') + _header = _content[:1][0].split('~') + + _blocks = ['HL'+_item for _item in _content[1:]] + _blocks = [_item.split('~') for _item in _blocks ] + + # for row in _content : + # if not _blocks and not row.startswith('HL') : + # _header.append(row) + # else: + # _blocks.append(row) + return {'header':_header,'blocks':_blocks} + def parse (self,columns,index,**_args): + """ + This function encapulates how an x12 document element will be processed + :columns list of attributes that make up the object + :index indexes of the said items in the element + :_args + - row raw x12 element (string) + - config configuration of the element. his should indicate functions to apply against function + """ + _ELEMENT = _args['row'][0] + # + # get the right configuration from the _config object + _config = _args['config'][_ELEMENT] if _ELEMENT in _args['config'] else {} + + # _field = _config['field'] if 'field' in _config else None + # _label = _config['label'] if 'label' in _config else None + _map = _config['map'] if 'map' in _config else {} + # + # Let's see if overriding the fields/labels isn't necessary + + + # columns, index,_refField,_refLabel = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config) + # _field = _field if not _refField else _refField + # _label = _label if not _refLabel else _refLabel + + _outInfo = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config) + + _field,_label = _outInfo['field'],_outInfo['label'] + _columns,_index = _outInfo['columns'],_outInfo['index'] + + + if 'row' in _args: + _row = _args['row'] if type(_args['row']) == list else _args['row'].split('*') + + _index = np.array(_index) + + # + # Sometimes the _row doesn't have all expected indexes, we will compensate + # This allows to minimize parsing errors as it may relate to disconnects between configuration and x12 element variations (shitty format) + # + if np.max(_index) > len(_row) -1 : + _delta = 1 + np.max(_index) - len(_row) + _row = _row + np.repeat('',_delta).tolist() + _row = np.array(_row) + + # _element = _row[0] + + _configKeys = [] #list(self._config.keys()) + _configTree = [] #list(self._config.values()) + if 'config' in _args : + _config = _args['config'] + _configKeys = list(_config.keys()) + _configTree = list(_config.values()) + else: + _config = {} + + _info = dict(zip(_columns,_row[_index].tolist())) + _document = _args['document'] if 'document' in _args else {} + # + # Extracting configuration (minimal information) + # _config = _args['config'] if 'config' in _args else {} + # _config = self._config + + + # if '@ref' in _config : + # print (_config['@ref']) + # _values = _config['@ref'] + # print (_values) + + if _field : + if not _field in _document : + return {_field:_info} + else: + return self.merge(_document[_field],_info) + elif _label : + if not _label in _document : + return {_label:[_info]} + else: + return _document[_label] + [_info] + else: + return _info + + else: + return columns + def elements(self): + """ + This function returns elements that are supported as specified by X12 standard + """ + return [_name for _name in dir(self) if not _name.startswith('_') and not _name.islower() ] + def pointers(self): + """ + This function returns pointers associated with each element ... + :return Object of Element:Function + """ + _attr = self.elements() + _pointers = [getattr(self,_name) for _name in _attr] + return dict(zip(_attr,_pointers)) + + def set(self,_info,_document,_config): + _attrName,_attrType = None,None + + if 'label' in _config : + _attrType = 'label' + _attrName = _config['label'] + elif 'field' in _config : + _attrType = 'field' + _attrName = _config['field'] + + if _attrName : + if _attrName not in _document : + _document[_attrName] = [] if _attrType == 'label' else {} + + # + # @TODO: make sure we don't have a case of an attribute being overridden + if type(_document[_attrName]) == list : + _document[_attrName] += [_info] + else: + _document[_attrName] = dict(_document[_attrName],**_info) + # _document[_attrName] += [_info] if _attrType == 'label' else dict(_document[_attrName],**_info) + + return _document + + return dict(_document,**_info) + + pass + def log (self,**_args): + pass + def run(self): + """ + This function will trigger the workflow associated with a particular file + """ + _getContent = { + # + # For the sake of testing, the following insures + # that raw string content is handled as if it were a file + # + X12DOCUMENT.MODE.STREAM: (lambda stream : io.StringIO(stream)) , + X12DOCUMENT.MODE.NAMES: (lambda name: open(name)) + + + } + _writer = transport.factory.instance(**self._store_args) + for _filename in self.files : + try: + _documents = [] + _parts = [] + # _content = (open(_filename)).read() + _reader = _getContent[self._mode] + _content = _reader(_filename).read() + _info = self.split(_content) + _fileType=self.init(_content) + _header = self.apply(_info['header']) + + # print (json.dumps(_header)) + for _content in _info['blocks'] : + + _body = self.apply(_content,header=_header) + _doc = self.merge(_header,_body) + + if _doc and 'claim_id' in _doc: + # X12DOCUMENT._queue.put(_document) + + _documents += [_doc] + + + except Exception as e: + # + # @TODO: Log this issue for later analysis ... + print (e) + pass + # + # Let us post this to the documents we have, we should find a place to post it + # + if _documents : + # print (_header['header']) + + self.post(document=_documents,writer=_writer) + break + + def post(self,**_args): + """ + This function is intended to post content to a given location + :param document + :param writer + """ + _writer = _args['writer'] if 'writer' in _args else None + _document = _args['document'] + if not _writer: + X12DOCUMENT._queue.put(_document) + else: + + _writer.write(_document) + def _getConfig(self,_chunk): + # + # Let us determine what kind of file we are dealing with, so we can extract the configuration + # For this we need to look for the ST loop ... + # + + line = [line for line in _chunk if line and line[:2] == 'ST' ] + + if line : + # + # We found the header of the block, so we can set the default configuration + # + self._x12FileType = line[0].split('*')[1].strip() + _config = {} + if self._x12FileType : + _config = self._config[self._x12FileType] + + return _config + + def apply(self,_chunk, header = {}): + """ + _chunks are groups of elements split by HL, within each chunk are x12 loops HL,CLM,ISA + """ + + + _document,_cached = {},{} + _pointers = self.pointers() + _config = self._getConfig(_chunk) + # + # The configuration comes from the file, let's run this in merge mode + # _config = self._configHandler.merge + _pid = None + for line in _chunk : + + segments = line.split('*') + + _ELEMENT = segments[0] + + if _ELEMENT not in _pointers or not _ELEMENT: + continue + if _ELEMENT in ['HL','CLM','ISA'] or not _pid: + _pid = _ELEMENT + if _pid not in _cached : + _cached [_pid] = {} + + _pointer = _pointers[_ELEMENT] + + _args = {'row':segments,'document':_document,'header':header,'config':(_config)} + + + _parsedLine = _pointer(**_args) + # print ([_pid,_ELEMENT,_parsedLine]) + + _cached[_pid] = self.merge(_cached[_pid],_parsedLine) + + + # + # Let's create the documents as we understand them to be + # @TODO: Create a log so there can be visibility into the parser + # + _document = {} + for _id in _cached : + # print ('patient' in _cached[_id] ) + + _document = self.merge(_document,_cached[_id]) + + return _document + + \ No newline at end of file diff --git a/healthcareio/x12/plugins/default/body.py b/healthcareio/x12/plugins/default/body.py index eac2dcc..ee21ce5 100644 --- a/healthcareio/x12/plugins/default/body.py +++ b/healthcareio/x12/plugins/default/body.py @@ -27,10 +27,13 @@ class BODY (HEADER): '2':{'field':'payer'}, 'PR':{'field':'payer'}, '41':{'field':'header'}, + '45':{'field':'ambulance_location'}, 'IL':{'field':'patient','map':{'type':2,'first_name':4,'last_name':3}}, 'P5':{'field':'plan_sponsor'}, '82':{'field':'rendering_provider','map':{'type':2,'first_name':4,'last_name':3}}, - '85':{'field':'billing_provider'} + '85':{'field':'billing_provider'}, + + } _args ['plugin-context'] = {'@ref':CONTEXT_MAP} @@ -78,12 +81,13 @@ class BODY (HEADER): return self.parse(_columns,[9,2,1],**_args) def DMG (self,**_args): """ - Expected Element DMG + Expected Element DMG, these need to be stored in a patient object """ + _columns = ['dob','gender','format'] _info = self.parse(_columns,[2,3,1],**_args) - - return _info + + return {'patient':_info} def DTP (self,**_args): """ Expected Element DTP @@ -135,7 +139,8 @@ class BODY (HEADER): Expected Element HL The expected block is supposed to be unprocessed (to make things simple) """ - _row = _args['row'] if type(_args['row']) == list else _args['row'].split('~') + + # _row = _args['row'] if type(_args['row']) == list else _args['row'].split('~') # _attr = self.elements() #[_name for _name in dir() if not _name.islower() and not _name.startswith('_')] # _pointers = [getattr(self,_name) for _name in _attr] @@ -146,62 +151,9 @@ class BODY (HEADER): # The index here tells us what we are processing i.e index == 1 something about header # _columns = ['_index','parent','code','child'] - _args['row'] = _row[0] + # _args['row'] = _row[0] _info = self.parse (_columns,[1,2,3,4],**_args) # _field = 'billing_provider' if _info['_index'] == '1' else 'patient' # _config ={'field':_field} return _info - # _claim = {_field:_info} - # for _element in _row[1:] : - # _key = _element.split('*')[0] - # if _key in _map and len(_element) > 0: - # _document = _args['document'] - # _pointer = _map[_key] - # if _key not in ['CLM','HI','SV3','SV2','SV1'] : - # _claim = self.merge (_claim,_pointer(row=_element.strip().split('*'),document=_document,config=_config)) - # else: - # _config = _args['config'] if 'config' in _args else {} - # _claim = self.merge (_claim,_pointer(row=_element.strip().split('*'),document=_document,config=_config)) - # else: - # print (['SKIPPING ',_key]) - # pass - - # return _claim - # def apply(self,_block): - # """ - # :_block elements that do not belong to the header block - # """ - # _apply = self.pointers() - - # _header = {} - # if _block : - - # for content in _block : - - # _KEY_ELEMENT = content.split('*')[0] - # if _KEY_ELEMENT not in _apply : - # # - # # @TODO: Log elements that are skipped - # # print ([_KEY_ELEMENT , 'NOT FOUND']) - # continue - - # _info = _apply[_KEY_ELEMENT](row=content,document=_header) - - # if _info : - # if not _header : - # _header = _info - - # else: - - # _header = self.merge(_header,_info) - # else: - # # - # # For some reason the parser failed by returning a null - # # @TODO: Log this event .... - # pass - # else: - # # - # # @TODO: return the meta data for what is expected - # pass - # return _header \ No newline at end of file diff --git a/healthcareio/x12/plugins/default/claims.py b/healthcareio/x12/plugins/default/claims.py new file mode 100644 index 0000000..ddcabea --- /dev/null +++ b/healthcareio/x12/plugins/default/claims.py @@ -0,0 +1,194 @@ +import numpy as np +from .. import parser +from datetime import datetime +@parser(element='NM1',x12='*', anchor={'41':'submitter','40':'receiver','82':'rendering_provider','85':'billing_provider','87':'pay_to_provider','IL':'patient','PR':'payer','QC':'patient','DN':'referring_provider','77':'provider','2':'billing_provider'}, map={1:'type',3:'name',-1:'id'}) +def NM1 (**_args): + """ + Expected Element NM1 + ref IL,40,41,82,85,PR ... + Information about entities (doctors, clearing house, provider). we should be mindful of the references + """ + # _CODE_INDEX = 1 + # CONTEXT_MAP = { + + # '2':{'field':'payer'}, + # 'PR':{'field':'payer'}, + # '41':{'field':'header'}, + # '45':{'field':'ambulance_location'}, + # 'IL':{'field':'patient','map':{'type':2,'first_name':4,'last_name':3}}, + # 'P5':{'field':'plan_sponsor'}, + # '82':{'field':'rendering_provider','map':{'type':2,'first_name':4,'last_name':3}}, + # '85':{'field':'billing_provider'}, + + + + # } + # _args ['plugin-context'] = {'@ref':CONTEXT_MAP} + # # _map = {_CODE_INDEX:{'41':'submitter','40':'receiver','PR':'payer'}} + # _columns = ['type','name','id'] + # _indexes = [1,3,-1] + # # _info = [{'index':'40','field':'receiver'},{'index':'41','field':'submitter'},{'index':'PR','field':'payer'}] + # _pointer = _args['parser'] + # _info = _pointer(_columns,_indexes,**_args) + # self.lastelement = _info + # return _info + pass +@parser(element='N3',x12='837', parent='NM1',map={1:'address_line_1',2:'address_line_2'}) +def N3 (**_args): + """ + Expected Element N3 + """ + pass + # _columns = ['address_line_1'] + # return self.parse(_columns,[1,2],**_args) +@parser(element='N4',x12='*',parent='NM1',map={1:'city',2:'state',3:'zipcode'}) +def N4(**_args): + """ + Expected Element N4 + """ + # _columns = ['city','state','zip'] + # return self.parse(_columns,[1,2,3],**_args) + pass +@parser(element='HI',x12='837', map={1:'type',2:'code'}) +def HI(**_args): + """ + Expected Element HI + This function will parse diagnosis codes ICD 9/10 + """ + # _columns = ['code','type'] + # return self.parse(_columns,[2,1],**_args) + pass +@parser(element='AMT',x12='837',map={2:'patient_amount',1:'patient_amount_qualifier'}) +def AMT (**_args): + """ + Expected Element AMT + """ + # _columns = ['amount','qualifier'] + # return self.parse(_columns,[2,1],**_args) + pass + +@parser(element='SBR',x12='837',field='subscriber',map={9:'vendor',2:'individual_code',1:'type'}) +def SBR (**_args): + """ + Expected Element SBR + """ + # _index = [9,1] + # _columns = ['vendor','individual_code','type'] + # return self.parse(_columns,[9,2,1],**_args) + pass +@parser(element='DMG',x12='837', field='patient',map={2:'dob',3:'gender',1:'format'}) +def DMG (**_args): + """ + Expected Element DMG, these need to be stored in a patient object + """ + _data = _args['data'] + _y = _data['dob'][:4] + _m= _data['dob'][4:6] + _d = _data['dob'][6:].strip() + _data['dob'] = datetime(year=int(_y), month=int(_m),day=int(_d)) + return _data + # _columns = ['dob','gender','format'] + # _info = self.parse(_columns,[2,3,1],**_args) + + # return {'patient':_info} + pass +@parser(element='DTP', x12='837', field='date',map={3:['to','from']}) +def DTP (**_args): + """ + Expected Element DTP + """ + # _columns = ['to','from','type'] + # return self.parse(_columns,[3],**_args) + _data = _args['data'] + _data['to'] = '-'.join([_data['to'][:4],_data['to'][4:6],_data['to'][6:]]) + _data['from'] = '-'.join([_data['from'][:4],_data['from'][4:6],_data['from'][6:]]) + return _data + pass +@parser(element='PER',anchor={'IC':'submitter'},map={2:'contact',4:'phone_number',8:'email'}) +def PER (**_args): + """ + Expected Element PER + """ + # _CODE_INDEX = 1 + # _map = {_CODE_INDEX:{'IC':'submitter'}} # attribute to store the data in + # _columns = ['contact_name','phone','email'] + # _info = self.parse (_columns,[2,4,8],**_args) + # + # @TODO: Inspect the configuration file for the attribute information + # + # return _info + pass +@parser(element='CLM',x12='837',map={1:'claim_id',2:'claim_amount',5:'facility_code',5:'facility_qualifier',5:'frequency_code'}) +def CLM (**_args): + """ + Expected Element CLM + """ + _data = _args['data'] + _data['claim_amount'] = np.float64(_data['claim_amount']) + return _data + # _columns = ['claim_id','claim_amount','facility_code','facility_qualifier','frequency_code'] + # return self.parse(_columns,[1,2,5,5,5],**_args) + pass +# @parser(element='REF', field='ref',map={2:'id'}) +def REF (**_args): + # print (_args) + _columns = ['identifier','qualifier',''] + # _CODE_INDEX = 1 # -- according to x12 standard + # _map = {_CODE_INDEX:{'EA':'patient','EI':'provider','6R':'','D9':''}} + # return self.parse(_columns,[2],**_args) + pass +@parser(element='HI',x12='837',container='diagnosis',map={1:'code',2:'type'}) +def HI (**_args): + """ + Expected Element HI + """ + # _columns = ['code','type'] + # return self.parse(_columns,[1,2],**_args) + _data = _args['data'] + if ':' in _data['code'] : + _data['type'],_data['code'] = _data['code'].split(':') + return _data + +@parser(element=['SV1','SV3'],x12='837',container='procedures',map={1:['type','code'],2:'amount'}) +def SV1 (**_args): + """ + Expected Element SV1 + """ + # _row = _args['row'] if type(_args['row']) == list else _args['row'].split('*') + # _columns = ['type','code','amount','modifier_1','modifier_2','modifier_3','modifier_4','place_of_service','units','measurement'] + # return self.parse(_columns,[1,1,2,1,1,1,1,5,4,3],**_args) + _data = _args['data'] + if 'type' in _data : + _data['type'] = _data['type'].split(':')[0] + _data['code'] = _data['code'].split(':')[1] + _data['amount']= np.float64(_data['amount']) + return _data + pass +@parser (element='HL',x12='837', field='patient', map={1:'_index',2:'parent_code',3:'level_code',4:'child_code'}) +def HL (**_args) : + """, + Expected Element HL + The expected block is supposed to be unprocessed (to make things simple) + """ + pass + + # _data = _args['data'] + # _data['_index'] = int(_data['_index']) + # return _data + # # _row = _args['row'] if type(_args['row']) == list else _args['row'].split('~') + + # # _attr = self.elements() #[_name for _name in dir() if not _name.islower() and not _name.startswith('_')] + # # _pointers = [getattr(_name) for _name in _attr] + # # _map = dict(zip(_attr,_pointers)) + # _map = self.pointers() + + # # + # # The index here tells us what we are processing i.e index == 1 something about header + # # + # _columns = ['_index','parent','code','child'] + # # _args['row'] = _row[0] + + # _info = self.parse (_columns,[1,2,3,4],**_args) + # # _field = 'billing_provider' if _info['_index'] == '1' else 'patient' + # # _config ={'field':_field} + # return _info diff --git a/healthcareio/x12/plugins/default/common.py b/healthcareio/x12/plugins/default/common.py index 125ad87..b596154 100644 --- a/healthcareio/x12/plugins/default/common.py +++ b/healthcareio/x12/plugins/default/common.py @@ -123,6 +123,7 @@ class X12DOCUMENT (Process): _field,_label = self._getObjectAtributes(_config) _columns,_indexes = [],[] if 'map' in _config : + _columns = list(_config['map'].keys()) _indexes = list(_config['map'].values()) @@ -194,16 +195,20 @@ class X12DOCUMENT (Process): """ #_content = content.split('~') _content = content.split('HL') - _header = _content[:1][0].split('~') + xchar = '~\n' if '~\n' in _content[0] else '~' + _header = _content[:1][0].split(xchar) #.split('~') - _blocks = ['HL'+_item for _item in _content[1:]] - _blocks = [_item.split('~') for _item in _blocks ] + _blocks = ['HL*'+_item for _item in _content[1:]] + # xchar = '~\n' if '~\n' in _blocks[0] else '~' + + _blocks = [_item.split(xchar) for _item in _blocks ] # for row in _content : # if not _blocks and not row.startswith('HL') : # _header.append(row) # else: # _blocks.append(row) + return {'header':_header,'blocks':_blocks} def parse (self,columns,index,**_args): """ @@ -215,6 +220,7 @@ class X12DOCUMENT (Process): - config configuration of the element. his should indicate functions to apply against function """ _ELEMENT = _args['row'][0] + # # get the right configuration from the _config object _config = _args['config'][_ELEMENT] if _ELEMENT in _args['config'] else {} @@ -258,16 +264,6 @@ class X12DOCUMENT (Process): _row = _row + np.repeat('',_delta).tolist() _row = np.array(_row) - # _element = _row[0] - - # _configKeys = [] #list(self._config.keys()) - # _configTree = [] #list(self._config.values()) - # if 'config' in _args : - # _config = _args['config'] - # _configKeys = list(_config.keys()) - # _configTree = list(_config.values()) - # else: - # _config = {} _info = dict(zip(_columns,_row[_index].tolist())) _document = _args['document'] if 'document' in _args else {} @@ -296,9 +292,11 @@ class X12DOCUMENT (Process): _item = _document[_label] + [_info] else: _item = _info + if _ELEMENT in self._hierarchy and _field: # print ([_field,_item]) + self.lastelement = _item pass else: @@ -307,9 +305,17 @@ class X12DOCUMENT (Process): _ikey = list(self.lastelement.keys())[0] _oldinfo = self.lastelement[_ikey] - _item = {_ikey: self.merge(_oldinfo,_item)} + if type(_oldinfo) != dict : + # + # This is we should log somewhere to suggest an issue happened + # + # self.log(action='error',input=_row) + pass + else: + _item = {_ikey: self.merge(_oldinfo,_item)} break + pass return _item else: # @@ -358,64 +364,79 @@ class X12DOCUMENT (Process): pass def log (self,**_args): - pass + print(_args) + def parseBlocks (self,_blocks,_header): + """ + This function extracts blocks and returns them to the caller, + Blocks of a document are made of transactional loops, that constitute a patients claim + """ + _tmp = {} + _documents = [] + for _content in _blocks : + + + _body = self.apply(_content,header=_header) + + _doc = self.merge(_header,_body) + # self.log(action='parse',section='body',input=_content[0]) + if _doc and 'claim_id' in _doc: + # X12DOCUMENT._queue.put(_document) + # self.log(action='parse',section='document') + _documents += [self.merge(_tmp,_doc)] + _tmp = {} + else: + # + # The document is being built and not yet ready + _tmp = self.merge(_tmp,_doc) + + return _documents def run(self): """ This function will trigger the workflow associated with a particular file """ - _getContent = { - # - # For the sake of testing, the following insures - # that raw string content is handled as if it were a file - # - X12DOCUMENT.MODE.STREAM: (lambda stream : io.StringIO(stream)) , - X12DOCUMENT.MODE.NAMES: (lambda name: open(name)) - - - } - _writer = transport.factory.instance(**self._store_args) for _filename in self.files : + # self.log(action='parse',section='file',input=_filename) try: _documents = [] _parts = [] - # _content = (open(_filename)).read() - _reader = _getContent[self._mode] - _content = _reader(_filename).read() + + if os.sep in _filename and os.path.exists(_filename) : + _reader = open(_filename) + else: + # + # This is a stream, we are wrapping it into an appropriate structure + # + _reader = io.StringIO(_filename) + + + # + # Let us log the mode we have set ... + + _content = _reader.read() + if hasattr(_reader,'close') : + _reader.close() _info = self.split(_content) _fileType=self.init(_content) _header = self.apply(_info['header']) - - # print (json.dumps(_header)) - _tmp = {} - for _content in _info['blocks'] : + if _info['blocks'] : + # + # processing blocks for the current claim + # + _documents = self.parseBlocks(_info['blocks'],_header) - _body = self.apply(_content,header=_header) - - _doc = self.merge(_header,_body) - - if _doc and 'claim_id' in _doc: - # X12DOCUMENT._queue.put(_document) - - _documents += [self.merge(_tmp,_doc)] - _tmp = {} - else: - # - # The document is being built and not yet ready - _tmp = self.merge(_tmp,_doc) - except Exception as e: # # @TODO: Log this issue for later analysis ... print (e) pass - # - # Let us post this to the documents we have, we should find a place to post it - # + # # + # # Let us post this to the documents we have, we should find a place to post it + # # if _documents : # print (_header['header']) - + _writer = transport.factory.instance(**self._store_args) self.post(document=_documents,writer=_writer) - break + def post(self,**_args): """ @@ -467,7 +488,7 @@ class X12DOCUMENT (Process): segments = line.split('*') _ELEMENT = segments[0] - + if _ELEMENT not in _pointers or not _ELEMENT: continue if _ELEMENT in ['HL','CLM','ISA'] or not _pid: @@ -481,9 +502,12 @@ class X12DOCUMENT (Process): _parsedLine = _pointer(**_args) - # print ([_pid,_ELEMENT,_parsedLine]) - _cached[_pid] = self.merge(_cached[_pid],_parsedLine) + + if _pid in _cached : + _cached[_pid] = self.merge(_cached[_pid],_parsedLine) + else: + _cached[_pid] = _parsedLine # diff --git a/healthcareio/x12/plugins/default/header.py b/healthcareio/x12/plugins/default/header.py index fae7645..1803d8b 100644 --- a/healthcareio/x12/plugins/default/header.py +++ b/healthcareio/x12/plugins/default/header.py @@ -40,3 +40,6 @@ class HEADER (X12DOCUMENT): """ _columns= ['app_id','date','time','type'] return self.parse(_columns,[3,4,5,6],**_args) + +# +# let us perform this \ No newline at end of file diff --git a/healthcareio/x12/plugins/default/remits.py b/healthcareio/x12/plugins/default/remits.py new file mode 100644 index 0000000..b037778 --- /dev/null +++ b/healthcareio/x12/plugins/default/remits.py @@ -0,0 +1,77 @@ +import numpy as np +from .. import parser + +@parser(element='ISA',x12='835',field='header', map={6:'submitter_id',8:'receiver_id',9:'date',10:'time'}) +def ISA(**_args): + """ + :row raw {x12} row + :data parsed data + :meta elements containing map {index:field_name} + """ + # print (_args['data']) + pass + +@parser(element='ST',x12='835', field='ISA', map={1:'x12',2:'control_number'}) +def ST(**_args): + """ + :row raw {x12} row + :data parsed data + :meta elements containing map {index:field_name} + """ + + pass + + +@parser (element='BPR',x12='835',map={2:'transaction_amount',3:'transaction_type',4:'method',6:'depository'}) +def BPR (**_args): + pass +@parser(element='CLP',x12='835', + map={1:'claim_id',2:'status',3:'charge_amount',4:'payment_amount',5:'patient_amount',7:'claim_control_number',8:'bill_type',10:'patient_status',11:'drg_code'}) +def CLP (**_args): + _data = _args['data'] + for _id in ['charge_amount','payment_amount','patient_amount']: + _data[_id] = np.float64(_data[_id]) + return _data + pass +@parser (element='PER',x12='835',field="billing_provider",map={2:'name',4:'phone_number'}) +def PER (**_args): + pass +@parser(element='N1',x12='835',anchor={'PE':'billing_provider','PR':'payer'},map={2:'address_line_1',4:'id'}) +def N1(**_args): + pass +@parser(element='DTM',x12='835',container='dates',map={1:'type',2:'date'}) +def DTM(**_args): + pass +@parser(element='PLB',x12='835',container='provider', map={1:'id',2:'adjustment_fiscal_year',-1:'adjustment_amount'}) +def PLB(**_args): + pass +@parser(element='CAS',x12='835',container='adjustments',map={2:'reason',3:'amount',4:'quantity'}) +def CAS(**_args): + pass +@parser(element='SVC',x12='835',container='procedures',map={1:['code','type'],2:'charge_amount',3:'paid_amount',7:'submitted_units',4:'revenue_code',5:'paid_units_of_service'}) +def SVC (**_args): + _data = _args['data'] + _data['type'] = _data['type'].split('|')[0] + _data['code'] = _data['code'].split('|')[1] + _data['charge_amount'] = np.float64(_data['charge_amount']) + _data['paid_amount'] = np.float64(_data['paid_amount']) + return _data + pass +@parser(element='N1',x12='835',anchor={'PR':'provider'},map={1:'name'}) +def N1(**_args): + pass +@parser(element='N3',x12='835',parent='N1',map={1:'address_line_1'}) +def N3(**_args): + pass +@parser(element='N4',x12='835',parent='N1',map={1:'city',2:'state',3:'zipcode'}) +def N4(**_args): + pass + +@parser (element='AMT',x12='835',container='amounts', map={2:'amount',1:'type'}) +def AMT (**_args): + _data = _args['data'] + _map = {'B6':'AMOUNT_ALLOWED','AU':'AMOUNT_COVERED','F5':'PATIENT_PAID'} + if _data['type'] in _map : + _data['type'] = _map[_data['type']] + _data['amount'] = np.float64(_data['amount']) + return _data \ No newline at end of file diff --git a/healthcareio/x12/util/__init__.py b/healthcareio/x12/util/__init__.py new file mode 100644 index 0000000..823d300 --- /dev/null +++ b/healthcareio/x12/util/__init__.py @@ -0,0 +1,156 @@ +""" +This package contains tools used across the various modules, these tools actually "do the work" +We intend to have these tools be Object-Oriented by design so as to not run into any concurrency issues +""" +from . import file, document, common +from healthcareio import x12 +from multiprocessing import Process +# class X12Engine(Process): +# def __init__(self,**_args): +# """ +# :files group of files to be processed +# """ +# self.files = _args['files'] +# self._cHandler = file.Content() +# self._dHandler = document.Builder(plugins=_args['plugins'],parents=_args['plugins']) +# def run(self): +# """ +# This function performs parsing given +# """ +# for _location in self.files : +# _content = self._cHandler.read(_location) +# _content = self._cHandler.split(_content) + +# pass +def merge (_x,_y): + """ + This function will merge two objects _x, _y + """ + _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns + + if _zcols : + _out = dict(_x,**{}) + for _key in list(_y.keys()) : + + + if _key not in _zcols and _key: + _out[_key] = _y[_key] + else: + if type(_out[_key]) == list : + for value in _y[_key] : + if value not in _out[_key] : + _out[_key].append(value) + # _out[_key] += _y[_key] + elif type(_out[_key]) == dict: + _out[_key] = dict(_out[_key],**_y[_key]) + else: + _out[_key] = _y[_key] + + return _out + else: + + return dict(_x,**_y) +def template(**_args) : + """ + This function generates an object template to be used in object assignment and export functionalities + We chose to proceed in this manner so as to enforce consistency of the parser + :plugins {*,837,835} with element and pointers associated + """ + _plugins = _args['plugins'] + _object = {'837':{},'835':{}} + for _x12 in _plugins : + _pointers = _plugins[_x12] + for _element in _pointers : + _meta = _pointers[_element].meta + _values = _meta['map'].values() if 'map' in _meta else _meta['columns'] + # + # where do the attributes go .. + # + _attr = [] + for _item in list(_values) : + if type(_item) == list : + _attr = _attr + _item + else: + _attr.append(_item) + _field = [] + if 'field' in _meta or 'container' in _meta : + _field = _meta['field'] if 'field' in _meta else _meta['container'] + + if 'anchor' in _meta : #-- No parents are expected + _field = _meta['anchor'].values() + + elif _meta['parent'] : + # + # It means the attributes will be + _parentPlug = x12.plugins.filter(elements=[_meta['parent']],plugins=_plugins) + _pid = list(_parentPlug.keys())[0] + _parentMeta = _parentPlug[_pid][_meta['parent']].meta + + _attr = _attr + list(_parentMeta['map'].values()) if 'map' in _parentMeta else _parentMeta['columns'] + if 'anchor' in _parentMeta : + _field = list(_parentMeta['anchor'].values()) + _field = [_field] if type(_field) == str else _field + _attr = dict.fromkeys(_attr,'') + if not _field : + _info = (_attr) + else: + _info = (dict.fromkeys(_field,_attr)) + if _x12 == '*' : + + _object['837']= merge(_object['837'], _info) + _object['835']= merge (_object['835'], _info) + else: + _object[_x12] = merge(_object[_x12],_info) + return _object +# def template(**_args) : +# """ +# This function generates an object template to be used in object assignment and export functionalities +# We chose to proceed in this manner so as to enforce consistency of the parser +# :plugins {*,837,835} with element and pointers associated +# """ +# _plugins = _args['plugins'] +# _object = {'837':{},'835':{}} +# for _x12 in _plugins : +# _pointers = _plugins[_x12] +# for _element in _pointers : +# _meta = _pointers[_element].meta +# _values = _meta['map'].values() if 'map' in _meta else _meta['columns'] +# # +# # where do the attributes go .. +# # +# _attr = [] +# for _item in list(_values) : +# if type(_item) == list : +# _attr = _attr + _item +# else: +# _attr.append(_item) +# _field = [] +# if 'field' in _meta or 'container' in _meta : +# _field = _meta['field'] if 'field' in _meta else _meta['container'] + +# if 'anchor' in _meta : #-- No parents are expected +# _field = _meta['anchor'].values() + +# elif _meta['parent'] : +# # +# # It means the attributes will be +# _parentPlug = filter(elements=[_meta['parent']],plugins=_plugins) +# _pid = list(_parentPlug.keys())[0] +# _parentMeta = _parentPlug[_pid][_meta['parent']].meta + +# _attr = _attr + list(_parentMeta['map'].values()) if 'map' in _parentMeta else _parentMeta['columns'] +# if 'anchor' in _parentMeta : +# _field = list(_parentMeta['anchor'].values()) +# _field = [_field] if type(_field) == str else _field +# _attr = dict.fromkeys(_attr,'') +# if not _field : +# _info = (_attr) +# else: +# _info = (dict.fromkeys(_field,_attr)) +# if _x12 == '*' : + +# _object['837']= merge(_object['837'], _info) +# _object['835']= merge (_object['835'], _info) +# else: +# _object[_x12] = merge(_object[_x12],_info) +# return _object diff --git a/healthcareio/x12/util/common.py b/healthcareio/x12/util/common.py new file mode 100644 index 0000000..5ce48dc --- /dev/null +++ b/healthcareio/x12/util/common.py @@ -0,0 +1,32 @@ +# class Common : +# def parent(self,**_args): +# """ +# This function returns the "parent" pointer associated with a given element +# :meta meta data of a decorated/annotated function +# """ +# _meta = _args['meta'] +# _item = None + +# if 'parent' in _meta : #hasattr(_meta,'parent'): +# _hasField = 'field' in _meta +# _hasParent= _meta['element'] in self._parents +# if _hasField and _hasParent: #_meta.element in self._parents and hasattr(_meta,'field'): + +# self._last = _item +# pass +# else: +# for key in self._parents : +# if _meta.element in self._parents[key] : + +# _ikey = list(self._last.keys())[0] +# _oldinfo = self._last[_ikey] +# if type(_oldinfo) != dict : +# # +# # Only applicable against a dictionary not a list (sorry) +# pass +# else: +# _item = {_ikey: self.merge(_oldinfo,_item)} + +# break +# pass +# return _item diff --git a/healthcareio/x12/util/document.py b/healthcareio/x12/util/document.py new file mode 100644 index 0000000..cd8520f --- /dev/null +++ b/healthcareio/x12/util/document.py @@ -0,0 +1,272 @@ +""" +This file encapsulates the functions needed to build a document +""" +import numpy as np +import copy +class Builder: + __doc__ = """ + This class is intended to create and manipulate objects + :merge The class merges two objects and accounts for attributes that are lists + :parent returns the parent for a given object + """ + def __init__(self,**_args): + self._last = {} + + self._plugins = copy.deepcopy(_args['plugins']) + self._parents = copy.deepcopy(_args['parents']) + self._loop = {} + + + def reset (self): + self._last = {} + self._loop = {} + def parent(self,**_args): + """ + This function returns the parent item of an object + :meta meta data of a decorated/annotated function + """ + _meta = _args['meta'] + # _item = None + if _meta['parent'] : + _id = _meta['parent'] + if _id : + return self._last[_id] if _id in self._last else None + return None + + # if _id in self._parents : + # self._last[_id] = + + # if 'parent' in _meta : #hasattr(_meta,'parent'): + # _hasField = 'field' in _meta + # _hasParent= _meta['element'] in self._parents + # if _hasField and _hasParent: #_meta.element in self._parents and hasattr(_meta,'field'): + + # self._last = _item + # pass + # else: + # for key in self._parents : + # if _meta['element'] in self._parents[key] : + + # _ikey = list(self._last.keys())[0] + # _oldinfo = self._last[_ikey] + # if type(_oldinfo) != dict : + # # + # # Only applicable against a dictionary not a list (sorry) + # pass + # else: + # _item = {_ikey: self.merge(_oldinfo,_item)} + + # break + # pass + + # return _item + def count(self,_element): + if _element not in self._loop : + self._loop[_element] = 0 + self._loop[_element] += 1 + def pointer(self,**_args): + """ + This function returns a pointer associated with a row element + @TODO: Make sure we know what kind of file we are processing (it would help suppress the loop) + """ + _id = _args['row'][0] if 'row' in _args else _args['element'] + _filetype = _args['x12'] + _pointer = None + + if _id in self._plugins[_filetype] : + _pointer = self._plugins[_filetype][_id] + else: + for _x12 in self._plugins : + if _id in self._plugins[_x12] : + _pointer = self._plugins[_x12][_id] + break + + + return _pointer + def field(self,**_args) : + _row = _args['row'] + _meta= _args['meta'] + _field = None + if _meta['parent'] : + _field = self.parent(meta=_meta)['field'] + if 'field' in _meta or 'container' in _meta : + _field = _meta['field'] if 'field' in _meta else _meta['container'] + + if 'anchor' in _meta : + _anchor = _meta['anchor'] + + for key in _anchor : + + if key == _row[1].strip() : + _field = _anchor[key] + break + + return _field + def merge (self,_x,_y): + """ + This function will merge two objects _x, _y + """ + _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns + + if _zcols : + _out = dict(_x,**{}) + for _key in list(_y.keys()) : + + + if _key not in _zcols and _key: + _out[_key] = _y[_key] + else: + if type(_out[_key]) == list : + for value in _y[_key] : + if value not in _out[_key] : + _out[_key].append(value) + # _out[_key] += _y[_key] + elif type(_out[_key]) == dict: + _out[_key] = dict(_out[_key],**_y[_key]) + else: + _out[_key] = _y[_key] + + return _out + else: + + return dict(_x,**_y) + def parse (self,**_args): + """ + This function will perform parsing on behalf of the plugin by relying on map function + :row raw x12 row + :meta meta data of the plugin function + """ + #-- Loop Markers + + _row = _args['row'] + _map = _args['meta']['map'] + # _map = self.pointer(row=_row).meta['map'] + + _index = list(_map.keys()) + + _columns = [] #[_map[_id] for _id in _index ] + for _id in _index : + _name = _map[_id] + if type(_name) == list : + _columns += _name + _i = _index.index(_id) + _index = (_index[:_i] + np.repeat(_index[_i], len(_name)).tolist()+_index[_i+1:]) + else: + _columns.append(_name) + _info = {} + _index = np.array(_index).astype(int) + # _document = _args['document'] + + if np.max(_index) > len(_row) -1 : + _delta = 1 + np.max(_index) - len(_row) + _row = _row + np.repeat('',_delta).tolist() + _row = np.array(_row) + try: + _info = dict(zip(_columns,_row[_index].tolist())) + except Exception as e: + # print (_row) + # print ( e) + pass + + return _info + def meta (self,**_args): + _row = _args['row'] + _id = _row[0] + _meta = None + for key in self._plugins : + _items = self._plugins[key] + if _id in _items : + _meta = (_items[_id].meta) + break + return _meta + def update(self,**_args): + _element = _args['row'][0] + if _element in self._parents : + _meta = self.meta(row=_args['row']) + if 'field' not in _meta : + _field = self.field(row=_args['row'],meta=_meta) + else: + _field = _meta['field'] + self._last[_element] = {'data':_args['data'],'field':_field} + def bind(self,**_args): + """ + This function is intended to make an object out of an element + :row raw row of x12 + :document object that is the document + """ + _row = _args['row'] + _filetype = _args['x12'] + _id = _row[0] + self.count(_id) + + _pointer = self.pointer(row=_row,x12=_filetype) + + + _parent = None + _data = {} + # _document = _args['document'] + if not _pointer : + return None,None + # + # Should we use the built-in parser or not + if _pointer and 'map' in _pointer.meta : + _data = self.parse(row=_row,meta=_pointer.meta) + # + # This function will be used as formatter (at least) + # We will also insure that the current element is not the last one + _out = _pointer(row=_row,data=_data, meta=_pointer.meta) + _data = _data if _out is None else _out + self.update(row = _row, data=_data) #-- If this element is considered a parent, we store it + return _data, _pointer.meta + + + def build (self,**_args): + """ + This function attemps to place a piece of data within a document + """ + + _meta = _args['meta'] + _data = _args['data'] + _row = _args['row'] + + _document = _args['document'] + + # if _meta['parent'] : + # _field = self.parent(meta=_meta)['field'] + # elif 'field' in _meta : + # _field = _meta['field'] + # elif 'container' in _meta : + # _field = _meta['container'] + + # if type(_document[_field]) != list : + # _data = self.merge(_document[_field],_data) + # _document[_field] = [] + + # elif 'anchor' in _meta: + # _field = self.field(row=_row,meta=_meta) + + + # else: + # _field = None + _field = self.field(meta=_meta,row=_row) + if _field : + if 'container' in _meta and type(_document[_field]) != list : + _document[_field] = [] + if _field and _document: + + if _field not in _document : + _document[_field] =_data + else: + if 'container' in _meta : + _document[_field].append(_data) + else: + _document[_field] = self.merge(_document[_field],_data) + else: + if not _field and 'anchor' in _meta : + # + # This is an unusual situation ... + pass + _document = self.merge(_document,_data) + return _document + + diff --git a/healthcareio/x12/util/file.py b/healthcareio/x12/util/file.py new file mode 100644 index 0000000..47854c9 --- /dev/null +++ b/healthcareio/x12/util/file.py @@ -0,0 +1,172 @@ +import os +import numpy as np +from io import StringIO +# from .common import Common +class Content : + """ + This class implements functions that will manipulate content of a file + :split splits the content + :read reads the content of a file given a filename + :parse parses the content of a file given a map {index:field_name} + """ + def __init__(self,**_args): + self._parents = {} + self._lastelement = {} + + def split(self,_content): + if type(_content) == str : + _xchar = '~\n' if '~\n' in _content else ('~' if '~' in _content else ('\n' if '\n' in _content else None)) + _x12 = '837' if 'CLM*' in _content else ('835' if 'CLP*' in _content else None) + _map = {'835':'CLP','837':'CLM'} + _claim_mark = _map[_x12] + _content = _content.split(_claim_mark) + _xchar = ''.join(_xchar) + _chunks = [] + + + for _block in _content : + + if len(_chunks) > 0 : + _block = _claim_mark+ _block + _splitblocks = [row.strip().split('*') for row in _block.split(_xchar) if row.strip()] + _chunks.append(_splitblocks) + return _chunks,_x12 + # if _xchar : + # _xchar = ''.join(_xchar) + # _rows = _content.split(_xchar) + + # return [row.strip().split('*') for row in _rows if row.strip()] + # else: + # return _content.split('*') + return [],None + def read(self,**_args): + """ + This function will read and clean-up the content of a file + """ + + _filename = _args['filename'] + if type(_filename) == StringIO : + return _filename.read() + else: + f = open(_filename) + _content = f.read() + f.close() + return _content + + def _ix_parse (self,columns,index,**_args): + """ + This function encapulates how an x12 document element will be processed + :columns list of attributes that make up the object + :index indexes of the said items in the element + :_args + - row raw x12 element (string) + - pointer decorated function + - document + + """ + + _ELEMENT = _args['row'][0] + _pointer = _args['pointer'] + _document = _args['document'] + if 'map' in _pointer.meta : + _map = _pointer.meta['map'] + _index = list(_map.keys()) + _columns = [_map[_id] for _id in _index ] + _info = {} + _row = _args['row'] if type(_args['row']) == list else _args['row'].split('*') + _index = np.array(_index) + + # + # Sometimes the _row doesn't have all expected indexes, we will compensate + # This allows to minimize parsing errors as it may relate to disconnects between configuration and x12 element variations (shitty format) + # + if np.max(_index) > len(_row) -1 : + _delta = 1 + np.max(_index) - len(_row) + _row = _row + np.repeat('',_delta).tolist() + _row = np.array(_row) + _info = dict(zip(_columns,_row[_index].tolist())) + else: + # + # We should call the function that is intended to perform the parsing + # + _info = _pointer(row=_args['row'],document=_document,meta=_pointer.meta) + # + # @TODO: We should look into the object created and enforce the specifications are met + # + return _info + + # def consolidate(self,**_args): + # """ + # This function takes an object and addit to the document given meta data + # :document document associated associated with a claim (processing the loops) + # :object + # :caller attributes within the decorator + # """ + # _document = _args['document'] if 'document' in _args else {} + # _info = _args['object'] + # _meta = _args['meta'] + # # + # # @TODO: + # # Apply parsing/casting function to the object retrieved + # # _apply(_info) #-- the object will be processed accordingly + # # + + # # + # # @TODO: + # # The objects parsed must be augmented against the appropriate ones e.g: NM1 <- N1,N2,N3,N4 + # # - Find a way to drive this from a configuration ... + # # + # if 'field' in _meta : #hasattr(_meta,'field') : + # _field = _meta['field'] + # if not _field in _document : + # _item = {_field:_info} + # else: + # _item = self.merge(_document[_field],_info) + # elif 'container' in _meta: # hasattr(_meta,'container') : + # _label = _meta.container + # if not _label in _document : + # _item = {_label:[_info]} + # else: + # _item = _document[_label] + [_info] + # else: + # _item = _info + + # if 'parent' in _meta : #hasattr(_meta,'parent'): + # _hasField = 'field' in _meta + # _hasParent= _meta['element'] in self._parents + # if _hasField and _hasParent: #_meta.element in self._parents and hasattr(_meta,'field'): + + # self_last = _item + # pass + # else: + # for key in self._parents : + # if _meta.element in self._parents[key] : + + # _ikey = list(self_last.keys())[0] + # _oldinfo = self_last[_ikey] + # if type(_oldinfo) != dict : + # # + # # Only applicable against a dictionary not a list (sorry) + # pass + # else: + # _item = {_ikey: self.merge(_oldinfo,_item)} + + # break + # pass + # return _item +class Location : + @staticmethod + def get(**_args): + _path = _args['path'] + files = [] + + if os.path.isdir(_path): + + for root,_dir,f in os.walk(_path) : + if f : + files += [os.sep.join([root,name]) for name in f] + files = [path for path in files if os.path.isfile(path)] + else: + files = [_path] + _chunks = 0 if 'chunks' not in _args else int(_args['chunks']) + return files if not _chunks else np.array_split(files,_chunks) \ No newline at end of file