data-transport/transport/__init__.py

175 lines
5.2 KiB
Python
Raw Normal View History

"""
2022-01-29 17:15:45 +00:00
Data Transport, The Phi Technology LLC
Steve L. Nyemba, steve@the-phi.com
2022-01-29 17:15:45 +00:00
This library is designed to serve as a wrapper to a set of supported data stores :
- couchdb
- mongodb
- Files (character delimited)
- Queues (RabbmitMq)
- Session (Flask)
- s3
2022-01-29 17:15:45 +00:00
- sqlite
The supported operations are read/write and providing meta data to the calling code
Requirements :
pymongo
boto
couldant
The configuration for the data-store is as follows :
2022-01-29 17:15:45 +00:00
e.g:
mongodb
provider:'mongodb',[port:27017],[host:localhost],db:<name>,doc:<_name>,context:<read|write>
"""
__author__ = 'The Phi Technology'
2021-07-08 22:31:29 +00:00
import pandas as pd
import numpy as np
import json
import importlib
2019-11-05 03:51:20 +00:00
import sys
2022-03-03 22:08:24 +00:00
import sqlalchemy
2019-11-05 03:51:20 +00:00
if sys.version_info[0] > 2 :
from transport.common import Reader, Writer #, factory
from transport import disk
2020-05-18 02:57:18 +00:00
from transport import s3 as s3
from transport import rabbitmq as queue
2019-11-05 03:51:20 +00:00
from transport import couch as couch
from transport import mongo as mongo
2021-01-02 11:29:52 +00:00
from transport import sql as sql
2019-11-05 03:51:20 +00:00
else:
from common import Reader, Writer #, factory
import disk
import queue
import couch
import mongo
import s3
2021-01-02 11:29:52 +00:00
import sql
2021-11-18 21:21:26 +00:00
import psycopg2 as pg
import mysql.connector as my
from google.cloud import bigquery as bq
import nzpy as nz #--- netezza drivers
import os
2019-11-05 03:51:20 +00:00
2021-12-09 21:25:58 +00:00
class factory :
2021-11-18 21:21:26 +00:00
TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}}
PROVIDERS = {
"file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}},
"sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}},
"postgresql":{"port":5432,"host":"localhost","database":os.environ['USER'],"driver":pg,"default":{"type":"VARCHAR"}},
"redshift":{"port":5432,"host":"localhost","database":os.environ['USER'],"driver":pg,"default":{"type":"VARCHAR"}},
"bigquery":{"class":{"read":sql.BQReader,"write":sql.BQWriter}},
2022-03-03 22:08:24 +00:00
"mysql":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my},
"mariadb":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my},
2021-12-09 21:25:58 +00:00
"mongo":{"port":27017,"host":"localhost","class":{"read":mongo.MongoReader,"write":mongo.MongoWriter}},
"couch":{"port":5984,"host":"localhost","class":{"read":couch.CouchReader,"write":couch.CouchWriter}},
2021-11-18 21:21:26 +00:00
"netezza":{"port":5480,"driver":nz,"default":{"type":"VARCHAR(256)"}}}
2021-12-09 21:25:58 +00:00
#
# creating synonyms
PROVIDERS['mongodb'] = PROVIDERS['mongo']
PROVIDERS['couchdb'] = PROVIDERS['couch']
PROVIDERS['sqlite3'] = PROVIDERS['sqlite']
2022-01-29 17:15:45 +00:00
@staticmethod
2022-01-29 17:15:45 +00:00
def instance(**_args):
if 'type' in _args :
#
# Legacy code being returned
return factory._instance(**_args);
else:
return instance(**_args)
@staticmethod
def _instance(**args):
"""
This class will create an instance of a transport when providing
:type name of the type we are trying to create
:args The arguments needed to create the instance
"""
source = args['type']
params = args['args']
anObject = None
if source in ['HttpRequestReader','HttpSessionWriter']:
#
# @TODO: Make sure objects are serializable, be smart about them !!
#
aClassName = ''.join([source,'(**params)'])
2019-09-17 16:21:42 +00:00
else:
stream = json.dumps(params)
aClassName = ''.join([source,'(**',stream,')'])
2019-09-17 16:21:42 +00:00
try:
anObject = eval( aClassName)
#setattr(anObject,'name',source)
2019-11-05 03:51:20 +00:00
except Exception as e:
2019-11-05 22:04:54 +00:00
print(['Error ',e])
return anObject
2021-07-08 22:31:29 +00:00
import time
2021-12-09 21:25:58 +00:00
def instance(**_args):
2021-11-18 21:21:26 +00:00
"""
@param provider {file,sqlite,postgresql,redshift,bigquery,netezza,mongo,couch ...}
@param context read|write|rw
@param _args argument to got with the datastore (username,password,host,port ...)
"""
2021-12-09 21:25:58 +00:00
provider = _args['provider']
2022-01-29 17:15:45 +00:00
context = _args['context']if 'context' in _args else None
2021-12-09 21:25:58 +00:00
_id = context if context in ['read','write'] else 'read'
2021-11-18 21:21:26 +00:00
if _id :
args = {'provider':_id}
for key in factory.PROVIDERS[provider] :
if key == 'class' :
continue
value = factory.PROVIDERS[provider][key]
args[key] = value
#
#
2022-01-29 17:15:45 +00:00
2021-11-18 21:21:26 +00:00
args = dict(args,**_args)
2021-11-18 21:21:26 +00:00
# print (provider in factory.PROVIDERS)
if 'class' in factory.PROVIDERS[provider]:
pointer = factory.PROVIDERS[provider]['class'][_id]
else:
pointer = sql.SQLReader if _id == 'read' else sql.SQLWriter
2022-03-03 22:08:24 +00:00
#
# Let us try to establish an sqlalchemy wrapper
try:
host = ''
if provider not in ['bigquery','mongodb','couchdb','sqlite'] :
#
# In these cases we are assuming RDBMS and thus would exclude NoSQL and BigQuery
username = args['username'] if 'username' in args else ''
password = args['password'] if 'password' in args else ''
if username == '' :
account = ''
else:
account = username + ':'+password+'@'
host = args['host']
if 'port' in args :
host = host+":"+str(args['port'])
database = args['database']
elif provider == 'sqlite':
account = ''
host = ''
database = args['path'] if 'path' in args else args['database']
if provider not in ['mongodb','couchdb','bigquery'] :
uri = ''.join([provider,"://",account,host,'/',database])
e = sqlalchemy.create_engine (uri)
args['sqlalchemy'] = e
#
# @TODO: Include handling of bigquery with SQLAlchemy
except Exception as e:
print (e)
2021-11-18 21:21:26 +00:00
return pointer(**args)
2022-01-29 17:15:45 +00:00
return None