# 659 lines
# 26 KiB
import pandas as pd
import numpy as np
import os
import io
import json
from multiprocessing import Process
import transport
import sqlite3 as lite
import numpy as np
import transport
import matplotlib.pyplot as plt
import re, base64
# from weasyprint import HTML, CSS
# Shared palette: charts in this module colour their series by position in this list.
COLORS = [
    "#fbd1a2", "#00b2ca", "#1d4e89", "#4682B4",
    "#c5c3c6", "#4c5c68", "#1985a1", "#f72585",
    "#7209b7", "#3a0ca3", "#4361ee", "#4cc9f0",
    "#ff595e", "#ffca3a", "#8ac926", "#1982c4",
    "#6a4c93",
]
class stdev :
    """
    Standard deviation aggregate exposing the step/finalize protocol, i.e the shape
    sqlite3's create_aggregate expects (see the commented-out registration below this class)
    """
    def __init__(self) :
        # every accepted value of the current group, in arrival order
        self.values = []
    def step(self,value):
        """
        Collect one value of the group

        :param value    number to add; None (SQL NULL) and non-numeric values are skipped
        """
        # Zero is a legitimate observation: testing the truthiness of the value would drop it
        # and bias the deviation, hence the explicit type test.
        if isinstance(value,(int,float,np.number)) :
            self.values.append(value)
    def finalize(self):
        """
        :return     the population standard deviation (numpy.std) of the collected values,
                    or None when nothing was collected
        """
        return np.std(self.values) if self.values else None
# conn = lite.connect("/home/steve/healthcare-io/healthcare-io.db3")
# conn.create_aggregate("stdev",1,stdev)
# df = pd.read_sql("select count(distinct (json_extract(data,'$.patient_id'))) as patient_count, avg(json_array_length(data,'$.procedures')) mean, stdev(json_array_length(data,'$.procedures')) stdev from claims",conn)
# NOTE(review): not referenced by any code visible in this part of the file - confirm it is still needed
ROOT_FOLDER = "stats"
# plt.gcf().subplots_adjust(bottom=0.15)
# from matplotlib import rcParams
# rcParams.update({'figure.autolayout': True})
class Chart :
    """
    Renders pipeline items with matplotlib. Every chart method receives an item (a dictionary
    holding a data-frame under 'data' and the chart settings under 'chart'), draws on a new
    figure and returns that figure; closing the figure is left to the caller
    """
    @staticmethod
    def _first(value):
        """Return the first entry of a list, or the value itself when it is not a list"""
        return value[0] if isinstance(value,list) else value

    @staticmethod
    def remove_borders(axes,wedges,labels,item) :
        """
        Attach the legend to the axes and, when the chart settings carry an 'axis' section,
        use its 'x' and 'y' entries as axis labels

        :param axes     axes being decorated
        :param wedges   handles (bars, lines ...) listed in the legend
        :param labels   legend text, one entry per handle
        :param item     pipeline item, only item['chart'] is read
        """
        # NOTE(review): despite the name, nothing in the visible code hides a border/spine.
        axes.legend(wedges, labels,
            loc="upper right",fontsize=12,bbox_to_anchor=(1, 0, 0.5, 1),fancybox=True,framealpha=0.2)
        if 'axis' in item['chart'] :
            # NOTE(review): the body of this branch is not visible in the source; labelling the
            # axes mirrors the commented-out code in spline() - confirm against the original.
            axis = item['chart']['axis']
            axes.set_ylabel(Chart._first(axis['y']))
            axes.set_xlabel(Chart._first(axis['x']))

    @staticmethod
    def donut(item,**args) :
        """
        Draw a donut chart: item['chart']['x'] names the column holding the values and
        item['chart']['y'] the column holding the slice labels

        :return     the figure holding the chart
        """
        df = item['data']
        values = df[item['chart']['x']]
        labels = df[item['chart']['y']]
        figure, axes = plt.subplots()
        colors = COLORS[:len(labels)]
        total = values.sum()
        # The count is rebuilt from the percentage, which carries floating point error: rounding
        # (instead of truncating) keeps e.g. 0.9999 from being displayed as 0.
        axes.pie(values,labels=labels,wedgeprops=dict(width=0.3),colors=colors,
            autopct=lambda pct: "{:.2f}%\n({:.0f})".format(pct,round((pct/100)*total)))
        return figure

    @staticmethod
    def barh(item,**args):
        """
        This function will return/render a bar chart (horizontal) which is conducive to showing distributions of things like diagnosis codes

        :param item     pipeline item; item['chart']['y'] names the category column and
                        item['chart']['x'] the value column(s), each as a string or a list
        :param limit    (optional keyword) number of leading rows drawn, 9 by default
        :return         the figure holding the chart
        """
        figure, axes = plt.subplots()
        chart = item['chart']
        y_label = Chart._first(chart['y'])
        # a bare string would otherwise be iterated one character at a time
        x_labels = [chart['x']] if isinstance(chart['x'],str) else list(chart['x'])
        df = item['data'].iloc[:args.get('limit',9)].copy()
        wedges = []
        for index, x_ in enumerate(x_labels) :
            # wrap around the palette rather than failing when there are more series than colors
            color = COLORS[index % len(COLORS)]
            wedges.append(axes.barh(df[y_label],df[x_],align='edge',label='counts',color=color))
        Chart.remove_borders(axes,wedges,[name.replace('_',' ') for name in x_labels],item)
        return figure

    @staticmethod
    def spline(item,**args):
        """
        Plot every column of item['chart']['y'] against every column of item['chart']['x']

        :param item     pipeline item; a string under item['chart']['x'] is replaced in place
                        by a one-element list
        :param scatter  (optional keyword) when present only the markers are drawn,
                        otherwise a line with markers
        :return         the figure holding the chart
        """
        df = item['data']
        chart = item['chart']
        figure, axes = plt.subplots()
        chart['x'] = [chart['x']] if isinstance(chart['x'],str) else chart['x']
        y_names = [chart['y']] if isinstance(chart['y'],str) else chart['y']
        wedges = []
        for xl in chart['x'] :
            x = df[xl]
            # colors restart for every x column so a given y series keeps its color
            for index, yl in enumerate(y_names) :
                color = COLORS[index % len(COLORS)]
                if 'scatter' in args :
                    wedges += axes.plot(x,df[yl],'o',color=color)
                else:
                    wedges += axes.plot(x,df[yl],color=color,marker='o')
        Chart.remove_borders(axes,wedges,[name.replace('_',' ') for name in y_names],item)
        return figure

    @staticmethod
    def scatter(item,**args):
        """Same as spline() with only the markers drawn; other keywords are passed through"""
        return Chart.spline(item,**dict(args,scatter=True))
class Apex :
    """
    This class will format a data-frame to work with Apex charting engine

    Every formatter receives a pipeline item (a dictionary holding the data-frame under 'data'
    and the settings under 'chart') and returns a dictionary with the chart options under
    'apex' (scalar returns an html snippet under 'html' instead)
    """
    @staticmethod
    def _as_list(value):
        """Wrap a value in a list unless it already is one"""
        return value if isinstance(value,list) else [value]

    @staticmethod
    def _first(value):
        """Return the first entry of a list, or the value itself when it is not a list"""
        return value[0] if isinstance(value,list) else value

    @staticmethod
    def apply(item,theme={'mode':'light','palette':'palette6'}):
        """
        Dispatch the item to the formatter named by item['chart']['type']

        :param item     pipeline item
        :param theme    theme stored with the chart options
        :return         the formatter's output with the theme and a 'responsive' section added,
                        or None (after printing "Oops") when no formatter carries that name
        """
        handler = getattr(Apex,item['chart']['type'],None)
        if handler is None :
            print ("Oops")
            return None
        options = handler(item)
        if 'apex' in options :
            # the palette comes from the theme, explicit colors would override it
            options['apex'].pop('colors',None)
            # copied so that changing one chart's theme does not leak into the shared default
            options['apex']['theme'] = dict(theme) if isinstance(theme,dict) else theme
        options['responsive']= [
            {
                'breakpoint': 1,
                'options': {
                    'plotOptions':item['plotOptions'] if 'plotOptions' in item else None,
                },
            }
        ]
        return options

    @staticmethod
    def radial(item):
        """Radial bar: values come from the axis 'x' column (rounded to 2 decimals), labels from 'y'"""
        df = item['data']
        x = item['chart']['axis']['x']
        y = item['chart']['axis']['y']
        labels = df[y].tolist()
        values = [float(np.round(value,2)) for value in df[x].tolist()]
        chart = {"type":"radialBar","height":200}
        option = {"chart":chart,"series":values,"labels":labels,"plotOptions":{"radialBar":{"hollow":{"size":"70%"}}}}
        return {'apex':option}

    @staticmethod
    def scatter(item):
        """Same options as spline() with the chart type switched to 'scatter'"""
        options = Apex.spline(item)
        options['apex']['chart']['type'] = 'scatter'
        return options

    @staticmethod
    def scalar(item):
        """
        Format the first cell of the first column as an html snippet, thousands are abbreviated
        with K and millions with M. The label is item['unit'] when provided, otherwise the
        column name with underscores replaced by spaces

        :return     {'html': snippet}; an empty data-frame yields the value '0' and an empty label
        """
        _df = item['data']
        value = '0'
        unit = ''
        html = '<div class="scalar"><div class="value">:value</div><div class="label">:label</div></div>'
        if _df.shape[0] > 0 :
            name = _df.columns.tolist()[0]
            value = _df[name].values[0]
            if 999 < value < 1000000 :
                value = " ".join([str(np.divide(value,1000).round(2)),"K"])
            elif value > 999999 :
                #@ Think of considering the case of a billion ...
                value = " ".join([str(np.divide(value,1000000).round(2)),"M"])
            else:
                value = str(value)
            unit = name.replace('_',' ') if 'unit' not in item else item['unit']
        return {'html':html.replace(':value',value).replace(":label",unit)}

    @staticmethod
    def column(item):
        """
        Vertical bar chart over the first 10 rows at most: one series per axis 'y' column,
        categories taken from the axis 'x' column. axis['y'] is stored back as a list
        """
        df = item['data']
        N = min(df.shape[0],10)
        axis = item['chart']['axis']
        x = Apex._first(axis['x'])
        axis['y'] = Apex._as_list(axis['y'])
        series = [{"data": df[y].values.tolist()[:N],"name":y.upper().replace('_',' ')} for y in axis['y']]
        xtitle,ytitle = Apex.get_labels(item)
        # NOTE(review): the keys "width:" and "color" look like leftovers of a 'stroke' section
        # (see the commented-out alternative in the original), kept as-is for the consumer
        options = {"chart":{"type":"bar"},"plotOptions":{"bar":{"horizontal":False,"width:":2,"color":["transparent"]}},"dataLabels":{"enabled":False},"legend":{"position":"right"}}
        options['xaxis'] = {"categories":df[x].values.tolist()[:N],"title":xtitle['title']}
        options['yaxis'] = ytitle
        options['series'] = series
        options['colors'] = COLORS[:df[x].size]
        return {"apex":options}

    @staticmethod
    def get_labels(item):
        """
        Build the axis titles: the text comes from item['chart']['labels'] when present,
        otherwise from the column names under item['chart']['axis']. Lists contribute their
        first entry; the text is lower-cased and underscores become spaces

        :return     (x title, y title), each shaped {"title":{"text":...,"style":...}}
        """
        chart = item['chart']
        source = chart['labels'] if 'labels' in chart else chart['axis']
        def _title(text):
            text = Apex._first(text)
            return {"title":{"text":text.lower().replace('_',' '),"style":{"fontWeight":"lighter"}}}
        return _title(source['x']),_title(source['y'])

    @staticmethod
    def bar(item):
        """Alias of barh()"""
        return Apex.barh(item)

    @staticmethod
    def barh(item):
        """
        rendering a horizontal bar chart assuming for now that only one series is involved
        @TODO: alias this with bar (!= column)

        At most the first 10 rows are used; axis['x'] is stored back as a list
        """
        df = item['data']
        N = min(df.shape[0],10)
        axis = item['chart']['axis']
        y = Apex._first(axis['y'])
        axis['x'] = Apex._as_list(axis['x'])
        series = [{"data": df[x].values.tolist()[:N],"name":x.upper().replace('_',' ')} for x in axis['x']]
        xtitle , ytitle = Apex.get_labels(item)
        options = {"chart":{"type":"bar"},"plotOptions":{"bar":{"horizontal":True}},"dataLabels":{"enabled":False},"legend":{"position":"right"}}
        options['xaxis'] = {"categories":df[y].values.tolist()[:N],"title":xtitle['title']}
        options['yaxis'] = ytitle
        options['series'] = series
        # one color per row, as before, without depending on a loop variable that does not
        # exist when there is no series
        options['colors'] = COLORS[:df.shape[0]]
        return {"apex":options}

    @staticmethod
    def spline(item):
        """
        Smoothed line chart over the first 10 rows at most: one series per axis 'y' column,
        categories taken from the axis 'x' column
        """
        df = item['data']
        N = min(df.shape[0],10)
        axis = item['chart']['axis']
        x = Apex._first(axis['x'])
        # a bare string would otherwise be iterated one character at a time
        y_names = Apex._as_list(axis['y'])
        series = [{"data":df[y].values[:N].tolist(),"name":y.upper().replace('_',' ')} for y in y_names]
        colors = COLORS[:len(y_names)]
        options = {"chart":{"type":"line"},"series":series,"stroke":{"curve":"smooth"},"colors":colors,"legend":{"position":"right"}}
        xtitle , ytitle = Apex.get_labels(item)
        options['xaxis'] = {"categories":df[x].values[:N].tolist(),"title":xtitle['title']}
        options['yaxis'] = ytitle
        return {"apex":options}

    @staticmethod
    def donut(item):
        """
        :pre data must have more than one item otherwise just make it a scalar
        here we will use the key as labels and the values as the values (obviously)
        labels are y-axis
        values are x-axis

        With a single row (or none) the numeric columns are used instead: their names become
        the labels and their cells the values
        """
        df = item['data']
        if df.shape [0]> 1 :
            axis = item['chart']['axis']
            labels = df[Apex._first(axis['y'])].values.tolist()
            values = df[Apex._first(axis['x'])].values.round(2).tolist()
        else:
            # labels and values are both derived from the numeric columns so they stay aligned,
            # and the values are flattened because a donut expects a flat series
            numeric = [name for name in df.columns if df[name].dtype in [float,int]]
            labels = [name.upper().replace('_',' ') for name in numeric]
            values = df[numeric].values.round(2).ravel().tolist()
        colors = COLORS[:len(values)]
        options = {"series":values,"colors":colors,"labels":labels,"dataLabels":{"enabled":True,"style":{"colors":["#000000"]},"dropShadow":{"enabled":False}},"chart":{"type":"donut","width":200},"plotOptions":{"pie":{"customScale":.9}},"legend":{"position":"right"}}
        return {"apex":options}
class engine :
    """
    This engine is designed to load the configuration and run the queries given they are remittance or claims
    - make sure the readers of the queries are configurable i.e use data-transport
    """
    # values accepted for the 'type' argument and the analytics section each one selects
    _TABLES = {'claim':'837','claims':'837','remits':'835','remit':'835'}

    def __init__(self,path) :
        """
        Loading configuration file from a designated location ...

        :param path     JSON file with a 'store' section (reader settings) and an 'analytics' section
        """
        with open(path) as f :
            _config = json.loads(f.read())
        self.store_config = _config['store']
        self.info = _config['analytics']
        # created by filter()/apply(), declared here so the attribute always exists
        self.reader = None
        if 'type' not in self.store_config :
            # This is the newer version of data-transport
            self.store_config['context'] = 'read'

    def _select(self,args,by_id):
        """
        Return the analytics entries designated by the arguments: args['type'] picks the
        section, args['index'] (optional) a single entry of it and, when by_id is set,
        args['filter'] (optional) keeps the entries carrying that id
        """
        _analytics = self.info[engine._TABLES[args['type']]]
        if 'index' in args :
            _analytics = [_analytics[int(args['index'])]]
        if by_id and 'filter' in args :
            return [entry for entry in _analytics if args['filter'] == entry['id']]
        return list(_analytics)

    def _db_type(self):
        """
        Name of the query flavor stored with the pipeline items ('mongo' or 'sql'): decided by
        the configured provider when there is one, otherwise by the class of the current reader
        """
        provider = self.store_config.get('provider')
        if provider is not None :
            return 'mongo' if provider in ['mongodb','mongo'] else 'sql'
        return 'mongo' if isinstance(self.reader,transport.mongo.MongoReader) else 'sql'

    def filter (self,**args):
        """
        type: claims or remits
        filter optional identifier claims, procedures, taxonomy, ...

        Runs the query of every pipeline item of the selected entries and returns a copy of
        each item with the result (missing values replaced by 'N/A') under 'data'; the
        configuration itself is left untouched. Items without a query are skipped

        :return     {"id": id of the first selected entry (None when nothing is selected), "pipeline": [...]}
        """
        _info = self._select(args,by_id=False)
        # the reader has to exist before its kind can be inspected
        self.reader = transport.factory.instance(**self.store_config)
        DB_TYPE = self._db_type()
        r = []
        for row in _info :
            for item in row['pipeline'] :
                # NOTE(review): skipping items without a query and collecting the result in r
                # are reconstructed, the corresponding lines are not visible in the source
                if not item.get(DB_TYPE) :
                    continue
                df = pd.DataFrame(self.reader.read(**{DB_TYPE:item[DB_TYPE]})).fillna('N/A')
                pipe = {"data":df,"chart":item['chart']}
                for key in item :
                    if key not in ["chart","data","mongo","sql","couch"] :
                        pipe[key] = item[key]
                r.append(pipe)
        return {"id":_info[0]['id'] if _info else None,'pipeline':r}

    def apply (self,**args) :
        """
        type: claims or remits
        filter optional identifier claims, procedures, taxonomy, ...

        Runs the query of every pipeline item of the selected entries and stores the result in
        the item under 'data': as returned by the reader, or as a JSON string of records when
        the 'serialize' argument is present. Items without a query are left as they are

        :return     the selected entries
        """
        _info = self._select(args,by_id=True)
        self.reader = transport.factory.instance(**self.store_config)
        DB_TYPE = self._db_type()
        for row in _info :
            # items are updated in place, there is no position to keep track of (a manual
            # counter falls out of step as soon as an item is skipped)
            for item in row['pipeline'] :
                query = item[DB_TYPE]
                if not query :
                    continue
                _df = self.reader.read(**{DB_TYPE:query})
                if 'serialize' in args :
                    # 'records' is the documented spelling, the short form 'record' is rejected
                    # by current pandas
                    item['data'] = json.dumps(_df.to_dict(orient='records'))
                else:
                    item['data'] = _df
        return _info

    def _html(self,item) :
        """
        Render one pipeline item as a list of html fragments

        scalar items yield one tile per column of the data (first row, rounded to 2 decimals).
        Any other item yields a frame holding its label, the chart as an inline png when the
        chart type is supported, a table (the data itself for pie/donut/doughnut, its
        describe() summary otherwise) and the optional 'info' text
        """
        kind = item['chart']['type']
        df = item['data']
        info = ['<div class="info">',item['info'],'</div>'] if 'info' in item else []
        if kind == 'scalar' :
            tiles = df.apply(lambda col: '<div class="scalar"><div class="value bold">'+str(col.values[0].round(2))+'</div><div class="value-text">'+col.name.replace('_', ' ')+'</div></div>' ).tolist()
            return [ " ".join(row) for row in [tiles,info] if row]

        label = ['<div class="label">',item['label'],'</div>']
        if kind in ['pie','donut','doughnut'] :
            text = ['<div class="grid">',df.to_html(index=False).replace('_',' '),'</div>']
        else:
            text = ['<div class="grid">',df.describe().round(2).to_html().replace('_',' '),'</div>']
        renderers = {'pie':Chart.donut,'donut':Chart.donut,'doughnut':Chart.donut,
            'scatter':Chart.scatter,'spline':Chart.spline,'barh':Chart.barh,'hbar':Chart.barh}
        figure = None
        if kind in renderers :
            chart = renderers[kind](item)
            stream = io.BytesIO()
            try:
                # no 'quality' argument: it never applied to png and recent matplotlib rejects it
                chart.savefig(stream,format='png',dpi=300, bbox_inches = "tight",transparent=True)
            finally:
                # pyplot keeps every figure alive until it is closed explicitly
                plt.close(chart)
            stream = "data:image/png;base64,"+base64.b64encode(stream.getvalue()).decode("utf-8")
            figure = ['<div class="figure"><img src="'+stream+'">',"</div>"]
        return ['<div class="frame"><div class="chart '+ kind+'">'] + [ " ".join(row) for row in [label,figure,text,info] if row] + ["</div></div>"]

    def _csv(self,item):
        """
        Placeholder for a csv export of a pipeline item
        NOTE(review): no body is visible in the source, nothing is produced
        """
        return None

    def export(self,item,format):
        """
        We have a pipeline here and we should attempt to build a figure using seaborn within an html template using jinja2
        This is considered a page (or an item) of an analysis where we will have both data and rendering information with accompanying text

        :param item     entry whose 'pipeline' rows carry a 'label' and optionally an 'info' text
        :param format   not used by the visible code
        :return         html fragments: a heading per row followed by its info block
                        NOTE(review): the visible source stops after building the fragments of
                        a row (and called .shape on the row itself, which a dictionary does not
                        have); returning the accumulated fragments is an assumption to confirm
        """
        html = []
        for row in item['pipeline'] :
            html += [ "<h2>",row['label'].replace('_',' '),"</h2>"]
            if 'info' in row :
                html += ["<div class='info'>",row['info'],'</div>']
        return html
class LogAnalytics :
    """
    Summary of a log file holding one JSON document per line (blank lines are ignored)
    """
    def __init__(self,path):
        """
        :param path     log file to read; every entry must carry a 'completed' field
        """
        # read within a context manager so the handle is released right away
        with open(path) as f :
            logs = [json.loads(row) for row in f.read().split('\n') if row.strip() != '']
        # NOTE(review): only the 'completed' entry is visible in the source, the literal is cut
        # off after it - restore any further entry from the original file
        self.remits = {
            "completed": np.sum([1 for row in logs if row['completed'] == True]),
        }
# m = LogAnalytics('/home/steve/healthcare-io/remits.log')
# Markup placed at the top of a generated report: the <meta>/<title> tags followed by the style
# rules of the tiles (.numbers, .scalar), the chart frames (.frame, .chart, .figure, .grid,
# .label, .info) and the tables. ":title" is a placeholder; the commented-out driver at the end
# of the file substitutes it with str.replace.
# NOTE(review): the literal looks truncated here - some selectors and closing braces are missing
# and its closing delimiter is not visible; restore them from the original file before use.
css = """
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>HealthcareIO - :title </title>
grid-template-columns:repeat(2,1fr) ;
.numbers {
/*border:1px solid #CAD5E0;*/
.numbers .scalar {
background-image: linear-gradient(to bottom, #f3f3f3,#d3d3d3, #ffffff);
border:1px solid #CAD5E0;
grid-template-rows:auto 28px; gap:2px;
.numbers .scalar .value-text {
border-top:1px solid #CAD5E0;
.numbers .scalar .value {
font-size:48px; text-align:right; font-weight:bold;}
.frame {
background-image: linear-gradient(to bottom, #f3f3f3,#d3d3d3, #ffffff);
border:1px solid #CAD5E0;
.figure {grid-area:figure; width:500px; height:350px; display:grid; align-items:center}
.info {height:28px; width:100%; grid-area:info;
text-align:center; text-transform:capitalize; padding:4px; font-size:12px; font-family:sans-serif; border-top:1px solid #CAD5E0;}
.grid {grid-area:grid; }
.label {grid-area:label; font-weight:bold; font-size: 22px; text-align:center; text-transform:capitalize}
.chart {
display:grid; grid-template-areas:
"label label label"
"figure grid grid"
"info info info" ;
img {height:auto; max-width:100% ;}
table {width:100%; border-collapse: collapse;}
table , TH, TD{ font-size:14px; padding:8px; font-family:sans-serif; border:1px; border:1px solid #CAD5E0;}
table thead, tbody th { padding:4px; text-transform:capitalize; background-color:#4682B4; color:#ffffff; text-align:center}
table thead tr th {text-align:center}
table tbody td {text-align:right; font-weight: lighter}
table tbody tr:nth-child(odd) {background: #95bce0}
table tbody tr:nth-child(even) {background: #c8e5ff}
# folder = '/home/steve/.healthcareio/config.json'
# e = engine(path=folder)
# p = e.apply(type='claims')
# values = []
# html = [css]
# for row in p :
# frame = []
# for item in row['pipeline'] :
# if row['pipeline'].index(item) == 0 :
# if item['chart']['type'] != 'scalar' :
# # frame = ['<div class="frame">']
# pass
# else:
# frame = ['<div><div class="numbers">']
# frame += e._html(item) #p[3]['pipeline'][0])
# frame += ['</div></div>'] if item['chart']['type'] == 'scalar' else []
# html += frame
# html = '<div class="pane">' + "\n".join(html) + "</div></div>"
# f = open('out.html','w')
# f.write(html.replace(":title","Claims"))
# HTML(string=html).write_pdf('out.pdf',stylesheets=[CSS(string=css)])
# x.write_pdf('./out.pdf')
# print (p[2]['pipeline'][0]['data'])
# e.export (p[0])
# features = ['diagnosis.code']
# split(folder = folder, features=features)