HTML 结构解析是 Web 爬虫中的核心技能之一,它允许你从网页中提取所需的信息。Python 提供了几种流行的库来帮助进行 HTML 解析,其中最常用的是 BeautifulSoup
和 lxml
。
首先,你需要安装 requests
(用于发送 HTTP 请求)和 beautifulsoup4
(用于解析 HTML)。可以通过 pip 安装:
pip install requests beautifulsoup4
使用 requests
库可以轻松地从网站抓取 HTML 页面:
import requests url = "https://www.example.com" response = requests.get(url) # 检查请求是否成功 if response.status_code == 200: html_content = response.text else: print(f"Failed to retrieve page, status code: {response.status_code}")
接下来,使用 BeautifulSoup
解析 HTML 内容:
from bs4 import BeautifulSoup soup = BeautifulSoup(html_content, 'html.parser')
这里的 'html.parser'
是解析器的名字,BeautifulSoup
支持多种解析器,包括 Python 自带的标准库、lxml
和 html5lib
。
一旦你有了 BeautifulSoup
对象,你可以开始提取信息。以下是几种常见的选择器方法:
通过标签名:
titles = soup.find_all('h1')
通过类名:
articles = soup.find_all('div', class_='article')
通过 ID:
main_content = soup.find(id='main-content')
通过属性:
links = soup.find_all('a', href=True)
组合选择器:
article_titles = soup.select('div.article h2.title')
提取到数据后,你可以遍历并处理它们:
for title in soup.find_all('h2'): print(title.text.strip())
对于复杂的嵌套结构,你可以使用递归函数来解析:
def parse_section(section): title = section.find('h2') if title: print(title.text.strip()) sub_sections = section.find_all('section') for sub_section in sub_sections: parse_section(sub_section) sections = soup.find_all('section') for section in sections: parse_section(section)
让我们创建一个完整的示例,抓取并解析一个简单的网页:
import requests from bs4 import BeautifulSoup url = "https://www.example.com" # 发送请求并解析 HTML response = requests.get(url) soup = BeautifulSoup(response.text, 'html.parser') # 找到所有的文章标题 article_titles = soup.find_all('h2', class_='article-title') # 输出所有文章标题 for title in article_titles: print(title.text.strip())
这个示例展示了如何从网页中抓取所有具有 class="article-title"
的 h2
元素,并打印出它们的文本内容。
以上就是使用 Python 和 BeautifulSoup 进行 HTML 结构解析的基本流程。当然,实际应用中你可能需要处理更复杂的逻辑,比如处理 JavaScript 渲染的内容或者分页等。
在我们已经讨论的基础上,让我们进一步扩展代码,以便处理更复杂的场景,比如分页、错误处理、日志记录以及数据持久化。我们将继续使用 requests
和 BeautifulSoup
,并引入 logging
和 sqlite3
来记录日志和存储数据。
在爬取过程中,可能会遇到各种问题,如网络错误、服务器错误或解析错误。使用 try...except
块和 logging
模块可以帮助我们更好地处理这些问题:
import logging import requests from bs4 import BeautifulSoup logging.basicConfig(filename='crawler.log', level=logging.INFO, format='%(asctime)s:%(levelname)s:%(message)s') def fetch_data(url): try: response = requests.get(url) response.raise_for_status() # Raises an HTTPError for bad responses soup = BeautifulSoup(response.text, 'html.parser') return soup except requests.exceptions.RequestException as e: logging.error(f"Failed to fetch {url}: {e}") return None # Example usage url = 'https://www.example.com' soup = fetch_data(url) if soup: # Proceed with parsing... else: logging.info("No data fetched, skipping...")
许多网站使用分页显示大量数据。你可以通过检查页面源码找到分页链接的模式,并编写代码来遍历所有页面:
def fetch_pages(base_url, page_suffix='page/'): current_page = 1 while True: url = f"{base_url}{page_suffix}{current_page}" soup = fetch_data(url) if not soup: break # Process page data here... # Check for next page link next_page_link = soup.find('a', text='Next') if not next_page_link: break current_page += 1
使用数据库存储爬取的数据可以方便后续分析和检索。SQLite 是一个轻量级的数据库,非常适合小型项目:
import sqlite3 def init_db(): conn = sqlite3.connect('data.db') cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS articles ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, author TEXT, published_date DATE ) ''') conn.commit() return conn def save_article(conn, title, author, published_date): cursor = conn.cursor() cursor.execute(''' INSERT INTO articles (title, author, published_date) VALUES (?, ?, ?) ''', (title, author, published_date)) conn.commit() # Initialize database conn = init_db() # Save data save_article(conn, "Example Title", "Author Name", "2024-07-24")
让我们将上述概念整合成一个完整的示例,抓取分页数据并将其保存到 SQLite 数据库:
import logging import requests from bs4 import BeautifulSoup import sqlite3 logging.basicConfig(filename='crawler.log', level=logging.INFO) def fetch_data(url): try: response = requests.get(url) response.raise_for_status() return BeautifulSoup(response.text, 'html.parser') except requests.exceptions.RequestException as e: logging.error(f"Failed to fetch {url}: {e}") return None def fetch_pages(base_url, page_suffix='page/'): conn = sqlite3.connect('data.db') cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS articles ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, author TEXT, published_date DATE ) ''') conn.commit() current_page = 1 while True: url = f"{base_url}{page_suffix}{current_page}" soup = fetch_data(url) if not soup: break # Assume the structure of the site allows us to find titles easily titles = soup.find_all('h2', class_='article-title') for title in titles: save_article(conn, title.text.strip(), None, None) next_page_link = soup.find('a', text='Next') if not next_page_link: break current_page += 1 conn.close() def save_article(conn, title, author, published_date): cursor = conn.cursor() cursor.execute(''' INSERT INTO articles (title, author, published_date) VALUES (?, ?, ?) ''', (title, author, published_date)) conn.commit() # Example usage base_url = 'https://www.example.com/articles/' fetch_pages(base_url)
这个示例将抓取 https://www.example.com/articles/
上的分页数据,保存文章标题到 SQLite 数据库。注意,你需要根据实际网站的 HTML 结构调整 find_all
和 find
方法的参数。
既然我们已经有了一个基本的框架来抓取分页数据并存储到 SQLite 数据库中,现在让我们进一步完善这个代码,包括添加更详细的错误处理、日志记录、以及处理动态加载的网页内容(通常由 JavaScript 渲染)。
在 fetch_data
函数中,除了处理请求错误之外,我们还可以捕获和记录其他可能发生的错误,比如解析 HTML 的错误:
def fetch_data(url): try: response = requests.get(url) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') return soup except requests.exceptions.RequestException as e: logging.error(f"Request error fetching {url}: {e}") except Exception as e: logging.error(f"An unexpected error occurred: {e}") return None
在日志记录方面,我们可以增加更多的信息,比如请求的 HTTP 状态码、响应时间等:
import time def fetch_data(url): try: start_time = time.time() response = requests.get(url) elapsed_time = time.time() - start_time response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') logging.info(f"Fetched {url} successfully in {elapsed_time:.2f} seconds, status code: {response.status_code}") return soup except requests.exceptions.RequestException as e: logging.error(f"Request error fetching {url}: {e}") except Exception as e: logging.error(f"An unexpected error occurred: {e}") return None
当网站使用 JavaScript 动态加载内容时,普通的 HTTP 请求无法获取完整的内容。这时可以使用 Selenium
或 Pyppeteer
等库来模拟浏览器行为。这里以 Selenium
为例:
from selenium import webdriver from selenium.webdriver.chrome.options import Options def fetch_data_with_js(url): options = Options() options.headless = True # Run Chrome in headless mode driver = webdriver.Chrome(options=options) driver.get(url) # Add wait time or wait for certain elements to load time.sleep(3) # Wait for dynamic content to load html = driver.page_source driver.quit() return BeautifulSoup(html, 'html.parser')
要使用这段代码,你需要先下载 ChromeDriver
并确保它在系统路径中可执行。此外,你还需要安装 selenium
库:
pip install selenium
现在,我们可以将上述所有改进点整合到我们的分页数据抓取脚本中:
import logging import time import requests from bs4 import BeautifulSoup import sqlite3 from selenium import webdriver from selenium.webdriver.chrome.options import Options logging.basicConfig(filename='crawler.log', level=logging.INFO) def fetch_data(url): try: start_time = time.time() response = requests.get(url) elapsed_time = time.time() - start_time response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') logging.info(f"Fetched {url} successfully in {elapsed_time:.2f} seconds, status code: {response.status_code}") return soup except requests.exceptions.RequestException as e: logging.error(f"Request error fetching {url}: {e}") except Exception as e: logging.error(f"An unexpected error occurred: {e}") return None def fetch_data_with_js(url): options = Options() options.headless = True driver = webdriver.Chrome(options=options) driver.get(url) time.sleep(3) html = driver.page_source driver.quit() return BeautifulSoup(html, 'html.parser') def fetch_pages(base_url, page_suffix='page/', use_js=False): conn = sqlite3.connect('data.db') cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS articles ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, author TEXT, published_date DATE ) ''') conn.commit() current_page = 1 fetch_function = fetch_data_with_js if use_js else fetch_data while True: url = f"{base_url}{page_suffix}{current_page}" soup = fetch_function(url) if not soup: break titles = soup.find_all('h2', class_='article-title') for title in titles: save_article(conn, title.text.strip(), None, None) next_page_link = soup.find('a', text='Next') if not next_page_link: break current_page += 1 conn.close() def save_article(conn, title, author, published_date): cursor = conn.cursor() cursor.execute(''' INSERT INTO articles (title, author, published_date) VALUES (?, ?, ?) ''', (title, author, published_date)) conn.commit() # Example usage base_url = 'https://www.example.com/articles/' use_js = True # Set to True if the site uses JS for loading content fetch_pages(base_url, use_js=use_js)
这个改进版的脚本包含了错误处理、详细的日志记录、以及处理动态加载内容的能力,使得它更加健壮和实用。
上一篇:前端 CSS 20道面试题详解
下一篇:【多线程】阻塞队列