# parser/healthcareio/server/__init__.py

from flask import Flask, request,render_template, send_from_directory
from healthcareio.params import SYS_ARGS
import healthcareio.analytics
import os
import json
import time
import smart
import transport
import pandas as pd
import numpy as np
import x12
from multiprocessing import Process
from flask_socketio import SocketIO, emit, disconnect,send
from healthcareio.server import proxy
PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
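#
# This module exposes the healthcare-io dashboard as a small Flask application:
# it renders analytics built from parsed X12 835 (remittance) and 837 (claim) files,
# streams summary counts over a Flask-SocketIO channel, and provides a handful of
# administrative endpoints (parameter updates, re-runs, exports, reload).
# PATH points at the CLI's configuration file; the layout of that file
# (store, args, owner, ...) is assumed from how it is read below.
#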
app = Flask(__name__)
socket_ = SocketIO(app)
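#
# resume(files) compares the files found on disk against the "logs" collection and
# returns only those that have not yet been marked completed, so a re-run can pick
# up where a previous run left off. The mongo pipeline below is the source of truth
# for what "completed" means.
#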
def resume(files):
    _args = SYS_ARGS['config']['store'].copy()
    if 'mongo' in SYS_ARGS['config']['store']['type']:
        _args['type'] = 'mongo.MongoReader'
    reader = transport.factory.instance(**_args)
    _files = []
    try:
        pipeline = [{"$match": {"completed": {"$eq": True}}}, {"$group": {"_id": "$name"}}, {"$project": {"name": "$_id", "_id": 0}}]
        _args = {"aggregate": "logs", "cursor": {}, "allowDiskUse": True, "pipeline": pipeline}
        _files = reader.read(mongo=_args)
        _files = [item['name'] for item in _files]
    except Exception as e:
        pass
    print(["found ", len(files), "\tProcessed ", len(_files)])
    return list(set(files) - set(_files))
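#
# run() walks the configured folder, filters out files already processed (resume),
# splits the remainder into "batch" groups and hands each group to an x12.Parser
# process. The parser instances are kept in SYS_ARGS['procs'], presumably so their
# status can be inspected later.
#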
def run():
    #
    # let's get the files in the folder (perhaps recursively traverse them)
    #
    FILES = []
    BATCH = int(SYS_ARGS['config']['args']['batch'])  # -- number of processes (poorly named variable)
    for root, _dir, f in os.walk(SYS_ARGS['config']['args']['folder']):
        if f:
            FILES += [os.sep.join([root, name]) for name in f]
    FILES = resume(FILES)
    FILES = np.array_split(FILES, BATCH)
    procs = []
    for FILE_GROUP in FILES:
        FILE_GROUP = FILE_GROUP.tolist()
        # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
        # proc = Process(target=apply,args=(row,info['store'],_info,))
        parser = x12.Parser(PATH)  # os.sep.join([PATH,'config.json'])
        parser.set.files(FILE_GROUP)
        parser.start()
        procs.append(parser)
    SYS_ARGS['procs'] = procs
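#
# Websocket layer: push() aggregates processed-claim counts by type from the "logs"
# collection, renders them as a donut chart spec via analytics.Apex and emits the
# result on the "update" event; it is invoked when a client connects.
#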
# @socket_.on('data',namespace='/stream')
def push():
    _args = dict(SYS_ARGS['config']['store'].copy(), **{"type": "mongo.MongoReader"})
    reader = transport.factory.instance(**_args)
    pipeline = [{"$group": {"_id": "$parse", "claims": {"$addToSet": "$name"}}}, {"$project": {"_id": 0, "type": "$_id", "count": {"$size": "$claims"}}}]
    _args = {"aggregate": "logs", "cursor": {}, "allowDiskUse": True, "pipeline": pipeline}
    r = pd.DataFrame(reader.read(mongo=_args))
    r = healthcareio.analytics.Apex.apply({"chart": {"type": "donut", "axis": {"x": "count", "y": "type"}}, "data": r})
    emit("update", r, json=True)
    return r

@socket_.on('connect')
def client_connect(**r):
    print('Connection received')
    print(r)
    push()
    pass
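#
# HTTP routes: the dashboard UI (/, /format, /get) plus administrative endpoints
# (/data, /log, /io, /export, /reload) defined below.
#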
@app.route("/favicon.ico")
def _icon():
    # serve the favicon from the static/img folder
    return send_from_directory(os.path.join(app.root_path, 'static', 'img'),
                               'favicon.ico', mimetype='image/vnd.microsoft.icon')
@app.route("/")
def init():
e = SYS_ARGS['engine']
sections = {"remits":e.info['835'],"claims":e.info['837']}
2020-12-11 12:55:34 +00:00
_args = {"sections":sections,"store":SYS_ARGS["config"]["store"],"owner":SYS_ARGS['config']['owner'],"args":SYS_ARGS["config"]["args"]}
2020-09-26 22:01:23 +00:00
return render_template("index.html",**_args)
@app.route("/format/<id>/<index>",methods=['POST'])
def _format(id,index):
e = SYS_ARGS['engine']
key = '837' if id == 'claims' else '835'
index = int(index)
# p = e.info[key][index]
2020-12-11 12:55:34 +00:00
p = e.filter(type=id,index=index)
2020-09-26 22:01:23 +00:00
r = []
2020-12-11 12:55:34 +00:00
for item in p['pipeline'] :
_item= dict(item)
2020-09-26 22:01:23 +00:00
_item = dict(_item,**healthcareio.analytics.Apex.apply(item))
2020-12-11 12:55:34 +00:00
del _item['data']
2020-09-26 22:01:23 +00:00
if 'apex' in _item or 'html' in _item:
r.append(_item)
2020-12-11 12:55:34 +00:00
r = {"id":p['id'],"pipeline":r}
2020-09-26 22:01:23 +00:00
return json.dumps(r),200
@app.route("/get/<id>/<index>",methods=['GET'])
def get(id,index):
e = SYS_ARGS['engine']
key = '837' if id == 'claims' else '835'
index = int(index)
# p = e.info[key][index]
2020-12-11 12:55:34 +00:00
p = e.filter(type=id,index=index)
2020-09-26 22:01:23 +00:00
r = {}
for item in p[0]['pipeline'] :
_item= [dict(item)]
2020-12-11 12:55:34 +00:00
# r[item['label']] = item['data'].to_dict(orient='record')
r[item['label']] = item['data'].to_dict('record')
2020-09-26 22:01:23 +00:00
return json.dumps(r),200
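#
# /reset acknowledges a reset request; /data returns processing statistics
# (files found/processed and running parser processes) gathered via the proxy module.
#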
@app.route("/reset", methods=["POST"])
def reset():
    return "1", 200
@app.route("/data",methods=['GET'])
def get_data ():
"""
This function will return statistical data about the services i.e general statistics about what has/been processed
"""
HEADER = {"Content-type":"application/json"}
_args = SYS_ARGS['config']
options = dict(proxy.get.files(_args),**proxy.get.processes(_args))
return json.dumps(options),HEADER
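#
# /log/params accepts a JSON body (e.g. {"batch": 2}), updates the runtime
# arguments and persists the updated configuration back to the config file from a
# separate process.
#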
@app.route("/log/<id>",methods=["POST","PUT","GET"])
def log(id) :
HEADER = {"Content-Type":"application/json; charset=utf8"}
if id == 'params' and request.method in ['PUT', 'POST']:
info = request.json
_args = {"batch":info['batch'] if 'batch' in info else 1,"resume":True}
#
# We should update the configuration
SYS_ARGS['config']['args'] = _args
PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
write = lambda content: (open(PATH,'w')).write(json.dumps(content))
proc = Process(target=write,args=(SYS_ARGS['config'],))
proc.start()
return "1",HEADER
pass
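#
# /io/<id> drives the parser: "params" updates and persists the runtime arguments,
# "stop" is expected to halt processing (stop() is assumed to be provided elsewhere),
# and "run" launches a parsing pass through proxy.run.
#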
@app.route("/io/<id>",methods=['POST'])
def io_data(id):
if id == 'params' :
_args = request.json
#
# Expecting batch,folder as parameters
_args = request.json
_args['resume'] = True
print (_args)
#
# We should update the configuration
SYS_ARGS['config']['args'] = _args
# PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
try:
write = lambda content: (open(PATH,'w')).write(json.dumps(content))
proc = Process(target=write,args=(SYS_ARGS['config'],))
proc.start()
# proc.join()
return "1",200
except Exception as e :
return "0",403
pass
elif id == 'stop' :
stop()
pass
elif id == 'run' :
# run()
_args = {"args":SYS_ARGS['config']['args'],"store":SYS_ARGS["config"]["store"]}
proxy.run(_args)
return "1",200
pass
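#
# /export renders the destination form (GET) and, on POST/PUT, publishes the
# current store to the requested destination (s3 or mongo) via proxy.publish.
#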
@app.route("/export")
def export_form():
_args = {"context":SYS_ARGS['context']}
return render_template("store.html",**_args)
@app.route("/export",methods=['POST','PUT'])
def apply_etl():
_info = request.json
m = {'s3':'s3.s3Writer','mongo':'mongo.MongoWriter'}
if _info :
dest_args = {'type':m[_info['type']],"args": _info['content'] }
src_args = SYS_ARGS['config']['store']
# print (_args)
# writer = transport.factory.instance(**_args)
proxy.publish(src_args,dest_args)
return "1",405
else:
return "0",404
@app.route("/update")
def update():
pass
return "0",405
@app.route("/reload", methods=['POST'])
def reload():
    # e = SYS_ARGS['engine']
    # e.apply()
    PATH = SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'], '.healthcareio', 'config.json'])
    e = healthcareio.analytics.engine(PATH)
    # e.apply()
    SYS_ARGS['engine'] = e
    return "1", 200
if __name__ == '__main__':
    PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
    DEBUG = int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
    SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
    #
    #
    PATH = SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'], '.healthcareio', 'config.json'])
    #
    # Adjusting configuration with parameters (batch,folder,resume)
    if 'args' not in SYS_ARGS['config']:
        SYS_ARGS['config']["args"] = {"batch": 1, "resume": True, "folder": "/data"}
    SYS_ARGS['procs'] = []
    # SYS_ARGS['path'] = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
    e = healthcareio.analytics.engine(PATH)
    e.apply(type='claims', serialize=False)
    SYS_ARGS['engine'] = e
    app.run(host='0.0.0.0', port=PORT, debug=DEBUG, threaded=True)