"""
This file implements exporting data from a MongoDB database to another data store in relational format (CSV). Any other use case will have to be performed with MongoDB native tools.
target:
File/SQLite
PostgreSQL
MySQL
@TODO:
- Ensure support for both schemas and table prefixes
Usage :
License:
Copyright 2019, The Phi Technology LLC
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
import transport
import numpy as np
import os
import json
import jsonmerge
import sys
# from airflow import DAG
from datetime import timedelta
# import healthcareio.export.workers as workers
from healthcareio.export import workers
import platform
from datetime import datetime
import copy
import requests
import time
from healthcareio.x12 import Parser
PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','config.json'])
STORE_URI = 'http://healthcareio.the-phi.com/store/healthcareio'
#
# let us see if we have any custom configurations ...
PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','custom'])
CONFIG = {}
CUSTOM_CONFIG = {}
# if os.path.exists(PATH) and os.listdir(PATH) :
# CONFIG = json.loads((open(PATH)).read())
# PATH = os.sep.join([PATH,os.listdir(PATH)[0]])
# CUSTOM_CONFIG = json.loads((open(PATH)).read())
# _args = dict(CONFIG['store'],**{'type':'mongo.MongoReader'})
def get_field_names (_map,label):
    """
    Return the field names found in a map; wrap them under the given label if any
    """
    fields = list(_map.keys())
    return fields if not label else [{label:fields}]

def get_field (entry):
    """
    Return the exportable fields of a configuration entry, keyed by its label/field when present
    """
    label = list(set(['label','field']) & set(entry.keys()))
    label = None if not label else entry[label[0]]
    if 'map' not in entry :
        return None
    _map = entry['map']
    return get_field_names(_map,label)
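#
# Example (illustrative): given an entry such as
#   {"label":"procedures","map":{"code":1,"amount":2}}
# get_field returns [{"procedures":["code","amount"]}] ; an entry without a
# 'label' or 'field' attribute yields the bare field list ["code","amount"]
#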
#
#-- Get the fields to export that will go in the unwind
#
def meta(config) :
"""
This function will return the metadata associated with a given configuraiton 835 or 838
:params config configuration section
"""
_info = []
table_count = 1
    #
    # Apply 'cache' directives first: they flag a field in another section's
    # map (sentinel -100) so it is carried over during the export
    for prefix in config :
        if 'cache' in config[prefix] :
            _cache = config[prefix]['cache']
            field = _cache['field']
            key = _cache['key']
            if 'map' in config[key]:
                config[key]['map'][field] = -100

    add_index = {} #-- tells if we should add the _index attribute or not
    for prefix in config :
        #
        # Running through every element that we can parse (from the configuration)
        # This allows us to create ER-like structures from the data we have
        #
        if type(config[prefix]) != dict :
            continue

        if '@ref' in config[prefix] : #and set(['label','field','map']) & set(config[prefix]['@ref'].keys()):
            for subprefix in config[prefix]['@ref'] :
                _entry = config[prefix]['@ref'][subprefix]
                _id = list(set(['label','field']) & set(config[prefix]['@ref'][subprefix].keys()))
                if _id :
                    #
                    # In case a reference item has a parent field/label
                    _id = _id[0]
                    table = config[prefix]['@ref'][subprefix][_id]
                    add_index[table] = 1 if _id == 'label' else 0
                if 'map' in _entry :
                    _info += get_field(_entry)
                else:
                    _info += list(_entry.keys())

        if set(['label','field','map']) & set(config[prefix].keys()):
            _entry = config[prefix]
            _id = list(set(['label','field']) & set(config[prefix].keys()))
            if _id :
                _id = _id[0]
                table = config[prefix][_id]
                add_index[table] = 1 if _id == 'label' else 0
            if 'map' in _entry :
                _info += get_field(_entry)
    #
    # We need to organize the fields appropriately here
    #
    fields = {"main":[],"rel":{}}
    for row in _info :
        if type(row) == str :
            fields['main'] += [row]
            fields['main'] = list(set(fields['main']))
            fields['main'].sort()
        else :
            _id = list(set(add_index.keys()) & set(row.keys()))
            if _id :
                _id = _id[0]
                if add_index[_id] == 1 :
                    row[_id] += ['_index']
                if _id not in fields['rel']:
                    fields['rel'][_id] = row[_id]
                else:
                    fields['rel'][_id] += row[_id]
            else:
                _id = list(row.keys())[0]
                fields['rel'][_id] = row[_id] if _id not in fields['rel'] else fields['rel'][_id] + row[_id]
    return fields
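#
# Example (illustrative): for a configuration with top-level mapped fields and
# a labeled '@ref' section 'procedures', meta() returns a structure like
#   {"main":["claim_id","patient_name", ...],
#    "rel":{"procedures":["code","amount","_index"]}}
#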
def create (**_args) :
    """
    Build a CREATE TABLE statement (all columns VARCHAR(125)) for the given fields
    :param table table name (kept as the ':table' placeholder, expected to be resolved downstream e.g by workers.CreateSQL)
    :param key optional key column (e.g claim_id) added to the fields
    :param fields list of column names
    """
    skip = [] if 'skip' not in _args else _args['skip']
    fields = ([_args['key']] if 'key' in _args else []) + _args['fields']
    fields = ['_id'] + list(set(fields))
    fields.sort()
    table = _args['table']
    sql = ['CREATE TABLE :table ',"(",",\n".join(["\t".join(["\t",name,"VARCHAR(125)"]) for name in fields]),")" ]
    return " ".join(sql)
def read (**_args) :
"""
    This function reads rows associated with a given set of files and stores them in a data-store
"""
files = _args['files']
    fields = _args['fields']
name = _args['id']
pipeline= {"find":name,"filter":{"name":{"$in":files}}}
#
# @TODO: Find a way to write the data into a data-store
# - use dbi interface with pandas or stream it in
#
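    #
    # A possible sketch (assuming a reader factory in the 'transport' module;
    # this API is an assumption, NOT verified here):
    #
    # _reader = transport.factory.instance(**CONFIG['store'])
    # for _row in _reader.read(mongo=pipeline) :
    #     ... # hand the projected fields to a writer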
def init_sql(**_args):
"""
    This function expresses how we can generically read data stored in JSON format in a relational table (SQLite)
    :param type 835 or 837
:param skip list of fields to be skipped
"""
#
# we should acknowledge global variables CONFIG,CUSTOM_CONFIG
TYPE = _args['type']
_config = CONFIG['parser'][TYPE][0]
    TABLE_NAME = 'claims' if TYPE == '837' else 'remits'
if TYPE in CUSTOM_CONFIG :
_config = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE])
#
_info = meta(_config)
_projectSQLite = [] #-- sqlite projection
for field_name in _info['main'] :
_projectSQLite += ["json_extract(data,'$."+field_name+"') "+field_name]
_projectSQLite = ",".join(_projectSQLite) #-- Wrapping up SQLITE projection on main table
SQL = "SELECT DISTINCT claims.id _id,:fields FROM :table, json_each(data)".replace(":fields",_projectSQLite).replace(":table",TABLE_NAME)
r = [{"table":TABLE_NAME,"read":{"sql":SQL},"sql":create(table=TABLE_NAME,fields=_info['main'])}]
for table in _info['rel'] :
#
# NOTE: Adding _index to the fields
fields = _info['rel'][table] #+["_index"]
project = [TABLE_NAME+".id _id","json_extract(data,'$.claim_id') as claim_id"]
fn_prefix = "json_extract(x.value,'$." if '_index' not in _info['rel'][table] else "json_extract(i.value,'$."
for field_name in fields :
# project += ["json_extract(x.value,'$."+field_name+"') "+field_name]
project += [fn_prefix+field_name+"') "+field_name]
SQL = "SELECT DISTINCT :fields FROM "+TABLE_NAME+", json_each(data) x, json_each(x.value) i where x.key = ':table'"
SQL = SQL.replace(":table",table).replace(":fields",",".join(project))
r += [{"table":table,"read":{"sql":SQL},"sql":create(table=table,key='claim_id',fields=fields)}]
return r
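#
# Example (illustrative): each entry of the list returned by init_sql pairs a
# table with the SQLite query that flattens the JSON 'data' column, e.g. for
# an 837 the main entry resembles
#   {"table":"claims",
#    "read":{"sql":"SELECT DISTINCT claims.id _id, json_extract(data,'$.claim_id') claim_id, ... FROM claims, json_each(data)"},
#    "sql":"CREATE TABLE :table ( _id VARCHAR(125), ... )"}
#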
def init(**_args):
    """
    Dispatch initialization to SQLite or MongoDB depending on the configured store provider
    """
if 'provider' in CONFIG['store'] and CONFIG['store']['provider'] == 'sqlite' :
return init_sql(**_args)
else:
return init_mongo(**_args)
def init_mongo (**_args) :
"""
This function is intended to determine the number of tables to be created, as well as their type.
:param type {835,837}
:param skip list of fields to be skipped
"""
TYPE = _args['type']
# SKIP = _args['skip'] if 'skip' in _args else []
_config = CONFIG['parser'][TYPE][0]
if TYPE in CUSTOM_CONFIG :
_config = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE])
_info = meta(_config)
#
# @TODO: implement fields to be skipped ...
#
    TABLE_NAME = 'claims' if TYPE == '837' else 'remits'
# project = dict.fromkeys(["_id","claim_id"]+_info['main'],1)
project = {}
for field_name in _info['main'] :
_name = "".join(["$",field_name])
project[field_name] = {"$ifNull":[_name,""]}
project["_id"] = 1
project = {"$project":project}
r = [{"table":TABLE_NAME,"read":{"mongo":{"aggregate":TABLE_NAME,"pipeline":[project],"cursor":{},"allowDiskUse":True}},"sql":create(table=TABLE_NAME,fields=_info['main'])}]
for table in _info['rel'] :
#
# NOTE: Adding _index to the fields
fields = _info['rel'][table] +["_index"]
project = {"_id":1,"claim_id":1,"_index":1} #dict.fromkeys(["_id","claim_id"]+fields,[ ".".join([table,field_name]) for field_name in fields])
for field_name in fields :
# project[field_name] = "$"+".".join([table,field_name])
_name = "$"+".".join([table,field_name])
project[field_name] = {"$ifNull":[_name,""]} #{"$cond":[{"$eq":[_name,None]},"",_name]}
project["_id"] = 1
# pipeline = [{"$match":{"procedures":{"$nin":[None,'']}}},{"$unwind":"$"+table},{"$project":project}]
pipeline = [{"$match": {table: {"$nin": [None, ""]}}},{"$unwind":"$"+table},{"$project":project}]
cmd = {"mongo":{"aggregate":TABLE_NAME,"cursor":{},"pipeline":pipeline,"allowDiskUse":True}}
r += [{"table":table,"read":cmd,"sql":create(table=table,key='claim_id',fields=fields)}]
return r
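#
# Example (illustrative): for a child table such as 'procedures', init_mongo
# generates an aggregation of the form
#   [{"$match":{"procedures":{"$nin":[None,""]}}},
#    {"$unwind":"$procedures"},
#    {"$project":{"_id":1,"claim_id":1,"_index":1,"code":{"$ifNull":["$procedures.code",""]}, ...}}]
# run against the 'claims' (837) or 'remits' (835) collection
#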
class Factory:
@staticmethod
def license(**_args):
body = {}
body['email'] = _args['email']
body['host'] = platform.node()
body['date'] = {"month":datetime.now().month,"year":datetime.now().year,"day":datetime.now().day}
headers = {'uid': body['email'],'content-type':'application/json'}
uri = STORE_URI+'/init'
http = requests.session()
        r = http.post(uri,headers=headers,data=json.dumps(body)) #-- serialize the body to match the application/json content-type
return r.json() if r.status_code == 200 else {}
@staticmethod
def instance(**_args):
"""
        The creation process only requires a target store and a type (835 or 837)
        :param type EDI type to be processed i.e 835 or 837
        :param write_store target data-store (redshift, mariadb, mongodb ...)
"""
global PATH
global CONFIG
global CUSTOM_CONFIG
PATH = _args['config']
# if 'config' in _args :
# PATH = _args['config']
# else:
# PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','config.json'])
CONFIG = Parser.setup(PATH)
CUSTOM_PATH = os.sep.join([os.environ.get('HOME'),'.healthcareio','custom'])
if os.path.exists(CUSTOM_PATH) and os.listdir(CUSTOM_PATH) :
CUSTOM_PATH = os.sep.join([CUSTOM_PATH,os.listdir(CUSTOM_PATH)[0]])
CUSTOM_CONFIG = json.loads((open(CUSTOM_PATH)).read())
_features = Factory.license(email=CONFIG['owner'])
X12_TYPE = _args['type']
store = copy.deepcopy(CONFIG['store']) #-- reading the original data
        #
        # Format the read store (table name and 'read' context) according to the X12 type, just in case
if 'provider' in store :
if 'table' in store:
store['table'] = 'claims' if X12_TYPE == '837' else 'remits'
store['context'] ='read'
else:
pass
# store['type']='mongo.MongoReader'
wstore = _args['write_store'] #-- output data store
PREFIX = 'clm_' if X12_TYPE == '837' else 'era_'
# SCHEMA = '' if 'schema' not in wstore['args'] else wstore['args']['schema']
SCHEMA = '' if 'schema' not in wstore else wstore['schema']
_config = CONFIG['parser'][X12_TYPE][0]
if X12_TYPE in CUSTOM_CONFIG :
_config = jsonmerge.merge(_config,CUSTOM_CONFIG[X12_TYPE])
# _info = meta(_config)
job_args = init(type=X12_TYPE) #-- getting the queries that will generate the objects we are interested in
# print (json.dumps(job_args))
_jobs = []
for row in job_args:
# _store = json.loads(json.dumps(wstore))
_store = copy.deepcopy(wstore)
# _store['args']['table'] = row['table']
if 'type' in _store :
_store['args']['table'] = row['table']
else:
_store['table'] = row['table']
_pipe = [
workers.CreateSQL(prefix=PREFIX,schema=SCHEMA,store=_store,sql=row['sql']),
# workers.Reader(prefix=PREFIX,schema=SCHEMA,store=store,mongo=row['mongo'],max_rows=250000,features=_features,table=row['table']),
workers.Reader(prefix=PREFIX,schema=SCHEMA,store=store,read=row['read'],max_rows=250000,features=_features,table=row['table']),
workers.Writer(prefix=PREFIX,schema=SCHEMA,store=_store)
]
_jobs += [workers.Subject(observers=_pipe,name=row['table'])]
return _jobs
# if __name__ == '__main__' :
# pass
# pipes = Factory.instance(type='835',write_store={"type":"sql.SQLWriter","args":{"provider":"postgresql","db":"sample",}}) #"inspect":0,"cast":0}})
# # pipes[0].run()
# for thread in pipes:
# thread.start()
# time.sleep(1)
# while pipes :
# pipes = [thread for thread in pipes if thread.is_alive()]
# time.sleep(10)
# print (Factory.license(email='steve@the-phi.com'))
#
# check account with basic information
#
# class Observerob:
# def __init__(**_args) :
# #-- Let us all flatten the table
# #
# TYPE = '835'
# _config = jsonmerge.merge(CONFIG['parser'][TYPE][0],CUSTOM_CONFIG[TYPE])
# # f = meta(CONFIG['parser'][TYPE][0])
# # _f = meta(CUSTOM_CONFIG[TYPE])
# f = meta(_config)
# # print (json.dumps( (f)))
# print (json.dumps(init(type='835')))
# # print (create(fields=f['rel']['adjudicated'],table='adjudicated',key='claim_id'))