bug fixes for version 1.0.8, streamlining interface

This commit is contained in:
Steve Nyemba 2020-02-01 09:42:45 -06:00
parent aaad4003a9
commit 081ed080d7
5 changed files with 214 additions and 158 deletions

View File

@ -6,7 +6,12 @@ import os
import sys import sys
def read(fname): def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read() return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = {"name":"data-transport","version":"1.0.0","author":"The Phi Technology LLC","author_email":"info@the-phi.com","license":"MIT","packages":["transport"]} args = {
"name":"data-transport",
"version":"1.0.8",
"author":"The Phi Technology LLC","author_email":"info@the-phi.com",
"license":"MIT",
"packages":["transport"]}
args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3'] args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3']
args["install_requires"] = ['pymongo','numpy','cloudant','pika','boto','flask-session','smart_open'] args["install_requires"] = ['pymongo','numpy','cloudant','pika','boto','flask-session','smart_open']
args["url"] = "https://dev.the-phi.com/git/steve/data-transport.git" args["url"] = "https://dev.the-phi.com/git/steve/data-transport.git"

View File

@ -14,7 +14,8 @@ Requirements :
pymongo pymongo
boto boto
cloudant cloudant
@TODO:
Enable read/writing to multiple reads/writes
""" """
__author__ = 'The Phi Technology' __author__ = 'The Phi Technology'
import numpy as np import numpy as np
@ -22,107 +23,72 @@ import json
import importlib import importlib
# import couch # import couch
# import mongo # import mongo
class Reader: class IO:
def init(self, **args):
    """
    Update attributes of this instance at runtime.

    Only attributes the instance already carries (as reported by vars()) are
    assigned; every other keyword is silently ignored. Introducing new
    attributes requires sub-classing, which keeps the class predictable.
    """
    # Snapshot the attribute names up front so the check is not affected by the assignments below
    known = set(vars(self))
    for name, value in args.items():
        if name in known:
            setattr(self, name, value)
class Reader (IO):
"""
This class is an abstraction of a read functionalities of a data store
"""
def __init__(self): def __init__(self):
self.nrows = 0 pass
self.xchar = None def meta(self):
def row_count(self):
content = self.read()
return np.sum([1 for row in content])
def delimiter(self,sample):
""" """
This function determines the most common delimiter from a subset of possible delimiters. This function is intended to return meta-data associated with what has just been read
It uses a statistical approach (distribution) to guage the distribution of columns for a given delimiter @return object of meta data information associated with the content of the store
:sample sample string/content expecting matrix i.e list of rows
""" """
raise Exception ("meta function needs to be implemented")
m = {',':[],'\t':[],'|':[],'\x3A':[]} def read(**args):
delim = list(m.keys())
for row in sample:
for xchar in delim:
if row.split(xchar) > 1:
m[xchar].append(len(row.split(xchar)))
else:
m[xchar].append(0)
#
# The delimiter with the smallest variance, provided the mean is greater than 1
# This would be troublesome if there many broken records sampled
#
m = {id: np.var(m[id]) for id in list(m.keys()) if m[id] != [] and int(np.mean(m[id]))>1}
index = list(m.values()).index( min(m.values()))
xchar = list(m.keys())[index]
return xchar
def col_count(self,sample):
""" """
This function retirms the number of columns of a given sample This function is intended to read the content of a store provided parameters to be used at the discretion of the subclass
@pre self.xchar is not None
""" """
raise Exception ("read function needs to be implemented")
m = {}
i = 0
for row in sample:
row = self.format(row)
id = str(len(row))
#id = str(len(row.split(self.xchar)))
if id not in m:
m[id] = 0
m[id] = m[id] + 1
index = list(m.values()).index( max(m.values()) )
ncols = int(list(m.keys())[index])
return ncols; class Writer(IO):
def format (self,row): def __init__(self):
""" self.cache = {"default":[]}
This function will clean records of a given row by removing non-ascii characters def log(self,**args):
@pre self.xchar is not None self.cache[id] = args
""" def meta (self,id="default",**args):
raise Exception ("meta function needs to be implemented")
if isinstance(row,list) == False:
#
# We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary)
cols = self.split(row)
#cols = row.split(self.xchar)
else:
cols = row ;
return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols]
def split (self,row):
"""
This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes.
@pre : self.xchar is not None
"""
pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"])
return re.findall(pattern,row.replace('\n',''))
class Writer:
def format(self,row,xchar): def format(self,row,xchar):
if xchar is not None and isinstance(row,list): if xchar is not None and isinstance(row,list):
return xchar.join(row)+'\n' return xchar.join(row)+'\n'
elif xchar is None and isinstance(row,dict): elif xchar is None and isinstance(row,dict):
row = json.dumps(row) row = json.dumps(row)
return row return row
""" def write(self,**args):
"""
This function will write content to a store given parameters to be used at the discretion of the sub-class
"""
raise Exception ("write function needs to be implemented")
def archive(self):
"""
It is important to be able to archive data so as to insure that growth is controlled It is important to be able to archive data so as to insure that growth is controlled
Nothing in nature grows indefinitely neither should data being handled. Nothing in nature grows indefinitely neither should data being handled.
"""
raise Exception ("archive function needs to be implemented")
def close(self):
"""
This function will close the persistent storage connection/handler
"""
pass
class ReadWriter(Reader,Writer) :
""" """
def archive(self): This class implements the read/write functions aggregated
pass """
def flush(self): pass
pass
# class factory : # class factory :
# @staticmethod # @staticmethod
# def instance(**args): # def instance(**args):

View File

@ -15,13 +15,13 @@ else:
class Couch: class Couch:
""" """
This class is a wrapper for read/write against couchdb. The class captures common operations for read/write. This class is a wrapper for read/write against couchdb. The class captures common operations for read/write.
@param url host & port reference @param url host & port reference default http://localhost:5984
@param doc user id involved @param doc user id involved
@param dbname database name (target) @param dbname database name (target)
""" """
def __init__(self,**args): def __init__(self,**args):
url = args['url'] url = args['url'] if 'url' in args else 'http://localhost:5984'
self.uid = args['doc'] self._id = args['doc']
dbname = args['dbname'] dbname = args['dbname']
if 'username' not in args and 'password' not in args : if 'username' not in args and 'password' not in args :
self.server = cloudant.CouchDB(None,None,url=url) self.server = cloudant.CouchDB(None,None,url=url)
@ -34,9 +34,9 @@ class Couch:
# #
# @TODO Check if the database exists ... # @TODO Check if the database exists ...
# #
doc = cloudant.document.Document(self.dbase,self.uid) #self.dbase.get(self.uid) doc = cloudant.document.Document(self.dbase,self._id) #self.dbase.get(self._id)
if not doc.exists(): if not doc.exists():
doc = self.dbase.create_document({"_id":self.uid}) doc = self.dbase.create_document({"_id":self._id})
doc.save() doc.save()
else: else:
self.dbase = None self.dbase = None
@ -51,8 +51,8 @@ class Couch:
# At this point we are sure that the server is connected # At this point we are sure that the server is connected
# We are also sure that the database actually exists # We are also sure that the database actually exists
# #
doc = cloudant.document.Document(self.dbase,self.uid) doc = cloudant.document.Document(self.dbase,self._id)
# q = self.dbase.all_docs(key=self.uid)['rows'] # q = self.dbase.all_docs(key=self._id)['rows']
# if not q : # if not q :
if not doc.exists(): if not doc.exists():
return False return False
@ -107,7 +107,7 @@ class CouchReader(Couch,Reader):
# # We insure the document of the given user has the requested attachment. # # We insure the document of the given user has the requested attachment.
# # # #
# doc = self.dbase.get(self.uid) # doc = self.dbase.get(self._id)
# if '_attachments' in doc: # if '_attachments' in doc:
# r = self.filename in doc['_attachments'].keys() # r = self.filename in doc['_attachments'].keys()
@ -120,8 +120,8 @@ class CouchReader(Couch,Reader):
# #
# @TODO Need to get this working ... # @TODO Need to get this working ...
# #
document = cloudant.document.Document(self.dbase,self.uid) document = cloudant.document.Document(self.dbase,self._id)
# content = self.dbase.fetch_attachment(self.uid,self.filename).split('\n') ; # content = self.dbase.fetch_attachment(self._id,self.filename).split('\n') ;
content = self.get_attachment(self.filename) content = self.get_attachment(self.filename)
for row in content: for row in content:
yield row yield row
@ -132,9 +132,9 @@ class CouchReader(Couch,Reader):
else: else:
return self.basic_read() return self.basic_read()
def basic_read(self): def basic_read(self):
document = cloudant.document.Document(self.dbase,self.uid) document = cloudant.document.Document(self.dbase,self._id)
# document = self.dbase.get(self.uid) # document = self.dbase.get(self._id)
if document.exists() : if document.exists() :
document.fetch() document.fetch()
document = dict(document) document = dict(document)
@ -157,32 +157,62 @@ class CouchWriter(Couch,Writer):
""" """
Couch.__init__(self,**args) Couch.__init__(self,**args)
def set (self,info):
document = cloudand.document.Document(self.dbase,self._id)
if document.exists() :
keys = list(set(document.keys()) - set(['_id','_rev','_attachments']))
for id in keys :
document.field_set(document,id,None)
for id in args :
value = args[id]
document.field_set(document,id,value)
def write(self,**params): document.save()
pass
else:
_document = dict({"_id":self._id},**args)
document.create_document(_document)
def write(self,info):
""" """
write a given attribute to a document database write a given attribute to a document database
@param label scope of the row repair|broken|fixed|stats @info object to be written to the to an attribute. this
@param row row to be written
""" """
# document = self.dbase.get(self.uid) # document = self.dbase.get(self._id)
document = cloudant.document.Document(self.dbase,self.uid) #.get(self.uid) document = cloudant.document.Document(self.dbase,self._id) #.get(self._id)
if document.exists() is False : if document.exists() is False :
document = self.dbase.create_document({"_id":self.uid}) document = self.dbase.create_document({"_id":self._id})
label = params['label'] # label = params['label']
row = params['row'] # row = params['row']
if label not in document : # if label not in document :
document[label] = [] # document[label] = []
document[label].append(row) # document[label].append(row)
for key in info :
if key in document and type(document[key]) == list :
document[key] += info[key]
else:
document[key] = info[key]
document.save() document.save()
# self.dbase.bulk_docs([document]) # self.dbase.bulk_docs([document])
# self.dbase.save_doc(document) # self.dbase.save_doc(document)
def upload(self, **args):
    """
    Attach a file to the document identified by self._id.

    :param filename     name under which the attachment is stored (legacy key 'name' is accepted as a fallback)
    :param data         content of the file, binary or text (legacy key 'content' is accepted as a fallback)
    :param content_type mime type of the attachment (default text/plain)
    """
    mimetype = args['content_type'] if 'content_type' in args else 'text/plain'
    #
    # archive() hands the payload over as 'data'; 'content' and 'name' are kept as fallbacks
    # so callers written against either spelling keep working
    filename = args['filename'] if 'filename' in args else args['name']
    payload = args['data'] if 'data' in args else args['content']
    #
    # The document identifier is stored in self._id (set by Couch.__init__)
    document = cloudant.document.Document(self.dbase, self._id)
    #
    # put_attachment expects (attachment name, content type, data); the database handle is not an argument
    document.put_attachment(filename, mimetype, payload)
    document.save()
def archive(self,params=None): def archive(self,params=None):
""" """
This function will archive the document onto itself. This function will archive the document onto itself.
""" """
# document = self.dbase.all_docs(self.uid,include_docs=True) # document = self.dbase.all_docs(self._id,include_docs=True)
document = cloudant.document.Document(self.dbase,self.filename) document = cloudant.document.Document(self.dbase,self.filename)
document.fetch() document.fetch()
content = {} content = {}
@ -197,7 +227,8 @@ class CouchWriter(Couch,Writer):
now = str(datetime.today()) now = str(datetime.today())
name = '-'.join([document['_id'] , now,'.json']) name = '-'.join([document['_id'] , now,'.json'])
self.upload(filename=name,data=content,content_type='application/json')
# self.dbase.bulk_docs([document]) # self.dbase.bulk_docs([document])
# self.dbase.put_attachment(document,content,name,'application/json') # self.dbase.put_attachment(document,content,name,'application/json')
document.put_attachment(self.dbase,name,'application/json',content) # document.put_attachment(self.dbase,name,'application/json',content)
document.save() # document.save()

View File

@ -14,8 +14,8 @@ class DiskReader(Reader) :
""" """
Reader.__init__(self) Reader.__init__(self)
self.path = params['path'] ; self.path = params['path'] ;
self.delimiter = params['delimiter'] if 'delimiter' in params else None
def isready(self): def isready(self):
return os.path.exists(self.path) return os.path.exists(self.path)
def read(self,size=-1): def read(self,size=-1):
@ -31,55 +31,54 @@ class DiskReader(Reader) :
i += 1 i += 1
if size == i: if size == i:
break break
if self.delimiter :
yield row.split(self.char)
yield row yield row
f.close() f.close()
class DiskWriter(Writer): class DiskWriter(Writer):
""" """
This function writes output to disk in a designated location This function writes output to disk in a designated location. The function will write a text to a text file
- If a delimiter is provided it will use that to generate a xchar-delimited file
- If not then the object will be dumped as is
""" """
def __init__(self,**params): def __init__(self,**params):
Writer.__init__(self)
self.cache['meta'] = {'cols':0,'rows':0,'delimiter':None}
if 'path' in params: if 'path' in params:
self.path = params['path'] self.path = params['path']
else: else:
self.path = None self.path = 'data-transport.log'
if 'name' in params: self.delimiter = params['delimiter'] if 'delimiter' in params else None
self.name = params['name']; # if 'name' in params:
else: # self.name = params['name'];
self.name = 'out.log' # else:
# self.name = 'data-transport.log'
# if os.path.exists(self.path) == False: # if os.path.exists(self.path) == False:
# os.mkdir(self.path) # os.mkdir(self.path)
def meta(self):
return self.cache['meta']
def isready(self): def isready(self):
""" """
This function determines if the class is ready for execution or not This function determines if the class is ready for execution or not
i.e it determines if the preconditions of met prior execution i.e it determines if the preconditions of met prior execution
""" """
return True
p = self.path is not None and os.path.exists(self.path) # p = self.path is not None and os.path.exists(self.path)
q = self.name is not None # q = self.name is not None
return p and q # return p and q
def write(self,**params): def format (self,row):
self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys())
self.cache['meta']['rows'] += 1
return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n"
def write(self,info):
""" """
This function writes a record to a designated file This function writes a record to a designated file
@param label <passed|broken|fixed|stats> @param label <passed|broken|fixed|stats>
@param row row to be written @param row row to be written
""" """
# label = params['label']
row = params['row']
# xchar = None
# if 'xchar' is not None:
# xchar = params['xchar']
#path = ''.join([self.path,os.sep,label])
# path = ''.join([self.path,os.sep,self.name])
#if os.path.exists(path) == False:
# os.mkdir(path) ;
# path = ''.join([path,os.sep,self.name])
f = open(self.path,'a') f = open(self.path,'a')
if isinstance(row,object): f.write(self.format(info))
row = json.dumps(row)
#row = self.format(row,xchar);
f.write(row+"\n")
f.close() f.close()

View File

@ -4,7 +4,12 @@ Steve L. Nyemba, The Phi Technology LLC
This file is a wrapper around mongodb for reading/writing content against a mongodb server and executing views (mapreduce) This file is a wrapper around mongodb for reading/writing content against a mongodb server and executing views (mapreduce)
""" """
from pymongo import MongoClient from pymongo import MongoClient
from bson.objectid import ObjectId
from bson.binary import Binary
import json
from datetime import datetime
import gridfs
# from transport import Reader,Writer # from transport import Reader,Writer
import sys import sys
if sys.version_info[0] > 2 : if sys.version_info[0] > 2 :
@ -19,11 +24,11 @@ class Mongo :
def __init__(self,**args): def __init__(self,**args):
""" """
:dbname database name/identifier :dbname database name/identifier
:host host and port of the database :host host and port of the database by default localhost:27017
:username username for authentication :username username for authentication
:password password for current user :password password for current user
""" """
host = args['host'] host = args['host'] if 'host' in args else 'localhost:27017'
if 'user' in args and 'password' in args: if 'user' in args and 'password' in args:
self.client = MongoClient(host, self.client = MongoClient(host,
@ -31,7 +36,7 @@ class Mongo :
password=args['password'] , password=args['password'] ,
authMechanism='SCRAM-SHA-256') authMechanism='SCRAM-SHA-256')
else: else:
self.client = MongoClient() self.client = MongoClient(host)
self.uid = args['doc'] #-- document identifier self.uid = args['doc'] #-- document identifier
self.dbname = args['dbname'] self.dbname = args['dbname']
@ -62,17 +67,67 @@ class MongoWriter(Mongo,Writer):
""" """
def __init__(self,**args): def __init__(self,**args):
Mongo.__init__(self,**args) Mongo.__init__(self,**args)
def write(self,**args): def upload(self,**args) :
"""
This function will upload a file to the current database (using GridFS)
:param data binary stream/text to be stored
:param filename filename to be used
:param encoding content_encoding (default utf-8)
"""
if 'encoding' not in args :
args['encoding'] = 'utf-8'
gfs = GridFS(self.db)
gfs.put(**args)
def archive(self):
    """
    This function will archive the documents of the current collection (self.uid):
    the documents are serialized to JSON, uploaded as a file through self.upload
    and only then removed from the collection.

    The archive file is named <uid>.archive.<year>-<month>-<day>.json
    """
    collection = self.db[self.uid]
    rows = list(collection.find())
    for row in rows:
        # ObjectId is not JSON serializable, it is stored as its string representation
        if isinstance(row['_id'], ObjectId):
            row['_id'] = str(row['_id'])
    #
    # The fetched rows are what must be serialized, not the collection handle
    stream = Binary(json.dumps(rows).encode())
    #
    # year/month/day are attributes; a single timestamp keeps the three parts consistent
    today = datetime.now()
    now = "-".join([str(today.year), str(today.month), str(today.day)])
    name = ".".join([self.uid, 'archive', now]) + ".json"
    description = " ".join([self.uid, 'archive', str(len(rows))])
    #
    # Upload first: if storing the archive fails the collection is left untouched
    self.upload(filename=name, data=stream, description=description, content_type='application/json')
    collection.delete_many({})
def write(self,info):
"""
This function will write to a given collection i.e add a record to a collection (no updates)
@param info new record in the collection to be added
"""
# document = self.db[self.uid].find() # document = self.db[self.uid].find()
collection = self.db[self.uid] collection = self.db[self.uid]
if type(args['row']) == list : # if type(info) == list :
self.db[self.uid].insert_many(args['row']) # self.db[self.uid].insert_many(info)
# else:
if (type(info) == list) :
self.db[self.uid].insert_many(info)
else: else:
self.db[self.uid].insert_one(args['row']) self.db[self.uid].insert_one(info)
def set(self,document): def set(self,document):
"""
if no identifier is provided the function will delete the entire collection and set the new document.
Please use this function with great care (archive the content first before using it... for safety)
"""
collection = self.db[self.uid] collection = self.db[self.uid]
if collection.count_document() > 0 : if collection.count_document() > 0 and '_id' in document:
collection.delete({_id:self.uid}) id = document['_id']
del document['_id']
collecton.update_one({"_id":self.uid},document,True) collection.find_one_and_replace({'_id':id},document)
else:
collection.delete_many({})
self.write(info)
# collecton.update_one({"_id":self.uid},document,True)