""" (c) 2019 Healthcare/IO 1.0 Vanderbilt University Medical Center, Health Information Privacy Laboratory https://hiplab.mc.vanderbilt.edu/healthcareio Authors: Khanhly Nguyen, Steve L. Nyemba License: MIT, terms are available at https://opensource.org/licenses/MIT This parser was originally written by Khanhly Nguyen for her internship and is intended to parse x12 835,837 and others provided the appropriate configuration USAGE : - COMMAND LINE - EMBEDDED """ # import hashlib # import json # import os # import sys # # version 2.0 # # import util # # from parser import X12Parser # #-- end # from itertools import islice # from multiprocessing import Process # import transport # from transport import providers # import jsonmerge # # import plugins # import copy # class void : # pass # class Formatters : # def __init__(self): # # self.config = config # self.get = void() # self.get.config = self.get_config # self.parse = void() # self.parse.sv3 = self.sv3 # self.parse.sv2 = self.sv2 # self.sv2_parser = self.sv2 # self.sv3_parser = self.sv3 # self.sv3_parse = self.sv3 # self.format_proc = self.procedure # self.format_diag = self.diagnosis # self.parse.procedure = self.procedure # self.parse.diagnosis = self.diagnosis # self.parse.date = self.date # self.format_date = self.date # self.format_pos = self.pos # self.format_time = self.time # def split(self,row,sep='*',prefix='HI') : # """ # This function is designed to split an x12 row and # """ # value = [] # if row.startswith(prefix) is False: # for row_value in row.replace('~','').split(sep) : # if '>' in row_value and not row_value.startswith('HC'): # # if row_value.startswith('HC') or row_value.startswith('AD'): # if row_value.startswith('AD'): # value += row_value.split('>')[:2] # pass # else: # value += [row_value] # # value += row_value.split('>') if row.startswith('CLM') is False else [row_value] # else : # value.append(row_value.replace('\n','')) # value = [xchar.replace('\r','') for xchar in value] #row.replace('~','').split(sep) # else: # value = [ [prefix]+ self.split(item,'>') for item in row.replace('~','').split(sep)[1:] ] # return value if type(value) == list and type(value[0]) != list else value[0] # def get_config(self,config,row): # """ # This function will return the meaningfull parts of the configuration for a given item # """ # _row = list(row) if type(row[0]) == str else list(row[0]) # _info = config[_row[0]] if _row[0] in config else {} # _rinfo = {} # key = None # if '@ref' in _info: # keys = list(set(_row) & set(_info['@ref'].keys())) # if keys : # _rinfo = {} # for key in keys : # _rinfo = jsonmerge.merge(_rinfo,_info['@ref'][key]) # return _rinfo # # key = key[0] # # return _info['@ref'][key] # else: # return {} # if not _info and 'SIMILAR' in config: # # # # Let's look for the nearest key using the edit distance # if _row[0] in config['SIMILAR'] : # key = config['SIMILAR'][_row[0]] # _info = config[key] # return _info # def hash(self,value): # salt = os.environ['HEALTHCAREIO_SALT'] if 'HEALTHCAREIO_SALT' in os.environ else '' # _value = str(value)+ salt # if sys.version_info[0] > 2 : # return hashlib.md5(_value.encode('utf-8')).hexdigest() # else: # return hashlib.md5(_value).hexdigest() # def suppress (self,value): # return 'N/A' # def date(self,value): # value = value if type(value) != list else "-".join(value) # if len(value) > 8 or '-' in value: # # # # This is the case of a thru date i.e the first part should be provided in a 435 entry # # # fdate = "-".join([value[:8][:4],value[:8][4:6],value[:8][6:8]]) # tdate = 
"-".join([value[9:][:4],value[9:][4:6],value[9:][6:8]]) # return {"from":fdate,"to":tdate} # if len(value) == 8 : # year = value[:4] # month = value[4:6] # day = value[6:] # return "-".join([year,month,day])[:10] #{"year":year,"month":month,"day":day} # elif len(value) == 6 : # year = '20' + value[:2] # month = value[2:4] # day = value[4:] # elif value.isnumeric() and len(value) >= 10: # # # # Here I a will assume we have a numeric vale # year = value[:4] # month= value[4:6] # day = value[6:8] # else: # # # # We have a date formatting issue # return value # return "-".join([year,month,day]) # def time(self,value): # pass # def sv3(self,value): # if '>' in value [1]: # terms = value[1].split('>') # return {'type':terms[0],'code':terms[1],"amount":float(value[2])} # else: # return {"code":value[2],"type":value[1],"amount":float(value[3])} # def sv2(self,value): # # # # @TODO: Sometimes there's a suffix (need to inventory all the variations) # # # if '>' in value or ':' in value: # xchar = '>' if '>' in value else ':' # _values = value.split(xchar) # modifier = {} # if len(_values) > 2 : # modifier= {"code":_values[2]} # if len(_values) > 3 : # modifier['type'] = _values[3] # _value = {"code":_values[1],"type":_values[0]} # if modifier : # _value['modifier'] = modifier # return _value # else: # return value # def procedure(self,value): # for xchar in [':','<','|','>'] : # if xchar in value and len(value.split(xchar)) > 1 : # #_value = {"type":value.split(':')[0].strip(),"code":value.split(':')[1].strip()} # _value = {"type":value.split(xchar)[0].strip(),"code":value.split(xchar)[1].strip()} # if len(value.split(xchar)) >2 : # index = 1; # for modifier in value.split(xchar)[2:] : # _value['modifier_'+str(index)] = modifier # index += 1 # break # else: # _value = str(value) # return _value # def diagnosis(self,value): # return [ {"code":item[2], "type":item[1]} for item in value if len(item) > 1] # def parse_loc(self,value): # if ':' in value : # return dict(zip(['place_of_service','claim_indicator','claim_frequency'],value.split(':'))) # def pos(self,value): # """ # formatting place of service information within a segment (REF) # @TODO: In order to accomodate the other elements they need to be specified in the configuration # Otherwise it causes problems on export # """ # xchar = '>' if '>' in value else ':' # x = value.split(xchar) # x = {"place_of_service":x[0],"indicator":x[1],"frequency":x[2]} if len(x) == 3 else {"place_of_service":x[0],"indicator":None,"frequency":None} # return x # class Parser (Process): # @staticmethod # def setup (path): # # self.config = _config['parser'] # config = json.loads(open(path).read()) # _config = config['parser'] # # # # The parser may need some editing provided, this allows ease of developement and using alternate configurations # # # if type(_config['837']) == str or type(_config['835']) == str : # for _id in ['837','835'] : # if type(_config[_id]) == str and os.path.exists(_config[_id]): # _config[_id] = json.loads(open(_config[_id]).read()) # if type(_config[_id]) == dict : # _config[_id] = [_config[_id]] # config['parser'] = _config # return config # @staticmethod # def init(**_args): # """ # This function allows to initialize the database that will store the claims if need be # :path configuration file # """ # PATH = os.sep.join([os.environ['HOME'],'.healthcareio']) # filename = os.sep.join([PATH,'config.json']) # filename = _args['path'] if 'path' in _args else filename # info = None # if os.path.exists(filename): # # # # Loading the configuration 
    def __init__(self, path):
        """
        :path   path of the configuration file (it can be absolute)
        """
        Process.__init__(self)
        self.utils = Formatters()
        self.get = void()
        self.get.value = self.get_map
        self.get.default_value = self.get_default_value
        # _config = json.loads(open(path).read())
        self._custom_config = self.get_custom(path)
        # self.config = _config['parser']
        _config = Parser.setup(path)
        self.config = _config['parser']
        self.store = _config['store']
        self.cache = {}
        self.files = []
        self.set = void()
        self.set.files = self.set_files
        self.emit = void()
        self.emit.pre = None
        self.emit.post = None

    def get_custom(self, path):
        """
        :path   path of the configuration file (it can be absolute)
        """
        _path = path.replace('config.json', '')
        if _path.endswith(os.sep):
            _path = _path[:-1]
        _config = {}
        _path = os.sep.join([_path, 'custom'])
        if os.path.exists(_path):
            files = os.listdir(_path)
            if files:
                fullname = os.sep.join([_path, files[0]])
                _config = json.loads((open(fullname)).read())
        return _config

    def set_files(self, files):
        self.files = files

    def get_map(self, row, config, version=None):
        # label = config['label'] if 'label' in config else None
        handler = Formatters()
        if 'map' not in config and hasattr(handler, config['apply']):
            pointer = getattr(handler, config['apply'])
            object_value = pointer(row)
            return object_value
        #
        # Pull the goto configuration that skips rows
        #
        omap = config['map'] if not version or version not in config else config[version]
        anchors = config['anchors'] if 'anchors' in config else []
        rewrite = config['rewrite'] if 'rewrite' in config else {}
        if len(row) == 2 and row[0] == 'HI':
            row = ([row[0]] + row[1].split(':'))
        if type(row[0]) == str:
            object_value = {}
            for key in omap:
                index = omap[key]
                if anchors and set(anchors) & set(row):
                    _key = list(set(anchors) & set(row))[0]
                    aindex = row.index(_key)
                    index = aindex + index
                if index < len(row):
                    value = row[index]
                    if 'cast' in config and key in config['cast'] and value.strip() != '':
                        if config['cast'][key] in ['float', 'int']:
                            try:
                                value = eval(config['cast'][key])(value)
                            except Exception as e:
                                pass
                                #
                                # Sometimes the cast fails because the anchor is missing;
                                # this is typical, and the hardened functions (SV2, SV3) help circumvent it
                                #
                        elif hasattr(handler, config['cast'][key]):
                            pointer = getattr(handler, config['cast'][key])
                            value = pointer(value)
                        else:
                            print("Missing Pointer ", key, config['cast'])
                    if type(value) == dict:
                        for objkey in value:
                            if type(value[objkey]) == dict:
                                continue
                            if 'syn' in config and value[objkey] in config['syn']:
                                # value[objkey] = config['syn'][ value[objkey]]
                                pass
                        if key in rewrite:
                            _key = rewrite[key]
                            if _key in value:
                                value = value[_key]
                            else:
                                value = ""
                        value = {key: value} if key not in value else value
                    else:
                        if 'syn' in config and value in config['syn']:
                            # value = config['syn'][value]
                            pass
                    if type(value) == dict:
                        object_value = jsonmerge.merge(object_value, value)
                    else:
                        object_value[key] = value
        else:
            #
            # we are dealing with a complex object
            object_value = []
            for row_item in row:
                value = self.get.value(row_item, config, version)
                object_value.append(value)
        return object_value
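    # How a configuration entry drives get_map (a hypothetical entry and row,
    # for illustration only; real entries ship with the packaged configuration):
    #
    #   config = {"label": "procedure", "map": {"code": 1, "amount": 2}, "cast": {"amount": "float"}}
    #   p.get.value(['SV1', 'HC:99213', '125.00'], config)
    #   # -> {'code': 'HC:99213', 'amount': 125.0}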
    def set_cache(self, tmp, _info):
        """
        Insert into the cache a value that later rows in a loop will reference
        """
        if 'cache' in _info:
            key = _info['cache']['key']
            value = _info['cache']['value']
            field = _info['cache']['field']
            if value in tmp:
                self.cache[key] = {field: tmp[value]}
            pass

    def get_cache(self, row):
        """
        Retrieve the cached element for the current row, if any
        """
        key = row[0]
        return self.cache[key] if key in self.cache else {}

    def apply(self, content, _code):
        """
        :content    content of a file i.e a segment with the envelope
        :_code      837 or 835 (helps get the appropriate configuration)
        """
        util = Formatters()
        # header = default_value.copy()
        value = {}
        for row in content[:]:
            row = util.split(row.replace('\n', '').replace('~', ''))
            _info = util.get.config(self.config[_code][0], row)
            if self._custom_config and _code in self._custom_config:
                _cinfo = util.get.config(self._custom_config[_code], row)
            else:
                _cinfo = {}
            if _info or _cinfo:
                try:
                    _info = jsonmerge.merge(_info, _cinfo)
                    tmp = self.get.value(row, _info)
                    if not tmp:
                        continue
                    #
                    # At this point we have the configuration and the row parsed into values.
                    # We should check whether the cache holds anything to be added to them
                    #
                    if row[0] in self.cache:
                        tmp = jsonmerge.merge(tmp, self.get_cache(row))
                    if 'label' in _info:
                        label = _info['label']
                        if type(tmp) == list:
                            value[label] = tmp if label not in value else value[label] + tmp
                        else:
                            # if 'DTM' in row :
                            #     print ([label,tmp,label in value])
                            if label not in value:
                                value[label] = []
                            value[label].append(tmp)
                            if '_index' not in tmp:
                                #
                                # In case we asked for it to be overridden, this will not apply;
                                # X12 occasionally requires references to other elements in a loop (alas)
                                #
                                tmp['_index'] = len(value[label]) - 1
                    elif 'field' in _info:
                        name = _info['field']
                        # value[name] = tmp
                        # value = jsonmerge.merge(value,{name:tmp})
                        if name not in value:
                            value = dict(value, **{name: tmp})
                        else:
                            value[name] = dict(value[name], **tmp)
                    else:
                        value = dict(value, **tmp)
                    pass
                except Exception as e:
                    print(e.args[0])
                    # print ('__',(dir(e.args)))
                    pass
                #
                # At this point the object is completely built;
                # if there are any attributes to be cached, it is done here
                #
                if 'cache' in _info:
                    self.set_cache(tmp, _info)
        return value if value else {}
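    # Sketch of apply on a hand-made segment (illustrative rows only; real
    # segments come from the file split performed in read below):
    #
    #   segment = ['CLP*CLAIM001*1*100*80**12*REF123~', 'NM1*QC*1*DOE*JOHN~']
    #   value = p.apply(segment, '835')
    #   # -> a claim fragment keyed by the labels/fields of the 835 configuration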
    def get_default_value(self, content, _code):
        util = Formatters()
        TOP_ROW = content[1].split('*')
        SUBMITTED_DATE = util.parse.date(TOP_ROW[4])
        CATEGORY = content[2].split('*')[1].strip()
        VERSION = content[1].split('*')[-1].replace('~', '').replace('\n', '')
        SENDER_ID = TOP_ROW[2]
        row = util.split(content[3])
        _info = util.get_config(self.config[_code][0], row)
        value = self.get.value(row, _info, VERSION) if _info else {}
        value['category'] = {"setid": _code, "version": 'X' + VERSION.split('X')[1], "id": VERSION.split('X')[0].strip()}
        value["submitted"] = SUBMITTED_DATE
        value['sender_id'] = SENDER_ID
        # value = dict(value,**self.apply(content,_code))
        value = jsonmerge.merge(value, self.apply(content, _code))
        # Let's parse this for default values
        return value  # jsonmerge.merge(value,self.apply(content,_code))

    def read(self, filename):
        """
        This function (formerly get_content) returns the content of an EDI file parsed
        against the specified configuration; it is capable of identifying the file type
        (835 vs 837) from the content itself
        :section    loop prefix (HL, CLP)
        :config     configuration with formatting rules, labels ...
        :filename   location of the file
        """
        # section = section if section else config['SECTION']
        logs = []
        claims = []
        _code = 'UNKNOWN'
        try:
            self.cache = {}
            file = open(filename.strip())
            file = file.read().split('CLP')
            _code = '835'
            section = 'CLP'
            if len(file) == 1:
                file = file[0].split('CLM')  # .split('HL')
                _code = '837'
                section = 'CLM'  # 'HL'
            INITIAL_ROWS = file[0].split(section)[0].split('\n')
            if len(INITIAL_ROWS) == 1:
                INITIAL_ROWS = INITIAL_ROWS[0].split('~')
            # for item in file[1:] :
            #     item = item.replace('~','\n')
            # print (INITIAL_ROWS)
            DEFAULT_VALUE = self.get.default_value(INITIAL_ROWS, _code)
            DEFAULT_VALUE['name'] = filename.strip()
            file = section.join(file).split('\n')
            if len(file) == 1:
                file = file[0].split('~')
            #
            # In the initial rows there is redundant information (so much for the x12 standard);
            # index 1 identifies the file type i.e CLM for a claim and CLP for a remittance
            segment = []
            index = 0
            _toprows = []
            _default = None
            for row in file:
                row = row.replace('\r', '')
                # if not segment and not row.startswith(section):
                #     _toprows += [row]
                if row.startswith(section) and not segment:
                    segment = [row]
                    continue
                elif segment and not row.startswith(section):
                    segment.append(row)
                if len(segment) > 1 and row.startswith(section):
                    #
                    # process the segment somewhere (create a thread maybe?)
                    #
                    _claim = self.apply(segment, _code)
                    if _claim:
                        _claim['index'] = index  # len(claims)
                        # claims.append(dict(DEFAULT_VALUE,**_claim))
                        #
                        # Build a merge schema: list attributes are appended, everything else overwritten
                        schema = {"properties": {}}
                        for attr in _claim.keys():
                            schema['properties'][attr] = {"mergeStrategy": "append" if type(_claim[attr]) == list else "overwrite"}
                        merger = jsonmerge.Merger(schema)
                        _baseclaim = None
                        _baseclaim = merger.merge(_baseclaim, copy.deepcopy(DEFAULT_VALUE))
                        _claim = merger.merge(_baseclaim, _claim)
                        # _claim = merger.merge(DEFAULT_VALUE.copy(),_claim)
                        claims.append(_claim)
                    segment = [row]
                    index += 1
                    pass
            #
            # Handling the last claim found
            if segment and segment[0].startswith(section):
                # default_claim = dict({"name":index},**DEFAULT_VALUE)
                claim = self.apply(segment, _code)
                if claim:
                    claim['index'] = len(claims)
                    #
                    # @TODO: Fix merger related to schema (drops certain fields ... NOT cool)
                    schema = {"properties": {}}
                    for attr in claim.keys():
                        schema['properties'][attr] = {"mergeStrategy": "append" if type(claim[attr]) == list else "overwrite"}
                    merger = jsonmerge.Merger(schema)
                    _baseclaim = None
                    _baseclaim = merger.merge(_baseclaim, copy.deepcopy(DEFAULT_VALUE))
                    claim = merger.merge(_baseclaim, claim)
                    claims.append(claim)
                    # claims.append(merger.merge(DEFAULT_VALUE.copy(),claim))
            if type(file) != list:
                file.close()
            # x12_file = open(filename.strip(),errors='ignore').read().split('\n')
        except Exception as e:
            logs.append({"parse": _code, "completed": False, "name": filename, "msg": e.args[0]})
            return [], logs, None
        rate = 0 if len(claims) == 0 else (1 + index) / len(claims)
        logs.append({"parse": "claims" if _code == '837' else 'remits', "completed": True, "name": filename, "rate": rate})
        # self.finish(claims,logs,_code)
        return claims, logs, _code
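    # Embedded usage sketch (assumed file paths; read identifies 835 vs 837
    # from the content itself):
    #
    #   p = Parser('/home/user/.healthcareio/config.json')
    #   claims, logs, _code = p.read('/data/sample.x12')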
    def run(self):
        if self.emit.pre:
            self.emit.pre()
        for filename in self.files:
            content, logs, _code = self.read(filename)
            self.finish(content, logs, _code)
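    # The pre/post hooks are optional callables; a sketch of wiring them up
    # (assumed paths, for illustration):
    #
    #   p = Parser(path)
    #   p.set.files(['/data/a.x12', '/data/b.x12'])
    #   p.emit.post = lambda content, logs: print(len(content or []), 'records written')
    #   p.start()   # Process.start() -> run() -> read() -> finish()
    #   p.join()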
    def finish(self, content, logs, _code):
        args = self.store
        _args = json.loads(json.dumps(self.store))
        ISNEW_MONGO = 'provider' in args and args['provider'] in ['mongo', 'mongodb']
        ISLEG_MONGO = ('type' in args and args['type'] == 'mongo.MongoWriter')
        if ISLEG_MONGO or ISNEW_MONGO:
            if ISLEG_MONGO:
                # Legacy specification ...
                args['args']['doc'] = 'claims' if _code == '837' else 'remits'
                _args['args']['doc'] = 'logs'
            else:
                args['doc'] = 'claims' if _code == '837' else 'remits'
                _args['doc'] = 'logs'
        else:
            if 'type' in args:
                # Legacy specification ...
                args['args']['table'] = 'claims' if _code == '837' else 'remits'
                _args['args']['table'] = 'logs'
                table = args['args']['table']
            else:
                args['table'] = 'claims' if _code == '837' else 'remits'
                _args['table'] = 'logs'
                table = args['table']
        writer = transport.factory.instance(**args)
        IS_SQLITE = type(writer) == transport.disk.SQLiteWriter
        if content:
            if IS_SQLITE:
                for row in content:
                    writer.apply("""insert into :table(data) values (':values')""".replace(":values", json.dumps(row)).replace(":table", table))
            else:
                writer.write(content)
            writer.close()
        if logs:
            logger = transport.factory.instance(**_args)
            if IS_SQLITE:
                for row in logs:
                    logger.apply("""insert into logs values (':values')""".replace(":values", json.dumps(row)))
            else:
                logger.write(logs)
            logger.close()
        if self.emit.post:
            self.emit.post(content, logs)
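# Command-line style driver sketch (assumptions: a default configuration under
# ~/.healthcareio and file names passed as arguments; the packaged CLI wraps
# this module differently):
#
#   if __name__ == '__main__':
#       Parser.init()
#       p = Parser(os.sep.join([os.environ['HOME'], '.healthcareio', 'config.json']))
#       p.set.files(sys.argv[1:])
#       p.start()
#       p.join()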