import io
import os
import sys
import re
import json
import time
import string
import random
import logging
import hashlib
import zipfile
import subprocess
import uuid
from datetime import datetime, timezone

import requests

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter('[%(levelname)s] %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

# Host used for the mobile ("aweme") API; modify this based on your tt-target-idc.
aweme_host = 'api16-normal-useast2a.tiktokv.com'
# Some more hosts to try if "login expired"
# Examples:
# 'api16-normal-c-useast1a.tiktokv.com' #useast1a
# 'api16-normal-useast2a.tiktokv.com' #useast2a
# 'api16-normal-useast5.us.tiktokv.com' #useast5
# 'api22-normal-v4.tiktokv.com'
# 'api22-normal-c-alisg.tiktokv.com'
# 'api16-normal-useast1a.tiktokv.com'
# 'api16-normal-no1a.tiktokv.eu'

# App identity values sent with mobile API requests.
APP_INFO = {
    'app_name': 'musical_ly',
    'app_version': '40.0.4',
    'manifest_app_version': '2024000040',
    'aid': '1233'
}

# Header template for requests to the web site.
web_headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Referer': 'https://www.tiktok.com/'
}

# Header template for requests to the mobile API.
aweme_headers = {
    "User-Agent": f"com.zhiliaoapp.musically/{APP_INFO['manifest_app_version']} (Linux; U; Android 13; en_US; Pixel 7; Build/TD1A.220804.031; Cronet/58.0.2991.0)",
    "X-Argus": "",
    "X-Ladon": "",
    "Content-Type": "application/json"
}

# Characters that must be backslash-escaped inside an FFMETADATA key or value.
_FFMETADATA_ESCAPES = str.maketrans({char: '\\' + char for char in '\\=;#\n'})


def _escape_ffmetadata(value):
    """Return ``value`` as text that is safe to write after ``key=`` in an FFMETADATA file."""
    return str(value).translate(_FFMETADATA_ESCAPES)


def check_integrity(video, filename, keep_corrupt):
    """Validate a downloaded file with ffprobe and move it to its final name.

    Args:
        video: Path of the temporary file to check.
        filename: Final path the file is moved to when it is kept.
        keep_corrupt: When true, a file that fails the check is kept anyway.

    Returns:
        0 when the file was moved to ``filename``; 1 when it was deleted
        (ffprobe could not be run / exited with an error, or the check failed
        and ``keep_corrupt`` is false).
    """
    try:
        # An argument list (no shell) keeps quotes, '$' or backticks in the
        # path from being interpreted by a shell.
        result = subprocess.run(
            ['ffprobe', '-v', 'error', '-select_streams', 'v:0',
             '-show_entries', 'packet=pts_time', '-of', 'csv=p=0', video],
            capture_output=True,
            check=True,
            text=True,
        )
        integrity_passed = 'invalid data' not in (result.stdout + result.stderr).lower()
    except Exception:
        logger.exception('Error checking integrity. Deleting video: %s', filename)
        os.remove(video)
        return 1

    if integrity_passed:
        logger.info('Video passed FFprobe integrity check.')
    elif keep_corrupt:
        logger.warning('Video failed FFprobe integrity check. Keeping the corrupted file.')
    else:
        logger.error('Video failed FFprobe integrity check. Deleting: %s', filename)
        os.remove(video)
        return 1

    os.replace(video, filename)
    logger.info("DOWNLOADED: %s", filename)
    return 0


def check_source_encode(video, sd_video):
    """Decide whether ``video`` should be kept as the source-quality file.

    The file is probed with ffprobe. When either dimension is at most 576 and
    the container carries a ``vid_md5`` tag or a ``comment`` tag starting with
    ``vid:v``, the file is treated as SD: it is renamed to ``sd_video``, or
    deleted when ``sd_video`` already exists.

    Args:
        video: Path of the downloaded file.
        sd_video: Path an SD file should be stored under.

    Returns:
        False when the file was classified as SD (and renamed or deleted);
        True otherwise, including when probing failed.
    """
    try:
        cmd = [
            'ffprobe', '-v', 'error', '-select_streams', 'v:0',
            '-show_entries', 'stream=width,height:format_tags',
            '-of', 'json', video,
        ]
        output = subprocess.run(cmd, capture_output=True, text=True, check=True)
        data = json.loads(output.stdout.strip())

        width = data["streams"][0]["width"]
        height = data["streams"][0]["height"]
        tags = data["format"].get("tags", {})
        has_sd_tags = "vid_md5" in tags or (
            "comment" in tags and tags["comment"].startswith("vid:v")
        )
        is_sd = width <= 576 or height <= 576

        if is_sd and has_sd_tags:
            if os.path.exists(sd_video):
                os.remove(video)
                logger.info(f"Downloaded video is SD and '{sd_video}' already exists.Deleting...")
            else:
                os.rename(video, sd_video)
                logger.info(f"Downloaded video is SD. Renaming '{video}' to '{sd_video}'")
            return False
    except Exception as e:
        logger.exception(f"Unexpected error while checking resolution for '{video}': {e}")
    return True


def add_metadata(video, metadata):
    """Embed post metadata into ``video`` as container tags using ffmpeg.

    The video is moved to ``<video>.temp``, an FFMETADATA file is written next
    to it, and ffmpeg stream-copies both back into ``video``. If anything
    fails, the original file is moved back so the video is not lost, and the
    temporary metadata file is always removed.

    Args:
        video: Path of the video file to tag.
        metadata: Mapping with the post fields read below (``username``,
            ``timestamp``, ``title``, ...); missing or falsy fields are skipped.

    Returns:
        0 on success, 1 when ``video`` does not exist or embedding failed.
    """
    if not os.path.isfile(video):
        return 1

    temp_video = f"{video}.temp"
    metadata_file = f"{video.rsplit('.', 1)[0]}.txt"
    try:
        os.replace(video, temp_video)
        metadata_fields = {
            "Artist": f"@{metadata.get('username')}" if metadata.get('username') else None,
            "user_id": metadata.get("user_id"),
            "sec_uid": metadata.get("sec_uid"),
            "Nickname": metadata.get("nickname"),
            "Date": get_video_date(metadata.get("timestamp"), full_format=True),
            "Title": metadata.get("title"),
            "Region": metadata.get("region"),
            "video_id": metadata.get("video_id"),
            "music_id": metadata.get("music_id"),
            "music_title": metadata.get("music_title"),
            "music_author": metadata.get("music_author"),
            "matched_song_id": metadata.get("matched_song_id"),
            "matched_song_title": metadata.get("matched_song_title"),
            "matched_song_author": metadata.get("matched_song_author"),
            "mentioned_users": metadata.get("mentioned_users"),
            "mentioned_users_sticker": metadata.get("mentioned_users_sticker"),
            "original_size": os.path.getsize(temp_video),
            "original_MD5": get_video_hash(temp_video),
            "author_stats": metadata.get("author_stats"),
            "video_stats": metadata.get("video_stats"),
            "Timestamp": metadata.get("timestamp")
        }

        with open(metadata_file, 'w', encoding='utf-8') as meta_file:
            meta_file.write(";FFMETADATA1\n")
            # Values are escaped because a raw newline, '=', ';', '#' or '\'
            # (e.g. in a multi-line title) would break the FFMETADATA syntax.
            meta_file.writelines(
                f"{field}={_escape_ffmetadata(value)}\n"
                for field, value in metadata_fields.items()
                if value
            )

        subprocess.run(
            [
                'ffmpeg', '-y', '-i', temp_video, '-i', metadata_file,
                '-movflags', 'use_metadata_tags',
                '-map_metadata', '0', '-map_metadata', '1',
                '-c', 'copy', video,
            ],
            capture_output=True,
            check=True,
            text=True,
            errors='replace',
        )
        os.remove(temp_video)
        logger.info('EMBEDDED METADATA: %s', video)
        return 0
    except Exception:
        logger.exception("Unexpected error occurred while embedding metadata: %s", video)
        # Put the untouched original back (overwriting any partial output).
        if os.path.isfile(temp_video):
            try:
                os.replace(temp_video, video)
            except OSError:
                logger.exception("Could not restore original video from: %s", temp_video)
        return 1
    finally:
        if os.path.isfile(metadata_file):
            os.remove(metadata_file)
def _headers_with_cookie(base_headers, cookie=None):
    """Return a copy of ``base_headers`` with ``Cookie`` set when ``cookie`` is truthy.

    Working on a copy keeps per-call cookies out of the shared module-level
    header templates, so they are not re-sent by unrelated later requests.
    """
    headers = dict(base_headers)
    if cookie:
        headers['Cookie'] = cookie
    return headers


def _fetch_bytes(url):
    """Download ``url`` and return the response body, raising on an HTTP error status."""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.content


def _music_extension(music_data):
    """Return ``'m4a'`` when the detected audio codec contains ``aac``, else ``'mp3'``."""
    audio_codec = get_audio_codec(music_data)
    return 'm4a' if audio_codec and 'aac' in audio_codec else 'mp3'


def convert_to_h264(video, delete_original=False):
    """Transcode ``video`` to H.264 when its first video stream is HEVC.

    The converted file is written next to the input as ``<stem>_h264.mp4``.

    Args:
        video: Path of the video file.
        delete_original: Remove ``video`` after a successful conversion.

    Returns:
        0 on success (also when the codec is not HEVC and nothing was done),
        1 when ``video`` does not exist or ffprobe/ffmpeg failed.
    """
    if not os.path.isfile(video):
        return 1
    try:
        converted_video = f"{video.rsplit('.', 1)[0]}_h264.mp4"
        probe_cmd = [
            'ffprobe', '-v', 'error', '-select_streams', 'v:0',
            '-show_entries', 'stream=codec_name',
            '-of', 'default=noprint_wrappers=1:nokey=1', video,
        ]
        result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        codec_name = result.stdout.strip()

        if codec_name.lower() == 'hevc':
            logger.info('Converting: %s to h264...', video)
            convert_cmd = [
                'ffmpeg', '-y', '-i', video,
                '-map_metadata', '0', '-movflags', 'use_metadata_tags',
                '-c:v', 'libx264', '-crf', '18', '-profile:v', 'high',
                '-pix_fmt', 'yuv420p', '-level', '4.0',
                '-c:a', 'copy', converted_video,
            ]
            subprocess.run(convert_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            logger.info('Converted HEVC video: %s to h264: %s', video, converted_video)
            if delete_original:
                os.remove(video)
                logger.info('Deleted original HEVC video: %s', video)
        return 0
    except Exception:
        logger.exception("Unexpected error occurred while converting video: %s", video)
        return 1


def dump_metadata(filename, post):
    """Write ``post`` as indented JSON to ``<filename>.json`` unless that file exists.

    Returns:
        0 on success (including when the file already existed), 1 on error.
    """
    try:
        filename = f'{filename}.json'
        if not os.path.exists(filename):
            with open(filename, 'w', encoding='utf-8') as json_file:
                json.dump(post, json_file, indent=4)
            logger.info('SAVED METADATA: %s', filename)
        return 0
    except Exception:
        logger.exception('Error saving metadata: %s', filename)
        return 1


def get_hq_images(video_id, is_story, share_url, cookie=None):
    """Fetch the image URLs of a photo post from its web page.

    For a story with a share URL and a cookie, the share URL is requested with
    the mobile headers; otherwise the public video page is requested with the
    web headers. Up to three attempts are made.

    Returns:
        A list with the first URL of every image, or False when all attempts failed.
    """
    if is_story and share_url and cookie:
        url = share_url
        base_headers = aweme_headers
        video_detail = "webapp.reflow.video.detail"
    else:
        url = f'https://www.tiktok.com/@/video/{video_id}'
        base_headers = web_headers
        video_detail = "webapp.video-detail"
    headers = _headers_with_cookie(base_headers, cookie)

    max_retries = 3
    retry_delay = 2
    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            json_content = get_json(response.text, "__UNIVERSAL_DATA_FOR_REHYDRATION__")
            data = json.loads(json_content)
            images = data["__DEFAULT_SCOPE__"][video_detail]["itemInfo"]["itemStruct"]["imagePost"]["images"]
            return [image["imageURL"]["urlList"][0] for image in images]
        except Exception as e:
            logger.warning('Attempt (%d/%d) to get higher quality images failed: %s', attempt + 1, max_retries, e)
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
    logger.error('Failed to get higher quality images after %d attempts for video ID: %s', max_retries, video_id)
    return False


def get_json(page_text, script_id):
    """Return the text inside the ``<script>`` element whose ``id`` is ``script_id``.

    NOTE(review): the marker literals of this function were stripped from the
    file as it reached review; the lookup below (``id="<script_id>"`` up to the
    next ``</script>``) is a reconstruction - confirm against the upstream page
    format.

    Raises:
        ValueError: When the element or its closing tag cannot be found.
    """
    id_pos = page_text.find(f'id="{script_id}"')
    tag_end = page_text.find('>', id_pos) if id_pos != -1 else -1
    start = tag_end + 1
    end = page_text.find('</script>', start) if tag_end != -1 else -1
    if tag_end == -1 or end == -1:
        raise ValueError("No valid JSON content in the response data.")
    return page_text[start:end]


def download_photos(filename, video_id, image_urls, music_title, music_url, is_aweme, is_story, is_story_note, share_url, post, cookie=None, save_metadata=False, zip_photos=False):
    """Download the images (and optional music / metadata) of a photo post.

    With ``zip_photos`` everything is written into ``<filename>.zip``;
    otherwise a directory named ``filename`` is created and filled. For mobile
    API posts that are not story notes, higher quality image URLs are looked
    up first and, when found, replace ``image_urls`` and are stored in
    ``post['images']``.

    Returns:
        True on success, False when any step failed (the error is logged).
    """
    try:
        music_title = sanitize_filename(music_title) if music_title else 'untitled'
        if is_aweme:
            hq_image_urls = get_hq_images(video_id, is_story, share_url, cookie) if not is_story_note else None
            if hq_image_urls:
                post['images'] = image_urls = hq_image_urls

        if zip_photos:
            json_data = json.dumps(post, indent=4)
            zip_buffer = io.BytesIO()
            with zipfile.ZipFile(zip_buffer, 'w') as zip_file:
                for i, url in enumerate(image_urls, 1):
                    # _fetch_bytes raises on HTTP errors so an error page is
                    # never stored as an image.
                    zip_file.writestr(f'{filename}_{i}.jpg', _fetch_bytes(url))
                if music_url:
                    music_data = _fetch_bytes(music_url)
                    zip_file.writestr(f'{music_title}.{_music_extension(music_data)}', music_data)
                if save_metadata:
                    zip_file.writestr(f'{filename}.json', json_data.encode('utf-8'))
            with open(f'{filename}.zip', 'wb') as f:
                f.write(zip_buffer.getvalue())
            logger.info('DOWNLOADED: %s.zip', filename)
        else:
            os.mkdir(filename)
            json_data = json.dumps(post, indent=4)
            for i, url in enumerate(image_urls, 1):
                image_data = _fetch_bytes(url)
                with open(f'{filename}/{filename}_{i}.jpg', 'wb') as image:
                    image.write(image_data)
            if music_url:
                music_data = _fetch_bytes(music_url)
                with open(f'{filename}/{music_title}.{_music_extension(music_data)}', 'wb') as audio:
                    audio.write(music_data)
            if save_metadata:
                with open(f'{filename}/{filename}.json', 'w', encoding='utf-8') as json_file:
                    json_file.write(json_data)
            logger.info('DOWNLOADED: %s', filename)
        return True
    except Exception:
        logger.exception('ERROR DOWNLOADING PHOTO POST: %s.', video_id)
        return False


def get_source_encode(video_id):
    """Ask TikWM for the source-quality encode of a video.

    A task is submitted (up to 5 attempts, 2 s apart) and then polled once per
    second for up to 60 polls.

    Returns:
        A dict with ``play_url`` and ``size`` when the task reports status 2;
        None when submission failed, the task reports status 3, or polling
        ran out.
    """
    submit_url = "https://tikwm.com/api/video/task/submit"
    result_url = "https://tikwm.com/api/video/task/result?task_id="
    submit_attempts = 5
    poll_attempts = 60

    for attempt in range(submit_attempts):
        try:
            response = requests.post(
                submit_url,
                data={"web": 1, "url": video_id},
                headers=web_headers,
                timeout=10,
            )
            if response.status_code == 200 and response.text:
                json_data = response.json()
                if json_data.get("code") == 0:
                    task_id = json_data["data"]["task_id"]
                    break
        except Exception as e:
            logger.warning(f"[Attempt {attempt + 1}/{submit_attempts}] Failed to request source from TikWM. Error: {e}")
        time.sleep(2)
    else:
        # The loop finished without a successful submission.
        logger.warning("Failed to get source quality video: %s", video_id)
        return None

    for _ in range(poll_attempts):
        try:
            response = requests.get(result_url + task_id, headers=web_headers, timeout=10)
            if response.status_code == 200 and response.text:
                json_data = response.json()
                if json_data.get("code") == 0:
                    status = json_data["data"]["status"]
                    if status == 2:
                        return {
                            "play_url": json_data["data"]["detail"]["play_url"],
                            "size": json_data["data"]["detail"]["size"]
                        }
                    if status == 3:
                        return None
            time.sleep(1)
        except Exception as e:
            logger.warning(f"Error checking task result TikWM: {e}")
            time.sleep(1)

    logger.warning("Failed to get source quality video: %s", video_id)
    return None


def download_video(filename, video_id, video_urls, file_hash, cookie, set_cookie=None, keep_corrupt=False):
    """Download a video, trying every URL in ``video_urls`` for up to three rounds.

    Each candidate is first sniffed (content type and an ``ftyp`` marker in
    the first bytes), then downloaded to ``<filename>.temp``. With a
    ``file_hash`` the MD5 is compared; without one, or when the hash still
    mismatches on the last round, the file goes through ``check_integrity``.

    Args:
        filename: Final path of the video.
        video_id: ID used in log messages.
        video_urls: Candidate download URLs.
        file_hash: Expected MD5 hex digest, or a falsy value when unknown.
        cookie: Cookie string to send, may be falsy.
        set_cookie: Additional cookie string; when set, web headers are used
            instead of the mobile headers.
        keep_corrupt: Forwarded to ``check_integrity``.

    Returns:
        0 when the file was saved as ``filename``, 1 otherwise.
    """
    max_retries = 3
    retry_delay = 2
    temp_file = f"{filename}.temp"
    # Copy the template so the cookie does not stick to the shared dict.
    headers = dict(web_headers if set_cookie else aweme_headers)
    if cookie or set_cookie:
        headers["Cookie"] = "; ".join(filter(bool, [cookie, set_cookie]))

    for attempt in range(max_retries):
        for url in video_urls:
            try:
                with requests.get(url, headers=headers, allow_redirects=True, timeout=10, stream=True) as response:
                    response.raise_for_status()
                    content_type = response.headers.get('Content-Type', '').lower()
                    if 'audio' in content_type:
                        logger.info(f"Skipping audio file: {video_id}")
                        return 1
                    is_text = 'text' in content_type
                    # Only the first bytes are needed for sniffing; reading
                    # response.content here would download the whole file twice.
                    head = b'' if is_text else next(response.iter_content(chunk_size=100), b'')

                if is_text or b'ftyp' not in head:
                    logger.warning(f"[Attempt {attempt + 1}/{max_retries}] Invalid file type detected: {url}. Trying alternative URL...")
                    continue

                if not download_file(url, temp_file, headers):
                    logger.warning(f"[Attempt {attempt + 1}/{max_retries}] Error downloading: {url}. Trying alternative URL...")
                    continue

                if not file_hash:
                    # No MD5 checksum in the post info: verify with FFprobe instead.
                    return check_integrity(temp_file, filename, keep_corrupt)

                if get_video_hash(temp_file) == file_hash:
                    os.replace(temp_file, filename)
                    logger.info("DOWNLOADED: %s", filename)
                    return 0

                if attempt < max_retries - 1:
                    logger.warning(f"[Attempt {attempt + 1}/{max_retries}] Downloaded file does not match the MD5 checksum. Retrying...")
                    os.remove(temp_file)
                else:
                    logger.warning(f"[Attempt {attempt + 1}/{max_retries}] Failed to match MD5 checksum after {max_retries} attempts. Checking file integrity with FFprobe instead.")
                    return check_integrity(temp_file, filename, keep_corrupt)
                break
            except Exception as e:
                logger.warning(f"[Attempt {attempt + 1}/{max_retries}] Error occurred during download: {e}. Retrying...")
        if attempt < max_retries - 1:
            time.sleep(retry_delay)

    logger.error(f"Failed to download video after {max_retries} attempts: {video_id}")
    return 1


def download_file(url, filename, headers=None):
    """Stream ``url`` into ``filename``.

    Returns:
        True on success; False on any error, in which case a partially
        written ``filename`` is removed.
    """
    try:
        with requests.get(url, headers=headers, allow_redirects=True, stream=True, timeout=10) as response:
            response.raise_for_status()
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=2048 * 2048):
                    if chunk:
                        f.write(chunk)
        return True
    except Exception as e:
        logger.error('Error downloading file: %s. Error: %s', filename, e)
        if os.path.isfile(filename):
            os.remove(filename)
        return False


def get_video_hash(video):
    """Return the MD5 hex digest of the file at ``video``, read in 10 MiB chunks."""
    chunk_size = 10 * 1024 * 1024
    file_hash = hashlib.md5()
    with open(video, 'rb') as file:
        for chunk in iter(lambda: file.read(chunk_size), b''):
            file_hash.update(chunk)
    return file_hash.hexdigest()


def get_audio_codec(audio_data):
    """Return the lower-cased codec name of the first audio stream in ``audio_data``.

    The bytes are piped to ffprobe on stdin.

    Returns:
        The codec name, or None when ffprobe could not be run or failed.
    """
    try:
        result = subprocess.run(
            ['ffprobe', '-v', 'error', '-select_streams', 'a:0',
             '-show_entries', 'stream=codec_name',
             '-of', 'default=noprint_wrappers=1:nokey=1', '-'],
            input=audio_data,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
        )
        return result.stdout.decode().strip().lower()
    except Exception:
        logger.exception("Unexpected error occurred while detecting audio codec.")
        return None


def get_user_info(user_id, cookie=None):
    """Fetch the ``user`` object embedded in a profile page.

    Up to three attempts are made, with the delay growing by 2 s each time.

    Returns:
        The user dict, or False when all attempts failed.
    """
    headers = _headers_with_cookie(web_headers, cookie)
    max_retries = 3
    retry_delay = 2
    for attempt in range(max_retries):
        try:
            response = requests.get(
                f'https://www.tiktok.com/@{user_id}',
                allow_redirects=False,
                headers=headers,
                timeout=10,
            )
            response.raise_for_status()
            json_content = get_json(response.text, "__UNIVERSAL_DATA_FOR_REHYDRATION__")
            data = json.loads(json_content)
            return data["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]["user"]
        except Exception as e:
            logger.warning('Attempt (%d/%d) to fetch user info failed: %s', attempt + 1, max_retries, e)
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
                retry_delay += 2
    logger.error('Failed to fetch user info after %d attempts for user: %s . User may not exist.', max_retries, user_id)
    return False


def get_timestamp(video_id):
    """Return the timestamp encoded in a video ID.

    Only the part of ``video_id`` before the first space is used. The value
    above the low 32 bits of the numeric ID is returned; for a 63-bit ID this
    is the same as taking the first 31 binary digits, without depending on
    the ID's bit length.
    """
    return int(video_id.split(" ")[0]) >> 32


def get_video_date(timestamp, full_format=False):
    """Format a Unix timestamp as a UTC date string.

    Returns:
        ``YYYY-MM-DD`` (or ``YYYY-MM-DDTHH:MM:SSZ`` with ``full_format``), or
        None when ``timestamp`` is falsy.
    """
    if not timestamp:
        return None
    format_str = '%Y-%m-%dT%H:%M:%SZ' if full_format else '%Y-%m-%d'
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime(format_str)


def save_user_info(json_data):
    """Write ``json_data`` as indented JSON to ``user_info.json``; errors are logged."""
    try:
        with open('user_info.json', 'w', encoding='utf-8') as file:
            json.dump(json_data, file, indent=4)
        logger.info('SAVED USER INFO')
    except Exception:
        logger.exception('ERROR SAVING USER INFO.')


def get_avatar(avatar_url):
    """Download an avatar into the ``Avatar`` directory unless it is already there.

    The file name is taken from the last path segment of ``avatar_url`` before
    the query string; nothing happens when the URL does not match that shape.
    Errors are logged, not raised.
    """
    try:
        match = re.search(r'/([^/?]+?)(?:\.[^/?]+)?\?', avatar_url)
        if match:
            os.makedirs('Avatar', exist_ok=True)
            avatar_filename = os.path.join('Avatar', sanitize_filename(match.group(1)) + '.jpg')
            if not os.path.exists(avatar_filename):
                response = requests.get(avatar_url, timeout=10)
                response.raise_for_status()
                with open(avatar_filename, 'wb') as file:
                    file.write(response.content)
                logger.info('DOWNLOADED AVATAR: %s', avatar_filename)
    except Exception:
        logger.exception('ERROR DOWNLOADING AVATAR.')


def get_video_id(video_url):
    """Extract a video ID from ``video_url``, following redirects if necessary.

    Returns:
        The ID string, or None when none could be found.
    """
    def extract_video_id(url):
        pattern = r'(6\d{18}|7\d{18})|(?<=/video/)([^?]+)'
        match = re.search(pattern, url)
        return match.group(0) if match else None

    video_id = extract_video_id(video_url)
    if not video_id:
        # Short links carry no ID; resolve them and inspect the final URL.
        try:
            response = requests.head(video_url, allow_redirects=True, timeout=10)
            video_id = extract_video_id(response.url)
        except Exception as e:
            logger.debug('Could not resolve URL %s: %s', video_url, e)
    return video_id


def get_post_info(video_url, request_type, cookie=None):
    """Fetch the post data for a video URL or numeric ID.

    With ``request_type == "aweme"`` the mobile feed endpoint is queried;
    otherwise the web page is parsed. When the web data has no
    ``video.bitrateInfo``, the remaining attempts switch to the mobile API.

    Returns:
        ``{"post_info": ..., "set_cookie": ...}`` on success, None otherwise.
        ``set_cookie`` is only filled for ``request_type == "web"``.
    """
    video_id = video_url if video_url.isdigit() else get_video_id(video_url)
    if not video_id:
        logger.error('Invalid URL: Could not extract video ID')
        return None

    max_retries = 3
    retry_delay = 2
    for attempt in range(max_retries):
        try:
            is_aweme_request = request_type == "aweme"
            if is_aweme_request:
                url = f'https://{aweme_host}/aweme/v1/feed/?aweme_id={video_id}'
            else:
                url = f'https://www.tiktok.com/@/video/{video_id}'
            headers = _headers_with_cookie(aweme_headers if is_aweme_request else web_headers, cookie)
            query = build_aweme_query() if is_aweme_request else None

            response = requests.get(
                url,
                params=query if query else {},
                headers=headers,
                timeout=10,
                allow_redirects=False,
            )
            response.raise_for_status()

            post_info = None
            set_cookie = "; ".join(
                f"{match.group(1)}={match.group(2)}"
                for match in re.finditer(
                    r"(ttwid|tt_csrf_token|tt_chain_token)=([^;]+)",
                    response.headers.get('Set-Cookie', ''),
                )
            ) if request_type == "web" else None

            if is_aweme_request:
                response_data = response.json()
                if response_data['status_code'] == 0 and response_data['aweme_list'][0]['aweme_id'] == video_id:
                    post_info = response_data['aweme_list'][0]
            else:
                json_content = get_json(response.text, "__UNIVERSAL_DATA_FOR_REHYDRATION__")
                data = json.loads(json_content)
                post_info = data["__DEFAULT_SCOPE__"]["webapp.video-detail"]["itemInfo"]["itemStruct"]
                if not post_info.get('video', {}).get('bitrateInfo'):
                    # The next attempt (if any) goes to the mobile API.
                    request_type = "aweme"
                    raise ValueError("Missing video data in post info. Trying with the mobile api...")

            if post_info:
                return {"post_info": post_info, "set_cookie": set_cookie}
            raise ValueError("Failed to fetch post info.")
        except Exception as e:
            logger.warning('Attempt (%d/%d) to get post info failed: %s', attempt + 1, max_retries, e)
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
                retry_delay += 2

    logger.error('Failed to fetch post info after %d attempts for video ID: %s.', max_retries, video_id)
    return None


def parse_json(post):
    """Normalise a post dict from either API into one flat dict.

    A post with an ``aweme_id`` is read with the mobile API key names;
    anything else with the web key names. The highest resolution entry of the
    bitrate list is selected (ties broken by bitrate); for mobile posts whose
    selected width or height is at most 576, the plain play address is used
    instead. Nested objects that are missing or ``None`` are treated as empty.

    Returns:
        The normalised dict, or None when ``post`` could not be parsed.
    """
    try:
        is_aweme = bool(post.get("aweme_id"))
        video_id = post.get("id") or post.get("aweme_id")
        if not video_id:
            raise ValueError("The response is invalid or does not conform to the expected format.")

        image_post_key = "image_post_info" if is_aweme else "imagePost"
        is_image_post = bool(post.get(image_post_key))
        image_url_key = "display_image" if is_aweme else "imageURL"
        url_list_key = "url_list" if is_aweme else "urlList" if is_image_post else "UrlList"

        image_urls = []
        for image in (post.get(image_post_key) or {}).get("images") or []:
            urls = (image.get(image_url_key) or {}).get(url_list_key) or []
            if urls:
                image_urls.append(urls[-1])

        bitrate_key = "bit_rate" if is_aweme else "Bitrate"
        play_addr_key = "play_addr" if is_aweme else "PlayAddr"
        data_size_key = "data_size" if is_aweme else "DataSize"
        file_hash_key = "file_hash" if is_aweme else "FileHash"
        height_key = "height" if is_aweme else "Height"
        width_key = "width" if is_aweme else "Width"

        width = height = data_size = bitrate = 0
        file_hash = None
        video_urls = []

        video_data = post.get("video") or {}
        play_addr = video_data.get(play_addr_key) or {}
        sd_size = play_addr.get(data_size_key) or video_data.get("size", 0)
        bitrate_info = video_data.get("bitrateInfo") or video_data.get("bit_rate")

        if bitrate_info:
            highest_res_video = max(
                bitrate_info,
                key=lambda x: ((x.get(play_addr_key) or {}).get(width_key, 0), x.get(bitrate_key, 0))
            )
            best_addr = highest_res_video.get(play_addr_key) or {}
            height = best_addr.get(height_key, 0)
            width = best_addr.get(width_key, 0)
            data_size = best_addr.get(data_size_key, 0)
            file_hash = best_addr.get(file_hash_key)
            bitrate = highest_res_video.get(bitrate_key, 0)
            video_urls = best_addr.get(url_list_key, [])

        if (width <= 576 or height <= 576) and is_aweme:
            video_urls = play_addr.get(url_list_key, [])
            file_hash = play_addr.get(file_hash_key)
            height = play_addr.get(height_key, 0)
            width = play_addr.get(width_key, 0)
            data_size = sd_size

        text_extra = (
            (post.get("original_client_text") or {}).get("text_extra")
            or post.get("textExtra")
            or []
        )
        mentioned_users = [user for user in text_extra if user.get("type") == 0]

        # One entry per distinct user_id, in first-seen order.
        seen_uids = set()
        mentioned_users_sticker = []
        for sticker in post.get("interaction_stickers") or []:
            mention_info = sticker.get("mention_info")
            if not mention_info or mention_info.get("user_id") in seen_uids:
                continue
            seen_uids.add(mention_info.get("user_id"))
            mentioned_users_sticker.append(
                {key: mention_info.get(key, "") for key in ["user_id", "user_name", "nickname", "sec_uid"]}
            )

        music = post.get("music") or {}
        matched_song = music.get("matched_song") or {}
        author = post.get("author") or {}

        return {
            "author_stats": post.get("authorStats") or post.get("authorStatsV2"),
            "data_size": data_size,
            "file_hash": file_hash,
            "height": height,
            "image_urls": image_urls,
            "is_aweme": is_aweme,
            "is_image_post": is_image_post,
            "is_sd": True,
            "is_story": bool(post.get("story_metadata")),
            "is_story_note": bool(post.get("story_note_info")),
            "matched_song_author": matched_song.get("author"),
            "matched_song_id": matched_song.get("id") if is_aweme else (music.get("tt2dsp") or {}).get('tt_to_dsp_song_infos'),
            "matched_song_title": matched_song.get("title"),
            "mentioned_users": mentioned_users,
            "mentioned_users_sticker": mentioned_users_sticker,
            "music_author": music.get("authorName") or music.get("author"),
            "music_id": music.get("mid") if is_aweme else music.get("id"),
            "music_title": music.get("title"),
            "music_url": music.get("playUrl") or (music.get("play_url") or {}).get("uri"),
            "nickname": author.get("nickname"),
            "region": post.get("region") or post.get("locationCreated"),
            "sd_size": sd_size,
            "sec_uid": author.get("secUid") or author.get("sec_uid"),
            "share_url": (post.get("share_info") or {}).get("share_url"),
            "timestamp": int(post.get("createTime") or post.get("create_time") or 0),
            "title": post.get("desc") or post.get("title"),
            "user_id": author.get("id") or author.get("uid"),
            "username": author.get("uniqueId") or author.get("unique_id"),
            "video_id": video_id,
            "video_stats": post.get("statistics") or post.get("stats") or post.get("statsV2"),
            "video_urls": video_urls[::-1] if video_urls and is_aweme else video_urls,
            "width": width
        }
    except Exception as e:
        logger.exception(f"Error parsing JSON: {e}")
        return None


def build_web_query(sec_uid, cursor):
    """Build the query parameters for a web item-list request.

    ``device_id`` and ``verifyFp`` are randomised on every call.
    """
    return {
        'aid': '1988',
        'app_language': 'en',
        'app_name': 'tiktok_web',
        'browser_language': 'en-US',
        'browser_name': 'Mozilla',
        'browser_online': 'true',
        'browser_platform': 'Win32',
        'browser_version': '5.0 (Windows)',
        'channel': 'tiktok_web',
        'cookie_enabled': 'true',
        'count': '15',
        'cursor': cursor,
        'device_id': str(random.randint(7250000000000000000, 7351147085025500000)),
        'device_platform': 'web_pc',
        'focus_state': 'true',
        'from_page': 'user',
        'history_len': '2',
        'is_fullscreen': 'false',
        'is_page_visible': 'true',
        'language': 'en',
        'os': 'windows',
        'priority_region': '',
        'referer': '',
        'region': 'US',
        'screen_height': '1080',
        'screen_width': '1920',
        'secUid': sec_uid,
        'type': '1',  # pagination type: 0 == oldest-to-newest, 1 == newest-to-oldest
        'tz_name': 'UTC',
        'verifyFp': f'verify_{"".join(random.choices(string.hexdigits, k=7))}',
        'webcast_language': 'en',
    }


def build_aweme_query(cursor=None, request_type=None):
    """Build the query parameters for a mobile API request.

    Device identifiers and timestamps are generated per call. ``cursor`` is
    added as ``cursor`` for ``'aweme_stories'`` and as ``max_cursor`` for any
    other truthy ``request_type``; without a ``request_type`` no cursor is sent.
    """
    query = {
        'device_platform': 'android',
        'os': 'android',
        'ssmix': 'a',
        '_rticket': int(time.time() * 1000),
        'cdid': str(uuid.uuid4()),
        'channel': 'googleplay',
        'aid': APP_INFO['aid'],
        'app_name': APP_INFO['app_name'],
        'version_code': ''.join(f'{int(v):02d}' for v in APP_INFO['app_version'].split('.')),
        'version_name': APP_INFO['app_version'],
        'manifest_version_code': APP_INFO['manifest_app_version'],
        'update_version_code': APP_INFO['manifest_app_version'],
        'ab_version': APP_INFO['app_version'],
        'resolution': '1080*2400',
        'dpi': 420,
        'device_type': 'Pixel 7',
        'device_brand': 'Google',
        'language': 'en',
        'os_api': '29',
        'os_version': '13',
        'ac': 'wifi',
        'is_pad': '0',
        'current_region': 'US',
        'app_type': 'normal',
        'sys_region': 'US',
        'last_install_time': int(time.time()) - random.randint(86400, 1123200),
        'timezone_name': 'America/New_York',
        'residence': 'US',
        'app_language': 'en',
        'timezone_offset': '-14400',
        'host_abi': 'armeabi-v7a',
        'locale': 'en',
        'ac2': 'wifi5g',
        'uoo': '1',
        'carrier_region': 'US',
        'op_region': 'US',
        'build_number': APP_INFO['app_version'],
        'region': 'US',
        'retry_type': 'no_retry',
        'ts': int(time.time()),
        'iid': str(random.randint(7250000000000000000, 7351147085025500000)),
        'device_id': str(random.randint(7250000000000000000, 7351147085025500000)),
        'openudid': ''.join(random.choices('0123456789abcdef', k=16))
    }
    if request_type == 'aweme_stories':
        query['cursor'] = cursor
    elif request_type:
        query['max_cursor'] = cursor
    return query


def get_video_ids(sec_uid, user_id, amount_time, amount_videos, request_type, cookie=None):
    """Page through a user's posts, likes, reposts or stories and collect the items.

    Args:
        sec_uid: Value sent as ``secUid`` in web requests.
        user_id: Numeric user ID used in the mobile API URLs.
        amount_time: Optional ``YYYY-MM-DD`` date; paging stops once the cursor
            is older and older items are filtered out of the result.
        amount_videos: Optional maximum number of items to return.
        request_type: One of the keys of ``api_base_urls`` below.
        cookie: Optional cookie string.

    Returns:
        The list of collected item dicts (possibly empty). A page that still
        fails after five attempts ends the listing with what was collected.
    """
    api_base_urls = {
        'web_posts': 'https://www.tiktok.com/api/creator/item_list/?count=15',
        'web_likes': 'https://www.tiktok.com/api/favorite/item_list/?count=30',
        'web_reposts': 'https://www.tiktok.com/api/repost/item_list/?count=16',
        'aweme_posts': f"https://{aweme_host}/aweme/v1/aweme/post/?user_id={user_id}&count=35&source=0",
        'aweme_likes': f"https://{aweme_host}/aweme/v1/aweme/favorite/?user_id={user_id}&count=50&invalid_item_count=0&is_hiding_invalid_item=0",
        'aweme_stories': f"https://{aweme_host}/tiktok/v1/story/get_user_story?author_id={user_id}&count=5&load_before=1"
    }
    is_aweme_request = request_type.startswith('aweme')
    headers = _headers_with_cookie(aweme_headers if is_aweme_request else web_headers, cookie)

    if amount_time:
        amount_timestamp = int(time.mktime(datetime.strptime(amount_time, "%Y-%m-%d").timetuple())) * 1E3
    else:
        amount_timestamp = None

    posts = []
    cursor = int(time.time() * 1E3) if request_type == 'aweme_stories' else 0
    seen_ids = set()
    max_retries = 5
    retry_delay = 3

    while True:
        query = build_aweme_query(cursor, request_type) if is_aweme_request else build_web_query(sec_uid, cursor)

        for attempt in range(max_retries):
            try:
                response = requests.get(api_base_urls[request_type], params=query, headers=headers, timeout=10)
                if response.status_code == 200 and response.text:
                    response_data = response.json()
                    if (
                        (is_aweme_request and response_data.get('status_code') == 0)
                        or (request_type.startswith('web') and response_data.get('status_code') == 0 and response_data.get('statusCode') == 0)
                    ):
                        break
                    logger.warning('Invalid response. status_msg: %s. Attempt (%d/%d)', response_data.get('status_msg', ''), attempt + 1, max_retries)
                else:
                    logger.warning('Request failed: HTTP/%d or response is empty. Attempt (%d/%d)', response.status_code, attempt + 1, max_retries)
            except (requests.RequestException, ValueError) as e:
                # A network error or an undecodable body counts as a failed attempt.
                logger.warning('Request error: %s. Attempt (%d/%d)', e, attempt + 1, max_retries)
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
                retry_delay += 2
        else:
            logger.error('Max retries exceeded for requesting video ids.')
            break

        old_cursor = cursor
        if request_type == 'web_posts':
            videos = response_data.get('itemList')
            cursor = int(videos[-1]['createTime']) * 1000 if videos else None
            has_more = response_data.get('hasMorePrevious')
        elif request_type == 'web_likes':
            videos = response_data.get('itemList')
            cursor = response_data.get('cursor')
            has_more = response_data.get('hasMore')
        elif request_type == 'web_reposts':
            videos = response_data.get('itemList')
            cursor += 16
            has_more = response_data.get('hasMore')
        elif request_type == 'aweme_stories':
            videos = response_data.get("user_story", {}).get("stories")
            cursor = response_data.get("user_story", {}).get("min_cursor")
            has_more = response_data.get("user_story", {}).get("has_more_before")
        elif is_aweme_request:
            videos = response_data.get("aweme_list")
            cursor = response_data.get("max_cursor")
            has_more = response_data.get("has_more")
        else:
            videos = response_data.get("data", {}).get("videos")
            cursor = response_data.get("data", {}).get("cursor")
            has_more = response_data.get("data", {}).get("hasMore")

        for video in videos or []:
            video_id = video.get("video_id") or video.get("aweme_id") or video.get("id")
            if video_id in seen_ids:
                continue
            seen_ids.add(video_id)
            posts.append(video)

        sys.stdout.write(f"\rFetching video IDs: {len(posts)}/???")
        sys.stdout.flush()

        if amount_videos is not None and len(posts) >= amount_videos:
            posts = posts[:amount_videos]
            break

        # The reposts cursor is a plain offset; every other cursor is handled
        # as a millisecond timestamp.
        if not request_type == 'web_reposts':
            if cursor:
                cursor = int(str(cursor)[:13])
            if not cursor or old_cursor == cursor:
                # No progress reported: step back one week.
                cursor = old_cursor - 7 * 86_400_000
            if cursor < 1472706000000 or (amount_timestamp is not None and int(cursor) < amount_timestamp):
                break

        if not has_more:
            break

    if amount_timestamp:
        timestamp = 'createTime' if request_type.startswith('web') else 'create_time'
        # int() first so a creation time delivered as a string also works.
        posts = [i for i in posts if amount_timestamp <= int(i[timestamp]) * 1000]

    sys.stdout.write("\r" + " " * 40 + "\r")
    sys.stdout.flush()
    return posts


def generate_filename(video_id, username, timestamp, title, description, add_date, add_username):
    """Build a sanitised file name of the form ``[username ][date ]video_id[ title]``.

    The title is appended only when both ``description`` and ``title`` are truthy.
    """
    filename = f"{video_id}"
    if description and title:
        filename = f"{video_id} {title}"
    if add_date:
        filename = f"{get_video_date(timestamp)} {filename}"
    if add_username:
        filename = f"{username} {filename}"
    return sanitize_filename(filename)


def sanitize_filename(filename):
    """Replace ``\\ / * : ? " < > |`` with ``_`` and limit the name to 195 UTF-8 bytes.

    A multi-byte character cut by the byte limit is dropped entirely.
    """
    filename = re.sub(r'[\\/*:?"<>|]', '_', filename).strip()
    filename = filename.encode('utf-8')[:195].decode('utf-8', 'ignore').strip()
    return filename