#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
---------------------------------------
# @Project : DAGASI
# @File : kemono.py
# @Author : GrayZhao
# @Date : 2023/2/20 17:26
# @Version :
# @Description :
---------------------------------------
"""
import gevent
import requests
from requests.exceptions import ConnectionError, ChunkedEncodingError
import re
import os
from queue import Queue
from bs4 import BeautifulSoup
from tqdm import tqdm
# Default headers for scraping/downloading: a desktop-Chrome user-agent so the
# site serves the normal HTML pages.
HEADER = {
    "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
}
# Login endpoint plus the origin/referer headers the login POST is sent with.
URL_LOGIN = "https://kemono.party/account/login"
HEADER_LOGIN = {
    "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
    "origin": "https://kemono.party",
    "referer": "https://kemono.party/account/login",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
}
- """
- URL = "https://kemono.party/fanbox/user/1549213/post/5285772"
- response = requests.get(URL, headers=HEADER)
- soup = BeautifulSoup(response.text, features="lxml")
- timestamp = soup.find("div", class_="post__published").findChild("time")["datetime"].split(" ")[0]
- print(timestamp)
- downloads = soup.find_all("a", class_="post__attachment-link", text=re.compile(r"高画質"))
- for download in downloads:
- dw_url = download["href"]
- name = download.text.replace("\n", "").strip().replace("Download ", "")
- print(name, dw_url)
- """
class Kemono:
    """Scraper/downloader for kemono.party fanbox post attachments.

    All state is class-level: one shared ``requests`` session (so the login
    cookie is reused by every request), a work queue of download jobs filled
    by :meth:`produce` and drained by :meth:`consume`, and a list of
    "post_id name" entries that exhausted their retry budget.
    """

    # "post_id name" entries that failed more than 3 times
    __error_ids = list()
    # download jobs: dicts produced by __create_data, consumed by __download
    __data_queue = Queue()
    # shared session so authentication cookies persist across requests
    __session = requests.session()

    @classmethod
    def login(cls, username: str, passwd: str):
        """POST credentials so the shared session carries the auth cookie."""
        _data = {"username": username, "password": passwd}
        cls.__session.post(url=URL_LOGIN, headers=HEADER_LOGIN, data=_data)

    @classmethod
    def produce(cls, id_q: Queue):
        """Scrape the post page for every id in *id_q* and enqueue download jobs.

        NOTE(review): the trailing ``__data_queue.join()`` blocks until a
        consumer has called ``task_done`` for every job, so :meth:`consume`
        must already be running concurrently — confirm against callers.
        """
        tasks = [gevent.spawn(cls.__create_data, id_q) for _ in range(4)]
        gevent.joinall(tasks)
        cls.__data_queue.join()

    @classmethod
    def __create_data(cls, id_q: Queue):
        """Worker: parse one post page per id and queue its 高画質 attachments."""
        while not id_q.empty():
            post_id = id_q.get_nowait()
            url = f"https://kemono.party/fanbox/user/1549213/post/{post_id}"
            response = cls.__session.get(url, headers=HEADER)
            soup = BeautifulSoup(response.text, features="lxml")
            # "YYYY-MM-DD" part of the publish timestamp; used as filename prefix.
            timestamp = soup.find("div", class_="post__published").findChild("time")["datetime"].split(" ")[0]
            downloads = soup.find_all("a", class_="post__attachment-link", text=re.compile(r"高画質"))
            for download in downloads:
                dw_url = download["href"]
                all_name = download.text.replace("\n", "").strip().replace("Download ", "")
                name = os.path.splitext(all_name)[0]
                save_name = f"[{timestamp}] {all_name}"
                cls.__data_queue.put_nowait({
                    "postID": post_id,
                    "name": name,
                    "saveName": save_name,
                    "url": dw_url,
                    "error_count": 0,
                })

    @classmethod
    def consume(cls):
        """Spawn download workers that drain the job queue.

        NOTE(review): ``__download`` loops forever on a blocking ``get()``,
        so ``joinall`` never returns on its own — presumably the process is
        ended externally once ``produce``'s queue join releases; confirm.
        """
        tasks = [gevent.spawn(cls.__download) for _ in range(4)]
        gevent.joinall(tasks)

    @classmethod
    def __download(cls):
        """Worker: download queued attachments one at a time, resuming partial files.

        Each job gets up to 3 retries on connection errors; after that the
        post is recorded in ``__error_ids``.
        """
        while True:
            data = cls.__data_queue.get()
            post_id = data["postID"]
            name = data["name"]
            save_name = "output/" + data["saveName"]
            url = data["url"]
            try:
                # First request only probes the total size via content-length.
                response = cls.__session.get(url=url, headers=HEADER, stream=True)
            except (ConnectionError, ChunkedEncodingError):
                if data["error_count"] >= 3:
                    cls.__error_ids.append(f"{post_id} {name}")
                else:
                    data["error_count"] += 1
                    cls.__data_queue.put(data)
                cls.__data_queue.task_done()
                continue
            total_size = int(response.headers.get("content-length", 0))
            now_range = os.path.getsize(save_name) if os.path.exists(save_name) else 0
            # BUGFIX: only treat the file as complete when the server actually
            # reported a size; previously a missing content-length made this
            # check read "0 >= 0" and the download was skipped entirely.
            if total_size and now_range >= total_size:
                print(f"[{post_id}] {name} 已下载完成...")
                cls.__data_queue.task_done()
                continue
            header = HEADER.copy()
            # BUGFIX: open-ended resume range. HTTP byte ranges are inclusive of
            # the end index, so "bytes=start-total_size" overshot by one byte.
            header.update(Range=f"bytes={now_range}-")
            block_size = 1024
            pbar = tqdm(desc=f"{name}", total=total_size, initial=now_range, unit="KB", unit_scale=True, leave=False)
            try:
                response = cls.__session.get(url=url, headers=header, stream=True)
                with open(save_name, "ab") as file:
                    for chunk in response.iter_content(block_size):
                        file.write(chunk)
                        pbar.update(len(chunk))
                        pbar.set_postfix(info=f"剩余个数:{cls.__data_queue.qsize()}")
            except (ConnectionError, ChunkedEncodingError) as e:
                if data["error_count"] >= 3:
                    cls.__error_ids.append(f"{post_id} {name}")
                else:
                    data["error_count"] += 1
                    cls.__data_queue.put(data)
                pbar.write(f"出现错误:\n{e}")
            finally:
                pbar.close()
                cls.__data_queue.task_done()

    @classmethod
    def is_error_ids(cls):
        """Return a sorted list of failed "post_id name" entries, or False if none.

        BUGFIX: the original returned ``list.sort()``, which sorts in place and
        always returns None — callers could never see the failed ids.
        """
        if cls.__error_ids:
            return sorted(cls.__error_ids)
        return False