340 lines
12 KiB
Python
340 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
(c) 2019 Claims Toolkit,
|
|
Health Information Privacy Lab, Vanderbilt University Medical Center
|
|
|
|
Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
|
|
Khanhly Nguyen <khanhly.t.nguyen@gmail.com>
|
|
|
|
|
|
This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format.
|
|
The claims/output can be forwarded to a NoSQL data store like couchdb or mongodb
|
|
Usage :
|
|
Commandline :
|
|
python edi-parser --scope <claims|remits> --config <path> --folder <path> --store <[mongo|disk|couch]> --<[db|path]> <id|path>
|
|
|
|
with :
|
|
--scope <claims|remits>
|
|
--config path of the x12 to be parsed i.e it could be 835, or 837
|
|
--folder location of the files (they must be decompressed)
|
|
--store data store could be disk, mongodb, couchdb
|
|
--db|path name of the folder to store the output or the database name
|
|
|
|
Embedded in Code :
|
|
|
|
import edi.parser
|
|
import json
|
|
|
|
filename = '/data/claim_1.x12'
|
|
conf = json.loads(open('config/837.json').read())
|
|
edi.parser.get_content(filename,conf)
|
|
"""
|
|
from healthcareio.params import SYS_ARGS
|
|
from transport import factory
|
|
import requests
|
|
from healthcareio.parser import get_content
|
|
import os
|
|
import json
|
|
import sys
|
|
#
# Per-user locations and defaults used throughout this module:
#   PATH            folder holding the configuration (config.json)
#   OUTPUT_FOLDER   default folder receiving the parsed output
#   INFO            configuration cache, populated on first use by parse()
#   URL             default provider end-point used for registration
#
# os.path.expanduser is used instead of os.environ['HOME'] so that the module
# can still be imported on platforms where HOME is not defined.
_HOME = os.path.expanduser('~')
PATH = os.sep.join([_HOME, '.healthcareio'])
OUTPUT_FOLDER = os.sep.join([_HOME, 'healthcare-io'])
INFO = None
URL = "https://healthcareio.the-phi.com"
if not os.path.exists(PATH):
    # makedirs also creates missing parents and tolerates a concurrent creation
    os.makedirs(PATH, exist_ok=True)
|
|
import platform
|
|
import sqlite3 as lite
|
|
# PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
|
|
def register(**args):
    """
    Register a user with the provider and store the returned configuration
    in PATH/config.json.

    :email  user's email address (required)
    :url    url of the provider to register with, defaults to URL

    The provider's response is decoded as JSON and is expected to contain a
    'store' entry with an 'args' dictionary (this is what the code below
    relies on). The function returns nothing.

    Raises KeyError when 'email' is missing and requests.HTTPError when the
    provider answers with an error status.
    """
    email = args['email']
    url = args['url'] if 'url' in args else URL

    # Make sure both the configuration and the output folders exist
    for path in (PATH, OUTPUT_FOLDER):
        if not os.path.exists(path):
            os.makedirs(path, exist_ok=True)

    headers = {"email": email, "client": platform.node()}
    with requests.session() as http:
        r = http.post(url, headers=headers)
        # Fail early with a meaningful error rather than on a malformed body
        r.raise_for_status()
        info = r.json()

    # Keys returned by the provider take precedence over the 'owner' default
    info = dict({"owner": email}, **info)
    info['store']['args']['path'] = os.sep.join([OUTPUT_FOLDER, 'healthcare-io.db3'])  # -- sql
    info['out-folder'] = OUTPUT_FOLDER

    filename = os.sep.join([PATH, 'config.json'])
    with open(filename, 'w') as file:
        file.write(json.dumps(info))
    #
    # NOTE: the sqlite3 database itself is not created here, init() creates it
    # the first time the configuration is loaded.
|
|
|
|
|
|
def analytics(**args):
    """
    This function will only compute basic distributions of a given feature for a given claim
    @args
        @param x: vector of features to process
        @param apply: operation to be applied {dist|distribution}
    @return a list holding a single dictionary that maps every distinct value
            of x to its number of occurrences; None for any other operation
    """
    # pandas is imported locally because the module never imports it at the top
    import pandas as pd

    if args['apply'] in ['dist', 'distribution']:
        #
        # This section of the code will return the distribution of a given space.
        # It is intended to be applied on several claims/remits
        #
        x = pd.DataFrame(args['x'], columns=['x'])
        # 'records' is the full spelling; the 'record' abbreviation is rejected by recent pandas
        return x.groupby(['x']).size().to_frame().T.to_dict(orient='records')
    return None
|
|
|
|
|
|
def log(**args):
    """
    Placeholder for a logging facility: any keyword arguments are accepted
    and nothing is done with them yet.
    """
    return None
|
|
def init():
    """
    Read the configuration stored in PATH/config.json.

    As a side effect the output folder is created when missing and, for a
    'disk.SQLiteWriter' store whose database file does not exist yet, the
    database is created by running every 'create' statement found in the
    'schema' section of the configuration.

    :return the configuration (dictionary) or None if there is no configuration file
    """
    filename = os.sep.join([PATH, 'config.json'])
    if not os.path.exists(filename):
        return None

    with open(filename) as file:
        info = json.loads(file.read())

    if not os.path.exists(info['out-folder']):
        os.makedirs(info['out-folder'], exist_ok=True)

    store = info['store']
    if store['type'] == 'disk.SQLiteWriter' and not os.path.exists(store['args']['path']):
        conn = lite.connect(store['args']['path'], isolation_level=None)
        try:
            for key in info['schema']:
                conn.execute(info['schema'][key]['create'])
            conn.commit()
        finally:
            # release the handle even if one of the statements fails
            conn.close()

    return info
|
|
#
|
|
# Global variables that load the configuration files
|
|
|
|
def parse(**args):
    """
    This function will parse the content of a claim or remittance (x12 format) given the following parameters
    :filename   absolute path of the file to be parsed
    :type       claims|remits in x12 format
    :version    (optional) 1-based position of the parser configuration to use;
                falls back to SYS_ARGS['version'], then to the last configuration

    :return whatever get_content returns for the file and the selected configuration

    Raises RuntimeError when no configuration could be loaded and ValueError
    when the type is not supported or has no parser configuration.
    """
    global INFO
    if not INFO:
        INFO = init()
    if not INFO:
        raise RuntimeError("healthcare-io configuration not found, please signup first")

    _map = {'claims': '837', 'remits': '835'}
    if args['type'] not in _map:
        raise ValueError("unsupported type %s, expecting claims or remits" % repr(args['type']))
    CONFIG = INFO['parser'][_map[args['type']]]
    if not CONFIG:
        raise ValueError("no parser configuration available for %s" % args['type'])

    # The version may be given explicitly or on the command line; the original
    # code tested one and read the other, which failed for command-line use.
    if 'version' in args:
        version = args['version']
    elif 'version' in SYS_ARGS:
        version = SYS_ARGS['version']
    else:
        version = None

    index = -1  # latest configuration by default
    if version is not None and 1 <= int(version) <= len(CONFIG):
        index = int(version) - 1
    CONFIG = CONFIG[index]
    SECTION = CONFIG['SECTION']
    os.environ['HEALTHCAREIO_SALT'] = INFO['owner']

    return get_content(args['filename'], CONFIG, SECTION)
|
|
|
|
|
|
def upgrade(**args):
    """
    Prepare an upgrade request; no request is sent yet, the headers are only assembled.
    :email  provide us with who you are
    :key    upgrade key provided by the server for a given email
    :url    (optional) end-point to contact, defaults to URL + "/upgrade"
    """
    if 'url' in args:
        url = args['url']
    else:
        url = URL + "/upgrade"
    headers = {
        "key": args['key'],
        "email": args["email"],
        "url": url,
    }
|
|
|
|
if __name__ == '__main__':
    #
    # Command line entry point, the supported commands (keys of SYS_ARGS) are :
    #   signup|init     register and download a fresh copy of the configuration
    #   upgrade         not implemented yet
    #   parse           parse the files given by --file and/or --folder
    #   export          not implemented yet, only the format is validated
    #
    info = init()

    if 'out-folder' in SYS_ARGS:
        OUTPUT_FOLDER = SYS_ARGS['out-folder']

    if 'signup' in SYS_ARGS or 'init' in SYS_ARGS:
        #
        # This command will essentially get a new copy of the configurations
        # @TODO: Tie the request to a version ?
        #
        email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
        url = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
        register(email=email, url=url)
    elif 'upgrade' in SYS_ARGS:
        #
        # perform an upgrade i.e some code or new parsers information will be provided
        #
        pass
    elif 'parse' in SYS_ARGS and info:
        #
        # In this section of the code we are expecting the user to provide :
        #   :file       file to process
        #   :folder     location of the files to process
        #
        files = []
        if 'file' in SYS_ARGS and not os.path.isdir(SYS_ARGS['file']):
            files = [SYS_ARGS['file']]
        # isdir (rather than exists) so that listdir is never run on a plain file
        if 'folder' in SYS_ARGS and os.path.isdir(SYS_ARGS['folder']):
            folder = SYS_ARGS['folder']
            candidates = [os.sep.join([folder, name]) for name in os.listdir(folder)]
            files += [path for path in candidates if not os.path.isdir(path)]
        #
        # @TODO: Log this here so we know what is being processed or not
        if files and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']):
            # normalized once, used for the table/document name and for the parser
            scope = SYS_ARGS['parse'].strip().lower()
            store = info['store']
            if store['type'] == 'disk.DiskWriter':
                store['args']['path'] += (os.sep + 'healthcare-io.json')
            if store['type'] == 'disk.SQLiteWriter':
                store['args']['table'] = scope
            else:
                store['args']['doc'] = scope
            writer = factory.instance(**store)
            logger = factory.instance(type='disk.DiskWriter', args={'path': os.sep.join([info['out-folder'], SYS_ARGS['parse'] + '.log'])})
            for filename in files:
                if filename.strip() == '':
                    continue
                try:
                    content, logs = parse(filename=filename, type=scope)
                    if content:
                        writer.write(content)
                    if logs:
                        for _row in logs:
                            logger.write(_row)
                    else:
                        # content may be empty/None when nothing was extracted
                        logger.write({"name": filename, "completed": True, "rows": len(content) if content else 0})
                except Exception as e:
                    # One failing file must not stop the batch; exceptions raised
                    # without arguments are logged with their string form.
                    msg = e.args[0] if e.args else str(e)
                    logger.write({"filename": filename, "completed": False, "rows": -1, "msg": msg})
                #
                # @TODO: forward this data to the writer and log engine
                #
    elif 'export' in SYS_ARGS:
        #
        # this function is designed to export the data to csv
        #
        format = SYS_ARGS['format'] if 'format' in SYS_ARGS else 'csv'
        format = format.lower()
        # the original compared a set against the list, which always reset the format to csv
        if format not in ['xls', 'csv']:
            format = 'csv'
|
|
# """
|
|
# The program was called from the command line thus we are expecting
|
|
# parse in [claims,remits]
|
|
# config os.sep.path.exists(path)
|
|
# folder os.sep.path.exists(path)
|
|
# store store ()
|
|
# """
|
|
# p = len( set(['store','config','folder']) & set(SYS_ARGS.keys())) == 3 and ('db' in SYS_ARGS or 'path' in SYS_ARGS)
|
|
# TYPE = {
|
|
# 'mongo':'mongo.MongoWriter',
|
|
# 'couch':'couch.CouchWriter',
|
|
# 'disk':'disk.DiskWriter'
|
|
# }
|
|
# INFO = {
|
|
# '837':{'scope':'claims','section':'HL'},
|
|
# '835':{'scope':'remits','section':'CLP'}
|
|
# }
|
|
# if p :
|
|
# args = {}
|
|
# scope = SYS_ARGS['config'][:-5].split(os.sep)[-1]
|
|
# CONTEXT = INFO[scope]['scope']
|
|
# #
|
|
# # @NOTE:
|
|
# # improve how database and data stores are handled.
|
|
# if SYS_ARGS['store'] == 'couch' :
|
|
# args = {'url': SYS_ARGS['url'] if 'url' in SYS_ARGS else 'http://localhost:5984'}
|
|
# args['dbname'] = SYS_ARGS['db']
|
|
|
|
# elif SYS_ARGS ['store'] == 'mongo':
|
|
# args = {'host':SYS_ARGS['host']if 'host' in SYS_ARGS else 'localhost:27017'}
|
|
# if SYS_ARGS['store'] in ['mongo','couch']:
|
|
# args['dbname'] = SYS_ARGS['db'] if 'db' in SYS_ARGS else 'claims_outcomes'
|
|
# args['doc'] = CONTEXT
|
|
|
|
# TYPE = TYPE[SYS_ARGS['store']]
|
|
# writer = factory.instance(type=TYPE,args=args)
|
|
# if SYS_ARGS['store'] == 'disk':
|
|
# writer.init(path = 'output-claims.json')
|
|
# logger = factory.instance(type=TYPE,args= dict(args,**{"doc":"logs"}))
|
|
# files = os.listdir(SYS_ARGS['folder'])
|
|
# CONFIG = json.loads(open(SYS_ARGS['config']).read())
|
|
# SECTION = INFO[scope]['section']
|
|
|
|
# for file in files :
|
|
# if 'limit' in SYS_ARGS and files.index(file) == int(SYS_ARGS['limit']) :
|
|
# break
|
|
# else:
|
|
# filename = os.sep.join([SYS_ARGS['folder'],file])
|
|
|
|
# try:
|
|
# content,logs = get_content(filename,CONFIG,SECTION)
|
|
# except Exception as e:
|
|
# if sys.version_info[0] > 2 :
|
|
# logs = [{"filename":filename,"msg":e.args[0]}]
|
|
# else:
|
|
# logs = [{"filename":filename,"msg":e.message}]
|
|
# content = None
|
|
# if content :
|
|
|
|
# writer.write(content)
|
|
# if logs:
|
|
|
|
# logger.write(logs)
|
|
|
|
|
|
# pass
|
|
# else:
|
|
# print (__doc__)
|