from flask import Flask, request, render_template, send_from_directory
from healthcareio.params import SYS_ARGS
import healthcareio.analytics
import os
import json
import time
import smart
import transport
import pandas as pd
import numpy as np
from healthcareio import x12
from multiprocessing import Process
from flask_socketio import SocketIO, emit, disconnect, send
from healthcareio.server import proxy

# Location of the configuration file handed to the parser and the analytics engine
PATH = os.sep.join([os.environ['HOME'], '.healthcareio', 'config.json'])
app = Flask(__name__)
socket_ = SocketIO(app)


def resume(files):
    """
    Return the entries of `files` whose name is not yet logged as completed.

    The completed names are read from the "logs" collection of the configured
    store; the lookup is only attempted when the store type mentions 'mongo'.
    The lookup is best-effort: if it fails, every file is returned.

    :files  list of file paths found on disk
    :return list of paths that still have to be processed (order not preserved)
    """
    store = SYS_ARGS['config']['store']
    _files = []
    if 'mongo' in store['type']:
        _args = store.copy()
        _args['type'] = 'mongo.MongoReader'
        reader = transport.factory.instance(**_args)
        try:
            pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}]
            _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
            _files = [item['name'] for item in reader.read(mongo = _args)]
        except Exception as e:
            # Keep the best-effort behaviour but do not hide the reason of the failure
            _files = []
            print (["resume failed ", str(e)])
    print (["found ",len(files),"\tProcessed ",len(_files)])
    return list(set(files) - set(_files))


def run():
    """
    Walk the configured folder, drop the files already processed and start
    one x12.Parser per group of files. The started parsers are stored in
    SYS_ARGS['procs'].
    """
    #
    # let's get the files in the folder (recursively)
    #
    FILES = []
    # number of groups/processes; np.array_split rejects values below 1
    BATCH = max(1, int(SYS_ARGS['config']['args']['batch']))
    for root, _dir, f in os.walk(SYS_ARGS['config']['args']['folder']):
        if f:
            FILES += [os.sep.join([root, name]) for name in f]
    FILES = resume(FILES)
    procs = []
    for FILE_GROUP in np.array_split(FILES, BATCH):
        FILE_GROUP = FILE_GROUP.tolist()
        if not FILE_GROUP:
            # fewer files than groups: no point in starting a parser with nothing to do
            continue
        parser = x12.Parser(PATH)
        parser.set.files(FILE_GROUP)
        parser.start()
        procs.append(parser)
    SYS_ARGS['procs'] = procs


def push():
    """
    Count the distinct file names per "parse" value found in the "logs"
    collection, hand the result to healthcareio.analytics.Apex.apply with a
    donut chart specification and emit it as an "update" socket event.

    :return the value returned by Apex.apply
    """
    _args = dict(SYS_ARGS['config']['store'].copy(), **{"type":"mongo.MongoReader"})
    reader = transport.factory.instance(**_args)
    pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}]
    _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
    r = pd.DataFrame(reader.read(mongo=_args))
    r = healthcareio.analytics.Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})
    emit("update", r, json=True)
    return r


@socket_.on('connect')
def client_connect(**r):
    """Socket 'connect' handler: log the connection and push the current statistics."""
    print ('Connection received')
    print (r)
    push()


@app.route("/favicon.ico")
def _icon():
    """
    Serve the application logo as the favicon.

    The original call passed a list to os.path.join (TypeError) and used the
    logo file name as part of the directory.
    NOTE(review): serving static/img/logo.svg is the assumed intent - confirm.
    """
    return send_from_directory(os.path.join(app.root_path, 'static', 'img'),
                               'logo.svg', mimetype='image/svg+xml')


@app.route("/")
def init():
    """Render index.html with the engine sections (835/837), the store, the owner and the arguments."""
    e = SYS_ARGS['engine']
    sections = {"remits":e.info['835'],"claims":e.info['837']}
    _args = {"sections":sections,"store":SYS_ARGS["config"]["store"],"owner":SYS_ARGS['config']['owner'],"args":SYS_ARGS["config"]["args"]}
    return render_template("index.html", **_args)


@app.route("/format/<id>/<index>",methods=['POST'])
def _format(id, index):
    """
    Return, as JSON, the pipeline items of the selected engine entry that
    carry an 'apex' or 'html' key once merged with the output of Apex.apply
    (the 'data' key is removed from each item).

    :id     entry type, forwarded to the engine filter as `type`
    :index  position of the entry (converted to int)
    """
    e = SYS_ARGS['engine']
    index = int(index)
    # NOTE(review): this view reads p['pipeline'] while get() reads p[0]['pipeline'];
    # one of the two is likely wrong - confirm against engine.filter
    p = e.filter(type=id, index=index)
    r = []
    for item in p['pipeline']:
        _item = dict(item, **healthcareio.analytics.Apex.apply(item))
        _item.pop('data', None)
        if 'apex' in _item or 'html' in _item:
            r.append(_item)
    r = {"id":p['id'],"pipeline":r}
    return json.dumps(r), 200


@app.route("/get/<id>/<index>",methods=['GET'])
def get(id, index):
    """
    Return, as JSON, the data of every pipeline item of the selected engine
    entry, keyed by the item label; each data frame is exported as a list of
    records.

    :id     entry type, forwarded to the engine filter as `type`
    :index  position of the entry (converted to int)
    """
    e = SYS_ARGS['engine']
    index = int(index)
    p = e.filter(type=id, index=index)
    r = {}
    for item in p[0]['pipeline']:
        # 'records' is the spelling pandas accepts; the short form 'record' is rejected by pandas 2.x
        r[item['label']] = item['data'].to_dict(orient='records')
    return json.dumps(r), 200


@app.route("/reset",methods=["POST"])
def reset():
    """Placeholder endpoint: always answers "1" with status 200."""
    return "1", 200
def _write_config(path, content):
    """Write `content`, serialized as JSON, to the file at `path`."""
    with open(path, 'w') as f:
        f.write(json.dumps(content))


def _persist_config():
    """
    Save SYS_ARGS['config'] to the configuration file in a separate process.
    The target is a module-level function (not a lambda) so that it can be
    pickled when the multiprocessing start method is 'spawn'.
    """
    proc = Process(target=_write_config, args=(PATH, SYS_ARGS['config'],))
    proc.start()
    return proc


@app.route("/data",methods=['GET'])
def get_data():
    """
    This function will return statistical data about the services i.e general statistics about what has been processed
    """
    HEADER = {"Content-type":"application/json"}
    _args = SYS_ARGS['config']
    options = dict(proxy.get.files(_args), **proxy.get.processes(_args))
    return json.dumps(options), HEADER


@app.route("/log/<id>",methods=["POST","PUT","GET"])
def log(id):
    """
    Update the processing arguments (batch, resume) when called with
    id == 'params' using PUT or POST, then persist the configuration.
    Any other combination is answered with "0" and status 404 instead of
    returning nothing (which Flask reports as a server error).
    """
    HEADER = {"Content-Type":"application/json; charset=utf8"}
    if id == 'params' and request.method in ['PUT', 'POST']:
        info = request.json or {}
        _args = {"batch":info['batch'] if 'batch' in info else 1,"resume":True}
        #
        # We should update the configuration
        SYS_ARGS['config']['args'] = _args
        _persist_config()
        return "1", HEADER
    return "0", 404, HEADER


@app.route("/io/<id>",methods=['POST'])
def io_data(id):
    """
    Control endpoint :
        params  store the posted arguments (batch, folder expected) and persist the configuration
        stop    stop the processing
        run     start the processing through the proxy
    Unknown identifiers are answered with "0" and status 404.
    """
    if id == 'params':
        #
        # Expecting batch,folder as parameters
        _args = request.json
        _args['resume'] = True
        print (_args)
        #
        # We should update the configuration
        SYS_ARGS['config']['args'] = _args
        try:
            _persist_config()
            return "1", 200
        except Exception as e:
            print (["unable to persist configuration ", str(e)])
            return "0", 403
    elif id == 'stop':
        # NOTE(review): stop is not defined in the visible part of this file - confirm where it comes from
        stop()
        return "1", 200
    elif id == 'run':
        _args = {"args":SYS_ARGS['config']['args'],"store":SYS_ARGS["config"]["store"]}
        proxy.run(_args)
        return "1", 200
    return "0", 404


@app.route("/export")
def export_form():
    """Render store.html with the application context."""
    _args = {"context":SYS_ARGS['context']}
    return render_template("store.html", **_args)


@app.route("/export",methods=['POST','PUT'])
def apply_etl():
    """
    Publish the content of the configured store to the destination described
    in the request body ({"type": "s3" | "mongo", "content": ...}).

    :return "1" (200) once proxy.publish has been called, "0" (404) when the
            body is missing or does not describe a supported destination
    """
    _info = request.json
    m = {'s3':'s3.s3Writer','mongo':'mongo.MongoWriter'}
    if _info and _info.get('type') in m and 'content' in _info:
        dest_args = {'type':m[_info['type']],"args": _info['content'] }
        src_args = SYS_ARGS['config']['store']
        proxy.publish(src_args, dest_args)
        # the original answered 405 (method not allowed) on success
        return "1", 200
    else:
        return "0", 404


@app.route("/update")
def update():
    """Not implemented: always answers "0" with status 405."""
    return "0", 405


@app.route("/reload",methods=['POST'])
def reload():
    """
    Rebuild the analytics engine and store it in SYS_ARGS['engine'].

    The engine is given SYS_ARGS['config'] when that value is a string,
    otherwise the default configuration file (PATH), which is what the
    start-up code of this module uses.
    NOTE(review): assumes healthcareio.analytics.engine expects a file path - confirm.
    """
    _config = SYS_ARGS['config'] if 'config' in SYS_ARGS else None
    _path = _config if isinstance(_config, str) else PATH
    e = healthcareio.analytics.engine(_path)
    SYS_ARGS['engine'] = e
    return "1", 200


if __name__ == '__main__':
    PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
    DEBUG = int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
    SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
    #
    # Adjusting configuration with parameters (batch,folder,resume)
    if 'args' not in SYS_ARGS['config']:
        SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}

    SYS_ARGS['procs'] = []
    e = healthcareio.analytics.engine(PATH)
    e.apply(type='claims', serialize=False)
    SYS_ARGS['engine'] = e
    app.run(host='0.0.0.0', port=PORT, debug=DEBUG, threaded=True)