From 2beddab6b47d108c07b990264f8fb2676669c68d Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Mon, 21 Dec 2020 23:28:22 -0600 Subject: [PATCH] feature: added support for custom configuration --- README.md | 31 ++++++-- healthcareio/__init__.py | 2 + healthcareio/healthcare-io.py | 123 ++++++++++++++++---------------- healthcareio/server/__init__.py | 20 +++--- healthcareio/server/proxy.py | 2 +- healthcareio/x12/__init__.py | 83 +++++++++++++++++---- 6 files changed, 170 insertions(+), 91 deletions(-) diff --git a/README.md b/README.md index 37fd19d..b908173 100644 --- a/README.md +++ b/README.md @@ -22,14 +22,38 @@ We wrote this frame to be used in both command line or as a library within in yo pip install --upgrade git+https://hiplab.mc.vanderbilt.edu/git/lab/parse-edi.git ## Usage + cli: + + healthcare-io.py --<[signup|init]> --store [--batch ] + healthcare-io.py --parse --folder [--batch ] [--resume] + healthcare-io.py --check-update + action : + --signup|init signup user and get configuration file + --parse starts parsing + --check checks for updates + parameters : + --<[signup|init]> signup or get a configuration file from a parsing server + --folder location of the files (the program will recursively traverse it) + --store data store mongo or sqlite or mongodb + --resume will attempt to resume if there was an interruption **cli :** 1. signup to get parsing configuration + The parser is driven by a configuration, file you need by signing up. + healthcare-io.py --signup [--store ] + +2. check version + + Occasionally the attributes in the configuration file may change, This function will determine if there is a new version available. + + healthcare-io.py --check-update -2. parsing claims in a folder +3. parsing data in a folder + + The parser will recursively traverse a directory with claims and or remittances healthcare-io.py --parse --folder [--batch ] [--resume] @@ -39,11 +63,6 @@ We wrote this frame to be used in both command line or as a library within in yo --batch number of processes to spawn to parse the files --resume tells the parser to resume parsing if all files weren't processed or new files were added into the folder -3. dashboard - - There is a built-in dashboard that has displays descriptive analytics in a web browser - - healthcare-io.py --server [--context ] **Embedded in Code :** diff --git a/healthcareio/__init__.py b/healthcareio/__init__.py index d63c74f..2af64db 100644 --- a/healthcareio/__init__.py +++ b/healthcareio/__init__.py @@ -17,4 +17,6 @@ Usage : from healthcareio import analytics import healthcareio.x12 as x12 +import healthcareio.params as params + # from healthcareio import server diff --git a/healthcareio/healthcare-io.py b/healthcareio/healthcare-io.py index 33efbf8..57176e1 100644 --- a/healthcareio/healthcare-io.py +++ b/healthcareio/healthcare-io.py @@ -32,9 +32,10 @@ Usage : from healthcareio.params import SYS_ARGS from transport import factory import requests - from healthcareio import analytics from healthcareio import server + + from healthcareio.parser import get_content import os import json @@ -56,10 +57,10 @@ if not os.path.exists(PATH) : import platform import sqlite3 as lite # PATH = os.sep.join([os.environ['HOME'],'.edi-parser']) -def register (**args) : +def signup (**args) : """ :email user's email address - :url url of the provider to register + :url url of the provider to signup """ email = args['email'] @@ -203,13 +204,28 @@ def upgrade(**args): """ url = args['url'] if 'url' in args else URL+"/upgrade" headers = {"key":args['key'],"email":args["email"],"url":url} +def check(**_args): + """ + This function will check if there is an update available (versions are in the configuration file) + :param url + """ + url = _args['url'][:-1] if _args['url'].endswith('/') else _args['url'] + url = url + "/version" + if 'version' not in _args : + version = {"_id":"version","current":0.0} + else: + version = _args['version'] + http = requests.session() + r = http.get(url) + return r.json() if __name__ == '__main__' : info = init() if 'out-folder' in SYS_ARGS : OUTPUT_FOLDER = SYS_ARGS['out-folder'] - + SYS_ARGS['url'] = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL + if set(list(SYS_ARGS.keys())) & set(['signup','init']): # # This command will essentially get a new copy of the configurations @@ -217,10 +233,10 @@ if __name__ == '__main__' : # email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init'] - url = SYS_ARGS['url'] if 'url' in SYS_ARGS else 'https://healthcareio.the-phi.com' + url = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite' db='healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db'] - register(email=email,url=url,store=store,db=db) + signup(email=email,url=url,store=store,db=db) # else: # m = """ # usage: @@ -244,11 +260,17 @@ if __name__ == '__main__' : if 'file' in SYS_ARGS : files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else [] if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']): - names = os.listdir(SYS_ARGS['folder']) - files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))] + for root,_dir,f in os.walk(SYS_ARGS['folder']) : + + if f : + files += [os.sep.join([root,name]) for name in f] + + # names = os.listdir(SYS_ARGS['folder']) + # files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))] else: # - # raise an erro + # raise an error + pass # # if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't @@ -256,40 +278,13 @@ if __name__ == '__main__' : if 'resume' in SYS_ARGS : store_config = json.loads( (open(os.sep.join([PATH,'config.json']))).read() ) files = proxy.get.resume(files,store_config ) - print (["Found ",len(files)," files unprocessed"]) + # print (["Found ",len(files)," files unprocessed"]) # # @TODO: Log this here so we know what is being processed or not SCOPE = None if files : #and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']): - # logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])}) - # if info['store']['type'] == 'disk.DiskWriter' : - # info['store']['args']['path'] += (os.sep + 'healthcare-io.json') - # elif info['store']['type'] == 'disk.SQLiteWriter' : - # # info['store']['args']['path'] += (os.sep + 'healthcare-io.db3') - # pass - - - # if info['store']['type'] == 'disk.SQLiteWriter' : - # info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower() - # _info = json.loads(json.dumps(info['store'])) - # _info['args']['table']='logs' - # else: - # # - # # if we are working with no-sql we will put the logs in it (performance )? - # info['store']['args']['doc'] = SYS_ARGS['parse'].strip().lower() - # _info = json.loads(json.dumps(info['store'])) - # _info['args']['doc'] = 'logs' - # logger = factory.instance(**_info) - - # writer = factory.instance(**info['store']) - - # - # we need to have batches ready for this in order to run some of these queries in parallel - # @TODO: Make sure it is with a persistence storage (not disk .. not thread/process safe yet) - # - Make sure we can leverage this on n-cores later on, for now the assumption is a single core - # BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch']) files = np.array_split(files,BATCH_COUNT) @@ -308,27 +303,7 @@ if __name__ == '__main__' : while len(procs) > 0 : procs = [proc for proc in procs if proc.is_alive()] time.sleep(2) - # for filename in files : - - # if filename.strip() == '': - # continue - # # content,logs = get_content(filename,CONFIG,CONFIG['SECTION']) - # # - # try: - # content,logs = parse(filename = filename,type=SYS_ARGS['parse']) - # if content : - # writer.write(content) - # if logs : - # [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs] - # else: - # logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)}) - # except Exception as e: - # logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]}) - # # print ([filename,len(content)]) - # # - # # @TODO: forward this data to the writer and log engine - # # - + pass @@ -358,6 +333,28 @@ if __name__ == '__main__' : pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False) pthread = Process(target=pointer,args=()) pthread.start() + elif 'check-update' in SYS_ARGS : + _args = {"url":SYS_ARGS['url']} + try: + if os.path.exists(os.sep.join([PATH,'config.json'])) : + SYS_ARGS['config'] = json.loads((open(os.sep.join([PATH,'config.json']))).read()) + else: + SYS_ARGS['config'] = {} + if 'version' in SYS_ARGS['config'] : + _args['version'] = SYS_ARGS['config']['version'] + version = check(**_args) + _version = {"current":0.0}if 'version' not in SYS_ARGS['config'] else SYS_ARGS['config']['version'] + if _version['current'] != version['current'] : + print () + print ("You need to upgrade your system to version to ",version['current']) + print ("\t- signup (for new configuration)") + print ("\t- use pip to upgrade the codebase") + else: + print () + print ("You are running the current configuraiton version ",_version.current) + except Exception as e: + print (e) + pass elif 'export' in SYS_ARGS: # @@ -373,11 +370,15 @@ if __name__ == '__main__' : cli: healthcare-io.py --<[signup|init]> --store [--batch ] - healthcare-io.py --parse claims --folder [--batch ] - healthcare-io.py --parse remits --folder [--batch ] [--resume] - + healthcare-io.py --parse --folder [--batch ] [--resume] + healthcare-io.py --check-update + action : + --signup|init signup user and get configuration file + --parse starts parsing + --check checks for updates parameters : --<[signup|init]> signup or get a configuration file from a parsing server + --folder location of the files (the program will recursively traverse it) --store data store mongo or sqlite or mongodb --resume will attempt to resume if there was an interruption """ diff --git a/healthcareio/server/__init__.py b/healthcareio/server/__init__.py index 1ffb98a..e548fd8 100644 --- a/healthcareio/server/__init__.py +++ b/healthcareio/server/__init__.py @@ -11,11 +11,11 @@ import numpy as np from healthcareio import x12 from multiprocessing import Process -from flask_socketio import SocketIO, emit, disconnect,send +# from flask_socketio import SocketIO, emit, disconnect,send from healthcareio.server import proxy PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) app = Flask(__name__) -socket_ = SocketIO(app) +# socket_ = SocketIO(app) def resume (files): _args = SYS_ARGS['config']['store'].copy() @@ -30,7 +30,7 @@ def resume (files): _files = [item['name'] for item in _files] except Exception as e : pass - print (["found ",len(files),"\tProcessed ",len(_files)]) + return list(set(files) - set(_files)) @@ -66,14 +66,14 @@ def push() : _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline} r = pd.DataFrame(reader.read(mongo=_args)) r = healthcareio.analytics.Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r}) - emit("update",r,json=True) + # emit("update",r,json=True) return r -@socket_.on('connect') -def client_connect(**r): - print ('Connection received') - print (r) - push() - pass +# @socket_.on('connect') +# def client_connect(**r): +# print ('Connection received') +# print (r) +# push() +# pass @app.route("/favicon.ico") def _icon(): diff --git a/healthcareio/server/proxy.py b/healthcareio/server/proxy.py index e5b09d5..2ad95c6 100644 --- a/healthcareio/server/proxy.py +++ b/healthcareio/server/proxy.py @@ -32,7 +32,7 @@ class get : _files = [item['name'] for item in _files] except Exception as e : pass - print (["found ",len(files),"\tProcessed ",len(_files)]) + print ( [len(list(set(files) - set(_files))),' files to be processed']) return list(set(files) - set(_files)) @staticmethod diff --git a/healthcareio/x12/__init__.py b/healthcareio/x12/__init__.py index 66dc05a..972a137 100644 --- a/healthcareio/x12/__init__.py +++ b/healthcareio/x12/__init__.py @@ -24,6 +24,7 @@ import sys from itertools import islice from multiprocessing import Process import transport +import jsonmerge class void : pass class Formatters : @@ -70,7 +71,7 @@ class Formatters : else: value = [ [prefix]+ self.split(item,'>') for item in row.replace('~','').split(sep)[1:] ] - + return value if type(value) == list and type(value[0]) != list else value[0] def get_config(self,config,row): """ @@ -95,7 +96,6 @@ class Formatters : if _row[0] in config['SIMILAR'] : key = config['SIMILAR'][_row[0]] _info = config[key] - return _info def hash(self,value): @@ -181,13 +181,16 @@ class Formatters : return x class Parser (Process): def __init__(self,path): + """ + :path path of the configuration file (it can be absolute) + """ Process.__init__(self) self.utils = Formatters() self.get = void() self.get.value = self.get_map self.get.default_value = self.get_default_value _config = json.loads(open(path).read()) - + self._custom_config = self.get_custom(path) self.config = _config['parser'] self.store = _config['store'] @@ -197,6 +200,27 @@ class Parser (Process): self.emit = void() self.emit.pre = None self.emit.post = None + def get_custom(self,path) : + """ + :path path of the configuration file (it can be absolute) + """ + # + # + _path = path.replace('config.json','') + if _path.endswith(os.sep) : + _path = _path[:-1] + + _config = {} + _path = os.sep.join([_path,'custom']) + if os.path.exists(_path) : + + files = os.listdir(_path) + if files : + fullname = os.sep.join([_path,files[0]]) + + _config = json.loads ( (open(fullname)).read() ) + return _config + def set_files(self,files): self.files = files def get_map(self,row,config,version=None): @@ -247,15 +271,18 @@ class Parser (Process): value = {key:value} if key not in value else value + else: if 'syn' in config and value in config['syn'] : value = config['syn'][value] if type(value) == dict : object_value = dict(object_value, **value) + else: object_value[key] = value + else: # # we are dealing with a complex object @@ -275,26 +302,35 @@ class Parser (Process): return object_value def apply(self,content,_code) : """ - :file content i.e a segment with the envelope - :_code 837 or 835 (helps get the appropriate configuration) + :content content of a file i.e a segment with the envelope + :_code 837 or 835 (helps get the appropriate configuration) """ util = Formatters() # header = default_value.copy() value = {} + for row in content[:] : + row = util.split(row.replace('\n','').replace('~','')) _info = util.get.config(self.config[_code][0],row) + if self._custom_config and _code in self._custom_config: + _cinfo = util.get.config(self._custom_config[_code],row) + else: + _cinfo = {} + # _info = self.consolidate(row=row,type=_code,config=_info,util=util) + # print ([row[0],_info]) + # print () + # continue + # _cinfo = util.get.config(self._custom_config[_code],row) + if _info : + try: - + _info = jsonmerge.merge(_info,_cinfo) tmp = self.get.value(row,_info) - # if 'P1080351470' in content[0] and 'PLB' in row: - # print (_info) - # print (row) - # print (tmp) if not tmp : continue if 'label' in _info : @@ -326,7 +362,9 @@ class Parser (Process): elif 'field' in _info : name = _info['field'] - value[name] = tmp + # value[name] = tmp + value = jsonmerge.merge(value,{name:tmp}) + else: @@ -341,6 +379,7 @@ class Parser (Process): return value if value else {} def get_default_value(self,content,_code): + util = Formatters() TOP_ROW = content[1].split('*') CATEGORY= content[2].split('*')[1].strip() @@ -359,6 +398,8 @@ class Parser (Process): value['payer_id'] = SENDER_ID else: value['provider_id'] = SENDER_ID + # + # Let's parse this for default values return value def read(self,filename) : @@ -381,8 +422,14 @@ class Parser (Process): INITIAL_ROWS = file[:4] if len(INITIAL_ROWS) < 3 : return None,[{"name":filename,"completed":False}],None - section = 'HL' if INITIAL_ROWS[1].split('*')[1] == 'HC' else 'CLP' - _code = '837' if section == 'HL' else '835' + # section = 'HL' if INITIAL_ROWS[1].split('*')[1] == 'HC' else 'CLP' + # _code = '837' if section == 'HL' else '835' + # print ([_code,section]) + _code = INITIAL_ROWS[2].split('*')[1].strip() + # section = 'CLP' if _code == '835' else 'HL' + section = self.config[_code][0]['SECTION'].strip() + # + # adjusting the DEFAULT_VALUE = self.get.default_value(INITIAL_ROWS,_code) DEFAULT_VALUE['name'] = filename.strip() # @@ -390,22 +437,30 @@ class Parser (Process): # index 1 identifies file type i.e CLM for claim and CLP for remittance segment = [] index = 0; + _toprows = [] for row in file : + row = row.replace('\r','') + if not segment and not row.startswith(section): + _toprows += [row] if row.startswith(section) and not segment: segment = [row] + continue elif segment and not row.startswith(section): segment.append(row) + if len(segment) > 1 and row.startswith(section): # # process the segment somewhere (create a thread maybe?) # # default_claim = dict({"index":index},**DEFAULT_VALUE) + # print (_toprows) _claim = self.apply(segment,_code) + # if _claim['claim_id'] == 'P1080351470' : # print (_claim) # _claim = dict(DEFAULT_VALUE,**_claim) @@ -425,12 +480,14 @@ class Parser (Process): claim = self.apply(segment,_code) if claim : claim['index'] = len(claims) + claim = jsonmerge.merge(claim,self.apply(_toprows,_code)) claims.append(dict(DEFAULT_VALUE,**claim)) if type(file) != list : file.close() # x12_file = open(filename.strip(),errors='ignore').read().split('\n') except Exception as e: + logs.append ({"parse":_code,"completed":False,"name":filename,"msg":e.args[0]}) return [],logs,None