data-transport/transport/__init__.py

230 lines
5.5 KiB
Python

"""
Data Transport - 1.0
Steve L. Nyemba, The Phi Technology LLC
This module is designed to serve as a wrapper to a set of supported data stores :
- couchdb
- mongodb
- Files (character delimited)
- Queues (RabbitMQ)
- Session (Flask)
- s3
The supported operations are read/write and providing meta data to the calling code
Requirements :
pymongo
boto
cloudant
The configuration for the data-store is as follows :
couchdb:
{
args:{
url:<url>,
username:<username>,
password:<password>,
dbname:<database>,
doc:<document id>
}
}
RabbitMQ:
{
}
Mongodb:
{
args:{
host:<url>, #localhost:27017
username:<username>,
password:<password>,
dbname:<database>,
doc:<document id>
}
}
"""
__author__ = 'The Phi Technology'
import pandas as pd
import numpy as np
import json
import importlib
import sys
if sys.version_info[0] > 2 :
from transport.common import Reader, Writer #, factory
from transport import disk
from transport import s3 as s3
from transport import rabbitmq as queue
from transport import couch as couch
from transport import mongo as mongo
from transport import sql as sql
else:
from common import Reader, Writer #, factory
import disk
import queue
import couch
import mongo
import s3
import sql
class factory :
    """
    Creates transport objects (readers/writers) from a type name and the
    keyword arguments of its constructor.
    """
    @staticmethod
    def instance(**args):
        """
        Create an instance of a transport class.

        :type   name of the class to create; it is evaluated as an expression
                in the namespace of this module, so it must designate a callable
                reachable from here
        :args   dictionary of keyword arguments handed to the constructor
        :return the object that was created, or None if the creation failed
                (the error is printed, not raised)

        A KeyError is raised if either 'type' or 'args' is missing.
        """
        source = args['type']
        params = args['args']
        anObject = None
        try:
            #
            # Only the type name is evaluated; the arguments are passed as they are.
            # Previously the arguments were serialized to JSON and pasted into the
            # evaluated source, which broke on true/false/null (not Python names)
            # and on any argument that could not be serialized.
            #
            # SECURITY: eval is kept so that every type expression accepted so far
            # still resolves, but it executes whatever 'type' contains.
            # 'type' MUST come from a trusted configuration, never from user input.
            # @TODO: replace with an explicit registry of supported classes
            #
            aClass = eval(source)
            anObject = aClass(**params)
            #setattr(anObject,'name',source)
        except Exception as e:
            print(['Error ',e])
        return anObject
import time
# class Reader:
# def __init__(self):
# self.nrows = 0
# self.xchar = None
# def row_count(self):
# content = self.read()
# return np.sum([1 for row in content])
# def delimiter(self,sample):
# """
# This function determines the most common delimiter from a subset of possible delimiters.
# It uses a statistical approach (distribution) to gauge the distribution of columns for a given delimiter
# :sample sample string/content expecting matrix i.e list of rows
# """
# m = {',':[],'\t':[],'|':[],'\x3A':[]}
# delim = m.keys()
# for row in sample:
# for xchar in delim:
# if row.split(xchar) > 1:
# m[xchar].append(len(row.split(xchar)))
# else:
# m[xchar].append(0)
# #
# # The delimiter with the smallest variance, provided the mean is greater than 1
# # This would be troublesome if there many broken records sampled
# #
# m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1}
# index = m.values().index( min(m.values()))
# xchar = m.keys()[index]
# return xchar
# def col_count(self,sample):
# """
# This function returns the number of columns of a given sample
# @pre self.xchar is not None
# """
# m = {}
# i = 0
# for row in sample:
# row = self.format(row)
# id = str(len(row))
# #id = str(len(row.split(self.xchar)))
# if id not in m:
# m[id] = 0
# m[id] = m[id] + 1
# index = m.values().index( max(m.values()) )
# ncols = int(m.keys()[index])
# return ncols;
# def format (self,row):
# """
# This function will clean records of a given row by removing non-ascii characters
# @pre self.xchar is not None
# """
# if isinstance(row,list) == False:
# #
# # We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary)
# cols = self.split(row)
# #cols = row.split(self.xchar)
# else:
# cols = row ;
# return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols]
# def split (self,row):
# """
# This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes.
# @pre : self.xchar is not None
# """
# pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"])
# return re.findall(pattern,row.replace('\n',''))
# class Writer:
# def format(self,row,xchar):
# if xchar is not None and isinstance(row,list):
# return xchar.join(row)+'\n'
# elif xchar is None and isinstance(row,dict):
# row = json.dumps(row)
# return row
# """
# It is important to be able to archive data so as to insure that growth is controlled
# Nothing in nature grows indefinitely neither should data being handled.
# """
# def archive(self):
# pass
# def flush(self):
# pass
# class factory :
# @staticmethod
# def instance(**args):
# source = args['type']
# params = args['args']
# anObject = None
# if source in ['HttpRequestReader','HttpSessionWriter']:
# #
# # @TODO: Make sure objects are serializable, be smart about them !!
# #
# aClassName = ''.join([source,'(**params)'])
# else:
# stream = json.dumps(params)
# aClassName = ''.join([source,'(**',stream,')'])
# try:
# anObject = eval( aClassName)
# #setattr(anObject,'name',source)
# except Exception,e:
# print ['Error ',e]
# return anObject