|
|
- import os
- import time
- import requests
- import re
- import yaml
- from playwright.sync_api import sync_playwright
- from db_manager import DBManager
- from product_scraper import scrape_products, send_product_link_via_ui
- import erp_automation
-
-
# Credentials come from an optional local ``config`` module. When that module
# is not importable both values fall back to empty strings.
try:
    import config
    ALIYUN_API_KEY = config.ALIYUN_API_KEY
    BAILIAN_APP_ID = config.BAILIAN_APP_ID
except ImportError:
    ALIYUN_API_KEY = BAILIAN_APP_ID = ""

# ================= Global state =================
# Last message text already answered, keyed by customer name.
replied_history = dict()
# Bailian conversation session id, keyed by customer name.
bailian_sessions = dict()
# Most recent ``__token`` value seen in an outgoing page request.
GLOBAL_FXG_TOKEN = ""
-
def intercept_request(request):
    """Request listener that remembers the latest ``__token`` query value.

    Reads ``request.url`` and, when it carries a ``__token=<value>`` pair,
    stores ``<value>`` in the module-level ``GLOBAL_FXG_TOKEN``. Any error
    while inspecting the request is ignored, so the listener never raises.
    """
    global GLOBAL_FXG_TOKEN
    try:
        found = re.search(r'__token=([^&]+)', request.url)
    except Exception:
        # Best-effort probe: a malformed request object must not break the page.
        return
    if found:
        GLOBAL_FXG_TOKEN = found.group(1)
-
def fetch_order_api(order_id: str, page) -> str:
    """Fetch raw order-detail and logistics data for ``order_id``.

    Reuses the login state of the running browser: the cookies of
    ``page.context`` are sent as a ``Cookie`` header and the captured
    ``GLOBAL_FXG_TOKEN`` is appended to each URL.

    Args:
        order_id: Order number inserted verbatim into the query string.
        page: Playwright page whose browser context supplies the cookies.

    Returns:
        One labelled line per endpoint that answered with HTTP 200 (response
        body truncated to 800 characters), concatenated. Returns ``""`` when
        no token has been captured yet, the cookies cannot be read, or
        neither endpoint succeeded. Never raises.
    """
    if not GLOBAL_FXG_TOKEN:
        return ""

    try:
        # Take over the login state of the current Playwright page.
        # NOTE(review): cookies() without URLs returns every cookie in the
        # context, not only those for fxg.jinritemai.com - confirm intended.
        cookies = page.context.cookies()
        cookie_string = "; ".join(f"{c['name']}={c['value']}" for c in cookies)
    except Exception as exc:
        print(f"[订单API] 读取登录 Cookie 失败: {exc}")
        return ""

    headers = {
        "Cookie": cookie_string,
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept": "application/json, text/plain, */*"
    }
    endpoints = (
        ("订单详情", f"https://fxg.jinritemai.com/api/order/orderDetail?order_id={order_id}&appid=1&__token={GLOBAL_FXG_TOKEN}"),
        ("订单物流", f"https://fxg.jinritemai.com/api/order/getOrderLogistics?order_id={order_id}&__token={GLOBAL_FXG_TOKEN}"),
    )

    parts = []
    for label, url in endpoints:
        # Each endpoint is fetched independently so that a timeout on one
        # does not throw away a successful answer from the other.
        try:
            res = requests.get(url, headers=headers, timeout=5)
            if res.status_code == 200:
                parts.append(f"[{label}API原始返回]: {res.text[:800]}\n")
        except Exception as exc:
            print(f"[订单API] {label}请求失败: {exc}")
    return "".join(parts)
-
# Reply used when the model call fails and the knowledge base has no override.
_DEFAULT_ERROR_REPLY = "亲亲,抱歉哦,小助手网络稍微有点开小差,您可以重新发送一次刚才的问题,我会立刻为您处理哒!"


def _fallback_reply():
    """Return the knowledge base's ``error_reply`` or the built-in default."""
    kb = load_kb()
    # An empty or malformed knowledge-base file may not yield a mapping.
    if not isinstance(kb, dict):
        kb = {}
    return kb.get("error_reply", _DEFAULT_ERROR_REPLY)


def get_reply_from_api(customer_name: str, message_text: str, context_info: str = "") -> str:
    """Ask the Aliyun Bailian application for a reply to a customer message.

    Args:
        customer_name: Key under which the Bailian ``session_id`` is kept in
            ``bailian_sessions`` so follow-up messages share one conversation.
        message_text: The customer's message.
        context_info: Optional retrieved facts placed in front of the message.

    Returns:
        The model's reply with Markdown ``**`` markers removed. A fixed mock
        reply is returned when no API key is configured. The knowledge base's
        ``error_reply`` (or a built-in default) is returned when the request
        fails or yields no text. Never raises.
    """
    if not ALIYUN_API_KEY or "YOUR_" in ALIYUN_API_KEY:
        print("⚠️ 提醒:未在 config.py 中配置密钥,触发模拟回复。")
        return "【ai回复】好的"

    url = f"https://dashscope.aliyuncs.com/api/v1/apps/{BAILIAN_APP_ID}/completion"
    headers = {
        "Authorization": f"Bearer {ALIYUN_API_KEY}",
        "Content-Type": "application/json"
    }

    # Put the retrieved facts, when there are any, in front of the message.
    if context_info:
        final_prompt = f"【底层系统提供的真实事实数据】:\n{context_info}\n\n【系统强制指令】:你是一个顶级的高效客服。面对下方客户的问题,要求你的回答问题,请务必根据上面的事实数据精准作答,如果数据中显示库存为0,请委婉告知已售罄。\n\n【客户发来的消息】:\n{message_text}"
    else:
        final_prompt = f"【系统强制指令】:你是一个专业的高效客服。!\n\n【客户发来的消息】:\n{message_text}"

    # Request body layout of the application completion endpoint.
    payload = {
        "input": {
            "prompt": final_prompt
        },
        "parameters": {},
        "debug": {}
    }

    current_session_id = bailian_sessions.get(customer_name)
    if current_session_id:
        payload["input"]["session_id"] = current_session_id

    try:
        response = requests.post(url, headers=headers, json=payload, timeout=20)
        response.raise_for_status()
        # ``output`` and ``text`` may be missing or null in the response.
        output = response.json().get("output") or {}
        reply_text = (output.get("text") or "").strip()
        new_session_id = output.get("session_id")
    except Exception as e:
        print(f"[阿里云百炼API] 请求或解析出错: {e}")
        return _fallback_reply()

    if not reply_text:
        return _fallback_reply()

    if new_session_id:
        bailian_sessions[customer_name] = new_session_id

    # Strip Markdown bold markers; the chat box shows them literally.
    return reply_text.replace("**", "")
-
def load_kb():
    """Load the general knowledge-base configuration.

    Reads ``knowledge_base.yaml`` from the current working directory.

    Returns:
        The parsed top-level mapping. An empty dict is returned when the file
        is missing, cannot be read or parsed, is empty, or its top level is
        not a mapping, so callers can always use ``.get(...)`` on the result.
    """
    kb_path = "knowledge_base.yaml"
    try:
        with open(kb_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except FileNotFoundError:
        # A missing knowledge base is a normal situation, not an error.
        return {}
    except Exception as e:
        print(f"[知识库] 加载失败: {e}")
        return {}

    if data is None:
        # The YAML loader yields None for an empty document.
        return {}
    if not isinstance(data, dict):
        print(f"[知识库] 加载失败: 顶层结构应为字典,实际为 {type(data).__name__}")
        return {}
    return data
-
# 19-digit order numbers as customers paste them into the chat.
_ORDER_ID_RE = re.compile(r'\d{19}')

# Canned replies for message kinds the bot cannot read, in priority order.
_MEDIA_REPLIES = (
    ("[图片]", "亲亲,小助手目前还看不见图片内容哦,麻烦您用文字描述一下您的问题,我会立刻为您处理哒!"),
    ("[视频]", "亲亲,小助手暂时无法查看视频,请您文字描述或截图关键信息发给我哦。"),
    ("[语音]", "亲亲,由于系统限制,小助手暂时听不到语音消息,麻烦您转文字或直接打字发给我好吗?"),
)

# Intent keywords.
_ORDER_KEYWORDS = ("物流", "快递", "订单", "发货", "单号", "包裹", "签收", "进度", "运费", "什么时候到", "退款", "催发", "查一下", "到了吗")
_PRODUCT_KEYWORDS = ("库存", "有货", "多少钱", "价格", "规格", "材质", "尺码", "现货", "具体参数", "优惠", "便宜")
_SHIPPING_KEYWORDS = ("什么时候发货", "发货", "没发货", "多久发货", "快递")

# Order-card parsing vocabulary.
_ACTIVE_STATUS_KEYWORDS = ("待发货", "备货中", "已支付", "已下单", "待收货")
_ANCHOR_KEYS = ("实付金额", "付款时间", "物流信息", "收货信息", "发货时间", "规格", "编码")
_BUTTON_LABELS = ("发物流卡", "代客发起", "发售后卡", "打款", "自助开票", "邀评", "客服介入", "发送")

# Filler removed from a product question to obtain the search term. Order
# matters ("有货吗" must go before "吗"). Both the ASCII and the full-width
# question mark are stripped.
_SEARCH_NOISE_WORDS = ("有货吗", "库存", "价格", "多少钱", "这款", "那个", "吗", "呢", "?", "?", "咨询")

# Clicks every collapsed order card in the side panel so its text is readable.
_EXPAND_ORDER_CARDS_JS = '''() => {
    document.querySelectorAll('.ecom-collapse-header, .arco-collapse-item-header').forEach(el => {
        if (!el.className.includes('active') && !el.className.includes('expanded')) {
            el.click();
        }
    });
}'''


def _fixed_reply_for_media(last_text):
    """Return the canned reply for an image/video/voice placeholder, else ""."""
    for marker, reply in _MEDIA_REPLIES:
        if marker in last_text:
            return reply
    return ""


def _extract_search_key(last_text):
    """Strip filler words from a product question, leaving the search term."""
    search_key = last_text
    for word in _SEARCH_NOISE_WORDS:
        search_key = search_key.replace(word, "")
    return search_key.strip()


def _parse_order_card(card_text, target_order_id):
    """Turn the text of one order card into an ordered field dict.

    Returns ``None`` when the card belongs to a different order than
    ``target_order_id``. The dict may be empty when nothing was recognised.
    """
    flat_text = card_text.replace('\n', ' ')
    card_lines = [ln.strip() for ln in card_text.split("\n") if ln.strip()]
    parsed = {}

    # Everything before the line holding the order number is treated as the
    # status text; the scan stops at the order number.
    current_order_id = ""
    status_text = ""
    for ln in card_lines:
        found = _ORDER_ID_RE.search(ln)
        if found:
            current_order_id = found.group(0)
            parsed["订单编号"] = current_order_id
            break
        status_text += ln + " "

    # Only a short prefix is accepted as a status.
    if status_text.strip() and len(status_text) < 40:
        parsed["订单状态"] = status_text.strip()

    if not current_order_id and target_order_id:
        parsed["订单编号"] = target_order_id
        current_order_id = target_order_id

    if target_order_id and current_order_id and target_order_id != current_order_id:
        return None

    # First longer line that is neither a price nor the order number.
    for ln in card_lines:
        if len(ln) > 8 and "¥" not in ln and "金额" not in ln and not _ORDER_ID_RE.search(ln):
            parsed["商品名称"] = ln
            break

    # Slice the flattened text between consecutive field labels.
    found_anchors = []
    for key in _ANCHOR_KEYS:
        pos = flat_text.find(key)
        if pos != -1:
            found_anchors.append((pos, key))
    found_anchors.sort()

    for idx, (pos, key) in enumerate(found_anchors):
        start = pos + len(key)
        # The last field has no following label; cap it at 150 characters.
        end = found_anchors[idx + 1][0] if idx + 1 < len(found_anchors) else pos + 150
        value = flat_text[start:end].strip()
        # Cut off the captions of action buttons that follow the value.
        for btn in _BUTTON_LABELS:
            value = value.split(btn)[0]
        value = value.strip(' ¥:\n')
        if value:
            parsed[key] = value

    return parsed


def _collect_order_context(page, last_text):
    """Scrape the order side panel into RAG text sections (best effort).

    Never raises: a failure is logged and whatever was gathered so far is
    returned.
    """
    sections = []
    try:
        page.evaluate(_EXPAND_ORDER_CARDS_JS)
        time.sleep(1)

        found = _ORDER_ID_RE.search(last_text)
        target_order_id = found.group(0) if found else ""

        cards = page.locator("[class*='collapse-item']").all()
        if not cards:
            cards = page.locator(".ecom-collapse-content-box").all()
        if not cards:
            return sections

        print(" -> [订单意图] 开始扫描右侧多订单面板...")
        order_texts = []
        has_active_order = False
        for card in cards:
            parsed = _parse_order_card(card.inner_text(), target_order_id)
            if not parsed:
                continue
            status = parsed.get("订单状态", "")
            if any(k in status for k in _ACTIVE_STATUS_KEYWORDS):
                has_active_order = True
            order_texts.append("\n".join(f" - {k}: {v}" for k, v in parsed.items()))
            # Without a specific order number only the first card is used.
            if not target_order_id:
                break

        if order_texts:
            sections.append("【客户历史订单详情】:\n" + "\n---\n".join(order_texts))

        # Shipping question but no active order: add the generic policy.
        asks_about_shipping = any(k in last_text for k in _SHIPPING_KEYWORDS)
        if asks_about_shipping and not has_active_order:
            policy = (load_kb() or {}).get("shipping_policy", "")
            if policy:
                print(" -> [知识库兜底] 未发现活动订单,补充通用发货政策。")
                sections.append(f"【店铺通用发货政策(知识库)】:\n{policy}")
    except Exception as exc:
        print(f" -> [订单意图] 订单面板解析异常: {exc}")
    return sections


def _collect_product_context(db, search_key):
    """Look up ``search_key`` in the product DB; return a RAG section or ""."""
    products = db.search_products(search_key)
    if not products:
        return ""
    print(f" -> [商品意图] 命中关键词 '{search_key}',从数据库检索到相关数据。")
    rows = [
        f" - 商品名: {item['name']}\n 库存: {item['stock']} | 价格: {item['price']}元 | 状态: {item['status']} | 发货时效: {item['delivery_time']}"
        for item in products
    ]
    return "【店铺实时商品库存/价格详情】:\n" + "\n".join(rows)


def _handle_session(page, db, session):
    """Answer one conversation list entry if it holds an unanswered message."""
    try:
        box_text = session.inner_text().strip()
    except Exception:
        # The entry may have disappeared from the list in the meantime.
        return

    entry_lines = [ln.strip() for ln in box_text.split("\n") if ln.strip()]
    if len(entry_lines) < 2:
        return

    customer_name = entry_lines[0]
    last_text = entry_lines[-1]

    # A middle line containing "秒" or "分" is used as the needs-reply signal.
    needs_reply = any(("秒" in ln or "分" in ln) for ln in entry_lines[1:-1])
    if not needs_reply or replied_history.get(customer_name) == last_text:
        return

    print("\n=================================")
    print(f"[{customer_name}] 有未读消息: {last_text}")

    try:
        session.click()
        time.sleep(1)
    except Exception:
        return

    # Reset per conversation so a product card is only sent for this message.
    search_key = ""
    reply_content = _fixed_reply_for_media(last_text)
    if reply_content:
        print(" -> 检测到非文字消息,执行固定回复。")
    else:
        rag_sections = []
        needs_order_check = (
            any(k in last_text for k in _ORDER_KEYWORDS)
            or bool(_ORDER_ID_RE.search(last_text))
        )
        needs_product_check = any(k in last_text for k in _PRODUCT_KEYWORDS)

        # 1. Order context: side panel first, HTTP API as the fallback.
        if needs_order_check:
            rag_sections.extend(_collect_order_context(page, last_text))
        if not rag_sections:
            found = _ORDER_ID_RE.search(last_text)
            if found:
                api_data = fetch_order_api(found.group(0), page)
                if api_data:
                    rag_sections.append(f"【系统接口拉取的订单数据】:\n{api_data}")

        # 2. Product context from the local product database.
        if needs_product_check:
            search_key = _extract_search_key(last_text)
            if search_key:
                product_section = _collect_product_context(db, search_key)
                if product_section:
                    rag_sections.append(product_section)

        context_info = ""
        if rag_sections:
            context_info = "请根据以下实时检索到的背景事实精准回答顾客的问题:\n\n" + "\n\n".join(rag_sections)

        reply_content = get_reply_from_api(customer_name, last_text, context_info)

    if not reply_content:
        return

    input_box = page.locator("#im-input-box textarea")
    if not input_box.is_visible():
        return

    input_box.fill("")
    input_box.type(reply_content, delay=50)  # type at a human-like pace
    input_box.press("Enter")

    print(f" ==> 已发送回复: {reply_content}")
    replied_history[customer_name] = last_text
    time.sleep(1)

    # Follow a product answer with the matching product card via the UI.
    if search_key:
        try:
            send_product_link_via_ui(page, search_key)
        except Exception as exc:
            print(f" ⚠️ UI 发送商品动作执行异常: {exc}")


def run_bot():
    """Run the customer-service bot until the process is interrupted.

    Opens the seller chat workspace in a persistent Chrome profile, waits for
    the page (or a manual login), then loops forever. Each pass does three
    things:

    1. Runs the ERP sync when its interval has elapsed.
    2. Re-scrapes the product list into the database when its interval has
       elapsed.
    3. Answers unanswered messages in the first three conversations.

    Both intervals are re-read from the knowledge base on every pass
    (``sync_settings``, default 1800 seconds each). An error inside a pass is
    logged and the loop continues after a short pause.
    """
    print("=== 开始启动飞鸽 RAG 检索增强客服机器人 ===")
    with sync_playwright() as playwright:
        user_data_dir = os.path.expanduser("~/playwright_fxg_data")
        browser_context = playwright.chromium.launch_persistent_context(
            user_data_dir=user_data_dir,
            headless=False,
            channel="chrome",
        )
        page = browser_context.pages[0] if browser_context.pages else browser_context.new_page()

        # Watch outgoing requests so the __token value can be captured.
        page.on("request", intercept_request)

        try:
            page.goto("https://im.jinritemai.com/pc_seller_v2/main/workspace#", timeout=60000, wait_until="domcontentloaded")
        except Exception:
            print(">> 页面基础结构已加载,忽略残留的耗时网络请求继续执行...")

        print(">> 等待页面加载或扫码登录...")
        try:
            page.wait_for_selector(".auxo-tabs-tab, #im-input-box", timeout=60000)
            print(">> 页面加载完成,开始进入监听模式...")
        except Exception:
            print(">> 提示:未立刻检测到活跃聊天框,但我们将保持应用存活并进行持续监听。")

        time.sleep(2)

        db = DBManager()
        last_product_sync = 0
        last_erp_sync = 0

        while True:
            try:
                current_time = time.time()

                # Re-read the intervals so edits apply without a restart.
                kb = load_kb() or {}
                sync_settings = kb.get("sync_settings", {}) or {}
                product_interval = sync_settings.get("product_sync_interval", 1800)
                erp_interval = sync_settings.get("erp_sync_interval", 1800)

                # --- 1. Periodic ERP order merge (checked first) ---
                if current_time - last_erp_sync > erp_interval:
                    try:
                        erp_automation.run_sync(browser_context)
                    except Exception as exc:
                        print(f">> ERP 自动合并动作执行异常: {exc}")
                    # Reset the timer on success and failure alike so a
                    # failing sync is not retried on every pass.
                    last_erp_sync = current_time

                # --- 2. Periodic product sync ---
                if current_time - last_product_sync > product_interval:
                    print(f"\n>> 正在执行商品数据同步 ({time.strftime('%H:%M:%S')})...")
                    try:
                        products = scrape_products(page)
                        if products:
                            for item in products:
                                db.upsert_product(item)
                            print(f">> 同步完成,共更新 {len(products)} 个商品。")
                            last_product_sync = current_time
                            page.locator("div.Qk7Fc20IPHKGdyq8SdNw:has-text('订单')").first.click(force=True)
                        else:
                            print(">> 本次同步未抓取到有效商品,5秒后重试。")
                            # Make the interval elapse again in 5 seconds.
                            last_product_sync = current_time - product_interval + 5
                    except Exception as exc:
                        print(f">> 商品同步发生错误: {exc}")

                # --- 3. Scan the chat conversations ---
                session_elements = page.locator("[data-qa-id='qa-conversation-chat-item']").all()
                if not session_elements:
                    time.sleep(5)
                    continue

                # Only the first three list entries are inspected.
                for session in session_elements[:3]:
                    _handle_session(page, db, session)

                # Short pause so the loop does not hog the CPU.
                time.sleep(5)
            except Exception as exc:
                # Keep the bot alive, but report the failure and back off
                # instead of spinning silently.
                print(f">> 主循环发生异常,5秒后重试: {exc}")
                time.sleep(5)


if __name__ == "__main__":
    run_bot()
|