From b160d0a295ed19deef01d885b1e93c8774b7897e Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Mon, 1 Apr 2024 13:27:14 -0500
Subject: [PATCH] housekeeping work

---
 transport/couch.py     | 234 --------------------------------------
 transport/mongo.py     | 241 ---------------------------------------
 transport/nextcloud.py |  80 -------------
 transport/qlistener.py |  47 --------
 transport/session.py   |  88 ---------------
 5 files changed, 690 deletions(-)
 delete mode 100644 transport/couch.py
 delete mode 100644 transport/mongo.py
 delete mode 100644 transport/nextcloud.py
 delete mode 100644 transport/qlistener.py
 delete mode 100644 transport/session.py

diff --git a/transport/couch.py b/transport/couch.py
deleted file mode 100644
index 8e02a4e..0000000
--- a/transport/couch.py
+++ /dev/null
@@ -1,234 +0,0 @@
-"""
-Data-Transport
-Steve L. Nyemba, The Phi Technology
-
-This file is a wrapper around couchdb, using the IBM Cloudant SDK as the interface to couchdb
-"""
-import cloudant
-import json
-import sys
-from datetime import datetime
-if sys.version_info[0] > 2 :
-    from transport.common import Reader, Writer
-else:
-    from common import Reader, Writer
-class Couch:
-    """
-    This class is a wrapper for read/write against couchdb and captures the operations common to both.
-    @param url      host & port reference, default http://localhost:5984
-    @param doc      document (user) id involved
-    @param dbname   database name (target)
-    """
-    def __init__(self,**args):
-        url         = args['url'] if 'url' in args else 'http://localhost:5984'
-        self._id    = args['doc']
-        dbname      = args['dbname']
-        if 'username' not in args and 'password' not in args :
-            self.server = cloudant.CouchDB(None,None,url=url)
-        else:
-            self.server = cloudant.CouchDB(args['username'],args['password'],url=url)
-        self.server.connect()
-
-        if dbname in self.server.all_dbs() :
-            self.dbase = self.server.get(dbname,dbname,True)
-            #
-            # Create the document if it does not exist yet
-            #
-            doc = cloudant.document.Document(self.dbase,self._id)
-            if not doc.exists():
-                doc = self.dbase.create_document({"_id":self._id})
-                doc.save()
-        else:
-            self.dbase = None
-    def isready(self):
-        """
-        Ensuring the preconditions are met for processing
-        """
-        p = self.server.metadata() != {}
-        if p == False or not self.dbase:
-            return False
-        #
-        # At this point we are sure that the server is connected
-        # We are also sure that the database actually exists
-        #
-        doc = cloudant.document.Document(self.dbase,self._id)
-        if not doc.exists():
-            return False
-        return True
-
-    def view(self,**args):
-        """
-        This function will execute a view (provided a user is authenticated)
-        :id         design document _design/xxxx (provide the full name with the _design prefix)
-        :view_name  name of the view to be executed
-        :key(s)     key(s) to be used to filter the content
-        """
-        document = cloudant.design_document.DesignDocument(self.dbase,args['id'])
-        document.fetch()
-        params = {'group_level':1,'group':True}
-        if 'key' in args :
-            params['key'] = args['key']
-        elif 'keys' in args :
-            params['keys'] = args['keys']
-        return document.get_view(args['view_name'])(**params)['rows']
-
-
-class CouchReader(Couch,Reader):
-    """
-    This class will read an attachment from couchdb and return it to the calling code.
-    The attachment must have been uploaded beforehand, otherwise the read will fail.
-    @TODO: Account for security & access control
-    """
-    def __init__(self,**args):
-        """
-        @param filename filename (attachment)
-        """
-        #
-        # setting the basic parameters
-        Couch.__init__(self,**args)
-        if 'filename' in args :
-            self.filename = args['filename']
-        else:
-            self.filename = None
-
-    # def isready(self):
-    #     #
-    #     # Is the basic information about the database valid
-    #     #
-    #     p = Couchdb.isready(self)
-    #     if p == False:
-    #         return False
-    #     #
-    #     # The database name is set and correct at this point
-    #     # We insure the document of the given user has the requested attachment.
-    #     #
-    #     doc = self.dbase.get(self._id)
-    #     if '_attachments' in doc:
-    #         r = self.filename in doc['_attachments'].keys()
-    #     else:
-    #         r = False
-    #     return r
-    def stream(self):
-        #
-        # @TODO Need to get this working ...
-        #
-        document = cloudant.document.Document(self.dbase,self._id)
-        content = document.get_attachment(self.filename).split('\n')
-        for row in content:
-            yield row
-
-    def read(self,**args):
-        if self.filename is not None:
-            return self.stream()
-        else:
-            return self.basic_read()
-    def basic_read(self):
-        document = cloudant.document.Document(self.dbase,self._id)
-        if document.exists() :
-            document.fetch()
-            document = dict(document)
-            del document['_rev']
-        else:
-            document = {}
-        return document
-
-class CouchWriter(Couch,Writer):
-    """
-    This class will write to a couchdb document provided a scope.
-    The scope is the attribute that will be set on the couchdb document.
-    """
-    def __init__(self,**args):
-        """
-        @param url      host & port reference
-        @param doc      document (user) id involved
-        @param filename filename (attachment)
-        @param dbname   database name (target)
-        """
-        Couch.__init__(self,**args)
-    def set(self,info):
-        """
-        Replace the fields of the document with the attributes of the given object
-        """
-        document = cloudant.document.Document(self.dbase,self._id)
-        if document.exists() :
-            keys = list(set(document.keys()) - set(['_id','_rev','_attachments']))
-            for id in keys :
-                document.field_set(document,id,None)
-            for id in info :
-                document.field_set(document,id,info[id])
-            document.save()
-        else:
-            _document = dict({"_id":self._id},**info)
-            self.dbase.create_document(_document)
-    def write(self,info):
-        """
-        Write the attributes of the given object to the document database
-        @param info object whose attributes will be written to the document (list-valued keys are appended to)
-        """
-        document = cloudant.document.Document(self.dbase,self._id)
-        if document.exists() is False :
-            document = self.dbase.create_document({"_id":self._id})
-        for key in info :
-            if key in document and type(document[key]) == list :
-                document[key] += info[key]
-            else:
-                document[key] = info[key]
-        document.save()
-
-    def upload(self,**args):
-        """
-        :param filename     name of the file (attachment) to be uploaded
-        :param content      content of the file (binary or text)
-        :param content_type mime type, default text/plain
-        """
-        mimetype = args['content_type'] if 'content_type' in args else 'text/plain'
-        document = cloudant.document.Document(self.dbase,self._id)
-        document.put_attachment(args['filename'],mimetype,args['content'])
-        document.save()
-
-    def archive(self,params=None):
-        """
-        This function will archive the document onto itself, i.e. move the fields
-        into a json attachment and remove them from the document.
-        """
-        document = cloudant.document.Document(self.dbase,self._id)
-        document.fetch()
-        content = {}
-        for id in list(document.keys()):
-            if id not in ['_id','_rev','_attachments'] :
-                content[id] = document[id]
-                del document[id]
-
-        content = json.dumps(content)
-        now = str(datetime.today())
-        name = '-'.join([document['_id'],now]) + '.json'
-        #
-        # persist the attachment and the removal of the archived fields in one save
-        document.put_attachment(name,'application/json',content)
-        document.save()
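For reference, the couchdb wrapper removed above was driven entirely by keyword arguments. A minimal usage sketch, assuming a couchdb server at http://localhost:5984; the database and document names below are placeholders:

    from transport.couch import CouchReader, CouchWriter

    # write a few attributes onto the document 'user-001' in database 'demo'
    writer = CouchWriter(url='http://localhost:5984', dbname='demo', doc='user-001')
    writer.write({'logs': [{'action': 'login'}]})   # list-valued keys are appended to

    # read the same document back (its _rev field is stripped)
    reader = CouchReader(url='http://localhost:5984', dbname='demo', doc='user-001')
    record = reader.read()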
diff --git a/transport/mongo.py b/transport/mongo.py
deleted file mode 100644
index c7b5ed8..0000000
--- a/transport/mongo.py
+++ /dev/null
@@ -1,241 +0,0 @@
-"""
-Data Transport - 1.0
-Steve L. Nyemba, The Phi Technology LLC
-
-This file is a wrapper around mongodb for reading/writing content against a mongodb server and executing views (mapreduce)
-"""
-from pymongo import MongoClient
-from bson.objectid import ObjectId
-from bson.binary import Binary
-# import nujson as json
-from datetime import datetime
-import pandas as pd
-import numpy as np
-import gridfs
-# from transport import Reader,Writer
-import sys
-if sys.version_info[0] > 2 :
-    from transport.common import Reader, Writer, IEncoder
-else:
-    from common import Reader, Writer
-import json
-import re
-from multiprocessing import Lock, RLock
-class Mongo :
-    lock = RLock()
-    """
-    Basic mongodb functions are captured here
-    """
-    def __init__(self,**args):
-        """
-        :dbname     database name/identifier
-        :host       host and port of the database, by default localhost:27017
-        :username   username for authentication
-        :password   password for the current user
-        """
-        self.mechanism = 'SCRAM-SHA-256' if 'mechanism' not in args else args['mechanism']
-        self._lock = False if 'lock' not in args else args['lock']
-        self.dbname = None
-        username = password = None
-        if 'auth_file' in args :
-            _info = json.loads((open(args['auth_file'])).read())
-        else:
-            _info = {}
-        _args = dict(args,**_info)
-        _map = {'dbname':'db','database':'db','table':'uid','collection':'uid','col':'uid','doc':'uid'}
-        for key in _args :
-            if key in ['username','password'] :
-                username = _args['username'] if key == 'username' else username
-                password = _args['password'] if key == 'password' else password
-                continue
-            value = _args[key]
-            if key in _map :
-                key = _map[key]
-            self.setattr(key,value)
-        #
-        # Aliasing in order to remain backwards compatible
-        self.dbname = self.db if hasattr(self,'db') else self.dbname
-        self.uid = _args['table'] if 'table' in _args else (_args['doc'] if 'doc' in _args else (_args['collection'] if 'collection' in _args else None))
-        if username and password :
-            self.client = MongoClient(self.host,
-                username=username,
-                password=password,
-                authSource=self.authSource,
-                authMechanism=self.mechanism)
-        else:
-            self.client = MongoClient(self.host,maxPoolSize=10000)
-        self.db = self.client[self.dbname]
-
-    def isready(self):
-        p = self.dbname in self.client.list_database_names()
-        q = self.uid in self.client[self.dbname].list_collection_names()
-        return p and q
-    def setattr(self,key,value):
-        _allowed = ['host','port','db','uid','authSource','mechanism']
-        if key in _allowed :
-            setattr(self,key,value)
-    def close(self):
-        self.client.close()
-    def meta(self,**_args):
-        return []
-class MongoReader(Mongo,Reader):
-    """
-    This class will read from a mongodb data store and return the content of a document (not a collection)
-    """
-    def __init__(self,**args):
-        Mongo.__init__(self,**args)
-    def read(self,**args):
-        if 'mongo' in args or 'cmd' in args or 'pipeline' in args:
-            #
-            # run a raw database command (aggregation pipelines et al.)
-            #
-            cmd = {}
-            if 'aggregate' not in cmd and 'aggregate' not in args:
-                cmd['aggregate'] = self.uid
-            elif 'aggregate' in args :
-                cmd['aggregate'] = args['aggregate']
-            if 'pipeline' in args :
-                cmd['pipeline'] = args['pipeline']
-            if 'pipeline' not in args or 'aggregate' not in cmd :
-                cmd = args['mongo'] if 'mongo' in args else args['cmd']
-            if "aggregate" in cmd :
-                if "allowDiskUse" not in cmd :
-                    cmd["allowDiskUse"] = True
-                if "cursor" not in cmd :
-                    cmd["cursor"] = {}
-            r = []
-            out = self.db.command(cmd)
-            # @TODO: consider using a generator (yield) here
-            while True :
-                if 'values' in out :
-                    r += out['values']
-                if 'cursor' in out :
-                    key = 'firstBatch' if 'firstBatch' in out['cursor'] else 'nextBatch'
-                else:
-                    key = 'n'
-                if 'cursor' in out and out['cursor'][key] :
-                    r += list(out['cursor'][key])
-                elif key in out and out[key]:
-                    r.append(out[key])
-                if key not in ['firstBatch','nextBatch'] or ('cursor' in out and out['cursor']['id'] == 0) :
-                    break
-                else:
-                    out = self.db.command({"getMore":out['cursor']['id'],"collection":out['cursor']['ns'].split(".")[-1]})
-            return pd.DataFrame(r)
-        else:
-            if 'table' in args or 'collection' in args :
-                _uid = args['table'] if 'table' in args else args['collection']
-            else:
-                _uid = self.uid
-            collection = self.db[_uid]
-            _filter = args['filter'] if 'filter' in args else {}
-            _df = pd.DataFrame(collection.find(_filter))
-            # drop the first column (_id) before handing the frame back
-            columns = _df.columns.tolist()[1:]
-            return _df[columns]
-    def view(self,**args):
-        """
-        This function is designed to execute a view (map/reduce) operation
-        """
-        pass
-class MongoWriter(Mongo,Writer):
-    """
-    This class is designed to write to a mongodb collection within a database
-    """
-    def __init__(self,**args):
-        Mongo.__init__(self,**args)
-    def upload(self,**args) :
-        """
-        This function will upload a file to the current database (using GridFS)
-        :param data     binary stream/text to be stored
-        :param filename filename to be used
-        :param encoding content_encoding (default utf-8)
-        """
-        if 'encoding' not in args :
-            args['encoding'] = 'utf-8'
-        gfs = gridfs.GridFS(self.db)
-        gfs.put(**args)
-
-    def archive(self):
-        """
-        This function will archive the documents of the current collection to a GridFS file and empty the collection
-        """
-        collection = self.db[self.uid]
-        rows = list(collection.find())
-        for row in rows :
-            if type(row['_id']) == ObjectId :
-                row['_id'] = str(row['_id'])
-        stream = Binary(json.dumps(rows,cls=IEncoder).encode())
-        collection.delete_many({})
-        now = "-".join([str(datetime.now().year),str(datetime.now().month),str(datetime.now().day)])
-        name = ".".join([self.uid,'archive',now]) + ".json"
-        description = " ".join([self.uid,'archive',str(len(rows))])
-        self.upload(filename=name,data=stream,description=description,content_type='application/json')
-        # gfs = gridfs.GridFS(self.db)
-        # gfs.put(filename=name,description=description,data=stream,encoding='utf-8')
-        # self.write({"filename":name,"file":stream,"description":description})
-
-    def write(self,info,**_args):
-        """
-        This function will write to a given collection, i.e. add a record to a collection (no updates)
-        @param info new record(s) to be added to the collection
-        """
-        # document = self.db[self.uid].find()
-        # collection = self.db[self.uid]
-        try:
-            if 'table' in _args or 'collection' in _args :
-                _uid = _args['table'] if 'table' in _args else _args['collection']
-            else:
-                _uid = self.uid if 'doc' not in _args else _args['doc']
-            if self._lock :
-                Mongo.lock.acquire()
-            if type(info) == list or type(info) == pd.DataFrame :
-                self.db[_uid].insert_many(info if type(info) == list else info.to_dict(orient='records'))
-            else:
-                self.db[_uid].insert_one(info)
-        finally:
-            if self._lock :
-                Mongo.lock.release()
-    def set(self,document):
-        """
-        If no identifier is provided, the function will delete the entire collection and set the new document.
-        Please use this function with great care (archive the content first before using it, for safety)
-        """
-        collection = self.db[self.uid]
-        if collection.count_documents({}) > 0 and '_id' in document:
-            id = document['_id']
-            del document['_id']
-            collection.find_one_and_replace({'_id':id},document)
-        else:
-            collection.delete_many({})
-            self.write(document)
-    def close(self):
-        Mongo.close(self)
-        # collection.update_one({"_id":self.uid},document,True)
-
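The mongodb wrapper accepted both plain reads and raw aggregation commands. A minimal sketch of how the removed classes were typically wired up, assuming a local mongodb on localhost:27017; the database and collection names below are placeholders:

    from transport.mongo import MongoReader, MongoWriter

    _args = {'host': 'localhost:27017', 'dbname': 'demo', 'collection': 'users'}
    writer = MongoWriter(**_args)
    writer.write([{'name': 'alice'}, {'name': 'bob'}])   # a list goes through insert_many

    reader = MongoReader(**_args)
    df = reader.read(filter={'name': 'alice'})           # plain find, returned as a DataFrame
    df2 = reader.read(pipeline=[{'$group': {'_id': '$name'}}])   # aggregation command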
diff --git a/transport/nextcloud.py b/transport/nextcloud.py
deleted file mode 100644
index 2eefd51..0000000
--- a/transport/nextcloud.py
+++ /dev/null
@@ -1,80 +0,0 @@
-"""
-We are implementing transport to and from nextcloud (just like s3)
-"""
-import os
-import sys
-from transport.common import Reader,Writer, IEncoder
-import pandas as pd
-from io import StringIO
-import json
-import nextcloud_client as nextcloud
-
-class Nextcloud :
-    def __init__(self,**_args):
-        self._delimiter = None
-        self._handler = nextcloud.Client(_args['url'])
-        _uid = _args['uid']
-        _token = _args['token']
-        self._uri = _args['folder'] if 'folder' in _args else './'
-        if self._uri.endswith('/') :
-            self._uri = self._uri[:-1]
-        self._file = None if 'file' not in _args else _args['file']
-        self._handler.login(_uid,_token)
-    def close(self):
-        try:
-            self._handler.logout()
-        except Exception as e:
-            pass
-
-class NextcloudReader(Nextcloud,Reader):
-    def __init__(self,**_args):
-        super().__init__(**_args)
-    def read(self,**_args):
-        _filename = self._file if 'file' not in _args else _args['file']
-        #
-        # @TODO: if _filename is None, an exception should be raised
-        #
-        _uri = '/'.join([self._uri,_filename])
-        if self._handler.get_file(_uri) :
-            _info = self._handler.file_info(_uri)
-            _content = self._handler.get_file_contents(_uri).decode('utf8')
-            if _info.get_content_type() == 'text/csv' :
-                #
-                # @TODO: enable handling of csv, xls, parquet, pickles
-                _file = StringIO(_content)
-                return pd.read_csv(_file)
-            else:
-                #
-                # if it is not a structured document like csv, we will return the content as is
-                return _content
-        return None
-class NextcloudWriter (Nextcloud,Writer):
-    """
-    This class will write data to an instance of nextcloud
-    """
-    def __init__(self,**_args) :
-        super().__init__(**_args)
-    def write(self,_data,**_args):
-        """
-        This function will upload data to a given destination
-        :file   the uri of the target file
-        """
-        _filename = self._file if 'file' not in _args else _args['file']
-        _uri = '/'.join([self._uri,_filename])
-        if type(_data) == pd.DataFrame :
-            f = StringIO()
-            _data.to_csv(f,index=False)
-            _content = f.getvalue()
-        elif type(_data) == dict :
-            _content = json.dumps(_data,cls=IEncoder)
-        else:
-            _content = str(_data)
-        self._handler.put_file_contents(_uri,_content)
-
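The nextcloud transport mirrored the s3-style reader/writer pair. A short sketch under assumed credentials; the url, uid, token, folder and file values below are placeholders:

    import pandas as pd
    from transport.nextcloud import NextcloudReader, NextcloudWriter

    _auth = {'url': 'https://cloud.example.com', 'uid': 'demo', 'token': 'app-token', 'folder': 'exports'}
    writer = NextcloudWriter(**_auth)
    writer.write(pd.DataFrame([{'x': 1}]), file='sample.csv')   # DataFrames are serialized to csv

    reader = NextcloudReader(**_auth)
    df = reader.read(file='sample.csv')   # csv content comes back as a DataFrame
    reader.close()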
diff --git a/transport/qlistener.py b/transport/qlistener.py
deleted file mode 100644
index 26f0ba8..0000000
--- a/transport/qlistener.py
+++ /dev/null
@@ -1,47 +0,0 @@
-import queue
-from threading import Thread, Lock
-from transport.common import Reader,Writer
-import numpy as np
-import pandas as pd
-
-class qListener :
-    lock = Lock()
-    _queue = {'default':queue.Queue()}
-    def __init__(self,**_args):
-        self._cache = {}
-        self._callback = _args['callback'] if 'callback' in _args else None
-        self._id = _args['id'] if 'id' in _args else 'default'
-        if self._id not in qListener._queue :
-            qListener._queue[self._id] = queue.Queue()
-        thread = Thread(target=self._forward)
-        thread.start()
-    def _forward(self):
-        _q = qListener._queue[self._id]
-        _data = _q.get()
-        _q.task_done()
-        self._callback(_data)
-
-    def has(self,**_args) :
-        return self._callback is not None
-
-    def close(self):
-        """
-        This will empty the queue and have it ready for another operation
-        """
-        _q = qListener._queue[self._id]
-        with _q.mutex:
-            _q.queue.clear()
-            _q.all_tasks_done.notify_all()
-
-    def write(self,_data,**_args):
-        _id = _args['id'] if 'id' in _args else self._id
-        _q = qListener._queue[_id]
-        _q.put(_data)
-        _q.join()
-class Console (qListener):
-    def __init__(self,**_args):
-        super().__init__(callback=print)
-
-        # self.callback = print
\ No newline at end of file
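qListener fanned writes out to a callback through a named in-process queue, and Console simply bound that callback to print. A minimal sketch:

    from transport.qlistener import Console

    console = Console()       # a qListener whose callback is print
    console.write('hello')    # enqueued, then handed to print by the forwarding thread
    console.close()           # clear the queue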
diff --git a/transport/session.py b/transport/session.py
deleted file mode 100644
index d74669a..0000000
--- a/transport/session.py
+++ /dev/null
@@ -1,88 +0,0 @@
-from flask import request, session
-from datetime import datetime
-import re
-from transport.common import Reader, Writer
-import json
-import requests
-from io import StringIO
-import pandas as pd
-
-class HttpReader(Reader):
-    """
-    This class is designed to read data from an Http request file handler provided to us by flask
-    The file will be held in memory and processed accordingly
-    NOTE: This is inefficient and can crash a micro-instance (be careful)
-    """
-    def __init__(self,**_args):
-        self._url = _args['url']
-        self._headers = None if 'headers' not in _args else _args['headers']
-
-    # def isready(self):
-    #     return self.file_length > 0
-    def format(self,_response):
-        _mimetype = _response.headers['Content-Type']
-        if _mimetype == 'text/csv' :
-            _content = _response.text
-            return pd.read_csv(StringIO(_content))
-        #
-        # @TODO: Add support for excel, JSON and other file formats that fit into a data-frame
-        #
-        return _response.text
-    def read(self,**_args):
-        r = requests.get(self._url,headers=self._headers)
-        return self.format(r)
-
-class HttpWriter(Writer):
-    """
-    This class is designed to submit data to an endpoint (url)
-    """
-    def __init__(self,**_args):
-        """
-        @param url      the endpoint the data will be submitted to
-        @param name     the name under which the payload is submitted
-        @param method   http method to use (default post)
-        """
-        self._url = _args['url']
-        self._name = _args['name']
-        self._method = 'post' if 'method' not in _args else _args['method']
-
-        # self.session = params['queue']
-        # self.session['sql'] = []
-        # self.session['csv'] = []
-        # self.tablename = re.sub('..+$','',params['filename'])
-        # self.session['uid'] = params['uid']
-        # self.xchar = params['xchar']
-
-    def format_sql(self,row):
-        # NOTE: relies on self.tablename, which is set by the commented-out code above
-        values = "','".join([col.replace('"','').replace("'",'') for col in row])
-        return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
-    def isready(self):
-        return True
-    def write(self,_data,**_args):
-        _method = self._method if 'method' not in _args else _args['method']
-        _method = _method.lower()
-        _mimetype = 'text/csv'
-        if type(_data) == dict :
-            _mimetype = 'application/json'
-            _content = _data
-        else:
-            _content = _data.to_dict(orient='records')
-        _headers = {'Content-Type':_mimetype}
-        _pointer = getattr(requests,_method)
-        _pointer(self._url,json={self._name:_content},headers=_headers)
-
-        # label = params['label']
-        # row = params['row']
-        # if label == 'usable':
-        #     self.session['csv'].append(self.format(row,','))
-        #     self.session['sql'].append(self.format_sql(row))
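Finally, the http transport in session.py was a thin veneer over requests. A short sketch with placeholder urls; csv responses are parsed into a DataFrame, and dict payloads are posted as json under the configured name:

    from transport.session import HttpReader, HttpWriter

    reader = HttpReader(url='https://api.example.com/data.csv')
    df = reader.read()

    writer = HttpWriter(url='https://api.example.com/ingest', name='rows')
    writer.write({'x': 1})    # posted as {'rows': {'x': 1}}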