bugfix: reformatting factory singleton and adding python queue as qlistener

This commit is contained in:
Steve Nyemba 2023-07-28 09:09:18 -05:00
parent f4c0ca80aa
commit 3b5c9d1503
5 changed files with 242 additions and 29 deletions

View File

@ -38,6 +38,7 @@ if sys.version_info[0] > 2 :
from transport import sql as sql from transport import sql as sql
from transport import etl as etl from transport import etl as etl
from transport.version import __version__ from transport.version import __version__
from transport import providers
else: else:
from common import Reader, Writer,Console #, factory from common import Reader, Writer,Console #, factory
import disk import disk
@ -48,37 +49,39 @@ else:
import sql import sql
import etl import etl
from version import __version__ from version import __version__
import providers
import psycopg2 as pg import psycopg2 as pg
import mysql.connector as my import mysql.connector as my
from google.cloud import bigquery as bq from google.cloud import bigquery as bq
import nzpy as nz #--- netezza drivers import nzpy as nz #--- netezza drivers
import os import os
class providers : # class providers :
POSTGRESQL = 'postgresql' # POSTGRESQL = 'postgresql'
MONGODB = 'mongodb' # MONGODB = 'mongodb'
BIGQUERY ='bigquery' # BIGQUERY ='bigquery'
FILE = 'file' # FILE = 'file'
ETL = 'etl' # ETL = 'etl'
SQLITE = 'sqlite' # SQLITE = 'sqlite'
SQLITE3= 'sqlite' # SQLITE3= 'sqlite'
REDSHIFT = 'redshift' # REDSHIFT = 'redshift'
NETEZZA = 'netezza' # NETEZZA = 'netezza'
MYSQL = 'mysql' # MYSQL = 'mysql'
RABBITMQ = 'rabbitmq' # RABBITMQ = 'rabbitmq'
MARIADB = 'mariadb' # MARIADB = 'mariadb'
COUCHDB = 'couch' # COUCHDB = 'couch'
CONSOLE = 'console' # CONSOLE = 'console'
ETL = 'etl' # ETL = 'etl'
# # #
# synonyms of the above # # synonyms of the above
BQ = BIGQUERY # BQ = BIGQUERY
MONGO = MONGODB # MONGO = MONGODB
FERRETDB= MONGODB # FERRETDB= MONGODB
PG = POSTGRESQL # PG = POSTGRESQL
PSQL = POSTGRESQL # PSQL = POSTGRESQL
PGSQL = POSTGRESQL # PGSQL = POSTGRESQL
# import providers
class IEncoder (json.JSONEncoder): class IEncoder (json.JSONEncoder):
def default (self,object): def default (self,object):
@ -156,6 +159,103 @@ class factory :
import time import time
def instance(**_args):
    """
    Create a reader/writer instance for the requested provider.

    :provider       backend name (one of the constants in the providers module)
    :read|write     {connection parameters for the backend}

    Returns an instantiated reader/writer class, or None when the provider
    does not belong to any known category.
    """
    _provider = _args['provider']
    #
    # Determine which category (sql, nosql, cloud, file, queue, ...) the provider belongs to
    _group = next((_name for _name in providers.CATEGORIES
                   if _provider in providers.CATEGORIES[_name]), None)
    if _group:
        _classPointer = _getClassInstance(_group, **_args)
        #
        # Flatten the read/write section into the top-level argument set
        if 'read' in _args or 'write' in _args:
            _key = 'read' if 'read' in _args else 'write'
            _args = _args[_key]
            _args['provider'] = _provider
        if _group == 'sql':
            # SQL backends additionally need an sqlalchemy engine and a native driver module
            _args = dict(_args, **_get_alchemyEngine(**_args))
            _args['driver'] = providers.DRIVERS[_provider]
        elif _provider in providers.DEFAULT:
            # Fill in any defaults (host/port ...) the caller did not supply
            for _name, _value in providers.DEFAULT[_provider].items():
                if _name not in _args:
                    _args[_name] = _value
        return _classPointer(**_args)
#
# Let us determine the category of the provider that has been given
def _get_alchemyEngine(**_args):
    """
    Return the SQLAlchemy engine (and normalized connection parameters) for the
    given arguments. Only applicable to SQL providers.

    :_args arguments passed to the factory {provider, database, and optionally
           username, password, account, host, port}
    Returns a dict with key 'sqlalchemy' (the engine) plus any of
    host/port/username/password that resolved to a non-empty value.
    """
    #@TODO: Enable authentication files (private_key)
    _username = _args['username'] if 'username' in _args else ''
    _password = _args['password'] if 'password' in _args else ''
    _account = _args['account'] if 'account' in _args else ''
    _database = _args['database']
    _provider = _args['provider']
    # Explicit credentials take precedence over any 'account' value
    if _username != '':
        _account = _username + ':'+_password+'@'
    _host = _args['host'] if 'host' in _args else ''
    _port = _args['port'] if 'port' in _args else ''
    # Fall back to the per-provider defaults when host/port were not supplied
    if _provider in providers.DEFAULT :
        _default = providers.DEFAULT[_provider]
        _host = _host if _host != '' else (_default['host'] if 'host' in _default else '')
        _port = _port if _port != '' else (_default['port'] if 'port' in _default else '')
    if _port == '':
        # NOTE(review): providers.DEFAULT is keyed by provider name, so
        # 'port' is never a key here and this always yields '' — presumably
        # _default['port'] was intended; confirm before changing.
        _port = providers.DEFAULT['port'] if 'port' in providers.DEFAULT else ''
    #
    if _host != '' and _port != '' :
        _fhost = _host+":"+str(_port) #--formatted hostname
    else:
        _fhost = _host
    # Let us update the parameters we have thus far
    #
    uri = ''.join([_provider,"://",_account,_fhost,'/',_database])
    _engine = sqlalchemy.create_engine (uri,future=True)
    _out = {'sqlalchemy':_engine}
    # Only surface parameters that actually resolved to a value
    _pargs = {'host':_host,'port':_port,'username':_username,'password':_password}
    for key in _pargs :
        if _pargs[key] != '' :
            _out[key] = _pargs[key]
    return _out
def _getClassInstance(_group,**_args):
    """
    Return the reader/writer class we are attempting to instantiate.

    :_group  one of providers.CATEGORIES.keys() (sql, nosql, cloud, file, queue, ...)
    :_args   arguments passed to the factory; the presence of a 'read'/'write'
             section, or an explicit 'context' key, selects read vs write
    """
    if 'read' in _args or 'write' in _args :
        # bugfix: the original evaluated `_args['write']` here, which assigned
        # the write connection dict (not the literal 'write') as the context
        # and then crashed on `_args[_context]`
        _context = 'read' if 'read' in _args else 'write'
    else:
        _context = _args['context'] if 'context' in _args else 'read'
    _class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group]
    # Some categories (nosql, queue) map provider name -> class
    if type(_class) == dict and _args['provider'] in _class:
        _class = _class[_args['provider']]
    return _class
def __instance(**_args):
"""
@param provider {file,sqlite,postgresql,redshift,bigquery,netezza,mongo,couch ...} @param provider {file,sqlite,postgresql,redshift,bigquery,netezza,mongo,couch ...}
@param context read|write|rw @param context read|write|rw

View File

@ -127,6 +127,8 @@ class MongoReader(Mongo,Reader):
return pd.DataFrame(r) return pd.DataFrame(r)
else: else:
if 'table' in args or 'collection' in args : if 'table' in args or 'collection' in args :
if 'table' in args: if 'table' in args:
_uid = args['table'] _uid = args['table']
@ -134,7 +136,8 @@ class MongoReader(Mongo,Reader):
_uid = args['collection'] _uid = args['collection']
else: else:
_uid = self.uid _uid = self.uid
else:
_uid = self.uid
collection = self.db[_uid] collection = self.db[_uid]
_filter = args['filter'] if 'filter' in args else {} _filter = args['filter'] if 'filter' in args else {}
_df = pd.DataFrame(collection.find(_filter)) _df = pd.DataFrame(collection.find(_filter))

63
transport/providers.py Normal file
View File

@ -0,0 +1,63 @@
from transport.common import Reader, Writer,Console #, factory
from transport import disk
import sqlite3
from transport import s3 as s3
from transport import rabbitmq as queue
from transport import couch as couch
from transport import mongo as mongo
from transport import sql as sql
from transport import etl as etl
from transport import qlistener
import psycopg2 as pg
import mysql.connector as my
from google.cloud import bigquery as bq
import nzpy as nz #--- netezza drivers
import os
from transport.version import __version__
#
# Canonical provider names understood by the factory
POSTGRESQL = 'postgresql'
MONGODB = 'mongodb'
HTTP='http'
BIGQUERY ='bigquery'
FILE = 'file'
ETL = 'etl'
SQLITE = 'sqlite'
SQLITE3= 'sqlite'
REDSHIFT = 'redshift'
NETEZZA = 'netezza'
MYSQL = 'mysql'
RABBITMQ = 'rabbitmq'
MARIADB = 'mariadb'
COUCHDB = 'couch'
CONSOLE = 'console'
# bugfix: removed a second, redundant `ETL = 'etl'` assignment that shadowed
# the one above with the identical value
#
# synonyms of the above
BQ = BIGQUERY
MONGO = MONGODB
FERRETDB= MONGODB
PG = POSTGRESQL
PSQL = POSTGRESQL
PGSQL = POSTGRESQL
S3 = 's3'
AWS_S3 = 's3'
RABBIT = RABBITMQ
QLISTENER = 'qlistener'
# Native DBAPI driver module for each SQL provider (used for raw connections)
DRIVERS  = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3}
# Category membership: the factory resolves a provider to one of these groups
# (note QLISTENER appears in both 'queue' and 'memory'; the first match wins)
CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[BIGQUERY],'file':[FILE],
        'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QLISTENER],'http':[HTTP]}
# Reader class per category; nosql/queue map provider -> class instead of a single class
READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader},'cloud':sql.BigQueryReader,
        'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener}
        }
# Writer class per category, mirroring READ
WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter},'cloud':sql.BigQueryWriter,
        'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener}
        }
# Default connection parameters merged in by the factory when the caller omits them
DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port':3306}}
DEFAULT[MONGODB] = {'port':27017,'host':'localhost'}
DEFAULT[REDSHIFT] = DEFAULT[PG]
DEFAULT[MARIADB] = DEFAULT[MYSQL]
# NOTE(review): NETEZZA default has a port but no host — confirm that is intended
DEFAULT[NETEZZA] = {'port':5480}

42
transport/qlistener.py Normal file
View File

@ -0,0 +1,42 @@
import queue
from threading import Thread, Lock
from transport.common import Reader,Writer
import numpy as np
import pandas as pd
class qListener :
    """
    In-process message queue backed by python's queue.Queue, exposed through
    the transport Reader/Writer-style interface. Messages written via write()
    are handed to the instance's callback on a background thread.

    Queues are shared per-id across all instances via the class-level _queue
    registry.
    """
    # class-level lock; NOTE(review): never acquired in this class — presumably
    # reserved for future use, confirm before removing
    lock = Lock()
    # registry of named queues shared by every qListener instance
    _queue = {'default':queue.Queue()}
    def __init__(self,**_args):
        """
        :callback   callable invoked with each message written to the queue
        :id         name of the shared queue to attach to (default 'default')
        """
        self._cache = {}
        self._callback = _args['callback'] if 'callback' in _args else None
        self._id = _args['id'] if 'id' in _args else 'default'
        if self._id not in qListener._queue :
            qListener._queue[self._id] = queue.Queue()
        # start consuming immediately; NOTE(review): if no callback was given,
        # _forward will raise TypeError on the first message — confirm callers
        # always pass one
        thread = Thread(target=self._forward)
        thread.start()
    def _forward(self):
        # Consume exactly ONE message, acknowledge it, and hand it to the
        # callback; the thread then exits. Presumably intended as single-shot
        # per instance — TODO confirm, since later write() calls will block in
        # join() with no consumer.
        _q = qListener._queue[self._id]
        _data = _q.get()
        _q.task_done()
        self._callback(_data)
    def has(self,**_args) :
        # True when a callback is wired up, i.e. messages can be delivered
        return self._callback is not None
    def close(self):
        """
        This will empty the queue and have it ready for another operation
        """
        _q = qListener._queue[self._id]
        # Reaches into Queue internals: clear pending items under the queue's
        # own mutex and wake any thread blocked in join()
        with _q.mutex:
            _q.queue.clear()
            _q.all_tasks_done.notify_all()
    def write(self,_data,**_args):
        # Publish one message, then block until the consumer calls task_done()
        _id = _args['id'] if 'id' in _args else self._id
        _q = qListener._queue[_id]
        _q.put(_data)
        _q.join()

View File

@ -29,6 +29,7 @@ from multiprocessing import Lock, RLock
import pandas as pd import pandas as pd
import numpy as np import numpy as np
import nzpy as nz #--- netezza drivers import nzpy as nz #--- netezza drivers
import sqlite3
import copy import copy
import os import os
@ -58,8 +59,8 @@ class SQLRW :
# _info['host'] = 'localhost' if 'host' not in _args else _args['host'] # _info['host'] = 'localhost' if 'host' not in _args else _args['host']
# _info['port'] = SQLWriter.REFERENCE[_provider]['port'] if 'port' not in _args else _args['port'] # _info['port'] = SQLWriter.REFERENCE[_provider]['port'] if 'port' not in _args else _args['port']
_info['host'] = _args['host'] _info['host'] = _args['host'] if 'host' in _args else ''
_info['port'] = _args['port'] _info['port'] = _args['port'] if 'port' in _args else ''
# if 'host' in _args : # if 'host' in _args :
# _info['host'] = 'localhost' if 'host' not in _args else _args['host'] # _info['host'] = 'localhost' if 'host' not in _args else _args['host']
@ -98,8 +99,12 @@ class SQLRW :
if _handler == my : if _handler == my :
_info['database'] = _info['dbname'] _info['database'] = _info['dbname']
del _info['dbname'] del _info['dbname']
if _handler == sqlite3 :
self.conn = _handler.connect(**_info) _info = {'path':_info['dbname'],'isolation_level':'IMMEDIATE'}
if _handler != sqlite3 :
self.conn = _handler.connect(**_info)
else:
self.conn = _handler.connect(_info['path'],isolation_level='IMMEDIATE')
self._engine = _args['sqlalchemy'] if 'sqlalchemy' in _args else None self._engine = _args['sqlalchemy'] if 'sqlalchemy' in _args else None
def meta(self,**_args): def meta(self,**_args):
schema = [] schema = []