parser/healthcareio/healthcare-io.py

440 lines
17 KiB
Python

#!/usr/bin/env python3
"""
(c) 2019 Claims Toolkit,
Health Information Privacy Lab, Vanderbilt University Medical Center
Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
Khanhly Nguyen <khanhly.t.nguyen@gmail.com>
This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format.
The claims/output can be forwarded to a NoSQL data store such as couchdb or mongodb
Usage :
Commandline :
python edi-parser --scope --config <path> --folder <path> --store <[mongo|disk|couch]> --<[db|path]> <id|path>
with :
--scope <claims|remits>
--config path of the x12 to be parsed i.e it could be 835, or 837
--folder location of the files (they must be decompressed)
--store data store could be disk, mongodb, couchdb
--db|path name of the folder to store the output or the database name
Embedded in Code :
import edi.parser
import json
file = '/data/claim_1.x12'
conf = json.loads(open('config/837.json').read())
edi.parser.get_content(file,conf)
"""
from healthcareio.params import SYS_ARGS
from transport import factory
import requests
from healthcareio import analytics
from healthcareio import server
from healthcareio.parser import get_content
import os
import json
import sys
import numpy as np
from multiprocessing import Process
import time
import x12
#
# Per-user locations used by the toolkit:
#   PATH            hidden folder holding config.json
#   OUTPUT_FOLDER   default folder for parsed output
# expanduser('~') is used instead of os.environ['HOME'] so the module can still
# be imported on platforms where HOME is not defined.
_HOME = os.path.expanduser('~')
PATH = os.sep.join([_HOME, '.healthcareio'])
OUTPUT_FOLDER = os.sep.join([_HOME, 'healthcare-io'])
# Cache of the loaded configuration, filled lazily by parse()
INFO = None
# Default server to register with
URL = "https://healthcareio.the-phi.com"
# exist_ok avoids the check-then-create race of os.path.exists + os.mkdir
os.makedirs(PATH, exist_ok=True)
import platform
import sqlite3 as lite
# PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
def register(**args):
    """
    Register this client with a parsing server and save the configuration
    returned by the server to PATH/config.json.

    :email  user's email address (required)
    :url    url of the provider to register with (defaults to URL)
    :store  type of data store requested, defaults to 'sqlite'
    :db     name of the database, defaults to 'healthcareio'

    Raises KeyError if email is missing and requests.HTTPError if the server
    answers with an error status.
    """
    email = args['email']
    url = args.get('url', URL)
    store = args.get('store', 'sqlite')
    db = args.get('db', 'healthcareio')
    #
    # Make sure the configuration and output folders exist (parents included)
    for path in (PATH, OUTPUT_FOLDER):
        os.makedirs(path, exist_ok=True)
    #
    # The registration details are sent to the server as request headers
    headers = {"email": email, "client": platform.node(), "store": store, "db": db}
    with requests.session() as http:
        r = http.post(url, headers=headers)
    #
    # Fail here rather than writing an error payload out as the configuration
    r.raise_for_status()
    info = dict({"owner": email}, **r.json())
    #
    # The sqlite file location is always recorded in the store arguments
    # NOTE(review): this is also done when the store is not sqlite -- confirm this is intended
    info['store']['args']['path'] = os.sep.join([OUTPUT_FOLDER, 'healthcare-io.db3'])  #-- sql
    info['out-folder'] = OUTPUT_FOLDER
    filename = os.sep.join([PATH, 'config.json'])
    with open(filename, 'w') as file:
        file.write(json.dumps(info))
#
# Create the sqlite3 database to
def log(**args):
    """
    Placeholder for a logging hook: accepts any keyword arguments and
    currently does nothing with them.
    """
    return None
def init():
    """
    Read the configuration stored in PATH/config.json and prepare local storage.

    The output folder named in the configuration is created if needed. When the
    store is 'disk.SQLiteWriter' and its database file does not exist yet, the
    database is created by running every 'create' statement found in the
    'schema' section of the configuration.

    Returns the configuration as a dictionary, or None if there is no
    configuration file.
    """
    filename = os.sep.join([PATH, 'config.json'])
    if not os.path.exists(filename):
        return None
    # the context manager closes the file even if the JSON is malformed
    with open(filename) as file:
        info = json.loads(file.read())
    os.makedirs(info['out-folder'], exist_ok=True)
    store = info['store']
    if store['type'] == 'disk.SQLiteWriter' and not os.path.exists(store['args']['path']):
        conn = lite.connect(store['args']['path'], isolation_level=None)
        try:
            for key in info['schema']:
                conn.execute(info['schema'][key]['create'])
            conn.commit()
        finally:
            # release the database handle even when a statement fails
            conn.close()
    return info
#
# Global variables that load the configuration files
def parse(**args):
    """
    This function will parse the content of a claim or remittance (x12 format) given the following parameters

    :filename   absolute path of the file to be parsed
    :type       claims|remits in x12 format
    :version    optional 1-based version of the parser configuration; falls back to
                the 'version' command line argument, then to the latest configuration

    Returns whatever get_content returns for the file, or None when the type is
    unknown or no parser configuration is available for it.
    """
    global INFO
    if not INFO:
        INFO = init()
    if args['type'] == 'claims':
        CONFIG = INFO['parser']['837']
    elif args['type'] == 'remits':
        CONFIG = INFO['parser']['835']
    else:
        CONFIG = None
    if not CONFIG:
        return None
    #
    # The version used to be checked against SYS_ARGS but read from args, which
    # raised a KeyError as soon as --version was given on the command line.
    # A single value is resolved here and used for both the check and the lookup.
    version = args['version'] if 'version' in args else SYS_ARGS.get('version')
    if version is not None and 0 < int(version) <= len(CONFIG):
        CONFIG = CONFIG[int(version) - 1]
    else:
        CONFIG = CONFIG[-1]
    SECTION = CONFIG['SECTION']
    os.environ['HEALTHCAREIO_SALT'] = INFO['owner']
    return get_content(args['filename'], CONFIG, SECTION)
def resume(files, id, config):
    """
    Filter out the files that have already been processed, so an interrupted run can resume.

    :files      list of candidate files
    :id         key of the query to use in config['analytics']['resume'] (the callers pass claims|remits)
    :config     configuration, with the 'store' and optionally the 'analytics' sections

    Returns the files that were not found by the resume query, in their original
    order. The list is returned unchanged when there is no resume query or when
    the store is not mongo (no reader is available for other stores here).
    """
    store = config['store']
    if 'mongo' not in store['type']:
        # previously this path crashed on an unbound reader
        return files
    analytics = config.get('analytics', {})
    if 'resume' not in analytics:
        return files
    _args = store.copy()
    _args['type'] = 'mongo.MongoReader'
    reader = factory.instance(**_args)
    query = analytics['resume'][id]
    processed = {item['name'] for item in reader.read(**query) if item['name'] is not None}
    # dict.fromkeys removes duplicates like the former set difference, but keeps the order
    return [name for name in dict.fromkeys(files) if name not in processed]
def apply(files, store_info, logger_info=None):
    """
    Parse a list of files, write the parsed content to the data store and log the outcome of each file.

    :files          list of files to be processed in this given thread/process
    :store_info     information about data-store, for now disk isn't thread safe
    :logger_info    information about where to store the logs, by default a
                    <parse>.log file in the configured output folder

    The kind of file (claims|remits) is read from SYS_ARGS['parse'].
    """
    global INFO
    scope = SYS_ARGS['parse']
    if not logger_info:
        #
        # The configuration is loaded the same way parse() does it, instead of relying
        # on a variable that only exists when this file is run as a script
        if not INFO:
            INFO = init()
        log_path = os.sep.join([INFO['out-folder'], scope + '.log'])
        logger = factory.instance(type='disk.DiskWriter', args={'path': log_path})
    else:
        logger = factory.instance(**logger_info)
    writer = factory.instance(**store_info)
    for filename in files:
        if filename.strip() == '':
            continue
        try:
            content, logs = parse(filename=filename, type=scope)
            if content:
                writer.write(content)
            if logs:
                for _row in logs:
                    logger.write(dict(_row, **{"parse": scope}))
            else:
                # content may be None when nothing was extracted from the file
                rows = len(content) if content else 0
                logger.write({"parse": scope, "name": filename, "completed": True, "rows": rows})
        except Exception as e:
            # an exception raised without arguments must not break the error report itself
            msg = e.args[0] if e.args else str(e)
            logger.write({"parse": scope, "filename": filename, "completed": False, "rows": -1, "msg": msg})
    #
    # @TODO: forward this data to the writer and log engine
    #
def upgrade(**args):
    """
    Assemble the parameters of an upgrade request; nothing is sent yet and nothing is returned.

    :email  provide us with who you are
    :key    upgrade key provided by the server for a given email
    :url    optional end-point, by default URL followed by "/upgrade"
    """
    if 'url' in args:
        url = args['url']
    else:
        url = URL + "/upgrade"
    headers = dict(key=args['key'], email=args["email"], url=url)
#
# Command line entry point, the action is selected by the first matching argument :
#   --signup|--init <email>     registers and downloads a configuration
#   --upgrade                   not implemented yet
#   --parse <claims|remits>     parses the files found in --file and/or --folder
#   --analytics                 starts the analytics server
#   --export                    only resolves the export format for now
# anything else prints the usage
#
if __name__ == '__main__':
    info = init()
    if 'out-folder' in SYS_ARGS:
        OUTPUT_FOLDER = SYS_ARGS['out-folder']
    if set(list(SYS_ARGS.keys())) & set(['signup', 'init']):
        #
        # This command will essentially get a new copy of the configurations
        # @TODO: Tie the request to a version ?
        #
        email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
        url = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
        store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
        db = 'healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
        register(email=email, url=url, store=store, db=db)
    elif 'upgrade' in SYS_ARGS:
        #
        # perform an upgrade i.e some code or new parsers information will be provided
        #
        pass
    elif 'parse' in SYS_ARGS and info:
        #
        # In this section of the code we are expecting the user to provide :
        # :folder   location of the files to process or file to process
        #
        files = []
        if 'file' in SYS_ARGS:
            files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else []
        if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
            names = os.listdir(SYS_ARGS['folder'])
            files += [os.sep.join([SYS_ARGS['folder'], name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'], name]))]
        else:
            #
            # @TODO: raise an error
            pass
        #
        # if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't
        #
        if 'resume' in SYS_ARGS:
            files = resume(files, SYS_ARGS['parse'], info)
            print (["Found ",len(files)," files unprocessed"])
        #
        # @TODO: Log this here so we know what is being processed or not
        if files:
            #
            # we need to have batches ready for this in order to run some of these queries in parallel
            # @TODO: Make sure it is with a persistence storage (not disk .. not thread/process safe yet)
            #   - Make sure we can leverage this on n-cores later on, for now the assumption is a single core
            #
            BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int(SYS_ARGS['batch'])
            # np.array_split rejects a number of sections lower than 1
            BATCH_COUNT = max(1, BATCH_COUNT)
            procs = []
            for row in np.array_split(files, BATCH_COUNT):
                row = row.tolist()
                if not row:
                    # more batches than files, there is nothing to hand to a parser
                    continue
                parser = x12.Parser(os.sep.join([PATH, 'config.json']))
                parser.set.files(row)
                parser.start()
                procs.append(parser)
            #
            # Wait for the parsers, without sleeping once the last one is done
            while procs:
                procs = [proc for proc in procs if proc.is_alive()]
                if procs:
                    time.sleep(2)
    elif 'analytics' in SYS_ARGS:
        PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
        DEBUG = int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
        SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
        e = analytics.engine(os.sep.join([PATH, 'config.json']))  #--@TODO: make the configuration file globally accessible
        e.apply(type='claims', serialize=True)
        SYS_ARGS['engine'] = e
        # NOTE(review): a lambda target cannot be pickled, so this presumably relies on
        # the 'fork' start method of multiprocessing -- confirm on macOS/Windows
        pointer = lambda : server.app.run(host='0.0.0.0', port=PORT, debug=DEBUG, threaded=False)
        pthread = Process(target=pointer, args=())
        pthread.start()
    elif 'export' in SYS_ARGS:
        #
        # this function is designed to export the data to csv
        #
        export_format = SYS_ARGS['format'] if 'format' in SYS_ARGS else 'csv'
        export_format = export_format.lower()
        # the former test compared a set against strings, which forced csv every time
        if export_format not in ['xls', 'csv']:
            export_format = 'csv'
    else:
        msg = """
cli:
healthcare-io.py --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
healthcare-io.py --parse claims --folder <path> [--batch <value>]
healthcare-io.py --parse remits --folder <path> [--batch <value>] [--resume]
parameters :
--<[signup|init]> signup or get a configuration file from a parsing server
--store data store mongo or sqlite or mongodb
--resume will attempt to resume if there was an interruption
"""
        print(msg)
# """
# The program was called from the command line thus we are expecting
# parse in [claims,remits]
# config os.sep.path.exists(path)
# folder os.sep.path.exists(path)
# store store ()
# """
# p = len( set(['store','config','folder']) & set(SYS_ARGS.keys())) == 3 and ('db' in SYS_ARGS or 'path' in SYS_ARGS)
# TYPE = {
# 'mongo':'mongo.MongoWriter',
# 'couch':'couch.CouchWriter',
# 'disk':'disk.DiskWriter'
# }
# INFO = {
# '837':{'scope':'claims','section':'HL'},
# '835':{'scope':'remits','section':'CLP'}
# }
# if p :
# args = {}
# scope = SYS_ARGS['config'][:-5].split(os.sep)[-1]
# CONTEXT = INFO[scope]['scope']
# #
# # @NOTE:
# # improve how database and data stores are handled.
# if SYS_ARGS['store'] == 'couch' :
# args = {'url': SYS_ARGS['url'] if 'url' in SYS_ARGS else 'http://localhost:5984'}
# args['dbname'] = SYS_ARGS['db']
# elif SYS_ARGS ['store'] == 'mongo':
# args = {'host':SYS_ARGS['host']if 'host' in SYS_ARGS else 'localhost:27017'}
# if SYS_ARGS['store'] in ['mongo','couch']:
# args['dbname'] = SYS_ARGS['db'] if 'db' in SYS_ARGS else 'claims_outcomes'
# args['doc'] = CONTEXT
# TYPE = TYPE[SYS_ARGS['store']]
# writer = factory.instance(type=TYPE,args=args)
# if SYS_ARGS['store'] == 'disk':
# writer.init(path = 'output-claims.json')
# logger = factory.instance(type=TYPE,args= dict(args,**{"doc":"logs"}))
# files = os.listdir(SYS_ARGS['folder'])
# CONFIG = json.loads(open(SYS_ARGS['config']).read())
# SECTION = INFO[scope]['section']
# for file in files :
# if 'limit' in SYS_ARGS and files.index(file) == int(SYS_ARGS['limit']) :
# break
# else:
# filename = os.sep.join([SYS_ARGS['folder'],file])
# try:
# content,logs = get_content(filename,CONFIG,SECTION)
# except Exception as e:
# if sys.version_info[0] > 2 :
# logs = [{"filename":filename,"msg":e.args[0]}]
# else:
# logs = [{"filename":filename,"msg":e.message}]
# content = None
# if content :
# writer.write(content)
# if logs:
# logger.write(logs)
# pass
# else:
# print (__doc__)