From e5f3b1933657d3616cd256a95f6e774d1e005f05 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Wed, 10 Aug 2022 09:17:30 -0500 Subject: [PATCH] bug fix: added chunks and enhacement with global variables (constants) --- setup.py | 4 ++-- transport/__init__.py | 44 ++++++++++++++++++++++++++++++++++++++----- transport/common.py | 2 ++ transport/disk.py | 4 +++- transport/mongo.py | 2 +- transport/sql.py | 4 +++- 6 files changed, 50 insertions(+), 10 deletions(-) diff --git a/setup.py b/setup.py index aecb441..2e6991e 100644 --- a/setup.py +++ b/setup.py @@ -8,12 +8,12 @@ def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { "name":"data-transport", - "version":"1.5.8", + "version":"1.6.0", "author":"The Phi Technology LLC","author_email":"info@the-phi.com", "license":"MIT", "packages":["transport"]} args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] -args["install_requires"] = ['pymongo','sqlalchemy','pandas','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] +args["install_requires"] = ['nujson','pymongo','sqlalchemy','pandas','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args['scripts'] = ['bin/transport'] if sys.version_info[0] == 2 : diff --git a/transport/__init__.py b/transport/__init__.py index 5915b21..b4e80fb 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -52,8 +52,38 @@ from google.cloud import bigquery as bq import nzpy as nz #--- netezza drivers import os +class providers : + POSTGRESQL = 'postgresql' + MONGODB = 'mongodb' + BIGQUERY ='bigquery' + FILE = 'file' + ETL = 
'etl' + SQLITE = 'sqlite' + REDSHIFT = 'redshift' + NETEZZA = 'netezza' + MYSQL = 'mysql' + RABBITMQ = 'rabbitmq' + MARIADB = 'mariadb' + COUCHDB = 'couch' + CONSOLE = 'console' + ETL = 'etl' + # + # synonyms of the above + BQ = BIGQUERY + MONGO = MONGODB + PG = POSTGRESQL + PSQL = POSTGRESQL - +class IEncoder (json.JSONEncoder): + def default (self,object): + if isinstance(object, np.integer) : + return int(object) + elif isinstance(object, np.floating): + return float(object) + elif isinstance(object, np.ndarray) : + return object.tolist() + else: + return super(IEncoder,self).default(object) class factory : TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}} PROVIDERS = { @@ -149,9 +179,10 @@ def instance(**_args): # # Let us try to establish an sqlalchemy wrapper try: - + account = '' host = '' - if provider not in ['bigquery','mongodb','mongo','couchdb','sqlite','console','etl','file','rabbitmq'] : + if provider not in [providers.BIGQUERY,providers.MONGODB, providers.COUCHDB, providers.SQLITE, providers.CONSOLE,providers.ETL, providers.FILE, providers.RABBITMQ] : + # if provider not in ['bigquery','mongodb','mongo','couchdb','sqlite','console','etl','file','rabbitmq'] : # # In these cases we are assuming RDBMS and thus would exclude NoSQL and BigQuery username = args['username'] if 'username' in args else '' @@ -165,11 +196,13 @@ def instance(**_args): host = host+":"+str(args['port']) database = args['database'] - elif provider == 'sqlite': + elif provider in [providers.SQLITE,providers.FILE]: account = '' host = '' database = args['path'] if 'path' in args else args['database'] - if provider not in ['mongodb','mongo','couchdb','bigquery','console','etl','file','rabbitmq'] : + + if provider not in [providers.MONGODB, providers.COUCHDB, providers.BIGQUERY, providers.CONSOLE, providers.ETL,providers.FILE,providers.RABBITMQ] : + # if provider not in ['mongodb','mongo','couchdb','bigquery','console','etl','file','rabbitmq'] : uri = 
''.join([provider,"://",account,host,'/',database]) e = sqlalchemy.create_engine (uri,future=True) @@ -178,6 +211,7 @@ def instance(**_args): # # @TODO: Include handling of bigquery with SQLAlchemy except Exception as e: + print (_args) print (e) return pointer(**args) diff --git a/transport/common.py b/transport/common.py index d9a3fab..2ed7cd2 100644 --- a/transport/common.py +++ b/transport/common.py @@ -24,6 +24,8 @@ import importlib from multiprocessing import RLock # import couch # import mongo + + class IO: def init(self,**args): """ diff --git a/transport/disk.py b/transport/disk.py index dad2f33..0eb42fe 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -1,10 +1,12 @@ import os import sys + + if sys.version_info[0] > 2 : from transport.common import Reader, Writer #, factory else: from common import Reader,Writer -import json +import nujson as json # from threading import Lock import sqlite3 import pandas as pd diff --git a/transport/mongo.py b/transport/mongo.py index 50f463f..e212047 100644 --- a/transport/mongo.py +++ b/transport/mongo.py @@ -7,7 +7,7 @@ This file is a wrapper around mongodb for reading/writing content against a mong from pymongo import MongoClient from bson.objectid import ObjectId from bson.binary import Binary -import json +import nujson as json from datetime import datetime import pandas as pd diff --git a/transport/sql.py b/transport/sql.py index 08f3648..2f4d96f 100644 --- a/transport/sql.py +++ b/transport/sql.py @@ -357,7 +357,9 @@ class BigQuery: table = _args['table'] try: ref = self.client.dataset(self.dataset).table(table) - return self.client.get_table(ref).schema + _schema = self.client.get_table(ref).schema + return [{"name":_item.name,"type":_item.field_type,"description":( "" if not hasattr(_item,"description") else _item.description )} for _item in _schema] + except Exception as e: return [] def has(self,**_args):