You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

187 lines
6.1 KiB

#!/usr/bin/env python3
import sqlite3
import common
import json
import time
import threading
from datetime import datetime
from kafka import KafkaConsumer
def committer(connection, interval=10):
    """Commit pending work on *connection* forever, pausing between commits.

    The loop never returns; the script runs it in a daemon thread.

    Args:
        connection: Object exposing ``commit()`` (a ``sqlite3.Connection``
            in this script).
        interval: Seconds to sleep after each commit attempt. Defaults to 10.

    A ``sqlite3.Error`` raised by ``commit()`` is printed and the loop keeps
    going, so a single failed commit does not end periodic committing.
    Any other exception still propagates.
    """
    while True:
        try:
            connection.commit()
        except sqlite3.Error as err:
            # Keep the loop alive; the next iteration retries the commit.
            print(err)
        time.sleep(interval)
def insert_update_store(connection, stores, value_json):
    """Insert the store in ``value_json['store']``, or update it if it exists.

    Every key of ``value_json['store']`` is used as a column of the ``store``
    table and its value is bound as a parameter. When the INSERT raises
    ``sqlite3.IntegrityError`` the row whose ``store`` column matches is
    updated instead (all columns except ``store`` itself).

    Args:
        connection: Open ``sqlite3.Connection``; nothing is committed here.
        stores: Cache mapping store name -> row id; updated in place.
        value_json: Decoded message; must contain ``value_json['store']['store']``.

    Raises:
        KeyError: If ``value_json['store']['store']`` is missing.

    Database errors are printed, not raised.
    """
    def quote_identifier(name):
        # Column names come from the message payload, so they are quoted as
        # SQL identifiers rather than pasted into the statement verbatim.
        return '"' + str(name).replace('"', '""') + '"'

    store_fields = value_json['store']
    store_name = store_fields['store']
    cursor = connection.cursor()

    insert_columns = ', '.join(quote_identifier(key) for key in store_fields)
    placeholders = ', '.join(['?'] * len(store_fields))
    insert_values = list(store_fields.values())

    update_keys = [key for key in store_fields if key != 'store']
    assignments = ', '.join(f'{quote_identifier(key)}=?' for key in update_keys)
    update_values = [store_fields[key] for key in update_keys] + [store_name]

    try:
        cursor.execute(
            f'INSERT INTO store ({insert_columns}) VALUES ({placeholders})',
            insert_values,
        )
        stores[store_name] = cursor.lastrowid
    except sqlite3.IntegrityError:
        try:
            # With no updatable columns the UPDATE would be malformed; skip it.
            if update_keys:
                cursor.execute(
                    f'UPDATE store SET {assignments} WHERE store=?',
                    update_values,
                )
            # Fill the cache when the row exists but was not cached yet.
            if store_name not in stores:
                cursor.execute('SELECT id FROM store WHERE store=?', (store_name,))
                row = cursor.fetchone()
                if row is not None:
                    stores[store_name] = row[0]
        except Exception as err:
            print(err)
    except Exception as err:
        print(err)
def insert_update_product(connection, stores, value_json):
    """Insert or update one product row, then record its price.

    Every key of ``value_json['product']`` is used as a column of the
    ``products`` table, alongside ``store`` (the cached id of the store named
    by ``value_json['store']``). When the INSERT raises
    ``sqlite3.IntegrityError`` the row matching ``store`` and ``sku_code`` is
    updated and its id is looked up. On success the price is handed to
    ``insert_update_price``.

    Args:
        connection: Open ``sqlite3.Connection``; nothing is committed here.
        stores: Cache mapping store name -> row id.
        value_json: Decoded message with ``store``, ``product`` and ``price``.

    Raises:
        KeyError: If the store is not in ``stores`` or a required key
            (``product``, ``sku_code``) is missing.

    Database errors are printed, not raised. If no product id could be
    determined, no price is written.
    """
    def quote_identifier(name):
        # Column names come from the message payload, so they are quoted as
        # SQL identifiers rather than pasted into the statement verbatim.
        return '"' + str(name).replace('"', '""') + '"'

    store_id = stores[value_json["store"]]
    product_fields = value_json['product']
    sku_code = product_fields["sku_code"]
    cursor = connection.cursor()

    insert_columns = ', '.join(
        ['store'] + [quote_identifier(key) for key in product_fields]
    )
    placeholders = ', '.join(['?'] * (len(product_fields) + 1))
    insert_values = [store_id, *product_fields.values()]

    update_keys = [key for key in product_fields if key != 'sku_code']
    assignments = ', '.join(f'{quote_identifier(key)}=?' for key in update_keys)
    update_values = [product_fields[key] for key in update_keys] + [store_id, sku_code]

    product_id = None
    try:
        cursor.execute(
            f'INSERT INTO products ({insert_columns}) VALUES ({placeholders})',
            insert_values,
        )
        product_id = cursor.lastrowid
    except sqlite3.IntegrityError:
        try:
            # With no updatable columns the UPDATE would be malformed; skip it.
            if update_keys:
                cursor.execute(
                    f'UPDATE products SET {assignments} WHERE store=? and sku_code=?',
                    update_values,
                )
            cursor.execute(
                'SELECT id FROM products WHERE store=? and sku_code=?',
                (store_id, sku_code),
            )
            product_id = cursor.fetchone()[0]
        except Exception as err:
            print(err)
    except Exception as err:
        print(err)

    if product_id is None:
        # The product could not be stored or found; do not attach a price
        # to a missing product id.
        return
    insert_update_price(connection, stores, value_json, product_id)
def insert_update_price(connection, stores, value_json, product_id):
    """Record the price in ``value_json['price']`` for *product_id*.

    If an active row (``active=1``) with the same ``product_id`` and ``price``
    already exists, only its ``last_update`` is set to today's date.
    Otherwise a new row is inserted whose columns are ``product_id``,
    ``last_update`` and every key of ``value_json['price']``.

    Args:
        connection: Open ``sqlite3.Connection``; nothing is committed here.
        stores: Unused; kept for signature parity with the other handlers.
        value_json: Decoded message containing a ``price`` mapping.
        product_id: Id of the product row the price belongs to.

    Raises:
        KeyError: If ``value_json['price']`` is missing.

    Database errors are printed, not raised. ``last_update`` is written as
    ``dd/mm/YYYY`` text.
    """
    def quote_identifier(name):
        # Column names come from the message payload, so they are quoted as
        # SQL identifiers rather than pasted into the statement verbatim.
        return '"' + str(name).replace('"', '""') + '"'

    price_fields = value_json['price']
    # Format the date once so the existence check, insert and update agree.
    today = datetime.now().strftime('%d/%m/%Y')
    cursor = connection.cursor()

    insert_columns = ', '.join(
        ['product_id', 'last_update']
        + [quote_identifier(key) for key in price_fields]
    )
    placeholders = ', '.join(['?'] * (len(price_fields) + 2))
    insert_values = [product_id, today, *price_fields.values()]

    price_exists = False
    try:
        cursor.execute(
            'SELECT id FROM price WHERE product_id=? and price=? and active=1',
            (product_id, price_fields["price"]),
        )
        price_exists = cursor.fetchone() is not None
    except Exception as err:
        print(err)

    try:
        if price_exists:
            cursor.execute(
                'UPDATE price SET last_update=? WHERE product_id=? and price=? and active=1',
                (today, product_id, price_fields["price"]),
            )
        else:
            cursor.execute(
                f'INSERT INTO price ({insert_columns}) VALUES ({placeholders})',
                insert_values,
            )
    except Exception as err:
        print(err)
def deactivate_old_price(connection, stores, value_json):
    """Mark prices of one store as inactive when last updated before today.

    Sets ``active=0`` on every ``price`` row that belongs to a product of the
    store named by ``value_json['store']`` and whose ``last_update`` is
    earlier than the current date.

    Args:
        connection: Open ``sqlite3.Connection``; nothing is committed here.
        stores: Cache mapping store name -> row id.
        value_json: Decoded message containing ``store``.

    All errors, including an unknown store, are printed, not raised.
    """
    cursor = connection.cursor()
    try:
        # last_update is stored as dd/mm/YYYY text, which does not sort
        # chronologically. Rearrange it to YYYYMMDD inside SQL so the
        # string comparison matches date order.
        sql_statement = (
            'UPDATE price SET active=0 '
            'WHERE product_id IN (SELECT id FROM products WHERE store=?) '
            'AND substr(last_update, 7, 4) || substr(last_update, 4, 2) '
            '|| substr(last_update, 1, 2) < ?'
        )
        cursor.execute(
            sql_statement,
            (stores[value_json["store"]], datetime.now().strftime("%Y%m%d")),
        )
    except Exception as err:
        print(err)
def insert(connection, stores, value_json):
    """Route a decoded message to its handler based on ``value_json['type']``.

    ``'store'`` goes to ``insert_update_store``, ``'store_update'`` to
    ``deactivate_old_price`` and ``'product'`` to ``insert_update_product``.
    Any other type is ignored. The first two also print the handler name.
    """
    message_type = value_json['type']
    if message_type == 'store':
        print('insert_update_store')
        insert_update_store(connection, stores, value_json)
    elif message_type == 'store_update':
        print('deactivate_old_price')
        deactivate_old_price(connection, stores, value_json)
    elif message_type == 'product':
        insert_update_product(connection, stores, value_json)
def get_stores(connection):
    """Return a dict mapping each store name to its row id.

    Reads the ``store`` and ``id`` columns of every row in the ``store``
    table through *connection*.

    Args:
        connection: Open ``sqlite3.Connection`` to query.

    Returns:
        ``{store: id}`` for all rows; empty when the table has no rows.
    """
    # Query through the connection passed in, not a module-level global.
    cursor = connection.cursor()
    cursor.execute('SELECT store, id FROM store')
    return dict(cursor.fetchall())
if __name__ == '__main__':
    # Entry point: consume messages from the 'shopper_db' Kafka topic and
    # apply each one to the SQLite database named in the configuration.
    config = common.get_config()
    # check_same_thread=False because the connection is shared with the
    # background committer thread started below.
    con = sqlite3.connect(config['config']['sqlitedb'], check_same_thread=False)
    if config['config']['log_sql']:
        con.set_trace_callback(print)
    stores = get_stores(con)
    commit_thread = threading.Thread(target=committer, args=(con,), daemon=True)
    commit_thread.start()
    # NOTE(review): the key is spelled 'kafka_boostrap_servers'; presumably it
    # matches the config file, so it is left unchanged. Confirm.
    consumer = KafkaConsumer('shopper_db',bootstrap_servers=[config['config']['kafka_boostrap_servers']])
    try:
        for msg in consumer:
            try:
                value_json = json.loads(msg.value.decode("utf-8"))
            except ValueError as err:
                # Covers invalid UTF-8 and invalid JSON: skip the message
                # instead of stopping the consumer.
                print(err)
                continue
            insert(con, stores, value_json)
    finally:
        # The committer is a daemon thread and stops with the process, so
        # flush whatever was written since its last periodic commit.
        con.commit()