parser/healthcareio/healthcare-io.py

401 lines
15 KiB
Python
Raw Permalink Normal View History

2020-08-21 04:49:13 +00:00
#!/usr/bin/env python3
"""
(c) 2019 Claims Toolkit,
Health Information Privacy Lab, Vanderbilt University Medical Center
Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
Khanhly Nguyen <khanhly.t.nguyen@gmail.com>
This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format.
The claims/outpout can be forwarded to a NoSQL Data store like couchdb and mongodb
Usage :
Commandline :
python edi-parser --scope --config <path> --folder <path> --store <[mongo|disk|couch]> --<db|path]> <id|path>
with :
--scope <claims|remits>
--config path of the x12 to be parsed i.e it could be 835, or 837
--folder location of the files (they must be decompressed)
--store data store could be disk, mongodb, couchdb
--db|path name of the folder to store the output or the database name
Embedded in Code :
import edi.parser
import json
file = '/data/claim_1.x12'
conf = json.loads(open('config/837.json').read())
edi.parser.get_content(filename,conf)
"""
2020-08-21 14:28:24 +00:00
from healthcareio.params import SYS_ARGS
2020-08-21 04:49:13 +00:00
from transport import factory
import requests
2020-09-26 20:32:06 +00:00
from healthcareio import analytics
from healthcareio import server
2020-08-21 14:28:24 +00:00
from healthcareio.parser import get_content
2020-08-21 04:49:13 +00:00
import os
import json
import sys
2020-09-26 20:32:06 +00:00
import numpy as np
from multiprocessing import Process
import time
2020-10-06 16:47:20 +00:00
from healthcareio import x12
from healthcareio.export import export
2020-12-11 12:45:10 +00:00
import smart
2022-01-29 20:40:51 +00:00
import transport
2020-12-11 19:08:54 +00:00
from healthcareio.server import proxy
2020-12-11 12:45:10 +00:00
import pandas as pd
2020-09-26 20:32:06 +00:00
2020-08-21 04:49:13 +00:00
PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
INFO = None
URL = "https://healthcareio.the-phi.com"
if not os.path.exists(PATH) :
os.mkdir(PATH)
import platform
import sqlite3 as lite
# PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
2022-01-29 20:40:51 +00:00
CONFIG_FILE = os.sep.join([PATH,'config.json']) if 'config' not in SYS_ARGS else SYS_ARGS['config']
HELP_MESSAGE = """
cli:
2022-01-29 20:40:51 +00:00
#
# Signup, allows parsing configuration to be downloaded
#
# Support for SQLite3
healthcare-io.py --signup steve@the-phi.com --store sqlite
#or support for mongodb
healthcare-io.py --signup steve@the-phi.com --store mongo
healthcare-io.py --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
healthcare-io.py --parse --folder <path> [--batch <value>] [--resume]
healthcare-io.py --check-update
healthcare-io.py --export <835|837> --config <config-path>
action :
--signup|init signup user and get configuration file
--parse starts parsing
2021-04-28 20:38:26 +00:00
--check-update checks for updates
--export export data of a 835 or 837 into another database
parameters :
--<[signup|init]> signup or get a configuration file from a parsing server
--folder location of the files (the program will recursively traverse it)
--store data store mongo or sqlite or mongodb
--resume will attempt to resume if there was an interruption
"""
def signup (**args) :
2020-08-21 04:49:13 +00:00
"""
:email user's email address
:url url of the provider to signup
2020-08-21 04:49:13 +00:00
"""
email = args['email']
url = args['url'] if 'url' in args else URL
folders = [PATH,OUTPUT_FOLDER]
for path in folders :
if not os.path.exists(path) :
os.mkdir(path)
#
#
2020-09-26 20:32:06 +00:00
store = args['store'] if 'store' in args else 'sqlite'
headers = {"email":email,"client":platform.node(),"store":store,"db":args['db']}
2020-08-21 04:49:13 +00:00
http = requests.session()
r = http.post(url,headers=headers)
#
# store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}}
# if 'store' in args :
# store = args['store']
2022-01-29 20:40:51 +00:00
# filename = (os.sep.join([PATH,'config.json']))
filename = CONFIG_FILE
2020-08-21 04:49:13 +00:00
info = r.json() #{"parser":r.json(),"store":store}
info = dict({"owner":email},**info)
info['store']['args']['path'] =os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql
info['out-folder'] = OUTPUT_FOLDER
file = open( filename,'w')
file.write( json.dumps(info))
file.close()
2022-06-26 20:45:05 +00:00
_m = """
Thank you for signingup!!
Your configuration file is store in :path,
- More information visit https://healthcareio.the-phi.com/parser
- Access the source https://healthcareio.the-phi.com/git/code/parser
2020-08-21 04:49:13 +00:00
2022-06-26 20:45:05 +00:00
""".replace(":path",CONFIG_FILE)
print (_m)
2020-08-21 04:49:13 +00:00
#
# Create the sqlite3 database to
def log(**args):
"""
This function will perform a log of anything provided to it
"""
pass
def init():
"""
2022-01-29 20:40:51 +00:00
read all the configuration from disk.
Requirements for configuration file :
{out-folder,store,837,835 }
2020-08-21 04:49:13 +00:00
"""
2022-01-29 20:40:51 +00:00
# filename = os.sep.join([PATH,'config.json'])
filename = CONFIG_FILE
2020-08-21 04:49:13 +00:00
info = None
if os.path.exists(filename):
2022-01-29 20:40:51 +00:00
#
# Loading the configuration file (JSON format)
2020-08-21 04:49:13 +00:00
file = open(filename)
info = json.loads(file.read())
2020-08-21 14:28:24 +00:00
2022-01-29 20:40:51 +00:00
if 'output-folder' not in info and not os.path.exists(OUTPUT_FOLDER) :
os.mkdir(OUTPUT_FOLDER)
elif 'output-folder' in info and not os.path.exists(info['out-folder']) :
os.mkdir(info['out-folder'])
# if 'type' in info['store'] :
lwriter = None
is_sqlite = False
2022-06-26 20:45:05 +00:00
if'type' in info['store'] and info['store']['type'] == 'disk.SQLiteWriter' :
2022-01-29 20:40:51 +00:00
lwriter = transport.factory.instance(**info['store'])
is_sqlite = True
elif 'provider' in info['store'] and info['store']['provider'] == 'sqlite' :
lwriter = transport.instance(**info['store']) ;
is_sqlite = True
2022-06-26 20:45:05 +00:00
2022-01-29 20:40:51 +00:00
if lwriter and is_sqlite:
2020-08-21 04:49:13 +00:00
for key in info['schema'] :
2022-01-29 20:40:51 +00:00
if key != 'logs' :
_id = 'claims' if key == '837' else 'remits'
else:
_id = key
2022-06-26 20:45:05 +00:00
if not lwriter.has(table=_id) :
2022-01-29 20:40:51 +00:00
lwriter.apply(info['schema'][key]['create'])
2020-08-21 04:49:13 +00:00
2022-01-29 20:40:51 +00:00
# [lwriter.apply( info['schema'][key]['create']) for key in info['schema'] if not lwriter.has(table=key)]
lwriter.close()
2020-08-21 04:49:13 +00:00
2022-01-29 20:40:51 +00:00
return info
2020-09-26 20:32:06 +00:00
2020-08-21 04:49:13 +00:00
def upgrade(**args):
"""
:email provide us with who you are
:key upgrade key provided by the server for a given email
"""
url = args['url'] if 'url' in args else URL+"/upgrade"
headers = {"key":args['key'],"email":args["email"],"url":url}
def check(**_args):
"""
This function will check if there is an update available (versions are in the configuration file)
:param url
"""
url = _args['url'][:-1] if _args['url'].endswith('/') else _args['url']
url = url + "/version"
if 'version' not in _args :
version = {"_id":"version","current":0.0}
else:
version = _args['version']
http = requests.session()
r = http.get(url)
return r.json()
2020-08-21 04:49:13 +00:00
if __name__ == '__main__' :
info = init()
if 'out-folder' in SYS_ARGS :
OUTPUT_FOLDER = SYS_ARGS['out-folder']
SYS_ARGS['url'] = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
2020-08-21 04:49:13 +00:00
if set(list(SYS_ARGS.keys())) & set(['signup','init']):
#
# This command will essentially get a new copy of the configurations
# @TODO: Tie the request to a version ?
#
email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
url = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
2020-09-26 20:32:06 +00:00
store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
db='healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
signup(email=email,url=url,store=store,db=db)
2020-08-21 04:49:13 +00:00
# else:
# m = """
# usage:
# healthcareio --signup --email myemail@provider.com [--url <host>]
# """
# print (m)
elif 'upgrade' in SYS_ARGS :
#
# perform an upgrade i.e some code or new parsers information will be provided
#
pass
elif 'parse' in SYS_ARGS and info:
"""
In this section of the code we are expecting the user to provide :
:folder location of the files to process or file to process
:
"""
files = []
if 'file' in SYS_ARGS :
files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else []
if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
for root,_dir,f in os.walk(SYS_ARGS['folder']) :
if f :
files += [os.sep.join([root,name]) for name in f]
# names = os.listdir(SYS_ARGS['folder'])
# files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))]
2020-08-21 04:49:13 +00:00
else:
#
# raise an error
2020-08-21 04:49:13 +00:00
pass
#
2020-10-01 16:43:57 +00:00
# if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't
#
if 'resume' in SYS_ARGS :
2022-01-29 20:40:51 +00:00
store_config = json.loads( (open(CONFIG_FILE)).read() )
2020-12-11 19:08:54 +00:00
files = proxy.get.resume(files,store_config )
# print (["Found ",len(files)," files unprocessed"])
2020-10-01 16:43:57 +00:00
#
2020-08-21 04:49:13 +00:00
# @TODO: Log this here so we know what is being processed or not
SCOPE = None
if files : #and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']):
2020-09-26 20:32:06 +00:00
BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch'])
files = np.array_split(files,BATCH_COUNT)
procs = []
index = 0
for row in files :
row = row.tolist()
# logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
# proc = Process(target=apply,args=(row,info['store'],_info,))
2022-01-29 20:40:51 +00:00
# parser = x12.Parser(os.sep.join([PATH,'config.json']))
parser = x12.Parser(CONFIG_FILE)
parser.set.files(row)
parser.start()
procs.append(parser)
# index = index + 1
2020-09-26 20:32:06 +00:00
while len(procs) > 0 :
procs = [proc for proc in procs if proc.is_alive()]
time.sleep(2)
2022-06-26 20:45:05 +00:00
uri = OUTPUT_FOLDER
store_config = json.loads( (open(CONFIG_FILE)).read() )['store']
if 'type' in store_config :
uri = store_config['args']['host'] if 'host' in store_config['args'] else ( store_config['args']['path'] if 'path' in store_config['args'] else store_config['args']['database'])
if 'SQLite' in store_config['type']:
provider = 'sqlite'
elif 'sql' in store_config['type'] :
provider = 'SQL'
else:
provider = 'mongo'
else:
provider = store_config['provider']
_msg = """
Completed Parsing, The data is available in :provider database at :uri
Logs are equally available for errors and summary statistics to be compiled
""".replace(":provider",provider).replace(":uri",uri)
print (_msg)
2020-08-21 04:49:13 +00:00
pass
2020-09-26 20:32:06 +00:00
elif 'analytics' in SYS_ARGS :
PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
#
#
# PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
2022-01-29 20:40:51 +00:00
if os.path.exists(CONFIG_FILE) :
e = analytics.engine(CONFIG_FILE) #--@TODO: make the configuration file globally accessible
2020-12-11 12:45:10 +00:00
e.apply(type='claims',serialize=True)
SYS_ARGS['engine'] = e
2022-01-29 20:40:51 +00:00
SYS_ARGS['config'] = json.loads(open(CONFIG_FILE ).read())
2020-12-11 12:45:10 +00:00
else:
SYS_ARGS['config'] = {"owner":None,"store":None}
if 'args' not in SYS_ARGS['config'] :
SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}
2020-09-26 20:32:06 +00:00
2020-12-11 12:45:10 +00:00
me = pd.DataFrame(smart.top.read(name='healthcare-io.py')).args.unique().tolist()
SYS_ARGS['me'] = me[0] #-- This key will identify the current process
2020-09-26 20:32:06 +00:00
pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False)
pthread = Process(target=pointer,args=())
pthread.start()
elif 'check-update' in SYS_ARGS :
_args = {"url":SYS_ARGS['url']}
try:
2022-01-29 20:40:51 +00:00
if os.path.exists(CONFIG_FILE) :
SYS_ARGS['config'] = json.loads(open(CONFIG_FILE ).read())
else:
SYS_ARGS['config'] = {}
if 'version' in SYS_ARGS['config'] :
_args['version'] = SYS_ARGS['config']['version']
version = check(**_args)
_version = {"current":0.0}if 'version' not in SYS_ARGS['config'] else SYS_ARGS['config']['version']
if _version['current'] != version['current'] :
print ()
print ("You need to upgrade your system to version to ",version['current'])
print ("\t- signup (for new configuration)")
print ("\t- use pip to upgrade the codebase")
else:
print ()
2020-12-22 07:22:08 +00:00
print ("You are running the current configuraiton version ",_version['current'])
except Exception as e:
print (e)
pass
2020-09-26 20:32:06 +00:00
2020-08-21 04:49:13 +00:00
elif 'export' in SYS_ARGS:
#
# this function is designed to export the data to csv
#
2022-01-29 20:40:51 +00:00
path = SYS_ARGS['export-config']
X12_TYPE = SYS_ARGS['export'] if 'export' in SYS_ARGS else '835'
if not os.path.exists(path) or X12_TYPE not in ['835','837']:
print (HELP_MESSAGE)
else:
#
# Let's run the export function ..., This will push files into a data-store of choice Redshift, PostgreSQL, MySQL ...
#
2020-08-21 04:49:13 +00:00
2022-01-29 20:40:51 +00:00
# _store = {"type":"sql.SQLWriter","args":json.loads( (open(path) ).read())}
_store = json.loads( (open(path) ).read())
pipes = export.Factory.instance(type=X12_TYPE,write_store=_store,config = CONFIG_FILE) #"inspect":0,"cast":0}})
# pipes[0].run()
2022-01-29 20:40:51 +00:00
# print (pipes)
2021-02-07 22:02:35 +00:00
for thread in pipes:
2021-02-08 20:47:49 +00:00
2021-02-06 18:08:43 +00:00
if 'table' in SYS_ARGS and SYS_ARGS['table'] != thread.table :
continue
thread.start()
2021-02-07 22:02:35 +00:00
time.sleep(1)
thread.join()
2022-01-29 20:40:51 +00:00
2020-08-21 04:49:13 +00:00
else:
print(HELP_MESSAGE)