parser/healthcareio/analytics.py

659 lines
26 KiB
Python

import pandas as pd
import numpy as np
import os
import io
import json
from multiprocessing import Process
import transport
import sqlite3 as lite
import numpy as np
import transport
import matplotlib.pyplot as plt
import re, base64
# from weasyprint import HTML, CSS
COLORS = ["#fbd1a2","#00b2ca","#1d4e89","#4682B4","#c5c3c6","#4c5c68","#1985a1","#f72585","#7209b7","#3a0ca3","#4361ee","#4cc9f0","#ff595e","#ffca3a","#8ac926","#1982c4","#6a4c93"]
class stdev :
def __init__(self) :
self.values = []
def step(self,value):
if value : #and type in [np.int64, np.int32,np.float64,np.float32, int]:
self.values.append(value)
def finalize(self):
return np.std(self.values) if self.values else None
# conn = lite.connect("/home/steve/healthcare-io/healthcare-io.db3")
# conn.create_aggregate("stdev",1,stdev)
# df = pd.read_sql("select count(distinct (json_extract(data,'$.patient_id'))) as patient_count, avg(json_array_length(data,'$.procedures')) mean, stdev(json_array_length(data,'$.procedures')) stdev from claims",conn)
ROOT_FOLDER = 'stats'
# plt.gcf().subplots_adjust(bottom=0.15)
# from matplotlib import rcParams
# rcParams.update({'figure.autolayout': True})
class Chart :
@staticmethod
def remove_borders(axes,wedges,labels,item) :
# plt.axes()
axes.spines["top"].set_visible(False)
# plt.axes().
axes.spines["right"].set_visible(False)
axes.legend(wedges, labels #,title=item['label']
,loc="upper right",fontsize=12,bbox_to_anchor=(1, 0, 0.5, 1),fancybox=True,framealpha=0.2)
# plt.axes().
# axes.spines["left"].set_visible(False)
if 'axis' in item['chart'] :
axes.set_ylabel(item['chart']['axis']['y'])
axes.set_xlabel(item['chart']['axis']['x'])
@staticmethod
def donut(item,**args) :
df = item['data']
x = item['chart']['x'] #args['x']
labels = item['chart']['y']
labels = df[labels]
# figure = plt.figure()
figure, axes = plt. subplots()
# wedges, texts = plt.pie(df[x],labels=labels)
colors = COLORS[:len(labels)] #np.random.choice(COLORS,len(labels),replace=False)
wedges = axes.pie(df[x],labels=labels,wedgeprops=dict(width=0.3),colors=colors,autopct=lambda pct: "{:.2f}%\n({:.0f})".format(pct,int((pct/100)*df[x].sum() ))) #,autopct=lambda pct: func(pct, df[x].values))
# my_circle=plt.Circle( (0,0), 0.7, color='#ffffff',fill=True)
# p=plt.gcf()
# p.gca().add_artist(my_circle)
# plt.legend(wedges, labels,title=item['label'],loc="upper right",bbox_to_anchor=(1, 0, 0.5, 1))
# axes.legend(wedges[0], labels,title=item['label'],loc="upper right",bbox_to_anchor=(1, 0, 0.5, 1),framealpha=0,edgecolor='#CAD5E0',
# )
# x = plt.show()
Chart.remove_borders(axes,wedges[0],labels,item)
plt.close()
return figure
@staticmethod
def barh(item,**args):
"""
This function will return/render a bar chart (horizontal) which is conducive to showing distributions of things like diagnosis codes
"""
# figure = plt.figure()
figure, axes = plt. subplots()
y_labels = item['chart']['y'][0]
x_labels = item['chart']['x'] #[args['x']] if type(args['x']) == str else args['x']
df = item['data'].iloc[:9].copy()
# odf = item['data'].iloc[9:].copy().mean().to_frame().T
# odf[y_labels] = 'Other'
# df = df.append(odf)
wedges = []
# COLORS = ['#003f5c','#7a5195','#374c80','#bc5090','#ef5675','#ff764a','#ffa600']
for x_ in x_labels:
index = x_labels.index(x_)
color = COLORS[index]
w = axes.barh(df[y_labels],df[x_],align='edge',label='counts' ,color=color)
wedges += [w]
# labels = [name.replace('_',' ') for name in x_labels]
# axes.legend(wedges,[name.replace('_',' ') for name in x_labels],
# title=item['label'],
# framealpha=0,
# edgecolor='#CAD5E0',
# loc="upper right",bbox_to_anchor=(1, 0, 0.5, 1)
# )
Chart.remove_borders(axes,wedges,[name.replace('_',' ') for name in x_labels],item)
plt.close()
return figure
@staticmethod
def spline(item,**args):
"""
"""
df = item['data']
# figure = plt.figure()
figure, axes = plt. subplots()
wedges = []
item['chart']['x'] = [item['chart']['x']]if type(item['chart']['x']) == str else item['chart']['x']
# COLORS = ['#003f5c','#7a5195','#374c80','#bc5090','#ef5675','#ff764a','#ffa600']
for xl in item['chart']['x'] :
x = df[xl]
index = 0
for yl in item['chart']['y'] :
y = df[yl]
color = COLORS[index]
if 'scatter' in args :
w = plt.plot(x,y,'o',color=color)
else:
w = plt.plot(x,y,color=color,marker='o')
wedges += w
index += 1
# print (item['chart']['x'])
# if 'axis' in item :
# axes.set_ylabel(item['axis']['y'])
# axes.set_xlabel(item['axis']['x'])
# plt.title(item['label'])
# axes.legend(wedges,[name.replace('_',' ') for name in item['chart']['y']],
# title=item['label'],
# framealpha=0,
# edgecolor='#CAD5E0',
# loc="upper right",bbox_to_anchor=(1, 0, 0.5, 1)
# )
axes.grid(b=False,which='major',axis='x')
Chart.remove_borders(axes,wedges,[name.replace('_',' ') for name in item['chart']['y']],item)
plt.close()
return figure
@staticmethod
def scatter(item,**args):
return Chart.spline(item,scatter=True)
class Apex :
"""
This class will format a data-frame to work with Apex charting engine
"""
@staticmethod
def apply(item,theme={'mode':'light','palette':'palette6'}):
pointer = item['chart']['type']
if hasattr(Apex,pointer) :
pointer = getattr(Apex,pointer)
options = pointer(item)
if 'apex' in options and 'colors' in options['apex'] :
del options['apex']['colors']
if 'apex' in options :
options['apex']['theme'] = theme
options['responsive']= [
{
'breakpoint': 1,
'options': {
'plotOptions':item['plotOptions'] if 'plotOptions' in item else None,
}
}
]
return options
else:
print ("Oops")
pass
@staticmethod
def radial(item):
df = item['data']
x = item['chart']['axis']['x']
y = item['chart']['axis']['y']
labels = df[y].tolist()
values = [float(np.round(value,2)) for value in df[x].tolist()]
chart = {"type":"radialBar","height":200}
option = {"chart":chart,"series":values,"labels":labels,"plotOptions":{"radialBar":{"hollow":{"size":"70%"}}}}
return {'apex':option}
@staticmethod
def scatter(item):
options = Apex.spline(item)
options['apex']['chart']['type'] = 'scatter'
return options
@staticmethod
def scalar(item):
_df = item['data']
value = '0'
unit = ''
html = '<div class="scalar"><div class="value">:value</div><div class="label">:label</div></div>'
if _df.shape[0] > 0 :
print (_df)
print ('_____________________________________')
name = _df.columns.tolist()[0]
value = _df[name].values[0]
if value > 999 and value < 1000000 :
value = " ".join([str(np.divide(value,1000).round(2)),"K"])
elif value > 999999 :
#@ Think of considering the case of a billion ...
value = " ".join([str(np.divide(value,1000000).round(2)),"M"])
else:
value = str(value)
unit = name.replace('_',' ') if 'unit' not in item else item['unit']
return {'html':html.replace(':value',value).replace(":label",unit)}
@staticmethod
def column(item):
df = item['data']
N = df.shape[0] if df.shape[0] < 10 else 10
axis = item['chart']['axis']
x = axis['x']
if type(x) == list :
x = x[0]
axis['y'] = [axis['y']] if type(axis['y']) != list else axis['y']
series = []
for y in axis['y'] :
series += [{"data": df[y].values.tolist()[:N],"name":y.upper().replace('_',' ')}]
xtitle,ytitle = Apex.get_labels(item)
options = {"chart":{"type":"bar"},"plotOptions":{"bar":{"horizontal":False,"width:":2,"color":["transparent"]}},"dataLabels":{"enabled":False},"legend":{"position":"right"}}
options['xaxis'] = {"categories":df[x].values.tolist()[:N],"title":xtitle['title']}
options['yaxis'] = ytitle
options['series'] = series
options['colors'] = COLORS[:df[x].size]
return {"apex":options}
# options = Apex.barh(item)
# options['chart']['type'] = 'column'
# options['plotOptions']['bar'] = {'horizontal':False,'columnWidth':'55%'}
# options['stroke']={'show':True,'width':2,'colors':['transparent']}
# return {"apex":options}
@staticmethod
def get_labels(item):
xtitle = ytitle = ""
if "labels" not in item['chart'] :
xtitle = item['chart']['axis']['x']
ytitle = item['chart']['axis']['y']
else:
xtitle = item['chart']['labels']['x']
ytitle = item['chart']['labels']['y']
xtitle = xtitle if type(xtitle) != list else xtitle[0]
ytitle = ytitle if type(ytitle) != list else ytitle[0]
return {"title":{"text":xtitle.lower().replace('_',' '),"style":{"fontWeight":"lighter"}}},{"title":{"text":ytitle.lower().replace('_',' '),"style":{"fontWeight":"lighter"}}}
@staticmethod
def bar(item):
return Apex.barh(item)
@staticmethod
def barh(item):
"""
rendering a horizontal bar chart assuming for now that only one series is involved
@TODO: alias this with bar (!= column)
"""
df = item['data']
N = df.shape[0] if df.shape[0] < 10 else 10
axis = item['chart']['axis']
y = axis['y']
if type(y) == list :
y = y[0]
axis['x'] = [axis['x']] if type(axis['x']) != list else axis['x']
# if not set(axis['x']) & set(df.columns.tolist()) :
# print (set(axis['x']) & set(df.columns.tolist()))
# print (axis['x'])
# print (df.columns)
# df.columns = axis['x']
series = []
_min=_max = 0
for x in axis['x'] :
series += [{"data": df[x].values.tolist()[:N],"name":x.upper().replace('_',' ')}]
_min = df[x].min() if df[x].min() < _min else _min
_max = df[x].max() if df[x].max() > _max else _max
xtitle , ytitle = Apex.get_labels(item)
options = {"chart":{"type":"bar"},"plotOptions":{"bar":{"horizontal":True}},"dataLabels":{"enabled":False},"legend":{"position":"right"}}
options['xaxis'] = {"categories":df[y].values.tolist()[:N],"title":xtitle['title']}
options['yaxis'] = ytitle
options['series'] = series
options['colors'] = COLORS[:df[x].size]
return {"apex":options}
@staticmethod
def spline(item):
series = []
df = item['data']
N = df.shape[0] if df.shape[0] < 10 else 10
axis = item['chart']['axis']
x = axis['x']
_min=_max = 0
for y in axis['y'] :
series += [{"data":df[y].values[:N].tolist(),"name":y.upper().replace('_',' ')}]
_min = df[y].min() if df[y].min() < _min else _min
_max = df[y].max() if df[y].max() > _max else _max
colors = COLORS[:len(axis['y'])]
options = {"chart":{"type":"line"},"series":series,"stroke":{"curve":"smooth"},"colors":colors,"legend":{"position":"right"}}
xtitle , ytitle = Apex.get_labels(item)
options['xaxis'] = {"categories":df[x].values[:N].tolist(),"title":xtitle['title']}
options['yaxis'] = ytitle
return {"apex":options}
@staticmethod
def donut(item):
"""
:pre data must have more than one item otherwise just make it a scalar
here we will use the key as labels and the values as the values (obviously)
labels are y-axis
values are x-axis
"""
df = item['data']
if df.shape [0]> 1 :
y_cols,x_cols = item['chart']['axis']['y'],item['chart']['axis']['x']
labels = df[y_cols].values.tolist()
values = df[x_cols].values.round(2).tolist()
else:
labels = [name.upper().replace('_',' ') for name in df.columns.tolist()]
# df = df.astype(float)
# values = df.values.round(2).tolist()[0] if df.shape[1] > 1 else df.values.round(2).tolist()
values = df[[name for name in df.columns if df[name].dtype in [float,int]] ].values.round(2).tolist()
colors = COLORS[:len(values)]
options = {"series":values,"colors":colors,"labels":labels,"dataLabels":{"enabled":True,"style":{"colors":["#000000"]},"dropShadow":{"enabled":False}},"chart":{"type":"donut","width":200},"plotOptions":{"pie":{"customScale":.9}},"legend":{"position":"right"}}
return {"apex":options}
pass
class engine :
"""
This engine is designed to load the configuration and run the queries given they are remittance or claims
@TODO:
- make sure the readers of the queries are configurable i.e use data-transport
"""
def __init__(self,path) :
"""
Loading configuration file from a designated location ...
"""
f = open(path) ;
_config = json.loads(f.read())
self.store_config = _config['store']
self.info = _config['analytics']
_args = self.store_config
if 'type' not in self.store_config :
#
# This is the newer version of data-transport
self.store_config['context'] = 'read'
self.store_config = _args ;
def filter (self,**args):
"""
type: claims or remits
filter optional identifier claims, procedures, taxonomy, ...
"""
_m = {'claim':'837','claims':'837','remits':'835','remit':'835'}
table = _m[ args['type']]
_analytics = self.info[table]
if 'index' in args :
index = int(args['index'])
_analytics = [_analytics[index]]
_info = list(_analytics) #if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']]
# conn = lite.connect(self.store_config['args']['path'],isolation_level=None)
# conn.create_aggregate("stdev",1,stdev)
DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql'
# if DB_TYPE == 'mongo' :
# self.store_config['args']['doc'] = args['type']
self.reader = transport.factory.instance(**self.store_config)
r = []
for row in _info :
pipeline = row['pipeline']
index = 0
for item in pipeline:
if not item[DB_TYPE] :
continue
query = {DB_TYPE:item[DB_TYPE]}
df = pd.DataFrame(self.reader.read(**query)) #item)
df = df.fillna('N/A')
# item['data'] = df
chart = item['chart']
pipe = {"data":df,"chart":chart}
for key in list(item.keys()) :
if key not in ["chart","data","mongo","sql","couch"] :
pipe[key] = item[key]
r.append(pipe)
self.reader.close()
return {"id":_info[0]['id'],'pipeline':r}
def apply (self,**args) :
"""
type: claims or remits
filter optional identifier claims, procedures, taxonomy, ...
"""
_m = {'claim':'837','claims':'837','remits':'835','remit':'835'}
# key = '837' if args['type'] == 'claims' else '835'
table = _m[ args['type']]
_analytics = self.info[table]
if 'index' in args :
index = int(args['index'])
_analytics = [_analytics[index]]
_info = list(_analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']]
self.reader = transport.factory.instance(**self.store_config)
DB_TYPE = 'mongo' if self.store_config ['provider'] in ['mongodb','mongo'] else 'sql'
r = []
for row in _info :
pipeline = row['pipeline']
index = 0
for item in pipeline:
# item['data'] = pd.read_sql(item['sql'],conn)
# query = {DB_TYPE:item[DB_TYPE]}
query = item[DB_TYPE]
if not query :
continue
if DB_TYPE == 'sql' :
query = {"sql":query}
else:
query = {DB_TYPE:query}
_df = self.reader.read(**query) #item)
print (query)
print (self.reader)
if 'serialize' in args :
# item['data'] = json.dumps(item['data'].to_dict(orient='record')) if type(item['data']) == pd.DataFrame else item['data']
item['data'] = json.dumps(_df.to_dict(orient='record'))
else:
# item['data'] = (pd.DataFrame(item['data']))
item['data'] = _df
pass
print (_df.head())
break
pipeline[index] = item
index += 1
#
#
row['pipeline']= pipeline
# if 'info' in item:
# item['info'] = item['info'].replace(":rows",str(item["data"].shape[0]))
# conn.close()
self.reader.close()
return _info
def _html(self,item) :
figure = None
df = item['data']
label = ['<div class="label">',item['label'],'</div>']
text = ['<div class="grid">',df.describe().iloc[:].round(2).to_html().replace('_',' '),'</div>']
info = ['<div class="info">',item['info'],'</div>'] if 'info' in item else []
if item['chart']['type'] in ['pie','donut','doughnut'] :
figure = Chart.donut(item)
text = ['<div class="grid">',df.to_html(index=False).replace('_',' '),'</div>']
elif item['chart']['type'] == 'scatter' :
figure = Chart.scatter(item)
elif item['chart']['type'] == 'spline' :
figure = Chart.spline(item)
elif item['chart']['type'] in ['barh','hbar'] :
figure = Chart.barh(item)
elif item['chart']['type'] == 'scalar' :
figure = (item['data'].apply(lambda col: '<div class="scalar"><div class="value bold">'+str(col.values[0].round(2))+'</div><div class="value-text">'+col.name.replace('_', ' ')+'</div></div>' ).tolist())
label = text = []
pass
if figure and item['chart']['type'] != 'scalar':
stream = io.BytesIO()
figure.savefig(stream,format='png',dpi=300,quality=95, bbox_inches = "tight",transparent=True)
stream.seek(0)
stream = base64.b64encode(stream.getvalue()).decode("utf-8")
stream = "data:image/png;base64,"+stream
figure = ['<div class="figure"><img src="'+stream+'">',"</div>"]
# figure.canvas.draw()
# figure = "".join( map(chr,figure.canvas.tostring_argb())) #--bytes
# else:
# figure = [ ]
if item['chart']['type'] != 'scalar':
return ['<div class="frame"><div class="chart '+ item['chart']['type']+'">'] + [ " ".join(row) for row in [label,figure,text,info] if row] + ["</div></div>"]
else:
return [ " ".join(row) for row in [label,figure,text,info] if row]
pass
def _csv(self,item):
pass
def export(self,item,format):
"""
We have a pipeline here and we should attempt to build a figure using seaborn within an html template using jinja2
This is considered a page (or an item) of an analysis where we will have both data and rendering information with accompanying text
"""
html = []
for row in item['pipeline'] :
p = [ "<h2>",row['label'].replace('_',' '),"</h2>"]
y_label = [name for name in row['data'].columns if 'count' in name]
x_label = list(set(row['data'].columns) - set(y_label))
N = row.shape[0]
if 'info' in row :
p += ["<div class='info'>",row['info'],'</div>']
pass
class LogAnalytics :
def __init__(self,path):
logs = open(path).read().split('\n')
logs = [json.loads(row) for row in logs if row.strip() != '']
self.remits = {
"completed": np.sum([1 for row in logs if row['completed'] == True]),
"files":len(logs)
}
# m = LogAnalytics('/home/steve/healthcare-io/remits.log')
css = """
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>HealthcareIO - :title </title>
<style>
body{
padding:8px;
padding-left:4%;
padding-right:4%;
}
.pane{
padding:4px;
display:grid;
gap:16px;
grid-template-columns:repeat(2,1fr) ;
}
.numbers {
display:grid;
grid-template-columns:repeat(2,1fr);
gap:16px;
/*padding:2px;*/
/*border:1px solid #CAD5E0;*/
}
.numbers .scalar {
padding:8px;
background-image: linear-gradient(to bottom, #f3f3f3,#d3d3d3, #ffffff);
border:1px solid #CAD5E0;
font-family:sans-serif;
text-transform:capitalize;
text-align:right;
font-size:12px;
display:grid;
grid-template-rows:auto 28px; gap:2px;
}
.numbers .scalar .value-text {
border-top:1px solid #CAD5E0;
padding:8px;
font-weight:bold;
align-items:center;
font-size:14px;
display:grid;
}
.numbers .scalar .value {
display:grid;
color:#004b79;
align-content:center;
font-size:48px; text-align:right; font-weight:bold;}
.frame {
background-image: linear-gradient(to bottom, #f3f3f3,#d3d3d3, #ffffff);
padding:2px;
border:1px solid #CAD5E0;
}
.figure {grid-area:figure; width:500px; height:350px; display:grid; align-items:center}
.info {height:28px; width:100%; grid-area:info;
display:grid;
align-items:center;
text-align:center; text-transform:capitalize; padding:4px; font-size:12px; font-family:sans-serif; border-top:1px solid #CAD5E0;}
.grid {grid-area:grid; }
.label {grid-area:label; font-weight:bold; font-size: 22px; text-align:center; text-transform:capitalize}
.chart {
padding:4px;
padding:8px;
display:grid; grid-template-areas:
"label label label"
"figure grid grid"
"info info info" ;
gap:2px;
}
img {height:auto; max-width:100% ;}
table {width:100%; border-collapse: collapse;}
table , TH, TD{ font-size:14px; padding:8px; font-family:sans-serif; border:1px; border:1px solid #CAD5E0;}
table thead, tbody th { padding:4px; text-transform:capitalize; background-color:#4682B4; color:#ffffff; text-align:center}
table thead tr th {text-align:center}
table tbody td {text-align:right; font-weight: lighter}
table tbody tr:nth-child(odd) {background: #95bce0}
table tbody tr:nth-child(even) {background: #c8e5ff}
</style>
"""
# folder = '/home/steve/.healthcareio/config.json'
# e = engine(path=folder)
# p = e.apply(type='claims')
# values = []
# html = [css]
# for row in p :
# frame = []
# for item in row['pipeline'] :
# if row['pipeline'].index(item) == 0 :
# if item['chart']['type'] != 'scalar' :
# # frame = ['<div class="frame">']
# pass
# else:
# frame = ['<div><div class="numbers">']
# frame += e._html(item) #p[3]['pipeline'][0])
# frame += ['</div></div>'] if item['chart']['type'] == 'scalar' else []
# html += frame
# html = '<div class="pane">' + "\n".join(html) + "</div></div>"
# f = open('out.html','w')
# f.write(html.replace(":title","Claims"))
#
# HTML(string=html).write_pdf('out.pdf',stylesheets=[CSS(string=css)])
# x.write_pdf('./out.pdf')
# print (p[2]['pipeline'][0]['data'])
# e.export (p[0])
# features = ['diagnosis.code']
# split(folder = folder, features=features)