diff --git a/healthcareio/__init__.py b/healthcareio/__init__.py
index 97f80a5..64f7099 100644
--- a/healthcareio/__init__.py
+++ b/healthcareio/__init__.py
@@ -14,50 +14,6 @@ Usage :
Embedded :
"""
-import healthcareio
-import os
-import requests
-import platform
-import sqlite3 as lite
-from transport import factory
-import json
-#import healthcareio.params as params
-PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
-OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
-
-def register (**args) :
- """
- This function will reset/register a user i.e they will download the configuration
- :email user's email address
- :url url of the provider to register
- """
- URL = "https://healthcareio.the-phi.com" if 'url' not in args else args['url']
-
- args['out_folder'] = os.sep.join([args['path'],args['out_folder']])
- email = args['email']
- url = args['url'] if 'url' in args else URL
- folders = [PATH,OUTPUT_FOLDER]
- for path in folders :
- if not os.path.exists(path) :
- os.mkdir(path)
-
- #
- #
- headers = {"email":email,"client":platform.node()}
- http = requests.session()
- r = http.post(url,headers=headers)
-
- #
- # store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}}
- # if 'store' in args :
- # store = args['store']
- filename = (os.sep.join([PATH,'config.json']))
- info = r.json() #{"parser":r.json(),"store":store}
- info = dict({"owner":email},**info)
- info['store']['args']['path'] =os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql
- info['out-folder'] = OUTPUT_FOLDER
-
- file = open( filename,'w')
- file.write( json.dumps(info))
- file.close()
+from healthcareio import analytics
+# from healthcareio import server
diff --git a/healthcareio/analytics.py b/healthcareio/analytics.py
index 7ae8b6d..d9f393a 100644
--- a/healthcareio/analytics.py
+++ b/healthcareio/analytics.py
@@ -175,7 +175,8 @@ class Apex :
@staticmethod
def scalar(item):
_df = item['data']
- name = str(_df.columns[0])
+ print (_df)
+ name = _df.columns.tolist()[0]
value = _df[name].values.round(2)[0]
html = '
'
if value > 999 and value < 1000000 :
@@ -240,9 +241,15 @@ class Apex :
if type(y) == list :
y = y[0]
axis['x'] = [axis['x']] if type(axis['x']) != list else axis['x']
+ if not set(axis['x']) & set(df.columns.tolist()) :
+ print (set(axis['x']) & set(df.columns.tolist()))
+ print (axis['x'])
+ print (df.columns)
+ # df.columns = axis['x']
series = []
_min=_max = 0
for x in axis['x'] :
+
series += [{"data": df[x].values.tolist()[:N],"name":x.upper().replace('_',' ')}]
_min = df[x].min() if df[x].min() < _min else _min
_max = df[x].max() if df[x].max() > _max else _max
@@ -317,6 +324,12 @@ class engine :
_config = json.loads(f.read())
self.store_config = _config['store']
self.info = _config['analytics']
+ _args = self.store_config
+ if self.store_config['type'] == 'mongo.MongoWriter' :
+ _args['type'] = 'mongo.MongoReader'
+ else:
+ _args['type'] = 'disk.SQLiteReader'
+ self.reader = transport.factory.instance(**_args)
def apply (self,**args) :
"""
@@ -332,19 +345,26 @@ class engine :
analytics = [analytics[index]]
_info = list(analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']]
- conn = lite.connect(self.store_config['args']['path'],isolation_level=None)
- conn.create_aggregate("stdev",1,stdev)
-
+ # conn = lite.connect(self.store_config['args']['path'],isolation_level=None)
+ # conn.create_aggregate("stdev",1,stdev)
+ DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql'
r = []
for row in _info :
for item in row['pipeline'] :
- item['data'] = pd.read_sql(item['sql'],conn)
+ # item['data'] = pd.read_sql(item['sql'],conn)
+ query = {DB_TYPE:item[DB_TYPE]}
+ item['data'] = self.reader.read(**item)
if 'serialize' in args :
- item['data'] = json.dumps(item['data'].to_dict(orient='record'))
+
+                    item['data'] = json.dumps(item['data'].to_dict(orient='records')) if type(item['data']) == pd.DataFrame else item['data']
+ else:
+ item['data'] = (pd.DataFrame(item['data']))
+
+
# if 'info' in item:
# item['info'] = item['info'].replace(":rows",str(item["data"].shape[0]))
- conn.close()
+ # conn.close()
return _info
@@ -540,4 +560,5 @@ css = """
# print (p[2]['pipeline'][0]['data'])
# e.export (p[0])
# features = ['diagnosis.code']
-# split(folder = folder, features=features)
\ No newline at end of file
+# split(folder = folder, features=features)
+
diff --git a/healthcareio/healthcare-io.py b/healthcareio/healthcare-io.py
index 9c52009..53f14a1 100644
--- a/healthcareio/healthcare-io.py
+++ b/healthcareio/healthcare-io.py
@@ -32,10 +32,18 @@ Usage :
from healthcareio.params import SYS_ARGS
from transport import factory
import requests
+
+from healthcareio import analytics
+from healthcareio import server
from healthcareio.parser import get_content
import os
import json
import sys
+import numpy as np
+from multiprocessing import Process
+import time
+
+
PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
INFO = None
@@ -60,7 +68,8 @@ def register (**args) :
#
#
- headers = {"email":email,"client":platform.node()}
+ store = args['store'] if 'store' in args else 'sqlite'
+ headers = {"email":email,"client":platform.node(),"store":store,"db":args['db']}
http = requests.session()
r = http.post(url,headers=headers)
@@ -82,22 +91,6 @@ def register (**args) :
# Create the sqlite3 database to
-def analytics(**args):
- """
- This fucntion will only compute basic distributions of a given feature for a given claim
- @args
- @param x: vector of features to process
- @param apply: operation to be applied {dist}
- """
- if args['apply'] in ['dist','distribution'] :
- """
- This section of the code will return the distribution of a given space.
- It is intended to be applied on several claims/remits
- """
- x = pd.DataFrame(args['x'],columns=['x'])
- return x.groupby(['x']).size().to_frame().T.to_dict(orient='record')
-
-
def log(**args):
"""
This function will perform a log of anything provided to it
@@ -152,7 +145,39 @@ def parse(**args):
return get_content(args['filename'],CONFIG,SECTION)
+def apply(files,store_info,logger_info=None):
+ """
+ :files list of files to be processed in this given thread/process
+ :store_info information about data-store, for now disk isn't thread safe
+ :logger_info information about where to store the logs
+ """
+ if not logger_info :
+ logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
+ else:
+ logger = factory.instance(**logger_info)
+
+ writer = factory.instance(**store_info)
+ for filename in files :
+
+ if filename.strip() == '':
+ continue
+ # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])
+ #
+ try:
+ content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
+ if content :
+ writer.write(content)
+ if logs :
+ [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
+ else:
+ logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
+ except Exception as e:
+ logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
+ # print ([filename,len(content)])
+ #
+ # @TODO: forward this data to the writer and log engine
+ #
def upgrade(**args):
"""
:email provide us with who you are
@@ -175,8 +200,9 @@ if __name__ == '__main__' :
email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
url = SYS_ARGS['url'] if 'url' in SYS_ARGS else 'https://healthcareio.the-phi.com'
-
- register(email=email,url=url)
+ store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
+ db='healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
+ register(email=email,url=url,store=store,db=db)
# else:
# m = """
# usage:
@@ -218,46 +244,95 @@ if __name__ == '__main__' :
# CONFIG = CONFIG[ int(SYS_ARGS['version'])]
# else:
# CONFIG = CONFIG[-1]
+ logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
if info['store']['type'] == 'disk.DiskWriter' :
info['store']['args']['path'] += (os.sep + 'healthcare-io.json')
elif info['store']['type'] == 'disk.SQLiteWriter' :
# info['store']['args']['path'] += (os.sep + 'healthcare-io.db3')
pass
+
+
if info['store']['type'] == 'disk.SQLiteWriter' :
info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower()
else:
+ #
+ # if we are working with no-sql we will put the logs in it (performance )?
info['store']['args']['doc'] = SYS_ARGS['parse'].strip().lower()
+ _info = json.loads(json.dumps(info['store']))
+ _info['args']['doc'] = 'logs'
+ logger = factory.instance(**_info)
+
writer = factory.instance(**info['store'])
- logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
+
+ #
+ # we need to have batches ready for this in order to run some of these queries in parallel
+ # @TODO: Make sure it is with a persistence storage (not disk .. not thread/process safe yet)
+ # - Make sure we can leverage this on n-cores later on, for now the assumption is a single core
+ #
+ BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch'])
+
#logger = factory.instance(type='mongo.MongoWriter',args={'db':'healthcareio','doc':SYS_ARGS['parse']+'_logs'})
# schema = info['schema']
# for key in schema :
# sql = schema[key]['create']
# writer.write(sql)
- for filename in files :
+ files = np.array_split(files,BATCH_COUNT)
+ procs = []
+ index = 0
+ for row in files :
+
+ row = row.tolist()
+ logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
+ proc = Process(target=apply,args=(row,info['store'],_info,))
+ proc.start()
+ procs.append(proc)
+ index = index + 1
+ while len(procs) > 0 :
+ procs = [proc for proc in procs if proc.is_alive()]
+ time.sleep(2)
+ # for filename in files :
+
+ # if filename.strip() == '':
+ # continue
+ # # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])
+ # #
+ # try:
+ # content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
+ # if content :
+ # writer.write(content)
+ # if logs :
+ # [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
+ # else:
+ # logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
+ # except Exception as e:
+ # logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
+ # # print ([filename,len(content)])
+ # #
+ # # @TODO: forward this data to the writer and log engine
+ # #
+
- if filename.strip() == '':
- continue
- # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])
- #
- try:
- content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
- if content :
- writer.write(content)
- if logs :
- [logger.write(_row) for _row in logs]
- else:
- logger.write({"name":filename,"completed":True,"rows":len(content)})
- except Exception as e:
- logger.write({"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
- # print ([filename,len(content)])
- #
- # @TODO: forward this data to the writer and log engine
- #
pass
+ elif 'analytics' in SYS_ARGS :
+ PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
+ DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
+ SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
+ #
+ #
+
+ # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
+
+ e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible
+ e.apply(type='claims',serialize=True)
+ SYS_ARGS['engine'] = e
+
+ pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False)
+ pthread = Process(target=pointer,args=())
+ pthread.start()
+
elif 'export' in SYS_ARGS:
#
# this function is designed to export the data to csv
@@ -267,7 +342,17 @@ if __name__ == '__main__' :
if set([format]) not in ['xls','csv'] :
format = 'csv'
-
+ else:
+ msg = """
+ CLI Usage
+                healthcare-io.py --register <email> --store <sqlite|mongo>
+                healthcare-io.py --parse claims --folder <path> [--batch <value>]
+                healthcare-io.py --parse remits --folder <path> [--batch <value>]
+ parameters :
+                --<[signup|init]>   <email> signup or get a configuration file from a parsing server
+ --store data store mongo or sqlite
+ """
+ print(msg)
pass
# """
# The program was called from the command line thus we are expecting
diff --git a/healthcareio/params.py b/healthcareio/params.py
index 517d356..a9d2bc9 100644
--- a/healthcareio/params.py
+++ b/healthcareio/params.py
@@ -8,10 +8,12 @@ if len(sys.argv) > 1:
value = None
if sys.argv[i].startswith('--'):
key = sys.argv[i][2:] #.replace('-','')
+
SYS_ARGS[key] = 1
+
if i + 1 < N:
value = sys.argv[i + 1] = sys.argv[i+1].strip()
- if key and value:
+ if key and value and not value.startswith('--'):
SYS_ARGS[key] = value
diff --git a/healthcareio/parser.py b/healthcareio/parser.py
index b499f76..f15ae14 100644
--- a/healthcareio/parser.py
+++ b/healthcareio/parser.py
@@ -91,6 +91,27 @@ def format_date(value) :
return "-".join([year,month,day])
def format_time(value):
return ":".join([value[:2],value[2:] ])[:5]
+def sv2_parse(value):
+ #
+ # @TODO: Sometimes there's a suffix (need to inventory all the variations)
+ #
+ if '>' in value or ':' in value:
+ xchar = '>' if '>' in value else ':'
+ _values = value.split(xchar)
+ modifier = {}
+
+ if len(_values) > 2 :
+
+ modifier= {"code":_values[2]}
+ if len(_values) > 3 :
+ modifier['type'] = _values[3]
+ _value = {"code":_values[1],"type":_values[0]}
+ if modifier :
+ _value['modifier'] = modifier
+
+ return _value
+ else:
+ return value
def format_proc(value):
for xchar in [':','<'] :
if xchar in value and len(value.split(xchar)) > 1 :
@@ -110,11 +131,11 @@ def format_pos(value):
x = {"code":x[0],"indicator":x[1],"frequency":x[2]} if len(x) == 3 else {"code":x[0],"indicator":None,"frequency":None}
return x
-def get_map(row,config,version):
+def get_map(row,config,version=None):
label = config['label'] if 'label' in config else None
- omap = config['map'] if version not in config else config[version]
+ omap = config['map'] if not version or version not in config else config[version]
anchors = config['anchors'] if 'anchors' in config else []
if type(row[0]) == str:
object_value = {}
@@ -136,6 +157,9 @@ def get_map(row,config,version):
if type(value) == dict :
for objkey in value :
+
+ if type(value[objkey]) == dict :
+ continue
if 'syn' in config and value[objkey] in config['syn'] :
value[objkey] = config['syn'][ value[objkey]]
value = {key:value} if key not in value else value