283 lines
9.3 KiB
Python
283 lines
9.3 KiB
Python
"""
|
|
HealthcareIO - The Phi Technology LLC 2020
|
|
|
|
This file contains functionalities that implement elements of an ETL pipeline that will consist of various workers.
|
|
The pipeline is built around an observer design pattern.
|
|
|
|
@TODO: Integrate with airflow and other process monitoring tools
|
|
"""
|
|
import transport
|
|
import os
|
|
from multiprocessing import Process, Lock
|
|
import numpy as np
|
|
import json
|
|
import pandas as pd
|
|
|
|
class Subject (Process):
    """
    Drive an ordered chain of observers (workers) from within a dedicated process.

    Every call to ``notify`` initializes the next observer with the output of the
    previous one (``get()``) and executes it. The workers defined in this file call
    ``notify`` again once they are done executing, which is what advances the chain.

    :param observers    ordered list of workers, the entry at position 1 must expose a ``table`` attribute
    :param name         name assigned to the process
    """
    # log records accumulated by Subject.log (one row per call)
    cache = pd.DataFrame()
    # serializes updates of the cache
    lock = Lock()

    @staticmethod
    def log(_args):
        """
        Append a log record to the class-level cache; failures are printed, never raised
        :param _args    dictionary holding the fields of the record
        """
        with Subject.lock:
            try:
                # DataFrame.append was removed in pandas 2.0, pd.concat is the supported equivalent
                Subject.cache = pd.concat([Subject.cache, pd.DataFrame([_args])])
            except Exception as e:
                print (e)

    def __init__(self,**_args):
        super().__init__()
        self.observers = _args['observers']
        self.index = 0      #-- position of the next observer to run
        self.name = _args['name']
        # hard-coded position: the second observer is expected to carry the target table
        self.table = self.observers[1].table
        self.m = {}

    def run(self):
        """
        Entry point of the process, starts the chain with the first observer
        """
        self.notify()

    def notify(self):
        """
        Run the next observer in the chain (if any), feeding it the output of the previous one.

        The observer's ``execute`` may call back into this method before it returns,
        in which case the status line of an observer is printed after the ones that follow it.
        """
        if self.index >= len(self.observers):
            # the chain has been exhausted, nothing left to run
            return

        observer = self.observers[self.index]
        previous = None if self.index == 0 else self.observers[self.index - 1]
        _invalues = previous.get() if previous else None
        if previous is None:
            self.m['table'] = self.name

        observer.init(caller=self, invalues=_invalues)
        # advance before executing so a nested notify() picks up the following observer
        self.index += 1
        observer.execute()
        print ({"table":self.table,"module":observer.name(),"status":observer.status})
|
|
|
|
|
|
|
|
|
|
class Worker :
    """
    Base class of the pipeline workers (observers).

    A worker is initialized by its caller (``init``), performs its task (``_apply``)
    and notifies the caller once done (``execute``). Sub-classes override ``_apply`` and ``get``.

    :param store    data-store information i.e (pgsql,couchdb, mongo ...)
    :param schema   schema of the target table, blank when there is none
    :param prefix   prefix applied to table names
    """
    def __init__(self,**_args):
        self._info = _args['store']
        self.logs = []
        self.schema = _args['schema']
        self.prefix = _args['prefix']
        self.status = 0     #-- 0 until the task has been completed, sub-classes set it to 1

    def name(self):
        """
        :return the name of the concrete class, used as the logging context
        """
        return self.__class__.__name__

    def log (self,**_args):
        """
        This function is designed to log to either the console or a data-store
        The base implementation is a placeholder and discards the record
        """
        pass

    def init(self,**_args):
        """
        Initializing a worker with arguments needed for it to perform its task, basic information needed are
        :param caller   caller to be notified
        :param invalues input values handed over by the previous worker (optional, defaults to None)
        """
        self.caller = _args['caller']
        self._invalues = _args.get('invalues')

    def execute(self):
        """
        Run the task and notify the caller, whether or not the task succeeded.
        A failure never propagates (best-effort), it is reported through ``log`` instead of being dropped.
        """
        try:
            self._apply()
        except Exception as error:
            self.log(**{"context":self.name(),"status":0,"info":{"error":str(error)}})
        finally:
            # the chain must move on regardless of the outcome
            self.caller.notify()

    def _apply(self):
        """
        Task performed by the worker, to be implemented by sub-classes
        """
        pass

    def get(self):
        """
        Output handed over to the next worker, to be implemented by sub-classes (None by default)
        """
        pass

    def notify(self):
        """
        Notify the caller
        """
        self.caller.notify()

    def tablename(self,name) :
        """
        Build the full name of a table as [schema.]prefix[_]name
        :param name     base name of the table
        :return         the table name with prefix and (non-blank) schema applied
        """
        # the separator is skipped if the prefix already contains an underscore
        PREFIX_SEPARATOR = '' if '_' in self.prefix else '_'
        TABLE_NAME = PREFIX_SEPARATOR.join([self.prefix,name])
        SCHEMA_SEPARATOR = '.' if self.schema.strip() != '' else ''
        return SCHEMA_SEPARATOR.join([self.schema,TABLE_NAME])
|
|
|
|
|
|
class CreateSQL(Worker) :
    """
    This class is intended to create an SQL table given an SQL statement
    :param sql  statement to run, the ":table" placeholder is replaced with the full table name
    """
    def __init__(self,**_args):
        super().__init__(**_args)
        self._sql = _args['sql']

    def init(self,**_args):
        super().init(**_args)

    def _apply(self) :
        """
        Run the statement against the data-store and log the outcome.
        The status is set to 1 only if the statement was applied and the writer closed without error.
        """
        # the table is found at the top level with the "provider" configuration, under "args" otherwise
        sqltable = self._info['table'] if 'provider' in self._info else self._info['args']['table']
        sqltable = self.tablename(sqltable)
        sql = self._sql.replace(":table",sqltable)
        log = {"context":self.name(),"args":{"table":sqltable,"sql":sql}}
        try:
            writer = transport.factory.instance(**self._info)
            try:
                writer.apply(sql)
            finally:
                # release the connection even if the statement failed
                writer.close()
            log['status'] = 1
            self.status = 1
        except Exception as e:
            log['status'] = 0
            # some exceptions carry no arguments, fall back on their text
            log['info'] = {"error":e.args[0] if e.args else str(e)}
        finally:
            self.log(**log)
|
|
|
|
class Reader(Worker):
    """
    Read from the data-store (originally mongodb) and make the data available to a third party
    :param read     arguments passed to the reader (e.g. a mongodb command)
    :param max_rows maximum rows to be written in a single insert
    :param table    target table
    """
    def __init__(self,**_args):
        super().__init__(**_args)
        self.pipeline = _args['read']   #-- pipeline in the context of the data-store NOT ETL
        self.MAX_ROWS = _args['max_rows']
        self.table = _args['table'] #-- target table
        #
        # @TODO: Bundle the limits with the features so as to insure that it doesn't come across as a magic number
        #

    def init(self,**_args):
        super().init(**_args)
        self.rows = []

    def _apply(self):
        """
        Read the data and split it into segments that hold at most MAX_ROWS rows each.
        The outcome (row/segment counts or the error) is logged, errors are printed and never raised.
        """
        # defined upfront so the error handler can always fill it in
        _log = {"context":self.name()}
        try:
            self.reader = transport.factory.instance(**self._info)
            try:
                self.rows = self.reader.read(**self.pipeline)
                if isinstance(self.rows, pd.DataFrame):
                    self.rows = self.rows.to_dict(orient='records')

                total = len(self.rows)
                # ceiling division, otherwise segments would exceed MAX_ROWS (e.g 15 rows, max 10)
                # a non-positive MAX_ROWS can not be honored and yields a single segment
                N = 1
                if self.MAX_ROWS > 0 and total > self.MAX_ROWS:
                    N = int(-(-total // self.MAX_ROWS))

                _log['status'] = 1
                _log['info'] = {"rows":total,"table":self.table,"segments":N}
                self.rows = np.array_split(self.rows,N)
            finally:
                # release the connection even if reading or splitting failed
                self.reader.close()
            self.status = 1
        except Exception as e :
            _log['status'] = 0
            # some exceptions carry no arguments, fall back on their text
            _log['info'] = {"error":e.args[0] if e.args else str(e)}
            print (e)

        self.log(**_log)
        # @TODO: Call the caller and notify it that this here is done

    def get(self):
        """
        :return the segments produced by _apply (an empty list if nothing has been read)
        """
        return self.rows
|
|
|
|
|
|
class Writer(Worker):
    """
    Write the segments handed over by the previous worker to the target data-store
    """
    def __init__(self,**_args):
        super().__init__(**_args)
        if 'provider' in self._info :
            self._info['context'] = 'write'

    def init(self,**_args):
        """
        :param caller   caller to be notified
        :param invalues input values to be written somewhere (required)
        """
        super().init(**_args)
        self._invalues = _args['invalues']

    def _apply(self):
        """
        Write every non-empty segment of the input values to the target table.
        The status is set to 1 once all segments were processed; the writer is always closed.
        """
        # the table name in the store configuration is replaced with its prefixed/schema-qualified form
        if 'provider' in self._info :
            self._info['table'] = self.tablename(self._info['table'])
        else:
            self._info['args']['table'] = self.tablename(self._info['args']['table'])

        writer = transport.factory.instance(**self._info)
        try:
            if self._invalues :
                for rows in self._invalues :
                    if len(rows) > 0:
                        #
                        # @TODO: Upgrade to mongodb 4.0+ and remove the line below
                        # Upon upgrade use the operator "$toString" in export.init function
                        #
                        # rows without an _id are passed through as is
                        rows = [dict(item,**{"_id":str(item["_id"])}) if "_id" in item else dict(item) for item in rows]
                        writer.write(rows)
                self.status = 1
            else:
                print ("No data was passed")
        finally:
            # release the connection even if a segment could not be written
            writer.close()
|
|
|
|
#_args = {"type":"mongo.MongoReader","args":{"db":"parserio","doc":"logs"}}
|
|
#reader = Reader()
|
|
#reader.init(store = _args,pipeline={"distinct":"claims","key":"name"})
|
|
#reader._apply()
|
|
#print (reader.get())
|
|
#for row in reader.get() :
|
|
# print (row)
|