support for filters in read

This commit is contained in:
Steve Nyemba 2020-05-17 21:57:18 -05:00
parent 834dc80e79
commit d5ba648abf
6 changed files with 68 additions and 20 deletions

View File

@ -50,10 +50,11 @@ import sys
if sys.version_info[0] > 2 :
from transport.common import Reader, Writer #, factory
from transport import disk
from transport import queue as queue
from transport import s3 as s3
from transport import rabbitmq as queue
from transport import couch as couch
from transport import mongo as mongo
from transport import s3 as s3
else:
from common import Reader, Writer #, factory
import disk

View File

@ -47,7 +47,7 @@ class Reader (IO):
@return object of meta data information associated with the content of the store
"""
raise Exception ("meta function needs to be implemented")
def read(**args):
def read(self,**args):
    """
    Read content from the underlying store.

    Abstract hook: each subclass decides how the keyword arguments
    (e.g. size, filter) are interpreted. This base implementation does
    nothing and returns None; subclasses are expected to override it.
    """

View File

@ -1,5 +1,9 @@
import os
from .__init__ import Reader,Writer
import sys
if sys.version_info[0] > 2 :
from transport.common import Reader, Writer #, factory
else:
from common import Reader,Writer
import json
class DiskReader(Reader) :
@ -18,12 +22,12 @@ class DiskReader(Reader) :
self.delimiter = params['delimiter'] if 'delimiter' in params else None
def isready(self):
return os.path.exists(self.path)
def read(self,size=-1):
def read(self,**args):
"""
This function reads the rows from a designated location on disk
@param size number of rows to be read, -1 suggests all rows
"""
size = -1 if 'size' not in args else int(args['size'])
f = open(self.path,'rU')
i = 1
for row in f:

View File

@ -39,7 +39,7 @@ class Mongo :
self.client = MongoClient(host)
self.uid = args['doc'] #-- document identifier
self.dbname = args['dbname']
self.dbname = args['dbname'] if 'db' in args else args['db']
self.db = self.client[self.dbname]
def isready(self):
@ -53,9 +53,10 @@ class MongoReader(Mongo,Reader):
"""
def __init__(self,**args):
Mongo.__init__(self,**args)
def read(self,size=-1):
def read(self,**args):
    """
    Read documents from the collection bound to this reader (self.uid).

    :filter optional mongo query document; defaults to {} i.e. all documents
    :return pymongo cursor over the matching documents
    """
    collection = self.db[self.uid]
    # The stale unconditional `return collection.find({})` that preceded the
    # filter handling (merge residue) was removed: it made the filter
    # support below unreachable and always returned every document.
    _filter = args['filter'] if 'filter' in args else {}
    return collection.find(_filter)
def view(self,**args):
"""
This function is designed to execute a view (map/reduce) operation

View File

@ -183,7 +183,7 @@ class QueueReader(MessageQueue,Reader):
if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
self.close()
def read(self,size=-1):
def read(self,**args):
"""
This function will read, the first message from a queue
@TODO:
@ -191,7 +191,7 @@ class QueueReader(MessageQueue,Reader):
Have the number of messages retrieved be specified by size (parameter)
"""
r = {}
self.size = size
self.size = -1 if 'size' in args else int(args['size'])
#
# We enabled the reader to be able to read from several queues (sequentially for now)
# The qid parameter will be an array of queues the reader will be reading from

View File

@ -6,6 +6,8 @@ This file is a wrapper around s3 bucket provided by AWS for reading and writing
"""
from datetime import datetime
import boto
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
import numpy as np
import botocore
from smart_open import smart_open
import sys
@ -14,13 +16,14 @@ if sys.version_info[0] > 2 :
else:
from common import Reader, Writer
import json
from io import StringIO
import json
class s3 :
"""
@TODO: Implement a search function for a file given a bucket??
"""
def __init__(self,args) :
def __init__(self,**args) :
"""
This function will extract a file or set of files from s3 bucket provided
@param access_key
@ -29,18 +32,39 @@ class s3 :
@param filter filename or filtering elements
"""
try:
self.s3 = boto.connect_s3(args['access_key'],args['secret_key'])
self.s3 = S3Connection(args['access_key'],args['secret_key'],calling_format=OrdinaryCallingFormat())
self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None
# self.path = args['path']
self.filter = args['filter'] if 'filter' in args else None
self.filename = args['file'] if 'file' in args else None
self.bucket_name = args['bucket'] if 'bucket' in args else None
except Exception as e :
self.s3 = None
self.bucket = None
print (e)
def meta(self,**args):
    """
    Return metadata (name and size) for every key in a bucket.

    :name name of the bucket (forwarded to self.list)
    :return list of {"name":..., "size":...} dicts, one per key
    """
    # self.list returns the boto Key objects contained in the bucket
    info = self.list(**args)
    # NOTE(review): this issues an open() (a GET) per key and never closes
    # the handles — presumably to force item.size to be populated; confirm
    # whether list() already fills size, and close the keys if kept.
    [item.open() for item in info]
    return [{"name":item.name,"size":item.size} for item in info]
def list(self,**args):
    """
    Enumerate the content (keys) of a bucket identified by name.

    :name name of the bucket to enumerate
    :return list of boto Key objects found in the bucket
    """
    target = self.s3.get_bucket(args['name'])
    contents = target.list()
    return list(contents)
def buckets(self):
    """
    Return the names of all buckets visible to the current credentials.

    Use cautiously: depending on how the s3 infrastructure is used this
    may be a large listing.
    """
    # Removed dead diff residue that followed this method: a dangling
    # `pass` statement and a commented-out duplicate `def buckets` stub.
    return [item.name for item in self.s3.get_all_buckets()]
@ -56,8 +80,8 @@ class s3Reader(s3,Reader) :
- stream content if file is Not None
@TODO: support read from all buckets, think about it
"""
def __init__(self,args) :
s3.__init__(self,args)
def __init__(self,**args) :
s3.__init__(self,**args)
def files(self):
r = []
try:
@ -80,14 +104,32 @@ class s3Reader(s3,Reader) :
break
yield line
count += 1
def read(self,limit=-1) :
def read(self,**args) :
    """
    Read from the bucket: the list of available files when no file was
    specified at construction time, otherwise the content of self.filename.

    :size optional maximum number of lines to stream; -1 (default) means all
    :return list of file names, or a generator over the file's lines
    """
    if self.filename is None :
        #
        # returning the list of files because no one file was specified.
        return self.files()
    # Honor the caller's requested size. A stale `return self.stream(10)`
    # left by an earlier revision made these lines dead code and silently
    # capped every read at 10 lines.
    limit = args['size'] if 'size' in args else -1
    return self.stream(limit)
class s3Writer(s3,Writer) :
    """
    Writer for an s3 bucket: folder (prefix) creation and object upload.
    """
    def __init__(self,**args) :
        # Forward keyword arguments to s3.__init__, which expects **args.
        # The previous revision had two identical positional __init__ defs
        # (diff residue) whose s3.__init__(self,args) call would raise
        # TypeError against the updated base-class signature.
        s3.__init__(self,**args)
    def mkdir(self,name):
        """
        Create a "folder" in the current bucket.

        S3 has no real directories; an empty key with a trailing slash is
        the conventional emulation.
        :name name of the folder
        """
        # NOTE(review): the previous self.s3.put_object(Bucket=...,key=...)
        # is a boto3 client API and does not exist on boto2's S3Connection;
        # boto2 creates keys through the bucket object.
        self.bucket.new_key(name.strip('/') + '/').set_contents_from_string('')
    def write(self,content):
        """
        Upload content to self.filename in the current bucket.

        :content str or bytes payload to store
        """
        # NOTE(review): upload_fileobj is likewise a boto3 API; boto2
        # uploads go through a Key on the bucket. This also avoids the
        # old code's shadowing of the `file` builtin.
        self.bucket.new_key(self.filename).set_contents_from_string(content)
if __name__ == '__main__' :
    import os
    # Credentials must come from the environment, never from source control.
    # The previous revision embedded a live AWS access/secret key pair here;
    # that key must be rotated immediately in addition to this removal.
    p = {'access_key':os.environ['AWS_ACCESS_KEY_ID'],'secret_key':os.environ['AWS_SECRET_ACCESS_KEY']}
    reader = s3Reader(**p)
    buckets = reader.buckets()
    print(reader.list(name = buckets[0]))