From 03e0203c288ad54121420b7f4768ed9ea08232d6 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Fri, 11 Dec 2020 03:22:49 -0600 Subject: [PATCH] bug fix: sqlite lock --- transport/disk.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/transport/disk.py b/transport/disk.py index 1b9c648..64cabd4 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -5,10 +5,10 @@ if sys.version_info[0] > 2 : else: from common import Reader,Writer import json -from threading import Lock +# from threading import Lock import sqlite3 import pandas as pd - +from multiprocessing import Lock class DiskReader(Reader) : """ This class is designed to read data from disk (location on hard drive) @@ -126,6 +126,8 @@ class SQLiteReader (DiskReader): pass class SQLiteWriter(DiskWriter) : + connection = None + LOCK = Lock() def __init__(self,**args): """ :path @@ -140,7 +142,7 @@ class SQLiteWriter(DiskWriter) : if self.fields and not self.isready(): self.init(self.fields) - + SQLiteWriter.connection = self.conn def init(self,fields): self.fields = fields; sql = " ".join(["CREATE TABLE IF NOT EXISTS ",self.table," (", ",".join(self.fields),")"]) @@ -175,14 +177,18 @@ class SQLiteWriter(DiskWriter) : if type(info) != list : info = [info] - cursor = self.conn.cursor() - - sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(':values')"]) - for row in info : - stream = json.dumps(row) - stream = stream.replace("'","''") - cursor.execute(sql.replace(":values",stream) ) + SQLiteWriter.LOCK.acquire() + try: + cursor = self.conn.cursor() + sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(':values')"]) + for row in info : + stream = json.dumps(row) + stream = stream.replace("'","''") + cursor.execute(sql.replace(":values",stream) ) + # self.conn.commit() # print (sql) - + except Exception as e : + pass + SQLiteWriter.LOCK.release() \ No newline at end of file