"""
Analytics helpers: matplotlib chart rendering (Chart), option builders for the
Apex charting engine (Apex) and an engine that runs the configured analytics
queries against the data store (engine).

NOTE(review): several string literals in this module consist only of newline
characters. They look like HTML wrappers whose tags were lost at some point;
they are kept exactly as found -- TODO confirm against the original templates.
"""
import base64
import io
import json
import os
import re
import sqlite3 as lite
from multiprocessing import Process

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import transport
# from weasyprint import HTML, CSS

COLORS = ["#fbd1a2","#00b2ca","#1d4e89","#4682B4","#c5c3c6","#4c5c68","#1985a1","#f72585","#7209b7","#3a0ca3","#4361ee","#4cc9f0","#ff595e","#ffca3a","#8ac926","#1982c4","#6a4c93"]


class stdev:
    """
    Standard deviation aggregate exposing the step/finalize pair expected by
    sqlite3's ``Connection.create_aggregate`` (see the usage example below).
    """

    def __init__(self):
        self.values = []

    def step(self, value):
        """Collect one value; NULLs (None) are skipped, zeros are kept."""
        # The previous truthiness test also dropped legitimate zeros, which
        # biased the deviation; only missing values are ignored now.
        if value is not None:
            self.values.append(value)

    def finalize(self):
        """Return np.std of the collected values, or None when nothing was collected."""
        return np.std(self.values) if self.values else None

# conn = lite.connect("/home/steve/healthcare-io/healthcare-io.db3")
# conn.create_aggregate("stdev",1,stdev)
# df = pd.read_sql("select count(distinct (json_extract(data,'$.patient_id'))) as patient_count, avg(json_array_length(data,'$.procedures')) mean, stdev(json_array_length(data,'$.procedures')) stdev from claims",conn)


ROOT_FOLDER = 'stats'
# plt.gcf().subplots_adjust(bottom=0.15)
# from matplotlib import rcParams
# rcParams.update({'figure.autolayout': True})


class Chart:
    """
    Static helpers that render an analytics item as a matplotlib figure.
    An item is a dict holding a data-frame under 'data' and the rendering
    options under 'chart'.
    """

    @staticmethod
    def remove_borders(axes, wedges, labels, item):
        """
        Hide the top/right spines, attach the legend and, when the chart
        declares an 'axis' entry, label both axes from it.
        """
        axes.spines["top"].set_visible(False)
        axes.spines["right"].set_visible(False)
        axes.legend(wedges, labels,
                    # title=item['label'],
                    loc="upper right", fontsize=12, bbox_to_anchor=(1, 0, 0.5, 1),
                    fancybox=True, framealpha=0.2)
        # axes.spines["left"].set_visible(False)
        if 'axis' in item['chart']:
            axes.set_ylabel(item['chart']['axis']['y'])
            axes.set_xlabel(item['chart']['axis']['x'])

    @staticmethod
    def donut(item, **args):
        """
        Render a donut chart: item['chart']['x'] names the value column and
        item['chart']['y'] the label column.

        :return: the matplotlib figure (already closed in pyplot)
        """
        df = item['data']
        x = item['chart']['x']
        labels = df[item['chart']['y']]
        figure, axes = plt.subplots()
        colors = COLORS[:len(labels)]
        total = df[x].sum()

        def _autopct(pct):
            # round before truncating: pct is a float, so the count recovered
            # from it can land just below the integer it stands for
            return "{:.2f}%\n({:.0f})".format(pct, int(round((pct / 100) * total)))

        wedges = axes.pie(df[x], labels=labels, wedgeprops=dict(width=0.3),
                          colors=colors, autopct=_autopct)
        Chart.remove_borders(axes, wedges[0], labels, item)
        # close this figure specifically instead of whichever one is current
        plt.close(figure)
        return figure

    @staticmethod
    def barh(item, **args):
        """
        This function will return/render a bar chart (horizontal) which is conducive to showing distributions of things like diagnosis codes

        Only the first 9 rows of the data are drawn. item['chart']['y'] names
        the category column (first entry when it is a list) and
        item['chart']['x'] the series column(s).
        """
        figure, axes = plt.subplots()
        y_labels = item['chart']['y']
        if not isinstance(y_labels, str):
            y_labels = y_labels[0]
        x_labels = item['chart']['x']
        if isinstance(x_labels, str):
            # a plain string would otherwise be iterated character by character
            x_labels = [x_labels]
        df = item['data'].iloc[:9].copy()
        wedges = []
        for index, x_ in enumerate(x_labels):
            # cycle through the palette rather than failing past its end
            color = COLORS[index % len(COLORS)]
            w = axes.barh(df[y_labels], df[x_], align='edge', label='counts', color=color)
            wedges += [w]
        Chart.remove_borders(axes, wedges, [name.replace('_', ' ') for name in x_labels], item)
        plt.close(figure)
        return figure

    @staticmethod
    def spline(item, **args):
        """
        Plot every item['chart']['y'] column against every item['chart']['x']
        column. Passing a 'scatter' keyword draws markers only.

        A string item['chart']['x'] is replaced in place by a one-element list
        (behavior kept from the original implementation).
        """
        df = item['data']
        figure, axes = plt.subplots()
        wedges = []
        item['chart']['x'] = [item['chart']['x']] if type(item['chart']['x']) == str else item['chart']['x']
        y_names = item['chart']['y']
        if isinstance(y_names, str):
            y_names = [y_names]
        for xl in item['chart']['x']:
            x = df[xl]
            for index, yl in enumerate(y_names):
                y = df[yl]
                color = COLORS[index % len(COLORS)]
                if 'scatter' in args:
                    w = axes.plot(x, y, 'o', color=color)
                else:
                    w = axes.plot(x, y, color=color, marker='o')
                wedges += w
        # The flag is passed positionally: its keyword was renamed from `b`
        # to `visible` in newer matplotlib releases.
        axes.grid(False, which='major', axis='x')
        Chart.remove_borders(axes, wedges, [name.replace('_', ' ') for name in y_names], item)
        plt.close(figure)
        return figure

    @staticmethod
    def scatter(item, **args):
        """Same as spline, drawn with markers only."""
        return Chart.spline(item, scatter=True)


class Apex:
    """
    This class will format a data-frame to work with Apex charting engine
    """

    @staticmethod
    def apply(item, theme=None):
        """
        Dispatch to the builder named by item['chart']['type'] and decorate the
        resulting options with the theme.

        :param item:    analytics item ('data' + 'chart', optional 'plotOptions')
        :param theme:   Apex theme; defaults to {'mode':'light','palette':'palette6'}
        :return:        the options dict, or None when no builder matches
        """
        if theme is None:
            # built per call so callers never share (and mutate) one default dict
            theme = {'mode': 'light', 'palette': 'palette6'}
        pointer = item['chart']['type']
        if not hasattr(Apex, pointer):
            print ("Oops")
            return None
        options = getattr(Apex, pointer)(item)
        if 'apex' in options and 'colors' in options['apex']:
            del options['apex']['colors']
        if 'apex' in options:
            options['apex']['theme'] = theme
        # NOTE(review): the collapsed source does not show whether this
        # assignment sat inside the `if` above -- confirm.
        options['responsive'] = [
            {
                'breakpoint': 1,
                'options': {
                    'plotOptions': item['plotOptions'] if 'plotOptions' in item else None,
                }
            }
        ]
        return options

    @staticmethod
    def radial(item):
        """Build radialBar options: axis 'y' names the labels, axis 'x' the values."""
        df = item['data']
        x = item['chart']['axis']['x']
        y = item['chart']['axis']['y']
        labels = df[y].tolist()
        values = [float(np.round(value, 2)) for value in df[x].tolist()]
        chart = {"type": "radialBar", "height": 200}
        option = {"chart": chart, "series": values, "labels": labels,
                  "plotOptions": {"radialBar": {"hollow": {"size": "70%"}}}}
        return {'apex': option}

    @staticmethod
    def scatter(item):
        """Spline options with the chart type switched to 'scatter'."""
        options = Apex.spline(item)
        options['apex']['chart']['type'] = 'scatter'
        return options

    @staticmethod
    def scalar(item):
        """
        Format the first value of the first column as text, abbreviated with
        K (thousands) or M (millions), and substitute it with its unit into the
        :value/:label template.

        :return: {'html': <template with the placeholders replaced>}
        """
        _df = item['data']
        name = _df.columns.tolist()[0]
        value = _df[name].values.round(2)[0]
        # NOTE(review): presumably the wrapper markup was lost here -- confirm
        html = '\n:value\n:label\n'
        if value > 999 and value < 1000000:
            value = " ".join([str(np.divide(value, 1000).round(2)), "K"])
        elif value > 999999:
            #@ Think of considering the case of a billion ...
            value = " ".join([str(np.divide(value, 1000000).round(2)), "M"])
        else:
            value = str(value)
        unit = name.replace('_', ' ') if 'unit' not in item else item['unit']
        return {'html': html.replace(':value', value).replace(":label", unit)}

    @staticmethod
    def column(item):
        """
        Build vertical bar options from at most the first 10 rows: axis 'x'
        names the category column, axis 'y' the series column(s).
        A non-list axis 'y' is replaced in place by a one-element list.
        """
        df = item['data']
        N = min(df.shape[0], 10)
        axis = item['chart']['axis']
        x = axis['x']
        if type(x) == list:
            x = x[0]
        axis['y'] = [axis['y']] if type(axis['y']) != list else axis['y']
        series = [{"data": df[y].values.tolist()[:N], "name": y.upper().replace('_', ' ')}
                  for y in axis['y']]
        xtitle, ytitle = Apex.get_labels(item)
        # NOTE(review): the "width:" key (with a colon) looks like a typo but is
        # emitted as-is to keep the produced options unchanged -- confirm.
        options = {"chart": {"type": "bar"},
                   "plotOptions": {"bar": {"horizontal": False, "width:": 2, "color": ["transparent"]}},
                   "dataLabels": {"enabled": False},
                   "legend": {"position": "right"}}
        options['xaxis'] = {"categories": df[x].values.tolist()[:N], "title": xtitle['title']}
        options['yaxis'] = ytitle
        options['series'] = series
        options['colors'] = COLORS[:df[x].size]
        return {"apex": options}
        # options = Apex.barh(item)
        # options['chart']['type'] = 'column'
        # options['plotOptions']['bar'] = {'horizontal':False,'columnWidth':'55%'}
        # options['stroke']={'show':True,'width':2,'colors':['transparent']}
        # return {"apex":options}

    @staticmethod
    def get_labels(item):
        """
        Return the (x, y) axis title dicts. Titles come from chart['labels']
        when present, otherwise from chart['axis']; for a list the first entry
        is used. Titles are lower-cased with underscores turned into spaces.
        """
        source = item['chart']['labels'] if "labels" in item['chart'] else item['chart']['axis']

        def _title(name):
            name = name if type(name) != list else name[0]
            return {"title": {"text": name.lower().replace('_', ' '), "style": {"fontWeight": "lighter"}}}

        return _title(source['x']), _title(source['y'])

    @staticmethod
    def bar(item):
        """Alias of barh."""
        return Apex.barh(item)

    @staticmethod
    def barh(item):
        """
        rendering a horizontal bar chart assuming for now that only one series is involved
        @TODO: alias this with bar (!= column)

        At most the first 10 rows are used. A non-list axis 'x' is replaced in
        place by a one-element list.
        """
        df = item['data']
        N = min(df.shape[0], 10)
        axis = item['chart']['axis']
        y = axis['y']
        if type(y) == list:
            y = y[0]
        axis['x'] = [axis['x']] if type(axis['x']) != list else axis['x']
        series = [{"data": df[x].values.tolist()[:N], "name": x.upper().replace('_', ' ')}
                  for x in axis['x']]
        xtitle, ytitle = Apex.get_labels(item)
        options = {"chart": {"type": "bar"}, "plotOptions": {"bar": {"horizontal": True}},
                   "dataLabels": {"enabled": False}, "legend": {"position": "right"}}
        options['xaxis'] = {"categories": df[y].values.tolist()[:N], "title": xtitle['title']}
        options['yaxis'] = ytitle
        options['series'] = series
        # one color per row; previously read from the loop variable after the
        # loop, which failed when no series column was configured
        options['colors'] = COLORS[:df.shape[0]]
        return {"apex": options}

    @staticmethod
    def spline(item):
        """
        Build smooth line options from at most the first 10 rows: axis 'x'
        names the category column, axis 'y' the series column(s).
        """
        df = item['data']
        N = min(df.shape[0], 10)
        axis = item['chart']['axis']
        x = axis['x']
        y_names = axis['y']
        if isinstance(y_names, str):
            # a plain string would otherwise be iterated character by character
            y_names = [y_names]
        series = [{"data": df[y].values[:N].tolist(), "name": y.upper().replace('_', ' ')}
                  for y in y_names]
        colors = COLORS[:len(y_names)]
        options = {"chart": {"type": "line"}, "series": series, "stroke": {"curve": "smooth"},
                   "colors": colors, "legend": {"position": "right"}}
        xtitle, ytitle = Apex.get_labels(item)
        options['xaxis'] = {"categories": df[x].values[:N].tolist(), "title": xtitle['title']}
        options['yaxis'] = ytitle
        return {"apex": options}

    @staticmethod
    def donut(item):
        """
        :pre data must have more than one item otherwise just make it a scalar
        here we will use the key as labels and the values as the values (obviously)
        labels are y-axis
        values are x-axis

        With a single row (or none) the column names become the labels and the
        row's values the series.
        """
        df = item['data']
        if df.shape[0] > 1:
            y_cols, x_cols = item['chart']['axis']['y'], item['chart']['axis']['x']
            labels = df[y_cols].values.tolist()
            values = df[x_cols].values.round(2).tolist()
        else:
            labels = [name.upper().replace('_', ' ') for name in df.columns.tolist()]
            df = df.astype(float)
            # flatten so a 1x1 frame yields [v] instead of [[v]] and an empty
            # frame yields [] instead of raising
            values = df.values.round(2).ravel().tolist()
        colors = COLORS[:len(values)]
        options = {"series": values, "colors": colors, "labels": labels,
                   "dataLabels": {"enabled": True, "style": {"colors": ["#000000"]}, "dropShadow": {"enabled": False}},
                   "chart": {"type": "donut", "width": 200},
                   "plotOptions": {"pie": {"customScale": .9}},
                   "legend": {"position": "right"}}
        return {"apex": options}


class engine:
    """
    This engine is designed to load the configuration and run the queries given they are remittance or claims
    @TODO:
        - make sure the readers of the queries are configurable i.e use data-transport
    """

    # request type -> key of the analytics section in the configuration
    _TABLES = {'claim': '837', 'claims': '837', 'remits': '835', 'remit': '835'}

    def __init__(self, path):
        """
        Loading configuration file from a designated location ...

        :param path: location of the JSON configuration ('store' and 'analytics' entries)
        """
        with open(path) as f:
            _config = json.loads(f.read())
        self.store_config = _config['store']
        self.info = _config['analytics']
        # the store is configured with a writer; analytics needs the matching reader
        if self.store_config['type'] == 'mongo.MongoWriter':
            self.store_config['type'] = 'mongo.MongoReader'
        else:
            self.store_config['type'] = 'disk.SQLiteReader'

    def _db_type(self):
        """Return 'mongo' or 'sql', i.e. the key holding a pipeline item's query."""
        return 'mongo' if 'mongo' in self.store_config['type'] else 'sql'

    def filter(self, **args):
        """
        type: claims or remits
        filter optional identifier claims, procedures, taxonomy, ...
        index  optional position of the analysis to run

        :return: {'id': <id of the first analysis>, 'pipeline': [<items with their data-frame>]}
        """
        table = self._TABLES[args['type']]
        _analytics = self.info[table]
        if 'index' in args:
            _analytics = [_analytics[int(args['index'])]]
        _info = list(_analytics)
        # The backend is derived from the configuration: self.reader does not
        # exist yet the first time this method is called.
        DB_TYPE = self._db_type()
        if DB_TYPE == 'mongo':
            self.store_config['args']['doc'] = args['type']
        self.reader = transport.factory.instance(**self.store_config)
        r = []
        try:
            for row in _info:
                for item in row['pipeline']:
                    if not item.get(DB_TYPE):
                        continue
                    query = {DB_TYPE: item[DB_TYPE]}
                    df = pd.DataFrame(self.reader.read(**query))
                    df = df.fillna('N/A')
                    pipe = {"data": df, "chart": item['chart']}
                    # carry over everything but the data and the backend queries
                    for key in list(item.keys()):
                        if key not in ["chart", "data", "mongo", "sql", "couch"]:
                            pipe[key] = item[key]
                    r.append(pipe)
        finally:
            self.reader.close()
        return {"id": _info[0]['id'], 'pipeline': r}

    def apply(self, **args):
        """
        type: claims or remits
        filter optional identifier claims, procedures, taxonomy, ...
        index  optional position of the analysis to run
        serialize when present, data-frames read from the store are returned as JSON strings

        :return: the selected analyses, each pipeline item carrying its 'data'
        """
        table = self._TABLES[args['type']]
        _analytics = self.info[table]
        if 'index' in args:
            _analytics = [_analytics[int(args['index'])]]
        if 'filter' in args:
            # previously referenced an undefined name (analytics)
            _info = [item for item in _analytics if args['filter'] == item['id']]
        else:
            _info = list(_analytics)
        #
        # @TODO: Find a better way to handle database variance
        #
        DB_TYPE = self._db_type()
        if DB_TYPE == 'sql':
            # NOTE(review): assumed to apply to the sql backend only; the
            # collapsed source does not show the indentation -- confirm.
            self.store_config['args']['table'] = args['type']
        self.reader = transport.factory.instance(**self.store_config)
        try:
            for row in _info:
                # Items are updated in place. The former positional write-back
                # drifted as soon as an item without a query was skipped,
                # overwriting the wrong pipeline entry.
                for item in row['pipeline']:
                    query = item.get(DB_TYPE)
                    if not query:
                        continue
                    if DB_TYPE == 'sql':
                        query = {"sql": query}
                    item['data'] = self.reader.read(**query)
                    if 'serialize' in args:
                        if type(item['data']) == pd.DataFrame:
                            # 'records' is the documented orient; the 'record'
                            # abbreviation is rejected by recent pandas
                            item['data'] = json.dumps(item['data'].to_dict(orient='records'))
                    else:
                        item['data'] = pd.DataFrame(item['data'])
                    # if 'info' in item:
                    #     item['info'] = item['info'].replace(":rows",str(item["data"].shape[0]))
        finally:
            self.reader.close()
        return _info

    def _html(self, item):
        """
        Render one pipeline item as a list of markup fragments: label, figure,
        table (describe() output, or the raw table for donuts) and optional info.

        NOTE(review): the encoded image (`stream`) is not referenced by the
        fragments as they stand; presumably the original literal embedded it
        -- confirm.
        """
        figure = None
        df = item['data']
        chart_type = item['chart']['type']
        label = ['\n', item['label'], '\n']
        text = ['\n', df.describe().iloc[:].round(2).to_html().replace('_', ' '), '\n']
        info = ['\n', item['info'], '\n'] if 'info' in item else []
        if chart_type in ['pie', 'donut', 'doughnut']:
            figure = Chart.donut(item)
            text = ['\n', df.to_html(index=False).replace('_', ' '), '\n']
        elif chart_type == 'scatter':
            figure = Chart.scatter(item)
        elif chart_type == 'spline':
            figure = Chart.spline(item)
        elif chart_type in ['barh', 'hbar']:
            figure = Chart.barh(item)
        elif chart_type == 'scalar':
            figure = (item['data'].apply(lambda col: '\n' + str(col.values[0].round(2)) + '\n' + col.name.replace('_', ' ') + '\n').tolist())
            label = text = []

        if figure is not None and chart_type != 'scalar':
            stream = io.BytesIO()
            # `quality` only ever applied to JPEG output and is rejected by
            # recent matplotlib, so it is no longer passed for this PNG
            figure.savefig(stream, format='png', dpi=300, bbox_inches="tight", transparent=True)
            stream.seek(0)
            stream = base64.b64encode(stream.getvalue()).decode("utf-8")
            stream = "data:image/png;base64," + stream
            figure = ['\n', "\n"]

        fragments = [" ".join(row) for row in [label, figure, text, info] if row]
        if chart_type != 'scalar':
            return ['\n'] + fragments + ["\n"]
        return fragments

    def _csv(self, item):
        """Placeholder: not implemented."""
        pass

    def export(self, item, format):
        """
        We have a pipeline here and we should attempt to build a figure using seaborn within an html template using jinja2
        This is considered a page (or an item) of an analysis where we will have both data and rendering information with accompanying text

        NOTE(review): unfinished -- fragments are assembled per row but nothing
        is collected or returned yet.
        """
        html = []
        for row in item['pipeline']:
            p = ["\n\n", row['label'].replace('_', ' '), "\n\n"]
            y_label = [name for name in row['data'].columns if 'count' in name]
            x_label = list(set(row['data'].columns) - set(y_label))
            # row is a dict; the row count lives on its data-frame
            N = row['data'].shape[0]
            if 'info' in row:
                p += ["\n", row['info'], '\n']


class LogAnalytics:
    """Summary of a log file holding one JSON document per line."""

    def __init__(self, path):
        """
        :param path: log file; every non-blank line must be a JSON object with a 'completed' entry
        """
        with open(path) as f:
            logs = f.read().split('\n')
        logs = [json.loads(row) for row in logs if row.strip() != '']
        self.remits = {
            "completed": np.sum([1 for row in logs if row['completed'] == True]),
            "files": len(logs)
        }

# m = LogAnalytics('/home/steve/healthcare-io/remits.log')

css = """ HealthcareIO - :title """

# folder = '/home/steve/.healthcareio/config.json'
# e = engine(path=folder)
# p = e.apply(type='claims')
# values = []
# html = [css]
# for row in p :
#     frame = []
#     for item in row['pipeline'] :
#         if row['pipeline'].index(item) == 0 :
#             if item['chart']['type'] != 'scalar' :
#                 # frame = ['\n']
#                 pass
#             else:
#                 frame = ['\n']
#         frame += e._html(item) #p[3]['pipeline'][0])
#         frame += ['\n'] if item['chart']['type'] == 'scalar' else []
#     html += frame
# html = '\n' + "\n".join(html) + "\n"
# f = open('out.html','w')
# f.write(html.replace(":title","Claims"))
# # HTML(string=html).write_pdf('out.pdf',stylesheets=[CSS(string=css)])
# x.write_pdf('./out.pdf')
# print (p[2]['pipeline'][0]['data'])
# e.export (p[0])
# features = ['diagnosis.code']
# split(folder = folder, features=features)