commit
ac5d64242d
184
bin/transport
184
bin/transport
|
@ -14,19 +14,27 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI
|
||||||
|
|
||||||
|
|
||||||
Usage :
|
Usage :
|
||||||
transport --config <path-to-file.json> --procs <number-procs>
|
transport help -- will print this page
|
||||||
@TODO: Create tables if they don't exist for relational databases
|
|
||||||
example of configuration :
|
|
||||||
|
|
||||||
1. Move data from a folder to a data-store
|
transport move <path> [index]
|
||||||
transport [--folder <path> ] --config <config.json> #-- assuming the configuration doesn't have folder
|
<path> path to the configuration file
|
||||||
transport --folder <path> --provider <postgresql|mongo|sqlite> --<database|db> <name> --table|doc <document_name>
|
<index> optional index within the configuration file
|
||||||
In this case the configuration should look like :
|
|
||||||
{folder:..., target:{}}
|
e.g: configuration file (JSON formatted)
|
||||||
2. Move data from one source to another
|
- single source to a single target
|
||||||
transport --config <file.json>
|
|
||||||
{source:{..},target:{..}} or [{source:{..},target:{..}},{source:{..},target:{..}}]
|
{"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"}
|
||||||
|
"target":{"provider":"sqlite3","path":"transport-demo.sqlite","table":"agreement"}
|
||||||
|
}
|
||||||
|
|
||||||
|
- single source to multiple targets
|
||||||
|
{
|
||||||
|
"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"},
|
||||||
|
"target":[
|
||||||
|
{"provider":"sqlite3","path":"transport-demo.sqlite","table":"agreement},
|
||||||
|
{"provider":"mongodb","db":"transport-demo","collection":"agreement"}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
"""
|
"""
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
@ -36,51 +44,123 @@ import sys
|
||||||
import transport
|
import transport
|
||||||
import time
|
import time
|
||||||
from multiprocessing import Process
|
from multiprocessing import Process
|
||||||
SYS_ARGS = {}
|
import typer
|
||||||
if len(sys.argv) > 1:
|
import os
|
||||||
|
import transport
|
||||||
|
from transport import etl
|
||||||
|
from transport import providers
|
||||||
|
|
||||||
|
# SYS_ARGS = {}
|
||||||
|
# if len(sys.argv) > 1:
|
||||||
|
|
||||||
N = len(sys.argv)
|
# N = len(sys.argv)
|
||||||
for i in range(1,N):
|
# for i in range(1,N):
|
||||||
value = None
|
# value = None
|
||||||
if sys.argv[i].startswith('--'):
|
# if sys.argv[i].startswith('--'):
|
||||||
key = sys.argv[i][2:] #.replace('-','')
|
# key = sys.argv[i][2:] #.replace('-','')
|
||||||
SYS_ARGS[key] = 1
|
# SYS_ARGS[key] = 1
|
||||||
if i + 1 < N:
|
# if i + 1 < N:
|
||||||
value = sys.argv[i + 1] = sys.argv[i+1].strip()
|
# value = sys.argv[i + 1] = sys.argv[i+1].strip()
|
||||||
if key and value and not value.startswith('--'):
|
# if key and value and not value.startswith('--'):
|
||||||
SYS_ARGS[key] = value
|
# SYS_ARGS[key] = value
|
||||||
|
|
||||||
|
|
||||||
i += 2
|
# i += 2
|
||||||
|
|
||||||
|
app = typer.Typer()
|
||||||
|
|
||||||
|
# @app.command()
|
||||||
|
def help() :
|
||||||
|
print (__doc__)
|
||||||
|
def wait(jobs):
|
||||||
|
while jobs :
|
||||||
|
jobs = [thread for thread in jobs if thread.is_alive()]
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
@app.command()
|
||||||
|
def move (path,index=None):
|
||||||
|
|
||||||
|
_proxy = lambda _object: _object.write(_object.read())
|
||||||
|
if os.path.exists(path):
|
||||||
|
file = open(path)
|
||||||
|
_config = json.loads (file.read() )
|
||||||
|
file.close()
|
||||||
|
if index :
|
||||||
|
_config = _config[ int(index)]
|
||||||
|
etl.instance(**_config)
|
||||||
|
else:
|
||||||
|
etl.instance(config=_config)
|
||||||
|
|
||||||
|
#
|
||||||
|
# if type(_config) == dict :
|
||||||
|
# _object = transport.etl.instance(**_config)
|
||||||
|
# _proxy(_object)
|
||||||
|
# else:
|
||||||
|
# #
|
||||||
|
# # here we are dealing with a list of objects (long ass etl job)
|
||||||
|
# jobs = []
|
||||||
|
# failed = []
|
||||||
|
# for _args in _config :
|
||||||
|
# if index and _config.index(_args) != index :
|
||||||
|
# continue
|
||||||
|
|
||||||
|
# _object=transport.etl.instance(**_args)
|
||||||
|
# thread = Process(target=_proxy,args=(_object,))
|
||||||
|
# thread.start()
|
||||||
|
# jobs.append(thread())
|
||||||
|
# if _config.index(_args) == 0 :
|
||||||
|
# thread.join()
|
||||||
|
# wait(jobs)
|
||||||
|
@app.command()
|
||||||
|
def version():
|
||||||
|
print (transport.version.__version__)
|
||||||
|
@app.command()
|
||||||
|
def generate (path:str):
|
||||||
|
"""
|
||||||
|
This function will generate a configuration template to give a sense of how to create one
|
||||||
|
"""
|
||||||
|
_config = [
|
||||||
|
{
|
||||||
|
"source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"},
|
||||||
|
"target":
|
||||||
|
[{"provider":"file","path":"addresses.csv","delimiter":"csv"},{"provider":"sqlite","database":"sample.db3","table":"addresses"}]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
file = open(path,'w')
|
||||||
|
file.write(json.dumps(_config))
|
||||||
|
file.close()
|
||||||
|
@app.command()
|
||||||
|
def usage():
|
||||||
|
print (__doc__)
|
||||||
if __name__ == '__main__' :
|
if __name__ == '__main__' :
|
||||||
#
|
app()
|
||||||
# Load information from the file ...
|
# #
|
||||||
if 'help' in SYS_ARGS :
|
# # Load information from the file ...
|
||||||
print (__doc__)
|
# if 'help' in SYS_ARGS :
|
||||||
else:
|
# print (__doc__)
|
||||||
try:
|
# else:
|
||||||
_info = json.loads(open(SYS_ARGS['config']).read())
|
# try:
|
||||||
if 'index' in SYS_ARGS :
|
# _info = json.loads(open(SYS_ARGS['config']).read())
|
||||||
_index = int(SYS_ARGS['index'])
|
# if 'index' in SYS_ARGS :
|
||||||
_info = [_item for _item in _info if _info.index(_item) == _index]
|
# _index = int(SYS_ARGS['index'])
|
||||||
pass
|
# _info = [_item for _item in _info if _info.index(_item) == _index]
|
||||||
elif 'id' in SYS_ARGS :
|
# pass
|
||||||
_info = [_item for _item in _info if 'id' in _item and _item['id'] == SYS_ARGS['id']]
|
# elif 'id' in SYS_ARGS :
|
||||||
|
# _info = [_item for _item in _info if 'id' in _item and _item['id'] == SYS_ARGS['id']]
|
||||||
|
|
||||||
procs = 1 if 'procs' not in SYS_ARGS else int(SYS_ARGS['procs'])
|
# procs = 1 if 'procs' not in SYS_ARGS else int(SYS_ARGS['procs'])
|
||||||
jobs = transport.factory.instance(provider='etl',info=_info,procs=procs)
|
# jobs = transport.factory.instance(provider='etl',info=_info,procs=procs)
|
||||||
print ([len(jobs),' Jobs are running'])
|
# print ([len(jobs),' Jobs are running'])
|
||||||
N = len(jobs)
|
# N = len(jobs)
|
||||||
while jobs :
|
# while jobs :
|
||||||
x = len(jobs)
|
# x = len(jobs)
|
||||||
jobs = [_job for _job in jobs if _job.is_alive()]
|
# jobs = [_job for _job in jobs if _job.is_alive()]
|
||||||
if x != len(jobs) :
|
# if x != len(jobs) :
|
||||||
print ([len(jobs),'... jobs still running'])
|
# print ([len(jobs),'... jobs still running'])
|
||||||
time.sleep(1)
|
# time.sleep(1)
|
||||||
print ([N,' Finished running'])
|
# print ([N,' Finished running'])
|
||||||
except Exception as e:
|
# except Exception as e:
|
||||||
|
|
||||||
print (e)
|
# print (e)
|
||||||
|
|
||||||
|
|
||||||
|
|
2
setup.py
2
setup.py
|
@ -17,7 +17,7 @@ args = {
|
||||||
"license":"MIT",
|
"license":"MIT",
|
||||||
"packages":["transport"]}
|
"packages":["transport"]}
|
||||||
args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
|
args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
|
||||||
args["install_requires"] = ['pymongo','sqlalchemy<2.0.0','pandas','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
|
args["install_requires"] = ['pyncclient','pymongo','sqlalchemy<2.0.0','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
|
||||||
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
|
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
|
||||||
args['scripts'] = ['bin/transport']
|
args['scripts'] = ['bin/transport']
|
||||||
if sys.version_info[0] == 2 :
|
if sys.version_info[0] == 2 :
|
||||||
|
|
|
@ -27,8 +27,9 @@ import json
|
||||||
import importlib
|
import importlib
|
||||||
import sys
|
import sys
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
|
from datetime import datetime
|
||||||
if sys.version_info[0] > 2 :
|
if sys.version_info[0] > 2 :
|
||||||
from transport.common import Reader, Writer,Console #, factory
|
# from transport.common import Reader, Writer,Console #, factory
|
||||||
from transport import disk
|
from transport import disk
|
||||||
|
|
||||||
from transport import s3 as s3
|
from transport import s3 as s3
|
||||||
|
@ -83,21 +84,24 @@ import os
|
||||||
# PGSQL = POSTGRESQL
|
# PGSQL = POSTGRESQL
|
||||||
# import providers
|
# import providers
|
||||||
|
|
||||||
class IEncoder (json.JSONEncoder):
|
# class IEncoder (json.JSONEncoder):
|
||||||
def default (self,object):
|
def IEncoder (self,object):
|
||||||
if type(object) == np.integer :
|
if type(object) == np.integer :
|
||||||
return int(object)
|
return int(object)
|
||||||
elif type(object) == np.floating:
|
elif type(object) == np.floating:
|
||||||
return float(object)
|
return float(object)
|
||||||
elif type(object) == np.ndarray :
|
elif type(object) == np.ndarray :
|
||||||
return object.tolist()
|
return object.tolist()
|
||||||
else:
|
elif type(object) == datetime :
|
||||||
return super(IEncoder,self).default(object)
|
return o.isoformat()
|
||||||
|
else:
|
||||||
|
return super(IEncoder,self).default(object)
|
||||||
|
|
||||||
class factory :
|
class factory :
|
||||||
TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}}
|
TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}}
|
||||||
PROVIDERS = {
|
PROVIDERS = {
|
||||||
"etl":{"class":{"read":etl.instance,"write":etl.instance}},
|
"etl":{"class":{"read":etl.instance,"write":etl.instance}},
|
||||||
"console":{"class":{"write":Console,"read":Console}},
|
# "console":{"class":{"write":Console,"read":Console}},
|
||||||
"file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}},
|
"file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}},
|
||||||
"sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}},
|
"sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}},
|
||||||
"postgresql":{"port":5432,"host":"localhost","database":None,"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
|
"postgresql":{"port":5432,"host":"localhost","database":None,"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}},
|
||||||
|
@ -124,6 +128,9 @@ class factory :
|
||||||
#
|
#
|
||||||
# Legacy code being returned
|
# Legacy code being returned
|
||||||
return factory._instance(**_args);
|
return factory._instance(**_args);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
return instance(**_args)
|
return instance(**_args)
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -157,27 +164,49 @@ class factory :
|
||||||
return anObject
|
return anObject
|
||||||
|
|
||||||
import time
|
import time
|
||||||
def instance(**_args):
|
def instance(**_pargs):
|
||||||
"""
|
"""
|
||||||
creating an instance given the provider, we should have an idea of :class, :driver
|
creating an instance given the provider, we should have an idea of :class, :driver
|
||||||
:provider
|
:provider
|
||||||
:read|write = {connection to the database}
|
:read|write = {connection to the database}
|
||||||
"""
|
"""
|
||||||
|
#
|
||||||
|
# @TODO: provide authentication file that will hold all the parameters, that will later on be used
|
||||||
|
#
|
||||||
|
_args = dict(_pargs,**{})
|
||||||
|
if 'auth_file' in _args :
|
||||||
|
path = _args['auth_file']
|
||||||
|
file = open(path)
|
||||||
|
_config = json.loads( file.read())
|
||||||
|
_args = dict(_args,**_config)
|
||||||
|
file.close()
|
||||||
|
|
||||||
_provider = _args['provider']
|
_provider = _args['provider']
|
||||||
_group = None
|
_context = list( set(['read','write','listen']) & set(_args.keys()) )
|
||||||
|
if _context :
|
||||||
|
_context = _context[0]
|
||||||
|
else:
|
||||||
|
_context = _args['context'] if 'context' in _args else 'read'
|
||||||
|
# _group = None
|
||||||
|
|
||||||
for _id in providers.CATEGORIES :
|
|
||||||
if _provider in providers.CATEGORIES[_id] :
|
# for _id in providers.CATEGORIES :
|
||||||
_group = _id
|
# if _provider in providers.CATEGORIES[_id] :
|
||||||
break
|
# _group = _id
|
||||||
if _group :
|
# break
|
||||||
_classPointer = _getClassInstance(_group,**_args)
|
# if _group :
|
||||||
|
|
||||||
|
if _provider in providers.PROVIDERS and _context in providers.PROVIDERS[_provider]:
|
||||||
|
|
||||||
|
# _classPointer = _getClassInstance(_group,**_args)
|
||||||
|
_classPointer = providers.PROVIDERS[_provider][_context]
|
||||||
#
|
#
|
||||||
# Let us reformat the arguments
|
# Let us reformat the arguments
|
||||||
if 'read' in _args or 'write' in _args :
|
# if 'read' in _args or 'write' in _args :
|
||||||
_args = _args['read'] if 'read' in _args else _args['write']
|
# _args = _args['read'] if 'read' in _args else _args['write']
|
||||||
_args['provider'] = _provider
|
# _args['provider'] = _provider
|
||||||
if _group == 'sql' :
|
# if _group == 'sql' :
|
||||||
|
if _provider in providers.CATEGORIES['sql'] :
|
||||||
_info = _get_alchemyEngine(**_args)
|
_info = _get_alchemyEngine(**_args)
|
||||||
|
|
||||||
_args = dict(_args,**_info)
|
_args = dict(_args,**_info)
|
||||||
|
@ -202,58 +231,68 @@ def _get_alchemyEngine(**_args):
|
||||||
This function returns the SQLAlchemy engine associated with parameters, This is only applicable for SQL _items
|
This function returns the SQLAlchemy engine associated with parameters, This is only applicable for SQL _items
|
||||||
:_args arguments passed to the factory {provider and other}
|
:_args arguments passed to the factory {provider and other}
|
||||||
"""
|
"""
|
||||||
#@TODO: Enable authentication files (private_key)
|
|
||||||
_username = _args['username'] if 'username' in _args else ''
|
|
||||||
_password = _args['password'] if 'password' in _args else ''
|
|
||||||
_account = _args['account'] if 'account' in _args else ''
|
|
||||||
_database = _args['database']
|
|
||||||
_provider = _args['provider']
|
_provider = _args['provider']
|
||||||
if _username != '':
|
_pargs = {}
|
||||||
_account = _username + ':'+_password+'@'
|
if _provider == providers.SQLITE3 :
|
||||||
_host = _args['host'] if 'host' in _args else ''
|
_path = _args['database'] if 'database' in _args else _args['path']
|
||||||
_port = _args['port'] if 'port' in _args else ''
|
uri = ''.join([_provider,':///',_path])
|
||||||
if _provider in providers.DEFAULT :
|
|
||||||
_default = providers.DEFAULT[_provider]
|
|
||||||
_host = _host if _host != '' else (_default['host'] if 'host' in _default else '')
|
|
||||||
_port = _port if _port != '' else (_default['port'] if 'port' in _default else '')
|
|
||||||
if _port == '':
|
|
||||||
_port = providers.DEFAULT['port'] if 'port' in providers.DEFAULT else ''
|
|
||||||
#
|
|
||||||
|
|
||||||
if _host != '' and _port != '' :
|
|
||||||
_fhost = _host+":"+str(_port) #--formatted hostname
|
|
||||||
else:
|
else:
|
||||||
_fhost = _host
|
|
||||||
# Let us update the parameters we have thus far
|
#@TODO: Enable authentication files (private_key)
|
||||||
|
_username = _args['username'] if 'username' in _args else ''
|
||||||
|
_password = _args['password'] if 'password' in _args else ''
|
||||||
|
_account = _args['account'] if 'account' in _args else ''
|
||||||
|
_database = _args['database'] if 'database' in _args else _args['path']
|
||||||
|
|
||||||
|
if _username != '':
|
||||||
|
_account = _username + ':'+_password+'@'
|
||||||
|
_host = _args['host'] if 'host' in _args else ''
|
||||||
|
_port = _args['port'] if 'port' in _args else ''
|
||||||
|
if _provider in providers.DEFAULT :
|
||||||
|
_default = providers.DEFAULT[_provider]
|
||||||
|
_host = _host if _host != '' else (_default['host'] if 'host' in _default else '')
|
||||||
|
_port = _port if _port != '' else (_default['port'] if 'port' in _default else '')
|
||||||
|
if _port == '':
|
||||||
|
_port = providers.DEFAULT['port'] if 'port' in providers.DEFAULT else ''
|
||||||
|
#
|
||||||
|
|
||||||
|
if _host != '' and _port != '' :
|
||||||
|
_fhost = _host+":"+str(_port) #--formatted hostname
|
||||||
|
else:
|
||||||
|
_fhost = _host
|
||||||
|
# Let us update the parameters we have thus far
|
||||||
#
|
#
|
||||||
|
|
||||||
|
|
||||||
uri = ''.join([_provider,"://",_account,_fhost,'/',_database])
|
uri = ''.join([_provider,"://",_account,_fhost,'/',_database])
|
||||||
|
_pargs = {'host':_host,'port':_port,'username':_username,'password':_password}
|
||||||
_engine = sqlalchemy.create_engine (uri,future=True)
|
_engine = sqlalchemy.create_engine (uri,future=True)
|
||||||
_out = {'sqlalchemy':_engine}
|
_out = {'sqlalchemy':_engine}
|
||||||
_pargs = {'host':_host,'port':_port,'username':_username,'password':_password}
|
|
||||||
for key in _pargs :
|
for key in _pargs :
|
||||||
if _pargs[key] != '' :
|
if _pargs[key] != '' :
|
||||||
_out[key] = _pargs[key]
|
_out[key] = _pargs[key]
|
||||||
return _out
|
return _out
|
||||||
|
@DeprecationWarning
|
||||||
def _getClassInstance(_group,**_args):
|
def _getClassInstance(_group,**_args):
|
||||||
"""
|
"""
|
||||||
This function returns the class instance we are attempting to instanciate
|
This function returns the class instance we are attempting to instanciate
|
||||||
:_group items in providers.CATEGORIES.keys()
|
:_group items in providers.CATEGORIES.keys()
|
||||||
:_args arguments passed to the factory class
|
:_args arguments passed to the factory class
|
||||||
"""
|
"""
|
||||||
if 'read' in _args or 'write' in _args :
|
# if 'read' in _args or 'write' in _args :
|
||||||
_context = 'read' if 'read' in _args else _args['write']
|
# _context = 'read' if 'read' in _args else _args['write']
|
||||||
_info = _args[_context]
|
# _info = _args[_context]
|
||||||
else:
|
# else:
|
||||||
_context = _args['context'] if 'context' in _args else 'read'
|
# _context = _args['context'] if 'context' in _args else 'read'
|
||||||
_class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group]
|
# _class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group]
|
||||||
if type(_class) == dict and _args['provider'] in _class:
|
# if type(_class) == dict and _args['provider'] in _class:
|
||||||
_class = _class[_args['provider']]
|
# _class = _class[_args['provider']]
|
||||||
|
|
||||||
return _class
|
# return _class
|
||||||
|
|
||||||
|
@DeprecationWarning
|
||||||
def __instance(**_args):
|
def __instance(**_args):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,111 @@
|
||||||
|
"""
|
||||||
|
This file implements databricks handling, This functionality will rely on databricks-sql-connector
|
||||||
|
LICENSE (MIT)
|
||||||
|
Copyright 2016-2020, The Phi Technology LLC
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||||
|
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
|
||||||
|
@TODO:
|
||||||
|
- Migrate SQLite to SQL hierarchy
|
||||||
|
- Include Write in Chunks from pandas
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import sqlalchemy
|
||||||
|
from transport.common import Reader,Writer
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
|
||||||
|
class Bricks:
|
||||||
|
"""
|
||||||
|
:host
|
||||||
|
:token
|
||||||
|
:database
|
||||||
|
:cluster_path
|
||||||
|
:table
|
||||||
|
"""
|
||||||
|
def __init__(self,**_args):
|
||||||
|
_host = _args['host']
|
||||||
|
_token= _args['token']
|
||||||
|
_cluster_path = _args['cluster_path']
|
||||||
|
self._schema = _args['schema'] if 'schema' in _args else _args['database']
|
||||||
|
_catalog = _args['catalog']
|
||||||
|
self._table = _args['table'] if 'table' in _args else None
|
||||||
|
|
||||||
|
#
|
||||||
|
# @TODO:
|
||||||
|
# Sometimes when the cluster isn't up and running it takes a while, the user should be alerted of this
|
||||||
|
#
|
||||||
|
|
||||||
|
_uri = f'''databricks://token:{_token}@{_host}?http_path={_cluster_path}&catalog={_catalog}&schema={self._schema}'''
|
||||||
|
self._engine = sqlalchemy.create_engine (_uri)
|
||||||
|
pass
|
||||||
|
def meta(self,**_args):
|
||||||
|
table = _args['table'] if 'table' in _args else self._table
|
||||||
|
if not table :
|
||||||
|
return []
|
||||||
|
else:
|
||||||
|
if sqlalchemy.__version__.startswith('1.') :
|
||||||
|
_m = sqlalchemy.MetaData(bind=self._engine)
|
||||||
|
_m.reflect(only=[table])
|
||||||
|
else:
|
||||||
|
_m = sqlalchemy.MetaData()
|
||||||
|
_m.reflect(bind=self._engine)
|
||||||
|
#
|
||||||
|
# Let's retrieve te information associated with a table
|
||||||
|
#
|
||||||
|
return [{'name':_attr.name,'type':_attr.type} for _attr in _m.tables[table].columns]
|
||||||
|
|
||||||
|
def has(self,**_args):
|
||||||
|
return self.meta(**_args)
|
||||||
|
def apply(self,_sql):
|
||||||
|
try:
|
||||||
|
if _sql.lower().startswith('select') :
|
||||||
|
return pd.read_sql(_sql,self._engine)
|
||||||
|
except Exception as e:
|
||||||
|
pass
|
||||||
|
|
||||||
|
class BricksReader(Bricks,Reader):
|
||||||
|
"""
|
||||||
|
This class is designed for reads and will execute reads against a table name or a select SQL statement
|
||||||
|
"""
|
||||||
|
def __init__(self,**_args):
|
||||||
|
super().__init__(**_args)
|
||||||
|
def read(self,**_args):
|
||||||
|
limit = None if 'limit' not in _args else str(_args['limit'])
|
||||||
|
|
||||||
|
if 'sql' in _args :
|
||||||
|
sql = _args['sql']
|
||||||
|
elif 'table' in _args :
|
||||||
|
table = _args['table']
|
||||||
|
sql = f'SELECT * FROM {table}'
|
||||||
|
if limit :
|
||||||
|
sql = sql + f' LIMIT {limit}'
|
||||||
|
|
||||||
|
if 'sql' in _args or 'table' in _args :
|
||||||
|
return self.apply(sql)
|
||||||
|
else:
|
||||||
|
return pd.DataFrame()
|
||||||
|
pass
|
||||||
|
class BricksWriter(Bricks,Writer):
|
||||||
|
def __init__(self,**_args):
|
||||||
|
super().__init__(**_args)
|
||||||
|
def write(self,_data,**_args):
|
||||||
|
"""
|
||||||
|
This data will write data to data-bricks against a given table. If the table is not specified upon initiazation, it can be specified here
|
||||||
|
_data: data frame to push to databricks
|
||||||
|
_args: chunks, table, schema
|
||||||
|
"""
|
||||||
|
_schema = self._schema if 'schema' not in _args else _args['schema']
|
||||||
|
_table = self._table if 'table' not in _args else _args['table']
|
||||||
|
_df = _data if type(_data) == pd.DataFrame else _data
|
||||||
|
if type(_df) == dict :
|
||||||
|
_df = [_df]
|
||||||
|
if type(_df) == list :
|
||||||
|
_df = pd.DataFrame(_df)
|
||||||
|
_df.to_sql(
|
||||||
|
name=_table,schema=_schema,
|
||||||
|
con=self._engine,if_exists='append',index=False);
|
||||||
|
pass
|
|
@ -25,7 +25,7 @@ from multiprocessing import RLock
|
||||||
import queue
|
import queue
|
||||||
# import couch
|
# import couch
|
||||||
# import mongo
|
# import mongo
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
class IO:
|
class IO:
|
||||||
def init(self,**args):
|
def init(self,**args):
|
||||||
|
@ -39,6 +39,19 @@ class IO:
|
||||||
continue
|
continue
|
||||||
value = args[field]
|
value = args[field]
|
||||||
setattr(self,field,value)
|
setattr(self,field,value)
|
||||||
|
class IEncoder (json.JSONEncoder):
|
||||||
|
def default (self,object):
|
||||||
|
if type(object) == np.integer :
|
||||||
|
return int(object)
|
||||||
|
elif type(object) == np.floating:
|
||||||
|
return float(object)
|
||||||
|
elif type(object) == np.ndarray :
|
||||||
|
return object.tolist()
|
||||||
|
elif type(object) == datetime :
|
||||||
|
return object.isoformat()
|
||||||
|
else:
|
||||||
|
return super(IEncoder,self).default(object)
|
||||||
|
|
||||||
class Reader (IO):
|
class Reader (IO):
|
||||||
"""
|
"""
|
||||||
This class is an abstraction of a read functionalities of a data store
|
This class is an abstraction of a read functionalities of a data store
|
||||||
|
@ -93,29 +106,29 @@ class ReadWriter(Reader,Writer) :
|
||||||
This class implements the read/write functions aggregated
|
This class implements the read/write functions aggregated
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
class Console(Writer):
|
# class Console(Writer):
|
||||||
lock = RLock()
|
# lock = RLock()
|
||||||
def __init__(self,**_args):
|
# def __init__(self,**_args):
|
||||||
self.lock = _args['lock'] if 'lock' in _args else False
|
# self.lock = _args['lock'] if 'lock' in _args else False
|
||||||
self.info = self.write
|
# self.info = self.write
|
||||||
self.debug = self.write
|
# self.debug = self.write
|
||||||
self.log = self.write
|
# self.log = self.write
|
||||||
pass
|
# pass
|
||||||
def write (self,logs=None,**_args):
|
# def write (self,logs=None,**_args):
|
||||||
if self.lock :
|
# if self.lock :
|
||||||
Console.lock.acquire()
|
# Console.lock.acquire()
|
||||||
try:
|
# try:
|
||||||
_params = _args if logs is None and _args else logs
|
# _params = _args if logs is None and _args else logs
|
||||||
if type(_params) == list:
|
# if type(_params) == list:
|
||||||
for row in _params :
|
# for row in _params :
|
||||||
print (row)
|
# print (row)
|
||||||
else:
|
# else:
|
||||||
print (_params)
|
# print (_params)
|
||||||
except Exception as e :
|
# except Exception as e :
|
||||||
print (e)
|
# print (e)
|
||||||
finally:
|
# finally:
|
||||||
if self.lock :
|
# if self.lock :
|
||||||
Console.lock.release()
|
# Console.lock.release()
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -12,6 +12,8 @@ import json
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
from multiprocessing import Lock
|
from multiprocessing import Lock
|
||||||
|
from transport.common import Reader, Writer, IEncoder
|
||||||
|
|
||||||
class DiskReader(Reader) :
|
class DiskReader(Reader) :
|
||||||
"""
|
"""
|
||||||
This class is designed to read data from disk (location on hard drive)
|
This class is designed to read data from disk (location on hard drive)
|
||||||
|
@ -62,34 +64,25 @@ class DiskWriter(Writer):
|
||||||
"""
|
"""
|
||||||
THREAD_LOCK = Lock()
|
THREAD_LOCK = Lock()
|
||||||
def __init__(self,**params):
|
def __init__(self,**params):
|
||||||
Writer.__init__(self)
|
super().__init__()
|
||||||
self.cache['meta'] = {'cols':0,'rows':0,'delimiter':None}
|
self._path = params['path']
|
||||||
if 'path' in params:
|
self._delimiter = params['delimiter'] if 'delimiter' in params else None
|
||||||
self.path = params['path']
|
self._mode = 'w' if 'mode' not in params else params['mode']
|
||||||
else:
|
# def meta(self):
|
||||||
self.path = 'data-transport.log'
|
# return self.cache['meta']
|
||||||
self.delimiter = params['delimiter'] if 'delimiter' in params else None
|
# def isready(self):
|
||||||
# if 'name' in params:
|
# """
|
||||||
# self.name = params['name'];
|
# This function determines if the class is ready for execution or not
|
||||||
# else:
|
# i.e it determines if the preconditions of met prior execution
|
||||||
# self.name = 'data-transport.log'
|
# """
|
||||||
# if os.path.exists(self.path) == False:
|
# return True
|
||||||
# os.mkdir(self.path)
|
# # p = self.path is not None and os.path.exists(self.path)
|
||||||
def meta(self):
|
# # q = self.name is not None
|
||||||
return self.cache['meta']
|
# # return p and q
|
||||||
def isready(self):
|
# def format (self,row):
|
||||||
"""
|
# self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys())
|
||||||
This function determines if the class is ready for execution or not
|
# self.cache['meta']['rows'] += 1
|
||||||
i.e it determines if the preconditions of met prior execution
|
# return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n"
|
||||||
"""
|
|
||||||
return True
|
|
||||||
# p = self.path is not None and os.path.exists(self.path)
|
|
||||||
# q = self.name is not None
|
|
||||||
# return p and q
|
|
||||||
def format (self,row):
|
|
||||||
self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys())
|
|
||||||
self.cache['meta']['rows'] += 1
|
|
||||||
return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n"
|
|
||||||
def write(self,info,**_args):
|
def write(self,info,**_args):
|
||||||
"""
|
"""
|
||||||
This function writes a record to a designated file
|
This function writes a record to a designated file
|
||||||
|
@ -97,21 +90,15 @@ class DiskWriter(Writer):
|
||||||
@param row row to be written
|
@param row row to be written
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
_mode = 'a' if 'overwrite' not in _args else 'w'
|
|
||||||
DiskWriter.THREAD_LOCK.acquire()
|
|
||||||
f = open(self.path,_mode)
|
DiskWriter.THREAD_LOCK.acquire()
|
||||||
if self.delimiter :
|
|
||||||
if type(info) == list :
|
_delim = self._delimiter if 'delimiter' not in _args else _args['delimiter']
|
||||||
for row in info :
|
_path = self._path if 'path' not in _args else _args['path']
|
||||||
f.write(self.format(row))
|
_mode = self._mode if 'mode' not in _args else _args['mode']
|
||||||
else:
|
info.to_csv(_path,index=False,sep=_delim)
|
||||||
f.write(self.format(info))
|
pass
|
||||||
else:
|
|
||||||
if not type(info) == str :
|
|
||||||
f.write(json.dumps(info)+"\n")
|
|
||||||
else:
|
|
||||||
f.write(info)
|
|
||||||
f.close()
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
#
|
#
|
||||||
# Not sure what should be done here ...
|
# Not sure what should be done here ...
|
||||||
|
@ -220,18 +207,25 @@ class SQLiteWriter(SQLite,DiskWriter) :
|
||||||
#
|
#
|
||||||
# If the table doesn't exist we should create it
|
# If the table doesn't exist we should create it
|
||||||
#
|
#
|
||||||
def write(self,info):
|
def write(self,info,**_args):
|
||||||
"""
|
"""
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not self.fields :
|
#if not self.fields :
|
||||||
self.init(list(info.keys()))
|
# #if type(info) == pd.DataFrame :
|
||||||
|
# # _columns = list(info.columns)
|
||||||
|
# #self.init(list(info.keys()))
|
||||||
|
|
||||||
if type(info) == dict :
|
if type(info) == dict :
|
||||||
info = [info]
|
info = [info]
|
||||||
elif type(info) == pd.DataFrame :
|
elif type(info) == pd.DataFrame :
|
||||||
|
info = info.fillna('')
|
||||||
info = info.to_dict(orient='records')
|
info = info.to_dict(orient='records')
|
||||||
|
|
||||||
|
if not self.fields :
|
||||||
|
_rec = info[0]
|
||||||
|
self.init(list(_rec.keys()))
|
||||||
|
|
||||||
SQLiteWriter.LOCK.acquire()
|
SQLiteWriter.LOCK.acquire()
|
||||||
try:
|
try:
|
||||||
|
|
||||||
|
@ -239,7 +233,8 @@ class SQLiteWriter(SQLite,DiskWriter) :
|
||||||
sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(:values)"])
|
sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(:values)"])
|
||||||
for row in info :
|
for row in info :
|
||||||
stream =["".join(["",value,""]) if type(value) == str else value for value in row.values()]
|
stream =["".join(["",value,""]) if type(value) == str else value for value in row.values()]
|
||||||
stream = json.dumps(stream).replace("[","").replace("]","")
|
stream = json.dumps(stream,cls=IEncoder)
|
||||||
|
stream = stream.replace("[","").replace("]","")
|
||||||
|
|
||||||
|
|
||||||
self.conn.execute(sql.replace(":values",stream) )
|
self.conn.execute(sql.replace(":values",stream) )
|
||||||
|
@ -250,4 +245,4 @@ class SQLiteWriter(SQLite,DiskWriter) :
|
||||||
except Exception as e :
|
except Exception as e :
|
||||||
print (e)
|
print (e)
|
||||||
pass
|
pass
|
||||||
SQLiteWriter.LOCK.release()
|
SQLiteWriter.LOCK.release()
|
||||||
|
|
467
transport/etl.py
467
transport/etl.py
|
@ -35,6 +35,9 @@ import json
|
||||||
import sys
|
import sys
|
||||||
import transport
|
import transport
|
||||||
import time
|
import time
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
from multiprocessing import Process
|
from multiprocessing import Process
|
||||||
SYS_ARGS = {}
|
SYS_ARGS = {}
|
||||||
if len(sys.argv) > 1:
|
if len(sys.argv) > 1:
|
||||||
|
@ -52,199 +55,303 @@ if len(sys.argv) > 1:
|
||||||
|
|
||||||
|
|
||||||
i += 2
|
i += 2
|
||||||
|
class Transporter(Process):
|
||||||
class Post(Process):
|
"""
|
||||||
def __init__(self,**args):
|
The transporter (Jason Stathem) moves data from one persistant store to another
|
||||||
super().__init__()
|
- callback functions
|
||||||
self.store = args['target']
|
:onFinish callback function when finished
|
||||||
if 'provider' not in args['target'] :
|
:onError callback function when an error occurs
|
||||||
pass
|
:source source data specification
|
||||||
self.PROVIDER = args['target']['type']
|
:target destination(s) to move the data to
|
||||||
# self.writer = transport.factory.instance(**args['target'])
|
"""
|
||||||
else:
|
|
||||||
self.PROVIDER = args['target']['provider']
|
|
||||||
self.store['context'] = 'write'
|
|
||||||
# self.store = args['target']
|
|
||||||
self.store['lock'] = True
|
|
||||||
# self.writer = transport.instance(**args['target'])
|
|
||||||
#
|
|
||||||
# If the table doesn't exists maybe create it ?
|
|
||||||
#
|
|
||||||
self.rows = args['rows']
|
|
||||||
# self.rows = args['rows'].fillna('')
|
|
||||||
|
|
||||||
def log(self,**_args) :
|
|
||||||
if ETL.logger :
|
|
||||||
ETL.logger.info(**_args)
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
_info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows
|
|
||||||
|
|
||||||
writer = transport.factory.instance(**self.store)
|
|
||||||
writer.write(_info)
|
|
||||||
writer.close()
|
|
||||||
|
|
||||||
|
|
||||||
class ETL (Process):
|
|
||||||
logger = None
|
|
||||||
def __init__(self,**_args):
|
def __init__(self,**_args):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
# self.onfinish = _args['onFinish']
|
||||||
self.name = _args['id'] if 'id' in _args else 'UNREGISTERED'
|
# self._onerror = _args['onError']
|
||||||
# if 'provider' not in _args['source'] :
|
self._source = _args['source']
|
||||||
# #@deprecate
|
|
||||||
# self.reader = transport.factory.instance(**_args['source'])
|
|
||||||
# else:
|
|
||||||
# #
|
|
||||||
# # This is the new interface
|
|
||||||
# _args['source']['context'] = 'read'
|
|
||||||
|
|
||||||
# self.reader = transport.instance(**_args['source'])
|
|
||||||
|
|
||||||
#
|
|
||||||
# do we have an sql query provided or not ....
|
|
||||||
# self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None
|
|
||||||
# self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None
|
|
||||||
# self._oargs = _args['target'] #transport.factory.instance(**_args['target'])
|
|
||||||
self._source = _args ['source']
|
|
||||||
self._target = _args['target']
|
self._target = _args['target']
|
||||||
self._source['context'] = 'read'
|
|
||||||
self._target['context'] = 'write'
|
|
||||||
|
|
||||||
self.JOB_COUNT = _args['jobs']
|
|
||||||
self.jobs = []
|
|
||||||
# self.logger = transport.factory.instance(**_args['logger'])
|
|
||||||
def log(self,**_args) :
|
|
||||||
if ETL.logger :
|
|
||||||
ETL.logger.info(**_args)
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
# if self.cmd :
|
|
||||||
# idf = self.reader.read(**self.cmd)
|
|
||||||
# else:
|
|
||||||
# idf = self.reader.read()
|
|
||||||
# idf = pd.DataFrame(idf)
|
|
||||||
# # idf = idf.replace({np.nan: None}, inplace = True)
|
|
||||||
|
|
||||||
# idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()]
|
|
||||||
# self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT)
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# writing the data to a designated data source
|
# Let's insure we can support multiple targets
|
||||||
|
self._target = [self._target] if type(self._target) != list else self._target
|
||||||
|
|
||||||
|
pass
|
||||||
|
def read(self,**_args):
|
||||||
|
"""
|
||||||
|
This function
|
||||||
|
"""
|
||||||
|
_reader = transport.factory.instance(**self._source)
|
||||||
#
|
#
|
||||||
try:
|
# If arguments are provided then a query is to be executed (not just a table dump)
|
||||||
|
return _reader.read() if 'args' not in self._source else _reader.read(**self._source['args'])
|
||||||
|
|
||||||
|
def _delegate_write(self,_data,**_args):
|
||||||
|
"""
|
||||||
|
This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern
|
||||||
|
:data data-frame or object to be written
|
||||||
|
"""
|
||||||
|
if _data.shape[0] > 0 :
|
||||||
|
for _target in self._target :
|
||||||
|
if 'write' not in _target :
|
||||||
|
_target['context'] = 'write'
|
||||||
|
# _target['lock'] = True
|
||||||
|
else:
|
||||||
|
# _target['write']['lock'] = True
|
||||||
|
pass
|
||||||
|
_writer = transport.factory.instance(**_target)
|
||||||
|
_writer.write(_data,**_args)
|
||||||
|
if hasattr(_writer,'close') :
|
||||||
|
_writer.close()
|
||||||
|
|
||||||
|
def write(self,_df,**_args):
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
SEGMENT_COUNT = 6
|
||||||
|
MAX_ROWS = 1000000
|
||||||
|
# _df = self.read()
|
||||||
|
_segments = np.array_split(np.range(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])])
|
||||||
|
# _index = 0
|
||||||
|
|
||||||
|
|
||||||
|
for _indexes in _segments :
|
||||||
|
_fwd_args = {} if not _args else _args
|
||||||
|
|
||||||
|
self._delegate_write(_df.iloc[_indexes],**_fwd_args)
|
||||||
|
#
|
||||||
|
# @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?)
|
||||||
|
pass
|
||||||
|
|
||||||
|
def instance(**_args):
|
||||||
|
_proxy = lambda _agent: _agent.write(_agent.read())
|
||||||
|
if 'source' in _args and 'target' in _args :
|
||||||
|
|
||||||
|
_agent = Transporter(**_args)
|
||||||
|
_proxy(_agent)
|
||||||
|
|
||||||
|
else:
|
||||||
|
_config = _args['config']
|
||||||
|
_items = [Transporter(**_item) for _item in _config ]
|
||||||
|
_MAX_JOBS = 5
|
||||||
|
_items = np.array_split(_items,_MAX_JOBS)
|
||||||
|
for _batch in _items :
|
||||||
|
jobs = []
|
||||||
|
for _item in _batch :
|
||||||
|
thread = Process(target=_proxy,args = (_item,))
|
||||||
|
thread.start()
|
||||||
|
jobs.append(thread)
|
||||||
|
while jobs :
|
||||||
|
jobs = [thread for thread in jobs if thread.is_alive()]
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
pass
|
||||||
|
# class Post(Process):
|
||||||
|
# def __init__(self,**args):
|
||||||
|
# super().__init__()
|
||||||
|
# self.store = args['target']
|
||||||
|
# if 'provider' not in args['target'] :
|
||||||
|
# pass
|
||||||
|
# self.PROVIDER = args['target']['type']
|
||||||
|
# # self.writer = transport.factory.instance(**args['target'])
|
||||||
|
# else:
|
||||||
|
# self.PROVIDER = args['target']['provider']
|
||||||
|
# self.store['context'] = 'write'
|
||||||
|
# # self.store = args['target']
|
||||||
|
# self.store['lock'] = True
|
||||||
|
# # self.writer = transport.instance(**args['target'])
|
||||||
|
# #
|
||||||
|
# # If the table doesn't exists maybe create it ?
|
||||||
|
# #
|
||||||
|
# self.rows = args['rows']
|
||||||
|
# # self.rows = args['rows'].fillna('')
|
||||||
|
|
||||||
|
# def log(self,**_args) :
|
||||||
|
# if ETL.logger :
|
||||||
|
# ETL.logger.info(**_args)
|
||||||
|
|
||||||
|
# def run(self):
|
||||||
|
# _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows
|
||||||
|
|
||||||
|
# writer = transport.factory.instance(**self.store)
|
||||||
|
# writer.write(_info)
|
||||||
|
# writer.close()
|
||||||
|
|
||||||
|
|
||||||
|
# class ETL (Process):
|
||||||
|
# logger = None
|
||||||
|
# def __init__(self,**_args):
|
||||||
|
# super().__init__()
|
||||||
|
|
||||||
|
# self.name = _args['id'] if 'id' in _args else 'UNREGISTERED'
|
||||||
|
# # if 'provider' not in _args['source'] :
|
||||||
|
# # #@deprecate
|
||||||
|
# # self.reader = transport.factory.instance(**_args['source'])
|
||||||
|
# # else:
|
||||||
|
# # #
|
||||||
|
# # # This is the new interface
|
||||||
|
# # _args['source']['context'] = 'read'
|
||||||
|
|
||||||
|
# # self.reader = transport.instance(**_args['source'])
|
||||||
|
|
||||||
|
# #
|
||||||
|
# # do we have an sql query provided or not ....
|
||||||
|
# # self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None
|
||||||
|
# # self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None
|
||||||
|
# # self._oargs = _args['target'] #transport.factory.instance(**_args['target'])
|
||||||
|
# self._source = _args ['source']
|
||||||
|
# self._target = _args['target']
|
||||||
|
# self._source['context'] = 'read'
|
||||||
|
# self._target['context'] = 'write'
|
||||||
|
|
||||||
|
# self.JOB_COUNT = _args['jobs']
|
||||||
|
# self.jobs = []
|
||||||
|
# # self.logger = transport.factory.instance(**_args['logger'])
|
||||||
|
# def log(self,**_args) :
|
||||||
|
# if ETL.logger :
|
||||||
|
# ETL.logger.info(**_args)
|
||||||
|
|
||||||
|
# def run(self):
|
||||||
|
# # if self.cmd :
|
||||||
|
# # idf = self.reader.read(**self.cmd)
|
||||||
|
# # else:
|
||||||
|
# # idf = self.reader.read()
|
||||||
|
# # idf = pd.DataFrame(idf)
|
||||||
|
# # # idf = idf.replace({np.nan: None}, inplace = True)
|
||||||
|
|
||||||
|
# # idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()]
|
||||||
|
# # self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT)
|
||||||
|
|
||||||
|
# #
|
||||||
|
# # writing the data to a designated data source
|
||||||
|
# #
|
||||||
|
# try:
|
||||||
|
|
||||||
|
|
||||||
_log = {"name":self.name,"rows":{"input":0,"output":0}}
|
# _log = {"name":self.name,"rows":{"input":0,"output":0}}
|
||||||
_reader = transport.factory.instance(**self._source)
|
# _reader = transport.factory.instance(**self._source)
|
||||||
if 'table' in self._source :
|
# if 'table' in self._source :
|
||||||
_df = _reader.read()
|
# _df = _reader.read()
|
||||||
else:
|
# else:
|
||||||
_df = _reader.read(**self._source['cmd'])
|
# _df = _reader.read(**self._source['cmd'])
|
||||||
_log['rows']['input'] = _df.shape[0]
|
# _log['rows']['input'] = _df.shape[0]
|
||||||
#
|
# #
|
||||||
# Let's write the input data-frame to the target ...
|
# # Let's write the input data-frame to the target ...
|
||||||
_writer = transport.factory.instance(**self._target)
|
# _writer = transport.factory.instance(**self._target)
|
||||||
_writer.write(_df)
|
# _writer.write(_df)
|
||||||
_log['rows']['output'] = _df.shape[0]
|
# _log['rows']['output'] = _df.shape[0]
|
||||||
|
|
||||||
# self.log(module='write',action='partitioning',jobs=self.JOB_COUNT)
|
# # self.log(module='write',action='partitioning',jobs=self.JOB_COUNT)
|
||||||
# rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT)
|
# # rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT)
|
||||||
|
|
||||||
# #
|
# # #
|
||||||
# # @TODO: locks
|
# # # @TODO: locks
|
||||||
# for i in np.arange(self.JOB_COUNT) :
|
# # for i in np.arange(self.JOB_COUNT) :
|
||||||
# # _id = ' '.join([str(i),' table ',self.name])
|
# # # _id = ' '.join([str(i),' table ',self.name])
|
||||||
# indexes = rows[i]
|
# # indexes = rows[i]
|
||||||
# segment = idf.loc[indexes,:].copy() #.to_dict(orient='records')
|
# # segment = idf.loc[indexes,:].copy() #.to_dict(orient='records')
|
||||||
# _name = "partition-"+str(i)
|
# # _name = "partition-"+str(i)
|
||||||
# if segment.shape[0] == 0 :
|
# # if segment.shape[0] == 0 :
|
||||||
# continue
|
# # continue
|
||||||
|
|
||||||
# proc = Post(target = self._oargs,rows = segment,name=_name)
|
# # proc = Post(target = self._oargs,rows = segment,name=_name)
|
||||||
# self.jobs.append(proc)
|
# # self.jobs.append(proc)
|
||||||
# proc.start()
|
# # proc.start()
|
||||||
|
|
||||||
# self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0])
|
# # self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0])
|
||||||
# while self.jobs :
|
# # while self.jobs :
|
||||||
# jobs = [job for job in proc if job.is_alive()]
|
# # jobs = [job for job in proc if job.is_alive()]
|
||||||
# time.sleep(1)
|
# # time.sleep(1)
|
||||||
except Exception as e:
|
# except Exception as e:
|
||||||
print (e)
|
# print (e)
|
||||||
self.log(**_log)
|
# self.log(**_log)
|
||||||
def is_done(self):
|
# def is_done(self):
|
||||||
self.jobs = [proc for proc in self.jobs if proc.is_alive()]
|
# self.jobs = [proc for proc in self.jobs if proc.is_alive()]
|
||||||
return len(self.jobs) == 0
|
# return len(self.jobs) == 0
|
||||||
def instance(**_args):
|
|
||||||
"""
|
|
||||||
:path ,index, id
|
# def instance (**_args):
|
||||||
:param _info list of objects with {source,target}`
|
# """
|
||||||
:param logger
|
# path to configuration file
|
||||||
"""
|
# """
|
||||||
logger = _args['logger'] if 'logger' in _args else None
|
# _path = _args['path']
|
||||||
if 'path' in _args :
|
# _config = {}
|
||||||
_info = json.loads((open(_args['path'])).read())
|
# jobs = []
|
||||||
|
# if os.path.exists(_path) :
|
||||||
|
# file = open(_path)
|
||||||
if 'index' in _args :
|
# _config = json.loads(file.read())
|
||||||
_index = int(_args['index'])
|
# file.close()
|
||||||
_info = _info[_index]
|
# if _config and type
|
||||||
|
|
||||||
elif 'id' in _args :
|
|
||||||
_info = [_item for _item in _info if '_id' in _item and _item['id'] == _args['id']]
|
|
||||||
_info = _info[0] if _info else _info
|
|
||||||
else:
|
|
||||||
_info = _args['info']
|
|
||||||
|
|
||||||
if logger and type(logger) != str:
|
|
||||||
ETL.logger = logger
|
# def _instance(**_args):
|
||||||
elif logger == 'console':
|
# """
|
||||||
ETL.logger = transport.factory.instance(provider='console',context='write',lock=True)
|
# :path ,index, id
|
||||||
if type(_info) in [list,dict] :
|
# :param _info list of objects with {source,target}`
|
||||||
_info = _info if type(_info) != dict else [_info]
|
# :param logger
|
||||||
#
|
# """
|
||||||
# The assumption here is that the objects within the list are {source,target}
|
# logger = _args['logger'] if 'logger' in _args else None
|
||||||
jobs = []
|
# if 'path' in _args :
|
||||||
for _item in _info :
|
# _info = json.loads((open(_args['path'])).read())
|
||||||
|
|
||||||
_item['jobs'] = 5 if 'procs' not in _args else int(_args['procs'])
|
|
||||||
_job = ETL(**_item)
|
|
||||||
|
|
||||||
_job.start()
|
|
||||||
jobs.append(_job)
|
|
||||||
return jobs
|
|
||||||
|
|
||||||
else:
|
|
||||||
return None
|
# if 'index' in _args :
|
||||||
|
# _index = int(_args['index'])
|
||||||
if __name__ == '__main__' :
|
# _info = _info[_index]
|
||||||
_info = json.loads(open (SYS_ARGS['config']).read())
|
|
||||||
index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else None
|
|
||||||
procs = []
|
|
||||||
for _config in _info :
|
|
||||||
if 'source' in SYS_ARGS :
|
|
||||||
_config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}}
|
|
||||||
|
|
||||||
_config['jobs'] = 3 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs'])
|
|
||||||
etl = ETL (**_config)
|
|
||||||
if index is None:
|
|
||||||
|
|
||||||
etl.start()
|
# elif 'id' in _args :
|
||||||
procs.append(etl)
|
# _info = [_item for _item in _info if '_id' in _item and _item['id'] == _args['id']]
|
||||||
|
# _info = _info[0] if _info else _info
|
||||||
elif _info.index(_config) == index :
|
# else:
|
||||||
|
# _info = _args['info']
|
||||||
|
|
||||||
|
# if logger and type(logger) != str:
|
||||||
|
# ETL.logger = logger
|
||||||
|
# elif logger == 'console':
|
||||||
|
# ETL.logger = transport.factory.instance(provider='console',context='write',lock=True)
|
||||||
|
# if type(_info) in [list,dict] :
|
||||||
|
# _info = _info if type(_info) != dict else [_info]
|
||||||
|
# #
|
||||||
|
# # The assumption here is that the objects within the list are {source,target}
|
||||||
|
# jobs = []
|
||||||
|
# for _item in _info :
|
||||||
|
|
||||||
# print (_config)
|
# _item['jobs'] = 5 if 'procs' not in _args else int(_args['procs'])
|
||||||
procs = [etl]
|
# _job = ETL(**_item)
|
||||||
etl.start()
|
|
||||||
break
|
# _job.start()
|
||||||
#
|
# jobs.append(_job)
|
||||||
#
|
# return jobs
|
||||||
N = len(procs)
|
|
||||||
while procs :
|
# else:
|
||||||
procs = [thread for thread in procs if not thread.is_done()]
|
# return None
|
||||||
if len(procs) < N :
|
|
||||||
print (["Finished ",(N-len(procs)), " remaining ", len(procs)])
|
# if __name__ == '__main__' :
|
||||||
N = len(procs)
|
# _info = json.loads(open (SYS_ARGS['config']).read())
|
||||||
time.sleep(1)
|
# index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else None
|
||||||
# print ("We're done !!")
|
# procs = []
|
||||||
|
# for _config in _info :
|
||||||
|
# if 'source' in SYS_ARGS :
|
||||||
|
# _config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}}
|
||||||
|
|
||||||
|
# _config['jobs'] = 3 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs'])
|
||||||
|
# etl = ETL (**_config)
|
||||||
|
# if index is None:
|
||||||
|
|
||||||
|
# etl.start()
|
||||||
|
# procs.append(etl)
|
||||||
|
|
||||||
|
# elif _info.index(_config) == index :
|
||||||
|
|
||||||
|
# # print (_config)
|
||||||
|
# procs = [etl]
|
||||||
|
# etl.start()
|
||||||
|
# break
|
||||||
|
# #
|
||||||
|
# #
|
||||||
|
# N = len(procs)
|
||||||
|
# while procs :
|
||||||
|
# procs = [thread for thread in procs if not thread.is_done()]
|
||||||
|
# if len(procs) < N :
|
||||||
|
# print (["Finished ",(N-len(procs)), " remaining ", len(procs)])
|
||||||
|
# N = len(procs)
|
||||||
|
# time.sleep(1)
|
||||||
|
# # print ("We're done !!")
|
|
@ -15,7 +15,7 @@ import gridfs
|
||||||
# from transport import Reader,Writer
|
# from transport import Reader,Writer
|
||||||
import sys
|
import sys
|
||||||
if sys.version_info[0] > 2 :
|
if sys.version_info[0] > 2 :
|
||||||
from transport.common import Reader, Writer
|
from transport.common import Reader, Writer, IEncoder
|
||||||
else:
|
else:
|
||||||
from common import Reader, Writer
|
from common import Reader, Writer
|
||||||
import json
|
import json
|
||||||
|
@ -95,10 +95,16 @@ class MongoReader(Mongo,Reader):
|
||||||
Mongo.__init__(self,**args)
|
Mongo.__init__(self,**args)
|
||||||
def read(self,**args):
|
def read(self,**args):
|
||||||
|
|
||||||
if 'mongo' in args or 'cmd' in args:
|
if 'mongo' in args or 'cmd' in args or 'pipeline' in args:
|
||||||
#
|
#
|
||||||
# @TODO:
|
# @TODO:
|
||||||
cmd = args['mongo'] if 'mongo' in args else args['cmd']
|
cmd = {}
|
||||||
|
if 'pipeline' in args :
|
||||||
|
cmd['pipeline']= args['pipeline']
|
||||||
|
if 'aggregate' not in cmd :
|
||||||
|
cmd['aggregate'] = self.uid
|
||||||
|
if 'pipeline' not in args or 'aggregate' not in cmd :
|
||||||
|
cmd = args['mongo'] if 'mongo' in args else args['cmd']
|
||||||
if "aggregate" in cmd :
|
if "aggregate" in cmd :
|
||||||
if "allowDiskUse" not in cmd :
|
if "allowDiskUse" not in cmd :
|
||||||
cmd["allowDiskUse"] = True
|
cmd["allowDiskUse"] = True
|
||||||
|
@ -176,7 +182,7 @@ class MongoWriter(Mongo,Writer):
|
||||||
for row in rows :
|
for row in rows :
|
||||||
if type(row['_id']) == ObjectId :
|
if type(row['_id']) == ObjectId :
|
||||||
row['_id'] = str(row['_id'])
|
row['_id'] = str(row['_id'])
|
||||||
stream = Binary(json.dumps(collection).encode())
|
stream = Binary(json.dumps(collection,cls=IEncoder).encode())
|
||||||
collection.delete_many({})
|
collection.delete_many({})
|
||||||
now = "-".join([str(datetime.now().year()),str(datetime.now().month), str(datetime.now().day)])
|
now = "-".join([str(datetime.now().year()),str(datetime.now().month), str(datetime.now().day)])
|
||||||
name = ".".join([self.uid,'archive',now])+".json"
|
name = ".".join([self.uid,'archive',now])+".json"
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
"""
|
||||||
|
We are implementing transport to and from nextcloud (just like s3)
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from transport.common import Reader,Writer, IEncoder
|
||||||
|
import pandas as pd
|
||||||
|
from io import StringIO
|
||||||
|
import json
|
||||||
|
import nextcloud_client as nextcloud
|
||||||
|
|
||||||
|
class Nextcloud :
|
||||||
|
def __init__(self,**_args):
|
||||||
|
pass
|
||||||
|
self._delimiter = None
|
||||||
|
self._handler = nextcloud.Client(_args['url'])
|
||||||
|
_uid = _args['uid']
|
||||||
|
_token = _args['token']
|
||||||
|
self._uri = _args['folder'] if 'folder' in _args else './'
|
||||||
|
if self._uri.endswith('/') :
|
||||||
|
self._uri = self._uri[:-1]
|
||||||
|
self._file = None if 'file' not in _args else _args['file']
|
||||||
|
self._handler.login(_uid,_token)
|
||||||
|
def close(self):
|
||||||
|
try:
|
||||||
|
self._handler.logout()
|
||||||
|
except Exception as e:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class NextcloudReader(Nextcloud,Reader):
|
||||||
|
def __init__(self,**_args):
|
||||||
|
# self._file = [] if 'file' not in _args else _args['file']
|
||||||
|
super().__init__(**_args)
|
||||||
|
pass
|
||||||
|
def read(self,**_args):
|
||||||
|
_filename = self._file if 'file' not in _args else _args['file']
|
||||||
|
#
|
||||||
|
# @TODO: if _filename is none, an exception should be raised
|
||||||
|
#
|
||||||
|
_uri = '/'.join([self._uri,_filename])
|
||||||
|
if self._handler.get_file(_uri) :
|
||||||
|
#
|
||||||
|
#
|
||||||
|
_info = self._handler.file_info(_uri)
|
||||||
|
_content = self._handler.get_file_contents(_uri).decode('utf8')
|
||||||
|
if _info.get_content_type() == 'text/csv' :
|
||||||
|
#
|
||||||
|
# @TODO: enable handling of csv, xls, parquet, pickles
|
||||||
|
_file = StringIO(_content)
|
||||||
|
return pd.read_csv(_file)
|
||||||
|
else:
|
||||||
|
#
|
||||||
|
# if it is neither a structured document like csv, we will return the content as is
|
||||||
|
return _content
|
||||||
|
return None
|
||||||
|
class NextcloudWriter (Nextcloud,Writer):
|
||||||
|
"""
|
||||||
|
This class will write data to an instance of nextcloud
|
||||||
|
"""
|
||||||
|
def __init__(self,**_args) :
|
||||||
|
super().__init__(**_args)
|
||||||
|
self
|
||||||
|
def write(self,_data,**_args):
|
||||||
|
"""
|
||||||
|
This function will upload a file to a given destination
|
||||||
|
:file has the uri of the location of the file
|
||||||
|
"""
|
||||||
|
_filename = self._file if 'file' not in _args else _args['file']
|
||||||
|
_uri = '/'.join([self._uri,_filename])
|
||||||
|
if type(_data) == pd.DataFrame :
|
||||||
|
f = StringIO()
|
||||||
|
_data.to_csv(f,index=False)
|
||||||
|
_content = f.getvalue()
|
||||||
|
elif type(_data) == dict :
|
||||||
|
_content = json.dumps(_data,cls=IEncoder)
|
||||||
|
else:
|
||||||
|
_content = str(_data)
|
||||||
|
self._handler.put_file_contents(_uri,_content)
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
from transport.common import Reader, Writer,Console #, factory
|
# from transport.common import Reader, Writer,Console #, factory
|
||||||
from transport import disk
|
from transport import disk
|
||||||
import sqlite3
|
import sqlite3
|
||||||
from transport import s3 as s3
|
from transport import s3 as s3
|
||||||
|
@ -8,6 +8,9 @@ from transport import mongo as mongo
|
||||||
from transport import sql as sql
|
from transport import sql as sql
|
||||||
from transport import etl as etl
|
from transport import etl as etl
|
||||||
from transport import qlistener
|
from transport import qlistener
|
||||||
|
from transport import bricks
|
||||||
|
from transport import session
|
||||||
|
from transport import nextcloud
|
||||||
import psycopg2 as pg
|
import psycopg2 as pg
|
||||||
import mysql.connector as my
|
import mysql.connector as my
|
||||||
from google.cloud import bigquery as bq
|
from google.cloud import bigquery as bq
|
||||||
|
@ -26,12 +29,14 @@ SQLITE = 'sqlite'
|
||||||
SQLITE3= 'sqlite'
|
SQLITE3= 'sqlite'
|
||||||
REDSHIFT = 'redshift'
|
REDSHIFT = 'redshift'
|
||||||
NETEZZA = 'netezza'
|
NETEZZA = 'netezza'
|
||||||
MYSQL = 'mysql'
|
MYSQL = 'mysql+mysqlconnector'
|
||||||
RABBITMQ = 'rabbitmq'
|
RABBITMQ = 'rabbitmq'
|
||||||
MARIADB = 'mariadb'
|
MARIADB = 'mariadb'
|
||||||
COUCHDB = 'couch'
|
COUCHDB = 'couch'
|
||||||
CONSOLE = 'console'
|
CONSOLE = 'console'
|
||||||
ETL = 'etl'
|
ETL = 'etl'
|
||||||
|
NEXTCLOUD = 'nextcloud'
|
||||||
|
|
||||||
#
|
#
|
||||||
# synonyms of the above
|
# synonyms of the above
|
||||||
BQ = BIGQUERY
|
BQ = BIGQUERY
|
||||||
|
@ -45,21 +50,54 @@ AWS_S3 = 's3'
|
||||||
RABBIT = RABBITMQ
|
RABBIT = RABBITMQ
|
||||||
|
|
||||||
QLISTENER = 'qlistener'
|
QLISTENER = 'qlistener'
|
||||||
|
QUEUE = QLISTENER
|
||||||
|
CALLBACK = QLISTENER
|
||||||
|
DATABRICKS= 'databricks+connector'
|
||||||
DRIVERS = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3}
|
DRIVERS = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3}
|
||||||
CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[BIGQUERY],'file':[FILE],
|
CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[NEXTCLOUD,S3,BIGQUERY,DATABRICKS],'file':[FILE],
|
||||||
'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QLISTENER],'http':[HTTP]}
|
'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QUEUE],'http':[HTTP]}
|
||||||
|
|
||||||
READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader},'cloud':sql.BigQueryReader,
|
READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader},
|
||||||
|
'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader,NEXTCLOUD:nextcloud.NextcloudReader},
|
||||||
'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener},
|
'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener},
|
||||||
'cli':{CONSOLE:Console},'memory':{CONSOLE:Console}
|
# 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader
|
||||||
}
|
}
|
||||||
WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter},'cloud':sql.BigQueryWriter,
|
WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter},
|
||||||
'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener},'cli':{CONSOLE:Console},'memory':{CONSOLE:Console}
|
'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter,NEXTCLOUD:nextcloud.NextcloudWriter},
|
||||||
|
'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener},
|
||||||
|
# 'cli':{CONSOLE:Console},
|
||||||
|
# 'memory':{CONSOLE:Console}, 'http':session.HttpReader
|
||||||
|
|
||||||
}
|
}
|
||||||
|
# SQL_PROVIDERS = [POSTGRESQL,MYSQL,NETEZZA,MARIADB,SQLITE]
|
||||||
|
PROVIDERS = {
|
||||||
|
FILE:{'read':disk.DiskReader,'write':disk.DiskWriter},
|
||||||
|
SQLITE:{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3},
|
||||||
|
'sqlite3':{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3},
|
||||||
|
|
||||||
|
POSTGRESQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}},
|
||||||
|
NETEZZA:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':nz,'default':{'port':5480}},
|
||||||
|
REDSHIFT:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}},
|
||||||
|
RABBITMQ:{'read':queue.QueueReader,'writer':queue.QueueWriter,'context':queue.QueueListener,'default':{'host':'localhost','port':5432}},
|
||||||
|
|
||||||
|
MYSQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
|
||||||
|
MARIADB:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}},
|
||||||
|
|
||||||
|
S3:{'read':s3.s3Reader,'write':s3.s3Writer},
|
||||||
|
BIGQUERY:{'read':sql.BigQueryReader,'write':sql.BigQueryWriter},
|
||||||
|
DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter},
|
||||||
|
NEXTCLOUD:{'read':nextcloud.NextcloudReader,'write':nextcloud.NextcloudWriter},
|
||||||
|
|
||||||
|
QLISTENER:{'read':qlistener.qListener,'write':qlistener.qListener,'default':{'host':'localhost','port':5672}},
|
||||||
|
CONSOLE:{'read':qlistener.Console,"write":qlistener.Console},
|
||||||
|
HTTP:{'read':session.HttpReader,'write':session.HttpWriter},
|
||||||
|
|
||||||
|
MONGODB:{'read':mongo.MongoReader,'write':mongo.MongoWriter,'default':{'port':27017,'host':'localhost'}},
|
||||||
|
COUCHDB:{'read':couch.CouchReader,'writer':couch.CouchWriter,'default':{'host':'localhost','port':5984}},
|
||||||
|
ETL :{'read':etl.Transporter,'write':etl.Transporter}
|
||||||
|
}
|
||||||
DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port':3306}}
|
DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port':3306}}
|
||||||
DEFAULT[MONGODB] = {'port':27017,'host':'localhost'}
|
DEFAULT[MONGODB] = {'port':27017,'host':'localhost'}
|
||||||
DEFAULT[REDSHIFT] = DEFAULT[PG]
|
DEFAULT[REDSHIFT] = DEFAULT[PG]
|
||||||
DEFAULT[MARIADB] = DEFAULT[MYSQL]
|
DEFAULT[MARIADB] = DEFAULT[MYSQL]
|
||||||
DEFAULT[NETEZZA] = {'port':5480}
|
DEFAULT[NETEZZA] = {'port':5480}
|
||||||
|
|
|
@ -40,3 +40,8 @@ class qListener :
|
||||||
_q = qListener._queue[_id]
|
_q = qListener._queue[_id]
|
||||||
_q.put(_data)
|
_q.put(_data)
|
||||||
_q.join()
|
_q.join()
|
||||||
|
class Console (qListener):
|
||||||
|
def __init__(self,**_args):
|
||||||
|
super().__init__(callback=print)
|
||||||
|
|
||||||
|
# self.callback = print
|
|
@ -1,54 +1,60 @@
|
||||||
from flask import request, session
|
from flask import request, session
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import re
|
import re
|
||||||
from common import Reader, Writer
|
from transport.common import Reader, Writer
|
||||||
import json
|
import json
|
||||||
|
import requests
|
||||||
|
from io import StringIO
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
class HttpRequestReader(Reader):
|
|
||||||
|
class HttpReader(Reader):
|
||||||
"""
|
"""
|
||||||
This class is designed to read data from an Http request file handler provided to us by flask
|
This class is designed to read data from an Http request file handler provided to us by flask
|
||||||
The file will be heald in memory and processed accordingly
|
The file will be heald in memory and processed accordingly
|
||||||
NOTE: This is inefficient and can crash a micro-instance (becareful)
|
NOTE: This is inefficient and can crash a micro-instance (becareful)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self,**params):
|
def __init__(self,**_args):
|
||||||
self.file_length = 0
|
self._url = _args['url']
|
||||||
try:
|
self._headers = None if 'headers' not in _args else _args['headers']
|
||||||
|
|
||||||
#self.file = params['file']
|
# def isready(self):
|
||||||
#self.file.seek(0, os.SEEK_END)
|
# return self.file_length > 0
|
||||||
#self.file_length = self.file.tell()
|
def format(self,_response):
|
||||||
|
_mimetype= _response.headers['Content-Type']
|
||||||
#print 'size of file ',self.file_length
|
if _mimetype == 'text/csv' or 'text/csv':
|
||||||
self.content = params['file'].readlines()
|
_content = _response.text
|
||||||
self.file_length = len(self.content)
|
return pd.read_csv(StringIO(_content))
|
||||||
except Exception as e:
|
#
|
||||||
print ("Error ... ",e)
|
# @TODO: Add support for excel, JSON and other file formats that fit into a data-frame
|
||||||
pass
|
#
|
||||||
|
|
||||||
def isready(self):
|
return _response.text
|
||||||
return self.file_length > 0
|
def read(self,**_args):
|
||||||
def read(self,size =-1):
|
if self._headers :
|
||||||
i = 1
|
r = requests.get(self._url,headers = self._headers)
|
||||||
for row in self.content:
|
else:
|
||||||
i += 1
|
r = requests.get(self._url,headers = self._headers)
|
||||||
if size == i:
|
return self.format(r)
|
||||||
break
|
|
||||||
yield row
|
|
||||||
|
|
||||||
class HttpSessionWriter(Writer):
|
class HttpWriter(Writer):
|
||||||
"""
|
"""
|
||||||
This class is designed to write data to a session/cookie
|
This class is designed to submit data to an endpoint (url)
|
||||||
"""
|
"""
|
||||||
def __init__(self,**params):
|
def __init__(self,**_args):
|
||||||
"""
|
"""
|
||||||
@param key required session key
|
@param key required session key
|
||||||
"""
|
"""
|
||||||
self.session = params['queue']
|
self._url = _args['url']
|
||||||
self.session['sql'] = []
|
self._name = _args['name']
|
||||||
self.session['csv'] = []
|
self._method = 'post' if 'method' not in _args else _args['method']
|
||||||
self.tablename = re.sub('..+$','',params['filename'])
|
|
||||||
self.session['uid'] = params['uid']
|
# self.session = params['queue']
|
||||||
|
# self.session['sql'] = []
|
||||||
|
# self.session['csv'] = []
|
||||||
|
# self.tablename = re.sub('..+$','',params['filename'])
|
||||||
|
# self.session['uid'] = params['uid']
|
||||||
#self.xchar = params['xchar']
|
#self.xchar = params['xchar']
|
||||||
|
|
||||||
|
|
||||||
|
@ -57,10 +63,26 @@ class HttpSessionWriter(Writer):
|
||||||
return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
|
return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
|
||||||
def isready(self):
|
def isready(self):
|
||||||
return True
|
return True
|
||||||
def write(self,**params):
|
def write(self,_data,**_args):
|
||||||
label = params['label']
|
#
|
||||||
row = params ['row']
|
#
|
||||||
|
_method = self._method if 'method' not in _args else _args['method']
|
||||||
|
_method = _method.lower()
|
||||||
|
_mimetype = 'text/csv'
|
||||||
|
if type(_data) == dict :
|
||||||
|
_mimetype = 'application/json'
|
||||||
|
_content = _data
|
||||||
|
else:
|
||||||
|
_content = _data.to_dict(orient='records')
|
||||||
|
_headers = {'Content-Type':_mimetype}
|
||||||
|
_pointer = getattr(requests,_method)
|
||||||
|
|
||||||
if label == 'usable':
|
_pointer ({self._name:_content},headers=_headers)
|
||||||
self.session['csv'].append(self.format(row,','))
|
|
||||||
self.session['sql'].append(self.format_sql(row))
|
|
||||||
|
# label = params['label']
|
||||||
|
# row = params ['row']
|
||||||
|
|
||||||
|
# if label == 'usable':
|
||||||
|
# self.session['csv'].append(self.format(row,','))
|
||||||
|
# self.session['sql'].append(self.format_sql(row))
|
||||||
|
|
|
@ -291,17 +291,17 @@ class SQLWriter(SQLRW,Writer):
|
||||||
"""
|
"""
|
||||||
# inspect = False if 'inspect' not in _args else _args['inspect']
|
# inspect = False if 'inspect' not in _args else _args['inspect']
|
||||||
# cast = False if 'cast' not in _args else _args['cast']
|
# cast = False if 'cast' not in _args else _args['cast']
|
||||||
if not self.fields :
|
# if not self.fields :
|
||||||
if type(info) == list :
|
# if type(info) == list :
|
||||||
_fields = info[0].keys()
|
# _fields = info[0].keys()
|
||||||
elif type(info) == dict :
|
# elif type(info) == dict :
|
||||||
_fields = info.keys()
|
# _fields = info.keys()
|
||||||
elif type(info) == pd.DataFrame :
|
# elif type(info) == pd.DataFrame :
|
||||||
_fields = info.columns.tolist()
|
# _fields = info.columns.tolist()
|
||||||
|
|
||||||
# _fields = info.keys() if type(info) == dict else info[0].keys()
|
# # _fields = info.keys() if type(info) == dict else info[0].keys()
|
||||||
_fields = list (_fields)
|
# # _fields = list (_fields)
|
||||||
self.init(_fields)
|
# self.init(_fields)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
table = _args['table'] if 'table' in _args else self.table
|
table = _args['table'] if 'table' in _args else self.table
|
||||||
|
@ -431,8 +431,8 @@ class BQReader(BigQuery,Reader) :
|
||||||
|
|
||||||
super().__init__(**_args)
|
super().__init__(**_args)
|
||||||
def apply(self,sql):
|
def apply(self,sql):
|
||||||
self.read(sql=sql)
|
return self.read(sql=sql)
|
||||||
pass
|
|
||||||
def read(self,**_args):
|
def read(self,**_args):
|
||||||
SQL = None
|
SQL = None
|
||||||
table = self.table if 'table' not in _args else _args['table']
|
table = self.table if 'table' not in _args else _args['table']
|
||||||
|
|
|
@ -1,2 +1,2 @@
|
||||||
__author__ = 'The Phi Technology'
|
__author__ = 'The Phi Technology'
|
||||||
__version__= '1.8.2'
|
__version__= '1.9.2'
|
||||||
|
|
Loading…
Reference in New Issue