From 3b5c9d15035cda5b2fa723e6731a98525e878995 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Fri, 28 Jul 2023 09:09:18 -0500 Subject: [PATCH] bugfix: reformatting factory singleton and adding python queue as qlistener --- transport/__init__.py | 148 ++++++++++++++++++++++++++++++++++------- transport/mongo.py | 5 +- transport/providers.py | 63 ++++++++++++++++++ transport/qlistener.py | 42 ++++++++++++ transport/sql.py | 13 ++-- 5 files changed, 242 insertions(+), 29 deletions(-) create mode 100644 transport/providers.py create mode 100644 transport/qlistener.py diff --git a/transport/__init__.py b/transport/__init__.py index 57c3f22..8a45800 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -38,6 +38,7 @@ if sys.version_info[0] > 2 : from transport import sql as sql from transport import etl as etl from transport.version import __version__ + from transport import providers else: from common import Reader, Writer,Console #, factory import disk @@ -48,37 +49,39 @@ else: import sql import etl from version import __version__ + import providers import psycopg2 as pg import mysql.connector as my from google.cloud import bigquery as bq import nzpy as nz #--- netezza drivers import os -class providers : - POSTGRESQL = 'postgresql' - MONGODB = 'mongodb' +# class providers : +# POSTGRESQL = 'postgresql' +# MONGODB = 'mongodb' - BIGQUERY ='bigquery' - FILE = 'file' - ETL = 'etl' - SQLITE = 'sqlite' - SQLITE3= 'sqlite' - REDSHIFT = 'redshift' - NETEZZA = 'netezza' - MYSQL = 'mysql' - RABBITMQ = 'rabbitmq' - MARIADB = 'mariadb' - COUCHDB = 'couch' - CONSOLE = 'console' - ETL = 'etl' - # - # synonyms of the above - BQ = BIGQUERY - MONGO = MONGODB - FERRETDB= MONGODB - PG = POSTGRESQL - PSQL = POSTGRESQL - PGSQL = POSTGRESQL +# BIGQUERY ='bigquery' +# FILE = 'file' +# ETL = 'etl' +# SQLITE = 'sqlite' +# SQLITE3= 'sqlite' +# REDSHIFT = 'redshift' +# NETEZZA = 'netezza' +# MYSQL = 'mysql' +# RABBITMQ = 'rabbitmq' +# MARIADB = 'mariadb' +# COUCHDB = 'couch' +# CONSOLE = 'console' +# ETL = 'etl' +# # +# # synonyms of the above +# BQ = BIGQUERY +# MONGO = MONGODB +# FERRETDB= MONGODB +# PG = POSTGRESQL +# PSQL = POSTGRESQL +# PGSQL = POSTGRESQL +# import providers class IEncoder (json.JSONEncoder): def default (self,object): @@ -156,6 +159,103 @@ class factory : import time def instance(**_args): """ + creating an instance given the provider, we should have an idea of :class, :driver + :provider + :read|write = {connection to the database} + """ + _provider = _args['provider'] + _group = None + + for _id in providers.CATEGORIES : + if _provider in providers.CATEGORIES[_id] : + _group = _id + break + if _group : + _classPointer = _getClassInstance(_group,**_args) + # + # Let us reformat the arguments + if 'read' in _args or 'write' in _args : + _args = _args['read'] if 'read' in _args else _args['write'] + _args['provider'] = _provider + if _group == 'sql' : + _info = _get_alchemyEngine(**_args) + + _args = dict(_args,**_info) + _args['driver'] = providers.DRIVERS[_provider] + + else: + if _provider in providers.DEFAULT : + _default = providers.DEFAULT[_provider] + _defkeys = list(set(_default.keys()) - set(_args.keys())) + if _defkeys : + for key in _defkeys : + _args[key] = _default[key] + pass + # + # get default values from + + return _classPointer(**_args) + # + # Let us determine the category of the provider that has been given +def _get_alchemyEngine(**_args): + """ + This function returns the SQLAlchemy engine associated with parameters, This is only applicable for SQL _items + :_args arguments passed to the factory {provider and other} + """ + #@TODO: Enable authentication files (private_key) + _username = _args['username'] if 'username' in _args else '' + _password = _args['password'] if 'password' in _args else '' + _account = _args['account'] if 'account' in _args else '' + _database = _args['database'] + _provider = _args['provider'] + if _username != '': + _account = _username + ':'+_password+'@' + _host = _args['host'] if 'host' in _args else '' + _port = _args['port'] if 'port' in _args else '' + if _provider in providers.DEFAULT : + _default = providers.DEFAULT[_provider] + _host = _host if _host != '' else (_default['host'] if 'host' in _default else '') + _port = _port if _port != '' else (_default['port'] if 'port' in _default else '') + if _port == '': + _port = providers.DEFAULT['port'] if 'port' in providers.DEFAULT else '' + # + + if _host != '' and _port != '' : + _fhost = _host+":"+str(_port) #--formatted hostname + else: + _fhost = _host + # Let us update the parameters we have thus far + # + + + uri = ''.join([_provider,"://",_account,_fhost,'/',_database]) + + _engine = sqlalchemy.create_engine (uri,future=True) + _out = {'sqlalchemy':_engine} + _pargs = {'host':_host,'port':_port,'username':_username,'password':_password} + for key in _pargs : + if _pargs[key] != '' : + _out[key] = _pargs[key] + return _out +def _getClassInstance(_group,**_args): + """ + This function returns the class instance we are attempting to instanciate + :_group items in providers.CATEGORIES.keys() + :_args arguments passed to the factory class + """ + if 'read' in _args or 'write' in _args : + _context = 'read' if 'read' in _args else _args['write'] + _info = _args[_context] + else: + _context = _args['context'] if 'context' in _args else 'read' + _class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group] + if type(_class) == dict and _args['provider'] in _class: + _class = _class[_args['provider']] + + return _class + +def __instance(**_args): + """ @param provider {file,sqlite,postgresql,redshift,bigquery,netezza,mongo,couch ...} @param context read|write|rw diff --git a/transport/mongo.py b/transport/mongo.py index ae07bce..96c9075 100644 --- a/transport/mongo.py +++ b/transport/mongo.py @@ -127,6 +127,8 @@ class MongoReader(Mongo,Reader): return pd.DataFrame(r) else: + + if 'table' in args or 'collection' in args : if 'table' in args: _uid = args['table'] @@ -134,7 +136,8 @@ class MongoReader(Mongo,Reader): _uid = args['collection'] else: _uid = self.uid - + else: + _uid = self.uid collection = self.db[_uid] _filter = args['filter'] if 'filter' in args else {} _df = pd.DataFrame(collection.find(_filter)) diff --git a/transport/providers.py b/transport/providers.py new file mode 100644 index 0000000..cf5ed59 --- /dev/null +++ b/transport/providers.py @@ -0,0 +1,63 @@ +from transport.common import Reader, Writer,Console #, factory +from transport import disk +import sqlite3 +from transport import s3 as s3 +from transport import rabbitmq as queue +from transport import couch as couch +from transport import mongo as mongo +from transport import sql as sql +from transport import etl as etl +from transport import qlistener +import psycopg2 as pg +import mysql.connector as my +from google.cloud import bigquery as bq +import nzpy as nz #--- netezza drivers +import os + +from transport.version import __version__ + +POSTGRESQL = 'postgresql' +MONGODB = 'mongodb' +HTTP='http' +BIGQUERY ='bigquery' +FILE = 'file' +ETL = 'etl' +SQLITE = 'sqlite' +SQLITE3= 'sqlite' +REDSHIFT = 'redshift' +NETEZZA = 'netezza' +MYSQL = 'mysql' +RABBITMQ = 'rabbitmq' +MARIADB = 'mariadb' +COUCHDB = 'couch' +CONSOLE = 'console' +ETL = 'etl' +# +# synonyms of the above +BQ = BIGQUERY +MONGO = MONGODB +FERRETDB= MONGODB +PG = POSTGRESQL +PSQL = POSTGRESQL +PGSQL = POSTGRESQL +S3 = 's3' +AWS_S3 = 's3' +RABBIT = RABBITMQ + +QLISTENER = 'qlistener' + +DRIVERS = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3} +CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[BIGQUERY],'file':[FILE], + 'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QLISTENER],'http':[HTTP]} + +READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader},'cloud':sql.BigQueryReader, + 'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener} + } +WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter},'cloud':sql.BigQueryWriter, + 'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener} + } +DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port':3306}} +DEFAULT[MONGODB] = {'port':27017,'host':'localhost'} +DEFAULT[REDSHIFT] = DEFAULT[PG] +DEFAULT[MARIADB] = DEFAULT[MYSQL] +DEFAULT[NETEZZA] = {'port':5480} \ No newline at end of file diff --git a/transport/qlistener.py b/transport/qlistener.py new file mode 100644 index 0000000..495b731 --- /dev/null +++ b/transport/qlistener.py @@ -0,0 +1,42 @@ +import queue +from threading import Thread, Lock +from transport.common import Reader,Writer +import numpy as np +import pandas as pd + +class qListener : + lock = Lock() + _queue = {'default':queue.Queue()} + def __init__(self,**_args): + self._cache = {} + self._callback = _args['callback'] if 'callback' in _args else None + self._id = _args['id'] if 'id' in _args else 'default' + if self._id not in qListener._queue : + qListener._queue[self._id] = queue.Queue() + thread = Thread(target=self._forward) + thread.start() + def _forward(self): + _q = qListener._queue[self._id] + _data = _q.get() + _q.task_done() + self._callback(_data) + + def has(self,**_args) : + return self._callback is not None + + + def close(self): + """ + This will empty the queue and have it ready for another operation + """ + _q = qListener._queue[self._id] + with _q.mutex: + _q.queue.clear() + _q.all_tasks_done.notify_all() + + def write(self,_data,**_args): + _id = _args['id'] if 'id' in _args else self._id + + _q = qListener._queue[_id] + _q.put(_data) + _q.join() diff --git a/transport/sql.py b/transport/sql.py index 0d74fdc..ffabb54 100644 --- a/transport/sql.py +++ b/transport/sql.py @@ -29,6 +29,7 @@ from multiprocessing import Lock, RLock import pandas as pd import numpy as np import nzpy as nz #--- netezza drivers +import sqlite3 import copy import os @@ -58,8 +59,8 @@ class SQLRW : # _info['host'] = 'localhost' if 'host' not in _args else _args['host'] # _info['port'] = SQLWriter.REFERENCE[_provider]['port'] if 'port' not in _args else _args['port'] - _info['host'] = _args['host'] - _info['port'] = _args['port'] + _info['host'] = _args['host'] if 'host' in _args else '' + _info['port'] = _args['port'] if 'port' in _args else '' # if 'host' in _args : # _info['host'] = 'localhost' if 'host' not in _args else _args['host'] @@ -98,8 +99,12 @@ class SQLRW : if _handler == my : _info['database'] = _info['dbname'] del _info['dbname'] - - self.conn = _handler.connect(**_info) + if _handler == sqlite3 : + _info = {'path':_info['dbname'],'isolation_level':'IMMEDIATE'} + if _handler != sqlite3 : + self.conn = _handler.connect(**_info) + else: + self.conn = _handler.connect(_info['path'],isolation_level='IMMEDIATE') self._engine = _args['sqlalchemy'] if 'sqlalchemy' in _args else None def meta(self,**_args): schema = []