parser/healthcareio/server/proxy.py

171 lines
6.9 KiB
Python

"""
This file serves as proxy to healthcare-io, it will be embedded into the API
"""
import os
import transport
import numpy as np
from healthcareio import x12
import pandas as pd
import smart
from healthcareio.analytics import Apex
import time
class get :
    """
    Namespace of static helpers that compute status information for the
    healthcare-io proxy: pending files, parser process health, and file
    processing statistics rendered as Apex chart payloads.
    """
    # Parser processes launched by run(); pruned of dead entries by processes()
    PROCS = []
    # Default location of the healthcare-io configuration file
    PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
    @staticmethod
    def resume (files,args):
        """
        Determine the files still to be processed by performing a simple
        complementary set operation of the folder listing against the logs.
        @TODO: Support data-stores other than mongodb
        :param files list of files within a folder
        :param args configuration (expects a 'store' entry)
        :return list of file paths not yet marked completed in the logs
        """
        _args = args['store'].copy()
        if 'mongo' in _args['type'] :
            _args['type'] = 'mongo.MongoReader'
        reader = transport.factory.instance(**_args)
        _files = []
        try:
            # names of files already marked completed in the "logs" collection
            pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}]
            _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
            _files = reader.read(mongo = _args)
            _files = [item['name'] for item in _files]
        except Exception as e :
            # best-effort: if the log store is unreadable, fall back to
            # treating every file as unprocessed rather than failing
            pass
        _pending = list(set(files) - set(_files))
        print ( [len(_pending),' files to be processed'])
        return _pending
    @staticmethod
    def processes(_args):
        """
        Build Apex radial charts of CPU/RAM usage for healthcare-io.py
        processes and report the number of live parser processes.
        :param _args system configuration (unused here)
        :return {"process":{"chart":[...],"counts":N}}
        """
        _info = pd.DataFrame(smart.top.read(name='healthcare-io.py'))
        # guard BEFORE selecting columns: an empty frame has no columns and
        # selecting ['name','cpu','mem'] on it would raise a KeyError
        if _info.shape[0] == 0 :
            _info = pd.DataFrame({"name":["healthcare-io.py"],"cpu":[0],"mem":[0]})
        _info = _info[['name','cpu','mem']]
        m = {'cpu':'CPU','mem':'RAM','name':'name'}
        _info.columns = [m[name] for name in _info.columns.tolist()]
        _info.index = np.arange(_info.shape[0])
        charts = []
        for label in ['CPU','RAM'] :
            value = _info[label].sum()
            df = pd.DataFrame({"name":[label],label:[value]})
            charts.append (
                Apex.apply(
                    {"data":df, "chart":{"type":"radial","axis":{"x":label,"y":"name"}}}
                )['apex']
            )
        #
        # Update the process registry, dropping finished processes so
        # subsequent requests reflect the change
        #
        get.PROCS = [proc for proc in get.PROCS if proc.is_alive()]
        N = len(get.PROCS)
        return {"process":{"chart":charts,"counts":N}}
    @staticmethod
    def files (_args):
        """
        Summarize file processing status: completion percentage and the
        claims/remits distribution, as Apex chart payloads.
        @TODO: make the scanned folder configurable (hard-coded to /data)
        :param _args configuration (expects a 'store' entry)
        :return {"files":{"counts":N,"chart":[...]}}
        """
        _info = smart.folder.read(path='/data')
        N = _info.files.tolist()[0]
        if 'mongo' in _args['store']['type'] :
            store_args = dict(_args['store'].copy(),**{"type":"mongo.MongoReader"})
            # count of distinct completed files
            pipeline = [{"$group":{"_id":"$name","count":{"$sum":{"$cond":[{"$eq":["$completed",True]},1,0]}} }},{"$group":{"_id":None,"count":{"$sum":"$count"}}},{"$project":{"_id":0,"status":"completed","count":1}}]
            query = {"mongo":{"aggregate":"logs","allowDiskUse":True,"cursor":{},"pipeline":pipeline}}
            pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}]
            _query = {"mongo":{"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}} #-- distribution claims/remits
        else:
            store_args = dict(_args['store'].copy(),**{"type":"disk.SQLiteReader"})
            store_args['args']['table'] = 'logs'
            query= {"sql":"select count(distinct json_extract(data,'$.name')) as count, 'completed' status from logs where json_extract(data,'$.completed') = true"}
            _query={"sql":"select json_extract(data,'$.parse') as type,count(distinct json_extract(data,'$.name')) as count from logs group by type"} #-- distribution claim/remits
        reader = transport.factory.instance(**store_args)
        _info = pd.DataFrame(reader.read(**query))
        if not _info.shape[0] :
            _info = pd.DataFrame({"status":["completed"],"count":[0]})
        # guard against an empty folder (N == 0) which would yield inf/nan
        _info['count'] = np.round( (_info['count'] * 100 )/N,2) if N else 0
        charts = [Apex.apply({"data":_info,"chart":{"type":"radial","axis":{"y":"status","x":"count"}}})['apex']]
        #
        # Classify the files i.e claims / remits
        #
        r = pd.DataFrame(reader.read(**_query)) #-- distribution claims/remits
        r = Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})['apex']
        r['chart']['height'] = '100%'
        r['legend']['position'] = 'bottom'
        charts += [r]
        return {"files":{"counts":N,"chart":charts}}
    pass
#
# Process handling ....
def run (_args) :
    """
    Run the parsing jobs as daemon processes over the files found in the
    configured folder, skipping files already marked completed in the logs.
    :param _args system configuration (expects 'args' with 'batch' and 'folder')
    :return the list of started parser processes (also registered in get.PROCS)
    """
    FILES = []
    BATCH = int(_args['args']['batch']) #-- number of parser processes to spawn
    for root,_dir,f in os.walk(_args['args']['folder']) :
        if f :
            FILES += [os.sep.join([root,name]) for name in f]
    FILES = get.resume(FILES,_args)
    FILES = np.array_split(FILES,BATCH)
    for FILE_GROUP in FILES :
        FILE_GROUP = FILE_GROUP.tolist()
        if not FILE_GROUP :
            # fewer files than BATCH: array_split produced an empty group,
            # no point spawning an idle parser process for it
            continue
        parser = x12.Parser(get.PATH)
        parser.set.files(FILE_GROUP)
        parser.daemon = True
        parser.start()
        get.PROCS.append(parser)
        # stagger process start-up
        time.sleep(3)
    #
    # @TODO:consider submitting an update to clients via publish/subscribe framework
    #
    return get.PROCS
def stop(_args):
    """
    Terminate every still-running parser process and clear the registry.
    :param _args system configuration (unused)
    """
    live = [proc for proc in get.PROCS if proc.is_alive()]
    for proc in live :
        proc.terminate()
    get.PROCS = []
    #
    # @TODO: consider submitting an update to clients via publish/subscribe framework
    pass
def write(src_args,dest_args,files) :
    """
    Write records for the given files from a source store to a destination
    store. Not implemented yet (stub).
    :param src_args source data-store configuration
    :param dest_args destination data-store configuration
    :param files list of files whose records should be transferred
    """
    #
    # @TODO: Support for SQLite
    pass
def publish (src_args,dest_args,folder="/data"):
FILES = []
for root,_dir,f in os.walk(folder) :
if f :
FILES += [os.sep.join([root,name]) for name in f]
#
# @TODO: Add support for SQLite ....
FILES = np.array_split(FILES,4)