diff --git a/healthcareio/healthcare-io.py b/healthcareio/healthcare-io.py index 828c39e..3a0a6dc 100644 --- a/healthcareio/healthcare-io.py +++ b/healthcareio/healthcare-io.py @@ -42,7 +42,7 @@ import sys import numpy as np from multiprocessing import Process import time - +import x12 PATH = os.sep.join([os.environ['HOME'],'.healthcareio']) OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io']) @@ -257,36 +257,29 @@ if __name__ == '__main__' : # @TODO: Log this here so we know what is being processed or not SCOPE = None - if files and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']): - # _map = {'claims':'837','remits':'835'} - # key = _map[SYS_ARGS['parse']] - # CONFIG = info['parser'][key] - # if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) : - # CONFIG = CONFIG[ int(SYS_ARGS['version'])] + if files : #and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']): + # logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])}) + # if info['store']['type'] == 'disk.DiskWriter' : + # info['store']['args']['path'] += (os.sep + 'healthcare-io.json') + # elif info['store']['type'] == 'disk.SQLiteWriter' : + # # info['store']['args']['path'] += (os.sep + 'healthcare-io.db3') + # pass + + + # if info['store']['type'] == 'disk.SQLiteWriter' : + # info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower() + # _info = json.loads(json.dumps(info['store'])) + # _info['args']['table']='logs' # else: - # CONFIG = CONFIG[-1] - logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])}) - if info['store']['type'] == 'disk.DiskWriter' : - info['store']['args']['path'] += (os.sep + 'healthcare-io.json') - elif info['store']['type'] == 'disk.SQLiteWriter' : - # info['store']['args']['path'] += (os.sep + 'healthcare-io.db3') - pass - - - if info['store']['type'] == 'disk.SQLiteWriter' : - 
info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower() - _info = json.loads(json.dumps(info['store'])) - _info['args']['table']='logs' - else: - # - # if we are working with no-sql we will put the logs in it (performance )? + # # + # # if we are working with no-sql we will put the logs in it (performance )? - info['store']['args']['doc'] = SYS_ARGS['parse'].strip().lower() - _info = json.loads(json.dumps(info['store'])) - _info['args']['doc'] = 'logs' - logger = factory.instance(**_info) + # info['store']['args']['doc'] = SYS_ARGS['parse'].strip().lower() + # _info = json.loads(json.dumps(info['store'])) + # _info['args']['doc'] = 'logs' + # logger = factory.instance(**_info) - writer = factory.instance(**info['store']) + # writer = factory.instance(**info['store']) # # we need to have batches ready for this in order to run some of these queries in parallel @@ -295,23 +288,19 @@ if __name__ == '__main__' : # BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch']) - #logger = factory.instance(type='mongo.MongoWriter',args={'db':'healthcareio','doc':SYS_ARGS['parse']+'_logs'}) - # schema = info['schema'] - - # for key in schema : - # sql = schema[key]['create'] - # writer.write(sql) files = np.array_split(files,BATCH_COUNT) procs = [] index = 0 for row in files : row = row.tolist() - logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)}) - proc = Process(target=apply,args=(row,info['store'],_info,)) - proc.start() - procs.append(proc) - index = index + 1 + # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)}) + # proc = Process(target=apply,args=(row,info['store'],_info,)) + parser = x12.Parser(os.sep.join([PATH,'config.json'])) + parser.set.files(row) + parser.start() + procs.append(parser) + # index = index + 1 while len(procs) > 0 : procs = [proc for proc in procs if proc.is_alive()] time.sleep(2) diff --git a/healthcareio/x12/__init__.py b/healthcareio/x12/__init__.py new file mode 
"""
    (c) 2019 Healthcare/IO 1.0
    Vanderbilt University Medical Center, Health Information Privacy Laboratory
    https://hiplab.mc.vanderbilt.edu/healthcareio

    Authors:
        Khanhly Nguyen,
        Steve L. Nyemba

    License:
        MIT, terms are available at https://opensource.org/licenses/MIT

    This parser was originally written by Khanhly Nguyen for her internship and is
    intended to parse x12 835, 837 and others provided the appropriate configuration.
    USAGE :
        - COMMAND LINE
        - EMBEDDED
"""
import hashlib
import json
import os
import sys
from itertools import islice
from multiprocessing import Process


class void:
    """Bare namespace object used to expose grouped callables (e.g. ``obj.parse.date``)."""
    pass


class Formatters:
    """
    Collection of field-level formatting/parsing helpers for x12 segments.

    Instances expose the same callables under several aliases
    (``self.parse.date`` == ``self.format_date`` == ``self.date``) because the
    parser configuration refers to them by different names.
    """
    def __init__(self):
        self.get = void()
        self.get.config = self.get_config

        self.parse = void()
        self.parse.sv3 = self.sv3
        self.parse.sv2 = self.sv2
        self.sv2_parse = self.sv2
        self.sv3_parse = self.sv3
        self.parse.procedure = self.procedure
        self.parse.diagnosis = self.diagnosis
        self.parse.date = self.date
        self.format_date = self.date
        self.format_pos = self.pos
        self.format_time = self.time

    def split(self, row, sep='*', prefix='HI'):
        """
        Split an x12 row into its elements.

        :param row:     raw segment text (trailing ``~`` terminator is stripped)
        :param sep:     element separator (``*`` between elements, ``>`` for sub-elements)
        :param prefix:  segment id that is treated as a repeated composite (HI rows
                        yield a list of ``[prefix, ...]`` sub-lists instead of a flat list)
        """
        if row.startswith(prefix) is False:
            value = []
            for row_value in row.replace('~', '').split(sep):
                if '>' in row_value:
                    # HC/AD composites: only the first two sub-elements are meaningful
                    if row_value.startswith('HC') or row_value.startswith('AD'):
                        value += row_value.split('>')[:2]
                    else:
                        # CLM segments keep the composite intact; everything else is expanded
                        value += row_value.split('>') if row.startswith('CLM') is False else [row_value]
                else:
                    value.append(row_value.replace('\n', ''))
            return [xchar.replace('\r', '') for xchar in value]
        else:
            # e.g. HI*ABK>S0861*ABF>E8889~ -> [['HI','ABK','S0861'], ['HI','ABF','E8889']]
            return [[prefix] + self.split(item, '>') for item in row.replace('~', '').split(sep)[1:]]

    def get_config(self, config, row):
        """
        Return the meaningful part of the configuration for a given row.

        Resolution order:
          1. direct lookup on the segment id (``row[0]``)
          2. if the entry has an ``@ref`` table, select by the qualifier found in the row
          3. fall back to the ``SIMILAR`` alias table when the segment id is unknown
        Returns ``{}`` when nothing applies.
        """
        _row = list(row) if type(row[0]) == str else list(row[0])
        _info = config[_row[0]] if _row[0] in config else {}
        key = None
        if '@ref' in _info:
            key = list(set(_row) & set(_info['@ref'].keys()))
            if key:
                key = key[0]
                return _info['@ref'][key]
            else:
                return {}

        if not _info and 'SIMILAR' in config:
            #
            # Let's look for the nearest key using the alias (SIMILAR) table
            if _row[0] in config['SIMILAR']:
                key = config['SIMILAR'][_row[0]]
                _info = config[key]

        return _info

    def hash(self, value):
        """Return the (optionally salted) md5 hexdigest of *value*; salt comes from HEALTHCAREIO_SALT."""
        salt = os.environ['HEALTHCAREIO_SALT'] if 'HEALTHCAREIO_SALT' in os.environ else ''
        _value = str(value) + salt
        if sys.version_info[0] > 2:
            return hashlib.md5(_value.encode('utf-8')).hexdigest()
        else:
            return hashlib.md5(_value).hexdigest()

    def suppress(self, value):
        """Mask a value that must not be persisted."""
        return 'N/A'

    def date(self, value):
        """
        Format an x12 date (CCYYMMDD or YYMMDD, possibly with a ``-`` range suffix)
        as an ISO ``YYYY-MM-DD`` string. Values of any other length are returned as None.
        """
        if len(value) > 8 or '-' in value:
            value = value.split('-')[0]
        if len(value) == 8:
            year = value[:4]
            month = value[4:6]
            day = value[6:]
            return "-".join([year, month, day])[:10]
        elif len(value) == 6:
            # NOTE(review): two-digit years are assumed to be 20xx — confirm for legacy files
            year = '20' + value[:2]
            month = value[2:4]
            day = value[4:]
            return "-".join([year, month, day])

    def time(self, value):
        # @TODO: time formatting not implemented yet (returns None)
        pass

    def sv3(self, value):
        """Parse an SV3 (dental service) element list into {type, code, amount}."""
        if '>' in value[1]:
            terms = value[1].split('>')
            return {'type': terms[0], 'code': terms[1], "amount": float(value[2])}
        else:
            return {"code": value[2], "type": value[1], "amount": float(value[3])}

    def sv2(self, value):
        """
        Parse an SV2 (institutional service) composite of the form
        ``type>code[>modifier-code[>modifier-type]]`` (``:`` also accepted as separator).
        Returns the input unchanged when no separator is present.
        """
        # @TODO: Sometimes there's a suffix (need to inventory all the variations)
        if '>' in value or ':' in value:
            xchar = '>' if '>' in value else ':'
            _values = value.split(xchar)
            modifier = {}
            if len(_values) > 2:
                modifier = {"code": _values[2]}
                if len(_values) > 3:
                    modifier['type'] = _values[3]
            _value = {"code": _values[1], "type": _values[0]}
            if modifier:
                _value['modifier'] = modifier
            return _value
        else:
            return value

    def procedure(self, value):
        """Parse a ``type:code`` (or ``type<code``) procedure composite; pass through otherwise."""
        for xchar in [':', '<']:
            if xchar in value and len(value.split(xchar)) > 1:
                _value = {"type": value.split(xchar)[0].strip(), "code": value.split(xchar)[1].strip()}
                break
        else:
            _value = str(value)
        return _value

    def diagnosis(self, value):
        """
        Parse a list of HI sub-lists (e.g. ``[['HI','ABK','S0861'], ...]``) into
        ``[{'code': ..., 'type': ...}, ...]``.

        NOTE(fix): the parameter was misspelled ``alue`` while the body used
        ``value`` (NameError on every call), and the guard tested ``len(item) > 1``
        while reading ``item[2]`` (IndexError on two-element items).
        """
        return [{"code": item[2], "type": item[1]} for item in value if len(item) > 2]

    def pos(self, value):
        """
        Format place-of-service information within a segment (REF):
        ``code>indicator>frequency`` (or ``:`` separated); missing parts become None.
        """
        xchar = '>' if '>' in value else ':'
        x = value.split(xchar)
        x = {"code": x[0], "indicator": x[1], "frequency": x[2]} if len(x) == 3 else \
            {"code": x[0], "indicator": None, "frequency": None}
        return x


class Parser(Process):
    """
    Multiprocessing worker that parses a list of x12 835/837 files according to
    the configuration at *path* and persists claims/remits plus logs via the
    configured transport store.
    """
    def __init__(self, path):
        """
        :param path: location of the JSON configuration file; it must contain
                     ``parser`` (per-transaction rules) and ``store`` (writer args)
        """
        Process.__init__(self)
        self.utils = Formatters()
        self.get = void()
        self.get.value = self.get_map
        self.get.default_value = self.get_default_value
        with open(path) as handle:
            _config = json.loads(handle.read())

        self.config = _config['parser']
        self.store = _config['store']

        self.files = []
        self.set = void()
        self.set.files = self.set_files

    def set_files(self, files):
        """Assign the list of file paths this worker will process in run()."""
        self.files = files

    def get_map(self, row, config, version=None):
        """
        Apply a configuration entry to a split row and return the extracted object.

        :param row:     split segment (list of str) or list of such rows (composite)
        :param config:  rule dict — either ``{'apply': <Formatters method>}`` or a
                        ``map`` of field-name -> element-index, optionally with
                        ``anchors``, ``cast`` and ``syn`` tables
        :param version: x12 version key; when present in *config* it overrides ``map``
        """
        handler = Formatters()
        # 'apply' delegates the whole row to a Formatters method (e.g. diagnosis)
        if 'map' not in config and hasattr(handler, config['apply']):
            pointer = getattr(handler, config['apply'])
            object_value = pointer(row)
            return object_value

        omap = config['map'] if not version or version not in config else config[version]
        anchors = config['anchors'] if 'anchors' in config else []

        if type(row[0]) == str:
            object_value = {}
            for key in omap:
                index = omap[key]
                if anchors and set(anchors) & set(row):
                    # indices are relative to the anchor element when one is present
                    _key = list(set(anchors) & set(row))[0]
                    aindex = row.index(_key)
                    index = aindex + index

                if index < len(row):
                    value = row[index]

                    if 'cast' in config and key in config['cast'] and value.strip() != '':
                        if config['cast'][key] in ['float', 'int']:
                            value = eval(config['cast'][key])(value)
                        elif hasattr(handler, config['cast'][key]):
                            pointer = getattr(handler, config['cast'][key])
                            value = pointer(value)
                        else:
                            print("Missing Pointer ", config['cast'][key])

                    if type(value) == dict:
                        # apply synonym substitution to the scalar members of the dict
                        for objkey in value:
                            if type(value[objkey]) == dict:
                                continue
                            if 'syn' in config and value[objkey] in config['syn']:
                                value[objkey] = config['syn'][value[objkey]]
                        value = {key: value} if key not in value else value
                    else:
                        if 'syn' in config and value in config['syn']:
                            value = config['syn'][value]

                    if type(value) == dict:
                        object_value = dict(object_value, **value)
                    else:
                        object_value[key] = value
        else:
            #
            # we are dealing with a complex object (a list of rows)
            object_value = []
            for row_item in row:
                value = self.get.value(row_item, config, version)
                object_value.append(value)
            #
            # We need to add the index of the object it matters in determining the claim types
        return object_value

    def apply(self, content, _code, default_value):
        """
        Build one claim/remit dict from a segment.

        :param content:       a segment (list of raw rows) including the envelope
        :param _code:         '837' or '835' (selects the appropriate configuration)
        :param default_value: header fields shared by every claim in the file
        :return: merged dict, or ``{}`` when nothing was extracted
        """
        util = Formatters()
        claim = default_value.copy()
        value = {}
        for row in content[:]:
            row = util.split(row)
            _info = util.get.config(self.config[_code][0], row)

            if _info:
                try:
                    tmp = self.get.value(row, _info)
                    if not tmp:
                        continue
                    if 'label' in _info:
                        label = _info['label']
                        if type(tmp) == list:
                            value[label] = tmp if label not in value else value[label] + tmp
                        else:
                            if label not in value:
                                value[label] = [tmp]
                            elif len(list(tmp.keys())) == 1:
                                # single-field dicts extend the most recent entry
                                index = len(value[label]) - 1
                                value[label][index] = dict(value[label][index], **tmp)
                            else:
                                value[label].append(tmp)
                        if len(value[label]) > 0:
                            # de-duplicate entries and tag each with its position
                            labels = []
                            for item in value[label]:
                                item['_index'] = len(labels)
                                if item not in labels:
                                    labels.append(item)
                            value[label] = labels
                    elif 'field' in _info:
                        name = _info['field']
                        value[name] = tmp
                    else:
                        value = dict(value, **tmp)
                except Exception as e:
                    # best-effort: a malformed row must not abort the whole claim
                    print('__', e)
                    pass

        return dict(claim, **value) if value else {}

    def get_default_value(self, content, _code):
        """
        Extract the header/envelope fields (category, version, submission date,
        sender id) from the first four rows of the file.

        :param content: the first rows of the EDI file (ISA/GS/ST envelope)
        :param _code:   '837' or '835'; 835 files carry a payer_id, 837 a provider_id
        """
        util = Formatters()
        TOP_ROW = content[1].split('*')
        CATEGORY = content[2].split('*')[1].strip()
        VERSION = content[1].split('*')[-1].replace('~', '').replace('\n', '')
        SUBMITTED_DATE = util.parse.date(TOP_ROW[4])
        SENDER_ID = TOP_ROW[2]
        row = util.split(content[3])

        _info = util.get_config(self.config[_code][0], row)

        value = self.get.value(row, _info, VERSION) if _info else {}
        value['category'] = {"setid": CATEGORY, "version": 'X' + VERSION.split('X')[1],
                             "id": VERSION.split('X')[0].strip()}
        value["submitted"] = SUBMITTED_DATE
        if _code == '835':
            value['payer_id'] = SENDER_ID
        else:
            value['provider_id'] = SENDER_ID
        return value

    def read(self, filename):
        """
        Parse one EDI file (formerly ``get_content``). The file type is detected
        from the content itself: CLM sections -> 837 (claims), CLP -> 835 (remits).

        :param filename: location of the file
        :return: (claims, logs, _code) — _code is None when detection failed
        """
        logs = []
        claims = []
        # fix: _code/index defined before the try so the except handler and the
        # rate computation can never hit an unbound name
        _code = None
        index = 0
        try:
            file = open(filename.strip(), errors='ignore')
            INITIAL_ROWS = list(islice(file, 4))

            if len(INITIAL_ROWS) == 1:
                # single physical line: segments are '~'-terminated instead of newline-terminated
                file = INITIAL_ROWS[0].split('~')
                INITIAL_ROWS = file[:4]
            #
            # In the initial rows, there's redundant information (so much for x12 standard)
            # index 1 identifies file type i.e CLM for claim and CLP for remittance
            section = 'CLM' if INITIAL_ROWS[1].split('*')[1] == 'HC' else 'CLP'
            _code = '837' if section == 'CLM' else '835'
            DEFAULT_VALUE = self.get.default_value(INITIAL_ROWS, _code)
            DEFAULT_VALUE['name'] = filename.strip()

            segment = []
            for row in file:
                if row.startswith(section) and not segment:
                    segment = [row]
                    continue
                elif segment:
                    segment.append(row)

                if len(segment) > 1 and row.startswith(section):
                    # a new section header closes the previous claim
                    default_claim = dict({"index": index}, **DEFAULT_VALUE)
                    claim = self.apply(segment, _code, default_claim)
                    claims.append(claim)
                    segment = [row]
                    index += 1
            #
            # Handling the last claim found
            # fix: guard against files with no claim sections (segment == []), and
            # actually pass the indexed default_claim (the original built it under
            # the key "name" — a no-op — and then passed DEFAULT_VALUE instead)
            if segment and segment[0].startswith(section):
                default_claim = dict({"index": index}, **DEFAULT_VALUE)
                claim = self.apply(segment, _code, default_claim)
                claims.append(claim)
            if type(file) != list:
                file.close()

        except Exception as e:
            # fix: e.args may be empty; and the error path must return the same
            # 3-tuple shape as the success path (run() unpacks three values)
            msg = e.args[0] if e.args else str(e)
            parse_type = 'claims' if _code == '837' else ('remits' if _code == '835' else 'unknown')
            logs.append({"parse": parse_type, "completed": False, "name": filename, "msg": msg})
            return [], logs, _code

        rate = 0 if len(claims) == 0 else (1 + index) / len(claims)
        logs.append({"parse": "claims" if _code == '837' else 'remits',
                     "completed": True, "name": filename, "rate": rate})
        return claims, logs, _code

    def run(self):
        """Process entry point: parse every assigned file and persist results."""
        for filename in self.files:
            content, logs, _code = self.read(filename)
            self.finish(content, logs, _code)

    def finish(self, content, logs, _code):
        """
        Persist parsed claims/remits and logs using the configured store.
        Mongo stores write claims/remits and logs to separate collections.
        """
        # deferred import: transport is a project dependency only needed when
        # persisting, so parsing (and importing this module) works without it
        import transport
        args = self.store
        _args = json.loads(json.dumps(self.store))
        if args['type'] == 'mongo.MongoWriter':
            args['args']['doc'] = 'claims' if _code == '837' else 'remits'
            _args['args']['doc'] = 'logs'
        if content:
            writer = transport.factory.instance(**args)
            writer.write(content)
            writer.close()
        if logs:
            logger = transport.factory.instance(**_args)
            logger.write(logs)
            logger.close()