# -*- coding: utf-8 -*- """ Lead extractor for Google (HTML export) business results — UK tuned. Outputs: Business Name | Phone | Website | Address - Per-listing block slicing via the "PiKi2c" marker (2k–8k chars by default) - Address: decodes Google Maps '/maps/dir//' links found in href= or data-url= - Phone (UK): supports '0ddd ddd dddd', '0dddd dddddd', '0ddd dddd dddd', contiguous 11 digits, and common 07 mobile format; robust to non-breaking spaces - Website: first external (non-Google/non-ads) link in block - Light ad-skip heuristic (configurable) Use the helper "run_on_all_html()" at bottom to auto-scan uploaded files and export CSV. """ import re import html from pathlib import Path from urllib.parse import urlparse, parse_qs, unquote import pandas as pd # ------------------------------- # Core helpers # ------------------------------- def read_text(path: str) -> str: return Path(path).read_text(encoding="utf-8", errors="ignore") def find_markers(text: str, marker: str = "PiKi2c") -> list[int]: out, i, L = [], 0, len(marker) while True: j = text.find(marker, i) if j == -1: break out.append(j + 1) i = j + L return out def slice_blocks(text: str, marker_positions_1based: list[int], max_block: int = 8000) -> list[str]: blocks = [] N = len(marker_positions_1based) for i, p1 in enumerate(marker_positions_1based): start0 = p1 - 1 end0 = marker_positions_1based[i+1] - 1 if i < N-1 else start0 + max_block end0 = min(end0, start0 + max_block) blocks.append(text[start0:end0]) return blocks # ------------------------------- # Extractors # ------------------------------- # Business name RE_NAME = re.compile(r']*class="OSrXXb"[^>]*>(.*?)', re.DOTALL) def extract_business_name(block_html: str) -> str | None: m = RE_NAME.search(block_html) if not m: return None name = html.unescape(m.group(1)).strip() return name or None # Phones (UK formats) def normalise_spaces(s: str) -> str: return s.replace(" ", " ").replace("\xa0", " ") PHONE_PATTERNS = [ r"\b0\d{3} \d{3} \d{4}\b", # 0ddd ddd dddd r"\b0\d{4} \d{6}\b", # 0dddd dddddd r"\b0\d{3} \d{4} \d{4}\b", # 0ddd dddd dddd r"\b0\d{10}\b", # 0dddddddddd (11 contiguous) r"\b07\d{3} \d{6}\b", # mobiles like 07xxx xxxxxx ] def extract_phone(block_html: str) -> str | None: s = normalise_spaces(block_html) for window in (2500, None): search_space = s if window is None else s[:window] for pat in PHONE_PATTERNS: m = re.search(pat, search_space) if m: return m.group(0) return None # Website (skip Google/ads) RE_HREF = re.compile(r'href=["\'](https?://[^"\']+)["\']', re.IGNORECASE) def is_skip_link(url: str) -> bool: u = url.lower() return ("google.com" in u) or ("/maps/dir" in u) or ("googleadservices.com" in u) def resolve_ad_redirect(url: str) -> str: if "googleadservices.com" in url: try: q = parse_qs(urlparse(url).query) for k in ("url", "adurl"): if k in q and q[k]: return unquote(q[k][0]) except: pass return url def extract_website(block_html: str) -> str | None: for m in RE_HREF.finditer(block_html): url = resolve_ad_redirect(m.group(1)) if not is_skip_link(url): return url return None # Address via /maps/dir// in href or data-url RE_DIR_LINK = re.compile(r'(?:href|data-url)=["\'](/maps/dir//[^"\']+)["\']', re.IGNORECASE) RE_UK_PC = re.compile(r"\b[A-Z]{1,2}\d{1,2}[A-Z]?\s*\d[A-Z]{2}\b", re.IGNORECASE) def decode_maps_dir_address(raw: str) -> str | None: if not raw: return None s = html.unescape(raw) for cut in ("/data=", "?", "&"): idx = s.find(cut) if idx != -1: s = s[:idx] if s.startswith("/maps/dir//"): s = s[len("/maps/dir//"):] s = s.replace("+", " ") try: s = unquote(s) except: pass return s.strip(" /") or None def extract_address(block_html: str) -> str | None: for m in RE_DIR_LINK.finditer(block_html): addr = decode_maps_dir_address(m.group(1)) if addr: return addr # prefer first; UK postcode check is allowed but not required return None # (Optional) very light ad detection AD_HINTS = ("data-text-ad", "Sponsored", "Ad ·", "Ads ·") def is_probable_ad(block_html: str) -> bool: s = block_html.lower() return any(h.lower() in s for h in AD_HINTS) # Row + file processing def extract_row_from_block(block_html: str, skip_ads: bool = True) -> dict: if skip_ads and is_probable_ad(block_html): return {"Business Name": None, "Phone": None, "Website": None, "Address": None} return { "Business Name": extract_business_name(block_html), "Phone": extract_phone(block_html), "Website": extract_website(block_html), "Address": extract_address(block_html), } def process_google_html(file_path: str, marker: str = "PiKi2c", max_block_chars: int = 8000, skip_ads: bool = True) -> pd.DataFrame: text = read_text(file_path) positions = find_markers(text, marker=marker) blocks = slice_blocks(text, positions, max_block=max_block_chars) rows = [extract_row_from_block(b, skip_ads=skip_ads) for b in blocks] return pd.DataFrame(rows) def process_many(files: list[str], **kwargs) -> pd.DataFrame: dfs = [] for fp in files: df = process_google_html(fp, **kwargs) df.insert(0, "Source File", Path(fp).name) dfs.append(df) return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame( columns=["Source File", "Business Name", "Phone", "Website", "Address"] ) # Convenience runner: scan likely upload dirs for *.htm/*.html and export CSV def run_on_all_html(output_csv: str = "leads.csv", max_block_chars: int = 8000, skip_ads: bool = True) -> str: search_dirs = [Path.cwd()] try: p = Path("/mnt/data") if p.exists(): search_dirs.append(p) except Exception: pass html_files = [] for d in search_dirs: html_files.extend(sorted(map(str, d.glob("*.htm")))) html_files.extend(sorted(map(str, d.glob("*.html")))) df = process_many(html_files, max_block_chars=max_block_chars, skip_ads=skip_ads) Path(output_csv).parent.mkdir(parents=True, exist_ok=True) df.to_csv(output_csv, index=False) return str(Path(output_csv).resolve())