bug fixes with parser

This commit is contained in:
Steve Nyemba 2020-12-11 06:45:10 -06:00
parent 8b4eb81564
commit ed782b7e40
3 changed files with 150 additions and 29 deletions

View File

@ -11,7 +11,7 @@ import transport
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import re, base64 import re, base64
# from weasyprint import HTML, CSS # from weasyprint import HTML, CSS
COLORS = ["#f79256","#7dcfb6","#fbd1a2","#00b2ca","#1d4e89","#4682B4","#c5c3c6","#4c5c68","#1985a1","#f72585","#7209b7","#3a0ca3","#4361ee","#4cc9f0","#ff595e","#ffca3a","#8ac926","#1982c4","#6a4c93"] COLORS = ["#fbd1a2","#00b2ca","#1d4e89","#4682B4","#c5c3c6","#4c5c68","#1985a1","#f72585","#7209b7","#3a0ca3","#4361ee","#4cc9f0","#ff595e","#ffca3a","#8ac926","#1982c4","#6a4c93"]
class stdev : class stdev :
def __init__(self) : def __init__(self) :
self.values = [] self.values = []
@ -149,11 +149,16 @@ class Apex :
This class will format a data-frame to work with Apex charting engine This class will format a data-frame to work with Apex charting engine
""" """
@staticmethod @staticmethod
def apply(item): def apply(item,theme={'mode':'light','palette':'palette6'}):
pointer = item['chart']['type'] pointer = item['chart']['type']
if hasattr(Apex,pointer) : if hasattr(Apex,pointer) :
pointer = getattr(Apex,pointer) pointer = getattr(Apex,pointer)
options = pointer(item) options = pointer(item)
if 'apex' in options and 'colors' in options['apex'] :
del options['apex']['colors']
if 'apex' in options :
options['apex']['theme'] = theme
options['responsive']= [ options['responsive']= [
{ {
'breakpoint': 1, 'breakpoint': 1,
@ -168,6 +173,18 @@ class Apex :
print ("Oops") print ("Oops")
pass pass
@staticmethod @staticmethod
def radial(item):
df = item['data']
x = item['chart']['axis']['x']
y = item['chart']['axis']['y']
labels = df[y].tolist()
values = [float(np.round(value,2)) for value in df[x].tolist()]
chart = {"type":"radialBar","height":200}
option = {"chart":chart,"series":values,"labels":labels,"plotOptions":{"radialBar":{"hollow":{"size":"70%"}}}}
return {'apex':option}
@staticmethod
def scatter(item): def scatter(item):
options = Apex.spline(item) options = Apex.spline(item)
options['apex']['chart']['type'] = 'scatter' options['apex']['chart']['type'] = 'scatter'
@ -175,7 +192,7 @@ class Apex :
@staticmethod @staticmethod
def scalar(item): def scalar(item):
_df = item['data'] _df = item['data']
print (_df)
name = _df.columns.tolist()[0] name = _df.columns.tolist()[0]
value = _df[name].values.round(2)[0] value = _df[name].values.round(2)[0]
html = '<div class="scalar"><div class="value">:value</div><div class="label">:label</div></div>' html = '<div class="scalar"><div class="value">:value</div><div class="label">:label</div></div>'
@ -235,16 +252,17 @@ class Apex :
@TODO: alias this with bar (!= column) @TODO: alias this with bar (!= column)
""" """
df = item['data'] df = item['data']
N = df.shape[0] if df.shape[0] < 10 else 10 N = df.shape[0] if df.shape[0] < 10 else 10
axis = item['chart']['axis'] axis = item['chart']['axis']
y = axis['y'] y = axis['y']
if type(y) == list : if type(y) == list :
y = y[0] y = y[0]
axis['x'] = [axis['x']] if type(axis['x']) != list else axis['x'] axis['x'] = [axis['x']] if type(axis['x']) != list else axis['x']
if not set(axis['x']) & set(df.columns.tolist()) : # if not set(axis['x']) & set(df.columns.tolist()) :
print (set(axis['x']) & set(df.columns.tolist())) # print (set(axis['x']) & set(df.columns.tolist()))
print (axis['x']) # print (axis['x'])
print (df.columns) # print (df.columns)
# df.columns = axis['x'] # df.columns = axis['x']
series = [] series = []
_min=_max = 0 _min=_max = 0
@ -294,7 +312,6 @@ class Apex :
values are x-axis values are x-axis
""" """
df = item['data'] df = item['data']
if df.shape [0]> 1 : if df.shape [0]> 1 :
y_cols,x_cols = item['chart']['axis']['y'],item['chart']['axis']['x'] y_cols,x_cols = item['chart']['axis']['y'],item['chart']['axis']['x']
labels = df[y_cols].values.tolist() labels = df[y_cols].values.tolist()
@ -302,10 +319,11 @@ class Apex :
values = df[x_cols].values.round(2).tolist() values = df[x_cols].values.round(2).tolist()
else: else:
labels = [name.upper().replace('_',' ') for name in df.columns.tolist()] labels = [name.upper().replace('_',' ') for name in df.columns.tolist()]
df = df.astype(float)
values = df.values.round(2).tolist()[0] if df.shape[1] > 1 else df.values.round(2).tolist() values = df.values.round(2).tolist()[0] if df.shape[1] > 1 else df.values.round(2).tolist()
colors = COLORS[:len(values)] colors = COLORS[:len(values)]
options = {"series":values,"colors":colors,"labels":labels,"chart":{"type":"donut"},"plotOptions":{"pie":{"customScale":.8}},"legend":{"position":"right"}} options = {"series":values,"colors":colors,"labels":labels,"dataLabels":{"enabled":True,"style":{"colors":["#000000"]},"dropShadow":{"enabled":False}},"chart":{"type":"donut","width":200},"plotOptions":{"pie":{"customScale":.9}},"legend":{"position":"right"}}
return {"apex":options} return {"apex":options}
pass pass
@ -329,43 +347,117 @@ class engine :
_args['type'] = 'mongo.MongoReader' _args['type'] = 'mongo.MongoReader'
else: else:
_args['type'] = 'disk.SQLiteReader' _args['type'] = 'disk.SQLiteReader'
self.reader = transport.factory.instance(**_args) self.store_config = _args ;
def filter (self,**args):
"""
type: claims or remits
filter optional identifier claims, procedures, taxonomy, ...
"""
_m = {'claim':'837','claims':'837','remits':'835','remit':'835'}
table = _m[ args['type']]
_analytics = self.info[table]
if 'index' in args :
index = int(args['index'])
_analytics = [_analytics[index]]
_info = list(_analytics) #if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']]
# conn = lite.connect(self.store_config['args']['path'],isolation_level=None)
# conn.create_aggregate("stdev",1,stdev)
DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql'
if DB_TYPE == 'mongo' :
self.store_config['args']['doc'] = args['type']
self.reader = transport.factory.instance(**self.store_config)
r = []
for row in _info :
pipeline = row['pipeline']
index = 0
for item in pipeline:
if not item[DB_TYPE] :
continue
query = {DB_TYPE:item[DB_TYPE]}
df = pd.DataFrame(self.reader.read(**query)) #item)
df = df.fillna('N/A')
# item['data'] = df
chart = item['chart']
pipe = {"data":df,"chart":chart}
for key in list(item.keys()) :
if key not in ["chart","data","mongo","sql","couch"] :
pipe[key] = item[key]
r.append(pipe)
self.reader.close()
return {"id":_info[0]['id'],'pipeline':r}
def apply (self,**args) : def apply (self,**args) :
""" """
type: claims or remits type: claims or remits
filter optional identifier claims, procedures, taxonomy, ... filter optional identifier claims, procedures, taxonomy, ...
""" """
_m = {'claim':'837','claims':'837','remits':'835','remit':'835'} _m = {'claim':'837','claims':'837','remits':'835','remit':'835'}
# key = '837' if args['type'] == 'claims' else '835' # key = '837' if args['type'] == 'claims' else '835'
table = _m[ args['type']] table = _m[ args['type']]
analytics = self.info[table]
_analytics = self.info[table]
if 'index' in args : if 'index' in args :
index = int(args['index']) index = int(args['index'])
analytics = [analytics[index]] _analytics = [_analytics[index]]
_info = list(analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']] _info = list(_analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']]
# conn = lite.connect(self.store_config['args']['path'],isolation_level=None) # conn = lite.connect(self.store_config['args']['path'],isolation_level=None)
# conn.create_aggregate("stdev",1,stdev) # conn.create_aggregate("stdev",1,stdev)
DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql' #
# @TODO: Find a better way to handle database variance
#
# DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql'
if 'mongo' in self.store_config['type'] :
DB_TYPE='mongo'
else:
DB_TYPE='sql'
self.store_config['args']['table'] = args['type']
self.reader = transport.factory.instance(**self.store_config)
r = [] r = []
for row in _info : for row in _info :
pipeline = row['pipeline']
for item in row['pipeline'] : index = 0
for item in pipeline:
# item['data'] = pd.read_sql(item['sql'],conn) # item['data'] = pd.read_sql(item['sql'],conn)
query = {DB_TYPE:item[DB_TYPE]} # query = {DB_TYPE:item[DB_TYPE]}
item['data'] = self.reader.read(**item) query = item[DB_TYPE]
if not query :
continue
if DB_TYPE == 'sql' :
query = {"sql":query}
item['data'] = self.reader.read(**query) #item)
if 'serialize' in args : if 'serialize' in args :
item['data'] = json.dumps(item['data'].to_dict(orient='record')) if type(item['data']) == pd.DataFrame else item['data'] # item['data'] = json.dumps(item['data'].to_dict(orient='record')) if type(item['data']) == pd.DataFrame else item['data']
item['data'] = json.dumps(item['data'].to_dict('record')) if type(item['data']) == pd.DataFrame else item['data']
else: else:
item['data'] = (pd.DataFrame(item['data'])) item['data'] = (pd.DataFrame(item['data']))
pipeline[index] = item
index += 1
#
#
row['pipeline']= pipeline
# if 'info' in item: # if 'info' in item:
# item['info'] = item['info'].replace(":rows",str(item["data"].shape[0])) # item['info'] = item['info'].replace(":rows",str(item["data"].shape[0]))
# conn.close() # conn.close()
self.reader.close()
return _info return _info
def _html(self,item) : def _html(self,item) :

View File

@ -43,6 +43,8 @@ import numpy as np
from multiprocessing import Process from multiprocessing import Process
import time import time
from healthcareio import x12 from healthcareio import x12
import smart
import pandas as pd
PATH = os.sep.join([os.environ['HOME'],'.healthcareio']) PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io']) OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
@ -337,9 +339,19 @@ if __name__ == '__main__' :
# PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible if os.path.exists(os.sep.join([PATH,'config.json'])) :
e.apply(type='claims',serialize=True) e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible
SYS_ARGS['engine'] = e e.apply(type='claims',serialize=True)
SYS_ARGS['engine'] = e
SYS_ARGS['config'] = json.loads(open(os.sep.join([PATH,'config.json'])).read())
else:
SYS_ARGS['config'] = {"owner":None,"store":None}
if 'args' not in SYS_ARGS['config'] :
SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}
me = pd.DataFrame(smart.top.read(name='healthcare-io.py')).args.unique().tolist()
SYS_ARGS['me'] = me[0] #-- This key will identify the current process
pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False) pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False)
pthread = Process(target=pointer,args=()) pthread = Process(target=pointer,args=())

View File

@ -49,8 +49,9 @@ class Formatters :
""" """
This function is designed to split an x12 row and This function is designed to split an x12 row and
""" """
value = []
if row.startswith(prefix) is False: if row.startswith(prefix) is False:
value = []
for row_value in row.replace('~','').split(sep) : for row_value in row.replace('~','').split(sep) :
@ -65,10 +66,12 @@ class Formatters :
else : else :
value.append(row_value.replace('\n','')) value.append(row_value.replace('\n',''))
return [xchar.replace('\r','') for xchar in value] #row.replace('~','').split(sep) value = [xchar.replace('\r','') for xchar in value] #row.replace('~','').split(sep)
else: else:
return [ [prefix]+ self.split(item,'>') for item in row.replace('~','').split(sep)[1:] ] value = [ [prefix]+ self.split(item,'>') for item in row.replace('~','').split(sep)[1:] ]
return value if type(value) == list and type(value[0]) != list else value[0]
def get_config(self,config,row): def get_config(self,config,row):
""" """
This function will return the meaningfull parts of the configuration for a given item This function will return the meaningfull parts of the configuration for a given item
@ -191,6 +194,9 @@ class Parser (Process):
self.files = [] self.files = []
self.set = void() self.set = void()
self.set.files = self.set_files self.set.files = self.set_files
self.emit = void()
self.emit.pre = None
self.emit.post = None
def set_files(self,files): def set_files(self,files):
self.files = files self.files = files
def get_map(self,row,config,version=None): def get_map(self,row,config,version=None):
@ -433,6 +439,9 @@ class Parser (Process):
# self.finish(claims,logs,_code) # self.finish(claims,logs,_code)
return claims,logs,_code return claims,logs,_code
def run(self): def run(self):
if self.emit.pre :
self.emit.pre()
for filename in self.files : for filename in self.files :
content,logs,_code = self.read(filename) content,logs,_code = self.read(filename)
self.finish(content,logs,_code) self.finish(content,logs,_code)
@ -442,14 +451,22 @@ class Parser (Process):
if args['type'] == 'mongo.MongoWriter' : if args['type'] == 'mongo.MongoWriter' :
args['args']['doc'] = 'claims' if _code == '837' else 'remits' args['args']['doc'] = 'claims' if _code == '837' else 'remits'
_args['args']['doc'] = 'logs' _args['args']['doc'] = 'logs'
else:
args['args']['table'] = 'claims' if _code == '837' else 'remits'
_args['args']['table'] = 'logs'
if content : if content :
writer = transport.factory.instance(**args) writer = transport.factory.instance(**args)
writer.write(content) writer.write(content)
writer.close() writer.close()
if logs : if logs :
logger = transport.factory.instance(**_args) logger = transport.factory.instance(**_args)
logger.write(logs) logger.write(logs)
logger.close() logger.close()
if self.emit.post :
self.emit.post(content,logs)