# parser/healthcareio/healthcare-io.py
#!/usr/bin/env python3
"""
(c) 2019 Claims Toolkit,
Health Information Privacy Lab, Vanderbilt University Medical Center
Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
Khanhly Nguyen <khanhly.t.nguyen@gmail.com>
This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format.
The claims/output can be forwarded to a NoSQL data store like CouchDB or MongoDB
Usage :
Commandline :
python edi-parser --scope --config <path> --folder <path> --store <[mongo|disk|couch]> --<db|path]> <id|path>
with :
--scope <claims|remits>
--config path of the x12 to be parsed i.e it could be 835, or 837
--folder location of the files (they must be decompressed)
--store data store could be disk, mongodb, couchdb
--db|path name of the folder to store the output or the database name
Embedded in Code :
import edi.parser
import json
file = '/data/claim_1.x12'
conf = json.loads(open('config/837.json').read())
edi.parser.get_content(file, conf)
"""
from healthcareio.params import SYS_ARGS
from transport import factory
import requests
from healthcareio import analytics
from healthcareio import server
from healthcareio.parser import get_content
import os
import json
import sys
import numpy as np
from multiprocessing import Process
import time
from healthcareio import x12
import smart
from healthcareio.server import proxy
import pandas as pd
# Configuration lives under ~/.healthcareio, parsed output under ~/healthcare-io
PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
# INFO is lazily populated from the config file by init() (see parse())
INFO = None
URL = "https://healthcareio.the-phi.com"
# makedirs(exist_ok=True) avoids the race between the exists() check and mkdir()
os.makedirs(PATH, exist_ok=True)
import platform
import sqlite3 as lite
# PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
def register (**args) :
    """
    Register this client against the parsing server and persist the returned
    configuration to <PATH>/config.json.

    :email  user's email address (required)
    :url    url of the provider to register against (default: URL)
    :store  data-store type (default: 'sqlite')
    :db     database name / identifier (required)
    """
    email = args['email']
    url = args['url'] if 'url' in args else URL
    # make sure both the config folder and the output folder exist
    folders = [PATH,OUTPUT_FOLDER]
    for path in folders :
        if not os.path.exists(path) :
            os.mkdir(path)
    store = args['store'] if 'store' in args else 'sqlite'
    # identification is carried in the request headers
    headers = {"email":email,"client":platform.node(),"store":store,"db":args['db']}
    http = requests.session()
    r = http.post(url,headers=headers)
    filename = (os.sep.join([PATH,'config.json']))
    info = r.json()
    info = dict({"owner":email},**info)
    # point the default sqlite store at the output folder
    info['store']['args']['path'] = os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql
    info['out-folder'] = OUTPUT_FOLDER
    # with-statement guarantees the handle is closed even if the write fails
    with open(filename,'w') as file :
        file.write(json.dumps(info))
#
# Create the sqlite3 database to
def log(**args):
    """
    Placeholder logger: accepts arbitrary keyword arguments and discards them.
    """
    return None
def init():
    """
    Load the configuration from <PATH>/config.json when it exists, make sure
    the output folder is present, and bootstrap the sqlite database (tables
    from the config's 'schema' section) when the store is disk.SQLiteWriter.

    :return: the configuration dict, or None when no config file exists
    """
    filename = os.sep.join([PATH,'config.json'])
    info = None
    if os.path.exists(filename):
        # with-statement closes the handle (the original leaked it)
        with open(filename) as file :
            info = json.loads(file.read())
        if not os.path.exists(info['out-folder']) :
            os.mkdir(info['out-folder'])
        # create the sqlite database and its tables on first run only
        if info['store']['type'] == 'disk.SQLiteWriter' and not os.path.exists(info['store']['args']['path']) :
            conn = lite.connect(info['store']['args']['path'],isolation_level=None)
            try:
                for key in info['schema'] :
                    _sql = info['schema'][key]['create']
                    conn.execute(_sql)
                conn.commit()
            finally:
                # close even if a CREATE statement fails
                conn.close()
    return info
#
# Global variables that load the configuration files
def parse(**args):
    """
    Parse the content of a claim or remittance (x12 format) given the following parameters
    :filename absolute path of the file to be parsed
    :type claims|remits in x12 format
    :return: result of get_content (content, logs), or None for an unknown type
    """
    global INFO
    # configuration is loaded lazily on first use
    if not INFO :
        INFO = init()
    if args['type'] == 'claims' :
        CONFIG = INFO['parser']['837']
    elif args['type'] == 'remits' :
        CONFIG = INFO['parser']['835']
    else:
        CONFIG = None
    if CONFIG :
        # Pick the requested parser version from the CLI, otherwise the latest.
        # BUG FIX: the original guarded on SYS_ARGS['version'] but indexed with
        # args['version'], raising KeyError when only SYS_ARGS carried the key.
        CONFIG = CONFIG[int(SYS_ARGS['version'])-1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1]
        SECTION = CONFIG['SECTION']
        # the owner email is used as a salt by downstream anonymization
        os.environ['HEALTHCAREIO_SALT'] = INFO['owner']
        return get_content(args['filename'],CONFIG,SECTION)
def resume (files,id,config):
    """
    Filter out files that a previous (interrupted) run already processed.

    :files   list of candidate file paths
    :id      key into config['analytics']['resume'] holding the reader query
    :config  loaded configuration (store + analytics sections)
    :return: files minus those already recorded in the store
    """
    _args = config['store'].copy()
    # processing logs live in mongo when the store is mongo-backed
    if 'mongo' in config['store']['type'] :
        _args['type'] = 'mongo.MongoReader'
    reader = factory.instance(**_args)
    _files = []
    if 'resume' in config['analytics'] :
        _args = config['analytics']['resume'][id]
        _files = reader.read(**_args)
        # `is not None` is the idiomatic None test (was `!= None`)
        _files = [item['name'] for item in _files if item['name'] is not None]
        return list(set(files) - set(_files))
    return files
def apply(files,store_info,logger_info=None):
    """
    Parse a batch of files in the current thread/process and persist results.

    :files list of files to be processed in this given thread/process
    :store_info information about data-store, for now disk isn't thread safe
    :logger_info information about where to store the logs; defaults to a
                 DiskWriter log in the output folder
    """
    if not logger_info :
        # NOTE(review): `info` here is the lowercase name bound in the __main__
        # section, not the module global INFO — calling this with
        # logger_info=None from an import context would raise NameError; confirm.
        logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
    else:
        logger = factory.instance(**logger_info)
    writer = factory.instance(**store_info)
    for filename in files :
        # skip blank entries rather than failing on them
        if filename.strip() == '':
            continue
        try:
            content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
            if content :
                writer.write(content)
            if logs :
                [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
            else:
                logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
        except Exception as e:
            # BUG FIX: e.args[0] raises IndexError for arg-less exceptions,
            # which would lose the failure log entry entirely
            logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0] if e.args else str(e)})
def upgrade(**args):
    """
    Assemble an upgrade request for a registered account.

    :email provide us with who you are
    :key upgrade key provided by the server for a given email
    NOTE(review): the request is built but never sent — presumably unfinished.
    """
    if 'url' in args :
        url = args['url']
    else:
        url = URL + "/upgrade"
    headers = {"key":args['key'],"email":args["email"],"url":url}
if __name__ == '__main__' :
    info = init()
    if 'out-folder' in SYS_ARGS :
        OUTPUT_FOLDER = SYS_ARGS['out-folder']

    if set(list(SYS_ARGS.keys())) & set(['signup','init']):
        #
        # Fetch a (new) copy of the configuration from the parsing server
        # @TODO: Tie the request to a version ?
        #
        email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
        url = SYS_ARGS['url'] if 'url' in SYS_ARGS else 'https://healthcareio.the-phi.com'
        store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
        db = 'healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
        register(email=email,url=url,store=store,db=db)
    elif 'upgrade' in SYS_ARGS :
        #
        # perform an upgrade i.e some code or new parsers information will be provided
        # -- not implemented yet
        #
        pass
    elif 'parse' in SYS_ARGS and info:
        #
        # Expecting the user to provide either:
        #   :file   a single file to process
        #   :folder location of the files to process
        #
        files = []
        if 'file' in SYS_ARGS :
            files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else []
        if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
            names = os.listdir(SYS_ARGS['folder'])
            files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))]
        #
        # on --resume, look into the logs and drop files already processed
        #
        if 'resume' in SYS_ARGS :
            store_config = json.loads( (open(os.sep.join([PATH,'config.json']))).read() )
            files = proxy.get.resume(files,store_config )
            print (["Found ",len(files)," files unprocessed"])
        if files :
            #
            # split the work into batches; one x12.Parser process per batch
            # @TODO: Make sure it is with a persistence storage (not disk .. not thread/process safe yet)
            #
            BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch'])
            files = np.array_split(files,BATCH_COUNT)
            procs = []
            for row in files :
                row = row.tolist()
                parser = x12.Parser(os.sep.join([PATH,'config.json']))
                parser.set.files(row)
                parser.start()
                procs.append(parser)
            # wait for every worker process to finish
            while len(procs) > 0 :
                procs = [proc for proc in procs if proc.is_alive()]
                time.sleep(2)
    elif 'analytics' in SYS_ARGS :
        PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
        DEBUG = int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
        SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
        if os.path.exists(os.sep.join([PATH,'config.json'])) :
            e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible
            e.apply(type='claims',serialize=True)
            SYS_ARGS['engine'] = e
            SYS_ARGS['config'] = json.loads(open(os.sep.join([PATH,'config.json'])).read())
        else:
            SYS_ARGS['config'] = {"owner":None,"store":None}
        if 'args' not in SYS_ARGS['config'] :
            SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}
        me = pd.DataFrame(smart.top.read(name='healthcare-io.py')).args.unique().tolist()
        SYS_ARGS['me'] = me[0] #-- This key will identify the current process
        # run the dashboard web server in a child process
        pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False)
        pthread = Process(target=pointer,args=())
        pthread.start()
    elif 'export' in SYS_ARGS:
        #
        # this function is designed to export the data to csv
        #
        format = SYS_ARGS['format'] if 'format' in SYS_ARGS else 'csv'
        format = format.lower()
        # BUG FIX: the original tested `set([format]) not in ['xls','csv']`,
        # which is always True (a set never equals a string), so 'xls' could
        # never be selected
        if format not in ['xls','csv'] :
            format = 'csv'
    else:
        msg = """
        cli:
            healthcare-io.py --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
            healthcare-io.py --parse claims --folder <path> [--batch <value>]
            healthcare-io.py --parse remits --folder <path> [--batch <value>] [--resume]
        parameters:
            --<[signup|init]>  signup or get a configuration file from a parsing server
            --store            data store mongo or sqlite or mongodb
            --resume           will attempt to resume if there was an interruption
        """
        print(msg)