parser/healthcareio/server/__init__.py

238 lines
8.2 KiB
Python

from flask import Flask, request,render_template, send_from_directory
from healthcareio.params import SYS_ARGS
import healthcareio.analytics
import os
import json
import time
import smart
import transport
import pandas as pd
import numpy as np
from healthcareio import x12
from multiprocessing import Process
from flask_socketio import SocketIO, emit, disconnect,send
from healthcareio.server import proxy
# Default location of the configuration file: $HOME/.healthcareio/config.json
# (os.environ['HOME'] raises a KeyError if the HOME variable is not set)
PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
# Flask application serving the routes declared below
app = Flask(__name__)
# Socket.IO wrapper around the application, used to register the 'connect' handler
socket_ = SocketIO(app)
def resume (files):
    """
    Return the files that still have to be parsed.

    When the configured store is a mongo store, the names of the files logged as
    completed are read from the "logs" collection and removed from the candidates.
    For any other store (or if the lookup fails) nothing is considered processed.

    :param files    list of candidate file paths
    :return         list of unique paths taken from files, in their original order,
                    that have no completed entry in the logs
    """
    _args = SYS_ARGS['config']['store'].copy()
    processed = []
    if 'mongo' in SYS_ARGS['config']['store']['type'] :
        _args['type'] = 'mongo.MongoReader'
        reader = transport.factory.instance(**_args)
        try:
            pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}]
            _query = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
            #
            # The names are extracted in a single expression so a failure half-way
            # can never leave raw (unhashable) records in the list of processed files
            processed = [item['name'] for item in reader.read(mongo = _query)]
        except Exception as e :
            #
            # Best effort: the lookup must not prevent parsing, we report the error and process everything
            print (["unable to read processed files ",str(e)])
            processed = []
    print (["found ",len(files),"\tProcessed ",len(processed)])
    done = set(processed)
    #
    # dict.fromkeys removes duplicates while keeping the order in which the files were found
    return [name for name in dict.fromkeys(files) if name not in done]
def run ():
    """
    Parse the files found under the configured folder with a set of parsers.

    The folder and the number of parsers are read from SYS_ARGS['config']['args']
    (keys 'folder' and 'batch'). The files are filtered through resume() and split
    across the parsers; the started parsers are stored in SYS_ARGS['procs'].
    """
    _config = SYS_ARGS['config']['args']
    #
    # 'batch' is the number of parsers to start, at least one is needed to split the work
    BATCH = max(1,int(_config['batch']))
    #
    # let's get the files in the folder (os.walk traverses it recursively)
    #
    FILES = []
    for root,_dir,names in os.walk(_config['folder']) :
        FILES += [os.sep.join([root,name]) for name in names]
    FILES = resume(FILES)
    procs = []
    for FILE_GROUP in np.array_split(FILES,BATCH) :
        FILE_GROUP = FILE_GROUP.tolist()
        if not FILE_GROUP :
            #
            # array_split pads with empty groups when there are fewer files than parsers,
            # there is no point in starting a parser that has nothing to read
            continue
        parser = x12.Parser(PATH)
        parser.set.files(FILE_GROUP)
        parser.start()
        procs.append(parser)
    SYS_ARGS['procs'] = procs
# NOTE: the decorator below is disabled, push is called directly by the 'connect' handler
# @socket_.on('data',namespace='/stream')
def push() :
    """
    Emit an "update" event carrying a donut chart of the number of distinct
    names found in the "logs" collection, grouped by the "parse" field.

    :return the chart specification produced by healthcareio.analytics.Apex.apply
    """
    store = SYS_ARGS['config']['store'].copy()
    reader = transport.factory.instance(**dict(store,type="mongo.MongoReader"))
    #
    # one row per "parse" value with the size of its set of names
    group = {"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}}
    project = {"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}
    query = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":[group,project]}
    frame = pd.DataFrame(reader.read(mongo=query))
    chart = healthcareio.analytics.Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":frame})
    emit("update",chart,json=True)
    return chart
@socket_.on('connect')
def client_connect(**r):
    """
    Handler of the socket 'connect' event: prints a notice followed by the
    keyword arguments received, then pushes the current statistics.
    """
    for message in ('Connection received',r) :
        print (message)
    push()
@app.route("/favicon.ico")
def _icon():
    """
    Serve the site icon.

    os.path.join takes the path components as separate arguments (a list raises a
    TypeError), and send_from_directory expects the folder and the file name apart.
    """
    #
    # NOTE(review): the original path pointed at static/img/logo.svg, so that file is
    # served with the SVG mimetype -- confirm this is the intended icon
    folder = os.path.join(app.root_path,'static','img')
    return send_from_directory(folder,'logo.svg',mimetype='image/svg+xml')
@app.route("/")
def init():
    """
    Render index.html with the analytics sections of the engine (remits = '835',
    claims = '837') and the store, owner and args entries of the configuration.
    """
    engine = SYS_ARGS['engine']
    sections = {"remits":engine.info['835'],"claims":engine.info['837']}
    config = SYS_ARGS["config"]
    return render_template(
        "index.html",
        sections=sections,
        store=config["store"],
        owner=config['owner'],
        args=config["args"])
@app.route("/format/<id>/<index>",methods=['POST'])
def _format(id,index):
    """
    Return, as JSON, the pipeline entries of the selected analytics once they have
    been formatted by healthcareio.analytics.Apex.apply.

    :param id       type passed to the engine filter
    :param index    position passed to the engine filter (converted to an integer)
    :return         JSON of {"id":...,"pipeline":[...]} with a 200 status; the "data"
                    attribute is removed and only entries having an 'apex' or 'html'
                    attribute are kept
    """
    engine = SYS_ARGS['engine']
    #
    # NOTE(review): the sibling /get route reads the result of the same filter call
    # as p[0]['pipeline'] -- confirm which shape the engine actually returns
    selection = engine.filter(type=id,index=int(index))
    charts = []
    for step in selection['pipeline'] :
        formatted = dict(dict(step),**healthcareio.analytics.Apex.apply(step))
        del formatted['data']
        if 'apex' in formatted or 'html' in formatted :
            charts.append(formatted)
    payload = {"id":selection['id'],"pipeline":charts}
    return json.dumps(payload),200
@app.route("/get/<id>/<index>",methods=['GET'])
def get(id,index):
    """
    Return, as JSON, the data of every pipeline entry of the selected analytics.

    :param id       type passed to the engine filter
    :param index    position passed to the engine filter (converted to an integer)
    :return         JSON object mapping the label of each pipeline entry to its rows,
                    with a 200 status
    """
    e = SYS_ARGS['engine']
    p = e.filter(type=id,index=int(index))
    r = {}
    for item in p[0]['pipeline'] :
        #
        # item['data'] is presumably a pandas DataFrame (TODO confirm); 'records' is the
        # documented orient, the abbreviation 'record' is rejected by pandas 2.x
        r[item['label']] = item['data'].to_dict(orient='records')
    return json.dumps(r),200
@app.route("/reset",methods=["POST"])
def reset():
    """
    Acknowledge a reset request, nothing is modified by this view.
    """
    body,status = "1",200
    return body,status
@app.route("/data",methods=['GET'])
def get_data ():
    """
    Return, as JSON, what proxy.get.files and proxy.get.processes report for the
    current configuration, merged in a single object (processes entries win on conflict)
    """
    config = SYS_ARGS['config']
    files_info = proxy.get.files(config)
    process_info = proxy.get.processes(config)
    body = json.dumps(dict(files_info,**process_info))
    return body,{"Content-type":"application/json"}
@app.route("/log/<id>",methods=["POST","PUT","GET"])
def log(id) :
    """
    Update the processing parameters when id is 'params' and the method is PUT or POST.

    The JSON body may carry a 'batch' attribute (1 is used otherwise); the 'args'
    entry of the configuration is replaced and the configuration is written to
    $HOME/.healthcareio/config.json by a separate process.

    :param id   identifier found in the url, only 'params' triggers an update
    :return     "1" with a JSON content-type header, for every request
    """
    HEADER = {"Content-Type":"application/json; charset=utf8"}
    if id == 'params' and request.method in ['PUT', 'POST']:
        info = request.json
        if not isinstance(info,dict) :
            #
            # no (or non-object) JSON body: fall back on the defaults instead of failing
            info = {}
        _args = {"batch":info.get('batch',1),"resume":True}
        #
        # We should update the configuration
        # NOTE(review): this replaces the whole 'args' entry, a 'folder' set earlier is
        # dropped while run() reads it -- confirm this is intended
        SYS_ARGS['config']['args'] = _args
        PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
        def write (content) :
            # the context manager guarantees the file is flushed and closed
            with open(PATH,'w') as f :
                f.write(json.dumps(content))
        #
        # NOTE(review): a nested function can not be pickled, like the lambda it
        # replaces this requires the 'fork' start method of multiprocessing
        proc = Process(target=write,args=(SYS_ARGS['config'],))
        proc.start()
    #
    # A view must always return a response, Flask raises an error when it gets None
    return "1",HEADER
@app.route("/io/<id>",methods=['POST'])
def io_data(id):
    """
    Entry point of the processing commands, selected by id :

        params  store the JSON body (batch,folder expected) as the 'args' entry of the
                configuration and write the configuration to PATH in a separate process
        stop    call stop()
        run     hand the 'args' and 'store' entries of the configuration to proxy.run

    :param id   command found in the url
    :return     "1",200 or "0",403 if the parameters could not be handled
    """
    if id == 'params' :
        #
        # Expecting batch,folder as parameters
        _args = request.json
        if not isinstance(_args,dict) :
            #
            # missing or non-object body, same answer as any other failure of this command
            return "0",403
        _args['resume'] = True
        print (_args)
        #
        # We should update the configuration
        SYS_ARGS['config']['args'] = _args
        def write (content) :
            # the context manager guarantees the file is flushed and closed
            with open(PATH,'w') as f :
                f.write(json.dumps(content))
        try:
            #
            # NOTE(review): a nested function can not be pickled, like the lambda it
            # replaces this requires the 'fork' start method of multiprocessing
            proc = Process(target=write,args=(SYS_ARGS['config'],))
            proc.start()
            return "1",200
        except Exception as e :
            print (e)
            return "0",403
    elif id == 'stop' :
        #
        # NOTE(review): stop is not defined in the part of the module reviewed here,
        # confirm it exists elsewhere in the file
        stop()
    elif id == 'run' :
        _args = {"args":SYS_ARGS['config']['args'],"store":SYS_ARGS["config"]["store"]}
        proxy.run(_args)
    #
    # Every branch ends with a response, Flask raises an error when a view returns None
    return "1",200
@app.route("/export")
def export_form():
    """
    Render store.html, the template receives SYS_ARGS['context'] as "context"
    """
    return render_template("store.html",context=SYS_ARGS['context'])
@app.route("/export",methods=['POST','PUT'])
def apply_etl():
    """
    Publish the content of the configured store to the destination described in the
    JSON body, through proxy.publish.

    The body is expected to have a 'type' ('s3' or 'mongo') and a 'content' attribute
    holding the arguments of the destination.

    :return "1",405 once proxy.publish returned, "0",404 if the body is missing or invalid
    """
    _info = request.json
    m = {'s3':'s3.s3Writer','mongo':'mongo.MongoWriter'}
    #
    # An unknown type or a missing content is answered like a missing body instead of
    # ending in an unhandled KeyError
    if not isinstance(_info,dict) or _info.get('type') not in m or 'content' not in _info :
        return "0",404
    dest_args = {'type':m[_info['type']],"args": _info['content'] }
    src_args = SYS_ARGS['config']['store']
    proxy.publish(src_args,dest_args)
    #
    # NOTE(review): 405 (method not allowed) is an odd status for a success, it is kept
    # because clients may rely on it -- confirm and switch to 200 if they do not
    return "1",405
@app.route("/update")
def update():
    """
    Placeholder view, it performs no work and always answers "0" with a 405 status
    """
    return "0",405
@app.route("/reload",methods=['POST'])
def reload():
    """
    Rebuild the analytics engine and store it in SYS_ARGS['engine'] (the engine is
    only created here, apply is not called).

    :return "1",200
    """
    #
    # NOTE(review): SYS_ARGS['config'] is indexed like a dictionary elsewhere in this
    # file but is handed to the engine as a path here -- confirm the engine accepts it
    if 'config' in SYS_ARGS :
        source = SYS_ARGS['config']
    else:
        source = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
    SYS_ARGS['engine'] = healthcareio.analytics.engine(source)
    return "1",200
if __name__ == '__main__' :
    #
    # Command line parameters with their defaults: port 5500, debug off, empty context
    PORT = int(SYS_ARGS.get('port',5500))
    DEBUG= int(SYS_ARGS.get('debug',0))
    SYS_ARGS.setdefault('context','')
    #
    # The engine is built from the 'config' parameter, or from the default location
    if 'config' in SYS_ARGS :
        PATH = SYS_ARGS['config']
    else:
        PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
    #
    # Adjusting configuration with parameters (batch,folder,resume)
    # NOTE(review): SYS_ARGS['config'] is indexed here even when the key was just found
    # missing, which raises a KeyError -- confirm 'config' is always provided
    SYS_ARGS['config'].setdefault("args",{"batch":1,"resume":True,"folder":"/data"})
    SYS_ARGS['procs'] = []
    #
    # The analytics engine is prepared once, before the server accepts requests
    e = healthcareio.analytics.engine(PATH)
    e.apply(type='claims',serialize=False)
    SYS_ARGS['engine'] = e
    app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=True)