#!/usr/bin/env python3
"""
(c) 2019 Claims Toolkit,
Health Information Privacy Lab, Vanderbilt University Medical Center

Steve L. Nyemba
Khanhly Nguyen

This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into a human-readable JSON format. The output (claims or remittances) can be forwarded to a NoSQL data store such as CouchDB or MongoDB.

Usage :
    Command line :
        python edi-parser --scope <835|837> --config <path> --folder <path> --store <mongo|disk|couch> --db|--path <name>

        with :
            --scope     835 (remittances) or 837 (claims)
            --config    path of the configuration used to parse the x12, i.e. 835 or 837
            --folder    location of the files (they must be decompressed)
            --store     data store, one of disk, mongodb or couchdb
            --db|--path name of the folder in which to store the output, or the database name

    Embedded in code :
        import edi.parser
        import json

        file = '/data/claim_1.x12'
        conf = json.loads(open('config/837.json').read())
        edi.parser.get_content(file, conf)
"""
from params import SYS_ARGS
from transport import factory
import requests
from parser import get_content
import os
import json
import sys
import pandas as pd

PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
INFO = None
URL = "https://healthcareio.the-phi.com"
if not os.path.exists(PATH) :
    os.mkdir(PATH)

import platform
import sqlite3 as lite
# PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])

def register (**args) :
    """
    Register the user against the provider and write the resulting configuration to disk.

    :email  user's email address
    :url    url of the provider to register against
    """
    email = args['email']
    url = args['url'] if 'url' in args else URL
    folders = [PATH,OUTPUT_FOLDER]
    for path in folders :
        if not os.path.exists(path) :
            os.mkdir(path)

    headers = {"email":email,"client":platform.node()}
    http = requests.session()
    r = http.post(url,headers=headers)

    #
    # store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}}
    # if 'store' in args :
    #     store = args['store']
    filename = os.sep.join([PATH,'config.json'])
    info = r.json() #{"parser":r.json(),"store":store}
    info = dict({"owner":email},**info)
    info['store']['args']['path'] = os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql
    info['out-folder'] = OUTPUT_FOLDER

    file = open( filename,'w')
    file.write( json.dumps(info))
    file.close()
    #
    # Once registered, init() will create the sqlite3 database from the server-provided schema.

def analytics(**args):
    """
    This function will only compute basic distributions of a given feature for a given claim
    @args
        @param x:       vector of features to process
        @param apply:   operation to be applied {dist}
    """
    if args['apply'] in ['dist','distribution'] :
        """
        This section of the code will return the distribution of a given space.
        It is intended to be applied on several claims/remits
        """
        x = pd.DataFrame(args['x'],columns=['x'])
        return x.groupby(['x']).size().to_frame().T.to_dict(orient='records')
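#
# Illustrative sketch (not part of the module's behavior): given a hypothetical vector of
# procedure codes, analytics() returns a single-record list mapping each distinct value to
# its frequency, e.g.
#
#   analytics(x=['99213', '99213', '99214'], apply='dist')
#   # -> [{'99213': 2, '99214': 1}]
#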
def log(**args):
    """
    This function will perform a log of anything provided to it
    """
    pass

def init():
    """
    Read the configuration stored in <PATH>/config.json and, if needed, create the output
    folder and the sqlite3 database described by the server-provided schema.
    """
    filename = os.sep.join([PATH,'config.json'])
    info = None
    if os.path.exists(filename):
        file = open(filename)
        info = json.loads(file.read())
        if not os.path.exists(info['out-folder']) :
            os.mkdir(info['out-folder'])
        if not os.path.exists(info['store']['args']['path']) :
            conn = lite.connect(info['store']['args']['path'],isolation_level=None)
            for key in info['schema'] :
                _sql = info['schema'][key]['create']
                # r = conn.execute("select * from sqlite_master where name in ('claims','remits')")
                conn.execute(_sql)
            conn.commit()
            conn.close()
    return info
#
# Global variables that load the configuration files

def parse(**args):
    """
    This function will parse the content of a claim or remittance (x12 format) given the following parameters
    :filename   absolute path of the file to be parsed
    :type       claims|remits in x12 format
    """
    global INFO
    if not INFO :
        INFO = init()
    if args['type'] == 'claims' :
        CONFIG = INFO['parser']['837']
    elif args['type'] == 'remits' :
        CONFIG = INFO['parser']['835']
    else:
        CONFIG = None
    if CONFIG :
        # CONFIG = CONFIG[-1] if 'version' not in args and (args['version'] < len(CONFIG)) else CONFIG[0]
        CONFIG = CONFIG[int(SYS_ARGS['version'])-1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1]
        SECTION = CONFIG['SECTION']
        os.environ['HEALTHCAREIO_SALT'] = INFO['owner']
        return get_content(args['filename'],CONFIG,SECTION)

def upgrade(**args):
    """
    :email  provide us with who you are
    :key    upgrade key provided by the server for a given email
    """
    url = args['url'] if 'url' in args else URL+"/upgrade"
    headers = {"key":args['key'],"email":args["email"],"url":url}
    # @TODO: submit the upgrade request (the headers are prepared but the call is not implemented yet)
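#
# Embedded-usage sketch (assumes register() has already been run so that
# <PATH>/config.json exists; the x12 file path below is hypothetical):
#
#   content, logs = parse(filename='/data/claim_1.x12', type='claims')
#   # content is the JSON-ready structure produced by get_content with the 837 configuration,
#   # logs is the list of per-record log entries
#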
if __name__ == '__main__' :
    info = init()

    if 'out-folder' in SYS_ARGS :
        OUTPUT_FOLDER = SYS_ARGS['out-folder']

    if set(list(SYS_ARGS.keys())) & set(['signup','init']):
        #
        # This command will essentially get a new copy of the configurations
        # @TODO: Tie the request to a version ?
        #
        email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
        url = SYS_ARGS['url'] if 'url' in SYS_ARGS else 'https://healthcareio.the-phi.com'
        register(email=email,url=url)
        # else:
        #     m = """
        #         usage:
        #             healthcareio --signup --email myemail@provider.com [--url ]
        #     """
        #     print (m)
    elif 'upgrade' in SYS_ARGS :
        #
        # perform an upgrade i.e. some code or new parser information will be provided
        #
        pass
    elif 'parse' in SYS_ARGS and info:
        """
        In this section of the code we are expecting the user to provide :
            :folder location of the x12 files to process, or
            :file   a single x12 file to process
        """
        files = []
        if 'file' in SYS_ARGS :
            files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else []
        if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
            names = os.listdir(SYS_ARGS['folder'])
            files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))]
        else:
            #
            # raise an error
            pass
        #
        # @TODO: Log this here so we know what is being processed or not
        SCOPE = None

        if files and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']):
            # _map = {'claims':'837','remits':'835'}
            # key = _map[SYS_ARGS['parse']]
            # CONFIG = info['parser'][key]
            # if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) :
            #     CONFIG = CONFIG[ int(SYS_ARGS['version'])]
            # else:
            #     CONFIG = CONFIG[-1]
            if info['store']['type'] == 'disk.DiskWriter' :
                info['store']['args']['path'] += (os.sep + 'healthcare-io.json')
            elif info['store']['type'] == 'disk.SQLiteWriter' :
                # info['store']['args']['path'] += (os.sep + 'healthcare-io.db3')
                pass
            info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower()

            writer = factory.instance(**info['store'])
            logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
            # logger = factory.instance(type='mongo.MongoWriter',args={'db':'healthcareio','doc':SYS_ARGS['parse']+'_logs'})
            # schema = info['schema']
            # for key in schema :
            #     sql = schema[key]['create']
            #     writer.write(sql)
            for filename in files :
                if filename.strip() == '':
                    continue
                # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])
                #
                # Parse each file and persist the output; a failure is logged and the batch continues.
                try:
                    content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
                    if content :
                        writer.write(content)
                    if logs :
                        [logger.write(_row) for _row in logs]
                    else:
                        logger.write({"name":filename,"completed":True,"rows":len(content)})
                except Exception as e:
                    logger.write({"filename":filename,"completed":False,"rows":-1})
                # print ([filename,len(content)])
                #
                # @TODO: forward this data to the writer and log engine
                #
            pass
    elif 'export' in SYS_ARGS:
        #
        # this function is designed to export the data to csv
        #
        format = SYS_ARGS['format'] if 'format' in SYS_ARGS else 'csv'
        format = format.lower()
        if format not in ['xls','csv'] :
            format = 'csv'
        pass
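    #
    # Example invocations (illustrative; the script name follows the module docstring and the
    # flags match the SYS_ARGS keys handled above):
    #
    #   python edi-parser --signup me@example.com --url https://healthcareio.the-phi.com
    #   python edi-parser --parse claims --folder /data/claims_837
    #   python edi-parser --parse remits --file /data/remit_1.x12
    #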
    # """
    # The program was called from the command line thus we are expecting
    #     parse   in [claims,remits]
    #     config  os.sep.path.exists(path)
    #     folder  os.sep.path.exists(path)
    #     store   store ()
    # """
    # p = len( set(['store','config','folder']) & set(SYS_ARGS.keys())) == 3 and ('db' in SYS_ARGS or 'path' in SYS_ARGS)
    # TYPE = {
    #     'mongo':'mongo.MongoWriter',
    #     'couch':'couch.CouchWriter',
    #     'disk':'disk.DiskWriter'
    # }
    # INFO = {
    #     '837':{'scope':'claims','section':'HL'},
    #     '835':{'scope':'remits','section':'CLP'}
    # }
    # if p :
    #     args = {}
    #     scope = SYS_ARGS['config'][:-5].split(os.sep)[-1]
    #     CONTEXT = INFO[scope]['scope']
    #     #
    #     # @NOTE:
    #     # improve how database and data stores are handled.
    #     if SYS_ARGS['store'] == 'couch' :
    #         args = {'url': SYS_ARGS['url'] if 'url' in SYS_ARGS else 'http://localhost:5984'}
    #         args['dbname'] = SYS_ARGS['db']
    #     elif SYS_ARGS['store'] == 'mongo':
    #         args = {'host':SYS_ARGS['host'] if 'host' in SYS_ARGS else 'localhost:27017'}
    #     if SYS_ARGS['store'] in ['mongo','couch']:
    #         args['dbname'] = SYS_ARGS['db'] if 'db' in SYS_ARGS else 'claims_outcomes'
    #         args['doc'] = CONTEXT
    #     TYPE = TYPE[SYS_ARGS['store']]
    #     writer = factory.instance(type=TYPE,args=args)
    #     if SYS_ARGS['store'] == 'disk':
    #         writer.init(path = 'output-claims.json')
    #     logger = factory.instance(type=TYPE,args= dict(args,**{"doc":"logs"}))
    #     files = os.listdir(SYS_ARGS['folder'])
    #     CONFIG = json.loads(open(SYS_ARGS['config']).read())
    #     SECTION = INFO[scope]['section']
    #     for file in files :
    #         if 'limit' in SYS_ARGS and files.index(file) == int(SYS_ARGS['limit']) :
    #             break
    #         else:
    #             filename = os.sep.join([SYS_ARGS['folder'],file])
    #             try:
    #                 content,logs = get_content(filename,CONFIG,SECTION)
    #             except Exception as e:
    #                 if sys.version_info[0] > 2 :
    #                     logs = [{"filename":filename,"msg":e.args[0]}]
    #                 else:
    #                     logs = [{"filename":filename,"msg":e.message}]
    #                 content = None
    #             if content :
    #                 writer.write(content)
    #             if logs:
    #                 logger.write(logs)
    #             pass
    # else:
    #     print (__doc__)
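#
# For reference, a rough sketch of the <PATH>/config.json written by register()
# (values are illustrative; the actual "parser" and "schema" payloads come from the server):
#
#   {
#     "owner": "me@example.com",
#     "store": {"type": "disk.SQLiteWriter",
#               "args": {"path": "<HOME>/healthcare-io/healthcare-io.db3"}},
#     "out-folder": "<HOME>/healthcare-io",
#     "parser": {"837": [ ... ], "835": [ ... ]},
#     "schema": {"claims": {"create": "CREATE TABLE claims ( ... )"}, "remits": {"create": " ... "}}
#   }
#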