bug fix: simplifying factory interface

This commit is contained in:
Steve Nyemba 2021-11-18 15:21:26 -06:00
parent 44ef71eb92
commit 2de6e51bdb
6 changed files with 106 additions and 155 deletions

View File

@@ -5,9 +5,19 @@ steve@the-phi.com, The Phi Technology LLC
https://dev.the-phi.com/git/steve/data-transport.git
This program performs ETL between 9 supported data sources: Couchdb, Mongodb, Mysql, Mariadb, PostgreSQL, Netezza, Redshift, Sqlite, File
LICENSE (MIT)
Copyright 2016-2020, The Phi Technology LLC
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Usage:
transport --config <path-to-file.json> --procs <number-procs>
@TODO: Create tables if they don't exist for relational databases
"""
import pandas as pd
import numpy as np
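For context, a hypothetical sketch of the JSON configuration the usage line above refers to. The keys shown (provider, context, database, table) mirror the arguments the factory consumes later in this commit; they are illustrative, not a documented schema:

{
    "provider" : "postgresql",
    "context"  : "read",
    "database" : "mydb",
    "table"    : "users"
}

A matching invocation would then be: transport --config etl.json --procs 2 (etl.json being a hypothetical path to a file like the one above).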

View File

@@ -8,7 +8,7 @@ def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = {
"name":"data-transport",
"version":"1.3.9.6",
"version":"1.4.0",
"author":"The Phi Technology LLC","author_email":"info@the-phi.com",
"license":"MIT",
"packages":["transport"]}

View File

@@ -65,9 +65,36 @@ else:
import mongo
import s3
import sql
import psycopg2 as pg
import mysql.connector as my
from google.cloud import bigquery as bq
import nzpy as nz #--- netezza drivers
import os
RDBMS = {
"postgresql":{"port":"5432","driver":pg},
"redshift":{"port":"5432","driver":pg},
"netezza":{"port":"5480","driver":nz},
"mysql":{"port":"3306","driver":my},
"mariadb":{"port":"3306","driver":my},
"mongodb":{"port":"27017","class":{"read"}},
"couchdb":{"port":"5984"}
}
class factory :
TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}}
PROVIDERS = {
"file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}},
"sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}},
"postgresql":{"port":5432,"host":"localhost","database":os.environ['USER'],"driver":pg,"default":{"type":"VARCHAR"}},
"redshift":{"port":5432,"host":"localhost","database":os.environ['USER'],"driver":pg,"default":{"type":"VARCHAR"}},
"bigquery":{"class":{"read":sql.BQReader,"write":sql.BQWriter}},
"mysql":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"}},
"mariadb":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"}},
"mongo":{"port":27017,"host":"localhost","class":{"read":mongo.MongoReader,"write":mongo.MongoWriter}},
"couch":{"port":5984,"host":"localhost","class":{"read":couch.CouchReader,"write":couch.CouchWriter}},
"netezza":{"port":5480,"driver":nz,"default":{"type":"VARCHAR(256)"}}}
@staticmethod
def instance(**args):
"""
@@ -99,131 +126,30 @@ class factory :
return anObject
import time
def instance(provider,context,**_args):
"""
@param provider {file,sqlite,postgresql,redshift,bigquery,netezza,mongo,couch ...}
@param context read|write|rw
@param _args arguments to pass to the datastore (username, password, host, port ...)
"""
_id = context if context in ['read','write'] else None
if _id :
args = {'provider':provider} #-- pass the provider name downstream, not the read/write context
for key in factory.PROVIDERS[provider] :
if key == 'class' :
continue
value = factory.PROVIDERS[provider][key]
args[key] = value
#
#
args = dict(args,**_args)
# class Reader:
# def __init__(self):
# self.nrows = 0
# self.xchar = None
# print (provider in factory.PROVIDERS)
if 'class' in factory.PROVIDERS[provider]:
pointer = factory.PROVIDERS[provider]['class'][_id]
else:
pointer = sql.SQLReader if _id == 'read' else sql.SQLWriter
return pointer(**args)
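A minimal usage sketch of the simplified interface, assuming a local PostgreSQL instance; the database, table and credentials below are hypothetical:

# 'postgresql' pulls host/port/database defaults from factory.PROVIDERS;
# keyword arguments override them, since dict(args,**_args) gives _args precedence
reader = instance('postgresql','read',database='analytics',table='users',user='steve',password='***')
df = reader.read()      # SQLReader is expected to return a pandas DataFrame
writer = instance('postgresql','write',database='analytics',table='users_copy')
writer.write(df)        # assuming SQLWriter exposes a write method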
# def row_count(self):
# content = self.read()
# return np.sum([1 for row in content])
# def delimiter(self,sample):
# """
# This function determines the most common delimiter from a subset of possible delimiters.
# It uses a statistical approach (distribution) to gauge the distribution of columns for a given delimiter
# :sample sample string/content expecting matrix i.e list of rows
# """
# m = {',':[],'\t':[],'|':[],'\x3A':[]}
# delim = m.keys()
# for row in sample:
# for xchar in delim:
# if row.split(xchar) > 1:
# m[xchar].append(len(row.split(xchar)))
# else:
# m[xchar].append(0)
# #
# # The delimiter with the smallest variance, provided the mean is greater than 1
# # This would be troublesome if there are many broken records sampled
# #
# m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1}
# index = m.values().index( min(m.values()))
# xchar = m.keys()[index]
# return xchar
# def col_count(self,sample):
# """
# This function returns the number of columns of a given sample
# @pre self.xchar is not None
# """
# m = {}
# i = 0
# for row in sample:
# row = self.format(row)
# id = str(len(row))
# #id = str(len(row.split(self.xchar)))
# if id not in m:
# m[id] = 0
# m[id] = m[id] + 1
# index = m.values().index( max(m.values()) )
# ncols = int(m.keys()[index])
# return ncols;
# def format (self,row):
# """
# This function will clean records of a given row by removing non-ascii characters
# @pre self.xchar is not None
# """
# if isinstance(row,list) == False:
# #
# # We've observed that fields sometimes contain the delimiter as a legitimate character; we need to account for this and not tamper with the field values (unless necessary)
# cols = self.split(row)
# #cols = row.split(self.xchar)
# else:
# cols = row ;
# return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols]
# def split (self,row):
# """
# This function performs a split of a record and attempts to preserve the integrity of the data within, i.e. accounting for the double quotes.
# @pre : self.xchar is not None
# """
# pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"])
# return re.findall(pattern,row.replace('\n',''))
# class Writer:
# def format(self,row,xchar):
# if xchar is not None and isinstance(row,list):
# return xchar.join(row)+'\n'
# elif xchar is None and isinstance(row,dict):
# row = json.dumps(row)
# return row
# """
# It is important to be able to archive data so as to ensure that growth is controlled
# Nothing in nature grows indefinitely, and neither should the data being handled.
# """
# def archive(self):
# pass
# def flush(self):
# pass
# class factory :
# @staticmethod
# def instance(**args):
# source = args['type']
# params = args['args']
# anObject = None
# if source in ['HttpRequestReader','HttpSessionWriter']:
# #
# # @TODO: Make sure objects are serializable, be smart about them !!
# #
# aClassName = ''.join([source,'(**params)'])
# else:
# stream = json.dumps(params)
# aClassName = ''.join([source,'(**',stream,')'])
# try:
# anObject = eval( aClassName)
# #setattr(anObject,'name',source)
# except Exception,e:
# print ['Error ',e]
# return anObject
return None

View File

@@ -22,10 +22,12 @@ class DiskReader(Reader) :
Reader.__init__(self)
self.path = params['path'] ;
self.delimiter = params['delimiter'] if 'delimiter' in params else None
self.delimiter = params['delimiter'] if 'delimiter' in params else ','
def isready(self):
return os.path.exists(self.path)
def read(self,**args):
_path = self.path if 'path' not in args else args['path']
_delimiter = self.delimiter if 'delimiter' not in args else args['delimiter']
return pd.read_csv(_path,delimiter=_delimiter)
def stream(self,**args):
"""
@@ -121,6 +123,10 @@ class SQLiteReader (DiskReader):
elif 'filter' in args :
sql = "SELECT :fields FROM ",self.table, "WHERE (:filter)".replace(":filter",args['filter'])
sql = sql.replace(":fields",args['fields']) if 'fields' in args else sql.replace(":fields","*")
else:
sql = ' '.join(['SELECT * FROM ',self.table])
if 'limit' in args :
sql = sql + " LIMIT "+args['limit']
return pd.read_sql(sql,self.conn)
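A usage sketch for the query building above; it assumes SQLiteReader accepts path and table keyword arguments (its constructor is not shown in this hunk), and the file, table, fields and filter values are hypothetical:

reader = SQLiteReader(path='/tmp/sample.db3',table='logs')
df = reader.read(fields='name,value',filter='value > 10',limit=5)
# builds: SELECT name,value FROM logs WHERE (value > 10) LIMIT 5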
def close(self):
try:

View File

@@ -84,7 +84,7 @@ class MongoReader(Mongo,Reader):
out = self.db.command({"getMore":out['cursor']['id'],"collection":out['cursor']['ns'].split(".")[-1]})
return r
return pd.DataFrame(r)
else:
collection = self.db[self.uid]
_filter = args['filter'] if 'filter' in args else {}

View File

@@ -24,10 +24,11 @@ import pandas as pd
import numpy as np
import nzpy as nz #--- netezza drivers
import copy
import os
class SQLRW :
PROVIDERS = {"postgresql":"5432","redshift":"5432","mysql":"3306","mariadb":"3306","netezza":5480}
DRIVERS = {"postgresql":pg,"redshift":pg,"mysql":my,"mariadb":my,"netezza":nz}
REFERENCE = {
"netezza":{"port":5480,"handler":nz,"dtype":"VARCHAR(512)"},
@@ -41,13 +42,19 @@ class SQLRW :
_info = {}
_info['dbname'] = _args['db'] if 'db' in _args else _args['database']
self.table = _args['table']
self.table = _args['table'] if 'table' in _args else None
self.fields = _args['fields'] if 'fields' in _args else []
_provider = _args['provider']
if 'host' in _args :
_info['host'] = 'localhost' if 'host' not in _args else _args['host']
# _info['port'] = SQLWriter.PROVIDERS[_args['provider']] if 'port' not in _args else _args['port']
_info['port'] = SQLWriter.REFERENCE[_provider]['port'] if 'port' not in _args else _args['port']
# _provider = _args['provider']
# _info['host'] = 'localhost' if 'host' not in _args else _args['host']
# _info['port'] = SQLWriter.REFERENCE[_provider]['port'] if 'port' not in _args else _args['port']
_info['host'] = _args['host']
_info['port'] = _args['port']
# if 'host' in _args :
# _info['host'] = 'localhost' if 'host' not in _args else _args['host']
# # _info['port'] = SQLWriter.PROVIDERS[_args['provider']] if 'port' not in _args else _args['port']
# _info['port'] = SQLWriter.REFERENCE[_provider]['port'] if 'port' not in _args else _args['port']
if 'username' in _args or 'user' in _args:
key = 'username' if 'username' in _args else 'user'
@@ -55,10 +62,14 @@ class SQLRW :
_info['password'] = _args['password']
#
# We need to load the drivers here to see what we are dealing with ...
# _handler = SQLWriter.DRIVERS[_args['provider']]
_handler = SQLWriter.REFERENCE[_provider]['handler']
self._dtype = SQLWriter.REFERENCE[_provider]['dtype'] if 'dtype' not in _args else _args['dtype']
self._provider = _provider
# _handler = SQLWriter.REFERENCE[_provider]['handler']
_handler = _args['driver'] #-- handle to the driver module
self._dtype = _args['default']['type'] if 'default' in _args and 'type' in _args['default'] else 'VARCHAR(256)'
self._provider = _args['provider']
# self._dtype = SQLWriter.REFERENCE[_provider]['dtype'] if 'dtype' not in _args else _args['dtype']
# self._provider = _provider
if _handler == nz :
_info['database'] = _info['dbname']
_info['securityLevel'] = 0
@@ -228,24 +239,13 @@ class BigQuery:
self.dataset = _args['dataset'] if 'dataset' in _args else None
self.path = path
self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
self.table = _args['table'] if 'table' in _args else None
def meta(self,**_args):
"""
This function returns metadata for a given table or query, with the dataset/table properly formatted
:param table name of the table WITHOUT including the dataset
:param sql sql query to be run
"""
#if 'table' in _args :
# sql = "SELECT * from :dataset."+ _args['table']" limit 1"
#else:
# sql = _args['sql']
# if 'limit' not in sql.lower() :
# sql = sql + ' limit 1'
#sql = sql.replace(':dataset',self.dataset) if ':dataset' in args else sql
#
# Let us return the schema information now for a given table
#
table = _args['table']
client = bq.Client.from_service_account_json(self.path)
ref = client.dataset(self.dataset).table(table)
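A call sketch for meta, with hypothetical key-file and dataset values; the constructor keywords and the exact return value (presumably the table schema, since the tail of the method is not shown in this hunk) are assumptions:

schema = BQReader(path='/path/to/key.json',dataset='warehouse').meta(table='events')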
@@ -258,12 +258,15 @@ class BQReader(BigQuery,Reader) :
pass
def read(self,**_args):
SQL = None
table = self.table if 'table' not in _args else _args['table']
if 'sql' in _args :
SQL = _args['sql']
elif 'table' in _args:
elif table:
table = "".join(["`",_args['table'],"`"])
table = "".join(["`",table,"`"]) if '.' in table else "".join(["`:dataset.",table,"`"])
SQL = "SELECT * FROM :table ".replace(":table",table)
if not SQL :
return None
if SQL and 'limit' in _args:
SQL += " LIMIT "+str(_args['limit'])
if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset:
@@ -271,6 +274,7 @@ class BQReader(BigQuery,Reader) :
_info = {'credentials':self.credentials,'dialect':'standard'}
return pd.read_gbq(SQL,**_info) if SQL else None
# return pd.read_gbq(SQL,credentials=self.credentials,dialect='standard') if SQL else None
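A read sketch for the table/dataset resolution above; the service-key path, dataset and table values are hypothetical, and the path keyword assumes the constructor accepts the service-account key under that name:

reader = BQReader(path='/path/to/key.json',dataset='warehouse',table='events')
df = reader.read(limit=10)
# builds: SELECT * FROM `warehouse.events` LIMIT 10 and runs it through pd.read_gbq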
class BQWriter(BigQuery,Writer):
lock = Lock()
def __init__(self,**_args):
@@ -308,3 +312,8 @@ class BQWriter(BigQuery,Writer):
_df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
pass
#
# Aliasing the BigQuery classes so existing code remains backward compatible
#
BigQueryReader = BQReader
BigQueryWriter = BQWriter