""" (c) 2019 Healthcare/IO 1.0 Vanderbilt University Medical Center, Health Information Privacy Laboratory https://hiplab.mc.vanderbilt.edu/healthcareio Authors: Khanhly Nguyen, Steve L. Nyemba License: MIT, terms are available at https://opensource.org/licenses/MIT This parser was originally written by Khanhly Nguyen for her internship and is intended to parse x12 835,837 and others provided the appropriate configuration USAGE : - COMMAND LINE - EMBEDDED """ import hashlib import json import os import sys from itertools import islice from multiprocessing import Process import transport import jsonmerge import copy class void : pass class Formatters : def __init__(self): # self.config = config self.get = void() self.get.config = self.get_config self.parse = void() self.parse.sv3 = self.sv3 self.parse.sv2 = self.sv2 self.sv2_parser = self.sv2 self.sv3_parser = self.sv3 self.sv3_parse = self.sv3 self.format_proc = self.procedure self.format_diag = self.diagnosis self.parse.procedure = self.procedure self.parse.diagnosis = self.diagnosis self.parse.date = self.date self.format_date = self.date self.format_pos = self.pos self.format_time = self.time def split(self,row,sep='*',prefix='HI') : """ This function is designed to split an x12 row and """ value = [] if row.startswith(prefix) is False: for row_value in row.replace('~','').split(sep) : if '>' in row_value and not row_value.startswith('HC'): # if row_value.startswith('HC') or row_value.startswith('AD'): if row_value.startswith('AD'): value += row_value.split('>')[:2] pass else: value += [row_value] # value += row_value.split('>') if row.startswith('CLM') is False else [row_value] else : value.append(row_value.replace('\n','')) value = [xchar.replace('\r','') for xchar in value] #row.replace('~','').split(sep) else: value = [ [prefix]+ self.split(item,'>') for item in row.replace('~','').split(sep)[1:] ] return value if type(value) == list and type(value[0]) != list else value[0] def get_config(self,config,row): """ This function will return the meaningfull parts of the configuration for a given item """ _row = list(row) if type(row[0]) == str else list(row[0]) _info = config[_row[0]] if _row[0] in config else {} _rinfo = {} key = None if '@ref' in _info: keys = list(set(_row) & set(_info['@ref'].keys())) if keys : _rinfo = {} for key in keys : _rinfo = jsonmerge.merge(_rinfo,_info['@ref'][key]) return _rinfo # key = key[0] # return _info['@ref'][key] else: return {} if not _info and 'SIMILAR' in config: # # Let's look for the nearest key using the edit distance if _row[0] in config['SIMILAR'] : key = config['SIMILAR'][_row[0]] _info = config[key] return _info def hash(self,value): salt = os.environ['HEALTHCAREIO_SALT'] if 'HEALTHCAREIO_SALT' in os.environ else '' _value = str(value)+ salt if sys.version_info[0] > 2 : return hashlib.md5(_value.encode('utf-8')).hexdigest() else: return hashlib.md5(_value).hexdigest() def suppress (self,value): return 'N/A' def date(self,value): if len(value) > 8 or '-' in value: value = value.split('-')[0] if len(value) == 8 : year = value[:4] month = value[4:6] day = value[6:] return "-".join([year,month,day])[:10] #{"year":year,"month":month,"day":day} elif len(value) == 6 : year = '20' + value[:2] month = value[2:4] day = value[4:] # # We have a date formatting issue return "-".join([year,month,day]) def time(self,value): pass def sv3(self,value): if '>' in value [1]: terms = value[1].split('>') return {'type':terms[0],'code':terms[1],"amount":float(value[2])} else: return {"code":value[2],"type":value[1],"amount":float(value[3])} def sv2(self,value): # # @TODO: Sometimes there's a suffix (need to inventory all the variations) # if '>' in value or ':' in value: xchar = '>' if '>' in value else ':' _values = value.split(xchar) modifier = {} if len(_values) > 2 : modifier= {"code":_values[2]} if len(_values) > 3 : modifier['type'] = _values[3] _value = {"code":_values[1],"type":_values[0]} if modifier : _value['modifier'] = modifier return _value else: return value def procedure(self,value): for xchar in [':','<','|','>'] : if xchar in value and len(value.split(xchar)) > 1 : #_value = {"type":value.split(':')[0].strip(),"code":value.split(':')[1].strip()} _value = {"type":value.split(xchar)[0].strip(),"code":value.split(xchar)[1].strip()} if len(value.split(xchar)) >2 : index = 1; for modifier in value.split(xchar)[2:] : _value['modifier_'+str(index)] = modifier index += 1 break else: _value = str(value) return _value def diagnosis(self,value): return [ {"code":item[2], "type":item[1]} for item in value if len(item) > 1] def pos(self,value): """ formatting place of service information within a segment (REF) @TODO: In order to accomodate the other elements they need to be specified in the configuration Otherwise it causes problems on export """ xchar = '>' if '>' in value else ':' x = value.split(xchar) x = {"code":x[0],"indicator":x[1],"frequency":x[2]} if len(x) == 3 else {"code":x[0],"indicator":None,"frequency":None} return x['code'] class Parser (Process): def __init__(self,path): """ :path path of the configuration file (it can be absolute) """ Process.__init__(self) self.utils = Formatters() self.get = void() self.get.value = self.get_map self.get.default_value = self.get_default_value _config = json.loads(open(path).read()) self._custom_config = self.get_custom(path) self.config = _config['parser'] self.store = _config['store'] self.cache = {} self.files = [] self.set = void() self.set.files = self.set_files self.emit = void() self.emit.pre = None self.emit.post = None def get_custom(self,path) : """ :path path of the configuration file (it can be absolute) """ # # _path = path.replace('config.json','') if _path.endswith(os.sep) : _path = _path[:-1] _config = {} _path = os.sep.join([_path,'custom']) if os.path.exists(_path) : files = os.listdir(_path) if files : fullname = os.sep.join([_path,files[0]]) _config = json.loads ( (open(fullname)).read() ) return _config def set_files(self,files): self.files = files def get_map(self,row,config,version=None): # label = config['label'] if 'label' in config else None handler = Formatters() if 'map' not in config and hasattr(handler,config['apply']): pointer = getattr(handler,config['apply']) object_value = pointer(row) return object_value # # Pull the goto configuration that skips rows # omap = config['map'] if not version or version not in config else config[version] anchors = config['anchors'] if 'anchors' in config else [] rewrite = config['rewrite'] if 'rewrite' in config else {} if type(row[0]) == str: object_value = {} for key in omap : index = omap[key] if anchors and set(anchors) & set(row): _key = list(set(anchors) & set(row))[0] aindex = row.index(_key) index = aindex + index if index < len(row) : value = row[index] if 'cast' in config and key in config['cast'] and value.strip() != '' : if config['cast'][key] in ['float','int'] : value = eval(config['cast'][key])(value) elif hasattr(handler,config['cast'][key]): pointer = getattr(handler,config['cast'][key]) value = pointer(value) else: print ("Missing Pointer ",key,config['cast']) if type(value) == dict : for objkey in value : if type(value[objkey]) == dict : continue if 'syn' in config and value[objkey] in config['syn'] : # value[objkey] = config['syn'][ value[objkey]] pass if key in rewrite : _key = rewrite[key] if _key in value : value = value[_key] else: value = "" value = {key:value} if key not in value else value else: if 'syn' in config and value in config['syn'] : # value = config['syn'][value] pass if type(value) == dict : # object_value = dict(object_value, **value) object_value = jsonmerge.merge(object_value, value) else: object_value[key] = value else: # # we are dealing with a complex object object_value = [] for row_item in row : value = self.get.value(row_item,config,version) object_value.append(value) # # We need to add the index of the object it matters in determining the claim types # # object_value.append( list(get_map(row_item,config,version))) # object_value = {label:object_value} return object_value def set_cache(self,tmp,_info) : """ insert into cache a value that the, these are in reference to a loop """ if 'cache' in _info : key = _info['cache']['key'] value=_info['cache']['value'] field = _info['cache']['field'] if value in tmp : self.cache [key] = {field:tmp[value]} pass def get_cache(self,row) : """ retrieve cache element for a current """ key = row[0] return self.cache[key] if key in self.cache else {} def apply(self,content,_code) : """ :content content of a file i.e a segment with the envelope :_code 837 or 835 (helps get the appropriate configuration) """ util = Formatters() # header = default_value.copy() value = {} for row in content[:] : row = util.split(row.replace('\n','').replace('~','')) _info = util.get.config(self.config[_code][0],row) if self._custom_config and _code in self._custom_config: _cinfo = util.get.config(self._custom_config[_code],row) else: _cinfo = {} if _info or _cinfo: try: _info = jsonmerge.merge(_info,_cinfo) tmp = self.get.value(row,_info) if not tmp : continue # # At this point we have the configuration and the row parsed into values # We should check to see if we don't have anything in the cache to be added to it # if row[0] in self.cache : tmp = jsonmerge.merge(tmp,self.get_cache(row)) if 'label' in _info : label = _info['label'] if type(tmp) == list : value[label] = tmp if label not in value else value[label] + tmp else: # if 'DTM' in row : # print ([label,tmp,label in value]) if label not in value : value[label] = [] value[label].append(tmp) # if label not in value: # value[label] = [tmp] # else: # value[label].append(tmp) if '_index' not in tmp : # # In case we asked it to be overriden, then this will not apply # X12 occasionally requires references to other elements in a loop (alas) # tmp['_index'] = len(value[label]) -1 elif 'field' in _info : name = _info['field'] # value[name] = tmp # value = jsonmerge.merge(value,{name:tmp}) value = dict(value,**{name:tmp}) else: value = dict(value,**tmp) pass except Exception as e : print (e.args[0]) # print ('__',(dir(e.args))) pass # # At this point the object is completely built, # if there ar any attributes to be cached it will be done here # if 'cache' in _info : self.set_cache(tmp,_info) return value if value else {} def get_default_value(self,content,_code): util = Formatters() TOP_ROW = content[1].split('*') SUBMITTED_DATE = util.parse.date(TOP_ROW[4]) CATEGORY= content[2].split('*')[1].strip() VERSION = content[1].split('*')[-1].replace('~','').replace('\n','') SENDER_ID = TOP_ROW[2] row = util.split(content[3]) _info = util.get_config(self.config[_code][0],row) value = self.get.value(row,_info,VERSION) if _info else {} value['category'] = {"setid": _code,"version":'X'+VERSION.split('X')[1],"id":VERSION.split('X')[0].strip()} value["submitted"] = SUBMITTED_DATE value['sender_id'] = SENDER_ID value = dict(value,**self.apply(content,_code)) # Let's parse this for default values return value #jsonmerge.merge(value,self.apply(content,_code)) def read(self,filename) : """ :formerly get_content This function returns the of the EDI file parsed given the configuration specified. it is capable of identifying a file given the content :section loop prefix (HL, CLP) :config configuration with formatting rules, labels ... :filename location of the file """ # section = section if section else config['SECTION'] logs = [] claims = [] try: self.cache = {} file = open(filename.strip()) file = file.read().split('CLP') _code = '835' section = 'CLP' if len(file) == 1 : file = file[0].split('CLM') _code = '837' section = 'HL' INITIAL_ROWS = file[0].split(section)[0].split('\n') if len(INITIAL_ROWS) == 1 : INITIAL_ROWS = INITIAL_ROWS[0].split('~') # for item in file[1:] : # item = item.replace('~','\n') # print (INITIAL_ROWS) DEFAULT_VALUE = self.get.default_value(INITIAL_ROWS,_code) DEFAULT_VALUE['name'] = filename.strip() file = section.join(file).split('\n') if len(file) == 1: file = file[0].split('~') # # In the initial rows, there's redundant information (so much for x12 standard) # index 1 identifies file type i.e CLM for claim and CLP for remittance segment = [] index = 0; _toprows = [] _default = None for row in file : row = row.replace('\r','') # if not segment and not row.startswith(section): # _toprows += [row] if row.startswith(section) and not segment: segment = [row] continue elif segment and not row.startswith(section): segment.append(row) if len(segment) > 1 and row.startswith(section): # # process the segment somewhere (create a thread maybe?) # _claim = self.apply(segment,_code) if _claim : _claim['index'] = index #len(claims) # claims.append(dict(DEFAULT_VALUE,**_claim)) # # schema = [ {key:{"mergeStrategy":"append" if list( type(_claim[key])) else "overwrite"}} for key in _claim.keys()] # if type(_claim[key]) == list] # _schema = set(DEFAULT_VALUE.keys()) - schema # if schema : # schema = {"properties":dict.fromkeys(schema,{"mergeStrategy":"append"})} # else: # schema = {"properties":{}} # schema = jsonmerge.merge(schema['properties'],dict.fromkeys(_schema,{"mergeStrategy":"overwrite"})) schema = {"properties":{}} for attr in _claim.keys() : schema['properties'][attr] = {"mergeStrategy": "append" if type(_claim[attr]) == list else "overwrite" } merger = jsonmerge.Merger(schema) _baseclaim = None _baseclaim = merger.merge(_baseclaim,copy.deepcopy(DEFAULT_VALUE)) _claim = merger.merge(_baseclaim,_claim) # _claim = merger.merge(DEFAULT_VALUE.copy(),_claim) claims.append( _claim) segment = [row] index += 1 pass # # Handling the last claim found if segment and segment[0].startswith(section) : # default_claim = dict({"name":index},**DEFAULT_VALUE) claim = self.apply(segment,_code) if claim : claim['index'] = len(claims) # schema = [key for key in claim.keys() if type(claim[key]) == list] # if schema : # schema = {"properties":dict.fromkeys(schema,{"mergeStrategy":"append"})} # else: # print (claim.keys()) # schema = {} # # @TODO: Fix merger related to schema (drops certain fields ... NOT cool) # merger = jsonmerge.Merger(schema) # top_row_claim = self.apply(_toprows,_code) # claim = merger.merge(claim,self.apply(_toprows,_code)) # claims.append(dict(DEFAULT_VALUE,**claim)) schema = {"properties":{}} for attr in claim.keys() : schema['properties'][attr] = {"mergeStrategy": "append" if type(claim[attr]) == list else "overwrite" } merger = jsonmerge.Merger(schema) _baseclaim = None _baseclaim = merger.merge(_baseclaim,copy.deepcopy(DEFAULT_VALUE)) claim = merger.merge(_baseclaim,claim) claims.append(claim) # claims.append(merger.merge(DEFAULT_VALUE.copy(),claim)) if type(file) != list : file.close() # x12_file = open(filename.strip(),errors='ignore').read().split('\n') except Exception as e: logs.append ({"parse":_code,"completed":False,"name":filename,"msg":e.args[0]}) return [],logs,None rate = 0 if len(claims) == 0 else (1 + index)/len(claims) logs.append ({"parse":"claims" if _code == '837' else 'remits',"completed":True,"name":filename,"rate":rate}) # self.finish(claims,logs,_code) return claims,logs,_code def run(self): if self.emit.pre : self.emit.pre() for filename in self.files : content,logs,_code = self.read(filename) self.finish(content,logs,_code) def finish(self,content,logs,_code) : args = self.store _args = json.loads(json.dumps(self.store)) if args['type'] == 'mongo.MongoWriter' : args['args']['doc'] = 'claims' if _code == '837' else 'remits' _args['args']['doc'] = 'logs' else: args['args']['table'] = 'claims' if _code == '837' else 'remits' _args['args']['table'] = 'logs' if content : writer = transport.factory.instance(**args) writer.write(content) writer.close() if logs : logger = transport.factory.instance(**_args) logger.write(logs) logger.close() if self.emit.post : self.emit.post(content,logs)