123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
import io
import os.path
import time
from datetime import datetime
from logging import log
from urllib.parse import urlparse

import scrapy
from PIL import Image
from selenium import webdriver
from selenium.common import TimeoutException
from selenium.webdriver import ActionChains
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

from nmc.compress_image import capture_element_screenshot
def js_is_complete(driver):
    """Return True once the browser reports document.readyState == 'complete'.

    Suitable for use as a WebDriverWait predicate.
    """
    ready_state = driver.execute_script("return document.readyState")
    return ready_state == "complete"
def retry_get_url(driver, url, retries=3, delay=5):
    """Navigate *driver* to *url*, retrying when the page load times out.

    Args:
        driver: a Selenium WebDriver instance.
        url: the URL to load.
        retries: maximum number of attempts (default 3).
        delay: seconds to sleep between failed attempts (default 5).

    Returns:
        True if ``driver.get`` succeeded on some attempt, False otherwise.
    """
    for attempt in range(retries):
        try:
            driver.get(url)
        except TimeoutException:
            # Back off before every attempt except the last one.
            if attempt < retries - 1:
                time.sleep(delay)
        else:
            # Additional page-load health checks could be inserted here.
            return True
    # Fixed: the original fell through and implicitly returned None when
    # retries <= 0; callers test the result as a boolean, so always return
    # an explicit False on failure.
    return False
class NmcspiderSpider(scrapy.Spider):
    """Spider that archives weather-chart screenshots from www.nmc.cn.

    Crawl flow:
        province list (JSON) -> per-province station list (JSON) ->
        per-city detail page, rendered with a shared headless Chrome driver,
        from which three chart images are cropped and saved under
        ``./resource/<province>/<city>/<YYYY-MM-DD>/``.
    """

    name = "nmcspider"
    allowed_domains = ["www.nmc.cn"]
    start_urls = ["http://www.nmc.cn/rest/province"]

    def __init__(self):
        chrome_path = r'E:\chromedriver_win32\chromedriver.exe'
        chrome_options = webdriver.ChromeOptions()
        # Keep the browser alive if the script detaches (debugging aid).
        chrome_options.add_experimental_option("detach", True)
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--no-sandbox')
        # Fixed: the original built an Options() object that was immediately
        # overwritten (dead code) and passed the driver binary as a bogus
        # '--webdriver-executable-path' Chrome switch. The chromedriver path
        # must be supplied through a Service object (Selenium 4 API).
        self.driver = webdriver.Chrome(service=Service(chrome_path),
                                       options=chrome_options)

    def start_requests(self):
        """Kick off the crawl from the province-list endpoint."""
        yield scrapy.Request(url=self.start_urls[0], callback=self.parse_provinces)

    def close(self, reason):
        """Shut down the shared Selenium driver when the spider closes."""
        self.driver.quit()

    def parse_provinces(self, response):
        """Yield one station-list request per province, keyed by its 'code'."""
        provinces_data = response.json()
        for province in provinces_data:
            province_code = province.get('code')
            if province_code:
                yield scrapy.Request(url="http://www.nmc.cn/rest/province/" + province_code,
                                     callback=self.parse_province_details)

    def parse_province_details(self, response):
        """Yield one detail-page request per station, carrying names in meta."""
        stations = response.json()
        for station in stations:
            station_url = station.get('url')
            province = station.get('province')
            city = station.get('city')
            if station_url:
                yield scrapy.Request(url="http://www.nmc.cn" + station_url,
                                     callback=self.parse,
                                     meta={"province": province, "city": city})

    def parse(self, response):
        """Render the city page in Selenium and save three chart screenshots.

        Writes ``<area>_<station>_1.jpg`` (forecast block), ``_2.png``
        ('realChart' element) and ``_3.png`` ('#container' chart, falling back
        to a default image when it never appears).
        """
        province = response.meta['province']
        city = response.meta['city']
        success = retry_get_url(self.driver, response.url)
        if not success:  # fixed: was the non-idiomatic 'success == False'
            self.logger.error(f"{response.url}超过最大重试次数")
        else:
            self.driver.set_window_size(1920, 1080)
            # Output directory: resource/<province>/<city>/<today>.
            today_str = datetime.now().strftime("%Y-%m-%d")
            path_to_save = os.path.join("resource", province, city, today_str)
            full_path = os.path.join(".", path_to_save)
            # exist_ok removes the race between an exists() check and makedirs().
            os.makedirs(full_path, exist_ok=True)

            # --- First image: the forecast block located by XPath. ---
            element = WebDriverWait(self.driver, 20).until(
                EC.presence_of_element_located((By.XPATH, '/html/body/div[2]/div')))
            self.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", element)
            location = element.location
            size = element.size
            # Crop the element out of a full-page screenshot with PIL.
            # NOTE(review): element.location/size are CSS pixels while the PNG is
            # physical pixels — assumes device pixel ratio 1; confirm on HiDPI.
            screenshot = self.driver.get_screenshot_as_png()
            image = Image.open(io.BytesIO(screenshot))
            crop_area = (location['x'], location['y'],
                         location['x'] + size['width'], location['y'] + size['height'])
            cropped_image = image.crop(crop_area)
            # JPEG cannot carry an alpha channel, so convert to RGB first.
            cropped_image = cropped_image.convert("RGB")
            # Build the file stem from the 4th and 5th URL path segments.
            url = urlparse(response.url)
            parts = url.path.split('/')
            target_part = '_'.join([parts[3], parts[4].split('.')[0]])
            name = target_part + "_1.png"
            jpeg_name = name.rsplit('.', 1)[0] + ".jpg"
            jpeg_path = os.path.join(full_path, jpeg_name)
            quality = 50  # aggressive JPEG compression keeps the archive small
            cropped_image.save(jpeg_path, "JPEG", quality=quality)

            # --- Second image: the 'realChart' element. ---
            highcharts = WebDriverWait(self.driver, 20).until(
                EC.presence_of_element_located((By.ID, 'realChart')))
            self.driver.execute_script(
                "arguments[0].scrollIntoView({block: 'center', behavior: 'instant'});",
                highcharts)
            screenshot = self.driver.get_screenshot_as_png()
            # Read the viewport rectangle from the DOM so the crop matches what
            # the screenshot actually shows after scrolling.
            script = """
            var element = document.getElementById('realChart');
            var rect = element.getBoundingClientRect();
            return {left: rect.left, top: rect.top, width: rect.width, height: rect.height};
            """
            position_info = self.driver.execute_script(script)
            image = Image.open(io.BytesIO(screenshot))
            crop_area = (
                position_info['left'], position_info['top'],
                position_info['left'] + position_info['width'],
                position_info['top'] + position_info['height'])
            cropped_image = image.crop(crop_area)
            name = target_part + "_2.png"
            image_path = os.path.join(full_path, name)
            cropped_image.save(image_path)

            try:
                # --- Third image: the '#container' chart; a legend item must be
                # clicked before the capture. ---
                # Fixed: removed a bare js_is_complete(self.driver) call whose
                # result was discarded — the WebDriverWait below does the wait.
                WebDriverWait(self.driver, 20).until(js_is_complete)
                highcharts = WebDriverWait(self.driver, 20).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, '#container')))
                self.driver.execute_script(
                    "arguments[0].scrollIntoView({block: 'center', behavior: 'instant'});",
                    highcharts)
                # Deduplicated: the same selector was previously repeated twice.
                legend_selector = '#container> .highcharts-container >svg > g > g > g >g.highcharts-legend-item.highcharts-column-series.highcharts-color-undefined.highcharts-series-0'
                WebDriverWait(self.driver, 20).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, legend_selector)))
                element = self.driver.find_element(By.CSS_SELECTOR, legend_selector)
                # Dispatch the click via JS: the SVG legend node is not reliably
                # clickable through the regular WebDriver click.
                self.driver.execute_script("arguments[0].dispatchEvent(new Event('click'));",
                                           element)
                # Wait for the chart to settle after toggling the series.
                WebDriverWait(self.driver, 20).until(
                    EC.visibility_of_element_located((By.ID, 'container')))
                screenshot = self.driver.get_screenshot_as_png()
                scripts = """
                var element = document.getElementById('container');
                var rect = element.getBoundingClientRect();
                return {left: rect.left, top: rect.top, width: rect.width, height: rect.height};
                """
                position_info = self.driver.execute_script(scripts)
                image = Image.open(io.BytesIO(screenshot))
                crop_area = (
                    # +50 presumably trims the chart's header area — TODO confirm.
                    position_info['left'], position_info['top'] + 50,
                    position_info['left'] + position_info['width'],
                    position_info['top'] + position_info['height'])
                cropped_image = image.crop(crop_area)
                name = target_part + "_3.png"
                image_path = os.path.join(full_path, name)
                cropped_image.save(image_path)
            except TimeoutException:
                # Chart never appeared: save the placeholder so downstream
                # consumers always find a _3.png for this city/date.
                default_image_path = r"E:\pySpider\nmc\default_image.png"
                backup_image = Image.open(default_image_path)
                name = target_part + "_3.png"
                image_path = os.path.join(full_path, name)
                backup_image.save(image_path)
                current_url = self.driver.current_url
                self.logger.error(
                    f"在页面 {current_url} 上,在指定时间内未能找到指定元素使用默认图片")
|