#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
---------------------------------------
# @Project      : DAGASI
# @File         : kemono.py
# @Author       : GrayZhao
# @Date         : 2023/2/20 17:26
# @Version      :
# @Description  : Concurrent downloader for kemono.party fanbox posts.
#                 Producers scrape post pages for 高画質 attachment links,
#                 consumers download them with resume + retry support.
---------------------------------------
"""
import os
import re
from queue import Empty, Queue

import gevent
import requests
from bs4 import BeautifulSoup
from requests.exceptions import ChunkedEncodingError, ConnectionError
from tqdm import tqdm

HEADER = {
    "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
}
URL_LOGIN = "https://kemono.party/account/login"
HEADER_LOGIN = {
    "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
    "origin": "https://kemono.party",
    "referer": "https://kemono.party/account/login",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
}


class Kemono:
    """Producer/consumer downloader for kemono.party attachments.

    Workflow: ``login`` → ``produce`` (scrape post pages into a work queue)
    → ``consume`` (download each attachment with resume and retry) →
    ``is_error_ids`` (inspect permanently failed items).
    """

    # Posts/files that exhausted their retry budget, as "<postID> <name>".
    __error_ids = list()
    # Work items produced by __create_data and consumed by __download.
    __data_queue = Queue()
    # Shared session: keeps the login cookie and reuses connections.
    __session = requests.session()

    MAX_RETRIES = 3        # per-item retry budget before recording as failed
    WORKER_COUNT = 4       # greenlets spawned for producing and for consuming
    OUTPUT_DIR = "output"  # downloads are written under this directory

    @classmethod
    def login(cls, username: str, passwd: str) -> None:
        """Log in to kemono.party; the auth cookie is stored on the shared session."""
        _data = {"username": username, "password": passwd}
        cls.__session.post(url=URL_LOGIN, headers=HEADER_LOGIN, data=_data)

    @classmethod
    def produce(cls, id_q: Queue, user_id: str = "1549213", service: str = "fanbox") -> None:
        """Scrape every post id in ``id_q`` and fill the internal download queue.

        ``user_id`` and ``service`` generalize the previously hard-coded
        creator URL; the defaults preserve the original behavior. Blocks
        until every queued download has been marked done (queue.join).
        """
        tasks = [gevent.spawn(cls.__create_data, id_q, user_id, service)
                 for _ in range(cls.WORKER_COUNT)]
        gevent.joinall(tasks)
        cls.__data_queue.join()

    @classmethod
    def __create_data(cls, id_q: Queue, user_id: str = "1549213", service: str = "fanbox") -> None:
        """Producer worker: pop post ids, scrape each post page, queue its downloads."""
        while True:
            # EAFP: with several greenlets draining the same queue,
            # empty()-then-get_nowait() races; catch Empty instead.
            try:
                post_id = id_q.get_nowait()
            except Empty:
                break
            url = f"https://kemono.party/{service}/user/{user_id}/post/{post_id}"
            response = cls.__session.get(url, headers=HEADER)
            soup = BeautifulSoup(response.text, features="lxml")
            published = soup.find("div", class_="post__published")
            if published is None:
                # Page layout changed, post removed, or request blocked:
                # record it instead of crashing the worker with AttributeError.
                cls.__error_ids.append(f"{post_id} <no published date>")
                continue
            # "YYYY-MM-DD" prefix of the post's publication datetime.
            timestamp = published.findChild("time")["datetime"].split(" ")[0]
            # Only attachments labelled 高画質 (high quality).
            downloads = soup.find_all("a", class_="post__attachment-link",
                                      text=re.compile(r"高画質"))
            for download in downloads:
                dw_url = download["href"]
                all_name = download.text.replace("\n", "").strip().replace("Download ", "")
                name = os.path.splitext(all_name)[0]
                save_name = f"[{timestamp}] {all_name}"
                q_data = {
                    "postID": post_id,
                    "name": name,
                    "saveName": save_name,
                    "url": dw_url,
                    "error_count": 0,
                }
                cls.__data_queue.put_nowait(q_data)

    @classmethod
    def consume(cls) -> None:
        """Spawn the download workers (each loops forever on the work queue)."""
        tasks = [gevent.spawn(cls.__download) for _ in range(cls.WORKER_COUNT)]
        gevent.joinall(tasks)

    @classmethod
    def __requeue_or_fail(cls, data: dict) -> None:
        """Retry bookkeeping: requeue the item, or record it as permanently failed
        once its retry budget is exhausted."""
        if data["error_count"] >= cls.MAX_RETRIES:
            cls.__error_ids.append(f"{data['postID']} {data['name']}")
        else:
            data["error_count"] += 1
            cls.__data_queue.put(data)

    @classmethod
    def __download(cls) -> None:
        """Consumer worker: download queued files with resume (Range) and retry."""
        # FIX: the output directory was never created; the first open("output/...")
        # would raise FileNotFoundError.
        os.makedirs(cls.OUTPUT_DIR, exist_ok=True)
        while True:
            data = cls.__data_queue.get()
            post_id = data["postID"]
            name = data["name"]
            save_name = os.path.join(cls.OUTPUT_DIR, data["saveName"])
            url = data["url"]
            try:
                # Probe request, used only for the total size. FIX: close it
                # right away — a streamed response that is never consumed
                # leaks its pooled connection.
                response = cls.__session.get(url=url, headers=HEADER, stream=True)
                total_size = int(response.headers.get("content-length", 0))
                response.close()
            except (ConnectionError, ChunkedEncodingError):
                cls.__requeue_or_fail(data)
                cls.__data_queue.task_done()
                continue
            now_range = os.path.getsize(save_name) if os.path.exists(save_name) else 0
            # FIX: only skip when the server actually reported a size; with a
            # missing content-length (total_size == 0) the original declared
            # every file "已下载完成" without downloading anything.
            if total_size and now_range >= total_size:
                print(f"[{post_id}] {name} 已下载完成...")
                cls.__data_queue.task_done()
                continue
            header = HEADER.copy()
            header["Range"] = f"bytes={now_range}-"  # resume a partial file
            block_size = 1024
            # FIX: chunks are counted in bytes, so unit is "B"; unit_scale
            # lets tqdm render KB/MB automatically.
            pbar = tqdm(desc=f"{name}", total=total_size, initial=now_range,
                        unit="B", unit_scale=True, leave=False)
            try:
                response = cls.__session.get(url=url, headers=header, stream=True)
                with open(save_name, "ab") as file:
                    for chunk in response.iter_content(block_size):
                        file.write(chunk)
                        pbar.update(len(chunk))
                        pbar.set_postfix(info=f"剩余个数:{cls.__data_queue.qsize()}")
            except (ConnectionError, ChunkedEncodingError) as e:
                cls.__requeue_or_fail(data)
                pbar.write(f"出现错误:\n{e}")
            finally:
                pbar.close()
                cls.__data_queue.task_done()

    @classmethod
    def is_error_ids(cls):
        """Return the sorted list of failed "<postID> <name>" entries, or False
        when nothing failed.

        FIX: the original returned ``cls.__error_ids.sort()`` — ``list.sort()``
        always returns ``None``, so callers could never see the failures.
        """
        if cls.__error_ids:
            return sorted(cls.__error_ids)
        return False