From cae983fddec543c8e0c8f50abe8166550e59e1d7 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 6 Feb 2024 11:23:29 -0600 Subject: [PATCH] housekeeping cleanup --- healthcareio/export.json | 3 - healthcareio/healthcare-io.py.bak | 397 ------------------------------ 2 files changed, 400 deletions(-) delete mode 100644 healthcareio/export.json delete mode 100644 healthcareio/healthcare-io.py.bak diff --git a/healthcareio/export.json b/healthcareio/export.json deleted file mode 100644 index 4446b24..0000000 --- a/healthcareio/export.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "provider":"postgresql","database":"healthcareio","schema":"public","context":"write" -} diff --git a/healthcareio/healthcare-io.py.bak b/healthcareio/healthcare-io.py.bak deleted file mode 100644 index 54d718e..0000000 --- a/healthcareio/healthcare-io.py.bak +++ /dev/null @@ -1,397 +0,0 @@ -#!/usr/bin/env python3 -""" -(c) 2019 Claims Toolkit, -Health Information Privacy Lab, Vanderbilt University Medical Center - -Steve L. Nyemba -Khanhly Nguyen - - -This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format. -The claims/outpout can be forwarded to a NoSQL Data store like couchdb and mongodb -Usage : - Commandline : - python edi-parser --scope --config --folder --store <[mongo|disk|couch]> -- - - with : - --scope - --config path of the x12 to be parsed i.e it could be 835, or 837 - --folder location of the files (they must be decompressed) - --store data store could be disk, mongodb, couchdb - --db|path name of the folder to store the output or the database name - - Embedded in Code : - - import edi.parser - import json - - file = '/data/claim_1.x12' - conf = json.loads(open('config/837.json').read()) - edi.parser.get_content(filename,conf) -""" -from healthcareio.params import SYS_ARGS -from transport import factory -import requests -from healthcareio import analytics -from healthcareio import server - - -from healthcareio.parser import get_content -import os -import json -import sys -import numpy as np -from multiprocessing import Process -import time -from healthcareio import x12 -from healthcareio.export import export -import smart -import transport -from healthcareio.server import proxy -import pandas as pd - -PATH = os.sep.join([os.environ['HOME'],'.healthcareio']) -OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io']) -INFO = None -URL = "https://healthcareio.the-phi.com" -if not os.path.exists(PATH) : - os.mkdir(PATH) -import platform -import sqlite3 as lite -# PATH = os.sep.join([os.environ['HOME'],'.edi-parser']) -CONFIG_FILE = os.sep.join([PATH,'config.json']) if 'config' not in SYS_ARGS else SYS_ARGS['config'] -HELP_MESSAGE = """ - cli: - # - # Signup, allows parsing configuration to be downloaded - # - - # Support for SQLite3 - healthcare-io.py --signup steve@the-phi.com --store sqlite - - #or support for mongodb - healthcare-io.py --signup steve@the-phi.com --store mongo - - - healthcare-io.py --<[signup|init]> --store [--batch ] - healthcare-io.py --parse --folder [--batch ] [--resume] - healthcare-io.py --check-update - healthcare-io.py --export <835|837> --config - action : - --signup|init signup user and get configuration file - --parse starts parsing - --check-update checks for updates - --export export data of a 835 or 837 into another database - parameters : - --<[signup|init]> signup or get a configuration file from a parsing server - --folder location of the files (the program will recursively traverse it) - --store data store mongo or sqlite or mongodb - --resume will attempt to resume if there was an interruption - """ -def signup (**args) : - """ - :email user's email address - :url url of the provider to signup - """ - - email = args['email'] - url = args['url'] if 'url' in args else URL - folders = [PATH,OUTPUT_FOLDER] - for path in folders : - if not os.path.exists(path) : - os.mkdir(path) - - # - # - store = args['store'] if 'store' in args else 'sqlite' - headers = {"email":email,"client":platform.node(),"store":store,"db":args['db']} - http = requests.session() - r = http.post(url,headers=headers) - - # - # store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}} - # if 'store' in args : - # store = args['store'] - # filename = (os.sep.join([PATH,'config.json'])) - filename = CONFIG_FILE - info = r.json() #{"parser":r.json(),"store":store} - info = dict({"owner":email},**info) - info['store']['args']['path'] =os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql - info['out-folder'] = OUTPUT_FOLDER - - file = open( filename,'w') - file.write( json.dumps(info)) - file.close() - _m = """ - Thank you for signingup!! - Your configuration file is store in :path, - - More information visit https://healthcareio.the-phi.com/parser - - Access the source https://healthcareio.the-phi.com/git/code/parser - - """.replace(":path",CONFIG_FILE) - print (_m) - # - # Create the sqlite3 database to - - -def log(**args): - """ - This function will perform a log of anything provided to it - """ - pass -def init(): - """ - read all the configuration from disk. - Requirements for configuration file : - {out-folder,store,837,835 } - """ - # filename = os.sep.join([PATH,'config.json']) - filename = CONFIG_FILE - info = None - if os.path.exists(filename): - # - # Loading the configuration file (JSON format) - file = open(filename) - info = json.loads(file.read()) - - - if 'output-folder' not in info and not os.path.exists(OUTPUT_FOLDER) : - os.mkdir(OUTPUT_FOLDER) - elif 'output-folder' in info and not os.path.exists(info['out-folder']) : - os.mkdir(info['out-folder']) - # if 'type' in info['store'] : - lwriter = None - is_sqlite = False - if'type' in info['store'] and info['store']['type'] == 'disk.SQLiteWriter' and not os.path.exists(info['store']['args']['path']) : - lwriter = transport.factory.instance(**info['store']) - is_sqlite = True - elif 'provider' in info['store'] and info['store']['provider'] == 'sqlite' : - lwriter = transport.instance(**info['store']) ; - is_sqlite = True - if lwriter and is_sqlite: - for key in info['schema'] : - if key != 'logs' : - _id = 'claims' if key == '837' else 'remits' - else: - _id = key - - if not lwriter.has(table=_id) : - lwriter.apply(info['schema'][key]['create']) - - # [lwriter.apply( info['schema'][key]['create']) for key in info['schema'] if not lwriter.has(table=key)] - lwriter.close() - - return info - -def upgrade(**args): - """ - :email provide us with who you are - :key upgrade key provided by the server for a given email - """ - url = args['url'] if 'url' in args else URL+"/upgrade" - headers = {"key":args['key'],"email":args["email"],"url":url} -def check(**_args): - """ - This function will check if there is an update available (versions are in the configuration file) - :param url - """ - url = _args['url'][:-1] if _args['url'].endswith('/') else _args['url'] - url = url + "/version" - if 'version' not in _args : - version = {"_id":"version","current":0.0} - else: - version = _args['version'] - http = requests.session() - r = http.get(url) - return r.json() - -if __name__ == '__main__' : - info = init() - - if 'out-folder' in SYS_ARGS : - OUTPUT_FOLDER = SYS_ARGS['out-folder'] - SYS_ARGS['url'] = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL - - if set(list(SYS_ARGS.keys())) & set(['signup','init']): - # - # This command will essentially get a new copy of the configurations - # @TODO: Tie the request to a version ? - # - - email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init'] - url = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL - store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite' - db='healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db'] - signup(email=email,url=url,store=store,db=db) - # else: - # m = """ - # usage: - # healthcareio --signup --email myemail@provider.com [--url ] - - # """ - # print (m) - elif 'upgrade' in SYS_ARGS : - # - # perform an upgrade i.e some code or new parsers information will be provided - # - - pass - elif 'parse' in SYS_ARGS and info: - """ - In this section of the code we are expecting the user to provide : - :folder location of the files to process or file to process - : - """ - files = [] - if 'file' in SYS_ARGS : - files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else [] - if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']): - for root,_dir,f in os.walk(SYS_ARGS['folder']) : - - if f : - files += [os.sep.join([root,name]) for name in f] - - # names = os.listdir(SYS_ARGS['folder']) - # files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))] - else: - # - # raise an error - - pass - # - # if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't - # - if 'resume' in SYS_ARGS : - store_config = json.loads( (open(CONFIG_FILE)).read() ) - files = proxy.get.resume(files,store_config ) - # print (["Found ",len(files)," files unprocessed"]) - # - # @TODO: Log this here so we know what is being processed or not - SCOPE = None - - if files : #and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']): - - BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch']) - - files = np.array_split(files,BATCH_COUNT) - procs = [] - index = 0 - for row in files : - - row = row.tolist() - # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)}) - # proc = Process(target=apply,args=(row,info['store'],_info,)) - # parser = x12.Parser(os.sep.join([PATH,'config.json'])) - - parser = x12.Parser(CONFIG_FILE) - parser.set.files(row) - parser.start() - procs.append(parser) - # index = index + 1 - while len(procs) > 0 : - procs = [proc for proc in procs if proc.is_alive()] - time.sleep(2) - uri = OUTPUT_FOLDER - store_config = json.loads( (open(CONFIG_FILE)).read() )['store'] - if 'type' in store_config : - uri = store_config['args']['host'] if 'host' in store_config['args'] else ( store_config['args']['path'] if 'path' in store_config['args'] else store_config['args']['database']) - if 'SQLite' in store_config['type']: - provider = 'sqlite' - elif 'sql' in store_config['type'] : - provider = 'SQL' - else: - provider = 'mongo' - - else: - provider = store_config['provider'] - _msg = """ - Completed Parsing, The data is available in :provider database at :uri - Logs are equally available for errors and summary statistics to be compiled - """.replace(":provider",provider).replace(":uri",uri) - print (_msg) - - pass - elif 'analytics' in SYS_ARGS : - PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500 - DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0 - SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else '' - # - # - - # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) - - if os.path.exists(CONFIG_FILE) : - e = analytics.engine(CONFIG_FILE) #--@TODO: make the configuration file globally accessible - e.apply(type='claims',serialize=True) - SYS_ARGS['engine'] = e - SYS_ARGS['config'] = json.loads(open(CONFIG_FILE ).read()) - else: - SYS_ARGS['config'] = {"owner":None,"store":None} - - if 'args' not in SYS_ARGS['config'] : - SYS_ARGS['config']["args"] = {"batch":1,"resume":True} - # - # folder is mandatory - # SYS_ARGS['config']['args']['folder'] = SYS_ARGS['folder'] - - # pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False) - # pthread = Process(target=pointer,args=()) - # pthread.start() - elif 'check-update' in SYS_ARGS : - _args = {"url":SYS_ARGS['url']} - try: - if os.path.exists(CONFIG_FILE) : - SYS_ARGS['config'] = json.loads(open(CONFIG_FILE ).read()) - else: - SYS_ARGS['config'] = {} - if 'version' in SYS_ARGS['config'] : - _args['version'] = SYS_ARGS['config']['version'] - version = check(**_args) - _version = {"current":0.0}if 'version' not in SYS_ARGS['config'] else SYS_ARGS['config']['version'] - if _version['current'] != version['current'] : - print () - print ("You need to upgrade your system to version to ",version['current']) - print ("\t- signup (for new configuration)") - print ("\t- use pip to upgrade the codebase") - else: - print () - print ("You are running the current configuraiton version ",_version['current']) - except Exception as e: - print (e) - pass - - elif 'export' in SYS_ARGS: - # - # this function is designed to export the data to csv - # - path = SYS_ARGS['export-config'] - - X12_TYPE = SYS_ARGS['export'] if 'export' in SYS_ARGS else '835' - if not os.path.exists(path) or X12_TYPE not in ['835','837']: - print (HELP_MESSAGE) - else: - # - # Let's run the export function ..., This will push files into a data-store of choice Redshift, PostgreSQL, MySQL ... - # - - # _store = {"type":"sql.SQLWriter","args":json.loads( (open(path) ).read())} - _store = json.loads( (open(path) ).read()) - - pipes = export.Factory.instance(type=X12_TYPE,write_store=_store,config = CONFIG_FILE) #"inspect":0,"cast":0}}) - # pipes[0].run() - # print (pipes) - - - for thread in pipes: - - if 'table' in SYS_ARGS and SYS_ARGS['table'] != thread.table : - continue - thread.start() - time.sleep(1) - thread.join() - - - - else: - - print(HELP_MESSAGE)