refactoring version 2.0

This commit is contained in:
Steve Nyemba 2024-03-28 15:34:39 -05:00
parent 764b3d6af0
commit e7838f5de1
26 changed files with 1773 additions and 1021 deletions

View File

@ -48,24 +48,8 @@ import typer
import os
import transport
from transport import etl
from transport import providers
# from transport import providers
# SYS_ARGS = {}
# if len(sys.argv) > 1:
# N = len(sys.argv)
# for i in range(1,N):
# value = None
# if sys.argv[i].startswith('--'):
# key = sys.argv[i][2:] #.replace('-','')
# SYS_ARGS[key] = 1
# if i + 1 < N:
# value = sys.argv[i + 1] = sys.argv[i+1].strip()
# if key and value and not value.startswith('--'):
# SYS_ARGS[key] = value
# i += 2
app = typer.Typer()
@ -77,7 +61,7 @@ def wait(jobs):
jobs = [thread for thread in jobs if thread.is_alive()]
time.sleep(1)
@app.command()
@app.command(name="apply")
def move (path,index=None):
_proxy = lambda _object: _object.write(_object.read())
@ -90,27 +74,14 @@ def move (path,index=None):
etl.instance(**_config)
else:
etl.instance(config=_config)
@app.command(name="providers")
def supported (format:str="table") :
"""
This function will print supported providers and their associated classifications
"""
_df = (transport.supported())
print (json.dumps(_df.to_dict(orient="list")))
#
# if type(_config) == dict :
# _object = transport.etl.instance(**_config)
# _proxy(_object)
# else:
# #
# # here we are dealing with a list of objects (long ass etl job)
# jobs = []
# failed = []
# for _args in _config :
# if index and _config.index(_args) != index :
# continue
# _object=transport.etl.instance(**_args)
# thread = Process(target=_proxy,args=(_object,))
# thread.start()
# jobs.append(thread())
# if _config.index(_args) == 0 :
# thread.join()
# wait(jobs)
@app.command()
def version():
print (transport.version.__version__)
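As an illustration, a minimal sketch of what the renamed apply command drives; the file name etl-config.json and its content are placeholders, and the logic simply mirrors move() above:

    import json
    from transport import etl

    # illustrative configuration file, same structure move() loads from `path`
    with open('etl-config.json') as f:
        _config = json.loads(f.read())

    if type(_config) == dict:
        etl.instance(**_config)        # a single job
    else:
        etl.instance(config=_config)   # a list of jobs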

View File

@ -1,5 +1,5 @@
__author__ = 'The Phi Technology'
__version__= '1.9.8.20'
__version__= '2.0.0'
__license__="""

View File

@ -11,360 +11,79 @@ This library is designed to serve as a wrapper to a set of supported data stores
- s3
- sqlite
The supported operations are read/write and providing meta data to the calling code
Requirements :
pymongo
boto
couldant
The configuration for the data-store is as follows :
e.g:
mongodb
provider:'mongodb',[port:27017],[host:localhost],db:<name>,doc:<_name>,context:<read|write>
We separated reads from writes to mitigate accidents associated with writes.
Source Code is available under MIT License:
https://healthcareio.the-phi.com/data-transport
https://hiplab.mc.vanderbilt.edu/git/hiplab/data-transport
"""
# import pandas as pd
# import numpy as np
import json
import importlib
import sys
import sqlalchemy
from datetime import datetime
if sys.version_info[0] > 2 :
# from transport.common import Reader, Writer,Console #, factory
from transport import disk
from transport import s3 as s3
from transport import rabbitmq as queue
from transport import couch as couch
from transport import mongo as mongo
from transport import sql as sql
from transport import etl as etl
# from transport.version import __version__
from info import __version__,__author__
from transport import providers
else:
from common import Reader, Writer,Console #, factory
import disk
import queue
import couch
import mongo
import s3
import sql
import etl
from info import __version__,__author__
import providers
import numpy as np
from psycopg2.extensions import register_adapter, AsIs
register_adapter(np.int64, AsIs)
# import psycopg2 as pg
# import mysql.connector as my
# from google.cloud import bigquery as bq
# import nzpy as nz #--- netezza drivers
from transport import sql, nosql, cloud, other
import pandas as pd
import json
import os
from info import __version__,__author__
# class providers :
# POSTGRESQL = 'postgresql'
# MONGODB = 'mongodb'
# BIGQUERY ='bigquery'
# FILE = 'file'
# ETL = 'etl'
# SQLITE = 'sqlite'
# SQLITE3= 'sqlite'
PROVIDERS = {}
def init():
global PROVIDERS
for _module in [cloud,sql,nosql,other] :
for _provider_name in dir(_module) :
if _provider_name.startswith('__') :
continue
PROVIDERS[_provider_name] = {'module':getattr(_module,_provider_name),'type':_module.__name__}
# print ([ {name:getattr(sql,name)} for name in dir(sql) if not name.startswith('__')])
# REDSHIFT = 'redshift'
# NETEZZA = 'netezza'
# MYSQL = 'mysql'
# RABBITMQ = 'rabbitmq'
# MARIADB = 'mariadb'
# COUCHDB = 'couch'
# CONSOLE = 'console'
# ETL = 'etl'
# #
# # synonyms of the above
# BQ = BIGQUERY
# MONGO = MONGODB
# FERRETDB= MONGODB
# PG = POSTGRESQL
# PSQL = POSTGRESQL
# PGSQL = POSTGRESQL
# import providers
# class IEncoder (json.JSONEncoder):
# def IEncoder (self,object):
# if type(object) == np.integer :
# return int(object)
# elif type(object) == np.floating:
# return float(object)
# elif type(object) == np.ndarray :
# return object.tolist()
# elif type(object) == datetime :
# return o.isoformat()
# else:
# return super(IEncoder,self).default(object)
def instance (**_args):
"""
type:
read: true|false (default true)
auth_file
"""
global PROVIDERS
if 'auth_file' in _args:
if os.path.exists(_args['auth_file']) :
f = open(_args['auth_file'])
_args = dict (_args,** json.loads(f.read()) )
f.close()
else:
filename = _args['auth_file']
raise Exception(f" {filename} was not found or is invalid")
if _args['provider'] in PROVIDERS :
_info = PROVIDERS[_args['provider']]
_module = _info['module']
if 'context' in _args :
_context = _args['context']
else:
_context = 'read'
_pointer = getattr(_module,'Reader') if _context == 'read' else getattr(_module,'Writer')
return _pointer (**_args)
pass
else:
raise Exception ("Missing or Unknown provider")
pass
def supported ():
_info = {}
for _provider in PROVIDERS :
_item = PROVIDERS[_provider]
if _item['type'] not in _info :
_info[_item['type']] = []
_info[_item['type']].append(_provider)
_df = pd.DataFrame()
for _id in _info :
if not _df.shape[0] :
_df = pd.DataFrame(_info[_id],columns=[_id.replace('transport.','')])
else:
_df = pd.DataFrame(_info[_id],columns=[_id.replace('transport.','')]).join(_df, how='outer')
return _df.fillna('')
class factory :
pass
factory.instance = instance
init()
# if __name__ == '__main__' :
# # if not PROVIDERS :
# # init()
# print (list(PROVIDERS.keys()))
# pgr = instance(provider='postgresql',database='io',table='foo',write=True)
# print (pgr.read())
# print ()
# print (supported())
class factory :
# TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}}
# PROVIDERS = {
# "etl":{"class":{"read":etl.instance,"write":etl.instance}},
# # "console":{"class":{"write":Console,"read":Console}},
# "file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}},
# "sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}},
# "postgresql":{"port":5432,"host":"localhost","database":None,"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
# "redshift":{"port":5432,"host":"localhost","database":None,"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
# "bigquery":{"class":{"read":sql.BQReader,"write":sql.BQWriter}},
# "mysql":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my,"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
# "mariadb":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"},"driver":my,"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
# "mongo":{"port":27017,"host":"localhost","class":{"read":mongo.MongoReader,"write":mongo.MongoWriter}},
# "couch":{"port":5984,"host":"localhost","class":{"read":couch.CouchReader,"write":couch.CouchWriter}},
# "netezza":{"port":5480,"driver":nz,"default":{"type":"VARCHAR(256)"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
# "rabbitmq":{"port":5672,"host":"localhost","class":{"read":queue.QueueReader,"write":queue.QueueWriter,"listen":queue.QueueListener,"listener":queue.QueueListener},"default":{"type":"application/json"}}}
# #
# # creating synonyms
# PROVIDERS['mongodb'] = PROVIDERS['mongo']
# PROVIDERS['couchdb'] = PROVIDERS['couch']
# PROVIDERS['bq'] = PROVIDERS['bigquery']
# PROVIDERS['sqlite3'] = PROVIDERS['sqlite']
# PROVIDERS['rabbit'] = PROVIDERS['rabbitmq']
# PROVIDERS['rabbitmq-server'] = PROVIDERS['rabbitmq']
@staticmethod
def instance(**_args):
if 'type' in _args :
#
# Legacy code being returned
return factory._instance(**_args);
else:
return instance(**_args)
@staticmethod
def _instance(**args):
"""
This class will create an instance of a transport when providing
:type name of the type we are trying to create
:args The arguments needed to create the instance
"""
source = args['type']
params = args['args']
anObject = None
if source in ['HttpRequestReader','HttpSessionWriter']:
#
# @TODO: Make sure objects are serializable, be smart about them !!
#
aClassName = ''.join([source,'(**params)'])
else:
stream = json.dumps(params)
aClassName = ''.join([source,'(**',stream,')'])
try:
anObject = eval( aClassName)
#setattr(anObject,'name',source)
except Exception as e:
print(['Error ',e])
return anObject
import time
def instance(**_pargs):
"""
creating an instance given the provider, we should have an idea of :class, :driver
:provider
:read|write = {connection to the database}
"""
#
# @TODO: provide authentication file that will hold all the parameters, that will later on be used
#
_args = dict(_pargs,**{})
if 'auth_file' in _args :
path = _args['auth_file']
file = open(path)
_config = json.loads( file.read())
_args = dict(_args,**_config)
file.close()
_provider = _args['provider']
_context = list( set(['read','write','listen']) & set(_args.keys()) )
if _context :
_context = _context[0]
else:
_context = _args['context'] if 'context' in _args else 'read'
# _group = None
# for _id in providers.CATEGORIES :
# if _provider in providers.CATEGORIES[_id] :
# _group = _id
# break
# if _group :
if _provider in providers.PROVIDERS and _context in providers.PROVIDERS[_provider]:
# _classPointer = _getClassInstance(_group,**_args)
_classPointer = providers.PROVIDERS[_provider][_context]
#
# Let us reformat the arguments
# if 'read' in _args or 'write' in _args :
# _args = _args['read'] if 'read' in _args else _args['write']
# _args['provider'] = _provider
# if _group == 'sql' :
if _provider in providers.CATEGORIES['sql'] :
_info = _get_alchemyEngine(**_args)
_args = dict(_args,**_info)
_args['driver'] = providers.DRIVERS[_provider]
else:
if _provider in providers.DEFAULT :
_default = providers.DEFAULT[_provider]
_defkeys = list(set(_default.keys()) - set(_args.keys()))
if _defkeys :
for key in _defkeys :
_args[key] = _default[key]
pass
#
# get default values from
return _classPointer(**_args)
#
# Let us determine the category of the provider that has been given
def _get_alchemyEngine(**_args):
"""
This function returns the SQLAlchemy engine associated with parameters, This is only applicable for SQL _items
:_args arguments passed to the factory {provider and other}
"""
_provider = _args['provider']
_pargs = {}
if _provider == providers.SQLITE3 :
_path = _args['database'] if 'database' in _args else _args['path']
uri = ''.join([_provider,':///',_path])
else:
#@TODO: Enable authentication files (private_key)
_username = _args['username'] if 'username' in _args else ''
_password = _args['password'] if 'password' in _args else ''
_account = _args['account'] if 'account' in _args else ''
_database = _args['database'] if 'database' in _args else _args['path']
if _username != '':
_account = _username + ':'+_password+'@'
_host = _args['host'] if 'host' in _args else ''
_port = _args['port'] if 'port' in _args else ''
if _provider in providers.DEFAULT :
_default = providers.DEFAULT[_provider]
_host = _host if _host != '' else (_default['host'] if 'host' in _default else '')
_port = _port if _port != '' else (_default['port'] if 'port' in _default else '')
if _port == '':
_port = providers.DEFAULT['port'] if 'port' in providers.DEFAULT else ''
#
if _host != '' and _port != '' :
_fhost = _host+":"+str(_port) #--formatted hostname
else:
_fhost = _host
# Let us update the parameters we have thus far
#
uri = ''.join([_provider,"://",_account,_fhost,'/',_database])
_pargs = {'host':_host,'port':_port,'username':_username,'password':_password}
_engine = sqlalchemy.create_engine (uri,future=True)
_out = {'sqlalchemy':_engine}
for key in _pargs :
if _pargs[key] != '' :
_out[key] = _pargs[key]
return _out
@DeprecationWarning
def _getClassInstance(_group,**_args):
"""
This function returns the class instance we are attempting to instanciate
:_group items in providers.CATEGORIES.keys()
:_args arguments passed to the factory class
"""
# if 'read' in _args or 'write' in _args :
# _context = 'read' if 'read' in _args else _args['write']
# _info = _args[_context]
# else:
# _context = _args['context'] if 'context' in _args else 'read'
# _class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group]
# if type(_class) == dict and _args['provider'] in _class:
# _class = _class[_args['provider']]
# return _class
@DeprecationWarning
def __instance(**_args):
"""
@param provider {file,sqlite,postgresql,redshift,bigquery,netezza,mongo,couch ...}
@param context read|write|rw
@param _args argument to got with the datastore (username,password,host,port ...)
"""
provider = _args['provider']
context = _args['context']if 'context' in _args else None
_id = context if context in list(factory.PROVIDERS[provider]['class'].keys()) else 'read'
if _id :
args = {'provider':_id}
for key in factory.PROVIDERS[provider] :
if key == 'class' :
continue
value = factory.PROVIDERS[provider][key]
args[key] = value
#
#
args = dict(args,**_args)
# print (provider in factory.PROVIDERS)
if 'class' in factory.PROVIDERS[provider]:
pointer = factory.PROVIDERS[provider]['class'][_id]
else:
pointer = sql.SQLReader if _id == 'read' else sql.SQLWriter
#
# Let us try to establish an sqlalchemy wrapper
try:
account = ''
host = ''
if provider not in [providers.BIGQUERY,providers.MONGODB, providers.COUCHDB, providers.SQLITE, providers.CONSOLE,providers.ETL, providers.FILE, providers.RABBITMQ] :
# if provider not in ['bigquery','mongodb','mongo','couchdb','sqlite','console','etl','file','rabbitmq'] :
#
# In these cases we are assuming RDBMS and thus would exclude NoSQL and BigQuery
username = args['username'] if 'username' in args else ''
password = args['password'] if 'password' in args else ''
if username == '' :
account = ''
else:
account = username + ':'+password+'@'
host = args['host']
if 'port' in args :
host = host+":"+str(args['port'])
database = args['database']
elif provider in [providers.SQLITE,providers.FILE]:
account = ''
host = ''
database = args['path'] if 'path' in args else args['database']
if provider not in [providers.MONGODB, providers.COUCHDB, providers.BIGQUERY, providers.CONSOLE, providers.ETL,providers.FILE,providers.RABBITMQ] :
# if provider not in ['mongodb','mongo','couchdb','bigquery','console','etl','file','rabbitmq'] :
uri = ''.join([provider,"://",account,host,'/',database])
e = sqlalchemy.create_engine (uri,future=True)
args['sqlalchemy'] = e
#
# @TODO: Include handling of bigquery with SQLAlchemy
except Exception as e:
print (_args)
print (e)
return pointer(**args)
return None
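As an illustration, a minimal usage sketch of the refactored entry points; the provider, database and table names are placeholders and assume a reachable PostgreSQL instance:

    import transport

    print(transport.supported())     # providers registered by init(), grouped by module

    reader = transport.instance(provider='postgresql', database='io', table='foo', context='read')
    df = reader.read()

    # the legacy factory.instance alias resolves to the same function
    writer = transport.factory.instance(provider='postgresql', database='io', table='foo', context='write')
    writer.write(df)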

View File

@ -0,0 +1,6 @@
"""
Steve L. Nyemba, nyemba@gmail.com
This namespace implements support for cloud databases databricks,bigquery ...
"""
from . import bigquery, databricks, nextcloud, s3

transport/cloud/bigquery.py Normal file (156 lines)
View File

@ -0,0 +1,156 @@
"""
Implementing support for google's bigquery
- cloud.bigquery.Read
- cloud.bigquery.Write
"""
import json
from google.oauth2 import service_account
from google.cloud import bigquery as bq
from multiprocessing import Lock, RLock
import pandas as pd
import pandas_gbq as pd_gbq
import numpy as np
import time
MAX_CHUNK = 2000000
class BigQuery:
def __init__(self,**_args):
path = _args['service_key'] if 'service_key' in _args else _args['private_key']
self.credentials = service_account.Credentials.from_service_account_file(path)
self.dataset = _args['dataset'] if 'dataset' in _args else None
self.path = path
self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
self.table = _args['table'] if 'table' in _args else None
self.client = bq.Client.from_service_account_json(self.path)
def meta(self,**_args):
"""
This function returns meta data for a given table or query with dataset/table properly formatted
:param table name of the table WITHOUT the dataset prefix
:param sql sql query to be pulled
"""
table = _args['table'] if 'table' in _args else self.table
try:
if table :
_dataset = self.dataset if 'dataset' not in _args else _args['dataset']
sql = f"""SELECT column_name as name, data_type as type FROM {_dataset}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{table}' """
_info = {'credentials':self.credentials,'dialect':'standard'}
return pd_gbq.read_gbq(sql,**_info).to_dict(orient='records')
# return self.read(sql=sql).to_dict(orient='records')
# ref = self.client.dataset(self.dataset).table(table)
# _schema = self.client.get_table(ref).schema
# return [{"name":_item.name,"type":_item.field_type,"description":( "" if not hasattr(_item,"description") else _item.description )} for _item in _schema]
else :
return []
except Exception as e:
return []
def has(self,**_args):
found = False
try:
_has = self.meta(**_args)
found = _has is not None and len(_has) > 0
except Exception as e:
pass
return found
class Reader (BigQuery):
"""
Implements support for reading from bigquery; this class acts as a wrapper around Google's API
"""
def __init__(self,**_args):
super().__init__(**_args)
def apply(self,sql):
return self.read(sql=sql)
def read(self,**_args):
SQL = None
table = self.table if 'table' not in _args else _args['table']
if 'sql' in _args :
SQL = _args['sql']
elif table:
table = "".join(["`",table,"`"]) if '.' in table else "".join(["`:dataset.",table,"`"])
SQL = "SELECT * FROM :table ".replace(":table",table)
if not SQL :
return None
if SQL and 'limit' in _args:
SQL += " LIMIT "+str(_args['limit'])
if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset:
SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset)
_info = {'credentials':self.credentials,'dialect':'standard'}
return pd_gbq.read_gbq(SQL,**_info) if SQL else None
# return self.client.query(SQL).to_dataframe() if SQL else None
class Writer (BigQuery):
"""
This class implements support for writing against bigquery
"""
lock = RLock()
def __init__(self,**_args):
super().__init__(**_args)
self.parallel = False if 'lock' not in _args else _args['lock']
self.table = _args['table'] if 'table' in _args else None
self.mode = {'if_exists':'append','chunksize':900000,'destination_table':self.table,'credentials':self.credentials}
self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])
self._location = 'US' if 'location' not in _args else _args['location']
def write(self,_data,**_args) :
"""
This function will perform a write to bigquery
:_data data-frame to be written to bigquery
"""
try:
if self.parallel or 'lock' in _args :
Writer.lock.acquire()
_args['table'] = self.table if 'table' not in _args else _args['table']
self._write(_data,**_args)
finally:
if self.parallel:
Writer.lock.release()
def submit(self,_sql):
"""
Write the output of a massive query to a given table; bigquery will handle this as a job
This function will return the job identifier
"""
_config = bq.QueryJobConfig()
_config.destination = self.client.dataset(self.dataset).table(self.table)
_config.allow_large_results = True
# _config.write_disposition = bq.bq_consts.WRITE_APPEND
_config.dry_run = False
# _config.priority = 'BATCH'
_resp = self.client.query(_sql,location=self._location,job_config=_config)
return _resp.job_id
def status (self,_id):
return self.client.get_job(_id,location=self._location)
def _write(self,_info,**_args) :
_df = None
if type(_info) in [list,pd.DataFrame] :
if type(_info) == list :
_df = pd.DataFrame(_info)
elif type(_info) == pd.DataFrame :
_df = _info
if '.' not in _args['table'] :
self.mode['destination_table'] = '.'.join([self.dataset,_args['table']])
else:
self.mode['destination_table'] = _args['table'].strip()
if 'schema' in _args :
self.mode['table_schema'] = _args['schema']
#
# Let us insure that the types are somewhat compatible ...
# _map = {'INTEGER':np.int64,'DATETIME':'datetime64[ns]','TIMESTAMP':'datetime64[ns]','FLOAT':np.float64,'DOUBLE':np.float64,'STRING':str}
# _mode = copy.deepcopy(self.mode)
_mode = self.mode
# _df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
#
# Let us adjust the chunking here
self._chunks = 10 if _df.shape[0] > MAX_CHUNK and self._chunks == 1 else self._chunks
_indexes = np.array_split(np.arange(_df.shape[0]),self._chunks)
for i in _indexes :
_df.iloc[i].to_gbq(**self.mode)
time.sleep(1)
pass
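As an illustration, a minimal usage sketch for the new wrapper; the key path, dataset and table below are placeholders:

    from transport.cloud import bigquery

    _args = {'service_key':'/path/to/service-key.json', 'dataset':'my_dataset', 'table':'my_table'}

    reader = bigquery.Reader(**_args)
    df = reader.read(limit=10)      # SELECT * FROM `my_dataset.my_table` LIMIT 10
    print(reader.meta())            # column names/types from INFORMATION_SCHEMA

    writer = bigquery.Writer(**_args)
    writer.write(df)                # appends in chunks via DataFrame.to_gbq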

View File

@ -0,0 +1,111 @@
"""
This file implements databricks handling; this functionality relies on databricks-sql-connector
LICENSE (MIT)
Copyright 2016-2020, The Phi Technology LLC
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
@TODO:
- Migrate SQLite to SQL hierarchy
- Include Write in Chunks from pandas
"""
import os
import sqlalchemy
# from transport.common import Reader,Writer
import pandas as pd
class Bricks:
"""
:host
:token
:database
:cluster_path
:table
"""
def __init__(self,**_args):
_host = _args['host']
_token= _args['token']
_cluster_path = _args['cluster_path']
self._schema = _args['schema'] if 'schema' in _args else _args['database']
_catalog = _args['catalog']
self._table = _args['table'] if 'table' in _args else None
#
# @TODO:
# Sometimes when the cluster isn't up and running it takes a while, the user should be alerted of this
#
_uri = f'''databricks+connector://token:{_token}@{_host}?http_path={_cluster_path}&catalog={_catalog}&schema={self._schema}'''
self._engine = sqlalchemy.create_engine (_uri)
pass
def meta(self,**_args):
table = _args['table'] if 'table' in _args else self._table
if not table :
return []
else:
if sqlalchemy.__version__.startswith('1.') :
_m = sqlalchemy.MetaData(bind=self._engine)
_m.reflect(only=[table])
else:
_m = sqlalchemy.MetaData()
_m.reflect(bind=self._engine)
#
# Let's retrieve the information associated with a table
#
return [{'name':_attr.name,'type':_attr.type} for _attr in _m.tables[table].columns]
def has(self,**_args):
return self.meta(**_args)
def apply(self,_sql):
try:
if _sql.lower().startswith('select') :
return pd.read_sql(_sql,self._engine)
except Exception as e:
pass
class Reader(Bricks):
"""
This class is designed for reads and will execute reads against a table name or a select SQL statement
"""
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args):
limit = None if 'limit' not in _args else str(_args['limit'])
if 'sql' in _args :
sql = _args['sql']
elif 'table' in _args :
table = _args['table']
sql = f'SELECT * FROM {table}'
if limit :
sql = sql + f' LIMIT {limit}'
if 'sql' in _args or 'table' in _args :
return self.apply(sql)
else:
return pd.DataFrame()
pass
class Writer(Bricks):
def __init__(self,**_args):
super().__init__(**_args)
def write(self,_data,**_args):
"""
This function will write data to databricks against a given table. If the table is not specified upon initialization, it can be specified here
_data: data frame to push to databricks
_args: chunks, table, schema
"""
_schema = self._schema if 'schema' not in _args else _args['schema']
_table = self._table if 'table' not in _args else _args['table']
_df = _data if type(_data) == pd.DataFrame else _data
if type(_df) == dict :
_df = [_df]
if type(_df) == list :
_df = pd.DataFrame(_df)
_df.to_sql(
name=_table,schema=_schema,
con=self._engine,if_exists='append',index=False);
pass
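As an illustration, a minimal usage sketch; the host, token, cluster path, catalog and table are placeholders for an existing databricks SQL warehouse:

    from transport.cloud import databricks

    _args = {'host':'adb-0000000000000000.0.azuredatabricks.net', 'token':'<access-token>',
             'cluster_path':'/sql/1.0/warehouses/<warehouse-id>', 'catalog':'hive_metastore',
             'schema':'default', 'table':'foo'}

    reader = databricks.Reader(**_args)
    df = reader.read(table='foo', limit=10)   # SELECT * FROM foo LIMIT 10
    print(reader.meta())                      # column names/types via sqlalchemy reflection

    writer = databricks.Writer(**_args)
    writer.write(df)                          # DataFrame.to_sql(..., if_exists='append')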

View File

@ -0,0 +1,80 @@
"""
We are implementing transport to and from nextcloud (just like s3)
"""
import os
import sys
from transport.common import IEncoder
import pandas as pd
from io import StringIO
import json
import nextcloud_client as nextcloud
class Nextcloud :
def __init__(self,**_args):
pass
self._delimiter = None
self._handler = nextcloud.Client(_args['url'])
_uid = _args['uid']
_token = _args['token']
self._uri = _args['folder'] if 'folder' in _args else './'
if self._uri.endswith('/') :
self._uri = self._uri[:-1]
self._file = None if 'file' not in _args else _args['file']
self._handler.login(_uid,_token)
def close(self):
try:
self._handler.logout()
except Exception as e:
pass
class Reader(Nextcloud):
def __init__(self,**_args):
# self._file = [] if 'file' not in _args else _args['file']
super().__init__(**_args)
pass
def read(self,**_args):
_filename = self._file if 'file' not in _args else _args['file']
#
# @TODO: if _filename is none, an exception should be raised
#
_uri = '/'.join([self._uri,_filename])
if self._handler.get_file(_uri) :
#
#
_info = self._handler.file_info(_uri)
_content = self._handler.get_file_contents(_uri).decode('utf8')
if _info.get_content_type() == 'text/csv' :
#
# @TODO: enable handling of csv, xls, parquet, pickles
_file = StringIO(_content)
return pd.read_csv(_file)
else:
#
# if it is neither a structured document like csv, we will return the content as is
return _content
return None
class Writer (Nextcloud):
"""
This class will write data to an instance of nextcloud
"""
def __init__(self,**_args) :
super().__init__(**_args)
self
def write(self,_data,**_args):
"""
This function will upload a file to a given destination
:file has the uri of the location of the file
"""
_filename = self._file if 'file' not in _args else _args['file']
_uri = '/'.join([self._uri,_filename])
if type(_data) == pd.DataFrame :
f = StringIO()
_data.to_csv(f,index=False)
_content = f.getvalue()
elif type(_data) == dict :
_content = json.dumps(_data,cls=IEncoder)
else:
_content = str(_data)
self._handler.put_file_contents(_uri,_content)
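As an illustration, a minimal usage sketch; the url, credentials, folder and file names are placeholders:

    from transport.cloud import nextcloud

    _args = {'url':'https://cloud.example.com', 'uid':'demo', 'token':'<app-password>',
             'folder':'uploads', 'file':'data.csv'}

    reader = nextcloud.Reader(**_args)
    df = reader.read()              # csv content comes back as a data-frame

    writer = nextcloud.Writer(**_args)
    writer.write(df, file='data-copy.csv')
    writer.close()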

transport/cloud/s3.py Normal file (127 lines)
View File

@ -0,0 +1,127 @@
"""
Data Transport - 1.0
Steve L. Nyemba, The Phi Technology LLC
This file is a wrapper around s3 bucket provided by AWS for reading and writing content
"""
from datetime import datetime
import boto
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
import numpy as np
import botocore
from smart_open import smart_open
import sys
import json
from io import StringIO
import json
class s3 :
"""
@TODO: Implement a search function for a file given a bucket??
"""
def __init__(self,**args) :
"""
This function will extract a file or set of files from s3 bucket provided
@param access_key
@param secret_key
@param path location of the file
@param filter filename or filtering elements
"""
try:
self.s3 = S3Connection(args['access_key'],args['secret_key'],calling_format=OrdinaryCallingFormat())
self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None
# self.path = args['path']
self.filter = args['filter'] if 'filter' in args else None
self.filename = args['file'] if 'file' in args else None
self.bucket_name = args['bucket'] if 'bucket' in args else None
except Exception as e :
self.s3 = None
self.bucket = None
print (e)
def meta(self,**args):
"""
:name name of the bucket
"""
info = self.list(**args)
[item.open() for item in info]
return [{"name":item.name,"size":item.size} for item in info]
def list(self,**args):
"""
This function will list the content of a bucket, the bucket must be provided by the name
:name name of the bucket
"""
return list(self.s3.get_bucket(args['name']).list())
def buckets(self):
#
# This function will return all buckets, not sure why but it should be used cautiously
# based on why the s3 infrastructure is used
#
return [item.name for item in self.s3.get_all_buckets()]
# def buckets(self):
pass
# """
# This function is a wrapper around the bucket list of buckets for s3
# """
# return self.s3.get_all_buckets()
class Reader(s3) :
"""
Because s3 contains buckets and files, reading becomes a tricky proposition :
- list files if file is None
- stream content if file is Not None
@TODO: support read from all buckets, think about it
"""
def __init__(self,**args) :
s3.__init__(self,**args)
def files(self):
r = []
try:
return [item.name for item in self.bucket if item.size > 0]
except Exception as e:
pass
return r
def stream(self,limit=-1):
"""
At this point we should stream a file from a given bucket
"""
key = self.bucket.get_key(self.filename.strip())
if key is None :
yield None
else:
count = 0
with smart_open(key) as remote_file:
for line in remote_file:
if count == limit and limit > 0 :
break
yield line
count += 1
def read(self,**args) :
if self.filename is None :
#
# returning the list of files because no one file was specified.
return self.files()
else:
limit = args['size'] if 'size' in args else -1
return self.stream(limit)
class Writer(s3) :
def __init__(self,**args) :
s3.__init__(self,**args)
def mkdir(self,name):
"""
This function will create a folder in a bucket
:name name of the folder
"""
self.s3.put_object(Bucket=self.bucket_name,key=(name+'/'))
def write(self,content):
file = StringIO(content.decode("utf8"))
self.s3.upload_fileobj(file,self.bucket_name,self.filename)
pass
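As an illustration, a minimal read-side sketch; the keys, bucket and file names are placeholders:

    from transport.cloud import s3

    _args = {'access_key':'<ACCESS_KEY>', 'secret_key':'<SECRET_KEY>',
             'bucket':'my-bucket', 'file':'data/file.csv'}

    reader = s3.Reader(**_args)
    print(reader.files())               # non-empty keys in the bucket
    for line in reader.read(size=10):   # stream at most 10 lines from the file
        print(line)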

View File

@ -0,0 +1,10 @@
"""
Steve L. Nyemba, nyemba@gmail.com
This namespace implements support for NoSQL databases couchdb, mongodb, cloudant ...
"""
from transport.nosql import couchdb
from transport.nosql import mongodb
# from . import mongodb
# from . import couchdb
cloudant = couchdb

transport/nosql/couchdb.py Normal file (213 lines)
View File

@ -0,0 +1,213 @@
"""
Data-Transport
Steve L. Nyemba, The Phi Technology
This file is a wrapper around couchdb using IBM Cloudant SDK that has an interface to couchdb
"""
import cloudant
import json
import sys
# from transport.common import Reader, Writer
from datetime import datetime
class Couch:
"""
This class is a wrapper for read/write against couchdb. The class captures common operations for read/write.
@param url host & port reference default http://localhost:5984
@param doc user id involved
@param dbname database name (target)
"""
def __init__(self,**args):
url = args['url'] if 'url' in args else 'http://localhost:5984'
self._id = args['doc']
dbname = args['dbname']
if 'username' not in args and 'password' not in args :
self.server = cloudant.CouchDB(None,None,url=url)
else:
self.server = cloudant.CouchDB(args['username'],args['password'],url=url)
self.server.connect()
if dbname in self.server.all_dbs() :
self.dbase = self.server.get(dbname,dbname,True)
#
# @TODO Check if the database exists ...
#
doc = cloudant.document.Document(self.dbase,self._id) #self.dbase.get(self._id)
if not doc.exists():
doc = self.dbase.create_document({"_id":self._id})
doc.save()
else:
self.dbase = None
"""
Insuring the preconditions are met for processing
"""
def isready(self):
p = self.server.metadata() != {}
if p == False or not self.dbase:
return False
#
# At this point we are sure that the server is connected
# We are also sure that the database actually exists
#
doc = cloudant.document.Document(self.dbase,self._id)
# q = self.dbase.all_docs(key=self._id)['rows']
# if not q :
if not doc.exists():
return False
return True
def view(self,**args):
"""
The function will execute a view (provided a user is authenticated)
:id design document _design/xxxx (provide full name with _design prefix)
:view_name name of the view i.e
:key(s) key(s) to be used to filter the content
"""
document = cloudant.design_document.DesignDocument(self.dbase,args['id'])
document.fetch()
params = {'group_level':1,'group':True}
if 'key' in args :
params ['key'] = args['key']
elif 'keys' in args :
params['keys'] = args['keys']
return document.get_view(args['view_name'])(**params)['rows']
class Reader(Couch):
"""
This class will read an attachment from couchdb and return it to the calling code. The attachment must have been placed beforehand (otherwise oops)
@TODO: Account for security & access control
"""
def __init__(self,**args):
"""
@param filename filename (attachment)
"""
#
# setting the basic parameters for
Couch.__init__(self,**args)
if 'filename' in args :
self.filename = args['filename']
else:
self.filename = None
def stream(self):
#
# @TODO Need to get this working ...
#
document = cloudant.document.Document(self.dbase,self._id)
# content = self.dbase.fetch_attachment(self._id,self.filename).split('\n') ;
content = self.get_attachment(self.filename)
for row in content:
yield row
def read(self,**args):
if self.filename is not None:
return self.stream()
else:
return self.basic_read()
def basic_read(self):
document = cloudant.document.Document(self.dbase,self._id)
# document = self.dbase.get(self._id)
if document.exists() :
document.fetch()
document = dict(document)
del document['_rev']
else:
document = {}
return document
class Writer(Couch):
"""
This class will write on a couchdb document provided a scope
The scope is the attribute that will be on the couchdb document
"""
def __init__(self,**args):
"""
@param uri host & port reference
@param uid user id involved
@param filename filename (attachment)
@param dbname database name (target)
"""
super().__init__(**args)
def set (self,info):
document = cloudant.document.Document(self.dbase,self._id)
if document.exists() :
keys = list(set(document.keys()) - set(['_id','_rev','_attachments']))
for id in keys :
document.field_set(document,id,None)
for id in info :
value = info[id]
document.field_set(document,id,value)
document.save()
pass
else:
_document = dict({"_id":self._id},**info)
self.dbase.create_document(_document)
def write(self,info):
"""
write a given attribute to a document database
@info object to be written to an attribute of the document
"""
# document = self.dbase.get(self._id)
document = cloudant.document.Document(self.dbase,self._id) #.get(self._id)
if document.exists() is False :
document = self.dbase.create_document({"_id":self._id})
# label = params['label']
# row = params['row']
# if label not in document :
# document[label] = []
# document[label].append(row)
for key in info :
if key in document and type(document[key]) == list :
document[key] += info[key]
else:
document[key] = info[key]
document.save()
# self.dbase.bulk_docs([document])
# self.dbase.save_doc(document)
def upload(self,**args):
"""
:param name name of the file to be uploaded
:param data content of the file (binary or text)
:param content_type (default)
"""
mimetype = args['content_type'] if 'content_type' in args else 'text/plain'
document = cloudant.document.Document(self.dbase,self.uid)
document.put_attachment(self.dbase,args['filename'],mimetype,args['content'])
document.save()
def archive(self,params=None):
"""
This function will archive the document onto itself.
"""
# document = self.dbase.all_docs(self._id,include_docs=True)
document = cloudant.document.Document(self.dbase,self.filename)
document.fetch()
content = {}
# _doc = {}
for id in document:
if id not in ['_id','_rev','_attachments'] :
content[id] = document[id]
del document[id]
content = json.dumps(content)
# document= _doc
now = str(datetime.today())
name = '-'.join([document['_id'] , now,'.json'])
self.upload(filename=name,data=content,content_type='application/json')
# self.dbase.bulk_docs([document])
# self.dbase.put_attachment(document,content,name,'application/json')
# document.put_attachment(self.dbase,name,'application/json',content)
# document.save()
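As an illustration, a minimal usage sketch; the url, credentials, database and document identifiers are placeholders and assume the database already exists:

    from transport.nosql import couchdb

    _args = {'url':'http://localhost:5984', 'dbname':'demo', 'doc':'logs',
             'username':'admin', 'password':'<password>'}

    writer = couchdb.Writer(**_args)
    writer.write({'events':[{'date':'2024-03-28','status':'ok'}]})

    reader = couchdb.Reader(**_args)
    print(reader.read())            # the document as a dict, without _rev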

transport/nosql/mongodb.py Normal file (242 lines)
View File

@ -0,0 +1,242 @@
"""
Data Transport - 1.0
Steve L. Nyemba, The Phi Technology LLC
This file is a wrapper around mongodb for reading/writing content against a mongodb server and executing views (mapreduce)
"""
from pymongo import MongoClient
import bson
from bson.objectid import ObjectId
from bson.binary import Binary
# import nujson as json
from datetime import datetime
import pandas as pd
import numpy as np
import gridfs
import sys
import json
import re
from multiprocessing import Lock, RLock
from transport.common import IEncoder
class Mongo :
lock = RLock()
"""
Basic mongodb functions are captured here
"""
def __init__(self,**args):
"""
:dbname database name/identifier
:host host and port of the database by default localhost:27017
:username username for authentication
:password password for current user
"""
self.host = 'localhost' if 'host' not in args else args['host']
self.mechanism= 'SCRAM-SHA-256' if 'mechanism' not in args else args['mechanism']
# authSource=(args['authSource'] if 'authSource' in args else self.dbname)
self._lock = False if 'lock' not in args else args['lock']
self.dbname = None
username = password = None
if 'auth_file' in args :
_info = json.loads((open(args['auth_file'])).read())
else:
_info = {}
_args = dict(args,**_info)
_map = {'dbname':'db','database':'db','table':'uid','collection':'uid','col':'uid','doc':'uid'}
for key in _args :
if key in ['username','password'] :
username = _args['username'] if key=='username' else username
password = _args['password'] if key == 'password' else password
continue
value = _args[key]
if key in _map :
key = _map[key]
self.setattr(key,value)
#
# Let us perform aliasing in order to remain backwards compatible
self.dbname = self.db if hasattr(self,'db')else self.dbname
self.collection = _args['table'] if 'table' in _args else (_args['doc'] if 'doc' in _args else (_args['collection'] if 'collection' in _args else None))
if username and password :
self.client = MongoClient(self.host,
username=username,
password=password ,
authSource=self.authSource,
authMechanism=self.mechanism)
else:
self.client = MongoClient(self.host,maxPoolSize=10000)
self.db = self.client[self.dbname]
def isready(self):
p = self.dbname in self.client.list_database_names()
q = self.collection in self.client[self.dbname].list_collection_names()
return p and q
def setattr(self,key,value):
_allowed = ['host','port','db','doc','collection','authSource','mechanism']
if key in _allowed :
setattr(self,key,value)
pass
def close(self):
self.client.close()
def meta(self,**_args):
return []
class Reader(Mongo):
"""
This class will read from a mongodb data store and return the content of a document (not a collection)
"""
def __init__(self,**args):
Mongo.__init__(self,**args)
def read(self,**args):
if 'mongo' in args or 'cmd' in args or 'pipeline' in args:
#
# @TODO:
cmd = {}
if 'aggregate' not in cmd and 'aggregate' not in args:
cmd['aggregate'] = self.collection
elif 'aggregate' in args :
cmd['aggregate'] = args['aggregate']
if 'pipeline' in args :
cmd['pipeline']= args['pipeline']
if 'pipeline' not in args or 'aggregate' not in cmd :
cmd = args['mongo'] if 'mongo' in args else args['cmd']
if "aggregate" in cmd :
if "allowDiskUse" not in cmd :
cmd["allowDiskUse"] = True
if "cursor" not in cmd :
cmd["cursor"] = {}
r = []
out = self.db.command(cmd)
#@TODO: consider using a yield (generator) works wonders
while True :
if 'values' in out :
r += out['values']
if 'cursor' in out :
key = 'firstBatch' if 'firstBatch' in out['cursor'] else 'nextBatch'
else:
key = 'n'
if 'cursor' in out and out['cursor'][key] :
r += list(out['cursor'][key])
elif key in out and out[key]:
r.append (out[key])
# yield out['cursor'][key]
if key not in ['firstBatch','nextBatch'] or ('cursor' in out and out['cursor']['id'] == 0) :
break
else:
out = self.db.command({"getMore":out['cursor']['id'],"collection":out['cursor']['ns'].split(".")[-1]})
return pd.DataFrame(r)
else:
if 'table' in args or 'collection' in args :
if 'table' in args:
_uid = args['table']
elif 'collection' in args :
_uid = args['collection']
else:
_uid = self.collection
else:
_uid = self.collection
collection = self.db[_uid]
_filter = args['filter'] if 'filter' in args else {}
_df = pd.DataFrame(collection.find(_filter))
columns = _df.columns.tolist()[1:]
return _df[columns]
def view(self,**args):
"""
This function is designed to execute a view (map/reduce) operation
"""
pass
class Writer(Mongo):
"""
This class is designed to write to a mongodb collection within a database
"""
def __init__(self,**args):
Mongo.__init__(self,**args)
def upload(self,**args) :
"""
This function will upload a file to the current database (using GridFS)
:param data binary stream/text to be stored
:param filename filename to be used
:param encoding content_encoding (default utf-8)
"""
if 'encoding' not in args :
args['encoding'] = 'utf-8'
gfs = gridfs.GridFS(self.db)
gfs.put(**args)
def archive(self):
"""
This function will archive documents to the
"""
collection = self.db[self.collection]
rows = list(collection.find())
for row in rows :
if type(row['_id']) == ObjectId :
row['_id'] = str(row['_id'])
stream = Binary(json.dumps(rows,cls=IEncoder).encode())
collection.delete_many({})
now = "-".join([str(datetime.now().year),str(datetime.now().month), str(datetime.now().day)])
name = ".".join([self.collection,'archive',now])+".json"
description = " ".join([self.collection,'archive',str(len(rows))])
self.upload(filename=name,data=stream,description=description,content_type='application/json')
# gfs = GridFS(self.db)
# gfs.put(filename=name,description=description,data=stream,encoding='utf-8')
# self.write({{"filename":name,"file":stream,"description":descriptions}})
pass
def write(self,info,**_args):
"""
This function will write to a given collection i.e add a record to a collection (no updates)
@param info new record in the collection to be added
"""
# document = self.db[self.collection].find()
#collection = self.db[self.collection]
# if type(info) == list :
# self.db[self.collection].insert_many(info)
# else:
try:
if 'table' in _args or 'collection' in _args :
_uid = _args['table'] if 'table' in _args else _args['collection']
else:
_uid = self.collection if 'doc' not in _args else _args['doc']
if self._lock :
Mongo.lock.acquire()
if type(info) == list or type(info) == pd.DataFrame :
info = info if type(info) == list else info.to_dict(orient='records')
info = json.loads(json.dumps(info,cls=IEncoder))
self.db[_uid].insert_many(info)
else:
self.db[_uid].insert_one(json.loads(json.dumps(info,cls=IEncoder)))
finally:
if self._lock :
Mongo.lock.release()
def set(self,document):
"""
if no identifier is provided the function will delete the entire collection and set the new document.
Please use this function with great care (archive the content first before using it... for safety)
"""
collection = self.db[self.collection]
if collection.count_documents({}) > 0 and '_id' in document:
id = document['_id']
del document['_id']
collection.find_one_and_replace({'_id':id},document)
else:
collection.delete_many({})
self.write(document)
def close(self):
Mongo.close(self)
# collecton.update_one({"_id":self.collection},document,True)
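As an illustration, a minimal usage sketch; the host, database and collection names are placeholders:

    from transport.nosql import mongodb

    _args = {'host':'localhost:27017', 'db':'demo', 'collection':'users'}

    writer = mongodb.Writer(**_args)
    writer.write([{'name':'alice'}, {'name':'bob'}])

    reader = mongodb.Reader(**_args)
    df = reader.read(filter={'name':'alice'})   # data-frame without the _id column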

View File

@ -0,0 +1 @@
from . import files, http, rabbitmq, callback

View File

@ -0,0 +1,45 @@
import queue
from threading import Thread, Lock
from transport.common import Reader,Writer
import numpy as np
import pandas as pd
class Writer :
lock = Lock()
_queue = {'default':queue.Queue()}
def __init__(self,**_args):
self._cache = {}
self._callback = _args['callback'] if 'callback' in _args else None
self._id = _args['id'] if 'id' in _args else 'default'
if self._id not in Writer._queue :
Writer._queue[self._id] = queue.Queue()
thread = Thread(target=self._forward)
thread.start()
def _forward(self):
_q = Writer._queue[self._id]
_data = _q.get()
_q.task_done()
self._callback(_data)
def has(self,**_args) :
return self._callback is not None
def close(self):
"""
This will empty the queue and have it ready for another operation
"""
_q = Writer._queue[self._id]
with _q.mutex:
_q.queue.clear()
_q.all_tasks_done.notify_all()
def write(self,_data,**_args):
_id = _args['id'] if 'id' in _args else self._id
_q = Writer._queue[_id]
_q.put(_data)
_q.join()
# self.callback = print
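As an illustration, a minimal sketch of the callback writer; the callback and queue id are placeholders:

    from transport.other import callback

    writer = callback.Writer(callback=print, id='demo')
    writer.write({'x':1})       # the worker thread hands the payload to the callback
    writer.close()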

View File

@ -0,0 +1,7 @@
from . import callback
class Writer (callback.Writer):
def __init__(self,**_args):
super().__init__(callback=print)

transport/other/files.py Normal file (68 lines)
View File

@ -0,0 +1,68 @@
"""
This file is a wrapper around pandas built-in functionalities to handle character delimited files
"""
import pandas as pd
import numpy as np
import os
class File :
def __init__(self,**params):
"""
@param path absolute path of the file to be read
"""
self.path = params['path'] if 'path' in params else None
self.delimiter = params['delimiter'] if 'delimiter' in params else ','
def isready(self):
return os.path.exists(self.path)
def meta(self,**_args):
return []
class Reader (File):
"""
This class is designed to read data from disk (location on hard drive)
@pre : isready() == True
"""
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**args):
_path = self.path if 'path' not in args else args['path']
_delimiter = self.delimiter if 'delimiter' not in args else args['delimiter']
return pd.read_csv(_path,delimiter=_delimiter)
def stream(self,**args):
raise Exception ("streaming needs to be implemented")
class Writer (File):
"""
This class writes output to disk in a designated location. It will write text to a text file
- If a delimiter is provided it will use that to generate a xchar-delimited file
- If not then the object will be dumped as is
"""
# THREAD_LOCK = RLock()
def __init__(self,**_args):
super().__init__(**_args)
self._mode = 'w' if 'mode' not in _args else _args['mode']
def write(self,info,**_args):
"""
This function writes a record to a designated file
@param label <passed|broken|fixed|stats>
@param row row to be written
"""
try:
_delim = self.delimiter if 'delimiter' not in _args else _args['delimiter']
_path = self.path if 'path' not in _args else _args['path']
_mode = self._mode if 'mode' not in _args else _args['mode']
info.to_csv(_path,index=False,sep=_delim)
pass
except Exception as e:
#
# Not sure what should be done here ...
pass
finally:
# DiskWriter.THREAD_LOCK.release()
pass
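As an illustration, a minimal round-trip sketch; the path is a placeholder:

    import pandas as pd
    from transport.other import files

    writer = files.Writer(path='/tmp/demo.csv', delimiter=',')
    writer.write(pd.DataFrame([{'x':1,'y':2}]))

    reader = files.Reader(path='/tmp/demo.csv')
    print(reader.read())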

transport/other/http.py Normal file (88 lines)
View File

@ -0,0 +1,88 @@
from flask import request, session
from datetime import datetime
import re
# from transport.common import Reader, Writer
import json
import requests
from io import StringIO
import pandas as pd
class Reader:
"""
This class is designed to read data from an Http request file handler provided to us by flask
The file will be held in memory and processed accordingly.
NOTE: This is inefficient and can crash a micro-instance (be careful)
"""
def __init__(self,**_args):
self._url = _args['url']
self._headers = None if 'headers' not in _args else _args['headers']
# def isready(self):
# return self.file_length > 0
def format(self,_response):
_mimetype= _response.headers['Content-Type']
if 'text/csv' in _mimetype :
_content = _response.text
return pd.read_csv(StringIO(_content))
#
# @TODO: Add support for excel, JSON and other file formats that fit into a data-frame
#
return _response.text
def read(self,**_args):
if self._headers :
r = requests.get(self._url,headers = self._headers)
else:
r = requests.get(self._url)
return self.format(r)
class Writer:
"""
This class is designed to submit data to an endpoint (url)
"""
def __init__(self,**_args):
"""
@param key required session key
"""
self._url = _args['url']
self._name = _args['name']
self._method = 'post' if 'method' not in _args else _args['method']
# self.session = params['queue']
# self.session['sql'] = []
# self.session['csv'] = []
# self.tablename = re.sub('..+$','',params['filename'])
# self.session['uid'] = params['uid']
#self.xchar = params['xchar']
def format_sql(self,row):
values = "','".join([col.replace('"','').replace("'",'') for col in row])
return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
def isready(self):
return True
def write(self,_data,**_args):
#
#
_method = self._method if 'method' not in _args else _args['method']
_method = _method.lower()
_mimetype = 'text/csv'
if type(_data) == dict :
_mimetype = 'application/json'
_content = _data
else:
_content = _data.to_dict(orient='records')
_headers = {'Content-Type':_mimetype}
_pointer = getattr(requests,_method)
_pointer (self._url,json={self._name:_content},headers=_headers)
# label = params['label']
# row = params ['row']
# if label == 'usable':
# self.session['csv'].append(self.format(row,','))
# self.session['sql'].append(self.format_sql(row))
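As an illustration, a minimal usage sketch; the urls and payload name are placeholders:

    from transport.other import http

    reader = http.Reader(url='https://example.com/data.csv')
    df = reader.read()          # parsed into a data-frame when the response is text/csv

    writer = http.Writer(url='https://example.com/ingest', name='payload', method='post')
    writer.write(df)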

transport/other/rabbitmq.py Normal file (272 lines)
View File

@ -0,0 +1,272 @@
"""
Data Transport - 1.0
Steve L. Nyemba, The Phi Technology LLC
This file is a wrapper around rabbitmq server for reading and writing content to a queue (exchange)
"""
import pika
from datetime import datetime
import re
import json
import os
import sys
# if sys.version_info[0] > 2 :
# from transport.common import Reader, Writer
# else:
# from common import Reader, Writer
import json
from multiprocessing import RLock
class MessageQueue:
"""
This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)
:host
:xid identifier of the exchange
:qid identifier of the queue
"""
def __init__(self,**params):
self.host= 'localhost' if 'host' not in params else params['host'] #-- location of the queue server
self.port= 5672 if 'port' not in params else params['port']
self.virtual_host = '/' if 'vhost' not in params else params['vhost']
self.exchange = params['exchange'] if 'exchange' in params else 'amq.direct' #-- exchange
self.queue = params['queue'] if 'queue' in params else 'demo'
self.connection = None
self.channel = None
self.name = self.__class__.__name__.lower() if 'name' not in params else params['name']
username = password = None
if 'username' in params :
username = params['username']
password = params['password']
if 'auth_file' in params :
_info = json.loads((open(params['auth_file'])).read())
username=_info['username']
password=_info['password']
self.virtual_host = _info['virtual_host'] if 'virtual_host' in _info else self.virtual_host
self.exchange = _info['exchange'] if 'exchange' in _info else self.exchange
self.queue = _info['queue'] if 'queue' in _info else self.queue
self.credentials= pika.PlainCredentials('guest','guest')
if 'username' in params :
self.credentials = pika.PlainCredentials(
params['username'],
('' if 'password' not in params else params['password'])
)
def init(self,label=None):
properties = pika.ConnectionParameters(host=self.host,port=self.port,virtual_host=self.virtual_host,
client_properties={'connection_name':self.name},
credentials=self.credentials)
self.connection = pika.BlockingConnection(properties)
self.channel = self.connection.channel()
self.info = self.channel.exchange_declare(exchange=self.exchange,exchange_type='direct',durable=True)
if label is None:
self.qhandler = self.channel.queue_declare(queue=self.queue,durable=True)
else:
self.qhandler = self.channel.queue_declare(queue=label,durable=True)
self.channel.queue_bind(exchange=self.exchange,queue=self.qhandler.method.queue)
def isready(self):
#self.init()
resp = self.connection is not None and self.connection.is_open
# self.close()
return resp
def finalize(self):
pass
def close(self):
if self.connection.is_closed == False :
self.channel.close()
self.connection.close()
class Writer(MessageQueue):
"""
This class is designed to publish content to an AMQP (Rabbitmq)
The class will rely on pika to implement this functionality
We will publish information to a given queue for a given exchange
"""
def __init__(self,**params):
#self.host= params['host']
#self.exchange = params['uid']
#self.queue = params['queue']
MessageQueue.__init__(self,**params);
self.init()
def write(self,data,_type='text/plain'):
"""
This function writes a stream of data to the a given queue
@param object object to be written (will be converted to JSON)
@TODO: make this less chatty
"""
stream = json.dumps(data) if isinstance(data,dict) else data
self.channel.basic_publish(
exchange=self.exchange,
routing_key=self.queue,
body=stream,
properties=pika.BasicProperties(content_type=_type,delivery_mode=2)
);
# self.close()
def flush(self):
self.init()
_mode = 1 #-- Non persistent
self.channel.queue_delete( queue=self.queue);
self.close()
class Reader(MessageQueue):
"""
This class will read from a queue provided an exchange, queue and host
@TODO: Account for security and virtualhosts
"""
def __init__(self,**params):
"""
@param host host
@param uid exchange identifier
@param qid queue identifier
"""
#self.host= params['host']
#self.exchange = params['uid']
#self.queue = params['qid']
MessageQueue.__init__(self,**params);
# self.init()
self.durable = False if 'durable' not in params else params['durable']
# if 'durable' in params :
# self.durable = True
# else:
# self.durable = False
self.size = -1
self.data = {}
# def init(self,qid):
# properties = pika.ConnectionParameters(host=self.host)
# self.connection = pika.BlockingConnection(properties)
# self.channel = self.connection.channel()
# self.channel.exchange_declare(exchange=self.exchange,type='direct',durable=True)
# self.info = self.channel.queue_declare(queue=qid,durable=True)
def callback(self,channel,method,header,stream):
"""
This is the callback function designed to process the data stream from the queue
"""
r = []
# if re.match("^\{|\[",stream) is not None:
if stream.startswith(b'{') or stream.startswith(b'['):
r = json.loads(stream)
else:
r = stream
qid = self.qhandler.method.queue
if qid not in self.data :
self.data[qid] = []
self.data[qid].append(r)
#
# We stop reading when all the messages of the queue have been consumed
#
if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
self.close()
def read(self,**args):
"""
This function will read, the first message from a queue
@TODO:
Implement channel.basic_get in order to retrieve a single message at a time
Have the number of messages retrieved be specified by size (parameter)
"""
r = {}
self.size = -1 if 'size' not in args else int(args['size'])
#
# We enabled the reader to be able to read from several queues (sequentially for now)
# The qid parameter will be an array of queues the reader will be reading from
#
if isinstance(self.queue,str) :
self.queue = [self.queue]
for qid in self.queue:
self.init(qid)
# r[qid] = []
if self.qhandler.method.message_count > 0:
self.channel.basic_consume(queue=qid,on_message_callback=self.callback,auto_ack=False);
self.channel.start_consuming()
else:
pass
#self.close()
# r[qid].append( self.data)
return self.data
class QueueListener(MessageQueue):
lock = RLock()
"""
This class is designed to have an active listener (worker) against a specified Exchange/Queue
It is initialized as would any other object and will require a callback function to address the objects returned.
"""
def __init__(self,**args):
MessageQueue.__init__(self,**args)
self.listen = self.read
self.apply = args['apply'] if 'apply' in args else print
self.lock = False if 'lock' not in args else args['lock']
def finalize(self,channel,ExceptionReason):
pass
def callback(self,channel,method,header,stream) :
_info= {}
# if re.match("^\{|\[",stream) is not None:
if stream.startswith(b"[") or stream.startswith(b"{"):
_info = json.loads(stream)
else:
_info = stream
#
# At this point we should invoke the apply function with a lock if need be
# @TODO: Establish a vocabulary
if stream == b'QUIT' :
# channel.exit()
self.close()
if self.lock == True :
QueueListener.lock.acquire()
try:
#
# In case the user has not specified a function to apply the data against, it will simply be printed
#
self.apply(_info)
except Exception as e:
pass
if self.lock == True :
QueueListener.lock.release()
def read(self):
self.init(self.queue)
self.channel.basic_consume(self.queue,self.callback,auto_ack=True);
self.channel.start_consuming()
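#
# A minimal usage sketch, assuming the same host/uid/qid keys as MessageQueue;
# the handler below is a hypothetical callback and the call blocks until a
# b'QUIT' message is received on the queue.
#
#   def handler(_message):
#       print(_message)
#
#   _listener = QueueListener(host='localhost', uid='my-exchange', qid='my-queue', apply=handler)
#   _listener.listen()
#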
class Factory :
@staticmethod
def instance(**_args):
"""
:param count number of workers
:param apply function to be applied by each worker
"""
_apply = _args['apply']
_count = _args['count']
for i in np.arange(_count) :
_name = _args['name'] if 'name' in _args else 'worker_'+str(i)
transport.factory.instance(provider="rabbit",context="listener",apply=_apply,auth_file=_args['auth_file'])

View File

@ -1,105 +0,0 @@
# from transport.common import Reader, Writer,Console #, factory
from transport import disk
import sqlite3
from transport import s3 as s3
from transport import rabbitmq as queue
from transport import couch as couch
from transport import mongo as mongo
from transport import sql as sql
from transport import etl as etl
from transport import qlistener
from transport import bricks
from transport import session
from transport import nextcloud
import psycopg2 as pg
import mysql.connector as my
from google.cloud import bigquery as bq
import nzpy as nz #--- netezza drivers
import os
from info import __version__
POSTGRESQL = 'postgresql'
MONGODB = 'mongodb'
HTTP='http'
BIGQUERY ='bigquery'
FILE = 'file'
ETL = 'etl'
SQLITE = 'sqlite'
SQLITE3= 'sqlite'
REDSHIFT = 'redshift'
NETEZZA = 'netezza'
MYSQL = 'mysql+mysqlconnector'
RABBITMQ = 'rabbitmq'
MARIADB = 'mariadb'
COUCHDB = 'couch'
CONSOLE = 'console'
ETL = 'etl'
TRANSPORT = ETL
NEXTCLOUD = 'nextcloud'
#
# synonyms of the above
BQ = BIGQUERY
MONGO = MONGODB
FERRETDB= MONGODB
PG = POSTGRESQL
PSQL = POSTGRESQL
PGSQL = POSTGRESQL
S3 = 's3'
AWS_S3 = 's3'
RABBIT = RABBITMQ
QLISTENER = 'qlistener'
QUEUE = QLISTENER
CALLBACK = QLISTENER
DATABRICKS= 'databricks+connector'
DRIVERS = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3}
CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[NEXTCLOUD,S3,BIGQUERY,DATABRICKS],'file':[FILE],
'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QUEUE],'http':[HTTP]}
READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader},
'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader,NEXTCLOUD:nextcloud.NextcloudReader},
'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener},
# 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader
}
WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter},
'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter,NEXTCLOUD:nextcloud.NextcloudWriter},
'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener},
# 'cli':{CONSOLE:Console},
# 'memory':{CONSOLE:Console}, 'http':session.HttpReader
}
# SQL_PROVIDERS = [POSTGRESQL,MYSQL,NETEZZA,MARIADB,SQLITE]
PROVIDERS = {
FILE:{'read':disk.DiskReader,'write':disk.DiskWriter},
SQLITE:{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3},
'sqlite3':{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3},
POSTGRESQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}},
NETEZZA:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':nz,'default':{'port':5480}},
REDSHIFT:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}},
RABBITMQ:{'read':queue.QueueReader,'writer':queue.QueueWriter,'context':queue.QueueListener,'default':{'host':'localhost','port':5432}},
MYSQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
MARIADB:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
S3:{'read':s3.s3Reader,'write':s3.s3Writer},
BIGQUERY:{'read':sql.BigQueryReader,'write':sql.BigQueryWriter},
DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter},
NEXTCLOUD:{'read':nextcloud.NextcloudReader,'write':nextcloud.NextcloudWriter},
QLISTENER:{'read':qlistener.qListener,'write':qlistener.qListener,'default':{'host':'localhost','port':5672}},
CONSOLE:{'read':qlistener.Console,"write":qlistener.Console},
HTTP:{'read':session.HttpReader,'write':session.HttpWriter},
MONGODB:{'read':mongo.MongoReader,'write':mongo.MongoWriter,'default':{'port':27017,'host':'localhost'}},
COUCHDB:{'read':couch.CouchReader,'writer':couch.CouchWriter,'default':{'host':'localhost','port':5984}},
# ETL :{'read':etl.Transporter,'write':etl.Transporter}
ETL :{'read':etl.instance,'write':etl.instance}
}
DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port':3306}}
DEFAULT[MONGODB] = {'port':27017,'host':'localhost'}
DEFAULT[REDSHIFT] = DEFAULT[PG]
DEFAULT[MARIADB] = DEFAULT[MYSQL]
DEFAULT[NETEZZA] = {'port':5480}

View File

@ -0,0 +1,44 @@
"""
This file aggregates the names of the data-store providers supported by the framework
"""
BIGQUERY='bigquery'
POSTGRESQL = 'postgresql'
MONGODB = 'mongodb'
HTTP='http'
FILE = 'file'
ETL = 'etl'
SQLITE = 'sqlite'
SQLITE3= 'sqlite3'
REDSHIFT = 'redshift'
NETEZZA = 'netezza'
MYSQL = 'mysql'
MARIADB= MYSQL
COUCHDB = 'couchdb'
CONSOLE = 'console'
TRANSPORT = ETL
NEXTCLOUD = 'nextcloud'
S3 = 's3'
CALLBACK = 'callback'
RABBITMQ = 'rabbitmq'
DATABRICKS= 'databricks'
#
# synonyms of the above
BQ = BIGQUERY
MONGO = MONGODB
FERRETDB= MONGODB
PG = POSTGRESQL
PSQL = POSTGRESQL
PGSQL = POSTGRESQL
AWS_S3 = 's3'
RABBIT = RABBITMQ
# QLISTENER = 'qlistener'
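#
# Usage sketch: these constants are intended to be passed as the `provider`
# argument when requesting a reader/writer. The factory call below is only
# illustrative; the exact entry point may differ in the 2.0 interface.
#
#   import transport
#   from transport import providers
#   _pgreader = transport.factory.instance(provider=providers.POSTGRESQL, database='mydb', table='users', context='read')
#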

View File

@ -1,526 +0,0 @@
"""
This file is intended to perform read/writes against an SQL database such as PostgreSQL, Redshift, Mysql, MsSQL ...
LICENSE (MIT)
Copyright 2016-2020, The Phi Technology LLC
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
@TODO:
- Migrate SQLite to SQL hierarchy
- Include Write in Chunks from pandas
"""
import psycopg2 as pg
import mysql.connector as my
import sys
import sqlalchemy
if sys.version_info[0] > 2 :
from transport.common import Reader, Writer #, factory
else:
from common import Reader,Writer
import json
from google.oauth2 import service_account
from google.cloud import bigquery as bq
# import constants.bq_utils as bq_consts
from multiprocessing import Lock, RLock
import pandas as pd
import pandas_gbq as pd_gbq
import numpy as np
import nzpy as nz #--- netezza drivers
import sqlite3
import copy
import os
import time
class SQLRW :
lock = RLock()
MAX_CHUNK = 2000000
DRIVERS = {"postgresql":pg,"redshift":pg,"mysql":my,"mariadb":my,"netezza":nz}
REFERENCE = {
"netezza":{"port":5480,"handler":nz,"dtype":"VARCHAR(512)"},
"postgresql":{"port":5432,"handler":pg,"dtype":"VARCHAR"},
"redshift":{"port":5432,"handler":pg,"dtype":"VARCHAR"},
"mysql":{"port":3360,"handler":my,"dtype":"VARCHAR(256)"},
"mariadb":{"port":3360,"handler":my,"dtype":"VARCHAR(256)"},
}
def __init__(self,**_args):
_info = {}
_info['dbname'] = _args['db'] if 'db' in _args else _args['database']
self.table = _args['table'] if 'table' in _args else None
self.fields = _args['fields'] if 'fields' in _args else []
self.schema = _args['schema'] if 'schema' in _args else ''
self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])
self._provider = _args['provider'] if 'provider' in _args else None
# _info['host'] = 'localhost' if 'host' not in _args else _args['host']
# _info['port'] = SQLWriter.REFERENCE[_provider]['port'] if 'port' not in _args else _args['port']
_info['host'] = _args['host'] if 'host' in _args else ''
_info['port'] = _args['port'] if 'port' in _args else ''
# if 'host' in _args :
# _info['host'] = 'localhost' if 'host' not in _args else _args['host']
# # _info['port'] = SQLWriter.PROVIDERS[_args['provider']] if 'port' not in _args else _args['port']
# _info['port'] = SQLWriter.REFERENCE[_provider]['port'] if 'port' not in _args else _args['port']
self.lock = False if 'lock' not in _args else _args['lock']
if 'username' in _args or 'user' in _args:
key = 'username' if 'username' in _args else 'user'
_info['user'] = _args[key]
_info['password'] = _args['password'] if 'password' in _args else ''
if 'auth_file' in _args :
_auth = json.loads( open(_args['auth_file']).read() )
key = 'username' if 'username' in _auth else 'user'
_info['user'] = _auth[key]
_info['password'] = _auth['password'] if 'password' in _auth else ''
_info['host'] = _auth['host'] if 'host' in _auth else _info['host']
_info['port'] = _auth['port'] if 'port' in _auth else _info['port']
if 'database' in _auth:
_info['dbname'] = _auth['database']
self.table = _auth['table'] if 'table' in _auth else self.table
#
# We need to load the drivers here to see what we are dealing with ...
# _handler = SQLWriter.REFERENCE[_provider]['handler']
_handler = _args['driver'] #-- handler to the driver
self._dtype = _args['default']['type'] if 'default' in _args and 'type' in _args['default'] else 'VARCHAR(256)'
# self._provider = _args['provider']
# self._dtype = SQLWriter.REFERENCE[_provider]['dtype'] if 'dtype' not in _args else _args['dtype']
# self._provider = _provider
if _handler == nz :
_info['database'] = _info['dbname']
_info['securityLevel'] = 0
del _info['dbname']
if _handler == my :
_info['database'] = _info['dbname']
del _info['dbname']
if _handler == sqlite3 :
_info = {'path':_info['dbname'],'isolation_level':'IMMEDIATE'}
if _handler != sqlite3 :
self.conn = _handler.connect(**_info)
else:
self.conn = _handler.connect(_info['path'],isolation_level='IMMEDIATE')
self._engine = _args['sqlalchemy'] if 'sqlalchemy' in _args else None
def meta(self,**_args):
schema = []
try:
if self._engine :
table = _args['table'] if 'table' in _args else self.table
if sqlalchemy.__version__.startswith('1.') :
_m = sqlalchemy.MetaData(bind=self._engine)
_m.reflect()
else:
_m = sqlalchemy.MetaData()
_m.reflect(bind=self._engine)
schema = [{"name":_attr.name,"type":str(_attr.type)} for _attr in _m.tables[table].columns]
#
# Some house keeping work
_m = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
for _item in schema :
if _item['type'] in _m :
_item['type'] = _m[_item['type']]
except Exception as e:
print (e)
pass
return schema
def _tablename(self,name) :
return self.schema +'.'+name if self.schema not in [None, ''] and '.' not in name else name
def has(self,**_args):
return self.meta(**_args)
# found = False
# try:
# table = self._tablename(_args['table'])if 'table' in _args else self._tablename(self.table)
# sql = "SELECT * FROM :table LIMIT 1".replace(":table",table)
# if self._engine :
# _conn = self._engine.connect()
# else:
# _conn = self.conn
# found = pd.read_sql(sql,_conn).shape[0]
# found = True
# except Exception as e:
# print (e)
# pass
# finally:
# if not self._engine :
# _conn.close()
# return found
def isready(self):
_sql = "SELECT * FROM :table LIMIT 1".replace(":table",self.table)
try:
_conn = self.conn if not hasattr(self,'_engine') else self._engine
return pd.read_sql(_sql,_conn).columns.tolist()
except Exception as e:
pass
return False
def apply(self,_sql):
"""
This function applies a command and/or a query against the current relational data-store
:param _sql insert/select statement
@TODO: Store procedure calls
"""
#
_out = None
try:
if _sql.lower().startswith('select') :
_conn = self._engine if self._engine else self.conn
return pd.read_sql(_sql,_conn)
else:
# Executing a command i.e no expected return values ...
cursor = self.conn.cursor()
cursor.execute(_sql)
self.conn.commit()
except Exception as e :
print (e)
finally:
if not self._engine :
self.conn.commit()
# cursor.close()
def close(self):
try:
self.conn.close()
except Exception as error :
print (error)
pass
class SQLReader(SQLRW,Reader) :
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args):
if 'sql' in _args :
_sql = (_args['sql'])
else:
if 'table' in _args :
table = _args['table']
else:
table = self.table
# table = self.table if self.table is not None else _args['table']
_sql = "SELECT :fields FROM "+self._tablename(table)
if 'filter' in _args :
_sql = _sql +" WHERE "+_args['filter']
if 'fields' in _args :
_fields = _args['fields']
else:
_fields = '*' if not self.fields else ",".join(self.fields)
_sql = _sql.replace(":fields",_fields)
#
# At this point we have a query we can execute gracefully
if 'limit' in _args :
_sql = _sql + " LIMIT "+str(_args['limit'])
#
# @TODO:
# It is here that we should inspect to see if there are any pre/post conditions
#
return self.apply(_sql)
def close(self) :
try:
self.conn.close()
except Exception as error :
print (error)
pass
class SQLWriter(SQLRW,Writer):
def __init__(self,**_args) :
super().__init__(**_args)
#
# In the event that data typing is difficult to determine, we can inspect the data and fall back to a default type
# This slows down the process but improves the reliability of the data
# NOTE: Proper data types should be set on the target system if the source is unclear.
self._cast = False if 'cast' not in _args else _args['cast']
def init(self,fields=None):
# if not fields :
# try:
# table = self._tablename(self.table)
# self.fields = pd.read_sql_query("SELECT * FROM :table LIMIT 1".replace(":table",table),self.conn).columns.tolist()
# except Exception as e:
# pass
# finally:
# pass
# else:
self.fields = fields;
def make(self,**_args):
table = self._tablename(self.table) if 'table' not in _args else self._tablename(_args['table'])
if 'fields' in _args :
fields = _args['fields']
# table = self._tablename(self.table)
sql = " ".join(["CREATE TABLE",table," (", ",".join([ name +' '+ self._dtype for name in fields]),")"])
else:
schema = _args['schema'] if 'schema' in _args else []
_map = _args['map'] if 'map' in _args else {}
sql = [] # ["CREATE TABLE ",_args['table'],"("]
for _item in schema :
_type = _item['type']
if _type in _map :
_type = _map[_type]
sql = sql + [" " .join([_item['name'], ' ',_type])]
sql = ",".join(sql)
# table = self._tablename(_args['table'])
sql = ["CREATE TABLE ",table,"( ",sql," )"]
sql = " ".join(sql)
cursor = self.conn.cursor()
try:
cursor.execute(sql)
except Exception as e :
print (e)
# print (sql)
pass
finally:
# cursor.close()
self.conn.commit()
pass
def write(self,info,**_args):
"""
:param info writes a list of data to a given set of fields
"""
# inspect = False if 'inspect' not in _args else _args['inspect']
# cast = False if 'cast' not in _args else _args['cast']
# if not self.fields :
# if type(info) == list :
# _fields = info[0].keys()
# elif type(info) == dict :
# _fields = info.keys()
# elif type(info) == pd.DataFrame :
# _fields = info.columns.tolist()
# # _fields = info.keys() if type(info) == dict else info[0].keys()
# # _fields = list (_fields)
# self.init(_fields)
try:
table = _args['table'] if 'table' in _args else self.table
#
# In SQL, schema can stand for namespace or the structure of a table
# In case we have a list, we are likely dealing with table structure
#
if 'schema' in _args :
if type(_args['schema']) == str :
self.schema = _args['schema'] if 'schema' in _args else self.schema
elif type(_args['schema']) == list and len(_args['schema']) > 0 and not self.has(table=table):
#
# Edge case: when an empty list is passed, no table should be created
#
self.make(table=table,schema=_args['schema'])
pass
# self.schema = _args['schema'] if 'schema' in _args else self.schema
table = self._tablename(table)
_sql = "INSERT INTO :table (:fields) VALUES (:values)".replace(":table",table) #.replace(":table",self.table).replace(":fields",_fields)
if type(info) == list :
_info = pd.DataFrame(info)
elif type(info) == dict :
_info = pd.DataFrame([info])
else:
_info = pd.DataFrame(info)
if _info.shape[0] == 0 :
return
if self.lock :
SQLRW.lock.acquire()
#
# Adjust the number of chunks in case the volume of data exceeds the maximum chunk size
if self._chunks == 1 and _info.shape[0] > SQLRW.MAX_CHUNK :
self._chunks = 10
_indexes = np.array_split(np.arange(_info.shape[0]),self._chunks)
for i in _indexes :
#
# In case we have an invalid chunk ...
if _info.iloc[i].shape[0] == 0 :
continue
#
# We are enabling writing by chunks/batches because some persistent layers have quotas or limitations on volume of data
if self._engine is not None:
# pd.to_sql(_info,self._engine)
if self.schema in ['',None] :
rows = _info.iloc[i].to_sql(table,self._engine,if_exists='append',index=False)
else:
#
# Writing with schema information ...
rows = _info.iloc[i].to_sql(self.table,self._engine,schema=self.schema,if_exists='append',index=False)
time.sleep(1)
else:
_fields = ",".join(self.fields)
_sql = _sql.replace(":fields",_fields)
values = ", ".join("?"*len(self.fields)) if self._provider == 'netezza' else ",".join(["%s" for name in self.fields])
_sql = _sql.replace(":values",values)
cursor = self.conn.cursor()
cursor.executemany(_sql,_info.iloc[i].values.tolist())
cursor.close()
# cursor.commit()
# self.conn.commit()
except Exception as e:
print(e)
pass
finally:
if self._engine is None :
self.conn.commit()
if self.lock :
SQLRW.lock.release()
# cursor.close()
pass
def close(self):
try:
self.conn.close()
finally:
pass
class BigQuery:
def __init__(self,**_args):
path = _args['service_key'] if 'service_key' in _args else _args['private_key']
self.credentials = service_account.Credentials.from_service_account_file(path)
self.dataset = _args['dataset'] if 'dataset' in _args else None
self.path = path
self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
self.table = _args['table'] if 'table' in _args else None
self.client = bq.Client.from_service_account_json(self.path)
def meta(self,**_args):
"""
This function returns meta data for a given table or query with dataset/table properly formatted
:param table name of the table WITHOUT the dataset prefix
:param sql sql query to be executed
"""
table = _args['table'] if 'table' in _args else self.table
try:
if table :
_dataset = self.dataset if 'dataset' not in _args else _args['dataset']
sql = f"""SELECT column_name as name, data_type as type FROM {_dataset}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{table}' """
_info = {'credentials':self.credentials,'dialect':'standard'}
return pd_gbq.read_gbq(sql,**_info).to_dict(orient='records')
# return self.read(sql=sql).to_dict(orient='records')
# ref = self.client.dataset(self.dataset).table(table)
# _schema = self.client.get_table(ref).schema
# return [{"name":_item.name,"type":_item.field_type,"description":( "" if not hasattr(_item,"description") else _item.description )} for _item in _schema]
else :
return []
except Exception as e:
return []
def has(self,**_args):
found = False
try:
_has = self.meta(**_args)
found = _has is not None and len(_has) > 0
except Exception as e:
pass
return found
class BQReader(BigQuery,Reader) :
def __init__(self,**_args):
super().__init__(**_args)
def apply(self,sql):
return self.read(sql=sql)
def read(self,**_args):
SQL = None
table = self.table if 'table' not in _args else _args['table']
if 'sql' in _args :
SQL = _args['sql']
elif table:
table = "".join(["`",table,"`"]) if '.' in table else "".join(["`:dataset.",table,"`"])
SQL = "SELECT * FROM :table ".replace(":table",table)
if not SQL :
return None
if SQL and 'limit' in _args:
SQL += " LIMIT "+str(_args['limit'])
if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset:
SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset)
_info = {'credentials':self.credentials,'dialect':'standard'}
return pd_gbq.read_gbq(SQL,**_info) if SQL else None
# return self.client.query(SQL).to_dataframe() if SQL else None
class BQWriter(BigQuery,Writer):
lock = Lock()
def __init__(self,**_args):
super().__init__(**_args)
self.parallel = False if 'lock' not in _args else _args['lock']
self.table = _args['table'] if 'table' in _args else None
self.mode = {'if_exists':'append','chunksize':900000,'destination_table':self.table,'credentials':self.credentials}
self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])
self._location = 'US' if 'location' not in _args else _args['location']
def write(self,_info,**_args) :
try:
if self.parallel or 'lock' in _args :
BQWriter.lock.acquire()
_args['table'] = self.table if 'table' not in _args else _args['table']
self._write(_info,**_args)
finally:
if self.parallel:
BQWriter.lock.release()
def submit(self,_sql):
"""
Write the output of a large query to a given table; BigQuery will handle this as a job
This function will return the job identifier
"""
_config = bq.QueryJobConfig()
_config.destination = self.client.dataset(self.dataset).table(self.table)
_config.allow_large_results = True
# _config.write_disposition = bq.bq_consts.WRITE_APPEND
_config.dry_run = False
# _config.priority = 'BATCH'
_resp = self.client.query(_sql,location=self._location,job_config=_config)
return _resp.job_id
def status (self,_id):
return self.client.get_job(_id,location=self._location)
def _write(self,_info,**_args) :
_df = None
if type(_info) in [list,pd.DataFrame] :
if type(_info) == list :
_df = pd.DataFrame(_info)
elif type(_info) == pd.DataFrame :
_df = _info
if '.' not in _args['table'] :
self.mode['destination_table'] = '.'.join([self.dataset,_args['table']])
else:
self.mode['destination_table'] = _args['table'].strip()
if 'schema' in _args :
self.mode['table_schema'] = _args['schema']
#
# Let us ensure that the types are somewhat compatible ...
# _map = {'INTEGER':np.int64,'DATETIME':'datetime64[ns]','TIMESTAMP':'datetime64[ns]','FLOAT':np.float64,'DOUBLE':np.float64,'STRING':str}
# _mode = copy.deepcopy(self.mode)
_mode = self.mode
# _df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
#
# Let us adjust the chunking here
self._chunks = 10 if _df.shape[0] > SQLRW.MAX_CHUNK and self._chunks == 1 else self._chunks
_indexes = np.array_split(np.arange(_df.shape[0]),self._chunks)
for i in _indexes :
_df.iloc[i].to_gbq(**self.mode)
time.sleep(1)
pass
#
# Aliasing the big query classes allowing it to be backward compatible
#
BigQueryReader = BQReader
BigQueryWriter = BQWriter

18
transport/sql/__init__.py Normal file
View File

@ -0,0 +1,18 @@
"""
This namespace/package wraps the SQL functionality for the following data-stores
- netezza, postgresql, mysql and sqlite
- mariadb, redshift (also included)
"""
from . import postgresql, mysql, netezza, sqlite
#
# Creating aliases to support additional data-store providers
#
mariadb = mysql
redshift = postgresql
sqlite3 = sqlite
# from transport import sql
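#
# Usage sketch (placeholder credentials and table name): each module listed above
# exposes a Reader and a Writer built on transport.sql.common.
#
#   from transport.sql import postgresql
#   _reader = postgresql.Reader(database='analytics', username='user', password='secret', host='localhost')
#   _df = _reader.read(table='measures')        # or _reader.read(sql='SELECT ...')
#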

125
transport/sql/common.py Normal file
View File

@ -0,0 +1,125 @@
"""
This file encapsulates common operations associated with SQL databases via SQLAlchemy
"""
import sqlalchemy as sqa
import pandas as pd
class Base:
def __init__(self,**_args):
self._host = _args['host'] if 'host' in _args else 'localhost'
self._port = _args['port'] if 'port' in _args else None
self._database = _args['database']
self._table = _args['table'] if 'table' in _args else None
self._engine= sqa.create_engine(self._get_uri(**_args),future=True)
def _set_uri(self,**_args) :
"""
:provider provider
:host host and port
:account account user/pwd
"""
_account = _args['account'] if 'account' in _args else None
_host = _args['host']
_provider = _args['provider'].replace(':','').replace('/','').strip()
def _get_uri(self,**_args):
"""
This function will return the formatted uri for the sqlAlchemy engine
"""
raise Exception ("Function Needs to be implemented ")
def meta (self,**_args):
"""
This function returns the schema (table definition) of a given table
:table optional name of the table (can be fully qualified)
"""
_table = self._table if 'table' not in _args else _args['table']
_schema = []
if _table :
if sqa.__version__.startswith('1.') :
_handler = sqa.MetaData(bind=self._engine)
_handler.reflect()
else:
#
# sqlalchemy's version 2.+
_handler = sqa.MetaData()
_handler.reflect(bind=self._engine)
#
# Let us extract the schema with the native types
_map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
_schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
return _schema
def has(self,**_args):
return self.meta(**_args)
def apply(self,sql):
"""
Execute an SQL statement that returns query results (hence the restriction to SELECT and/or WITH statements)
:sql SQL query to be executed
@TODO: Execution of stored procedures
"""
return pd.read_sql(sql,self._engine) if sql.lower().startswith('select') or sql.lower().startswith('with') else None
class SQLBase(Base):
def __init__(self,**_args):
super().__init__(**_args)
def get_provider(self):
raise Exception ("Provider Needs to be set ...")
def get_default_port(self) :
raise Exception ("default port needs to be set")
def _get_uri(self,**_args):
_host = self._host
_account = ''
if self._port :
_port = self._port
else:
_port = self.get_default_port()
_host = f'{_host}:{_port}'
if 'username' in _args :
_account = ''.join([_args['username'],':',_args['password'],'@'])
_database = self._database
_provider = self.get_provider().replace(':','').replace('/','')
# _uri = [f'{_provider}:/',_account,_host,_database]
# _uri = [_item.strip() for _item in _uri if _item.strip()]
# return '/'.join(_uri)
return f'{_provider}://{_host}/{_database}' if _account == '' else f'{_provider}://{_account}{_host}/{_database}'
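#
# For reference, the URIs produced above look like the following (placeholder values):
#   postgresql://localhost:5432/mydb                        -- no credentials provided
#   mysql+mysqlconnector://user:secret@localhost:3306/mydb  -- with credentials
#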
class BaseReader(SQLBase):
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args):
"""
This function will read a query or table from the specific database
"""
if 'sql' in _args :
sql = _args['sql']
else:
_table = _args['table'] if 'table' in _args else self._table
sql = f'SELECT * FROM {_table}'
return self.apply(sql)
class BaseWriter (SQLBase):
"""
This class implements SQLAlchemy support for writing to a data-store (RDBMS)
"""
def __init__(self,**_args):
super().__init__(**_args)
def write(self,_data,**_args):
if type(_data) == dict :
_df = pd.DataFrame([_data]) # a dict is treated as a single record
elif type(_data) == list :
_df = pd.DataFrame(_data)
else:
_df = _data.copy()
#
# We are assuming we have a data-frame at this point
#
_table = _args['table'] if 'table' in _args else self._table
_mode = {'chunksize':2000000,'if_exists':'append','index':False}
if 'schema' in _args :
_mode['schema'] = _args['schema']
if 'if_exists' in _args :
_mode['if_exists'] = _args['if_exists']
_df.to_sql(_table,self._engine,**_mode)
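#
# Usage sketch (placeholder credentials): any concrete Writer from the sibling
# modules, e.g. transport.sql.postgresql.Writer, inherits this write method.
#
#   import pandas as pd
#   from transport.sql import postgresql
#   _writer = postgresql.Writer(database='analytics', username='user', password='secret')
#   _writer.write(pd.DataFrame([{'name':'alice','age':30}]), table='people', if_exists='append')
#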

18
transport/sql/mysql.py Normal file
View File

@ -0,0 +1,18 @@
"""
This file implements support for mysql and mariadb (via the mysql+mysqlconnector driver)
"""
from transport.sql.common import BaseReader, BaseWriter
# import mysql.connector as my
class MYSQL:
def get_provider(self):
return "mysql+mysqlconnector"
def get_default_port(self):
return "3306"
class Reader(MYSQL,BaseReader) :
def __init__(self,**_args):
super().__init__(**_args)
class Writer(MYSQL,BaseWriter) :
def __init__(self,**_args):
super().__init__(**_args)

15
transport/sql/netezza.py Normal file
View File

@ -0,0 +1,15 @@
import nzpy as nz
from transport.sql.common import BaseReader, BaseWriter
class Netezza:
def get_provider(self):
return 'netezza+nzpy'
def get_default_port(self):
return '5480'
class Reader(Netezza,BaseReader) :
def __init__(self,**_args):
super().__init__(**_args)
class Writer(Netezza,BaseWriter):
def __init__(self,**_args):
super().__init__(**_args)

View File

@ -0,0 +1,22 @@
from transport.sql.common import BaseReader , BaseWriter
from psycopg2.extensions import register_adapter, AsIs
import numpy as np
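#
# psycopg2 does not know how to adapt numpy.int64 out of the box; registering
# AsIs lets such values be rendered as plain integer literals in queries.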
register_adapter(np.int64, AsIs)
class PG:
def __init__(self,**_args):
super().__init__(**_args)
def get_provider(self):
return "postgresql"
def get_default_port(self):
return "5432"
class Reader(PG,BaseReader) :
def __init__(self,**_args):
super().__init__(**_args)
class Writer(PG,BaseWriter):
def __init__(self,**_args):
super().__init__(**_args)

25
transport/sql/sqlite.py Normal file
View File

@ -0,0 +1,25 @@
import sqlalchemy
import pandas as pd
from transport.sql.common import Base, BaseReader, BaseWriter
class SQLite (BaseReader):
def __init__(self,**_args):
super().__init__(**_args)
if 'path' in _args :
self._database = _args['path']
if 'database' in _args :
self._database = _args['database']
def _get_uri(self,**_args):
path = self._database
return f'sqlite:///{path}' # ensure this is the correct path for the sqlite file.
class Reader(SQLite,BaseReader):
def __init__(self,**_args):
super().__init__(**_args)
# def read(self,**_args):
# sql = _args['sql']
# return pd.read_sql(sql,self._engine)
class Writer (SQLite,BaseWriter):
def __init__(self,**_args):
super().__init__(**_args)
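#
# A minimal, self-contained sketch using a throw-away sqlite file; the table and
# column names are placeholders, pandas is already imported at the top of this
# module, and the transport package is assumed to be importable.
#
if __name__ == '__main__':
    _writer = Writer(database='/tmp/demo.db3')
    _writer.write(pd.DataFrame([{'name':'alice','age':30},{'name':'bob','age':25}]), table='people')
    _reader = Reader(database='/tmp/demo.db3')
    print(_reader.read(table='people'))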