diff --git a/info/__init__.py b/info/__init__.py index 6379b6c..04183a9 100644 --- a/info/__init__.py +++ b/info/__init__.py @@ -1,6 +1,6 @@ __app_name__ = 'data-transport' __author__ = 'The Phi Technology' -__version__= '2.2.0' +__version__= '2.2.1' __email__ = "info@the-phi.com" __license__=f""" Copyright 2010 - 2024, Steve L. Nyemba diff --git a/setup.py b/setup.py index 9b46d71..7bb44e8 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ args = { "packages": find_packages(include=['info','transport', 'transport.*'])} args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite'] -args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql'] +args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql'] args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args['scripts'] = ['bin/transport'] # if sys.version_info[0] == 2 : diff --git a/transport/__init__.py b/transport/__init__.py index 16a2467..b934760 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -134,7 +134,7 @@ class get : """ @staticmethod def reader (**_args): - if not _args or 'provider' not in _args: + if not _args or ('provider' not in _args and 'label' not in _args): _args['label'] = 'default' _args['context'] = 'read' return instance(**_args) @@ -143,7 +143,7 @@ class get : """ This function is a wrapper that will return a writer to a database. It disambiguates the interface """ - if not _args : + if not _args or ('provider' not in _args and 'label' not in _args): _args['label'] = 'default' _args['context'] = 'write' return instance(**_args) diff --git a/transport/cloud/s3.py b/transport/cloud/s3.py index 4e230e8..095cfd3 100644 --- a/transport/cloud/s3.py +++ b/transport/cloud/s3.py @@ -5,8 +5,8 @@ Steve L. Nyemba, The Phi Technology LLC This file is a wrapper around s3 bucket provided by AWS for reading and writing content """ from datetime import datetime -import boto -from boto.s3.connection import S3Connection, OrdinaryCallingFormat +import boto3 +# from boto.s3.connection import S3Connection, OrdinaryCallingFormat import numpy as np import botocore from smart_open import smart_open @@ -14,6 +14,7 @@ import sys import json from io import StringIO +import pandas as pd import json class s3 : @@ -29,46 +30,37 @@ class s3 : @param filter filename or filtering elements """ try: - self.s3 = S3Connection(args['access_key'],args['secret_key'],calling_format=OrdinaryCallingFormat()) - self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None - # self.path = args['path'] - self.filter = args['filter'] if 'filter' in args else None - self.filename = args['file'] if 'file' in args else None - self.bucket_name = args['bucket'] if 'bucket' in args else None - + self._client = boto3.client('s3',aws_access_key_id=args['access_key'],aws_secret_access_key=args['secret_key'],region_name=args['region']) + self._bucket_name = args['bucket'] + self._file_name = args['file'] + self._region = args['region'] except Exception as e : - self.s3 = None - self.bucket = None print (e) + pass + def has(self,**_args): + _found = None + try: + if 'file' in _args and 'bucket' in _args: + _found = self.meta(**_args) + elif 'bucket' in _args and not 'file' in _args: + _found = self._client.list_objects(Bucket=_args['bucket']) + elif 'file' in _args and not 'bucket' in _args : + _found = self.meta(bucket=self._bucket_name,file = _args['file']) + except Exception as e: + _found = None + pass + return type(_found) == dict def meta(self,**args): """ + This function will return information either about the file in a given bucket :name name of the bucket """ - info = self.list(**args) - [item.open() for item in info] - return [{"name":item.name,"size":item.size} for item in info] - def list(self,**args): - """ - This function will list the content of a bucket, the bucket must be provided by the name - :name name of the bucket - """ - return list(self.s3.get_bucket(args['name']).list()) - - - def buckets(self): - # - # This function will return all buckets, not sure why but it should be used cautiously - # based on why the s3 infrastructure is used - # - return [item.name for item in self.s3.get_all_buckets()] - - # def buckets(self): - pass - # """ - # This function is a wrapper around the bucket list of buckets for s3 - # """ - # return self.s3.get_all_buckets() - + _bucket = self._bucket_name if 'bucket' not in args else args['bucket'] + _file = self._file_name if 'file' not in args else args['file'] + _data = self._client.get_object(Bucket=_bucket,Key=_file) + return _data['ResponseMetadata'] + def close(self): + self._client.close() class Reader(s3) : """ @@ -77,51 +69,63 @@ class Reader(s3) : - stream content if file is Not None @TODO: support read from all buckets, think about it """ - def __init__(self,**args) : - s3.__init__(self,**args) - def files(self): - r = [] - try: - return [item.name for item in self.bucket if item.size > 0] - except Exception as e: - pass - return r - def stream(self,limit=-1): + def __init__(self,**_args) : + super().__init__(**_args) + + def _stream(self,**_args): """ At this point we should stream a file from a given bucket """ - key = self.bucket.get_key(self.filename.strip()) - if key is None : - yield None + _object = self._client.get_object(Bucket=_args['bucket'],Key=_args['file']) + _stream = None + try: + _stream = _object['Body'].read() + except Exception as e: + pass + if not _stream : + return None + if _object['ContentType'] in ['text/csv'] : + return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'",""))) else: - count = 0 - with smart_open(key) as remote_file: - for line in remote_file: - if count == limit and limit > 0 : - break - yield line - count += 1 + return _stream + def read(self,**args) : - if self.filename is None : - # - # returning the list of files because no one file was specified. - return self.files() - else: - limit = args['size'] if 'size' in args else -1 - return self.stream(limit) + + _name = self._file_name if 'file' not in args else args['file'] + _bucket = args['bucket'] if 'bucket' in args else self._bucket_name + return self._stream(bucket=_bucket,file=_name) + class Writer(s3) : - - def __init__(self,**args) : - s3.__init__(self,**args) - def mkdir(self,name): + """ + + """ + def __init__(self,**_args) : + super().__init__(**_args) + # + # + if not self.has(bucket=self._bucket_name) : + self.make_bucket(self._bucket_name) + def make_bucket(self,bucket_name): """ - This function will create a folder in a bucket + This function will create a folder in a bucket,It is best that the bucket is organized as a namespace :name name of the folder """ - self.s3.put_object(Bucket=self.bucket_name,key=(name+'/')) - def write(self,content): - file = StringIO(content.decode("utf8")) - self.s3.upload_fileobj(file,self.bucket_name,self.filename) + + self._client.create_bucket(Bucket=bucket_name,CreateBucketConfiguration={'LocationConstraint': self._region}) + def write(self,_data,**_args): + """ + This function will write the data to the s3 bucket, files can be either csv, or json formatted files + """ + if type(_data) == pd.DataFrame : + _stream = _data.to_csv(index=False) + elif type(_data) == dict : + _stream = json.dumps(_data) + else: + _stream = _data + file = StringIO(_stream) + bucket = self._bucket_name if 'bucket' not in _args else _args['bucket'] + file_name = self._file_name if 'file' not in _args else _args['file'] + self._client.put_object(Bucket=bucket, Key = file_name, Body=_stream) pass