ai客服
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

424 lines
22 KiB

1 month ago
  1. import os
  2. import time
  3. import requests
  4. import re
  5. import yaml
  6. from playwright.sync_api import sync_playwright
  7. from db_manager import DBManager
  8. from product_scraper import scrape_products, send_product_link_via_ui
  9. import erp_automation
  10. try:
  11. import config
  12. ALIYUN_API_KEY = config.ALIYUN_API_KEY
  13. BAILIAN_APP_ID = config.BAILIAN_APP_ID
  14. except ImportError:
  15. ALIYUN_API_KEY = ""
  16. BAILIAN_APP_ID = ""
  17. # ================= 全局状态 =================
  18. replied_history = {}
  19. bailian_sessions = {}
  20. GLOBAL_FXG_TOKEN = ""
  21. def intercept_request(request):
  22. """
  23. __token
  24. """
  25. global GLOBAL_FXG_TOKEN
  26. try:
  27. url = request.url
  28. if "__token=" in url:
  29. match = re.search(r'__token=([^&]+)', url)
  30. if match:
  31. GLOBAL_FXG_TOKEN = match.group(1)
  32. except Exception:
  33. pass
  34. def fetch_order_api(order_id: str, page) -> str:
  35. """
  36. token cookies API
  37. """
  38. if not GLOBAL_FXG_TOKEN:
  39. return ""
  40. try:
  41. # 从当前所在的 playwright 页面无缝接管登录状态
  42. cookies = page.context.cookies()
  43. cookie_string = "; ".join([f"{c['name']}={c['value']}" for c in cookies])
  44. headers = {
  45. "Cookie": cookie_string,
  46. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
  47. "Accept": "application/json, text/plain, */*"
  48. }
  49. d_url = f"https://fxg.jinritemai.com/api/order/orderDetail?order_id={order_id}&appid=1&__token={GLOBAL_FXG_TOKEN}"
  50. l_url = f"https://fxg.jinritemai.com/api/order/getOrderLogistics?order_id={order_id}&__token={GLOBAL_FXG_TOKEN}"
  51. d_res = requests.get(d_url, headers=headers, timeout=5)
  52. l_res = requests.get(l_url, headers=headers, timeout=5)
  53. info = ""
  54. if d_res.status_code == 200:
  55. info += f"[订单详情API原始返回]: {d_res.text[:800]}\n"
  56. if l_res.status_code == 200:
  57. info += f"[订单物流API原始返回]: {l_res.text[:800]}\n"
  58. return info
  59. except Exception:
  60. return ""
  61. def get_reply_from_api(customer_name: str, message_text: str, context_info: str = "") -> str:
  62. """
  63. API RAG
  64. """
  65. if not ALIYUN_API_KEY or "YOUR_" in ALIYUN_API_KEY:
  66. print(f"⚠️ 提醒:未在 config.py 中配置密钥,触发模拟回复。")
  67. return "【ai回复】好的"
  68. url = f"https://dashscope.aliyuncs.com/api/v1/apps/{BAILIAN_APP_ID}/completion"
  69. headers = {
  70. "Authorization": f"Bearer {ALIYUN_API_KEY}",
  71. "Content-Type": "application/json"
  72. }
  73. # 组装超强的带有检索数据的提示词,并加入严格的“长话短说”性格指令
  74. if context_info:
  75. final_prompt = f"【底层系统提供的真实事实数据】:\n{context_info}\n\n【系统强制指令】:你是一个顶级的高效客服。面对下方客户的问题,要求你的回答问题,请务必根据上面的事实数据精准作答,如果数据中显示库存为0,请委婉告知已售罄。\n\n【客户发来的消息】:\n{message_text}"
  76. else:
  77. final_prompt = f"【系统强制指令】:你是一个专业的高效客服。!\n\n【客户发来的消息】:\n{message_text}"
  78. # 新版应用 API 官方请求体结构
  79. payload = {
  80. "input": {
  81. "prompt": final_prompt
  82. },
  83. "parameters": {},
  84. "debug": {}
  85. }
  86. current_session_id = bailian_sessions.get(customer_name)
  87. if current_session_id:
  88. payload["input"]["session_id"] = current_session_id
  89. try:
  90. response = requests.post(url, headers=headers, json=payload, timeout=20)
  91. response.raise_for_status()
  92. data = response.json()
  93. output = data.get("output", {})
  94. reply_text = output.get("text", "").strip()
  95. if not reply_text:
  96. kb = load_kb()
  97. return kb.get("error_reply", "亲亲,抱歉哦,小助手网络稍微有点开小差,您可以重新发送一次刚才的问题,我会立刻为您处理哒!")
  98. # 过滤掉 Markdown 粗体符号
  99. reply_text = reply_text.replace("**", "")
  100. new_session_id = output.get("session_id")
  101. if new_session_id:
  102. bailian_sessions[customer_name] = new_session_id
  103. return reply_text
  104. except Exception as e:
  105. print(f"[阿里云百炼API] 请求或解析出错: {e}")
  106. # 从知识库尝试获取兜底回复
  107. kb = load_kb()
  108. return kb.get("error_reply", "亲亲,抱歉哦,小助手网络稍微有点开小差,您可以重新发送一次刚才的问题,我会立刻为您处理哒!")
  109. def load_kb():
  110. """
  111. """
  112. kb_path = "knowledge_base.yaml"
  113. if os.path.exists(kb_path):
  114. try:
  115. with open(kb_path, 'r', encoding='utf-8') as f:
  116. return yaml.safe_load(f)
  117. except Exception as e:
  118. print(f"[知识库] 加载失败: {e}")
  119. return {}
  120. def run_bot():
  121. print("=== 开始启动飞鸽 RAG 检索增强客服机器人 ===")
  122. with sync_playwright() as p:
  123. user_data_dir = os.path.expanduser("~/playwright_fxg_data")
  124. browser_context = p.chromium.launch_persistent_context(
  125. user_data_dir=user_data_dir,
  126. headless=False,
  127. channel="chrome"
  128. )
  129. page = browser_context.pages[0] if browser_context.pages else browser_context.new_page()
  130. # 挂载网络探针,窃取 __token
  131. page.on("request", intercept_request)
  132. try:
  133. page.goto("https://im.jinritemai.com/pc_seller_v2/main/workspace#", timeout=60000, wait_until="domcontentloaded")
  134. except Exception as e:
  135. print(">> 页面基础结构已加载,忽略残留的耗时网络请求继续执行...")
  136. print(">> 等待页面加载或扫码登录...")
  137. try:
  138. page.wait_for_selector(".auxo-tabs-tab, #im-input-box", timeout=60000)
  139. print(">> 页面加载完成,开始进入监听模式...")
  140. except Exception as e:
  141. print(">> 提示:未立刻检测到活跃聊天框,但我们将保持应用存活并进行持续监听。")
  142. time.sleep(2)
  143. # 初始化数据库管理与同步计时器
  144. db = DBManager()
  145. last_product_sync = 0
  146. last_erp_sync = 0 # ERP 自动合并计时器
  147. while True:
  148. try:
  149. current_time = time.time()
  150. # 动态加载同步频率设置
  151. kb = load_kb()
  152. sync_settings = kb.get("sync_settings", {})
  153. SYNC_INTERVAL = sync_settings.get("product_sync_interval", 1800)
  154. ERP_SYNC_INTERVAL = sync_settings.get("erp_sync_interval", 1800)
  155. # --- 1. 定时执行 ERP 订单自动合并周期任务 (优先检测) ---
  156. if current_time - last_erp_sync > ERP_SYNC_INTERVAL:
  157. try:
  158. erp_automation.run_sync(browser_context)
  159. except Exception as e:
  160. print(f">> ERP 自动合并动作执行异常: {e}")
  161. last_erp_sync = current_time # 无论成功失败都重置计时,避免死循环
  162. # --- 2. 定时商品同步逻辑 ---
  163. if current_time - last_product_sync > SYNC_INTERVAL:
  164. print(f"\n>> 正在执行商品数据同步 ({time.strftime('%H:%M:%S')})...")
  165. try:
  166. products = scrape_products(page)
  167. if products:
  168. for item in products:
  169. db.upsert_product(item)
  170. print(f">> 同步完成,共更新 {len(products)} 个商品。")
  171. last_product_sync = current_time
  172. page.locator("div.Qk7Fc20IPHKGdyq8SdNw:has-text('订单')").first.click(force=True)
  173. else:
  174. print(">> 本次同步未抓取到有效商品,5秒后重试。")
  175. last_product_sync = current_time - SYNC_INTERVAL + 5
  176. except Exception as e:
  177. print(f">> 商品同步发生错误: {e}")
  178. # --- 3. 扫描飞鸽客服会话 ---
  179. session_elements = page.locator("[data-qa-id='qa-conversation-chat-item']").all()
  180. if not session_elements:
  181. time.sleep(5)
  182. continue
  183. scan_limit = min(3, len(session_elements))
  184. for i in range(scan_limit):
  185. current_session = session_elements[i]
  186. try:
  187. box_text = current_session.inner_text().strip()
  188. except:
  189. continue
  190. lines = [line.strip() for line in box_text.split("\n") if line.strip()]
  191. if len(lines) < 2:
  192. continue
  193. customer_name = lines[0]
  194. last_text = lines[-1]
  195. needs_reply = any(("" in line or "" in line) for line in lines[1:-1])
  196. if not needs_reply or replied_history.get(customer_name) == last_text:
  197. continue
  198. print(f"\n=================================")
  199. print(f"[{customer_name}] 有未读消息: {last_text}")
  200. try:
  201. current_session.click()
  202. time.sleep(1)
  203. except:
  204. continue
  205. context_info = ""
  206. rag_data_list = []
  207. # 🔍 非文字消息处理 (路线 A)
  208. reply_content = ""
  209. needs_product_check = False # 预初始化,防止变量未定义错误
  210. non_text_reply = ""
  211. if "[图片]" in last_text:
  212. non_text_reply = "亲亲,小助手目前还看不见图片内容哦,麻烦您用文字描述一下您的问题,我会立刻为您处理哒!"
  213. elif "[视频]" in last_text:
  214. non_text_reply = "亲亲,小助手暂时无法查看视频,请您文字描述或截图关键信息发给我哦。"
  215. elif "[语音]" in last_text:
  216. non_text_reply = "亲亲,由于系统限制,小助手暂时听不到语音消息,麻烦您转文字或直接打字发给我好吗?"
  217. if non_text_reply:
  218. print(f" -> 检测到非文字消息,执行固定回复。")
  219. reply_content = non_text_reply
  220. else:
  221. # 🔍 意图识别
  222. order_keywords = ["物流", "快递", "订单", "发货", "单号", "包裹", "签收", "进度", "运费", "什么时候到", "退款", "催发", "查一下", "到了吗"]
  223. product_keywords = ["库存", "有货", "多少钱", "价格", "规格", "材质", "尺码", "现货", "具体参数", "优惠", "便宜"]
  224. needs_order_check = any(k in last_text for k in order_keywords) or re.search(r'\d{19}', last_text)
  225. needs_product_check = any(k in last_text for k in product_keywords)
  226. # 1. 尝试订单 RAG
  227. if needs_order_check:
  228. try:
  229. # 注入魔法 JS:自动把右侧所有折叠的订单卡片统统【点开】
  230. page.evaluate('''() => {
  231. document.querySelectorAll('.ecom-collapse-header, .arco-collapse-item-header').forEach(el => {
  232. if(!el.className.includes('active') && !el.className.includes('expanded')) {
  233. el.click();
  234. }
  235. });
  236. }''')
  237. time.sleep(1)
  238. match_target = re.search(r'(\d{19})', last_text)
  239. target_order_id = match_target.group(1) if match_target else ""
  240. cards = page.locator("[class*='collapse-item']").all()
  241. if not cards:
  242. cards = page.locator(".ecom-collapse-content-box").all()
  243. if cards:
  244. print(f" -> [订单意图] 开始扫描右侧多订单面板...")
  245. order_texts = []
  246. has_active_order = False
  247. active_status_keywords = ["待发货", "备货中", "已支付", "已下单", "待收货"]
  248. for card in cards:
  249. text = card.inner_text()
  250. flat_text = text.replace('\n', ' ')
  251. lines = [log.strip() for log in text.split("\n") if log.strip()]
  252. parsed = {}
  253. current_order_id = ""
  254. status_text = ""
  255. for line in lines:
  256. match = re.search(r'(\d{19})', line)
  257. if match:
  258. current_order_id = match.group(1)
  259. parsed["订单编号"] = current_order_id
  260. break
  261. else:
  262. status_text += line + " "
  263. if status_text.strip() and len(status_text) < 40:
  264. parsed["订单状态"] = status_text.strip()
  265. if not current_order_id and target_order_id:
  266. parsed["订单编号"] = target_order_id
  267. current_order_id = target_order_id
  268. if target_order_id and current_order_id and target_order_id != current_order_id:
  269. continue
  270. for line in lines:
  271. if len(line) > 8 and "¥" not in line and "金额" not in line and not re.search(r'\d{19}', line):
  272. parsed["商品名称"] = line
  273. break
  274. anchor_keys = ["实付金额", "付款时间", "物流信息", "收货信息", "发货时间", "规格", "编码"]
  275. found_anchors = []
  276. for k in anchor_keys:
  277. idx_pos = flat_text.find(k)
  278. if idx_pos != -1:
  279. found_anchors.append((idx_pos, k))
  280. found_anchors.sort()
  281. for i in range(len(found_anchors)):
  282. idx_pos, key = found_anchors[i]
  283. start = idx_pos + len(key)
  284. end = found_anchors[i+1][0] if i + 1 < len(found_anchors) else idx_pos + 150
  285. val = flat_text[start:end].strip()
  286. for btn in ["发物流卡", "代客发起", "发售后卡", "打款", "自助开票", "邀评", "客服介入", "发送"]:
  287. val = val.split(btn)[0]
  288. val = val.strip(' ¥:\n')
  289. if val: parsed[key] = val
  290. if parsed:
  291. # 判断是否为活动订单
  292. status = parsed.get("订单状态", "")
  293. if any(k in status for k in active_status_keywords):
  294. has_active_order = True
  295. card_info = "\n".join([f" - {k}: {v}" for k, v in parsed.items()])
  296. order_texts.append(card_info)
  297. if not target_order_id: break
  298. if order_texts:
  299. rag_data_list.append("【客户历史订单详情】:\n" + "\n---\n".join(order_texts))
  300. # --- 核心改进:无活动订单时的知识库兜底 ---
  301. shipping_query = any(k in last_text for k in ["什么时候发货", "发货", "没发货", "多久发货", "快递"])
  302. if shipping_query and not has_active_order:
  303. kb = load_kb()
  304. policy = kb.get("shipping_policy", "")
  305. if policy:
  306. print(f" -> [知识库兜底] 未发现活动订单,补充通用发货政策。")
  307. rag_data_list.append("【店铺通用发货政策(知识库)】:\n" + policy)
  308. except Exception:
  309. pass
  310. if not rag_data_list:
  311. match = re.search(r'(\d{19})', last_text)
  312. if match:
  313. api_data = fetch_order_api(match.group(1), page)
  314. if api_data: rag_data_list.append(f"【系统接口拉取的订单数据】:\n{api_data}")
  315. # 2. 尝试商品 RAG
  316. if needs_product_check:
  317. search_key = last_text
  318. for word in ["有货吗", "库存", "价格", "多少钱", "这款", "那个", "", "", "", "?", "咨询"]:
  319. search_key = search_key.replace(word, "")
  320. search_key = search_key.strip()
  321. if len(search_key) >= 1:
  322. products = db.search_products(search_key)
  323. if products:
  324. print(f" -> [商品意图] 命中关键词 '{search_key}',从数据库检索到相关数据。")
  325. prod_info_list = []
  326. for p in products:
  327. info = f" - 商品名: {p['name']}\n 库存: {p['stock']} | 价格: {p['price']}元 | 状态: {p['status']} | 发货时效: {p['delivery_time']}"
  328. prod_info_list.append(info)
  329. rag_data_list.append("【店铺实时商品库存/价格详情】:\n" + "\n".join(prod_info_list))
  330. if rag_data_list:
  331. context_info = "请根据以下实时检索到的背景事实精准回答顾客的问题:\n\n" + "\n\n".join(rag_data_list)
  332. reply_content = get_reply_from_api(customer_name, last_text, context_info)
  333. if reply_content:
  334. input_box = page.locator("#im-input-box textarea")
  335. if input_box.is_visible():
  336. input_box.fill("")
  337. input_box.type(reply_content, delay=50) # 模拟人手打字速度
  338. input_box.press("Enter")
  339. print(f" ==> 已发送回复: {reply_content}")
  340. replied_history[customer_name] = last_text
  341. time.sleep(1)
  342. # --- 核心新增:自动化 UI 发送商品卡片 ---
  343. if needs_product_check and 'search_key' in locals() and search_key:
  344. try:
  345. send_product_link_via_ui(page, search_key)
  346. except Exception as e:
  347. print(f" ⚠️ UI 发送商品动作执行异常: {e}")
  348. # 休眠一小段时间,避免过度占用 CPU
  349. time.sleep(5)
  350. except Exception:
  351. pass
  352. if __name__ == "__main__":
  353. run_bot()