|
|
- import time
- import re
- from playwright.sync_api import Page
-
# Substrings that mark a text line as page chrome rather than a product name.
_JUNK_WORDS = ("全部商品", "浏览足迹", "爆品推荐", "订单", "会话搜索", "快捷短语", "保障", "物流", "复制", "发送")

# The presence of "库存 <digits>" is used as the sign that a product card is rendered.
_STOCK_MARKER_RE = re.compile(r"库存\s*\d+")
# Stock count; may carry thousands separators such as "2,533".
_STOCK_RE = re.compile(r"库存\s*(\d[\d,]*)")
# Price after the yen sign; accepts "2,533.50" as well as plain "19.9".
_PRICE_RE = re.compile(r"¥\s*((?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?)")
# Delivery promise such as "48小时内发货", "最晚 48小时内发货" or "15天内发货".
_DELIVERY_RE = re.compile(r"((?:最晚\s*|现在付款[,,]\s*)?(?:明天|后天|\d+小时内|\d+天内)发货)")
# URL inside a CSS `background-image: url(...)`; quotes of either kind are optional.
_BG_IMAGE_RE = re.compile(r"""url\(\s*["']?([^"')]+?)["']?\s*\)""")

# Scrolls every overflowing candidate container; falls back to the window
# when none of the known containers is scrollable.
_SCROLL_LIST_JS = '''() => {
    const selectors = [
        '.auxo-tabs-content-active',
        '.arco-tabs-content',
        '.scrollbar-container',
        '.auxo-list-items',
        '.auxo-spin-container',
        '[class*="tabs-content"]'
    ];
    let scrolled = false;
    for (let sel of selectors) {
        const els = document.querySelectorAll(sel);
        for (let el of els) {
            if (el && el.scrollHeight > el.clientHeight) {
                // Advance 1200px per pass so that more data gets loaded.
                el.scrollTop += 1200;
                scrolled = true;
            }
        }
    }
    if (!scrolled) window.scrollBy(0, 1000);
}'''


def _open_products_tab(page):
    """Click the "商品" tab and wait up to 10 seconds for the list to render.

    Best-effort: any failure is logged and swallowed so the caller can still
    try to scrape whatever is currently on the page.
    """
    print(">> 正在尝试自动切换到“商品”标签页...")
    try:
        # Prefer the exact tab class name; fall back to a plain text match.
        target_tab = page.locator("div.Qk7Fc20IPHKGdyq8SdNw:has-text('商品')").first
        if target_tab.count() > 0:
            target_tab.hover()
            target_tab.click(force=True)
            print(" -> 已点击“商品”标签。")
        else:
            page.get_by_text("商品", exact=True).first.click(force=True)
            print(" -> 已执行兜底点击。")

        print(" -> 正在等待商品列表渲染 (最长10秒)...")
        visible_blocks = 0
        deadline = time.monotonic() + 10
        while time.monotonic() < deadline:
            visible_blocks = page.locator("div").filter(has_text=_STOCK_MARKER_RE).count()
            if visible_blocks > 0:
                break
            time.sleep(1)

        if visible_blocks > 0:
            print(f" -> 商品列表已就绪,当前可见商品块: {visible_blocks}")
        else:
            print(" ⚠️ 商品列表在 10 秒内未渲染,可能页面加载较慢或标签切换未成功。")
    except Exception as e:
        print(f" ⚠️ 切换标签页失败: {e}")


def _read_card_text(card):
    """Return the card's inner text, climbing up to 3 ancestors if needed.

    When the locator hit a child element (text has "库存" but no "发送"),
    the first ancestor whose text contains "发送" is used instead.
    """
    text = card.inner_text()
    if "发送" not in text and "库存" in text:
        ancestor = card
        for _ in range(3):
            ancestor = ancestor.locator("xpath=..")
            ancestor_text = ancestor.inner_text()
            if "发送" in ancestor_text:
                return ancestor_text
    return text


def _extract_product_name(text):
    """Return the first line longer than 3 chars that is not chrome, price, stock or sales text."""
    for raw_line in text.split("\n"):
        line = raw_line.strip()
        if len(line) <= 3:
            continue
        if "库存" in line or "¥" in line or "已售" in line:
            continue
        if any(word in line for word in _JUNK_WORDS):
            continue
        return line
    return ""


def _extract_image_url(card):
    """Return the card image URL from a CSS background-image or an <img>, else ""."""
    background = card.locator("div[style*='background-image']").first
    if background.count() > 0:
        found = _BG_IMAGE_RE.search(background.get_attribute("style") or "")
        if found:
            return found.group(1)
    image = card.locator("img").first
    if image.count() > 0:
        return image.get_attribute("src") or ""
    return ""


def _parse_card_fields(text):
    """Parse stock, status, delivery time and price out of a card's text.

    Every field has a default (stock 0, price 0, empty delivery time), so a
    card with a missing field never inherits values from a previous card.
    """
    price = 0
    price_match = _PRICE_RE.search(text)
    if price_match:
        price = float(price_match.group(1).replace(",", ""))

    stock = 0
    stock_match = _STOCK_RE.search(text)
    if stock_match:
        stock = int(stock_match.group(1).replace(",", ""))

    status = "预售" if ("预售" in text or "预热" in text) else "现货"

    delivery_match = _DELIVERY_RE.search(text)
    delivery_time = delivery_match.group(1) if delivery_match else ""

    return {"stock": stock, "status": status, "delivery_time": delivery_time, "price": price}


def _scroll_product_list(page, cards):
    """Scroll the last known card into view, then scroll the list containers."""
    try:
        if cards:
            cards[-1].scroll_into_view_if_needed()
            time.sleep(0.5)
        page.evaluate(_SCROLL_LIST_JS)
        # Give newly requested rows time to render before the next pass.
        time.sleep(2.5)
    except Exception as e:
        print(f" ⚠️ 执行滚动动作时出错: {e}")


def scrape_products(page: Page, max_scrolls: int = 40):
    """Scrape the product list of the current page, scrolling to load more.

    Switches to the "商品" tab (best-effort), then repeatedly collects the
    visible product cards, de-duplicates them by name and scrolls the list.
    Stops after two consecutive passes without new products, or after
    ``max_scrolls`` passes.

    Args:
        page: Playwright page to operate on.
        max_scrolls: Upper bound on the number of scroll passes.

    Returns:
        A list of dicts with keys ``name``, ``img_url``, ``stock``,
        ``status``, ``delivery_time`` and ``price``. An empty list is
        returned if an unexpected error aborts the scrape.
    """
    try:
        _open_products_tab(page)

        print(" -> 开始执行深度滚动抓取...")
        results = []
        seen_names = set()
        consecutive_no_new = 0

        for scroll_count in range(1, max_scrolls + 1):
            # Candidate locators in priority order; the first one that
            # matches anything wins. The last is a broad text-based fallback.
            card_locators = (
                page.locator("div[data-btm-id*='card_']"),
                page.locator("div._2a3389bc1da255c500cb6d07f312c984-less"),
                page.locator("div").filter(has_text=_STOCK_MARKER_RE),
            )
            current_cards = []
            for candidate in card_locators:
                current_cards = candidate.all()
                if current_cards:
                    break

            new_found_this_round = 0
            for card in current_cards:
                try:
                    inner_text = _read_card_text(card)
                    if "库存" not in inner_text:
                        continue

                    name = _extract_product_name(inner_text)
                    if not name or name in seen_names:
                        continue

                    fields = _parse_card_fields(inner_text)
                    item_data = {
                        "name": name,
                        "img_url": _extract_image_url(card),
                        "stock": fields["stock"],
                        "status": fields["status"],
                        "delivery_time": fields["delivery_time"],
                        "price": fields["price"],
                    }
                except Exception as e:
                    # One broken or stale card must not abort the whole pass.
                    print(f" ⚠️ 解析商品卡片失败,已跳过: {e}")
                    continue

                # Record the name only after a full parse, so a card that
                # failed midway can be retried on a later pass.
                seen_names.add(name)
                results.append(item_data)
                new_found_this_round += 1
                print(
                    f" [抓取成功] {name[:10]}... | 库存:{item_data['stock']}"
                    f" | 时间:{item_data['delivery_time']}"
                )

            # Start counting empty passes only once something has been
            # scraped, so a slow first render is not mistaken for the end.
            if new_found_this_round:
                consecutive_no_new = 0
            elif results:
                consecutive_no_new += 1

            if consecutive_no_new >= 2:
                print(" -> 连续两轮未发现新商品,判定已到达列表底部。")
                break

            _scroll_product_list(page, current_cards)
            print(f" -> 滚动中 ({scroll_count}/{max_scrolls})... 当前新发现 {new_found_this_round} 个商品")

        print(f">> 深度抓取同步完成,共抓取到 {len(results)} 个唯一商品。")
        return results

    except Exception as e:
        print(f"[Scraper] 抓取发生异常: {e}")
        return []
-
def send_product_link_via_ui(page: Page, keyword: str):
    """Search for a product in the UI and click its "发送商品" button.

    Switches to the "商品" tab, types ``keyword`` into the product search
    box, clicks the first "发送商品" button found, and finally switches back
    to the "订单" tab. The switch back is attempted even when the send step
    fails.

    Args:
        page: Playwright page to operate on.
        keyword: Product name (or part of it) to search for.

    Returns:
        True only if a "发送商品" button was found and clicked; False when
        ``keyword`` is empty, the search box or button is missing, or an
        error occurred.
    """
    if not keyword:
        return False

    sent = False
    try:
        print(f">> [UI动作] 尝试为客户发送商品卡片: {keyword}")

        # 1. Switch to the products tab.
        target_tab = page.locator("div.Qk7Fc20IPHKGdyq8SdNw:has-text('商品')").first
        if target_tab.count() > 0:
            target_tab.click(force=True)
            time.sleep(1)

        # 2. Type the keyword into the product search box.
        search_input = page.locator("input.auxo-input[placeholder*='输入商品名称']").first
        if search_input.count() > 0:
            search_input.fill("")
            search_input.type(keyword, delay=50)
            search_input.press("Enter")
            print(" -> 已输入关键词并回车,等待结果加载...")
            time.sleep(3)  # allow the search results to load

            # 3. Click the first "发送商品" button. This is done in page JS,
            #    matching on button class and text rather than a DOM path.
            click_success = page.evaluate('''(keyword) => {
                const buttons = Array.from(document.querySelectorAll('button.auxo-btn-dashed'));
                const sendBtn = buttons.find(btn => btn.innerText.includes('发送商品'));
                if (sendBtn) {
                    sendBtn.scrollIntoView({ behavior: 'smooth', block: 'center' });
                    // Let the smooth scroll settle before clicking.
                    return new Promise(resolve => {
                        setTimeout(() => {
                            sendBtn.click();
                            resolve(true);
                        }, 500);
                    });
                }
                return false;
            }''', keyword)
            sent = bool(click_success)

            if sent:
                print(f" -> [成功] 已通过 JS 触发“发送商品”点击 (关键词: {keyword})。")
                time.sleep(2)  # leave time for the card to be sent
            else:
                print(" -> [失败] 页面上未发现可点击的“发送商品”按钮。")
        else:
            print(" ⚠️ 未能在页面上找到商品搜索框。")
    except Exception as e:
        print(f">> [UI动作] 执行发送商品动作失败: {e}")
        sent = False

    # 4. Restore the orders tab. This is best-effort and kept separate so a
    #    failure here cannot turn a successful send into a reported failure.
    try:
        time.sleep(1)
        orders_tab = page.locator("div.Qk7Fc20IPHKGdyq8SdNw:has-text('订单')").first
        if orders_tab.count() > 0:
            orders_tab.click(force=True)
    except Exception as e:
        print(f" ⚠️ 返回订单标签页失败: {e}")

    return sent
|