From 4320159f3d930f74ff84c537d7c62992f6d2ebb1 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Sat, 30 Sep 2023 01:17:35 -0500 Subject: [PATCH] bug fixes --- transport/disk.py | 29 +++++++---------------------- transport/etl.py | 2 +- 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/transport/disk.py b/transport/disk.py index a3880ec..1d966c7 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -65,7 +65,7 @@ class DiskWriter(Writer): super().__init__() self._path = params['path'] self._delimiter = params['delimiter'] - + self._mode = 'w' if 'mode' not in params else params['mode'] # def meta(self): # return self.cache['meta'] # def isready(self): @@ -89,28 +89,13 @@ class DiskWriter(Writer): """ try: - _mode = 'a' if 'overwrite' not in _args else 'w' - DiskWriter.THREAD_LOCK.acquire() - # # _path = _args['path'] if 'path' in _args else self.path - # # _delim= _args['delimiter'] if 'delimiter' in _args else self._delimiter - # # info.to_csv(_path,sep=_delim) - # info.to_csv(self.path) - # f = open(self.path,_mode) - # if self.delimiter : - # if type(info) == list : - # for row in info : - # f.write(self.format(row)) - # else: - # f.write(self.format(info)) - # else: - # if not type(info) == str : - # f.write(json.dumps(info)+"\n") - # else: - # f.write(info) - # f.close() + + DiskWriter.THREAD_LOCK.acquire() + _delim = self._delimiter if 'delimiter' not in _args else _args['delimiter'] - _path = self.path if 'path' not in _args else _args['path'] - info.to_csv(_path,index=False,sep=_delim) + _path = self._path if 'path' not in _args else _args['path'] + _mode = self._mode if 'mode' not in _args else _args['mode'] + info.to_csv(_path,index=False,sep=_delim, mode=_mode) pass except Exception as e: # diff --git a/transport/etl.py b/transport/etl.py index aa4a73e..b2e0e6a 100644 --- a/transport/etl.py +++ b/transport/etl.py @@ -116,7 +116,7 @@ class Transporter(Process): for _indexes in _segments : _fwd_args = {} if not _args else _args - self._delegate_write(_df.iloc[_indexes]) + self._delegate_write(_df.iloc[_indexes],**_fwd_args) # # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?) pass