bug fix: added chunks and enhacement with global variables (constants)

This commit is contained in:
Steve Nyemba 2022-08-10 09:17:30 -05:00
parent 245b319e7b
commit e5f3b19336
6 changed files with 50 additions and 10 deletions

View File

@ -8,12 +8,12 @@ def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read() return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = { args = {
"name":"data-transport", "name":"data-transport",
"version":"1.5.8", "version":"1.6.0",
"author":"The Phi Technology LLC","author_email":"info@the-phi.com", "author":"The Phi Technology LLC","author_email":"info@the-phi.com",
"license":"MIT", "license":"MIT",
"packages":["transport"]} "packages":["transport"]}
args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pymongo','sqlalchemy','pandas','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] args["install_requires"] = ['nujson','pymongo','sqlalchemy','pandas','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
args['scripts'] = ['bin/transport'] args['scripts'] = ['bin/transport']
if sys.version_info[0] == 2 : if sys.version_info[0] == 2 :

View File

@ -52,8 +52,38 @@ from google.cloud import bigquery as bq
import nzpy as nz #--- netezza drivers import nzpy as nz #--- netezza drivers
import os import os
class providers :
POSTGRESQL = 'postgresql'
MONGODB = 'mongodb'
BIGQUERY ='bigquery'
FILE = 'file'
ETL = 'etl'
SQLITE = 'sqlite'
REDSHIFT = 'redshift'
NETEZZA = 'netezza'
MYSQL = 'mysql'
RABBITMQ = 'rabbitmq'
MARIADB = 'mariadb'
COUCHDB = 'couch'
CONSOLE = 'console'
ETL = 'etl'
#
# synonyms of the above
BQ = BIGQUERY
MONGO = MONGODB
PG = POSTGRESQL
PSQL = POSTGRESQL
class IEncoder (json.JSONEncoder):
def default (self,object):
if type(object) == np.integer :
return int(object)
elif type(object) == np.floating:
return float(object)
elif type(object) == np.ndarray :
return object.tolist()
else:
return super(IEncoder,self).default(object)
class factory : class factory :
TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}} TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}}
PROVIDERS = { PROVIDERS = {
@ -149,9 +179,10 @@ def instance(**_args):
# #
# Let us try to establish an sqlalchemy wrapper # Let us try to establish an sqlalchemy wrapper
try: try:
account = ''
host = '' host = ''
if provider not in ['bigquery','mongodb','mongo','couchdb','sqlite','console','etl','file','rabbitmq'] : if provider not in [providers.BIGQUERY,providers.MONGODB, providers.COUCHDB, providers.SQLITE, providers.CONSOLE,providers.ETL, providers.FILE, providers.RABBITMQ] :
# if provider not in ['bigquery','mongodb','mongo','couchdb','sqlite','console','etl','file','rabbitmq'] :
# #
# In these cases we are assuming RDBMS and thus would exclude NoSQL and BigQuery # In these cases we are assuming RDBMS and thus would exclude NoSQL and BigQuery
username = args['username'] if 'username' in args else '' username = args['username'] if 'username' in args else ''
@ -165,11 +196,13 @@ def instance(**_args):
host = host+":"+str(args['port']) host = host+":"+str(args['port'])
database = args['database'] database = args['database']
elif provider == 'sqlite': elif provider in [providers.SQLITE,providers.FILE]:
account = '' account = ''
host = '' host = ''
database = args['path'] if 'path' in args else args['database'] database = args['path'] if 'path' in args else args['database']
if provider not in ['mongodb','mongo','couchdb','bigquery','console','etl','file','rabbitmq'] :
if provider not in [providers.MONGODB, providers.COUCHDB, providers.BIGQUERY, providers.CONSOLE, providers.ETL,providers.FILE,providers.RABBITMQ] :
# if provider not in ['mongodb','mongo','couchdb','bigquery','console','etl','file','rabbitmq'] :
uri = ''.join([provider,"://",account,host,'/',database]) uri = ''.join([provider,"://",account,host,'/',database])
e = sqlalchemy.create_engine (uri,future=True) e = sqlalchemy.create_engine (uri,future=True)
@ -178,6 +211,7 @@ def instance(**_args):
# #
# @TODO: Include handling of bigquery with SQLAlchemy # @TODO: Include handling of bigquery with SQLAlchemy
except Exception as e: except Exception as e:
print (_args)
print (e) print (e)
return pointer(**args) return pointer(**args)

View File

@ -24,6 +24,8 @@ import importlib
from multiprocessing import RLock from multiprocessing import RLock
# import couch # import couch
# import mongo # import mongo
class IO: class IO:
def init(self,**args): def init(self,**args):
""" """

View File

@ -1,10 +1,12 @@
import os import os
import sys import sys
if sys.version_info[0] > 2 : if sys.version_info[0] > 2 :
from transport.common import Reader, Writer #, factory from transport.common import Reader, Writer #, factory
else: else:
from common import Reader,Writer from common import Reader,Writer
import json import nujson as json
# from threading import Lock # from threading import Lock
import sqlite3 import sqlite3
import pandas as pd import pandas as pd

View File

@ -7,7 +7,7 @@ This file is a wrapper around mongodb for reading/writing content against a mong
from pymongo import MongoClient from pymongo import MongoClient
from bson.objectid import ObjectId from bson.objectid import ObjectId
from bson.binary import Binary from bson.binary import Binary
import json import nujson as json
from datetime import datetime from datetime import datetime
import pandas as pd import pandas as pd

View File

@ -357,7 +357,9 @@ class BigQuery:
table = _args['table'] table = _args['table']
try: try:
ref = self.client.dataset(self.dataset).table(table) ref = self.client.dataset(self.dataset).table(table)
return self.client.get_table(ref).schema _schema = self.client.get_table(ref).schema
return [{"name":_item.name,"type":_item.field_type,"description":( "" if not hasattr(_item,"description") else _item.description )} for _item in _schema]
except Exception as e: except Exception as e:
return [] return []
def has(self,**_args): def has(self,**_args):