# parser/healthcareio/export/workers.py

"""
HealthcareIO - The Phi Technology LLC 2020
This file contains functionalities that implement elements of an ETL pipeline that will consist of various workers.
The pipeline is built around an observer design pattern.
@TODO: Integrate with airflow and other process monitoring tools
"""
import transport
import os
from multiprocessing import Process, Lock
import numpy as np
import json
import pandas as pd

class Subject(Process):
    cache = pd.DataFrame()
    lock = Lock()
    @staticmethod
    def log(_args):
        """
        Thread-safe logging: append a status record to the shared cache
        """
        with Subject.lock:
            try:
                # -- DataFrame.append was removed in pandas 2.x, use pd.concat instead
                Subject.cache = pd.concat([Subject.cache, pd.DataFrame([_args])])
            except Exception as e:
                print(e)
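
    # Example of what Subject.cache accumulates, assuming workers report
    # records shaped like the one printed in notify() below:
    #
    #   Subject.log({"table":"claims","module":"Reader","status":1})
    #   Subject.cache    #--      table  module  status
    #                    #-- 0   claims  Reader       1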
    def __init__(self,**_args):
        super().__init__()
        self.observers = _args['observers']
        self.index = 0
        self.name = _args['name']
        # -- the second observer is expected to be the Reader, which carries
        #    the name of the target table
        self.table = self.observers[1].table
        self.m = {}
    def run(self):
        self.notify()
    def notify(self):
        """
        Relay control along the observer chain: initialize the next worker with
        the previous worker's output, run it, and let it call back here when done
        """
        if self.index < len(self.observers):
            observer = self.observers[self.index]
            _observer = None if self.index == 0 else self.observers[self.index - 1]
            _invalues = None if not _observer else _observer.get()
            if _observer is None:
                self.m['table'] = self.name
            observer.init(caller=self, invalues=_invalues)
            self.index += 1
            observer.execute()
            print({"table":self.table,"module":observer.name(),"status":observer.status})
            # self.m[observer.name()] = observer.status
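
    # A successful three-worker run prints one record per worker, e.g. (illustrative):
    #   {'table': 'claims', 'module': 'CreateSQL', 'status': 1}
    #   {'table': 'claims', 'module': 'Reader', 'status': 1}
    #   {'table': 'claims', 'module': 'Writer', 'status': 1}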
class Worker:
    def __init__(self,**_args):
        # PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
        # CONFIG = json.loads((open(PATH)).read())
        self._info = _args['store']
        self.logs = []
        self.schema = _args['schema']
        self.prefix = _args['prefix']
        self.status = 0
    def name(self):
        return self.__class__.__name__
    def log(self,**_args):
        """
        This function is designed to log to either the console or a data-store
        """
        # print (_args)
        pass
    def init(self,**_args):
        """
        Initialize a worker with the basic information it needs to perform its task
        :param caller caller to be notified when the task completes
        :param store data-store information i.e (pgsql,couchdb, mongo ...)
        :param invalues optional input values produced by the previous worker
        """
        self.caller = _args['caller']
        #self._info = _args['store']
        self._invalues = _args['invalues'] if 'invalues' in _args else None
    def execute(self):
        try:
            self._apply()
        except Exception as error:
            # -- errors are logged rather than raised so the chain can keep moving
            self.log(context=self.name(), status=0, info={"error":str(error)})
        finally:
            self.caller.notify()
    def _apply(self):
        pass
def get(self):
pass
def notify(self):
self.caller.notify()
    def tablename(self,name):
        # -- avoid doubling the underscore when the prefix already contains one,
        #    and only qualify with the schema when one is provided
        PREFIX_SEPARATOR = '_' if '_' not in self.prefix else ''
        SCHEMA_SEPARATOR = '' if self.schema.strip() == '' else '.'
        TABLE_NAME = PREFIX_SEPARATOR.join([self.prefix,name])
        return SCHEMA_SEPARATOR.join([self.schema,TABLE_NAME])
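
    # For example (illustrative values):
    #   schema='public', prefix='foo'  -> tablename('claims') == 'public.foo_claims'
    #   schema='',       prefix='foo_' -> tablename('claims') == 'foo_claims'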
class CreateSQL(Worker):
    """
    This class is intended to create an SQL table given the SQL statement
    provided at construction time (:param sql)
    """
    def __init__(self,**_args):
        super().__init__(**_args)
        self._sql = _args['sql']

    def init(self,**_args):
        super().init(**_args)
    def _apply(self):
        # -- the table name lives in a different spot depending on whether the
        #    data-store configuration is provider-based or legacy
        sqltable = self._info['table'] if 'provider' in self._info else self._info['args']['table']
        sqltable = self.tablename(sqltable)
        log = {"context":self.name(),"args":{"table":sqltable,"sql":self._sql.replace(":table",sqltable)}}
        try:
            writer = transport.factory.instance(**self._info)
            writer.apply(self._sql.replace(":table",sqltable))
            writer.close()
            log['status'] = 1
            self.status = 1
        except Exception as e:
            log['status'] = 0
            log['info'] = {"error":str(e)}
        finally:
            self.log(**log)
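
# Example of a statement CreateSQL expects (hypothetical schema; ':table' is
# the placeholder that _apply substitutes with the qualified table name):
#
#   CreateSQL(store=STORE, schema='', prefix='foo',
#             sql='CREATE TABLE :table (claim_id VARCHAR(64), amount NUMERIC)')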
class Reader(Worker):
    """
    Read from the configured data-store and make the data available to the next worker
    :param read read request to submit to the data-store (mongodb command, sql query, ...)
    :param max_rows maximum rows to be written in a single insert
    :param table target table
    """
    def __init__(self,**_args):
        super().__init__(**_args)
        self.pipeline = _args['read']
        self.MAX_ROWS = _args['max_rows']
        self.table = _args['table'] #-- target table
# is_demo = 'features' not in _args or ('features' in _args and ('export_etl' not in _args['features'] or _args['features']['export_etl'] == 0))
#
# @TODO: Bundle the limits with the features so as to insure that it doesn't come across as a magic number
#
# LIMIT = -1
# if is_demo :
# LIMIT = 10000
# if set(['find','distinct']) & set(self.pipeline.keys()) :
# self.pipeline['limit'] = LIMIT
# elif 'aggregate' in self.pipeline :
# self.pipeline['pipeline'] = [{"$limit":LIMIT}] + self.pipeline['pipeline']
# self.log(**{"context":self.name(),"demo":is_demo,"args":{"limit":LIMIT}})
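
    # Illustrative shapes for the 'read' argument (hypothetical; the exact
    # keys depend on the transport provider in use):
    #   mongodb : {"aggregate":"claims","pipeline":[...],"cursor":{}}
    #   sql     : {"sql":"SELECT * FROM claims"}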
    def init(self,**_args):
        super().init(**_args)
        self.rows = []

    def _apply(self):
        # -- initialize the log up-front so the except clause below can't hit a NameError
        _log = {"context":self.name(), "status":0, "info":{}}
        try:
            self.reader = transport.factory.instance(**self._info)
            self.rows = self.reader.read(**self.pipeline)
            if type(self.rows) == pd.DataFrame:
                self.rows = self.rows.to_dict(orient='records')
            # -- number of segments, rounded up so no segment exceeds MAX_ROWS
            N = int(np.ceil(len(self.rows) / self.MAX_ROWS)) if len(self.rows) > self.MAX_ROWS else 1
            _log = {"context":self.name(), "status":1,"info":{"rows":len(self.rows),"table":self.table,"segments":N}}
            self.rows = np.array_split(self.rows,N)
            self.reader.close()
            self.status = 1
        except Exception as e:
            _log['status'] = 0
            _log['info'] = {"error":str(e)}
            print(e)

        self.log(**_log)
# @TODO: Call the caller and notify it that this here is done
def get(self):
return self.rows
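
    # For example (illustrative): 25000 rows with max_rows=10000 give
    # N = ceil(25000/10000) = 3 segments of at most 10000 rows each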
class Writer(Worker):
    def __init__(self,**_args):
        super().__init__(**_args)
        if 'provider' in self._info:
            self._info['context'] = 'write'
    def init(self,**_args):
        """
        :param store output data-store needed for writing
        :param invalues input values to be written to the data-store
        """
        super().init(**_args)
        self._invalues = _args['invalues']
    def _apply(self):
        # -- resolve and qualify the target table, accounting for both
        #    data-store configuration formats (provider-based vs legacy)
        if 'provider' in self._info:
            table = self.tablename(self._info['table'])
            self._info['table'] = table
        else:
            table = self.tablename(self._info['args']['table'])
            self._info['args']['table'] = table
        writer = transport.factory.instance(**self._info)
        index = 0
        if self._invalues:
            for rows in self._invalues:
                if len(rows) > 0:
                    #
                    # @TODO: Upgrade to mongodb 4.0+ and remove the line below
                    # Upon upgrade use the operator "$toString" in export.init function
                    #
                    # -- mongodb ObjectIds must be stringified before insertion;
                    #    rows without an _id (e.g. from sql sources) pass through unchanged
                    rows = [dict(item,**{"_id":str(item["_id"])}) if "_id" in item else item for item in rows]
                    writer.write(rows)
                index += 1
            self.status = 1
        else:
            print("No data was passed")
        writer.close()
# -- Illustrative (commented) usage of the Reader, updated to the current
#    constructor signature; the store and read values are hypothetical:
#
# _args = {"provider":"mongodb","db":"parserio","doc":"logs"}
# reader = Reader(store=_args, schema='', prefix='foo', read={"distinct":"claims","key":"name"}, max_rows=10000, table='claims')
# reader.init(caller=None)
# reader._apply()
# print (reader.get())