Bug fixes with SQLite and exports

Steve Nyemba 2022-01-29 14:40:51 -06:00
parent ec588fd9f4
commit 427e09870c
6 changed files with 336 additions and 205 deletions

View File

@@ -34,7 +34,7 @@ from datetime import datetime
import copy
import requests
import time
from healthcareio.x12 import Parser
PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','config.json'])
STORE_URI = 'http://healthcareio.the-phi.com/store/healthcareio'
@@ -82,7 +82,7 @@ def meta(config) :
key = _cache['key']
if 'map' in config[key]:
config[key]['map'][field] = -100
add_index = {} #-- tells if we should add _index attribute or not
for prefix in config :
# if 'map' in config[prefix] :
# label = list(set(['label','field']) & set(config[prefix].keys()))
@@ -95,22 +95,35 @@ def meta(config) :
if '@ref' in config[prefix] : #and set(['label','field','map']) & set(config[prefix]['@ref'].keys()):
for subprefix in config[prefix]['@ref'] :
_entry = config[prefix]['@ref'][subprefix]
_id = list(set(['label','field']) & set(config[prefix]['@ref'][subprefix].keys()))
_id = _id[0]
table = config[prefix]['@ref'][subprefix][_id]
add_index[table] = 1 if _id == 'label' else 0
if 'map' in _entry :
_info += get_field(_entry)
else:
_info += list(_entry.keys())
if set(['label','field','map']) & set(config[prefix].keys()):
_entry = config[prefix]
_id = list(set(['label','field']) & set(config[prefix].keys()))
if _id :
_id = _id[0]
table = config[prefix][_id]
add_index[table] = 1 if _id == 'label' else 0
if 'map' in _entry :
_info += get_field(_entry)
#
# We need to organize the fields appropriately here
#
# print (_info)
fields = {"main":[],"rel":{}} fields = {"main":[],"rel":{}}
for row in _info : for row in _info :
if type(row) == str : if type(row) == str :
@ -118,16 +131,36 @@ def meta(config) :
fields['main'] = list(set(fields['main'])) fields['main'] = list(set(fields['main']))
fields['main'].sort() fields['main'].sort()
else : else :
_id = list(set(add_index.keys()) & set(row.keys()))
if _id :
_id = _id[0]
if add_index[_id] == 1 :
row[_id]+= ['_index']
if _id not in fields['rel']:
fields['rel'][_id] = row[_id]
else:
fields['rel'][_id] += row[_id]
else:
print ( _entry)
_id = list(row.keys())[0]
fields['rel'][_id] = row[_id] if _id not in fields['rel'] else fields['rel'][_id] + row[_id]
return fields
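Note: for orientation, meta() returns a structure shaped roughly like the sketch below; the field and table names are hypothetical, not taken from an actual configuration.
# hypothetical shape of the value returned by meta(_config)
fields = {
    "main": ["claim_id", "patient_id", "payer_name"],            # columns of the claims/remits table
    "rel": {"procedures": ["code", "charge_amount", "_index"]}   # child tables and their columns
}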
def create (**_args) :
skip = [] if 'skip' not in _args else _args['skip']
fields = ([_args['key']] if 'key' in _args else []) + _args['fields']
fields = ['_id'] + list(set(fields))
fields.sort()
table = _args['table']
sql = ['CREATE TABLE :table ',"(",",\n".join(["\t".join(["\t",name,"VARCHAR(125)"]) for name in fields]),")" ]
return " ".join(sql)
def read (**_args) :
"""
@@ -141,32 +174,80 @@ def read (**_args) :
# @TODO: Find a way to write the data into a data-store
# - use dbi interface with pandas or stream it in
#
def init_sql(**_args):
"""
This function expresses how we can generically read data stored in JSON format from a relational table
:param type 835,837
:param skip list of fields to be skipped
"""
#
# we should acknowledge global variables CONFIG,CUSTOM_CONFIG
TYPE = _args['type']
_config = CONFIG['parser'][TYPE][0]
TABLE_NAME = 'claims' if TYPE== '837' else 'remits'
if TYPE in CUSTOM_CONFIG :
_config = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE])
#
_info = meta(_config)
_projectSQLite = [] #-- sqlite projection
for field_name in _info['main'] :
_projectSQLite += ["json_extract(data,'$."+field_name+"') "+field_name]
_projectSQLite = ",".join(_projectSQLite) #-- Wrapping up SQLITE projection on main table
SQL = "SELECT DISTINCT claims.id _id,:fields FROM :table, json_each(data)".replace(":fields",_projectSQLite).replace(":table",TABLE_NAME)
r = [{"table":TABLE_NAME,"read":{"sql":SQL},"sql":create(table=TABLE_NAME,fields=_info['main'])}]
for table in _info['rel'] :
#
# NOTE: Adding _index to the fields
fields = _info['rel'][table] #+["_index"]
project = [TABLE_NAME+".id _id","json_extract(data,'$.claim_id') as claim_id"]
fn_prefix = "json_extract(x.value,'$." if '_index' not in _info['rel'][table] else "json_extract(i.value,'$."
for field_name in fields :
# project += ["json_extract(x.value,'$."+field_name+"') "+field_name]
project += [fn_prefix+field_name+"') "+field_name]
SQL = "SELECT DISTINCT :fields FROM "+TABLE_NAME+", json_each(data) x, json_each(x.value) i where x.key = ':table'"
SQL = SQL.replace(":table",table).replace(":fields",",".join(project))
r += [{"table":table,"read":{"sql":SQL},"sql":create(table=table,key='claim_id',fields=fields)}]
return r
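Note: the SQL assembled by init_sql relies on SQLite's JSON1 functions (json_extract / json_each). A self-contained sketch of that pattern, with a made-up table layout and sample document, assuming the bundled SQLite has JSON1 enabled:
import json, sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE claims (id INTEGER PRIMARY KEY, data TEXT)")
doc = {"claim_id": "C-001", "payer": "ACME", "procedures": [{"code": "99213"}, {"code": "87070"}]}
conn.execute("INSERT INTO claims (data) VALUES (?)", (json.dumps(doc),))
# main-table projection: scalar attributes pulled straight out of the JSON document
print(conn.execute("SELECT id _id, json_extract(data,'$.claim_id') claim_id FROM claims").fetchall())
# relational projection: unnest an embedded list, the way the 'rel' queries above are built
SQL = ("SELECT claims.id _id, json_extract(i.value,'$.code') code "
       "FROM claims, json_each(data) x, json_each(x.value) i WHERE x.key = 'procedures'")
print(conn.execute(SQL).fetchall())   # [(1, '99213'), (1, '87070')]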
def init(**_args):
if 'provider' in CONFIG['store'] and CONFIG['store']['provider'] == 'sqlite' :
return init_sql(**_args)
else:
return init_mongo(**_args)
def init_mongo (**_args) :
""" """
This function is intended to determine the number of tables to be created, as well as their type. This function is intended to determine the number of tables to be created, as well as their type.
:param type {835,837} :param type {835,837}
:param skip list of fields to be skipped :param skip list of fields to be skipped
""" """
TYPE = _args['type'] TYPE = _args['type']
SKIP = _args['skip'] if 'skip' in _args else [] # SKIP = _args['skip'] if 'skip' in _args else []
_config = CONFIG['parser'][TYPE][0] _config = CONFIG['parser'][TYPE][0]
if TYPE in CUSTOM_CONFIG : if TYPE in CUSTOM_CONFIG :
_config = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE]) _config = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE])
_info = meta(_config)
#
# @TODO: implement fields to be skipped ...
#
TABLE_NAME = 'claims' if TYPE== '837' else 'remits'
_info = meta(_config)
# project = dict.fromkeys(["_id","claim_id"]+_info['main'],1)
project = {}
for field_name in _info['main'] :
_name = "".join(["$",field_name])
project[field_name] = {"$ifNull":[_name,""]}
project["_id"] = 1
project = {"$project":project}
# _projectSQLite = ",".join(_projectSQLite) #-- Wrapping up SQLITE projection on main table
r = [{"table":TABLE_NAME,"read":{"mongo":{"aggregate":TABLE_NAME,"pipeline":[project],"cursor":{},"allowDiskUse":True}},"sql":create(table=TABLE_NAME,fields=_info['main'])}]
for table in _info['rel'] :
#
# NOTE: Adding _index to the fields
@@ -180,7 +261,8 @@ def init (**_args) :
project["_id"] = 1
# pipeline = [{"$match":{"procedures":{"$nin":[None,'']}}},{"$unwind":"$"+table},{"$project":project}]
pipeline = [{"$match": {table: {"$nin": [None, ""]}}},{"$unwind":"$"+table},{"$project":project}]
cmd = {"mongo":{"aggregate":TABLE_NAME,"cursor":{},"pipeline":pipeline,"allowDiskUse":True}}
r += [{"table":table,"read":cmd,"sql":create(table=table,key='claim_id',fields=fields)}]
return r
@@ -207,9 +289,14 @@ class Factory:
global PATH
global CONFIG
global CUSTOM_CONFIG
PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','config.json'])
PATH = _args['config']
CONFIG = json.loads((open(PATH)).read())
# if 'config' in _args :
# PATH = _args['config']
# else:
# PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','config.json'])
CONFIG = Parser.setup(PATH)
CUSTOM_PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','custom'])
if os.path.exists(CUSTOM_PATH) and os.listdir(CUSTOM_PATH) :
@@ -217,31 +304,49 @@ class Factory:
CUSTOM_CONFIG = json.loads((open(CUSTOM_PATH)).read())
_features = Factory.license(email=CONFIG['owner'])
X12_TYPE = _args['type']
store = copy.deepcopy(CONFIG['store']) #-- reading the original data
#
# Formatting accordingly just in case
if 'provider' in store :
if 'table' in store:
store['table'] = 'claims' if X12_TYPE == '837' else 'remits'
store['context'] ='read'
else:
pass
# store['type']='mongo.MongoReader'
wstore = _args['write_store'] #-- output data store
TYPE = _args['type']
PREFIX = 'clm_' if X12_TYPE == '837' else 'era_'
# SCHEMA = '' if 'schema' not in wstore['args'] else wstore['args']['schema']
SCHEMA = '' if 'schema' not in wstore else wstore['schema']
_config = CONFIG['parser'][X12_TYPE][0]
if X12_TYPE in CUSTOM_CONFIG :
_config = jsonmerge.merge(_config,CUSTOM_CONFIG[X12_TYPE])
# _info = meta(_config)
job_args = init(type=X12_TYPE) #-- getting the queries that will generate the objects we are interested in
# print (json.dumps(job_args))
_jobs = []
for row in job_args:
# _store = json.loads(json.dumps(wstore))
_store = copy.deepcopy(wstore)
# _store['args']['table'] = row['table']
if 'type' in _store :
_store['args']['table'] = row['table']
else:
_store['table'] = row['table']
_pipe = [
workers.CreateSQL(prefix=PREFIX,schema=SCHEMA,store=_store,sql=row['sql']),
# workers.Reader(prefix=PREFIX,schema=SCHEMA,store=store,mongo=row['mongo'],max_rows=250000,features=_features,table=row['table']),
workers.Reader(prefix=PREFIX,schema=SCHEMA,store=store,read=row['read'],max_rows=250000,features=_features,table=row['table']),
workers.Writer(prefix=PREFIX,schema=SCHEMA,store=_store)
]
_jobs += [workers.Subject(observers=_pipe,name=row['table'])]
return _jobs
# if __name__ == '__main__' :
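Note: a hedged sketch of how the factory is meant to be driven end to end; the store settings and config path below are made up for illustration.
# illustration only -- store/config values are hypothetical
from healthcareio.export import export
_jobs = export.Factory.instance(
    type='835',
    write_store={"provider": "sqlite", "path": "/tmp/healthcare-io-export.db3"},
    config='/home/user/.healthcareio/config.json'
)
for _job in _jobs:   # each job is a workers.Subject wrapping CreateSQL -> Reader -> Writer
    _job.start()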

View File

@@ -93,9 +93,6 @@ class Worker :
self._apply()
except Exception as error:
pass
# print ()
# print (error)
# print ()
finally:
self.caller.notify()
@@ -119,17 +116,18 @@ class CreateSQL(Worker) :
def __init__(self,**_args):
super().__init__(**_args)
self._sql = _args['sql']
def init(self,**_args):
super().init(**_args)
def _apply(self) :
sqltable = self._info['table'] if 'provider' in self._info else self._info['args']['table']
sqltable = self.tablename(sqltable)
# log = {"context":self.name(),"args":{"table":self._info['args']['table'],"sql":self._sql}}
log = {"context":self.name(),"args":{"table":sqltable,"sql":self._sql.replace(":table",sqltable)}}
try:
writer = transport.factory.instance(**self._info)
writer.apply(self._sql.replace(":table",sqltable))
writer.close()
@@ -153,9 +151,13 @@ class Reader(Worker):
super().__init__(**_args)
# self.pipeline = _args['mongo'] #-- pipeline in the context of mongodb NOT ETL
# self.pipeline = _args['mongo'] if 'mongo' in _args else _args['sql']
self.pipeline = _args['read'] ;
self.MAX_ROWS = _args['max_rows']
self.table = _args['table'] #-- target table
# is_demo = 'features' not in _args or ('features' in _args and ('export_etl' not in _args['features'] or _args['features']['export_etl'] == 0))
#
@@ -174,26 +176,36 @@ class Reader(Worker):
def init(self,**_args):
super().init(**_args)
self.rows = []
def _apply(self):
try:
self.reader = transport.factory.instance(**self._info) ;
# print (self.pipeline)
# self.rows = self.reader.read(mongo=self.pipeline)
self.rows = self.reader.read(**self.pipeline)
if type(self.rows) == pd.DataFrame :
self.rows = self.rows.to_dict(orient='records')
# if 'provider' in self._info and self._info['provider'] == 'sqlite' :
# self.rows = self.rows.apply(lambda row: json.loads(row.data),axis=1).tolist()
N = len(self.rows) / self.MAX_ROWS if len(self.rows) > self.MAX_ROWS else 1
N = int(N)
# self.rows = rows
_log = {"context":self.name(), "status":1,"info":{"rows":len(self.rows),"table":self.table,"segments":N}}
self.rows = np.array_split(self.rows,N)
# self.get = lambda : rows #np.array_split(rows,N)
self.reader.close()
self.status = 1
#
except Exception as e :
_log['status'] = 0
_log['info'] = {"error":e.args[0]}
print (e)
self.log(**_log)
@@ -206,6 +218,9 @@ class Reader(Worker):
class Writer(Worker):
def __init__(self,**_args):
super().__init__(**_args)
if 'provider' in self._info :
self._info['context'] = 'write'
def init(self,**_args):
"""
:param store output data-store needed for writing
@@ -215,25 +230,35 @@ class Writer(Worker):
self._invalues = _args['invalues']
def _apply(self):
# table = self._info['args']['table'] if 'table' in self._info['args'] else 'N/A'
# table = self.tablename(self._info['args']['table'])
if 'provider' in self._info :
table = self.tablename(self._info['table'])
self._info['table'] = table
else:
table = self.tablename(self._info['args']['table'])
self._info['args']['table'] = table
self._info['args']['table'] = table;
writer = transport.factory.instance(**self._info)
index = 0
if self._invalues :
for rows in self._invalues :
# print (['segment # ',index,len(rows)])
self.log(**{"context":self.name(),"segment":(index+1),"args":{"rows":len(rows),"table":table}})
# self.log(**{"context":self.name(),"segment":(index+1),"args":{"rows":len(rows),"table":table}})
if len(rows) > 0:
#
# @TODO: Upgrade to mongodb 4.0+ and remove the line below
# Upon upgrade use the operator "$toString" in export.init function
#
rows = [dict(item,**{"_id":str(item["_id"])}) for item in rows]
writer.write(rows)
index += 1
# for _e in rows :

View File

@@ -46,6 +46,7 @@ import time
from healthcareio import x12
from healthcareio.export import export
import smart
import transport
from healthcareio.server import proxy
import pandas as pd
@@ -58,8 +59,19 @@ if not os.path.exists(PATH) :
import platform
import sqlite3 as lite
# PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
CONFIG_FILE = os.sep.join([PATH,'config.json']) if 'config' not in SYS_ARGS else SYS_ARGS['config']
HELP_MESSAGE = """
cli:
#
# Signup, allows parsing configuration to be downloaded
#
# Support for SQLite3
healthcare-io.py --signup steve@the-phi.com --store sqlite
#or support for mongodb
healthcare-io.py --signup steve@the-phi.com --store mongo
healthcare-io.py --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
healthcare-io.py --parse --folder <path> [--batch <value>] [--resume]
@@ -100,7 +112,8 @@ def signup (**args) :
# store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}}
# if 'store' in args :
# store = args['store']
# filename = (os.sep.join([PATH,'config.json']))
filename = CONFIG_FILE
info = r.json() #{"parser":r.json(),"store":store}
info = dict({"owner":email},**info)
info['store']['args']['path'] =os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql
@@ -121,101 +134,48 @@ def log(**args):
pass
def init():
"""
read all the configuration from disk.
Requirements for configuration file :
{out-folder,store,837,835 }
""" """
filename = os.sep.join([PATH,'config.json']) # filename = os.sep.join([PATH,'config.json'])
filename = CONFIG_FILE
info = None
if os.path.exists(filename):
#
# Loading the configuration file (JSON format)
file = open(filename)
info = json.loads(file.read())
if not os.path.exists(info['out-folder']) :
os.mkdir(info['out-folder'])
if info['store']['type'] == 'disk.SQLiteWriter' and not os.path.exists(info['store']['args']['path']) :
if 'output-folder' not in info and not os.path.exists(OUTPUT_FOLDER) :
os.mkdir(OUTPUT_FOLDER)
elif 'output-folder' in info and not os.path.exists(info['out-folder']) :
os.mkdir(info['out-folder'])
# if 'type' in info['store'] :
lwriter = None
is_sqlite = False
if'type' in info['store'] and info['store']['type'] == 'disk.SQLiteWriter' and not os.path.exists(info['store']['args']['path']) :
lwriter = transport.factory.instance(**info['store'])
is_sqlite = True
elif 'provider' in info['store'] and info['store']['provider'] == 'sqlite' :
lwriter = transport.instance(**info['store']) ;
is_sqlite = True
if lwriter and is_sqlite:
for key in info['schema'] :
if key != 'logs' :
_id = 'claims' if key == '837' else 'remits'
else:
_id = key
conn.close()
if not lwriter.has(table=_id) :
lwriter.apply(info['schema'][key]['create'])
# [lwriter.apply( info['schema'][key]['create']) for key in info['schema'] if not lwriter.has(table=key)]
lwriter.close()
return info
#
# Global variables that load the configuration files
# def parse(**args):
# """
# This function will parse the content of a claim or remittance (x12 format) give the following parameters
# :filename absolute path of the file to be parsed
# :type claims|remits in x12 format
# """
# global INFO
# if not INFO :
# INFO = init()
# if args['type'] == 'claims' :
# CONFIG = INFO['parser']['837']
# elif args['type'] == 'remits' :
# CONFIG = INFO['parser']['835']
# else:
# CONFIG = None
# if CONFIG :
# # CONFIG = CONFIG[-1] if 'version' not in args and (args['version'] < len(CONFIG)) else CONFIG[0]
# CONFIG = CONFIG[int(args['version'])-1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1]
# SECTION = CONFIG['SECTION']
# os.environ['HEALTHCAREIO_SALT'] = INFO['owner']
# return get_content(args['filename'],CONFIG,SECTION)
# def resume (files,id,config):
# _args = config['store'].copy()
# if 'mongo' in config['store']['type'] :
# _args['type'] = 'mongo.MongoReader'
# reader = factory.instance(**_args)
# _files = []
# if 'resume' in config['analytics'] :
# _args = config['analytics']['resume'][id]
# _files = reader.read(**_args)
# _files = [item['name'] for item in _files if item['name'] != None]
# return list(set(files) - set(_files))
# return files
# pass
# def apply(files,store_info,logger_info=None):
# """
# :files list of files to be processed in this given thread/process
# :store_info information about data-store, for now disk isn't thread safe
# :logger_info information about where to store the logs
# """
# if not logger_info :
# logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
# else:
# logger = factory.instance(**logger_info)
# writer = factory.instance(**store_info)
# for filename in files :
# if filename.strip() == '':
# continue
# # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])
# #
# try:
# content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
# if content :
# writer.write(content)
# if logs :
# [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
# else:
# logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
# except Exception as e:
# logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
# # print ([filename,len(content)])
# #
# # @TODO: forward this data to the writer and log engine
#
def upgrade(**args):
"""
:email provide us with who you are
@@ -295,7 +255,7 @@ if __name__ == '__main__' :
# if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't
#
if 'resume' in SYS_ARGS :
store_config = json.loads( (open(CONFIG_FILE)).read() )
files = proxy.get.resume(files,store_config )
# print (["Found ",len(files)," files unprocessed"])
#
@@ -314,7 +274,9 @@ if __name__ == '__main__' :
row = row.tolist()
# logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
# proc = Process(target=apply,args=(row,info['store'],_info,))
# parser = x12.Parser(os.sep.join([PATH,'config.json']))
parser = x12.Parser(CONFIG_FILE)
parser.set.files(row)
parser.start()
procs.append(parser)
@@ -335,11 +297,11 @@ if __name__ == '__main__' :
# PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
if os.path.exists(CONFIG_FILE) :
e = analytics.engine(CONFIG_FILE) #--@TODO: make the configuration file globally accessible
e.apply(type='claims',serialize=True)
SYS_ARGS['engine'] = e
SYS_ARGS['config'] = json.loads(open(CONFIG_FILE ).read())
else:
SYS_ARGS['config'] = {"owner":None,"store":None}
@@ -355,8 +317,8 @@ if __name__ == '__main__' :
elif 'check-update' in SYS_ARGS :
_args = {"url":SYS_ARGS['url']}
try:
if os.path.exists(CONFIG_FILE) :
SYS_ARGS['config'] = json.loads(open(CONFIG_FILE ).read())
else:
SYS_ARGS['config'] = {}
if 'version' in SYS_ARGS['config'] :
@@ -379,18 +341,23 @@ if __name__ == '__main__' :
#
# this function is designed to export the data to csv
#
path = SYS_ARGS['export-config']
TYPE = SYS_ARGS['export'] if 'export' in SYS_ARGS else '835'
X12_TYPE = SYS_ARGS['export'] if 'export' in SYS_ARGS else '835'
if not os.path.exists(path) or X12_TYPE not in ['835','837']:
print (HELP_MESSAGE)
else:
#
# Let's run the export function ..., This will push files into a data-store of choice Redshift, PostgreSQL, MySQL ...
#
_store = {"type":"sql.SQLWriter","args":json.loads( (open(path) ).read())}
# _store = {"type":"sql.SQLWriter","args":json.loads( (open(path) ).read())}
_store = json.loads( (open(path) ).read())
pipes = export.Factory.instance(type=X12_TYPE,write_store=_store,config = CONFIG_FILE) #"inspect":0,"cast":0}})
# pipes[0].run()
# print (pipes)
for thread in pipes:
@@ -399,12 +366,7 @@ if __name__ == '__main__' :
thread.start()
time.sleep(1)
thread.join()
# print (Subject.cache)
# while pipes :
# pipes = [thread for thread in pipes if thread.is_alive()]
# time.sleep(1)
else:

View File

@@ -4,7 +4,7 @@ import healthcareio.analytics
import os
import json
import time
# import smart
import transport
import pandas as pd
import numpy as np

View File

@@ -54,10 +54,10 @@ class Formatters :
"""
This function is designed to split an x12 row and
"""
value = []
if row.startswith(prefix) is False:
for row_value in row.replace('~','').split(sep) :
if '>' in row_value and not row_value.startswith('HC'):
@@ -184,7 +184,7 @@ class Formatters :
else:
return value
def procedure(self,value):
for xchar in [':','<','|','>'] :
@@ -204,8 +204,10 @@ class Formatters :
_value = str(value)
return _value
def diagnosis(self,value):
return [ {"code":item[2], "type":item[1]} for item in value if len(item) > 1]
def parse_loc(self,value):
if ':' in value :
return dict(zip(['place_of_service','claim_indicator','claim_frequency'],value.split(':')))
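Note: illustratively, the new parse_loc helper splits a composite place-of-service value into a dict; the sample value below is made up.
# illustration only -- the composite value is hypothetical
fmt = Formatters()
print(fmt.parse_loc("11:B:1"))
# {'place_of_service': '11', 'claim_indicator': 'B', 'claim_frequency': '1'}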
def pos(self,value):
"""
formatting place of service information within a segment (REF)
@@ -218,6 +220,23 @@ class Formatters :
x = {"place_of_service":x[0],"indicator":x[1],"frequency":x[2]} if len(x) == 3 else {"place_of_service":x[0],"indicator":None,"frequency":None}
return x
class Parser (Process):
@staticmethod
def setup (path):
# self.config = _config['parser']
config = json.loads(open(path).read())
_config = config['parser']
#
# The parser may need some editing provided, this allows ease of developement and using alternate configurations
#
if type(_config['837']) == str or type(_config['835']) == str :
for _id in ['837','835'] :
if type(_config[_id]) == str and os.path.exists(_config[_id]):
_config[_id] = json.loads(open(_config[_id]).read())
if type(_config[_id]) == dict :
_config[_id] = [_config[_id]]
config['parser'] = _config
return config
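Note: a brief usage sketch of the new Parser.setup static method (the path is hypothetical); it returns the full configuration with the '835'/'837' parser entries normalized to lists of dicts.
# illustration only -- the path is hypothetical
from healthcareio.x12 import Parser
cfg = Parser.setup('/home/user/.healthcareio/config.json')
assert isinstance(cfg['parser']['837'], list)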
def __init__(self,path):
"""
:path path of the configuration file (it can be absolute)
@@ -227,9 +246,21 @@ class Parser (Process):
self.get = void()
self.get.value = self.get_map
self.get.default_value = self.get_default_value
# _config = json.loads(open(path).read())
self._custom_config = self.get_custom(path)
# self.config = _config['parser']
# #
# # The parser may need some editing provided, this allows ease of developement and using alternate configurations
# #
# if type(self.config['837']) == str or type(self.config['835']) == str :
# for _id in ['837','835'] :
# if type(self.config[_id]) == str:
# self.config[_id] = json.loads(open(self.config[_id]).read())
# if type(self.config[_id]) == dict :
# self.config[_id] = [self.config[_id]]
_config = Parser.setup(path)
self.config = _config['parser']
self.store = _config['store']
self.cache = {}
self.files = []
@@ -261,10 +292,10 @@ class Parser (Process):
def set_files(self,files):
self.files = files
def get_map(self,row,config,version=None):
# label = config['label'] if 'label' in config else None
handler = Formatters()
if 'map' not in config and hasattr(handler,config['apply']):
pointer = getattr(handler,config['apply'])
@@ -274,10 +305,11 @@ class Parser (Process):
#
# Pull the goto configuration that skips rows
#
omap = config['map'] if not version or version not in config else config[version]
anchors = config['anchors'] if 'anchors' in config else []
rewrite = config['rewrite'] if 'rewrite' in config else {}
if len(row) == 2 and row[0] == 'HI' :
row = ([row[0]] + row[1].split(':'))
if type(row[0]) == str:
object_value = {}
for key in omap :
@@ -290,8 +322,7 @@ class Parser (Process):
index = aindex + index
if index < len(row) :
value = row[index]
if 'cast' in config and key in config['cast'] and value.strip() != '' :
if config['cast'][key] in ['float','int']:
try:
@@ -329,29 +360,17 @@ class Parser (Process):
value = value[_key]
else:
value = ""
value = {key:value} if key not in value else value
else:
if 'syn' in config and value in config['syn'] :
# value = config['syn'][value]
pass
if type(value) == dict :
object_value = jsonmerge.merge(object_value, value)
else:
object_value = jsonmerge.merge(object_value, value)
else:
object_value[key] = value
else:
#
# we are dealing with a complex object
@@ -361,14 +380,6 @@ class Parser (Process):
value = self.get.value(row_item,config,version)
object_value.append(value)
#
# We need to add the index of the object it matters in determining the claim types
#
# object_value.append( list(get_map(row_item,config,version)))
# object_value = {label:object_value}
return object_value
def set_cache(self,tmp,_info) :
"""
@@ -379,6 +390,7 @@ class Parser (Process):
value=_info['cache']['value']
field = _info['cache']['field']
if value in tmp :
self.cache [key] = {field:tmp[value]}
pass
def get_cache(self,row) :
@@ -398,10 +410,7 @@ class Parser (Process):
value = {}
for row in content[:] :
row = util.split(row.replace('\n','').replace('~',''))
_info = util.get.config(self.config[_code][0],row)
if self._custom_config and _code in self._custom_config:
_cinfo = util.get.config(self._custom_config[_code],row)
@@ -458,7 +467,10 @@ class Parser (Process):
name = _info['field']
# value[name] = tmp
# value = jsonmerge.merge(value,{name:tmp})
if name not in value :
value = dict(value,**{name:tmp})
else:
value[name] = dict(value[name],**tmp)
else:
value = dict(value,**tmp)
@@ -486,12 +498,12 @@ class Parser (Process):
TOP_ROW = content[1].split('*')
SUBMITTED_DATE = util.parse.date(TOP_ROW[4])
CATEGORY= content[2].split('*')[1].strip()
VERSION = content[1].split('*')[-1].replace('~','').replace('\n','')
SENDER_ID = TOP_ROW[2]
row = util.split(content[3])
_info = util.get_config(self.config[_code][0],row)
@@ -501,7 +513,8 @@ class Parser (Process):
value["submitted"] = SUBMITTED_DATE
value['sender_id'] = SENDER_ID
# value = dict(value,**self.apply(content,_code))
value = jsonmerge.merge(value,self.apply(content,_code))
# Let's parse this for default values
return value #jsonmerge.merge(value,self.apply(content,_code))
@@ -555,6 +568,7 @@ class Parser (Process):
index = 0;
_toprows = []
_default = None
for row in file :
row = row.replace('\r','')
@@ -665,25 +679,50 @@ class Parser (Process):
for filename in self.files :
content,logs,_code = self.read(filename)
self.finish(content,logs,_code)
def finish(self,content,logs,_code) :
args = self.store
_args = json.loads(json.dumps(self.store))
ISNEW_MONGO = 'provider' in args and args['provider'] in ['mongo', 'mongodb']
ISLEG_MONGO = ('type' in args and args['type'] == 'mongo.MongoWriter')
if ISLEG_MONGO or ISNEW_MONGO:
if ISLEG_MONGO:
# Legacy specification ...
args['args']['doc'] = 'claims' if _code == '837' else 'remits'
_args['args']['doc'] = 'logs'
else:
args['doc'] = 'claims' if _code == '837' else 'remits'
_args['doc'] = 'logs'
writer.close()
if logs :
else:
if 'type' in args :
# Legacy specification ...
args['args']['table'] = 'claims' if _code == '837' else 'remits'
_args['args']['table'] = 'logs'
table = args['args']['table']
else:
args['table']= 'claims' if _code == '837' else 'remits'
_args['table'] = 'logs'
table = args['table']
writer = transport.factory.instance(**args)
IS_SQLITE = type(writer) == transport.disk.SQLiteWriter
if content:
if IS_SQLITE :
for row in content :
writer.apply("""insert into :table(data) values (':values')""".replace(":values",json.dumps(row)).replace(":table",table) )
else:
writer.write(content)
writer.close()
if logs :
logger = transport.factory.instance(**_args)
if IS_SQLITE:
for row in logs:
logger.apply("""insert into logs values (':values')""".replace(":values",json.dumps(row)))
else:
logger.write(logs)
logger.close()
if self.emit.post :

View File

@@ -8,7 +8,7 @@ import sys
def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = {
"name":"healthcareio","version":"1.6.4.6",
"author":"Vanderbilt University Medical Center",
"author_email":"steve.l.nyemba@vumc.org",
"include_package_data":True,