""" This file serves as proxy to healthcare-io, it will be embedded into the API """ import os import transport import numpy as np from healthcareio import x12 import pandas as pd import smart from healthcareio.analytics import Apex import time class get : PROCS = [] PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) @staticmethod def resume (files,args): """ This function will determine the appropriate files to be processed by performing a simple complementary set operation against the logs @TODO: Support data-stores other than mongodb :param files list of files within a folder :param _args configuration """ _args = args['store'].copy() if 'mongo' in _args['type'] : _args['type'] = 'mongo.MongoReader' reader = transport.factory.instance(**_args) _files = [] try: pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}] _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline} _files = reader.read(mongo = _args) _files = [item['name'] for item in _files] except Exception as e : pass print ( [len(list(set(files) - set(_files))),' files to be processed']) return list(set(files) - set(_files)) @staticmethod def processes(_args): _info = pd.DataFrame(smart.top.read(name='healthcare-io.py'))[['name','cpu','mem']] if _info.shape[0] == 0 : _info = pd.DataFrame({"name":["healthcare-io.py"],"cpu":[0],"mem":[0]}) # _info = pd.DataFrame(_info.groupby(['name']).sum()) # _info['name'] = ['healthcare-io.py'] m = {'cpu':'CPU','mem':'RAM','name':'name'} _info.columns = [m[name] for name in _info.columns.tolist()] _info.index = np.arange(_info.shape[0]) charts = [] for label in ['CPU','RAM'] : value = _info[label].sum() df = pd.DataFrame({"name":[label],label:[value]}) charts.append ( Apex.apply( {"data":df, "chart":{"type":"radial","axis":{"x":label,"y":"name"}}} )['apex'] ) # # This will update the counts for the processes, upon subsequent requests so as to show the change # N = 0 lprocs = [] for proc in get.PROCS : if proc.is_alive() : lprocs.append(proc) N = len(lprocs) get.PROCS = lprocs return {"process":{"chart":charts,"counts":N}} @staticmethod def files (_args): _info = smart.folder.read(path='/data') N = _info.files.tolist()[0] if 'mongo' in _args['store']['type'] : store_args = dict(_args['store'].copy(),**{"type":"mongo.MongoReader"}) # reader = transport.factory.instance(**_args) pipeline = [{"$group":{"_id":"$name","count":{"$sum":{"$cond":[{"$eq":["$completed",True]},1,0]}} }},{"$group":{"_id":None,"count":{"$sum":"$count"}}},{"$project":{"_id":0,"status":"completed","count":1}}] query = {"mongo":{"aggregate":"logs","allowDiskUse":True,"cursor":{},"pipeline":pipeline}} # _info = pd.DataFrame(reader.read(mongo={"aggregate":"logs","allowDiskUse":True,"cursor":{},"pipeline":pipeline})) pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}] _query = {"mongo":{"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}} #-- distribution claims/remits else: store_args = dict(_args['store'].copy(),**{"type":"disk.SQLiteReader"}) store_args['args']['table'] = 'logs' query= {"sql":"select count(distinct json_extract(data,'$.name')) as count, 'completed' status from logs where json_extract(data,'$.completed') = true"} _query={"sql":"select json_extract(data,'$.parse') as type,count(distinct json_extract(data,'$.name')) as count from logs group by type"} #-- distribution claim/remits reader = transport.factory.instance(**store_args) _info = pd.DataFrame(reader.read(**query)) if not _info.shape[0] : _info = pd.DataFrame({"status":["completed"],"count":[0]}) _info['count'] = np.round( (_info['count'] * 100 )/N,2) charts = [Apex.apply({"data":_info,"chart":{"type":"radial","axis":{"y":"status","x":"count"}}})['apex']] # # Let us classify the files now i.e claims / remits # # pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}] # _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline} # r = pd.DataFrame(reader.read(mongo=_args)) r = pd.DataFrame(reader.read(**_query)) #-- distribution claims/remits r = Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})['apex'] r['chart']['height'] = '100%' r['legend']['position'] = 'bottom' charts += [r] return {"files":{"counts":N,"chart":charts}} pass # # Process handling .... def run (_args) : """ This function will run the jobs and insure as processes (as daemons). :param _args system configuration """ FILES = [] BATCH = int(_args['args']['batch']) #-- number of processes (poorly named variable) for root,_dir,f in os.walk(_args['args']['folder']) : if f : FILES += [os.sep.join([root,name]) for name in f] FILES = get.resume(FILES,_args) FILES = np.array_split(FILES,BATCH) for FILE_GROUP in FILES : FILE_GROUP = FILE_GROUP.tolist() # logger.write({"process":index,"parse":_args['parse'],"file_count":len(row)}) # proc = Process(target=apply,args=(row,info['store'],_info,)) parser = x12.Parser(get.PATH) #os.sep.join([PATH,'config.json'])) parser.set.files(FILE_GROUP) parser.daemon = True parser.start() get.PROCS.append(parser) time.sleep(3) # # @TODO:consider submitting an update to clients via publish/subscribe framework # return get.PROCS def stop(_args): for job in get.PROCS : if job.is_alive() : job.terminate() get.PROCS = [] # # @TODO: consider submitting an update to clients via publish/subscribe framework pass def write(src_args,dest_args,files) : # # @TODO: Support for SQLite pass def publish (src_args,dest_args,folder="/data"): FILES = [] for root,_dir,f in os.walk(folder) : if f : FILES += [os.sep.join([root,name]) for name in f] # # @TODO: Add support for SQLite .... FILES = np.array_split(FILES,4)