parser/edi/parser.py

235 lines
8.2 KiB
Python
Raw Normal View History

2019-11-06 20:37:26 +00:00
"""
(c) 2019 EDI-Parser 1.0
Vanderbilt University Medical Center, Health Information Privacy Laboratory
https://hiplab.mc.vanderbilt.edu/tools
Authors:
Khanhly Nguyen,
Steve L. Nyemba<steve.l.nyemba@vanderbilt.edu>
License:
MIT, terms are available at https://opensource.org/licenses/MIT
This parser was originally written by Khanhly Nguyen for her internship and is intended to parse x12 835,837 and others provided the appropriate configuration
USAGE :
- COMMAND LINE
- EMBEDDED
"""
import os
import sys
def split(row,sep='*',prefix='HI'):
    """
    Break an x12 segment into its elements.

    A segment that does not start with `prefix` is returned as a flat list:
    the '~' characters are dropped, the text is cut on `sep`, and every
    element containing '>' is expanded in place into its components (only
    the first two components are kept when the element starts with 'HC' or
    'AD'). Carriage returns are removed from every returned item.

    A segment that starts with `prefix` is returned as a list of lists, one
    per element after the segment name; each inner list is `prefix` followed
    by that element cut on '>'.

    :row        raw segment text
    :sep        element separator
    :prefix     segment name that triggers the list-of-lists form
    """
    elements = row.replace('~','').split(sep)
    if row.startswith(prefix):
        # composite segment: every element after the name becomes its own row
        return [[prefix] + split(element,'>') for element in elements[1:]]

    flat = []
    for element in elements:
        if '>' not in element:
            flat.append(element)
            continue
        parts = element.split('>')
        if element.startswith('HC') or element.startswith('AD'):
            # only the qualifier and the first component are kept
            parts = parts[:2]
        flat.extend(parts)
    return [item.replace('\r','') for item in flat]
def get_config(config,row):
    """
    Return the meaningful part of the configuration for a given row.

    :config     dictionary keyed by segment name (first item of a row); it may
                also hold a 'SIMILAR' entry mapping a segment name to the name
                of another entry whose configuration should be reused
    :row        a flat list of items, or a list of lists in which case only
                the first inner list is inspected

    Returns the configuration found, or an empty dictionary when nothing
    applies (including when the row is empty). When the entry found has an
    '@ref' table, the sub-configuration of the first row item present in that
    table is returned, or an empty dictionary when no item matches.
    """
    if not row:
        # e.g. a segment split into an empty list: nothing to look up
        return {}
    _row = list(row) if isinstance(row[0], str) else list(row[0])
    if not _row:
        return {}

    name = _row[0]
    _info = config[name] if name in config else {}
    if '@ref' in _info:
        refs = _info['@ref']
        # scan in row order so the outcome is deterministic when several
        # items of the row are present in the reference table
        for item in _row:
            if item in refs:
                return refs[item]
        return {}
    if not _info and 'SIMILAR' in config and name in config['SIMILAR']:
        # fall back on the entry declared as similar to this segment name
        _info = config[config['SIMILAR'][name]]
    return _info
def format_date(value) :
    """
    Rewrite a compact date as 'YYYY-MM-DD'.

    An 8-character value is read as year (4), month (2), day (2).
    A 6-character value is read as year (2), month (2), day (2) and the year
    is prefixed with '20'.
    Any other length yields None.
    """
    size = len(value)
    if size == 8 :
        pieces = (value[:4], value[4:6], value[6:])
    elif size == 6 :
        pieces = ('20' + value[:2], value[2:4], value[4:])
    else:
        return None
    return "-".join(pieces)
2019-11-06 20:37:26 +00:00
def format_time(value):
    """
    Insert ':' after the first two characters of the value and keep at most
    the first five characters of the result (e.g. '1230' becomes '12:30').
    """
    head, tail = value[:2], value[2:]
    return (head + ":" + tail)[:5]
def format_proc(value):
    """
    Split a procedure value written as '<type><separator><code>'.

    The separators ':' and '<' are tried in that order; the first one that
    cuts the value in exactly two parts wins and a dictionary
    {"type": ..., "code": ...} with both parts stripped is returned.
    When neither separator applies the value is returned as a string.
    """
    for xchar in [':','<'] :
        # split on the separator being tested (not always on ':')
        parts = value.split(xchar)
        if len(parts) == 2 :
            return {"type":parts[0].strip(),"code":parts[1].strip()}
    return str(value)
2020-02-01 03:54:51 +00:00
def format_diag(value):
    """
    Turn a list of rows into a list of {"code": ..., "type": ...} entries.

    For every row the type is read at position 1 and the code at position 2.
    Rows with fewer than three items cannot provide both and are skipped.
    """
    return [ {"code":item[2], "type":item[1]} for item in value if len(item) > 2]
2019-11-07 07:07:15 +00:00
def get_map(row,config,version):
    """
    Build an object out of a row using the mapping found in the configuration.

    :row        flat list of items, or a list of such lists (in which case a
                list with one mapped object per inner list is returned)
    :config     configuration of the row; the keys read here are
                    'map'       attribute name -> position in the row
                    <version>   optional mapping used instead of 'map'
                    'anchors'   optional items; when one is present in the
                                row, positions are counted from its location
                    'cast'      optional attribute name -> expression that
                                evaluates to a callable applied to the value
                    'syn'       optional value -> replacement value
    :version    version used to select a version specific mapping

    Positions beyond the end of the row are ignored. A value that ends up
    being a dictionary is merged into the object instead of being stored
    under the attribute name.
    """
    omap = config[version] if version in config else config['map']
    anchors = config['anchors'] if 'anchors' in config else []

    if not isinstance(row[0], str):
        #
        # we are dealing with a complex object
        return [get_map(row_item,config,version) for row_item in row]

    #
    # The anchor does not depend on the attribute, it is resolved once; the
    # first configured anchor found in the row is used so that the result is
    # deterministic when several anchors are present
    offset = 0
    for anchor in anchors :
        if anchor in row :
            offset = row.index(anchor)
            break

    casts = config['cast'] if 'cast' in config else {}
    synonyms = config['syn'] if 'syn' in config else {}
    object_value = {}
    for key in omap :
        index = omap[key] + offset
        if index >= len(row) :
            continue
        value = row[index]
        if key in casts and value.strip() != '' :
            # NOTE(review): eval executes whatever expression the
            # configuration holds, the configuration must be trusted
            value = eval(casts[key])(value)
        try:
            has_synonym = value in synonyms
        except TypeError:
            # a cast may return an unhashable value (dict, list) that can
            # not be looked up; such a value has no synonym
            has_synonym = False
        if has_synonym :
            value = synonyms[value]
        if isinstance(value, dict) :
            object_value = dict(object_value, **value)
        else:
            object_value[key] = value
    return object_value
def get_locations(x12_file,section='HL') :
    """
    Return the positions of the lines that start with the section prefix.

    :x12_file   list of lines (segments) of the file
    :section    prefix looked for once the line is stripped of surrounding
                white spaces

    The position of each matching line is reported, including lines whose
    text is identical to an earlier line.
    """
    return [i for i, line in enumerate(x12_file) if line.strip().startswith(section)]
#def get_claims(filename,config,section) :
def get_content(filename,config,section=None) :
    """
    This function returns the content of the EDI file parsed given the configuration specified
    :section    loop prefix (HL, CLP), defaults to config['SECTION']
    :config     configuration with formatting rules, labels ...
    :filename   location of the file

    Returns a tuple (claims, logs) :
        claims  list of dictionaries, one per section that produced a 'claim_id'
        logs    list of dictionaries (version, filename, msg, X12) describing
                the sections that were dropped because a row failed to be mapped

    Raises IndexError when the file holds fewer than four lines/segments, the
    header values are read from fixed positions.
    """
    section = section if section else config['SECTION']

    # the context manager releases the file handle as soon as it is read
    with open(filename) as stream :
        x12_file = stream.read().split('\n')
    if len(x12_file) == 1 :
        # everything is on one line, the segments are delimited by '~'
        x12_file = x12_file[0].split('~')
    locations = get_locations(x12_file,section)
    claims = []
    logs = []

    #
    # Header values are read from fixed positions of the 2nd and 3rd lines
    # NOTE(review): presumably the GS and ST segments -- confirm
    TOP_ROW = x12_file[1].split('*')
    CATEGORY = x12_file[2].split('*')[1].strip()
    VERSION = TOP_ROW[-1].replace('~','')
    SUBMITTED_DATE = format_date(TOP_ROW[4])
    SENDER_ID = TOP_ROW[2]

    # values of the 4th line are added to every claim that is returned
    row = split(x12_file[3])
    _info = get_config(config,row)
    _default_value = get_map(row,_info,VERSION) if _info else {}

    N = len(locations)
    # NOTE(review): a section spans two consecutive markers, the rows found
    # after the last marker are never parsed -- confirm this is intended
    for index in range(0,N-1):
        beg = locations[index]
        end = locations[index+1]
        claim = {}
        for row in x12_file[beg:end] :
            row = split(row)
            _info = get_config(config,row)
            if not _info :
                continue
            try:
                tmp = get_map(row,_info,VERSION)
            except Exception as e:
                if sys.version_info[0] > 2 :
                    # an exception raised without argument has no args[0]
                    msg = e.args[0] if e.args else str(e)
                else:
                    msg = e.message
                logs.append ({"version":VERSION,"filename":filename,"msg":msg,"X12":x12_file[beg:end]})
                # the whole section is discarded when one of its rows fails
                claim = {}
                break

            if 'label' not in _info :
                # top level attributes of the claim
                tmp['version'] = VERSION
                tmp['submitted'] = SUBMITTED_DATE
                if TOP_ROW[1] == 'HP' :
                    tmp['payer_id'] = SENDER_ID
                elif TOP_ROW[1] == 'HC':
                    tmp['provider_id'] = SENDER_ID
                tmp['category'] = {"setid": CATEGORY,"version":'X'+VERSION.split('X')[1],"id":VERSION.split('X')[0].strip()}
                claim = dict(claim, **tmp)
                continue

            # labelled attributes are accumulated in a list under the label
            label = _info['label']
            if type(tmp) == list :
                claim[label] = tmp if label not in claim else claim[label] + tmp
            elif label not in claim:
                claim[label] = [tmp]
            elif len(list(tmp.keys())) == 1 :
                # a single attribute completes the last object of the list;
                # a dedicated name keeps the section index intact
                last = len(claim[label]) -1
                claim[label][last] = dict(claim[label][last],**tmp)
            else:
                claim[label].append(tmp)

        if claim and 'claim_id' in claim:
            claim = dict(claim,**_default_value)
            # file name without its folder and without its last 5 characters
            claim['name'] = filename[:-5].split(os.sep)[-1]
            claim['index'] = index
            claims.append(claim)
    return claims,logs