354 lines
12 KiB
Python
354 lines
12 KiB
Python
"""
|
|
(c) 2019 EDI-Parser 1.0
|
|
Vanderbilt University Medical Center, Health Information Privacy Laboratory
|
|
https://hiplab.mc.vanderbilt.edu/tools
|
|
|
|
|
|
Authors:
|
|
Khanhly Nguyen,
|
|
Steve L. Nyemba<steve.l.nyemba@vanderbilt.edu>
|
|
|
|
License:
|
|
MIT, terms are available at https://opensource.org/licenses/MIT
|
|
|
|
This parser was originally written by Khanhly Nguyen for her internship and is intended to parse x12 835,837 and others provided the appropriate configuration
|
|
USAGE :
|
|
- COMMAND LINE
|
|
|
|
- EMBEDDED
|
|
"""
|
|
import os
|
|
import sys
|
|
import hashlib
|
|
import json
|
|
class X12:
    """
    Placeholder class whose method names mirror the module-level helpers.

    None of the methods is implemented: each accepts its arguments, does
    nothing with them and returns ``None``.
    """

    def split(self, row, sep='*', prefix='HI'):
        """Not implemented; always returns ``None``."""
        return None

    def get_config(self, config, row):
        """Not implemented; always returns ``None``."""
        return None

    def hash(self, value):
        """Not implemented; always returns ``None``."""
        return None

    def suppress(self, value):
        """Not implemented; always returns ``None``."""
        return None

    def format_date(self, value):
        """Not implemented; always returns ``None``."""
        return None
|
|
|
|
def split(row, sep='*', prefix='HI'):
    """
    Break one x12 row into its elements.

    Rows that start with ``prefix`` produce a list of lists: one
    ``[prefix, ...]`` entry per element found after the first one, each of
    those elements being split on '>'.

    Any other row produces a flat list. Elements containing '>' are expanded
    in place, with two exceptions: elements starting with 'HC' or 'AD' keep
    only their first two components, and elements of rows starting with 'CLM'
    are kept whole. '~' is removed from the row before splitting and carriage
    returns are removed from every returned value.
    """
    cleaned = row.replace('~', '')
    if row.startswith(prefix):
        return [[prefix] + split(element, '>') for element in cleaned.split(sep)[1:]]

    is_claim_row = row.startswith('CLM')
    parts = []
    for element in cleaned.split(sep):
        if '>' not in element:
            parts.append(element)
        elif element.startswith('HC') or element.startswith('AD'):
            parts.extend(element.split('>')[:2])
        elif is_claim_row:
            parts.append(element)
        else:
            parts.extend(element.split('>'))
    return [part.replace('\r', '') for part in parts]
|
|
def get_config(config, row):
    """
    Return the part of the configuration that applies to a parsed row.

    :config     dictionary keyed by the first element of a row; it may also
                hold a 'SIMILAR' table mapping an unknown first element to
                the key of another entry
    :row        a list of strings, or a list of such lists (in which case
                only the first inner list is inspected)

    When the matching entry has an '@ref' table, the entry of that table
    keyed by the first element of the row (in row order) found in it is
    returned, or {} when no element of the row is a key of the table.
    Otherwise the matching entry itself is returned, {} when there is none.
    """
    _row = list(row) if isinstance(row[0], str) else list(row[0])
    segment_id = _row[0]
    _info = config[segment_id] if segment_id in config else {}

    if '@ref' in _info:
        references = _info['@ref']
        #
        # Scanning the row in order makes the choice deterministic when more
        # than one element of the row is a key of the '@ref' table
        for element in _row:
            if element in references:
                return references[element]
        return {}

    if not _info and 'SIMILAR' in config:
        #
        # Unknown identifier: fall back on the entry it is declared similar to
        similar = config['SIMILAR']
        if segment_id in similar:
            _info = config[similar[segment_id]]

    return _info
|
|
def hash(value):
    """
    Return the hexadecimal MD5 digest of ``str(value)`` followed by a salt.

    The salt is read from the HEALTHCAREIO_SALT environment variable and
    defaults to the empty string when the variable is not set.
    """
    salted = str(value) + os.environ.get('HEALTHCAREIO_SALT', '')
    # Python 3 digests bytes, Python 2 digests the native string
    payload = salted.encode('utf-8') if sys.version_info[0] > 2 else salted
    return hashlib.md5(payload).hexdigest()
|
|
def suppress(value):
    """Mask a value: the argument is ignored and the constant 'N/A' is returned."""
    masked = 'N/A'
    return masked
|
|
|
|
def format_date(value):
    """
    Turn a compact date into a dash-separated one.

    An 8-character value is cut as 4-2-2 characters and a 6-character value
    as 2-2-2 characters with '20' put in front of the first group. Values of
    any other length yield ``None``.
    """
    size = len(value)
    if size == 8:
        pieces = (value[:4], value[4:6], value[6:])
        return "-".join(pieces)[:10]
    if size == 6:
        pieces = ('20' + value[:2], value[2:4], value[4:])
        return "-".join(pieces)
    return None
|
|
def format_time(value):
    """
    Insert ':' after the first two characters of ``value`` and keep at most
    the first five characters of the result.
    """
    head, tail = value[:2], value[2:]
    stamped = head + ":" + tail
    return stamped[:5]
|
|
def sv3_parse(value):
    """
    Split a '>'-separated value into a dictionary holding its first component
    as 'type' and its second as 'code'. ``None`` is returned when the value
    contains no '>'.
    """
    if '>' not in value:
        return None
    terms = value.split('>')
    return {'type': terms[0], 'code': terms[1]}
|
|
def sv2_parse(value):
    """
    Parse a composite value separated by '>' (preferred) or ':'.

    The first component becomes 'type' and the second 'code'. A third
    component, when present, is stored as the 'code' of a 'modifier' entry
    and a fourth one as its 'type'. A value holding neither separator is
    returned unchanged.
    """
    #
    # @TODO: Sometimes there's a suffix (need to inventory all the variations)
    #
    if '>' in value:
        delimiter = '>'
    elif ':' in value:
        delimiter = ':'
    else:
        return value

    pieces = value.split(delimiter)
    parsed = {"code": pieces[1], "type": pieces[0]}
    extras = pieces[2:4]
    if extras:
        modifier = {"code": extras[0]}
        if len(extras) > 1:
            modifier['type'] = extras[1]
        parsed['modifier'] = modifier
    return parsed
|
|
def format_proc(value):
    """
    Split a value on ':' or, failing that, on '<' and return its first two
    components, stripped, as 'type' and 'code'. When neither separator is
    present, ``str(value)`` is returned.
    """
    # NOTE(review): the other helpers of this module split on '>', so '<'
    # may be a typo - confirm against real data before changing it.
    for delimiter in (':', '<'):
        if delimiter in value:
            pieces = value.split(delimiter)
            return {"type": pieces[0].strip(), "code": pieces[1].strip()}
    return str(value)
|
|
def format_diag(value):
    """
    Convert a list of sequences into a list of dictionaries.

    For every item, position 1 is stored as 'type' and position 2 as 'code'.
    Items with fewer than three elements are skipped, because they have no
    position 2 to read.
    """
    return [{"code": item[2], "type": item[1]} for item in value if len(item) > 2]
|
|
def format_pos(value):
    """
    Split a value on '>' (or on ':' when it holds no '>') into 'code',
    'indicator' and 'frequency'.

    All three keys are filled only when the value has exactly three
    components; otherwise 'code' receives the first component and the two
    other keys are ``None``.
    """
    delimiter = '>' if '>' in value else ':'
    parts = value.split(delimiter)
    if len(parts) == 3:
        code, indicator, frequency = parts
    else:
        code, indicator, frequency = parts[0], None, None
    return {"code": code, "indicator": indicator, "frequency": frequency}
|
|
|
|
def _apply_synonym(synonyms, value):
    """Return the synonym registered for ``value``, or ``value`` itself when there is none."""
    if not synonyms:
        return value
    try:
        return synonyms[value] if value in synonyms else value
    except TypeError:
        # unhashable values (e.g. a list produced by a cast) cannot have a synonym
        return value


def get_map(row, config, version=None):
    """
    Build an object out of a parsed row, given the configuration of the row.

    :row        a list of strings, or a list of such lists
    :config     dictionary with the entries read here:
                    'map'       attribute name -> position in the row
                    <version>   optional map used instead of 'map' when the
                                key equal to ``version`` exists
                    'anchors'   optional values; when one is found in the
                                row, positions are counted from its position
                    'cast'      optional attribute name -> expression
                                evaluating to a callable applied to the value
                    'syn'       optional value -> replacement value
    :version    optional key selecting a version-specific map

    A list of strings yields a dictionary; positions beyond the end of the
    row are skipped. A dictionary produced by a cast is merged into the
    result: as is when it already holds the attribute name, under the
    attribute name otherwise. A list of lists yields the list of the objects
    built from each inner list.
    """
    omap = config['map'] if not version or version not in config else config[version]
    anchors = config['anchors'] if 'anchors' in config else []
    casts = config['cast'] if 'cast' in config else {}
    synonyms = config['syn'] if 'syn' in config else None

    if not isinstance(row[0], str):
        #
        # we are dealing with a complex object
        return [get_map(row_item, config, version) for row_item in row]

    #
    # The anchor does not depend on the attribute, so it is located once; the
    # first configured anchor found in the row wins, which keeps the result
    # deterministic when several anchors are present
    offset = 0
    for anchor in anchors:
        if anchor in row:
            offset = row.index(anchor)
            break

    object_value = {}
    for key in omap:
        index = offset + omap[key]
        if index >= len(row):
            continue
        value = row[index]

        if key in casts and value.strip() != '':
            # SECURITY: the cast is an expression taken from the configuration
            # and evaluated as code; only load configurations from a trusted
            # source.
            value = eval(casts[key])(value)

        if isinstance(value, dict):
            for objkey in value:
                if isinstance(value[objkey], dict):
                    continue
                value[objkey] = _apply_synonym(synonyms, value[objkey])
            if key not in value:
                value = {key: value}
        else:
            value = _apply_synonym(synonyms, value)

        if isinstance(value, dict):
            object_value.update(value)
        else:
            object_value[key] = value
    return object_value
|
|
|
|
def get_locations(x12_file, section='HL'):
    """
    Return the positions of the lines that start with ``section``.

    :x12_file   list of lines
    :section    prefix to look for, compared against each line once its
                surrounding whitespace is removed

    Positions come from the enumeration itself, so two identical lines are
    reported at their own distinct positions.
    """
    return [index for index, line in enumerate(x12_file)
            if line.strip().startswith(section)]
|
|
|
|
#def get_claims(filename,config,section) :
|
|
def _error_message(error):
    """Return the text to log for ``error`` on both Python 2 and Python 3."""
    if sys.version_info[0] > 2:
        # args may be empty (e.g. ``raise ValueError()``): fall back on str()
        return error.args[0] if error.args else str(error)
    return error.message


def _attach_labelled(claim, label, tmp):
    """Store the parsed value ``tmp`` under ``claim[label]`` and renumber the stored entries."""
    if isinstance(tmp, list):
        claim[label] = tmp if label not in claim else claim[label] + tmp
    elif label not in claim:
        claim[label] = [tmp]
    elif len(tmp) == 1:
        # a single attribute completes the latest entry instead of adding one
        claim[label][-1] = dict(claim[label][-1], **tmp)
    else:
        claim[label].append(tmp)

    #
    # Number the entries and drop those equal to an entry already kept
    entries = []
    for item in claim[label]:
        item['_index'] = len(entries)
        if item not in entries:
            entries.append(item)
    claim[label] = entries


def get_content(filename, config, section=None):
    """
    Parse an EDI file with the given configuration and return its claims.

    :filename   location of the file
    :config     configuration with formatting rules, labels ...; its
                'SECTION' entry is used when ``section`` is not provided
    :section    loop prefix (HL, CLP)

    :return     a pair (claims, logs). ``claims`` lists one dictionary per
                section in which a 'claim_id' was found; ``logs`` lists the
                errors met while reading or parsing. A file that cannot be
                read, holds no section or has an incomplete header yields
                ([], logs).

    The file is cut at every line starting with ``section``; each part, up to
    the next section line or the end of the file, is parsed as one candidate
    claim. The second, third and fourth lines of the file are read as header
    rows (presumably GS, ST and the segment that follows - confirm against
    the files being parsed).
    """
    section = section if section else config['SECTION']
    logs = []
    try:
        # the context manager releases the file handle even when read() fails
        with open(filename.strip(), errors='ignore') as stream:
            content = stream.read()
    except Exception as e:
        #
        # We have an error here that should be logged
        logs.append({"version": "unknown", "filename": filename, "msg": _error_message(e)})
        return [], logs

    x12_file = [line for line in content.split('\n') if line.strip()]
    if len(x12_file) == 1:
        #
        # Everything is on one line: cut on the terminator and keep it on each
        # row, because the sections are cut into rows on '~' further down
        x12_file = [row + '~' for row in x12_file[0].split('~') if row.strip()]

    locations = get_locations(x12_file, section)
    try:
        if not locations:
            raise IndexError("no row starts with " + str(section))
        TOP_ROW = x12_file[1].split('*')
        CATEGORY = x12_file[2].split('*')[1].strip()
        # strip() removes the carriage return left by files with \r\n line endings
        VERSION = TOP_ROW[-1].replace('~', '').strip()
        SUBMITTED_DATE = format_date(TOP_ROW[4])
        SENDER_ID = TOP_ROW[2]
        row = split(x12_file[3])
    except IndexError as e:
        logs.append({"version": "unknown", "filename": filename, "msg": _error_message(e)})
        return [], logs

    _info = get_config(config, row) if row else {}
    _default_value = get_map(row, _info, VERSION) if _info else {}

    #
    # given locations it is possible to build up the partitions (made of segments);
    # the end of the file closes the last partition so that it is parsed too
    partitions = []
    beg = locations[0]
    for end in locations[1:] + [len(x12_file)]:
        partitions.append("\n".join(x12_file[beg:end]))
        beg = end

    version_parts = VERSION.split('X')
    version_id = version_parts[0].strip()
    version_suffix = 'X' + version_parts[1] if len(version_parts) > 1 else None

    claims = []
    for segment in partitions:
        claim = {}
        for row in segment.replace('\n', '').split('~'):
            row = split(row)
            if not row:
                continue
            _info = get_config(config, row)
            if not _info:
                continue
            try:
                tmp = get_map(row, _info, VERSION)
            except Exception as e:
                logs.append({"version": VERSION, "filename": filename, "msg": _error_message(e),
                             "X12": row, "completed": False, "rows": len(row)})
                # a row that cannot be parsed invalidates the whole claim
                claim = {}
                break

            if 'label' not in _info:
                tmp['version'] = VERSION
                tmp['submitted'] = SUBMITTED_DATE
                if TOP_ROW[1] == 'HP':
                    tmp['payer_id'] = SENDER_ID
                elif TOP_ROW[1] == 'HC':
                    tmp['provider_id'] = SENDER_ID
                tmp['category'] = {"setid": CATEGORY, "version": version_suffix, "id": version_id}
                claim = dict(claim, **tmp)
            else:
                _attach_labelled(claim, _info['label'], tmp)

        #
        # Data without a claim identifier is left out
        if claim and 'claim_id' in claim:
            claim = dict(claim, **_default_value)
            claim['name'] = filename.split(os.sep)[-1]
            claim['index'] = len(claims)
            claims.append(claim)

    return claims, logs
|