data-transport/transport/session.py

89 lines
2.5 KiB
Python
Raw Permalink Normal View History

from flask import request, session
from datetime import datetime
import re
2023-09-30 01:27:53 +00:00
from transport.common import Reader, Writer
import json
2023-09-30 01:27:53 +00:00
import requests
from io import StringIO
import pandas as pd
2023-09-30 01:27:53 +00:00
class HttpReader(Reader):
    """
    This class is designed to read data from a url using an HTTP GET request.
    The response body is held in memory and processed accordingly
    NOTE: This is inefficient and can crash a micro-instance (be careful)
    """

    def __init__(self, **_args):
        """
        @param url      required, address the GET request is sent to
        @param headers  optional, HTTP headers sent along with the request
        """
        self._url = _args['url']
        self._headers = _args.get('headers')

    def format(self, _response):
        """
        Convert an HTTP response into a data-frame when its content is CSV,
        otherwise return the body as text.

        @param _response    response object exposing `headers` and `text`
        @return pandas.DataFrame for 'text/csv' content, the raw text otherwise
        """
        _mimetype = _response.headers.get('Content-Type') or ''
        #
        # The header can carry parameters (e.g. "text/csv; charset=utf-8"),
        # only the media type found before the first ';' is compared
        #
        _mimetype = _mimetype.split(';')[0].strip().lower()
        if _mimetype == 'text/csv':
            return pd.read_csv(StringIO(_response.text))
        #
        # @TODO: Add support for excel, JSON and other file formats that fit into a data-frame
        #
        return _response.text

    def read(self, **_args):
        """
        Issue a GET request against the url and return the formatted content
        (see format). Keyword arguments are accepted but not used.
        """
        # headers = None is accepted by requests, no need to branch on it
        _response = requests.get(self._url, headers=self._headers)
        return self.format(_response)
2023-09-30 01:27:53 +00:00
class HttpWriter(Writer):
    """
    This class is designed to submit data to an endpoint (url)
    """

    def __init__(self, **_args):
        """
        @param url      required, endpoint the data is submitted to
        @param name     required, key under which the payload is wrapped in the request body
        @param method   optional, HTTP method used by default (defaults to 'post')
        """
        self._url = _args['url']
        self._name = _args['name']
        self._method = _args.get('method', 'post')

    def format_sql(self, row):
        """
        Build an SQL INSERT statement for a row of string values.
        Single and double quotes are removed from every value.

        NOTE(review): self.tablename is not assigned by this class's __init__,
        it must be set elsewhere before this method is called - confirm with callers

        @param row  iterable of strings
        @return the INSERT statement terminated by a new line
        """
        values = "','".join([col.replace('"', '').replace("'", '') for col in row])
        #
        # The table name is inserted directly in the statement, so a value
        # that happens to contain ':table' is left untouched
        #
        return "INSERT INTO {0} VALUES('{1}');\n".format(self.tablename, values)

    def isready(self):
        """
        This writer has no precondition to check, it always reports ready
        """
        return True

    def write(self, _data, **_args):
        """
        Submit data to the endpoint as a JSON body of the form {name: payload}

        @param _data    dictionary (sent as is) or an object exposing
                        to_dict(orient='records') e.g. a pandas data-frame
        @param method   optional, overrides the HTTP method for this call
        """
        _method = _args.get('method', self._method).lower()
        if isinstance(_data, dict):
            _content = _data
        else:
            _content = _data.to_dict(orient='records')
        #
        # In both cases the payload is a JSON structure, the header must say so
        #
        _headers = {'Content-Type': 'application/json'}
        #
        # The first argument of a request is the url, the payload goes in the body
        #
        requests.request(_method.upper(), self._url, json={self._name: _content}, headers=_headers)