diff --git a/healthcareio/analytics.py b/healthcareio/analytics.py index 875a8cd..442ad60 100644 --- a/healthcareio/analytics.py +++ b/healthcareio/analytics.py @@ -11,7 +11,7 @@ import transport import matplotlib.pyplot as plt import re, base64 # from weasyprint import HTML, CSS -COLORS = ["#f79256","#7dcfb6","#fbd1a2","#00b2ca","#1d4e89","#4682B4","#c5c3c6","#4c5c68","#1985a1","#f72585","#7209b7","#3a0ca3","#4361ee","#4cc9f0","#ff595e","#ffca3a","#8ac926","#1982c4","#6a4c93"] +COLORS = ["#fbd1a2","#00b2ca","#1d4e89","#4682B4","#c5c3c6","#4c5c68","#1985a1","#f72585","#7209b7","#3a0ca3","#4361ee","#4cc9f0","#ff595e","#ffca3a","#8ac926","#1982c4","#6a4c93"] class stdev : def __init__(self) : self.values = [] @@ -149,11 +149,16 @@ class Apex : This class will format a data-frame to work with Apex charting engine """ @staticmethod - def apply(item): + def apply(item,theme={'mode':'light','palette':'palette6'}): pointer = item['chart']['type'] if hasattr(Apex,pointer) : pointer = getattr(Apex,pointer) + options = pointer(item) + if 'apex' in options and 'colors' in options['apex'] : + del options['apex']['colors'] + if 'apex' in options : + options['apex']['theme'] = theme options['responsive']= [ { 'breakpoint': 1, @@ -168,6 +173,18 @@ class Apex : print ("Oops") pass @staticmethod + def radial(item): + df = item['data'] + x = item['chart']['axis']['x'] + y = item['chart']['axis']['y'] + + labels = df[y].tolist() + values = [float(np.round(value,2)) for value in df[x].tolist()] + chart = {"type":"radialBar","height":200} + option = {"chart":chart,"series":values,"labels":labels,"plotOptions":{"radialBar":{"hollow":{"size":"70%"}}}} + return {'apex':option} + + @staticmethod def scatter(item): options = Apex.spline(item) options['apex']['chart']['type'] = 'scatter' @@ -175,7 +192,7 @@ class Apex : @staticmethod def scalar(item): _df = item['data'] - print (_df) + name = _df.columns.tolist()[0] value = _df[name].values.round(2)[0] html = '
:value
:label
' @@ -235,16 +252,17 @@ class Apex : @TODO: alias this with bar (!= column) """ df = item['data'] + N = df.shape[0] if df.shape[0] < 10 else 10 axis = item['chart']['axis'] y = axis['y'] if type(y) == list : y = y[0] axis['x'] = [axis['x']] if type(axis['x']) != list else axis['x'] - if not set(axis['x']) & set(df.columns.tolist()) : - print (set(axis['x']) & set(df.columns.tolist())) - print (axis['x']) - print (df.columns) + # if not set(axis['x']) & set(df.columns.tolist()) : + # print (set(axis['x']) & set(df.columns.tolist())) + # print (axis['x']) + # print (df.columns) # df.columns = axis['x'] series = [] _min=_max = 0 @@ -294,7 +312,6 @@ class Apex : values are x-axis """ df = item['data'] - if df.shape [0]> 1 : y_cols,x_cols = item['chart']['axis']['y'],item['chart']['axis']['x'] labels = df[y_cols].values.tolist() @@ -302,10 +319,11 @@ class Apex : values = df[x_cols].values.round(2).tolist() else: labels = [name.upper().replace('_',' ') for name in df.columns.tolist()] + df = df.astype(float) values = df.values.round(2).tolist()[0] if df.shape[1] > 1 else df.values.round(2).tolist() colors = COLORS[:len(values)] - options = {"series":values,"colors":colors,"labels":labels,"chart":{"type":"donut"},"plotOptions":{"pie":{"customScale":.8}},"legend":{"position":"right"}} + options = {"series":values,"colors":colors,"labels":labels,"dataLabels":{"enabled":True,"style":{"colors":["#000000"]},"dropShadow":{"enabled":False}},"chart":{"type":"donut","width":200},"plotOptions":{"pie":{"customScale":.9}},"legend":{"position":"right"}} return {"apex":options} pass @@ -329,43 +347,117 @@ class engine : _args['type'] = 'mongo.MongoReader' else: _args['type'] = 'disk.SQLiteReader' - self.reader = transport.factory.instance(**_args) + self.store_config = _args ; + + def filter (self,**args): + """ + type: claims or remits + filter optional identifier claims, procedures, taxonomy, ... + """ + + + _m = {'claim':'837','claims':'837','remits':'835','remit':'835'} + table = _m[ args['type']] + _analytics = self.info[table] + if 'index' in args : + index = int(args['index']) + _analytics = [_analytics[index]] + + _info = list(_analytics) #if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']] + # conn = lite.connect(self.store_config['args']['path'],isolation_level=None) + # conn.create_aggregate("stdev",1,stdev) + DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql' + if DB_TYPE == 'mongo' : + self.store_config['args']['doc'] = args['type'] + + self.reader = transport.factory.instance(**self.store_config) + r = [] + for row in _info : + pipeline = row['pipeline'] + + index = 0 + for item in pipeline: + if not item[DB_TYPE] : + continue + query = {DB_TYPE:item[DB_TYPE]} + + df = pd.DataFrame(self.reader.read(**query)) #item) + df = df.fillna('N/A') + # item['data'] = df + chart = item['chart'] + pipe = {"data":df,"chart":chart} + for key in list(item.keys()) : + if key not in ["chart","data","mongo","sql","couch"] : + pipe[key] = item[key] + + + + r.append(pipe) + self.reader.close() + return {"id":_info[0]['id'],'pipeline':r} + def apply (self,**args) : """ type: claims or remits filter optional identifier claims, procedures, taxonomy, ... """ + + _m = {'claim':'837','claims':'837','remits':'835','remit':'835'} # key = '837' if args['type'] == 'claims' else '835' table = _m[ args['type']] - analytics = self.info[table] + + _analytics = self.info[table] if 'index' in args : index = int(args['index']) - analytics = [analytics[index]] + _analytics = [_analytics[index]] - _info = list(analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']] + _info = list(_analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']] # conn = lite.connect(self.store_config['args']['path'],isolation_level=None) # conn.create_aggregate("stdev",1,stdev) - DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql' + # + # @TODO: Find a better way to handle database variance + # + # DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql' + + if 'mongo' in self.store_config['type'] : + DB_TYPE='mongo' + else: + DB_TYPE='sql' + self.store_config['args']['table'] = args['type'] + + self.reader = transport.factory.instance(**self.store_config) r = [] for row in _info : - - for item in row['pipeline'] : + pipeline = row['pipeline'] + index = 0 + for item in pipeline: # item['data'] = pd.read_sql(item['sql'],conn) - query = {DB_TYPE:item[DB_TYPE]} - item['data'] = self.reader.read(**item) + # query = {DB_TYPE:item[DB_TYPE]} + query = item[DB_TYPE] + if not query : + continue + if DB_TYPE == 'sql' : + query = {"sql":query} + + item['data'] = self.reader.read(**query) #item) if 'serialize' in args : - item['data'] = json.dumps(item['data'].to_dict(orient='record')) if type(item['data']) == pd.DataFrame else item['data'] + # item['data'] = json.dumps(item['data'].to_dict(orient='record')) if type(item['data']) == pd.DataFrame else item['data'] + item['data'] = json.dumps(item['data'].to_dict('record')) if type(item['data']) == pd.DataFrame else item['data'] else: item['data'] = (pd.DataFrame(item['data'])) - + pipeline[index] = item + index += 1 + # + # + row['pipeline']= pipeline # if 'info' in item: # item['info'] = item['info'].replace(":rows",str(item["data"].shape[0])) # conn.close() - + self.reader.close() return _info def _html(self,item) : diff --git a/healthcareio/healthcare-io.py b/healthcareio/healthcare-io.py index 037586c..be6012d 100644 --- a/healthcareio/healthcare-io.py +++ b/healthcareio/healthcare-io.py @@ -43,6 +43,8 @@ import numpy as np from multiprocessing import Process import time from healthcareio import x12 +import smart +import pandas as pd PATH = os.sep.join([os.environ['HOME'],'.healthcareio']) OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io']) @@ -337,10 +339,20 @@ if __name__ == '__main__' : # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json']) - e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible - e.apply(type='claims',serialize=True) - SYS_ARGS['engine'] = e + if os.path.exists(os.sep.join([PATH,'config.json'])) : + e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible + e.apply(type='claims',serialize=True) + SYS_ARGS['engine'] = e + SYS_ARGS['config'] = json.loads(open(os.sep.join([PATH,'config.json'])).read()) + else: + SYS_ARGS['config'] = {"owner":None,"store":None} + + if 'args' not in SYS_ARGS['config'] : + SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"} + me = pd.DataFrame(smart.top.read(name='healthcare-io.py')).args.unique().tolist() + SYS_ARGS['me'] = me[0] #-- This key will identify the current process + pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False) pthread = Process(target=pointer,args=()) pthread.start() diff --git a/healthcareio/x12/__init__.py b/healthcareio/x12/__init__.py index 2e22d98..66dc05a 100644 --- a/healthcareio/x12/__init__.py +++ b/healthcareio/x12/__init__.py @@ -49,8 +49,9 @@ class Formatters : """ This function is designed to split an x12 row and """ + value = [] if row.startswith(prefix) is False: - value = [] + for row_value in row.replace('~','').split(sep) : @@ -65,10 +66,12 @@ class Formatters : else : value.append(row_value.replace('\n','')) - return [xchar.replace('\r','') for xchar in value] #row.replace('~','').split(sep) + value = [xchar.replace('\r','') for xchar in value] #row.replace('~','').split(sep) else: - return [ [prefix]+ self.split(item,'>') for item in row.replace('~','').split(sep)[1:] ] + value = [ [prefix]+ self.split(item,'>') for item in row.replace('~','').split(sep)[1:] ] + + return value if type(value) == list and type(value[0]) != list else value[0] def get_config(self,config,row): """ This function will return the meaningfull parts of the configuration for a given item @@ -130,7 +133,7 @@ class Formatters : terms = value[1].split('>') return {'type':terms[0],'code':terms[1],"amount":float(value[2])} else: - + return {"code":value[2],"type":value[1],"amount":float(value[3])} def sv2(self,value): # @@ -191,6 +194,9 @@ class Parser (Process): self.files = [] self.set = void() self.set.files = self.set_files + self.emit = void() + self.emit.pre = None + self.emit.post = None def set_files(self,files): self.files = files def get_map(self,row,config,version=None): @@ -328,7 +334,7 @@ class Parser (Process): pass except Exception as e : - + print ('__',e.args) pass @@ -433,6 +439,9 @@ class Parser (Process): # self.finish(claims,logs,_code) return claims,logs,_code def run(self): + if self.emit.pre : + self.emit.pre() + for filename in self.files : content,logs,_code = self.read(filename) self.finish(content,logs,_code) @@ -442,14 +451,22 @@ class Parser (Process): if args['type'] == 'mongo.MongoWriter' : args['args']['doc'] = 'claims' if _code == '837' else 'remits' _args['args']['doc'] = 'logs' + else: + args['args']['table'] = 'claims' if _code == '837' else 'remits' + _args['args']['table'] = 'logs' + if content : writer = transport.factory.instance(**args) writer.write(content) writer.close() if logs : + logger = transport.factory.instance(**_args) logger.write(logs) + logger.close() + if self.emit.post : + self.emit.post(content,logs)