"""Helpers for paging through the YouTube Data API v3 and handling Id files."""

import json
import logging
import os
import random
import re
import time

import requests

# NOTE(review): a hard-coded API key in source is a security risk. The
# literal is kept only as a backward-compatible fallback; set the
# YOUTUBE_API_KEY environment variable to override it, and consider
# rotating/removing the embedded key.
API_KEY = (os.environ.get('YOUTUBE_API_KEY')
           or 'AIzaSyA-dlBUjVQeuc4a6ZN4RkNUYDFddrVLxrA')
BASE_URL = 'https://www.googleapis.com/youtube/v3/'


def make_error_logger(filename):
    """Return this module's logger with an ERROR-level file handler attached.

    Calling this repeatedly with the same ``filename`` reuses the existing
    handler instead of stacking duplicates (which would write every record
    several times and leak open file handles).

    Args:
        filename: Path of the log file to append to.

    Returns:
        The ``logging.Logger`` named after this module.
    """
    logger = logging.getLogger(__name__)

    # FileHandler stores its target as an absolute path in ``baseFilename``.
    target = os.path.abspath(os.fspath(filename))
    for existing in logger.handlers:
        if (isinstance(existing, logging.FileHandler)
                and existing.baseFilename == target):
            return logger

    handler = logging.FileHandler(filename)
    handler.setLevel(logging.ERROR)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s|%(levelname)s|%(message)s", "%Y-%m-%d %H:%M:%S"))
    logger.addHandler(handler)
    return logger


def unique_list(_list):
    """Return the items of ``_list`` de-duplicated, keeping first-seen order.

    Items must be hashable.
    """
    return list(dict.fromkeys(_list))


def __sleep(x, y=None):
    """Sleep for a random duration between ``x`` and ``y`` seconds.

    When ``y`` is omitted the duration is exactly ``x``. The chosen duration
    is rounded to milliseconds and printed before sleeping.
    """
    if y is None:
        y = x
    sleep_time = round(random.uniform(x, y), 3)
    print(f'Sleeping for {sleep_time} seconds...')
    time.sleep(sleep_time)


def __get_response(url, session=None, exit_codes=None, sleep=None):
    """GET ``url`` repeatedly until the status code is one of ``exit_codes``.

    Query parameters are taken from ``session.params``. On a request
    exception or an unexpected status code the call sleeps and retries
    indefinitely; after an unexpected status a fresh ``requests.Session`` is
    created that carries over only the query parameters.

    Args:
        url: URL to request (without query string).
        session: Optional ``requests.Session``; a new one is created if None.
        exit_codes: Status codes that end the retry loop. Defaults to [200].
        sleep: A number, or a 1- or 2-item list/tuple ``[min, max]`` of
            seconds to wait between attempts. Defaults to [1, 3].

    Returns:
        dict with keys ``'response'`` (the final response) and ``'session'``
        (the session that produced it, possibly a replacement).
    """
    if session is None:
        session = requests.Session()
    if exit_codes is None:
        exit_codes = [200]
    if sleep is None:
        sleep = [1, 3]
    elif not isinstance(sleep, (list, tuple)):
        sleep = [sleep]

    while True:
        # Build the display URL; fall back to the bare URL when there are no
        # params so the log line below never hits an unbound name.
        full_url = url
        if session.params:
            full_url += '?' + '&'.join(
                '{}={}'.format(k, v) for k, v in session.params.items())
        print(f'Retrieving {full_url}')

        try:
            response = session.get(url, timeout=10)
        except Exception as e:
            # Deliberately broad: this helper is best-effort and retries on
            # any failure of the request itself.
            print(f'Exception: {e}')
            __sleep(*sleep)
            continue

        ## Check status code
        status_code = response.status_code
        print(f'Status: {status_code}')
        if status_code in exit_codes:
            break

        print('Retrying...')
        # Start over with a clean session, keeping only the query params.
        api_params = session.params
        session.close()  # release the old connection pool
        session = requests.Session()
        session.params = api_params
        __sleep(*sleep)

    return {'response': response, 'session': session}


def get_api_items(endpoint, session):
    """Page through an API endpoint and collect every entry of ``items``.

    Request parameters must already be set on ``session.params``; the API
    key is added here. Paging follows ``nextPageToken`` until it is absent.
    A 4xx response, or a body that is not a JSON object, stops paging and is
    reported through the ``'error'`` entry of the result.

    Args:
        endpoint: Path appended to ``BASE_URL``.
        session: ``requests.Session`` whose ``params`` hold the query.

    Returns:
        dict with keys ``'json'`` (list of collected items), ``'session'``
        (the session last used) and ``'error'`` (message string or None).
    """
    js_api = []
    session.params['key'] = API_KEY
    err_msg = None
    exit_codes = [200, 400, 401, 403, 404, 409]

    while True:
        ret = __get_response(
            BASE_URL + endpoint, session=session, exit_codes=exit_codes,
            sleep=[1, 3])
        (response, session) = (ret['response'], ret['session'])

        try:
            js = json.loads(response.text)
        except ValueError:
            js = None
        if not isinstance(js, dict):
            # e.g. an HTML error page: nothing usable to page through.
            err_msg = '{} {}'.format(response.status_code, 'invalidResponse')
            print(f'Error: {err_msg}')
            print('Skipping...')
            break

        ## Skip if 4** status code
        if response.status_code // 100 == 4:
            error = js.get('error')
            if not isinstance(error, dict):
                error = {}
            errs = ','.join(
                str(x.get('reason', 'unknown'))
                for x in error.get('errors') or []
                if isinstance(x, dict))
            err_msg = '{} {}'.format(
                error.get('code', response.status_code), errs)
            print(f'Error: {err_msg}')
            print('Skipping...')
            break

        ## Concatenate API
        js_api.extend(js.get('items') or [])

        ## Continue if nextPageToken
        next_page_token = js.get('nextPageToken')
        if next_page_token is None:
            break
        session.params['pageToken'] = next_page_token

    # NOTE(review): the last 'pageToken' is left in session.params; callers
    # reusing the returned session for a new query should reset params.
    return {'json': js_api, 'session': session, 'error': err_msg}


def get_ids(f_input, _type):
    """Read Ids of ``_type`` from a file, one candidate per line.

    Only lines that consist entirely of a well-formed Id are kept;
    duplicates are dropped while preserving first-seen order.

    Args:
        f_input: Path of the text file to read.
        _type: One of ``'videoId'``, ``'playlistId'``, ``'channelId'``.

    Returns:
        Ordered list of unique Id strings.

    Raises:
        KeyError: If ``_type`` is not a supported Id type.
    """
    match_dict = {
        'videoId': r'^([a-zA-Z0-9_-]{11})$',
        'playlistId': r'^([a-zA-Z0-9_-]{34}|[a-zA-Z0-9_-]{18})$',
        'channelId': r'^(UC[a-zA-Z0-9_-]{22})$',
    }
    pattern = re.compile(match_dict[_type])
    with open(f_input, 'r', encoding='utf-8', errors='ignore') as f:
        matches = [pattern.match(line.rstrip()) for line in f]
    return unique_list([m.group(1) for m in matches if m])


def write_ids(ids, f_out, _type):
    """Write one YouTube URL per Id to ``f_out``.

    Args:
        ids: Iterable of Id strings.
        f_out: Path of the output file (overwritten).
        _type: One of ``'videoId'``, ``'playlistId'``, ``'channelId'``.

    Raises:
        KeyError: If ``_type`` is not a supported Id type.
    """
    urls = {
        'videoId': 'https://www.youtube.com/watch?v={}',
        'playlistId': 'https://www.youtube.com/playlist?list={}',
        'channelId': 'https://www.youtube.com/channel/{}',
    }
    url = urls[_type]
    with open(f_out, 'w', encoding='utf-8') as f:
        f.writelines(url.format(x) + '\n' for x in ids)


def write_api(f_api, js):
    """Dump ``js`` to ``f_api`` as JSON indented by four spaces."""
    with open(f_api, 'w', encoding='utf-8') as f:
        json.dump(js, f, indent=4)