data-transport/transport/__init__.py

339 lines
11 KiB
Python
Raw Normal View History

"""
Data Transport, The Phi Technology LLC
Steve L. Nyemba, steve@the-phi.com

This library is designed to serve as a wrapper to a set of supported data stores:
    - couchdb
    - mongodb
    - Files (character delimited)
    - Queues (RabbitMQ)
    - Session (Flask)
    - s3
    - sqlite
The supported operations are read/write and providing metadata to the calling code.
Requirements:
    pymongo
    boto
    cloudant
The configuration for the data store is as follows:

e.g. mongodb:
    provider:'mongodb',[port:27017],[host:localhost],db:<name>,doc:<_name>,context:<read|write>
"""
2023-07-17 20:42:42 +00:00
2021-07-08 22:31:29 +00:00
import pandas as pd
import numpy as np
import json
import importlib
2019-11-05 03:51:20 +00:00
import sys
2022-03-03 22:08:24 +00:00
import sqlalchemy
2019-11-05 03:51:20 +00:00
if sys.version_info[0] > 2 :
from transport.common import Reader, Writer,Console #, factory
from transport import disk
2020-05-18 02:57:18 +00:00
from transport import s3 as s3
from transport import rabbitmq as queue
from transport import couch as couch
from transport import mongo as mongo
from transport import sql as sql
from transport import etl as etl
2023-07-17 20:42:42 +00:00
from transport.version import __version__
from transport import providers
2019-11-05 03:51:20 +00:00
else:
from common import Reader, Writer,Console #, factory
import disk
import queue
import couch
import mongo
import s3
import sql
import etl
2023-07-17 20:42:42 +00:00
from version import __version__
import providers
2021-11-18 21:21:26 +00:00
import psycopg2 as pg
import mysql.connector as my
from google.cloud import bigquery as bq
import nzpy as nz #--- netezza drivers
import os
2019-11-05 03:51:20 +00:00
# class providers :
# POSTGRESQL = 'postgresql'
# MONGODB = 'mongodb'
2023-05-25 14:39:51 +00:00
# BIGQUERY ='bigquery'
# FILE = 'file'
# ETL = 'etl'
# SQLITE = 'sqlite'
# SQLITE3= 'sqlite'
# REDSHIFT = 'redshift'
# NETEZZA = 'netezza'
# MYSQL = 'mysql'
# RABBITMQ = 'rabbitmq'
# MARIADB = 'mariadb'
# COUCHDB = 'couch'
# CONSOLE = 'console'
# ETL = 'etl'
# #
# # synonyms of the above
# BQ = BIGQUERY
# MONGO = MONGODB
# FERRETDB= MONGODB
# PG = POSTGRESQL
# PSQL = POSTGRESQL
# PGSQL = POSTGRESQL
# import providers
2021-12-09 21:25:58 +00:00
class IEncoder (json.JSONEncoder):
    """
    JSON encoder that converts numpy values into native Python types.

    Usage: json.dumps(data, cls=IEncoder)
    """
    def default (self,object):
        """
        Return a JSON-serializable replacement for a value the stock encoder rejects.

        :object value the base encoder could not serialize (the name shadows the builtin
                ``object``; it is kept unchanged for backward compatibility)
        :returns int for numpy integers, float for numpy floats, bool for numpy booleans,
                 a (nested) list for numpy arrays
        :raises TypeError (from JSONEncoder.default) for any other type
        """
        #
        # isinstance is required: np.integer / np.floating are abstract base classes, so
        # comparing type(object) against them never matched concrete scalars such as np.int64
        if isinstance(object, np.integer):
            return int(object)
        elif isinstance(object, np.floating):
            return float(object)
        elif isinstance(object, np.bool_):
            return bool(object)
        elif isinstance(object, np.ndarray):
            return object.tolist()
        return super(IEncoder, self).default(object)
class factory :
    """
    Legacy factory kept for backward compatibility.

    PROVIDERS maps a provider name (and its synonyms) to connection defaults and to the
    reader/writer classes used by the legacy __instance() helper.
    """
    # NOTE(review): "neteeza" looks like a typo of "netezza"; left unchanged in case callers match on it
    TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}}
    PROVIDERS = {
        "etl":{"class":{"read":etl.instance,"write":etl.instance}},
        "console":{"class":{"write":Console,"read":Console}},
        "file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}},
        "sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}},
        "postgresql":{"port":5432,"host":"localhost","database":None,"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
        "redshift":{"port":5432,"host":"localhost","database":None,"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
        "bigquery":{"class":{"read":sql.BQReader,"write":sql.BQWriter}},
        "mysql":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my,"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
        "mariadb":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my,"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
        "mongo":{"port":27017,"host":"localhost","class":{"read":mongo.MongoReader,"write":mongo.MongoWriter}},
        "couch":{"port":5984,"host":"localhost","class":{"read":couch.CouchReader,"write":couch.CouchWriter}},
        "netezza":{"port":5480,"driver":nz,"default":{"type":"VARCHAR(256)"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
        "rabbitmq":{"port":5672,"host":"localhost","class":{"read":queue.QueueReader,"write":queue.QueueWriter,"listen":queue.QueueListener,"listener":queue.QueueListener},"default":{"type":"application/json"}}}
    #
    # creating synonyms (the same dictionary object is shared, not copied)
    PROVIDERS['mongodb'] = PROVIDERS['mongo']
    PROVIDERS['couchdb'] = PROVIDERS['couch']
    PROVIDERS['bq'] = PROVIDERS['bigquery']
    PROVIDERS['sqlite3'] = PROVIDERS['sqlite']
    PROVIDERS['rabbit'] = PROVIDERS['rabbitmq']
    PROVIDERS['rabbitmq-server'] = PROVIDERS['rabbitmq']

    @staticmethod
    def instance(**_args):
        """
        Create a transport object.

        Calls that carry a 'type' key take the legacy factory._instance path; everything else
        is delegated to the module-level instance() function.
        """
        if 'type' in _args :
            #
            # Legacy code being returned
            return factory._instance(**_args)
        else:
            return instance(**_args)

    @staticmethod
    def _instance(**args):
        """
        Create an object from a class name and constructor arguments (legacy API).

        :type name of the class to create: a plain or dotted name visible from this module
              (e.g. 'disk.DiskReader'), or a builtin
        :args dictionary of keyword arguments passed to the constructor
        :returns the new object, or None when the name cannot be resolved or the constructor
                 fails (the error is printed)
        :raises TypeError when args['args'] is not JSON-serializable (non-Http types only)
        """
        import builtins
        source = args['type']
        params = args['args']
        anObject = None
        if source not in ['HttpRequestReader','HttpSessionWriter']:
            #
            # Round-trip through JSON as the original string-building code did, so constructors
            # still receive a detached, JSON-compatible copy of the parameters
            params = json.loads(json.dumps(params))
        #
        # @TODO: Make sure objects are serializable, be smart about them !!
        #
        try:
            #
            # The constructor call used to be assembled as source text and passed to eval(),
            # which failed on boolean/None parameters (json.dumps emits true/false/null, which
            # are not Python) and executed arbitrary code supplied through 'type'.
            # Resolve the dotted name explicitly instead.
            _name, *_attributes = source.split('.')
            _scope = globals()
            pointer = _scope[_name] if _name in _scope else getattr(builtins, _name)
            for _attribute in _attributes:
                pointer = getattr(pointer, _attribute)
            anObject = pointer(**params)
        except Exception as e:
            print(['Error ',e])
        return anObject
2021-07-08 22:31:29 +00:00
import time
2023-09-08 16:28:35 +00:00
def instance(**_pargs):
    """
    creating an instance given the provider, we should have an idea of :class, :driver
    :provider name of the data store; it must appear in one of providers.CATEGORIES
    :auth_file optional path to a JSON file whose keys are merged into the arguments
               (keys from the file win over the keyword arguments)
    :read|write = {connection to the database}; when present, that dictionary (read first)
               replaces the top-level arguments for the constructor call
    :returns the object built by the class selected through _getClassInstance
    :raises ValueError when the provider is not listed in providers.CATEGORIES
    """
    #
    # @TODO: provide authentication file that will hold all the parameters, that will later on be used
    #
    _args = dict(_pargs)
    if 'auth_file' in _args :
        with open(_args['auth_file']) as file :
            _config = json.load(file)
        _args = dict(_args,**_config)
    _provider = _args['provider']
    #
    # find the category (e.g. 'sql') the provider belongs to
    _group = None
    for _id in providers.CATEGORIES :
        if _provider in providers.CATEGORIES[_id] :
            _group = _id
            break
    if not _group :
        # previously this fell through to an UnboundLocalError on _classPointer
        raise ValueError("Unknown provider {0!r}: not found in providers.CATEGORIES".format(_provider))
    _classPointer = _getClassInstance(_group,**_args)
    #
    # Let us reformat the arguments; copy the nested dictionary so the caller's
    # read/write configuration is not mutated below
    if 'read' in _args or 'write' in _args :
        _args = dict(_args['read'] if 'read' in _args else _args['write'])
    _args['provider'] = _provider
    if _group == 'sql' :
        _info = _get_alchemyEngine(**_args)
        _args = dict(_args,**_info)
        _args['driver'] = providers.DRIVERS[_provider]
    elif _provider in providers.DEFAULT :
        #
        # fill in provider defaults only for keys the caller did not supply
        _default = providers.DEFAULT[_provider]
        for key in set(_default.keys()) - set(_args.keys()) :
            _args[key] = _default[key]
    return _classPointer(**_args)
#
# Let us determine the category of the provider that has been given
def _get_alchemyEngine(**_args):
"""
This function returns the SQLAlchemy engine associated with parameters, This is only applicable for SQL _items
:_args arguments passed to the factory {provider and other}
"""
#@TODO: Enable authentication files (private_key)
_username = _args['username'] if 'username' in _args else ''
_password = _args['password'] if 'password' in _args else ''
_account = _args['account'] if 'account' in _args else ''
_database = _args['database']
_provider = _args['provider']
if _username != '':
_account = _username + ':'+_password+'@'
_host = _args['host'] if 'host' in _args else ''
_port = _args['port'] if 'port' in _args else ''
if _provider in providers.DEFAULT :
_default = providers.DEFAULT[_provider]
_host = _host if _host != '' else (_default['host'] if 'host' in _default else '')
_port = _port if _port != '' else (_default['port'] if 'port' in _default else '')
if _port == '':
_port = providers.DEFAULT['port'] if 'port' in providers.DEFAULT else ''
#
if _host != '' and _port != '' :
_fhost = _host+":"+str(_port) #--formatted hostname
else:
_fhost = _host
# Let us update the parameters we have thus far
#
uri = ''.join([_provider,"://",_account,_fhost,'/',_database])
_engine = sqlalchemy.create_engine (uri,future=True)
_out = {'sqlalchemy':_engine}
_pargs = {'host':_host,'port':_port,'username':_username,'password':_password}
for key in _pargs :
if _pargs[key] != '' :
_out[key] = _pargs[key]
return _out
def _getClassInstance(_group,**_args):
    """
    This function returns the class we are attempting to instantiate
    :_group items in providers.CATEGORIES.keys()
    :_args arguments passed to the factory class. The context is 'read' when a 'read' key is
           present, 'write' when only a 'write' key is present, otherwise _args['context']
           (default 'read')
    :returns providers.READ[_group] or providers.WRITE[_group]; when that value is a dict keyed
             by provider and contains _args['provider'], the provider-specific entry
    """
    if 'read' in _args or 'write' in _args :
        #
        # the context is the key name itself; the value under it is the connection
        # configuration (the old code used _args['write'], i.e. the config dict, here)
        _context = 'read' if 'read' in _args else 'write'
    else:
        _context = _args['context'] if 'context' in _args else 'read'
    _class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group]
    if isinstance(_class, dict) and _args['provider'] in _class:
        _class = _class[_args['provider']]
    return _class
def __instance(**_args):
    """
    Legacy factory: build a reader/writer object from the factory.PROVIDERS registry.

    @param provider {file,sqlite,postgresql,redshift,bigquery,netezza,mongo,couch ...}
    @param context read|write|rw; any value that is not a key of the provider's 'class' map
                   falls back to 'read'
    @param _args argument to go with the datastore (username,password,host,port ...); these
                 override the registry defaults

    For relational providers an SQLAlchemy engine is attached as the 'sqlalchemy' argument;
    errors raised while building it are printed and otherwise ignored.
    """
    provider = _args['provider']
    _entry = factory.PROVIDERS[provider]
    context = _args.get('context')
    _id = context if context in list(_entry['class']) else 'read'
    if not _id:
        return None
    #
    # start from the registry defaults (everything except 'class'), then let the caller override them
    args = {'provider': _id}
    args.update({key: value for key, value in _entry.items() if key != 'class'})
    args = dict(args, **_args)
    if 'class' in _entry:
        pointer = _entry['class'][_id]
    else:
        pointer = sql.SQLReader if _id == 'read' else sql.SQLWriter
    #
    # Let us try to establish an sqlalchemy wrapper
    try:
        account = ''
        host = ''
        #
        # RDBMS providers: everything except NoSQL stores, BigQuery, sqlite and the non-database transports
        _relational = provider not in [providers.BIGQUERY, providers.MONGODB, providers.COUCHDB, providers.SQLITE, providers.CONSOLE, providers.ETL, providers.FILE, providers.RABBITMQ]
        if _relational:
            username = args.get('username', '')
            password = args.get('password', '')
            account = '' if username == '' else username + ':' + password + '@'
            host = args['host']
            if 'port' in args:
                host = host + ":" + str(args['port'])
            database = args['database']
        elif provider in [providers.SQLITE, providers.FILE]:
            database = args['path'] if 'path' in args else args['database']
        if provider not in [providers.MONGODB, providers.COUCHDB, providers.BIGQUERY, providers.CONSOLE, providers.ETL, providers.FILE, providers.RABBITMQ]:
            uri = ''.join([provider, "://", account, host, '/', database])
            args['sqlalchemy'] = sqlalchemy.create_engine(uri, future=True)
        #
        # @TODO: Include handling of bigquery with SQLAlchemy
    except Exception as e:
        print(_args)
        print(e)
    return pointer(**args)