parser/healthcareio/x12/__init__.py

574 lines
21 KiB
Python
Raw Normal View History

"""
(c) 2019 Healthcare/IO 1.0
Vanderbilt University Medical Center, Health Information Privacy Laboratory
https://hiplab.mc.vanderbilt.edu/healthcareio
Authors:
Khanhly Nguyen,
Steve L. Nyemba<steve.l.nyemba@vanderbilt.edu>
License:
MIT, terms are available at https://opensource.org/licenses/MIT
This parser was originally written by Khanhly Nguyen for her internship and is intended to parse x12 835,837 and others provided the appropriate configuration
USAGE :
- COMMAND LINE
- EMBEDDED
"""
import hashlib
import json
import os
import sys
from itertools import islice
from multiprocessing import Process
import transport
import jsonmerge
class void(object):
    """
    Empty attribute holder.

    Instances are used as lightweight namespaces on which callables are hung
    at runtime (e.g. ``obj.get = void(); obj.get.config = ...``).
    """
class Formatters:
    """
    Helpers that split raw x12 rows and turn individual elements (dates,
    procedures, diagnoses, service lines ...) into python structures.

    The same functions are exposed under several names (``parse.*``,
    ``format_*``, ``sv2_parser`` ...) so that a configuration file can refer
    to them by name; the aliases are set up in the constructor.
    """

    def __init__(self):
        # self.config = config
        self.get = void()
        self.get.config = self.get_config

        self.parse = void()
        self.parse.sv3 = self.sv3
        self.parse.sv2 = self.sv2
        self.parse.procedure = self.procedure
        self.parse.diagnosis = self.diagnosis
        self.parse.date = self.date

        #
        # Aliases that can be looked up by name on the instance (see getattr/hasattr usage in Parser.get_map)
        self.sv2_parser = self.sv2
        self.sv3_parser = self.sv3
        self.sv3_parse = self.sv3
        self.format_proc = self.procedure
        self.format_diag = self.diagnosis
        self.format_date = self.date
        self.format_pos = self.pos
        self.format_time = self.time

    def split(self, row, sep='*', prefix='HI'):
        """
        Split an x12 row into its elements.

        :row    raw text of the row (a trailing '~' is dropped)
        :sep    element separator
        :prefix rows starting with this prefix are made of composite elements (separated by '>')

        Rows that do NOT start with the prefix are returned as a flat list of strings :
            - elements without '>' (or starting with HC) are kept as they are, stripped of new-lines
            - elements starting with AD are expanded into their first 2 components
            - any other element containing '>' is kept whole
        Rows that start with the prefix return [prefix, component, ...] built from the first composite element
        """
        if not row.startswith(prefix):
            value = []
            for row_value in row.replace('~', '').split(sep):
                if '>' in row_value and not row_value.startswith('HC'):
                    if row_value.startswith('AD'):
                        value += row_value.split('>')[:2]
                    else:
                        value.append(row_value)
                else:
                    value.append(row_value.replace('\n', ''))
            return [xchar.replace('\r', '') for xchar in value]

        items = row.replace('~', '').split(sep)[1:]
        if not items:
            #
            # The row only holds the segment identifier, there is nothing to expand
            # (indexing an empty list used to raise an IndexError here)
            return [prefix]
        value = [[prefix] + self.split(item, '>') for item in items]
        #
        # NOTE(review): only the first composite element is handed back, the remaining ones are dropped
        # This is the historical behavior, confirm it is intended before changing it
        return value[0]

    def get_config(self, config, row):
        """
        Return the meaningful part of the configuration for a given row.

        :config configuration keyed by segment identifier
        :row    a split row, either a list of strings or a list of such lists (the first one is used)

        If the segment entry has an '@ref' section, the entry of the first row element found in it is returned ({} if none)
        If there is no entry for the segment, the 'SIMILAR' section (when present) is used to find a substitute segment
        """
        _row = list(row) if isinstance(row[0], str) else list(row[0])
        _info = config[_row[0]] if _row[0] in config else {}
        if '@ref' in _info:
            refs = _info['@ref']
            #
            # Walking the row in order makes the choice deterministic when several elements are referenced
            for item in _row:
                if item in refs:
                    return refs[item]
            return {}
        if not _info and 'SIMILAR' in config:
            #
            # Let's look for the substitute key declared for this segment
            if _row[0] in config['SIMILAR']:
                key = config['SIMILAR'][_row[0]]
                _info = config[key]
        return _info

    def hash(self, value):
        """
        Return the md5 hex digest of a value, salted with the environment variable HEALTHCAREIO_SALT (if set).
        """
        salt = os.environ.get('HEALTHCAREIO_SALT', '')
        _value = str(value) + salt
        if sys.version_info[0] > 2:
            _value = _value.encode('utf-8')
        return hashlib.md5(_value).hexdigest()

    def suppress(self, value):
        """
        Replace any value with the placeholder 'N/A'.
        """
        return 'N/A'

    def date(self, value):
        """
        Format an x12 date as YYYY-MM-DD.

        :value  CCYYMMDD or YYMMDD (the century is assumed to be 20), a range 'start-end' yields the start
        :return the formatted date or None if the value has an unsupported length
        """
        if len(value) > 8 or '-' in value:
            value = value.split('-')[0]

        if len(value) == 8:
            return "-".join([value[:4], value[4:6], value[6:]])
        if len(value) == 6:
            #
            # We have a date formatting issue, the century is not provided
            return "-".join(['20' + value[:2], value[2:4], value[4:]])
        return None

    def time(self, value):
        """
        Placeholder, time values are not formatted (returns None).
        """
        return None

    def sv3(self, value):
        """
        Format a service line given as a split row.

        :value  either [id, 'type>code', amount, ...] or [id, type, code, amount, ...]
        :return {"type", "code", "amount"} with the amount as a float
        """
        if '>' in value[1]:
            terms = value[1].split('>')
            return {'type': terms[0], 'code': terms[1], "amount": float(value[2])}
        return {"code": value[2], "type": value[1], "amount": float(value[3])}

    def sv2(self, value):
        """
        Format a composite 'type>code[>modifier[>modifier type]]' (':' is accepted as a separator too).

        :return {"code", "type"[, "modifier"]} or the value untouched if it holds no separator
        """
        #
        # @TODO: Sometimes there's a suffix (need to inventory all the variations)
        #
        if '>' in value:
            xchar = '>'
        elif ':' in value:
            xchar = ':'
        else:
            return value

        _values = value.split(xchar)
        modifier = {}
        if len(_values) > 2:
            modifier = {"code": _values[2]}
        if len(_values) > 3:
            modifier['type'] = _values[3]
        _value = {"code": _values[1], "type": _values[0]}
        if modifier:
            _value['modifier'] = modifier
        return _value

    def procedure(self, value):
        """
        Format a procedure given as 'type<sep>code[<sep>modifier ...]' where <sep> is the first of : < | > found.

        :return {"type", "code", "modifier_1", ...} or str(value) if no separator is found
        """
        for xchar in [':', '<', '|', '>']:
            if xchar in value:
                terms = value.split(xchar)
                _value = {"type": terms[0].strip(), "code": terms[1].strip()}
                for index, modifier in enumerate(terms[2:], start=1):
                    _value['modifier_' + str(index)] = modifier
                return _value
        return str(value)

    def diagnosis(self, value):
        """
        Format a list of [id, type, code, ...] items into a list of {"code", "type"}.

        Items that are too short to hold a code are skipped (they used to raise an IndexError).
        """
        return [{"code": item[2], "type": item[1]} for item in value if len(item) > 2]

    def pos(self, value):
        """
        Formatting place of service information within a segment (REF)
        @TODO: In order to accommodate the other elements (indicator, frequency) they need to be specified in the configuration
            Otherwise it causes problems on export

        :return the first component of the value i.e the code
        """
        xchar = '>' if '>' in value else ':'
        return value.split(xchar)[0]
class Parser(Process):
    """
    Process that parses x12 files (835, 837 ...) given a configuration file and writes
    the claims/remits found as well as the parsing logs to the configured store.

    USAGE :
        parser = Parser(path_to_config)
        parser.set.files(list_of_files)
        parser.start()      # or parser.read(filename) to parse a single file in the current process
    """

    def __init__(self, path):
        """
            :path       path of the configuration file (it can be absolute)
        """
        Process.__init__(self)
        self.utils = Formatters()
        self.get = void()
        self.get.value = self.get_map
        self.get.default_value = self.get_default_value
        with open(path) as handle:
            _config = json.loads(handle.read())
        self._custom_config = self.get_custom(path)
        self.config = _config['parser']
        self.store = _config['store']

        self.files = []
        self.set = void()
        self.set.files = self.set_files
        #
        # Optional callbacks, invoked before the files are processed and after each file has been written
        self.emit = void()
        self.emit.pre = None
        self.emit.post = None

    def get_custom(self, path):
        """
        Load the custom configuration found in the folder 'custom' located next to the configuration file.

        :path   path of the configuration file (it can be absolute)
        :return content of the first file (alphabetical order) of the folder or {} if there is none
        """
        #
        # os.path.join keeps a relative location relative, joining '' and 'custom' with os.sep used to yield '/custom'
        _path = os.path.join(path.replace('config.json', ''), 'custom')
        _config = {}
        if os.path.exists(_path):
            files = sorted(os.listdir(_path))
            if files:
                fullname = os.path.join(_path, files[0])
                with open(fullname) as handle:
                    _config = json.loads(handle.read())
        return _config

    def set_files(self, files):
        """
            :files  list of paths of the files to be parsed by run()
        """
        self.files = files

    def get_map(self, row, config, version=None):
        """
        Build an object out of a split row given the configuration of its segment.

        :row        a split row (list of strings) or a list of split rows
        :config     configuration of the segment, it provides either
                        'apply'     name of a Formatters function applied to the whole row (when there is no 'map')
                        'map'       {field:index of the element in the row}, a version specific map can be stored under the version
                        'anchors'   elements of the row from which indexes are counted (optional)
                        'cast'      {field:'int'|'float'|name of a Formatters function} (optional)
                        'rewrite'   {field:key to extract from a casted dictionary} (optional)
        :version    version of the file, used to pick a version specific map
        :return     a dictionary for a row, a list of objects for a list of rows
        """
        handler = self.utils
        if 'map' not in config and hasattr(handler, config['apply']):
            pointer = getattr(handler, config['apply'])
            return pointer(row)

        omap = config['map'] if not version or version not in config else config[version]
        anchors = config['anchors'] if 'anchors' in config else []
        rewrite = config['rewrite'] if 'rewrite' in config else {}
        casts = config['cast'] if 'cast' in config else {}

        if type(row[0]) != str:
            #
            # we are dealing with a complex object
            return [self.get.value(row_item, config, version) for row_item in row]

        #
        # When an anchor is found in the row, the indexes of the map are relative to its position
        # The anchor does not depend on the field, it is computed once (first match in the row)
        offset = 0
        if anchors:
            _anchors = set(anchors)
            for position, item in enumerate(row):
                if item in _anchors:
                    offset = position
                    break

        object_value = {}
        for key in omap:
            index = omap[key] + offset
            if index >= len(row):
                continue
            value = row[index]
            if key in casts and value.strip() != '':
                cast = casts[key]
                if cast == 'float':
                    value = float(value)
                elif cast == 'int':
                    value = int(value)
                elif hasattr(handler, cast):
                    value = getattr(handler, cast)(value)
                else:
                    print("Missing Pointer ", key, config['cast'])

            if type(value) == dict:
                if key in rewrite:
                    #
                    # Only one attribute of the casted object is of interest
                    _key = rewrite[key]
                    value = value[_key] if _key in value else ""
                if type(value) == dict and key in value:
                    #
                    # The object carries the field itself, its attributes are promoted
                    object_value = dict(object_value, **value)
                else:
                    object_value[key] = value
            else:
                object_value[key] = value
        return object_value

    def apply(self, content, _code):
        """
        Parse the rows of a segment and merge the outcome into a single object.

            :content    content of a file i.e a segment with the envelope
            :_code  837 or 835 (helps get the appropriate configuration)

        Rows without configuration are skipped, a row that can not be parsed is reported (print) and skipped
        The configuration of a row decides where its object goes :
            'label' appended to the list stored under the label (objects are given an '_index')
            'field' merged (jsonmerge) under the field
            otherwise the attributes are copied at the top level
        """
        util = self.utils
        value = {}
        for row in content:
            row = util.split(row.replace('\n', '').replace('~', ''))
            _info = util.get.config(self.config[_code][0], row)
            if self._custom_config and _code in self._custom_config:
                _cinfo = util.get.config(self._custom_config[_code], row)
            else:
                _cinfo = {}

            if not (_info or _cinfo):
                continue
            try:
                _info = jsonmerge.merge(_info, _cinfo)
                tmp = self.get.value(row, _info)
                if not tmp:
                    continue

                if 'label' in _info:
                    label = _info['label']
                    if type(tmp) == list:
                        value[label] = tmp if label not in value else value[label] + tmp
                    else:
                        if label not in value:
                            value[label] = []
                        value[label].append(tmp)
                        if isinstance(tmp, dict):
                            #
                            # position of the object within its list, only objects can carry it
                            tmp['_index'] = len(value[label]) - 1
                elif 'field' in _info:
                    name = _info['field']
                    value = jsonmerge.merge(value, {name: tmp})
                else:
                    value = dict(value, **tmp)
            except Exception as e:
                #
                # Best effort, a faulty row must not prevent the rest of the segment from being parsed
                print(e.args[0] if e.args else repr(e))
        return value if value else {}

    def get_default_value(self, content, _code):
        """
        Build the attributes shared by all the claims of a file out of its first rows.

        :content    the first 4 rows of the file, the 2nd, 3rd and 4th rows are read
        :_code      837 or 835 (helps get the appropriate configuration)
        :return     object with 'category', 'submitted', 'sender_id' merged with the parsed rows
        """
        util = self.utils
        TOP_ROW = content[1].split('*')
        CATEGORY = content[2].split('*')[1].strip()
        VERSION = TOP_ROW[-1].replace('~', '').replace('\n', '')
        SUBMITTED_DATE = util.parse.date(TOP_ROW[4])
        SENDER_ID = TOP_ROW[2]

        row = util.split(content[3])
        _info = util.get_config(self.config[_code][0], row)
        value = self.get.value(row, _info, VERSION) if _info else {}
        value['category'] = {"setid": CATEGORY, "version": 'X' + VERSION.split('X')[1], "id": VERSION.split('X')[0].strip()}
        value["submitted"] = SUBMITTED_DATE
        value['sender_id'] = SENDER_ID
        #
        # Let's parse this for default values
        return jsonmerge.merge(value, self.apply(content, _code))

    def read(self, filename):
        """
        :formerly get_content
        This function returns the content of the EDI file parsed given the configuration specified. It is capable of identifying a file given the content
        :filename   location of the file
        :return     claims,logs,code
                        claims  list of objects found ([] on failure, None if the file has less than 3 rows)
                        logs    list with the outcome of the parsing
                        code    type of file found in the content (None if the file could not be parsed)
        """
        logs = []
        claims = []
        #
        # The code must be known to the error handler even if the file can not be opened
        _code = "unknown"
        handle = None
        try:
            handle = open(filename.strip(), errors='ignore')
            rows = handle
            INITIAL_ROWS = list(islice(handle, 4))
            if len(INITIAL_ROWS) == 1:
                #
                # The whole file is on a single line, the rows are delimited by ~
                # In this case the initial rows are walked again by the loop below
                rows = INITIAL_ROWS[0].split('~')
                INITIAL_ROWS = rows[:4]
            if len(INITIAL_ROWS) < 3:
                return None, [{"name": filename, "completed": False}], None

            _code = INITIAL_ROWS[2].split('*')[1].strip()
            section = self.config[_code][0]['SECTION'].strip()
            DEFAULT_VALUE = self.get.default_value(INITIAL_ROWS, _code)
            DEFAULT_VALUE['name'] = filename.strip()
            #
            # In the initial rows, there's redundant information (so much for x12 standard)
            # index 1 identifies file type i.e CLM for claim and CLP for remittance
            segment = []
            index = 0
            _toprows = []
            _default = None
            for row in rows:
                row = row.replace('\r', '')
                is_section = row.startswith(section)
                if not segment and not is_section:
                    _toprows += [row]

                if is_section and not segment:
                    segment = [row]
                    continue
                elif segment and not is_section:
                    if _default is None:
                        #
                        # The rows above the first section are parsed once, even if nothing comes out of them
                        _default = self.apply(_toprows, _code)
                        DEFAULT_VALUE = dict(DEFAULT_VALUE, **_default)
                    segment.append(row)

                if len(segment) > 1 and is_section:
                    #
                    # A new section starts, the rows gathered so far make up a claim
                    _claim = self.apply(segment, _code)
                    if _claim:
                        _claim['index'] = index
                        claims.append(jsonmerge.merge(DEFAULT_VALUE, _claim))
                    segment = [row]
                    index += 1
            #
            # Handling the last claim found
            # NOTE: a file without any section raises an IndexError here and is reported as not completed
            if segment[0].startswith(section):
                claim = self.apply(segment, _code)
                if claim:
                    claim['index'] = len(claims)
                    #
                    # lists are appended to instead of being overwritten by the merge
                    _lists = [key for key in claim.keys() if type(claim[key]) == list]
                    if _lists:
                        schema = {"properties": {key: {"mergeStrategy": "append"} for key in _lists}}
                    else:
                        schema = {}
                    merger = jsonmerge.Merger(schema)
                    claim = merger.merge(claim, self.apply(_toprows, _code))
                    claims.append(merger.merge(DEFAULT_VALUE, claim))
        except Exception as e:
            msg = e.args[0] if e.args else str(e)
            logs.append({"parse": _code, "completed": False, "name": filename, "msg": msg})
            return [], logs, None
        finally:
            #
            # The file is closed on every path (early return, failure, single line file)
            if handle is not None:
                handle.close()

        rate = 0 if len(claims) == 0 else (1 + index) / len(claims)
        logs.append({"parse": "claims" if _code == '837' else 'remits', "completed": True, "name": filename, "rate": rate})
        return claims, logs, _code

    def run(self):
        """
        Parse the files set with set.files and write the outcome of each of them to the store.
        """
        if self.emit.pre:
            self.emit.pre()
        for filename in self.files:
            content, logs, _code = self.read(filename)
            self.finish(content, logs, _code)

    def finish(self, content, logs, _code):
        """
        Write the parsed content and the logs to the store and invoke the post callback (if any).

        :content    list of claims/remits returned by read
        :logs       logs returned by read
        :_code      type of file, 837 goes to 'claims' anything else to 'remits'
        """
        #
        # The store configuration is copied so that it is not altered from one file to the next
        args = json.loads(json.dumps(self.store))
        _args = json.loads(json.dumps(self.store))
        target = 'doc' if args['type'] == 'mongo.MongoWriter' else 'table'
        args['args'][target] = 'claims' if _code == '837' else 'remits'
        _args['args'][target] = 'logs'

        if content:
            writer = transport.factory.instance(**args)
            try:
                writer.write(content)
            finally:
                writer.close()
        if logs:
            logger = transport.factory.instance(**_args)
            try:
                logger.write(logs)
            finally:
                logger.close()
        if self.emit.post:
            self.emit.post(content, logs)