371 lines
12 KiB
Python
371 lines
12 KiB
Python
"""
|
|
Data Transport, The Phi Technology LLC
|
|
Steve L. Nyemba, steve@the-phi.com
|
|
|
|
This library is designed to serve as a wrapper to a set of supported data stores :
|
|
- couchdb
|
|
- mongodb
|
|
- Files (character delimited)
|
|
- Queues (RabbmitMq)
|
|
- Session (Flask)
|
|
- s3
|
|
- sqlite
|
|
The supported operations are read/write and providing meta data to the calling code
|
|
Requirements :
|
|
pymongo
|
|
boto
|
|
couldant
|
|
The configuration for the data-store is as follows :
|
|
e.g:
|
|
mongodb
|
|
provider:'mongodb',[port:27017],[host:localhost],db:<name>,doc:<_name>,context:<read|write>
|
|
"""
|
|
|
|
# import pandas as pd
|
|
# import numpy as np
|
|
import json
|
|
import importlib
|
|
import sys
|
|
import sqlalchemy
|
|
from datetime import datetime
|
|
if sys.version_info[0] > 2 :
|
|
# from transport.common import Reader, Writer,Console #, factory
|
|
from transport import disk
|
|
|
|
from transport import s3 as s3
|
|
from transport import rabbitmq as queue
|
|
from transport import couch as couch
|
|
from transport import mongo as mongo
|
|
from transport import sql as sql
|
|
from transport import etl as etl
|
|
# from transport.version import __version__
|
|
from info import __version__,__author__
|
|
from transport import providers
|
|
else:
|
|
from common import Reader, Writer,Console #, factory
|
|
import disk
|
|
import queue
|
|
import couch
|
|
import mongo
|
|
import s3
|
|
import sql
|
|
import etl
|
|
from info import __version__,__author__
|
|
import providers
|
|
|
|
import numpy as np
|
|
from psycopg2.extensions import register_adapter, AsIs
|
|
register_adapter(np.int64, AsIs)
|
|
|
|
# import psycopg2 as pg
|
|
# import mysql.connector as my
|
|
# from google.cloud import bigquery as bq
|
|
# import nzpy as nz #--- netezza drivers
|
|
import os
|
|
|
|
# class providers :
|
|
# POSTGRESQL = 'postgresql'
|
|
# MONGODB = 'mongodb'
|
|
|
|
# BIGQUERY ='bigquery'
|
|
# FILE = 'file'
|
|
# ETL = 'etl'
|
|
# SQLITE = 'sqlite'
|
|
# SQLITE3= 'sqlite'
|
|
# REDSHIFT = 'redshift'
|
|
# NETEZZA = 'netezza'
|
|
# MYSQL = 'mysql'
|
|
# RABBITMQ = 'rabbitmq'
|
|
# MARIADB = 'mariadb'
|
|
# COUCHDB = 'couch'
|
|
# CONSOLE = 'console'
|
|
# ETL = 'etl'
|
|
# #
|
|
# # synonyms of the above
|
|
# BQ = BIGQUERY
|
|
# MONGO = MONGODB
|
|
# FERRETDB= MONGODB
|
|
# PG = POSTGRESQL
|
|
# PSQL = POSTGRESQL
|
|
# PGSQL = POSTGRESQL
|
|
# import providers
|
|
|
|
# class IEncoder (json.JSONEncoder):
|
|
# def IEncoder (self,object):
|
|
# if type(object) == np.integer :
|
|
# return int(object)
|
|
# elif type(object) == np.floating:
|
|
# return float(object)
|
|
# elif type(object) == np.ndarray :
|
|
# return object.tolist()
|
|
# elif type(object) == datetime :
|
|
# return o.isoformat()
|
|
# else:
|
|
# return super(IEncoder,self).default(object)
|
|
|
|
class factory :
|
|
# TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}}
|
|
# PROVIDERS = {
|
|
# "etl":{"class":{"read":etl.instance,"write":etl.instance}},
|
|
# # "console":{"class":{"write":Console,"read":Console}},
|
|
# "file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}},
|
|
# "sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}},
|
|
# "postgresql":{"port":5432,"host":"localhost","database":None,"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
|
|
# "redshift":{"port":5432,"host":"localhost","database":None,"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
|
|
# "bigquery":{"class":{"read":sql.BQReader,"write":sql.BQWriter}},
|
|
# "mysql":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my,"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
|
|
# "mariadb":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my,"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
|
|
# "mongo":{"port":27017,"host":"localhost","class":{"read":mongo.MongoReader,"write":mongo.MongoWriter}},
|
|
# "couch":{"port":5984,"host":"localhost","class":{"read":couch.CouchReader,"write":couch.CouchWriter}},
|
|
# "netezza":{"port":5480,"driver":nz,"default":{"type":"VARCHAR(256)"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
|
|
# "rabbitmq":{"port":5672,"host":"localhost","class":{"read":queue.QueueReader,"write":queue.QueueWriter,"listen":queue.QueueListener,"listener":queue.QueueListener},"default":{"type":"application/json"}}}
|
|
# #
|
|
# # creating synonyms
|
|
# PROVIDERS['mongodb'] = PROVIDERS['mongo']
|
|
# PROVIDERS['couchdb'] = PROVIDERS['couch']
|
|
# PROVIDERS['bq'] = PROVIDERS['bigquery']
|
|
# PROVIDERS['sqlite3'] = PROVIDERS['sqlite']
|
|
# PROVIDERS['rabbit'] = PROVIDERS['rabbitmq']
|
|
# PROVIDERS['rabbitmq-server'] = PROVIDERS['rabbitmq']
|
|
|
|
@staticmethod
|
|
def instance(**_args):
|
|
if 'type' in _args :
|
|
#
|
|
# Legacy code being returned
|
|
return factory._instance(**_args);
|
|
|
|
|
|
|
|
else:
|
|
return instance(**_args)
|
|
@staticmethod
|
|
def _instance(**args):
|
|
"""
|
|
This class will create an instance of a transport when providing
|
|
:type name of the type we are trying to create
|
|
:args The arguments needed to create the instance
|
|
"""
|
|
source = args['type']
|
|
params = args['args']
|
|
anObject = None
|
|
|
|
if source in ['HttpRequestReader','HttpSessionWriter']:
|
|
#
|
|
# @TODO: Make sure objects are serializable, be smart about them !!
|
|
#
|
|
aClassName = ''.join([source,'(**params)'])
|
|
|
|
|
|
else:
|
|
|
|
stream = json.dumps(params)
|
|
aClassName = ''.join([source,'(**',stream,')'])
|
|
|
|
try:
|
|
anObject = eval( aClassName)
|
|
#setattr(anObject,'name',source)
|
|
except Exception as e:
|
|
print(['Error ',e])
|
|
return anObject
|
|
|
|
import time
|
|
def instance(**_pargs):
|
|
"""
|
|
creating an instance given the provider, we should have an idea of :class, :driver
|
|
:provider
|
|
:read|write = {connection to the database}
|
|
"""
|
|
#
|
|
# @TODO: provide authentication file that will hold all the parameters, that will later on be used
|
|
#
|
|
_args = dict(_pargs,**{})
|
|
if 'auth_file' in _args :
|
|
path = _args['auth_file']
|
|
file = open(path)
|
|
_config = json.loads( file.read())
|
|
_args = dict(_args,**_config)
|
|
file.close()
|
|
|
|
_provider = _args['provider']
|
|
_context = list( set(['read','write','listen']) & set(_args.keys()) )
|
|
if _context :
|
|
_context = _context[0]
|
|
else:
|
|
_context = _args['context'] if 'context' in _args else 'read'
|
|
# _group = None
|
|
|
|
|
|
# for _id in providers.CATEGORIES :
|
|
# if _provider in providers.CATEGORIES[_id] :
|
|
# _group = _id
|
|
# break
|
|
# if _group :
|
|
|
|
if _provider in providers.PROVIDERS and _context in providers.PROVIDERS[_provider]:
|
|
|
|
# _classPointer = _getClassInstance(_group,**_args)
|
|
_classPointer = providers.PROVIDERS[_provider][_context]
|
|
#
|
|
# Let us reformat the arguments
|
|
# if 'read' in _args or 'write' in _args :
|
|
# _args = _args['read'] if 'read' in _args else _args['write']
|
|
# _args['provider'] = _provider
|
|
# if _group == 'sql' :
|
|
if _provider in providers.CATEGORIES['sql'] :
|
|
_info = _get_alchemyEngine(**_args)
|
|
|
|
_args = dict(_args,**_info)
|
|
_args['driver'] = providers.DRIVERS[_provider]
|
|
|
|
else:
|
|
if _provider in providers.DEFAULT :
|
|
_default = providers.DEFAULT[_provider]
|
|
_defkeys = list(set(_default.keys()) - set(_args.keys()))
|
|
if _defkeys :
|
|
for key in _defkeys :
|
|
_args[key] = _default[key]
|
|
pass
|
|
#
|
|
# get default values from
|
|
|
|
return _classPointer(**_args)
|
|
#
|
|
# Let us determine the category of the provider that has been given
|
|
def _get_alchemyEngine(**_args):
|
|
"""
|
|
This function returns the SQLAlchemy engine associated with parameters, This is only applicable for SQL _items
|
|
:_args arguments passed to the factory {provider and other}
|
|
"""
|
|
_provider = _args['provider']
|
|
_pargs = {}
|
|
if _provider == providers.SQLITE3 :
|
|
_path = _args['database'] if 'database' in _args else _args['path']
|
|
uri = ''.join([_provider,':///',_path])
|
|
|
|
else:
|
|
|
|
#@TODO: Enable authentication files (private_key)
|
|
_username = _args['username'] if 'username' in _args else ''
|
|
_password = _args['password'] if 'password' in _args else ''
|
|
_account = _args['account'] if 'account' in _args else ''
|
|
_database = _args['database'] if 'database' in _args else _args['path']
|
|
|
|
if _username != '':
|
|
_account = _username + ':'+_password+'@'
|
|
_host = _args['host'] if 'host' in _args else ''
|
|
_port = _args['port'] if 'port' in _args else ''
|
|
if _provider in providers.DEFAULT :
|
|
_default = providers.DEFAULT[_provider]
|
|
_host = _host if _host != '' else (_default['host'] if 'host' in _default else '')
|
|
_port = _port if _port != '' else (_default['port'] if 'port' in _default else '')
|
|
if _port == '':
|
|
_port = providers.DEFAULT['port'] if 'port' in providers.DEFAULT else ''
|
|
#
|
|
|
|
if _host != '' and _port != '' :
|
|
_fhost = _host+":"+str(_port) #--formatted hostname
|
|
else:
|
|
_fhost = _host
|
|
# Let us update the parameters we have thus far
|
|
#
|
|
|
|
|
|
uri = ''.join([_provider,"://",_account,_fhost,'/',_database])
|
|
_pargs = {'host':_host,'port':_port,'username':_username,'password':_password}
|
|
_engine = sqlalchemy.create_engine (uri,future=True)
|
|
_out = {'sqlalchemy':_engine}
|
|
|
|
for key in _pargs :
|
|
if _pargs[key] != '' :
|
|
_out[key] = _pargs[key]
|
|
return _out
|
|
@DeprecationWarning
|
|
def _getClassInstance(_group,**_args):
|
|
"""
|
|
This function returns the class instance we are attempting to instanciate
|
|
:_group items in providers.CATEGORIES.keys()
|
|
:_args arguments passed to the factory class
|
|
"""
|
|
# if 'read' in _args or 'write' in _args :
|
|
# _context = 'read' if 'read' in _args else _args['write']
|
|
# _info = _args[_context]
|
|
# else:
|
|
# _context = _args['context'] if 'context' in _args else 'read'
|
|
# _class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group]
|
|
# if type(_class) == dict and _args['provider'] in _class:
|
|
# _class = _class[_args['provider']]
|
|
|
|
# return _class
|
|
|
|
@DeprecationWarning
|
|
def __instance(**_args):
|
|
"""
|
|
|
|
@param provider {file,sqlite,postgresql,redshift,bigquery,netezza,mongo,couch ...}
|
|
@param context read|write|rw
|
|
@param _args argument to got with the datastore (username,password,host,port ...)
|
|
"""
|
|
|
|
provider = _args['provider']
|
|
context = _args['context']if 'context' in _args else None
|
|
_id = context if context in list(factory.PROVIDERS[provider]['class'].keys()) else 'read'
|
|
if _id :
|
|
args = {'provider':_id}
|
|
for key in factory.PROVIDERS[provider] :
|
|
if key == 'class' :
|
|
continue
|
|
value = factory.PROVIDERS[provider][key]
|
|
args[key] = value
|
|
#
|
|
#
|
|
|
|
args = dict(args,**_args)
|
|
|
|
# print (provider in factory.PROVIDERS)
|
|
if 'class' in factory.PROVIDERS[provider]:
|
|
pointer = factory.PROVIDERS[provider]['class'][_id]
|
|
else:
|
|
pointer = sql.SQLReader if _id == 'read' else sql.SQLWriter
|
|
#
|
|
# Let us try to establish an sqlalchemy wrapper
|
|
try:
|
|
account = ''
|
|
host = ''
|
|
if provider not in [providers.BIGQUERY,providers.MONGODB, providers.COUCHDB, providers.SQLITE, providers.CONSOLE,providers.ETL, providers.FILE, providers.RABBITMQ] :
|
|
# if provider not in ['bigquery','mongodb','mongo','couchdb','sqlite','console','etl','file','rabbitmq'] :
|
|
#
|
|
# In these cases we are assuming RDBMS and thus would exclude NoSQL and BigQuery
|
|
username = args['username'] if 'username' in args else ''
|
|
password = args['password'] if 'password' in args else ''
|
|
if username == '' :
|
|
account = ''
|
|
else:
|
|
account = username + ':'+password+'@'
|
|
host = args['host']
|
|
if 'port' in args :
|
|
host = host+":"+str(args['port'])
|
|
|
|
database = args['database']
|
|
elif provider in [providers.SQLITE,providers.FILE]:
|
|
account = ''
|
|
host = ''
|
|
database = args['path'] if 'path' in args else args['database']
|
|
|
|
if provider not in [providers.MONGODB, providers.COUCHDB, providers.BIGQUERY, providers.CONSOLE, providers.ETL,providers.FILE,providers.RABBITMQ] :
|
|
# if provider not in ['mongodb','mongo','couchdb','bigquery','console','etl','file','rabbitmq'] :
|
|
uri = ''.join([provider,"://",account,host,'/',database])
|
|
|
|
e = sqlalchemy.create_engine (uri,future=True)
|
|
args['sqlalchemy'] = e
|
|
|
|
#
|
|
# @TODO: Include handling of bigquery with SQLAlchemy
|
|
except Exception as e:
|
|
print (_args)
|
|
print (e)
|
|
|
|
return pointer(**args)
|
|
|
|
return None
|