from flask import Flask, request,render_template, send_from_directory from healthcareio.params import SYS_ARGS import healthcareio.analytics import os import json import time import smart import transport import pandas as pd import numpy as np import x12 from multiprocessing import Process from flask_socketio import SocketIO, emit, disconnect,send from healthcareio.server import proxy PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) app = Flask(__name__) socket_ = SocketIO(app) def resume (files): _args = SYS_ARGS['config']['store'].copy() if 'mongo' in SYS_ARGS['config']['store']['type'] : _args['type'] = 'mongo.MongoReader' reader = transport.factory.instance(**_args) _files = [] try: pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}] _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline} _files = reader.read(mongo = _args) _files = [item['name'] for item in _files] except Exception as e : pass print (["found ",len(files),"\tProcessed ",len(_files)]) return list(set(files) - set(_files)) def run (): # # let's get the files in the folder (perhaps recursively traverse them) # FILES = [] BATCH = int(SYS_ARGS['config']['args']['batch']) #-- number of processes (poorly named variable) for root,_dir,f in os.walk(SYS_ARGS['config']['args']['folder']) : if f : FILES += [os.sep.join([root,name]) for name in f] FILES = resume(FILES) FILES = np.array_split(FILES,BATCH) procs = [] for FILE_GROUP in FILES : FILE_GROUP = FILE_GROUP.tolist() # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)}) # proc = Process(target=apply,args=(row,info['store'],_info,)) parser = x12.Parser(PATH) #os.sep.join([PATH,'config.json'])) parser.set.files(FILE_GROUP) parser.start() procs.append(parser) SYS_ARGS['procs'] = procs # @socket_.on('data',namespace='/stream') def push() : _args = dict(SYS_ARGS['config']['store'].copy(),**{"type":"mongo.MongoReader"}) reader = transport.factory.instance(**_args) pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}] _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline} r = pd.DataFrame(reader.read(mongo=_args)) r = healthcareio.analytics.Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r}) emit("update",r,json=True) return r @socket_.on('connect') def client_connect(**r): print ('Connection received') print (r) push() pass @app.route("/favicon.ico") def _icon(): return send_from_directory(os.path.join([app.root_path, 'static','img','logo.svg']), 'favicon.ico', mimetype='image/vnd.microsoft.icon') @app.route("/") def init(): e = SYS_ARGS['engine'] sections = {"remits":e.info['835'],"claims":e.info['837']} _args = {"sections":sections,"store":SYS_ARGS["config"]["store"],"owner":SYS_ARGS['config']['owner'],"args":SYS_ARGS["config"]["args"]} return render_template("index.html",**_args) @app.route("/format//",methods=['POST']) def _format(id,index): e = SYS_ARGS['engine'] key = '837' if id == 'claims' else '835' index = int(index) # p = e.info[key][index] p = e.filter(type=id,index=index) r = [] for item in p['pipeline'] : _item= dict(item) _item = dict(_item,**healthcareio.analytics.Apex.apply(item)) del _item['data'] if 'apex' in _item or 'html' in _item: r.append(_item) r = {"id":p['id'],"pipeline":r} return json.dumps(r),200 @app.route("/get//",methods=['GET']) def get(id,index): e = SYS_ARGS['engine'] key = '837' if id == 'claims' else '835' index = int(index) # p = e.info[key][index] p = e.filter(type=id,index=index) r = {} for item in p[0]['pipeline'] : _item= [dict(item)] # r[item['label']] = item['data'].to_dict(orient='record') r[item['label']] = item['data'].to_dict('record') return json.dumps(r),200 @app.route("/reset",methods=["POST"]) def reset(): return "1",200 @app.route("/data",methods=['GET']) def get_data (): """ This function will return statistical data about the services i.e general statistics about what has/been processed """ HEADER = {"Content-type":"application/json"} _args = SYS_ARGS['config'] options = dict(proxy.get.files(_args),**proxy.get.processes(_args)) return json.dumps(options),HEADER @app.route("/log/",methods=["POST","PUT","GET"]) def log(id) : HEADER = {"Content-Type":"application/json; charset=utf8"} if id == 'params' and request.method in ['PUT', 'POST']: info = request.json _args = {"batch":info['batch'] if 'batch' in info else 1,"resume":True} # # We should update the configuration SYS_ARGS['config']['args'] = _args PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) write = lambda content: (open(PATH,'w')).write(json.dumps(content)) proc = Process(target=write,args=(SYS_ARGS['config'],)) proc.start() return "1",HEADER pass @app.route("/io/",methods=['POST']) def io_data(id): if id == 'params' : _args = request.json # # Expecting batch,folder as parameters _args = request.json _args['resume'] = True print (_args) # # We should update the configuration SYS_ARGS['config']['args'] = _args # PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) try: write = lambda content: (open(PATH,'w')).write(json.dumps(content)) proc = Process(target=write,args=(SYS_ARGS['config'],)) proc.start() # proc.join() return "1",200 except Exception as e : return "0",403 pass elif id == 'stop' : stop() pass elif id == 'run' : # run() _args = {"args":SYS_ARGS['config']['args'],"store":SYS_ARGS["config"]["store"]} proxy.run(_args) return "1",200 pass @app.route("/export") def export_form(): _args = {"context":SYS_ARGS['context']} return render_template("store.html",**_args) @app.route("/export",methods=['POST','PUT']) def apply_etl(): _info = request.json m = {'s3':'s3.s3Writer','mongo':'mongo.MongoWriter'} if _info : dest_args = {'type':m[_info['type']],"args": _info['content'] } src_args = SYS_ARGS['config']['store'] # print (_args) # writer = transport.factory.instance(**_args) proxy.publish(src_args,dest_args) return "1",405 else: return "0",404 @app.route("/update") def update(): pass return "0",405 @app.route("/reload",methods=['POST']) def reload(): # e = SYS_ARGS['engine'] # e.apply() PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) e = healthcareio.analytics.engine(PATH) # e.apply() SYS_ARGS['engine'] = e return "1",200 if __name__ == '__main__' : PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500 DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0 SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else '' # # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) # # Adjusting configuration with parameters (batch,folder,resume) if 'args' not in SYS_ARGS['config'] : SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"} SYS_ARGS['procs'] = [] # SYS_ARGS['path'] = os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) e = healthcareio.analytics.engine(PATH) e.apply(type='claims',serialize=False) SYS_ARGS['engine'] = e app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=True)