diff --git a/healthcareio/analytics.py b/healthcareio/analytics.py
index 442ad60..1171998 100644
--- a/healthcareio/analytics.py
+++ b/healthcareio/analytics.py
@@ -192,18 +192,23 @@ class Apex :
@staticmethod
def scalar(item):
_df = item['data']
-
- name = _df.columns.tolist()[0]
- value = _df[name].values.round(2)[0]
+ value = '0'
+ unit = ''
html = '
'
- if value > 999 and value < 1000000 :
- value = " ".join([str(np.divide(value,1000).round(2)),"K"])
- elif value > 999999 :
- #@ Think of considering the case of a billion ...
- value = " ".join([str(np.divide(value,1000000).round(2)),"M"])
- else:
- value = str(value)
- unit = name.replace('_',' ') if 'unit' not in item else item['unit']
+ if _df.shape[0] > 0 :
+ print (_df)
+ print ('_____________________________________')
+ name = _df.columns.tolist()[0]
+ value = _df[name].values[0]
+
+ if value > 999 and value < 1000000 :
+ value = " ".join([str(np.divide(value,1000).round(2)),"K"])
+ elif value > 999999 :
+                #@ TODO: handle values of one billion or more
+ value = " ".join([str(np.divide(value,1000000).round(2)),"M"])
+ else:
+ value = str(value)
+ unit = name.replace('_',' ') if 'unit' not in item else item['unit']
return {'html':html.replace(':value',value).replace(":label",unit)}
@staticmethod
def column(item):
@@ -319,8 +324,9 @@ class Apex :
values = df[x_cols].values.round(2).tolist()
else:
labels = [name.upper().replace('_',' ') for name in df.columns.tolist()]
- df = df.astype(float)
- values = df.values.round(2).tolist()[0] if df.shape[1] > 1 else df.values.round(2).tolist()
+ # df = df.astype(float)
+ # values = df.values.round(2).tolist()[0] if df.shape[1] > 1 else df.values.round(2).tolist()
+ values = df[[name for name in df.columns if df[name].dtype in [float,int]] ].values.round(2).tolist()
colors = COLORS[:len(values)]
options = {"series":values,"colors":colors,"labels":labels,"dataLabels":{"enabled":True,"style":{"colors":["#000000"]},"dropShadow":{"enabled":False}},"chart":{"type":"donut","width":200},"plotOptions":{"pie":{"customScale":.9}},"legend":{"position":"right"}}
@@ -343,10 +349,10 @@ class engine :
self.store_config = _config['store']
self.info = _config['analytics']
_args = self.store_config
- if self.store_config['type'] == 'mongo.MongoWriter' :
- _args['type'] = 'mongo.MongoReader'
- else:
- _args['type'] = 'disk.SQLiteReader'
+ if 'type' not in self.store_config :
+ #
+ # This is the newer version of data-transport
+ self.store_config['context'] = 'read'
self.store_config = _args ;
def filter (self,**args):
@@ -367,8 +373,8 @@ class engine :
# conn = lite.connect(self.store_config['args']['path'],isolation_level=None)
# conn.create_aggregate("stdev",1,stdev)
DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql'
- if DB_TYPE == 'mongo' :
- self.store_config['args']['doc'] = args['type']
+ # if DB_TYPE == 'mongo' :
+ # self.store_config['args']['doc'] = args['type']
self.reader = transport.factory.instance(**self.store_config)
r = []
@@ -414,20 +420,8 @@ class engine :
_analytics = [_analytics[index]]
_info = list(_analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']]
- # conn = lite.connect(self.store_config['args']['path'],isolation_level=None)
- # conn.create_aggregate("stdev",1,stdev)
- #
- # @TODO: Find a better way to handle database variance
- #
- # DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql'
-
- if 'mongo' in self.store_config['type'] :
- DB_TYPE='mongo'
- else:
- DB_TYPE='sql'
- self.store_config['args']['table'] = args['type']
-
self.reader = transport.factory.instance(**self.store_config)
+ DB_TYPE = 'mongo' if self.store_config ['provider'] in ['mongodb','mongo'] else 'sql'
r = []
for row in _info :
pipeline = row['pipeline']
@@ -440,14 +434,22 @@ class engine :
continue
if DB_TYPE == 'sql' :
query = {"sql":query}
+ else:
+ query = {DB_TYPE:query}
- item['data'] = self.reader.read(**query) #item)
+ _df = self.reader.read(**query) #item)
+ print (query)
+ print (self.reader)
if 'serialize' in args :
# item['data'] = json.dumps(item['data'].to_dict(orient='record')) if type(item['data']) == pd.DataFrame else item['data']
- item['data'] = json.dumps(item['data'].to_dict('record')) if type(item['data']) == pd.DataFrame else item['data']
+ item['data'] = json.dumps(_df.to_dict(orient='record'))
else:
- item['data'] = (pd.DataFrame(item['data']))
+ # item['data'] = (pd.DataFrame(item['data']))
+ item['data'] = _df
+ pass
+ print (_df.head())
+ break
pipeline[index] = item
index += 1
#
diff --git a/healthcareio/docker/Dockerfile b/healthcareio/docker/Dockerfile
index 4ff6d14..3f2a89b 100644
--- a/healthcareio/docker/Dockerfile
+++ b/healthcareio/docker/Dockerfile
@@ -15,7 +15,7 @@ RUN ["apt-get","install","-y","mongodb","sqlite3","sqlite3-pcre","libsqlite3-dev
#
RUN ["pip3","install","--upgrade","pip"]
RUN ["pip3","install","numpy","pandas","git+https://dev.the-phi.com/git/steve/data-transport","botocore","matplotlib"]
-# RUN ["pip3","install","git+https://healthcare.the-phi.com/git/code/parser.git","botocore"]
+RUN ["pip3","install","git+https://healthcare.the-phi.com/git/code/parser.git"]
# RUN ["useradd", "-ms", "/bin/bash", "health-user"]
# USER health-user
#
diff --git a/healthcareio/export/export.py b/healthcareio/export/export.py
index ea74064..5382ba7 100644
--- a/healthcareio/export/export.py
+++ b/healthcareio/export/export.py
@@ -191,6 +191,7 @@ def init_sql(**_args):
_config = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE])
#
_info = meta(_config)
+
_projectSQLite = [] #-- sqlite projection
for field_name in _info['main'] :
_projectSQLite += ["json_extract(data,'$."+field_name+"') "+field_name]
@@ -211,12 +212,17 @@ def init_sql(**_args):
SQL = "SELECT DISTINCT :fields FROM "+TABLE_NAME+", json_each(data) x, json_each(x.value) i where x.key = ':table'"
SQL = SQL.replace(":table",table).replace(":fields",",".join(project))
r += [{"table":table,"read":{"sql":SQL},"sql":create(table=table,key='claim_id',fields=fields)}]
+
return r
def init(**_args):
- if 'provider' in CONFIG['store'] and CONFIG['store']['provider'] == 'sqlite' :
- return init_sql(**_args)
- else:
+ # if 'provider' in CONFIG['store'] and CONFIG['store']['provider'] == 'sqlite' :
+ # return init_sql(**_args)
+ # else:
+ # return init_mongo(**_args)
+ if ('provider' in CONFIG['store'] and CONFIG['store']['provider'] == 'mongo') or ('type' in CONFIG['store'] and 'mongo' in CONFIG['store']['type']):
return init_mongo(**_args)
+ else:
+ return init_sql(**_args)
def init_mongo (**_args) :
"""
This function is intended to determine the number of tables to be created, as well as their type.
@@ -330,6 +336,7 @@ class Factory:
job_args = init(type=X12_TYPE) #-- getting the queries that will generate the objects we are interested in
# print (json.dumps(job_args))
_jobs = []
+
for row in job_args:
# _store = json.loads(json.dumps(wstore))
_store = copy.deepcopy(wstore)
diff --git a/healthcareio/export/workers.py b/healthcareio/export/workers.py
index 72b99f6..fb457ab 100644
--- a/healthcareio/export/workers.py
+++ b/healthcareio/export/workers.py
@@ -12,6 +12,7 @@ from multiprocessing import Process, Lock
import numpy as np
import json
import pandas as pd
+from zmq import has
class Subject (Process):
cache = pd.DataFrame()
@@ -94,8 +95,8 @@ class Worker :
except Exception as error:
pass
finally:
-
- self.caller.notify()
+ if hasattr(self,'caller') :
+ self.caller.notify()
def _apply(self):
pass
def get(self):
@@ -176,11 +177,16 @@ class Reader(Worker):
def init(self,**_args):
super().init(**_args)
self.rows = []
-
+
def _apply(self):
try:
-
+ if 'type' in self._info :
+ self._info['type'] = self._info['type'].replace('Writer','Reader')
+ if 'fields' in self._info['args'] :
+ del self._info['args']['fields']
+ else:
+ self._info['context'] = 'read'
self.reader = transport.factory.instance(**self._info) ;
# self.rows = self.reader.read(mongo=self.pipeline)
@@ -206,7 +212,7 @@ class Reader(Worker):
except Exception as e :
_log['status'] = 0
_log['info'] = {"error":e.args[0]}
- print (e)
+ print ([e])
self.log(**_log)
@@ -221,13 +227,13 @@ class Writer(Worker):
super().__init__(**_args)
if 'provider' in self._info :
self._info['context'] = 'write'
-
+
def init(self,**_args):
"""
:param store output data-store needed for writing
:param invalues input values with to be written somewhere
"""
- super().init(**_args)
+
self._invalues = _args['invalues']
@@ -259,8 +265,8 @@ class Writer(Worker):
# Upon upgrade use the operator "$toString" in export.init function
#
rows = [dict(item,**{"_id":str(item["_id"])}) for item in rows]
-
- writer.write(rows)
+ _df = pd.DataFrame(rows)
+ writer.write(_df)
index += 1
# for _e in rows :
# writer.write(_e)
diff --git a/healthcareio/healthcare-io.py b/healthcareio/healthcare-io.py
index 0145dda..7134ae5 100644
--- a/healthcareio/healthcare-io.py
+++ b/healthcareio/healthcare-io.py
@@ -122,7 +122,14 @@ def signup (**args) :
file = open( filename,'w')
file.write( json.dumps(info))
file.close()
+ _m = """
+    Thank you for signing up!
+    Your configuration file is stored in :path,
+ - More information visit https://healthcareio.the-phi.com/parser
+ - Access the source https://healthcareio.the-phi.com/git/code/parser
+ """.replace(":path",CONFIG_FILE)
+ print (_m)
#
# Create the sqlite3 database to
@@ -155,12 +162,13 @@ def init():
# if 'type' in info['store'] :
lwriter = None
is_sqlite = False
- if'type' in info['store'] and info['store']['type'] == 'disk.SQLiteWriter' and not os.path.exists(info['store']['args']['path']) :
+ if'type' in info['store'] and info['store']['type'] == 'disk.SQLiteWriter' :
lwriter = transport.factory.instance(**info['store'])
is_sqlite = True
elif 'provider' in info['store'] and info['store']['provider'] == 'sqlite' :
lwriter = transport.instance(**info['store']) ;
is_sqlite = True
+
if lwriter and is_sqlite:
for key in info['schema'] :
if key != 'logs' :
@@ -168,7 +176,7 @@ def init():
else:
_id = key
- if not lwriter.has(table=_id) :
+ if not lwriter.has(table=_id) :
lwriter.apply(info['schema'][key]['create'])
# [lwriter.apply( info['schema'][key]['create']) for key in info['schema'] if not lwriter.has(table=key)]
@@ -285,6 +293,24 @@ if __name__ == '__main__' :
procs = [proc for proc in procs if proc.is_alive()]
time.sleep(2)
+ uri = OUTPUT_FOLDER
+ store_config = json.loads( (open(CONFIG_FILE)).read() )['store']
+ if 'type' in store_config :
+ uri = store_config['args']['host'] if 'host' in store_config['args'] else ( store_config['args']['path'] if 'path' in store_config['args'] else store_config['args']['database'])
+ if 'SQLite' in store_config['type']:
+ provider = 'sqlite'
+ elif 'sql' in store_config['type'] :
+ provider = 'SQL'
+ else:
+ provider = 'mongo'
+
+ else:
+ provider = store_config['provider']
+ _msg = """
+    Completed parsing. The data is available in the :provider database at :uri
+ Logs are equally available for errors and summary statistics to be compiled
+ """.replace(":provider",provider).replace(":uri",uri)
+ print (_msg)
pass
diff --git a/healthcareio/server/__init__.py b/healthcareio/server/__init__.py
index 861b438..3bf9cb7 100644
--- a/healthcareio/server/__init__.py
+++ b/healthcareio/server/__init__.py
@@ -225,6 +225,7 @@ if __name__ == '__main__' :
PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
#
# Adjusting configuration with parameters (batch,folder,resume)
+ SYS_ARGS['config'] = json.loads(open(PATH).read())
if 'args' not in SYS_ARGS['config'] :
SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}
diff --git a/healthcareio/server/index.py b/healthcareio/server/index.py
index 99f3709..533bb67 100644
--- a/healthcareio/server/index.py
+++ b/healthcareio/server/index.py
@@ -6,15 +6,16 @@ import json
app = Flask(__name__)
@app.route("/favicon.ico")
def _icon():
- return send_from_directory(os.path.join([app.root_path, 'static','img','logo.svg']),
+ return send_from_directory(os.path.join([app.root_path, 'static/img/logo.svg']),
'favicon.ico', mimetype='image/vnd.microsoft.icon')
@app.route("/")
def init():
e = SYS_ARGS['engine']
sections = {"remits":e.info['835'],"claims":e.info['837']}
- _args = {"sections":sections,"store":SYS_ARGS['config']['store']}
- print (SYS_ARGS['config']['store'])
- return render_template("setup.html",**_args)
+
+ _args = {"sections":sections,"store":SYS_ARGS['config']['store'],'args':{'batch':5}}
+
+ return render_template("index.html",**_args)
@app.route("/format//",methods=['POST'])
def _format(id,index):
@@ -73,13 +74,16 @@ def reload():
if __name__ == '__main__' :
PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
- DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
+ DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 1
SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
#
#
PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
-
+ #
+ if os.path.exists(PATH) :
+ SYS_ARGS['config'] = json.loads(open(PATH).read())
e = healthcareio.analytics.engine(PATH)
# e.apply(type='claims',serialize=True)
SYS_ARGS['engine'] = e
+
app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=True)
\ No newline at end of file
diff --git a/healthcareio/server/proxy.py b/healthcareio/server/proxy.py
index 2ad95c6..f50fd34 100644
--- a/healthcareio/server/proxy.py
+++ b/healthcareio/server/proxy.py
@@ -37,14 +37,17 @@ class get :
@staticmethod
def processes(_args):
- _info = pd.DataFrame(smart.top.read(name='healthcare-io.py'))[['name','cpu','mem']]
+ APP_NAME ='healthcare-io'
+ _info = smart.top.read(name=APP_NAME) #pd.DataFrame(smart.top.read(name='healthcare-io'))[['name','cpu','mem']]
+
if _info.shape[0] == 0 :
- _info = pd.DataFrame({"name":["healthcare-io.py"],"cpu":[0],"mem":[0]})
+ _info = pd.DataFrame({"name":[APP_NAME],"cpu":[0],"mem":[0]})
# _info = pd.DataFrame(_info.groupby(['name']).sum())
# _info['name'] = ['healthcare-io.py']
m = {'cpu':'CPU','mem':'RAM','name':'name'}
- _info.columns = [m[name] for name in _info.columns.tolist()]
+ _info = _info.rename(columns=m)
+ # _info.columns = [m[name] for name in _info.columns.tolist() if name in m]
_info.index = np.arange(_info.shape[0])
charts = []
@@ -56,23 +59,20 @@ class get :
{"data":df, "chart":{"type":"radial","axis":{"x":label,"y":"name"}}}
)['apex']
)
- #
- # This will update the counts for the processes, upon subsequent requests so as to show the change
- #
- N = 0
- lprocs = []
- for proc in get.PROCS :
- if proc.is_alive() :
- lprocs.append(proc)
- N = len(lprocs)
- get.PROCS = lprocs
- return {"process":{"chart":charts,"counts":N}}
+
+ return {"process":{"chart":charts,"counts":_info.shape[0]}}
@staticmethod
def files (_args):
- _info = smart.folder.read(path='/data')
+ folder = _args['args']['folder']
+ _info = smart.folder.read(path=folder)
+
N = _info.files.tolist()[0]
- if 'mongo' in _args['store']['type'] :
- store_args = dict(_args['store'].copy(),**{"type":"mongo.MongoReader"})
+ store_args = _args['store'].copy()
+ store_args['context'] = 'read'
+
+ # if 'mongo' in _args['store']['type'] :
+ if _args['store']['provider'] in ['mongo', 'mongodb']:
+ # store_args = dict(_args['store'].copy(),**{"type":"mongo.MongoReader"})
# reader = transport.factory.instance(**_args)
pipeline = [{"$group":{"_id":"$name","count":{"$sum":{"$cond":[{"$eq":["$completed",True]},1,0]}} }},{"$group":{"_id":None,"count":{"$sum":"$count"}}},{"$project":{"_id":0,"status":"completed","count":1}}]
@@ -83,12 +83,15 @@ class get :
else:
- store_args = dict(_args['store'].copy(),**{"type":"disk.SQLiteReader"})
- store_args['args']['table'] = 'logs'
+ # store_args = dict(_args['store'].copy(),**{"type":"disk.SQLiteReader"})
+
+ # store_args['args']['table'] = 'logs'
+ store_args['table'] = 'logs'
query= {"sql":"select count(distinct json_extract(data,'$.name')) as count, 'completed' status from logs where json_extract(data,'$.completed') = true"}
_query={"sql":"select json_extract(data,'$.parse') as type,count(distinct json_extract(data,'$.name')) as count from logs group by type"} #-- distribution claim/remits
reader = transport.factory.instance(**store_args)
_info = pd.DataFrame(reader.read(**query))
+
if not _info.shape[0] :
_info = pd.DataFrame({"status":["completed"],"count":[0]})
_info['count'] = np.round( (_info['count'] * 100 )/N,2)
@@ -97,11 +100,6 @@ class get :
#
# Let us classify the files now i.e claims / remits
#
-
-
- # pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}]
- # _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
- # r = pd.DataFrame(reader.read(mongo=_args))
r = pd.DataFrame(reader.read(**_query)) #-- distribution claims/remits
r = Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})['apex']
r['chart']['height'] = '100%'
diff --git a/healthcareio/server/templates/setup.html b/healthcareio/server/templates/setup.html
index df2fdb8..1cc70cf 100644
--- a/healthcareio/server/templates/setup.html
+++ b/healthcareio/server/templates/setup.html
@@ -63,10 +63,10 @@
//
// We should insure the listeners are enabled
if(monitor.listen.handler == null){
- monitor.listen.handler = setInterval(
+ /*monitor.listen.handler = setInterval(
function(){
console.log('running ...')
- monitor.data()},5000)
+ monitor.data()},5000)*/
}
}else{
diff --git a/healthcareio/x12/__init__.py b/healthcareio/x12/__init__.py
index 3760e69..c46a7e3 100644
--- a/healthcareio/x12/__init__.py
+++ b/healthcareio/x12/__init__.py
@@ -24,6 +24,7 @@ import sys
from itertools import islice
from multiprocessing import Process
import transport
+from transport import providers
import jsonmerge
import copy
@@ -236,7 +237,54 @@ class Parser (Process):
_config[_id] = [_config[_id]]
config['parser'] = _config
return config
-
+ @staticmethod
+ def init(**_args):
+ """
+        Initialize the database that will store the claims, if needed
+        :path   path to the configuration file (defaults to ~/.healthcareio/config.json)
+ """
+ PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
+ filename = os.sep.join([PATH,'config.json'])
+
+ filename = _args['path'] if 'path' in _args else filename
+ info = None
+ if os.path.exists(filename):
+ #
+ # Loading the configuration file (JSON format)
+ file = open(filename)
+ info = json.loads(file.read())
+
+
+ OUTPUT_FOLDER = info['out-folder']
+ if 'output-folder' not in info and not os.path.exists(OUTPUT_FOLDER) :
+ os.mkdir(OUTPUT_FOLDER)
+ elif 'output-folder' in info and not os.path.exists(info['out-folder']) :
+ os.mkdir(info['out-folder'])
+ # if 'type' in info['store'] :
+ lwriter = None
+ IS_SQL = False
+ if'type' in info['store'] and info['store']['type'] == 'disk.SQLiteWriter' :
+ lwriter = transport.factory.instance(**info['store'])
+ IS_SQL = True
+ elif 'provider' in info['store'] and info['store']['provider'] == 'sqlite' :
+ lwriter = transport.instance(**info['store']) ;
+ IS_SQL = [providers.SQLITE,providers.POSTGRESQL,providers.NETEZZA,providers.MYSQL,providers.MARIADB]
+
+ if lwriter and IS_SQL:
+ for key in info['schema'] :
+ if key != 'logs' :
+ _id = 'claims' if key == '837' else 'remits'
+ else:
+ _id = key
+
+ if not lwriter.has(table=_id) :
+ lwriter.apply(info['schema'][key]['create'])
+
+ # [lwriter.apply( info['schema'][key]['create']) for key in info['schema'] if not lwriter.has(table=key)]
+ lwriter.close()
+
+ return info
+
def __init__(self,path):
"""
:path path of the configuration file (it can be absolute)
diff --git a/setup.py b/setup.py
index 05a9596..e4be345 100644
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,7 @@ import sys
def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = {
- "name":"healthcareio","version":"1.6.4.6",
+ "name":"healthcareio","version":"1.6.4.8",
"author":"Vanderbilt University Medical Center",
"author_email":"steve.l.nyemba@vumc.org",
"include_package_data":True,