|
|
- import yaml
- import pymysql
- import os
- from datetime import datetime
-
class DBManager:
    """Thin wrapper around one cached PyMySQL connection for ``douyin_products``.

    Connection settings are read from the ``mysql`` section of a YAML file
    (keys: ``host`` as ``"host"`` or ``"host:port"``, ``username``,
    ``password``, ``dbname``). The connection is opened lazily, runs in
    autocommit mode and returns rows as dicts.
    """

    # Port used when the configured host string carries no port.
    _DEFAULT_PORT = 3306
    # Product names are cut to this many characters before being written.
    _NAME_MAX_LEN = 250
    # Escape character declared in the LIKE clause of search_products().
    _LIKE_ESCAPE = '!'

    def __init__(self, config_path="config.yaml"):
        """Load the configuration and make sure the product table exists.

        Args:
            config_path: Path of the YAML configuration file.

        Raises:
            Exception: Whatever ``_load_config`` re-raises (unreadable file,
                invalid YAML, malformed port, ...). A failed database
                connection does NOT raise here; it is only reported.
        """
        self.config_path = config_path
        self.conn = None
        self._load_config()
        self._ensure_table_exists()

    @staticmethod
    def _parse_host_port(host_port, default_port=3306):
        """Split ``"host"`` / ``"host:port"`` into a ``(host, port)`` tuple.

        Bracketed IPv6 literals (``"[::1]:3306"``) are supported, and a bare
        IPv6 address (several colons, no brackets) is treated as a host
        without a port instead of failing to unpack.

        Args:
            host_port: Configured host value; converted with ``str()``.
            default_port: Port returned when none is present.

        Returns:
            ``(host, port)`` with ``port`` as ``int``.

        Raises:
            ValueError: If the port part is present but not an integer.
        """
        text = str(host_port).strip()
        if text.startswith('['):
            # "[v6-address]" optionally followed by ":port".
            host, _, rest = text[1:].partition(']')
            port = rest[1:] if rest.startswith(':') else ''
        elif text.count(':') == 1:
            host, _, port = text.partition(':')
        else:
            # No colon at all, or an unbracketed IPv6 address.
            host, port = text, ''
        return host, (int(port) if port else default_port)

    @staticmethod
    def _escape_like(text, escape_char='!'):
        """Escape LIKE wildcards in *text* so it is matched literally."""
        # The escape character itself must be handled first, otherwise the
        # escapes inserted for '%' and '_' would be escaped again.
        for ch in (escape_char, '%', '_'):
            text = text.replace(ch, escape_char + ch)
        return text

    def _load_config(self):
        """Read MySQL settings from ``self.config_path`` into attributes.

        Sets ``host``, ``port``, ``user``, ``password`` and ``database``.
        Any failure is printed and then re-raised unchanged.
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                # An empty YAML document parses to None; treat it like an
                # empty mapping so it behaves the same as a missing section.
                config = yaml.safe_load(f) or {}
            mysql_cfg = config.get('mysql') or {}

            # The host entry may be given as a "host:port" string.
            host_port = mysql_cfg.get('host') or '127.0.0.1:3306'
            self.host, self.port = self._parse_host_port(
                host_port, self._DEFAULT_PORT
            )

            self.user = mysql_cfg.get('username')
            self.password = mysql_cfg.get('password')
            self.database = mysql_cfg.get('dbname')
        except Exception as e:
            print(f"[DBManager] 加载配置失败: {e}")
            raise

    def get_connection(self):
        """Return a usable connection, opening or re-opening it if needed.

        Returns:
            The cached connection, or ``None`` when connecting failed (the
            error is printed, not raised).
        """
        try:
            if self.conn is not None and self.conn.open:
                # ``open`` only reflects the local socket state; a connection
                # dropped by the server (idle timeout, restart) still looks
                # open. ping(reconnect=True) checks it and reconnects.
                self.conn.ping(reconnect=True)
            else:
                self.conn = pymysql.connect(
                    host=self.host,
                    port=self.port,
                    user=self.user,
                    password=self.password,
                    database=self.database,
                    charset='utf8mb4',
                    cursorclass=pymysql.cursors.DictCursor,
                    autocommit=True
                )
            return self.conn
        except Exception as e:
            print(f"[DBManager] 数据库连接失败: {e}")
            return None

    def _ensure_table_exists(self):
        """Create ``douyin_products`` if it is missing (best effort).

        Returns silently when no connection is available; a failing
        statement is printed rather than raised.
        """
        conn = self.get_connection()
        if not conn:
            return

        try:
            with conn.cursor() as cursor:
                create_table_sql = """
                CREATE TABLE IF NOT EXISTS douyin_products (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(512) NOT NULL,
                    img_url TEXT,
                    stock INT DEFAULT 0,
                    status VARCHAR(64),
                    delivery_time VARCHAR(256),
                    price DECIMAL(10, 2),
                    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    UNIQUE KEY idx_name (name)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
                """
                cursor.execute(create_table_sql)
            print("[DBManager] 数据库表 douyin_products 已就绪")
        except Exception as e:
            print(f"[DBManager] 创建表失败: {e}")

    def search_products(self, keyword):
        """Find up to five products whose name contains *keyword*.

        ``%`` and ``_`` inside *keyword* are matched literally instead of
        acting as LIKE wildcards.

        Args:
            keyword: Text to look for; converted with ``str()``.

        Returns:
            A list of row dicts with the keys ``name``, ``stock``,
            ``status``, ``delivery_time`` and ``price``; an empty list when
            the connection or the query fails.
        """
        conn = self.get_connection()
        if not conn:
            return []

        try:
            with conn.cursor() as cursor:
                # Substring match. The ESCAPE clause is spelled out so the
                # escaping does not depend on the server's backslash mode.
                sql = """
                SELECT name, stock, status, delivery_time, price
                FROM douyin_products
                WHERE name LIKE %s ESCAPE '!'
                LIMIT 5
                """
                needle = self._escape_like(str(keyword), self._LIKE_ESCAPE)
                cursor.execute(sql, (f"%{needle}%",))
                return cursor.fetchall()
        except Exception as e:
            print(f"[DBManager] 搜索商品失败: {e}")
            return []

    def upsert_product(self, product_data):
        """Insert a product, or update the row that has the same name.

        Args:
            product_data: Dict with the keys ``name``, ``img_url``,
                ``stock``, ``status`` and ``delivery_time``; ``price`` is
                optional and defaults to 0.

        Returns:
            ``True`` when the statement ran, ``False`` when the connection
            or the write failed (including a missing required key). Errors
            are printed instead of raised so the caller keeps running.
        """
        conn = self.get_connection()
        if not conn:
            # Connection failed: report failure instead of raising.
            return False

        try:
            with conn.cursor() as cursor:
                sql = """
                INSERT INTO douyin_products (name, img_url, stock, status, delivery_time, price, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    img_url = VALUES(img_url),
                    stock = VALUES(stock),
                    status = VALUES(status),
                    delivery_time = VALUES(delivery_time),
                    price = VALUES(price),
                    updated_at = VALUES(updated_at)
                """
                cursor.execute(sql, (
                    # Hard truncation guards against "Data too long" errors.
                    product_data['name'][:self._NAME_MAX_LEN],
                    product_data['img_url'],
                    product_data['stock'],
                    product_data['status'],
                    product_data['delivery_time'],
                    product_data.get('price', 0),
                    datetime.now()
                ))
            return True
        except Exception as e:
            print(f"[DBManager] 写入数据失败: {e}")
            return False
-
if __name__ == "__main__":
    # Manual smoke test: connect using the default config file and write one
    # sample row through the upsert path.
    manager = DBManager()
    sample_product = dict(
        name='测试商品',
        img_url='http://test.com/img.jpg',
        stock=100,
        status='现货',
        delivery_time='48小时发货',
        price=9.9,
    )
    write_succeeded = manager.upsert_product(sample_product)
    if write_succeeded:
        print("测试写入成功")
|