"""
(c) 2019 EDI-Parser 1.0
Vanderbilt University Medical Center, Health Information Privacy Laboratory
https://hiplab.mc.vanderbilt.edu/tools

Authors:
    Khanhly Nguyen,
    Steve L. Nyemba

License:
    MIT, terms are available at https://opensource.org/licenses/MIT

This parser was originally written by Khanhly Nguyen for her internship and is
intended to parse x12 835, 837 and others provided the appropriate configuration

USAGE :
    - COMMAND LINE

    - EMBEDDED
"""
import os
import sys
import hashlib
import json


class X12:
    """
    Placeholder class mirroring some of the module-level helpers.
    Every method is currently a stub that does nothing and returns None.
    """

    def split(self, row, sep='*', prefix='HI'):
        pass

    def get_config(self, config, row):
        pass

    def hash(self, value):
        pass

    def suppress(self, value):
        pass

    def format_date(self, value):
        pass


def split(row, sep='*', prefix='HI'):
    """
    Split an x12 row into its elements.

    :row        raw text of a single row; '~' and carriage returns are removed
    :sep        element separator
    :prefix     rows starting with this prefix are returned as a list of lists,
                one per element after the first, each shaped [prefix, ...parts]

    For any other row a flat list of strings is returned. Elements containing
    '>' are expanded in place, except that :
        - elements starting with 'HC' or 'AD' only keep their first two parts
        - rows starting with 'CLM' keep such elements untouched
    """
    #
    # Carriage returns are removed up-front (not only from the output) so that
    # the prefix checks below still work on files with CRLF line endings
    row = row.replace('\r', '')
    elements = row.replace('~', '').split(sep)
    if row.startswith(prefix):
        return [[prefix] + split(item, '>') for item in elements[1:]]

    keep_composite = row.startswith('CLM')
    value = []
    for row_value in elements:
        if '>' not in row_value:
            value.append(row_value)
        elif row_value.startswith(('HC', 'AD')):
            value += row_value.split('>')[:2]
        elif keep_composite:
            value.append(row_value)
        else:
            value += row_value.split('>')
    return value


def get_config(config, row):
    """
    Return the meaningful part of the configuration for a given row.

    :config     dictionary keyed by row identifier (first element of a row)
    :row        output of split(), either a list of strings or a list of lists
                (in which case the first inner list is used)

    The lookup goes as follows :
        - an entry with an '@ref' dictionary is resolved to the sub-entry whose
          key appears in the row (first match in row order), or {} if none does
        - a row identifier without an entry is looked up in config['SIMILAR']
          (if present) which maps it to the key of another entry
    An empty dictionary is returned when nothing applies.
    """
    if not row:
        return {}
    _row = list(row) if isinstance(row[0], str) else list(row[0])
    if not _row:
        return {}
    _info = config[_row[0]] if _row[0] in config else {}
    if '@ref' in _info:
        refs = _info['@ref']
        #
        # Walking the row (instead of intersecting sets) makes the outcome
        # deterministic when more than one reference key is in the row
        for item in _row:
            if item in refs:
                return refs[item]
        return {}

    if not _info and 'SIMILAR' in config:
        if _row[0] in config['SIMILAR']:
            key = config['SIMILAR'][_row[0]]
            _info = config.get(key, {})
    return _info


def hash(value):
    """
    Return the hexadecimal md5 digest of str(value) followed by a salt.
    The salt is read from the environment variable HEALTHCAREIO_SALT and is
    empty when the variable is not set.

    NOTE: the name shadows the builtin hash() within this module, it is kept
    because the name is part of the module's interface
    """
    salt = os.environ.get('HEALTHCAREIO_SALT', '')
    _value = str(value) + salt
    if sys.version_info[0] > 2:
        return hashlib.md5(_value.encode('utf-8')).hexdigest()
    return hashlib.md5(_value).hexdigest()


def suppress(value):
    """
    Return the constant 'N/A' regardless of the value provided
    """
    return 'N/A'


def format_date(value):
    """
    Format a date as YYYY-MM-DD.

    :value  8 characters (YYYYMMDD) or 6 characters (YYMMDD, prefixed with '20')
    Any other length returns None
    """
    if len(value) == 8:
        return "-".join([value[:4], value[4:6], value[6:]])[:10]
    if len(value) == 6:
        return "-".join(['20' + value[:2], value[2:4], value[4:]])
    return None


def format_time(value):
    """
    Insert ':' after the first two characters and keep the first five
    characters of the result e.g '1230' and '123045' both yield '12:30'
    """
    return ":".join([value[:2], value[2:]])[:5]


def sv3_parse(value):
    """
    Split a 'type>code' value into {'type':..., 'code':...}.
    None is returned when the value does not contain '>'
    """
    if '>' in value:
        terms = value.split('>')
        return {'type': terms[0], 'code': terms[1]}
    return None


def sv2_parse(value):
    """
    Split a composite value delimited by '>' (preferred) or ':' into
    {'type':..., 'code':...}. A third part is stored as modifier['code'] and a
    fourth part as modifier['type']. Values without delimiter are returned as is
    """
    #
    # @TODO: Sometimes there's a suffix (need to inventory all the variations)
    #
    if '>' not in value and ':' not in value:
        return value
    xchar = '>' if '>' in value else ':'
    _values = value.split(xchar)
    _value = {"code": _values[1], "type": _values[0]}
    if len(_values) > 2:
        modifier = {"code": _values[2]}
        if len(_values) > 3:
            modifier['type'] = _values[3]
        _value['modifier'] = modifier
    return _value


def format_proc(value):
    """
    Split a value on ':' or '<' (whichever is found first, in that order) into
    {'type':..., 'code':...} with both parts stripped.
    str(value) is returned when neither delimiter is present
    """
    #
    # NOTE(review): '<' is what the code checks, other helpers in this file use
    # '>' as the alternate delimiter -- confirm which one is intended
    for xchar in [':', '<']:
        if xchar in value:
            parts = value.split(xchar)
            return {"type": parts[0].strip(), "code": parts[1].strip()}
    return str(value)


def format_diag(value):
    """
    Turn a list of rows shaped [prefix, type, code, ...] into a list of
    {'code':..., 'type':...}. Rows with fewer than three items are skipped
    """
    #
    # The guard must be on three items, because item[2] is read below
    return [{"code": item[2], "type": item[1]} for item in value if len(item) > 2]


def format_pos(value):
    """
    Split a value on '>' (preferred) or ':' into code, indicator and frequency.
    The indicator and frequency are None unless exactly three parts are found
    """
    xchar = '>' if '>' in value else ':'
    x = value.split(xchar)
    if len(x) == 3:
        return {"code": x[0], "indicator": x[1], "frequency": x[2]}
    return {"code": x[0], "indicator": None, "frequency": None}


def get_map(row, config, version=None):
    """
    Map the elements of a row onto named attributes.

    :row        list of strings, or list of such lists (each one is mapped
                separately and a list of results is returned)
    :config     configuration of the row with the following keys :
                    map      {attribute: index of the element in the row}
                    <version> alternative to map, used if version is a key
                    anchors  (optional) values to look for in the row, indexes
                             are then relative to the first anchor found
                    cast     (optional) {attribute: name of a callable}
                    syn      (optional) {value: replacement value}
    :version    version of the file, see config above

    Casting is skipped for blank values. A cast that yields a dictionary is
    merged into the result (wrapped under the attribute name unless it already
    has that key), any other value is stored under the attribute name.
    Indexes beyond the end of the row are ignored.
    """
    omap = config['map'] if not version or version not in config else config[version]
    anchors = config['anchors'] if 'anchors' in config else []
    if not row:
        return {}
    if not isinstance(row[0], str):
        #
        # we are dealing with a complex object
        return [get_map(row_item, config, version) for row_item in row]

    casts = config['cast'] if 'cast' in config else {}
    synonyms = config['syn'] if 'syn' in config else {}
    #
    # The position of the anchor doesn't depend on the attribute, it is
    # computed once. The first element of the row that is an anchor wins
    offset = 0
    if anchors:
        anchor_set = set(anchors)
        for position, item in enumerate(row):
            if item in anchor_set:
                offset = position
                break

    object_value = {}
    for key in omap:
        index = omap[key] + offset
        if index >= len(row):
            continue
        value = row[index]
        if key in casts and value.strip() != '':
            #
            # SECURITY: the configuration is evaluated as python code, it must
            # come from a trusted source
            value = eval(casts[key])(value)
        if isinstance(value, dict):
            for objkey in value:
                if isinstance(value[objkey], dict):
                    continue
                if synonyms and value[objkey] in synonyms:
                    value[objkey] = synonyms[value[objkey]]
            value = {key: value} if key not in value else value
            object_value = dict(object_value, **value)
        else:
            if synonyms and value in synonyms:
                value = synonyms[value]
            object_value[key] = value
    return object_value


def get_locations(x12_file, section='HL'):
    """
    Return the indexes of the lines that start with a section prefix, leading
    and trailing spaces of a line are ignored.

    :x12_file   list of lines
    :section    prefix to look for
    """
    #
    # enumerate gives the actual position of every line, looking a line up by
    # value would return the first occurrence for lines that are repeated
    return [index for index, line in enumerate(x12_file) if line.strip().startswith(section)]


def _error_message(error):
    """
    Return the first argument of an exception or its text if it has none
    """
    return error.args[0] if error.args else str(error)


#def get_claims(filename,config,section) :
def get_content(filename, config, section=None):
    """
    Parse an EDI file given the configuration and return (claims, logs).

    :filename   location of the file
    :config     configuration with formatting rules, labels ...
    :section    loop prefix (HL, CLP), defaults to config['SECTION']

    The file is cut in partitions, each one starting on a line with the section
    prefix. A partition yields a claim if its rows produce a 'claim_id'.
    The version, submission date and sender are read from the second line, the
    category from the third line and default values from the fourth line.

    claims  list of dictionaries, with 'name' (file name) and 'index' added
    logs    list of dictionaries describing what could not be processed; a file
            that can't be read or has no usable header/section yields no claim
    """
    section = section if section else config['SECTION']
    logs = []
    try:
        with open(filename.strip(), errors='ignore') as stream:
            x12_file = stream.read().split('\n')
    except Exception as e:
        #
        # We have an error here that should be logged
        logs.append({"version": "unknown", "filename": filename, "msg": _error_message(e)})
        return [], logs

    non_blank = [line for line in x12_file if line.strip()]
    if len(non_blank) == 1:
        #
        # The whole file is on a single line, the terminator is put back on
        # every row because partitions are split on it further down
        x12_file = [item + '~' for item in non_blank[0].split('~') if item.strip()]

    locations = get_locations(x12_file, section)
    if not locations:
        logs.append({"version": "unknown", "filename": filename, "msg": "section %s not found" % section})
        return [], logs
    try:
        top_row = x12_file[1].split('*')
        category = x12_file[2].split('*')[1].strip()
        version = top_row[-1].replace('~', '').strip()
        submitted_date = format_date(top_row[4])
        sender_id = top_row[2]
        header_row = split(x12_file[3])
    except IndexError:
        logs.append({"version": "unknown", "filename": filename, "msg": "incomplete header"})
        return [], logs

    #
    # given locations it is possible to build up the partitions (made of segments)
    # The last partition runs to the end of the file, it holds the last claim
    boundaries = locations[1:] + [len(x12_file)]
    partitions = []
    beg = locations[0]
    for end in boundaries:
        partitions.append("\n".join(x12_file[beg:end]))
        beg = end

    _info = get_config(config, header_row)
    _default_value = get_map(header_row, _info, version) if _info else {}
    #
    # The version looks like <id>X<version>..., a version without 'X' yields 'X'
    version_id, _, version_tail = version.partition('X')
    category_info = {"setid": category, "version": 'X' + version_tail.split('X')[0], "id": version_id.strip()}

    claims = []
    for segment in partitions:
        claim = {}
        for raw_row in segment.replace('\n', '').split('~'):
            row = split(raw_row)
            _info = get_config(config, row)
            if not _info:
                continue
            try:
                tmp = get_map(row, _info, version)
            except Exception as e:
                #
                # A row that can't be mapped invalidates the whole claim
                logs.append({"version": version, "filename": filename, "msg": _error_message(e), "X12": row, "completed": False, "rows": len(row)})
                claim = {}
                break

            if 'label' not in _info:
                tmp['version'] = version
                tmp['submitted'] = submitted_date
                if top_row[1] == 'HP':
                    tmp['payer_id'] = sender_id
                elif top_row[1] == 'HC':
                    tmp['provider_id'] = sender_id
                tmp['category'] = dict(category_info)
                claim = dict(claim, **tmp)
                continue

            label = _info['label']
            if isinstance(tmp, list):
                claim[label] = tmp if label not in claim else claim[label] + tmp
            elif label not in claim:
                claim[label] = [tmp]
            elif len(tmp) == 1:
                #
                # A single attribute is folded into the latest labelled item
                claim[label][-1] = dict(claim[label][-1], **tmp)
            else:
                claim[label].append(tmp)
            if len(claim[label]) > 0:
                #-- removing redundancies
                labels = []
                for item in claim[label]:
                    item['_index'] = len(labels)
                    if item not in labels:
                        labels.append(item)
                claim[label] = labels

        if claim and 'claim_id' in claim:
            claim = dict(claim, **_default_value)
            claim['name'] = filename.split(os.sep)[-1]
            claim['index'] = len(claims)
            claims.append(claim)
        #
        # otherwise we could not find a claim identifier associated with the data
    return claims, logs