bug fix: sqlite lock
This commit is contained in:
parent
6f08d64e49
commit
03e0203c28
|
@ -5,10 +5,10 @@ if sys.version_info[0] > 2 :
|
||||||
else:
|
else:
|
||||||
from common import Reader,Writer
|
from common import Reader,Writer
|
||||||
import json
|
import json
|
||||||
from threading import Lock
|
# from threading import Lock
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
from multiprocessing import Lock
|
||||||
class DiskReader(Reader) :
|
class DiskReader(Reader) :
|
||||||
"""
|
"""
|
||||||
This class is designed to read data from disk (location on hard drive)
|
This class is designed to read data from disk (location on hard drive)
|
||||||
|
@ -126,6 +126,8 @@ class SQLiteReader (DiskReader):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class SQLiteWriter(DiskWriter) :
|
class SQLiteWriter(DiskWriter) :
|
||||||
|
connection = None
|
||||||
|
LOCK = Lock()
|
||||||
def __init__(self,**args):
|
def __init__(self,**args):
|
||||||
"""
|
"""
|
||||||
:path
|
:path
|
||||||
|
@ -140,7 +142,7 @@ class SQLiteWriter(DiskWriter) :
|
||||||
|
|
||||||
if self.fields and not self.isready():
|
if self.fields and not self.isready():
|
||||||
self.init(self.fields)
|
self.init(self.fields)
|
||||||
|
SQLiteWriter.connection = self.conn
|
||||||
def init(self,fields):
|
def init(self,fields):
|
||||||
self.fields = fields;
|
self.fields = fields;
|
||||||
sql = " ".join(["CREATE TABLE IF NOT EXISTS ",self.table," (", ",".join(self.fields),")"])
|
sql = " ".join(["CREATE TABLE IF NOT EXISTS ",self.table," (", ",".join(self.fields),")"])
|
||||||
|
@ -175,14 +177,18 @@ class SQLiteWriter(DiskWriter) :
|
||||||
|
|
||||||
if type(info) != list :
|
if type(info) != list :
|
||||||
info = [info]
|
info = [info]
|
||||||
cursor = self.conn.cursor()
|
|
||||||
|
|
||||||
|
SQLiteWriter.LOCK.acquire()
|
||||||
sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(':values')"])
|
try:
|
||||||
for row in info :
|
cursor = self.conn.cursor()
|
||||||
stream = json.dumps(row)
|
sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(':values')"])
|
||||||
stream = stream.replace("'","''")
|
for row in info :
|
||||||
cursor.execute(sql.replace(":values",stream) )
|
stream = json.dumps(row)
|
||||||
|
stream = stream.replace("'","''")
|
||||||
|
cursor.execute(sql.replace(":values",stream) )
|
||||||
|
|
||||||
# self.conn.commit()
|
# self.conn.commit()
|
||||||
# print (sql)
|
# print (sql)
|
||||||
|
except Exception as e :
|
||||||
|
pass
|
||||||
|
SQLiteWriter.LOCK.release()
|
Loading…
Reference in New Issue