from __future__ import annotations
import asyncio
import logging
import dataclasses
from typing import Any, Awaitable, Callable, Optional
import random
import time
import json
from playwright.async_api import (
Browser,
BrowserContext,
Page,
Playwright,
ProxySettings,
async_playwright,
TimeoutError,
Error as PlaywrightError,
)
from urllib.parse import urlencode, quote, urlparse
from proxyproviders import ProxyProvider
from proxyproviders.algorithms import Algorithm
from proxyproviders.models.proxy import ProxyFormat
from .stealth import stealth_async
from .helpers import random_choice
from .api.user import User
from .api.video import Video
from .api.sound import Sound
from .api.hashtag import Hashtag
from .api.comment import Comment
from .api.trending import Trending
from .api.search import Search
from .api.playlist import Playlist
from .exceptions import (
InvalidJSONException,
EmptyResponseException,
)
[docs]
@dataclasses.dataclass
class TikTokPlaywrightSession:
    """A TikTok session using Playwright.

    Bundles the Playwright context/page pair with the per-session request
    state (proxy, default params, captured headers and msToken).
    """

    # Playwright browser context the session runs in.
    context: Any
    # Page used to evaluate JavaScript on behalf of this session.
    page: Any
    # Proxy settings the session was created with, stored exactly as passed in.
    proxy: Optional[Any] = None
    # Default query parameters; None until they are filled in after creation.
    params: Optional[dict] = None
    # Request headers captured while the session was being created, if any.
    headers: Optional[dict] = None
    # msToken for this session, when one is known.
    ms_token: Optional[str] = None
    # URL the session was started on.
    base_url: str = "https://www.tiktok.com"
    # Set to False once the session has been found to be unusable.
    is_valid: bool = True
[docs]
class TikTokApi:
    """The main TikTokApi class that contains all the endpoints.

    Import With:
        .. code-block:: python

            from TikTokApi import TikTokApi
            api = TikTokApi()
    """

    # Endpoint classes exposed as class attributes (e.g. ``TikTokApi.user``).
    # ``__init__`` sets ``parent`` on each of these classes to the new instance.
    user = User
    video = Video
    sound = Sound
    hashtag = Hashtag
    comment = Comment
    trending = Trending
    search = Search
    playlist = Playlist
def __init__(self, logging_level: int = logging.WARN, logger_name: str = None):
    """
    Create a TikTokApi object.

    Args:
        logging_level (int): The logging level you want to use.
        logger_name (str): The name of the logger you want to use; the
            module name is used when omitted.
    """
    # Session pool and lifecycle flags.
    self.sessions = []
    self._session_recovery_enabled = True
    self._session_creation_lock = asyncio.Lock()
    self._cleanup_called = False
    self._auto_cleanup_dead_sessions = True

    # Proxy rotation is configured later by create_sessions().
    self._proxy_provider: Optional[ProxyProvider] = None
    self._proxy_algorithm: Optional[Algorithm] = None

    effective_name = __name__ if logger_name is None else logger_name
    self.__create_logger(effective_name, logging_level)

    # Every endpoint class reaches the API through its ``parent`` attribute.
    for endpoint in (
        User,
        Video,
        Sound,
        Hashtag,
        Comment,
        Trending,
        Search,
        Playlist,
    ):
        endpoint.parent = self

    # Playwright handles; populated by create_sessions().
    self.browser: Browser = None
    self.playwright: Playwright = None
def __create_logger(self, name: str, level: int = logging.DEBUG):
"""Create a logger for the class."""
self.logger: logging.Logger = logging.getLogger(name)
self.logger.setLevel(level)
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def __del__(self):
"""
Destructor to ensure cleanup happens even if user forgets.
Warning: This is a safety net. Users should still call close_sessions() explicitly
in async contexts. This will log a warning if cleanup wasn't called properly.
"""
if not self._cleanup_called:
if self.sessions or self.browser or self.playwright:
self.logger.warning(
"TikTokApi object is being destroyed but cleanup was not called. "
"Please use 'async with TikTokApi()' or call 'await api.close_sessions()' and "
"'await api.stop_playwright()' explicitly to avoid resource leaks. "
f"Leaked resources: {len(self.sessions)} sessions, "
f"browser={'exists' if self.browser else 'none'}, "
f"playwright={'exists' if self.playwright else 'none'}"
)
async def __set_session_params(self, session: TikTokPlaywrightSession):
"""Set the session params for a TikTokPlaywrightSession"""
user_agent = await session.page.evaluate("() => navigator.userAgent")
language = await session.page.evaluate(
"() => navigator.language || navigator.userLanguage"
)
platform = await session.page.evaluate("() => navigator.platform")
device_id = str(random.randint(10**18, 10**19 - 1)) # Random device id
history_len = str(random.randint(1, 10)) # Random history length
screen_height = str(random.randint(600, 1080)) # Random screen height
screen_width = str(random.randint(800, 1920)) # Random screen width
timezone = await session.page.evaluate(
"() => Intl.DateTimeFormat().resolvedOptions().timeZone"
)
session_params = {
"aid": "1988",
"app_language": language,
"app_name": "tiktok_web",
"browser_language": language,
"browser_name": "Mozilla",
"browser_online": "true",
"browser_platform": platform,
"browser_version": user_agent,
"channel": "tiktok_web",
"cookie_enabled": "true",
"device_id": device_id,
"device_platform": "web_pc",
"focus_state": "true",
"from_page": "user",
"history_len": history_len,
"is_fullscreen": "false",
"is_page_visible": "true",
"language": language,
"os": platform,
"priority_region": "",
"referer": "",
"region": "US", # TODO: TikTokAPI option
"screen_height": screen_height,
"screen_width": screen_width,
"tz_name": timezone,
"webcast_language": language,
}
session.params = session_params
async def _is_session_valid(self, session: TikTokPlaywrightSession) -> bool:
"""
Check if a session is still valid/alive.
Args:
session: The session to check
Returns:
bool: True if session is valid, False otherwise
"""
if not session.is_valid:
return False
try:
# Quick validation - try to get page URL
# This will fail immediately if the page/context/browser is closed
_ = session.page.url
return True
except (PlaywrightError, AttributeError) as e:
self.logger.warning(f"Session validation failed: {e}")
session.is_valid = False
return False
async def _mark_session_invalid(self, session: TikTokPlaywrightSession):
"""
Mark a session as invalid and attempt cleanup.
Args:
session: The session to mark as invalid
"""
session.is_valid = False
# Attempt graceful cleanup
try:
if session.page:
await session.page.close()
except Exception as e:
self.logger.debug(f"Error closing page during invalidation: {e}")
try:
if session.context:
await session.context.close()
except Exception as e:
self.logger.debug(f"Error closing context during invalidation: {e}")
# Immediately remove from sessions list if auto-cleanup is enabled
# This prevents memory leaks from accumulating dead sessions
if self._auto_cleanup_dead_sessions and session in self.sessions:
try:
self.sessions.remove(session)
self.logger.debug(
f"Automatically removed dead session from pool. Remaining: {len(self.sessions)}"
)
except ValueError:
pass # Session already removed
async def _get_valid_session_index(
self, **kwargs
) -> tuple[int, TikTokPlaywrightSession]:
"""
Get a valid session, with automatic recovery if needed.
Args:
session_index (int, optional): Specific session index to use
Returns:
tuple: (index, session)
Raises:
Exception: If no valid sessions available and recovery fails
"""
max_attempts = 3
for attempt in range(max_attempts):
# First, try to get a valid session
if kwargs.get("session_index") is not None:
i = kwargs["session_index"]
if i < len(self.sessions):
session = self.sessions[i]
if await self._is_session_valid(session):
return i, session
else:
self.logger.warning(f"Requested session {i} is invalid")
else:
# Try to find any valid session
valid_sessions = []
for idx, session in enumerate(self.sessions):
if await self._is_session_valid(session):
valid_sessions.append((idx, session))
if valid_sessions:
return random.choice(valid_sessions)
# No valid sessions found - attempt recovery if enabled
if self._session_recovery_enabled and attempt < max_attempts - 1:
self.logger.warning(
f"No valid sessions found, attempting recovery (attempt {attempt + 1}/{max_attempts})"
)
await self._recover_sessions()
else:
break
raise Exception(
"No valid sessions available. All sessions appear to be dead. "
"Please call create_sessions() again or restart the API."
)
async def _recover_sessions(self):
"""
Attempt to recover from session failures by cleaning up dead sessions
and potentially creating new ones if we have the necessary configuration.
"""
async with self._session_creation_lock:
self.logger.info("Starting session recovery...")
# Remove invalid sessions
initial_count = len(self.sessions)
self.sessions = [
s for s in self.sessions if await self._is_session_valid(s)
]
removed_count = initial_count - len(self.sessions)
if removed_count > 0:
self.logger.info(f"Removed {removed_count} dead session(s)")
# Note: We don't automatically create new sessions here because we'd need
# all the original parameters (proxies, ms_tokens, etc.)
# Users should call create_sessions() again if they need more sessions
async def __create_session(
    self,
    url: str = "https://www.tiktok.com",
    ms_token: str | None = None,
    proxy: dict[str, Any] | ProxySettings | None = None,
    context_options: dict[str, Any] | None = None,
    sleep_after: int = 1,
    cookies: dict[str, Any] | None = None,
    suppress_resource_load_types: list[str] | None = None,
    timeout: int = 30000,
    page_factory: Callable[[BrowserContext], Awaitable[Page]] | None = None,
    browser_context_factory: (
        Callable[[Playwright], Awaitable[BrowserContext]] | None
    ) = None,
):
    """Create a TikTokPlaywrightSession and add it to ``self.sessions``.

    Args:
        url (str): URL the session's page is navigated to.
        ms_token (str | None): msToken to install as a cookie; when omitted
            the token is read back from the context cookies after
            ``sleep_after`` seconds.
        proxy: Proxy for the new context; replaced by the configured proxy
            provider's choice when a provider is set.
        context_options (dict | None): Extra keyword arguments for
            ``browser.new_context``.
        sleep_after (int): Seconds to wait before reading the msToken cookie.
        cookies (dict | None): Cookie name/value pairs to add to the context.
        suppress_resource_load_types (list[str] | None): Resource types whose
            requests are aborted.
        timeout (int): Default navigation timeout (ms) set on the page.
        page_factory: Optional async callable that builds the page.
        browser_context_factory: When set, ``self.browser`` already is the
            (shared) context and is used as-is.

    Raises:
        Exception: Any failure is logged, partially created resources are
            closed, and the original exception is re-raised.
    """
    if context_options is None:
        context_options = {}

    # Pre-declared so the error path can tell what has been created so far.
    context = None
    page = None
    # A context supplied through browser_context_factory is shared by all
    # sessions, so it must not be closed when a single session fails.
    owns_context = browser_context_factory is None

    try:
        if ms_token is not None:
            # Build a new dict: the caller may hand the same cookies dict to
            # several sessions that are being created concurrently.
            cookies = {**(cookies or {}), "msToken": ms_token}

        if self._proxy_provider is not None:
            proxy_obj = self._proxy_provider.get_proxy(self._proxy_algorithm)
            proxy = proxy_obj.format(ProxyFormat.PLAYWRIGHT)

        if owns_context:
            context = await self.browser.new_context(proxy=proxy, **context_options)
        else:
            context = self.browser

        if cookies is not None:
            cookie_domain = urlparse(url).netloc
            formatted_cookies = [
                {"name": k, "value": v, "domain": cookie_domain, "path": "/"}
                for k, v in cookies.items()
                if v is not None
            ]
            await context.add_cookies(formatted_cookies)

        if page_factory:
            page = await page_factory(context)
        else:
            page = await context.new_page()
            # NOTE(review): stealth is assumed to apply only to pages created
            # here (factory pages are left to the factory); confirm against
            # the upstream source.
            await stealth_async(page)

        _ = await page.goto(url)
        if "tiktok" not in page.url:
            _ = await page.goto("https://www.tiktok.com")

        # Capture the headers of the next request the page issues.
        request_headers = None

        def handle_request(request):
            nonlocal request_headers
            request_headers = request.headers

        page.once("request", handle_request)

        if suppress_resource_load_types is not None:
            await page.route(
                "**/*",
                lambda route, request: (
                    route.abort()
                    if request.resource_type in suppress_resource_load_types
                    else route.continue_()
                ),
            )

        # Set the navigation timeout
        page.set_default_navigation_timeout(timeout)

        # Simulate mouse movement to make the session look less like a bot.
        x, y = random.randint(0, 50), random.randint(0, 50)
        a, b = random.randint(1, 50), random.randint(100, 200)

        await page.mouse.move(x, y)
        await page.wait_for_load_state("networkidle")
        await page.mouse.move(a, b)

        session = TikTokPlaywrightSession(
            context,
            page,
            ms_token=ms_token,
            proxy=proxy,
            headers=request_headers,
            base_url=url,
            is_valid=True,
        )

        if ms_token is None:
            await asyncio.sleep(
                sleep_after
            )  # TODO: Find a better way to wait for msToken
            cookies = await self.get_session_cookies(session)
            ms_token = cookies.get("msToken")
            session.ms_token = ms_token
            if ms_token is None:
                self.logger.info(
                    f"Failed to get msToken on session index {len(self.sessions)}, you should consider specifying ms_tokens"
                )

        # Publish the session only once it is fully initialised, so a failure
        # while setting params cannot leave a closed session in the pool.
        await self.__set_session_params(session)
        self.sessions.append(session)
    except Exception as e:
        self.logger.error(f"Failed to create session: {e}")
        # Best-effort cleanup of whatever was created before the failure.
        if page is not None:
            try:
                await page.close()
            except Exception as close_error:
                self.logger.debug(
                    f"Error closing page after failed session creation: {close_error}"
                )
        if context is not None and owns_context:
            try:
                await context.close()
            except Exception as close_error:
                self.logger.debug(
                    f"Error closing context after failed session creation: {close_error}"
                )
        raise  # Re-raise the exception after cleanup
[docs]
async def create_sessions(
    self,
    num_sessions: int = 5,
    headless: bool = True,
    ms_tokens: list[str] | None = None,
    proxies: list[dict[str, Any] | ProxySettings] | None = None,
    proxy_provider: Optional[ProxyProvider] = None,
    proxy_algorithm: Optional[Algorithm] = None,
    sleep_after: int = 1,
    starting_url: str = "https://www.tiktok.com",
    context_options: dict[str, Any] | None = None,
    override_browser_args: list[str] | None = None,
    cookies: list[dict[str, Any]] | None = None,
    suppress_resource_load_types: list[str] | None = None,
    browser: str = "chromium",
    executable_path: str | None = None,
    page_factory: Callable[[BrowserContext], Awaitable[Page]] | None = None,
    browser_context_factory: (
        Callable[[Playwright], Awaitable[BrowserContext]] | None
    ) = None,
    timeout: int = 30000,
    enable_session_recovery: bool = True,
    allow_partial_sessions: bool = False,
    min_sessions: int | None = None,
):
    """
    Create sessions for use within the TikTokApi class.

    These sessions are what will carry out requesting your data from TikTok.

    Args:
        num_sessions (int): The amount of sessions you want to create.
        headless (bool): Whether or not you want the browser to be headless.
        ms_tokens (list[str]): A list of msTokens to use for the sessions, you can get these from your cookies after visiting TikTok.
            If you don't provide any, the sessions will try to get them themselves, but this is not guaranteed to work.
        proxies (list): **DEPRECATED - Use proxy_provider instead.** A list of proxies to use for the sessions.
            This parameter is maintained for backwards compatibility but will be removed in a future version.
        proxy_provider (ProxyProvider | None): A ProxyProvider instance for smart proxy rotation.
            See examples/proxy_provider_example.py for usage examples. Full documentation: https://davidteather.github.io/proxyproviders/
        proxy_algorithm (Algorithm | None): Algorithm for proxy selection (RoundRobin, Random, First, or custom) per session.
            Only used with proxy_provider. Defaults to RoundRobin if not specified.
        sleep_after (int): The amount of time to sleep after creating a session, this is to allow the msToken to be generated.
        starting_url (str): The url to start the sessions on, this is usually https://www.tiktok.com.
        context_options (dict | None): Options to pass to the playwright context.
        override_browser_args (list[str]): Arguments to pass to the browser on launch.
        cookies (list[dict]): A list of cookies to use for the sessions, you can get these from your cookies after visiting TikTok.
        suppress_resource_load_types (list[str]): Types of resources to suppress playwright from loading, excluding more types will make playwright faster. Types: document, stylesheet, image, media, font, script, textrack, xhr, fetch, eventsource, websocket, manifest, other.
        browser (str): firefox, chromium, or webkit; default is chromium
        executable_path (str): Path to the browser executable
        page_factory (Callable[[BrowserContext], Awaitable[Page]] | None): Optional async function for instantiating pages.
        browser_context_factory (Callable[[Playwright], Awaitable[BrowserContext]] | None): Optional async function for creating browser contexts. When provided, you can choose any browser (chromium/firefox/webkit) inside the factory, and the 'browser' parameter is ignored.
        timeout (int): The timeout in milliseconds for page navigation
        enable_session_recovery (bool): Enable automatic session recovery on failures (default: True)
        allow_partial_sessions (bool): If True, succeed even if some sessions fail to create. If False (default), fail if any session fails
        min_sessions (int | None): Minimum number of sessions required. Only used if allow_partial_sessions=True. If None, defaults to 1.

    Raises:
        ValueError: If both ``proxies`` and ``proxy_provider`` are given, or
            ``browser`` is not a supported name (checked before Playwright
            is started).
        Exception: If fewer than the minimum number of sessions exist after
            a partial creation, or (without ``allow_partial_sessions``) the
            error raised by a failing session.

    Example Usage:
        .. code-block:: python

            from TikTokApi import TikTokApi
            async with TikTokApi() as api:
                await api.create_sessions(num_sessions=5, ms_tokens=['msToken1', 'msToken2'])

    Proxy Provider Usage:
        For proxy provider examples with different algorithms and configurations,
        see examples/proxy_provider_example.py

    Custom Launchers:
        To implement custom functionality, such as login or captcha solving, when the session is being created,
        you may use the keyword arguments `browser_context_factory` and `page_factory`.
        These arguments are callable functions that TikTok-Api will use to launch your browser and pages,
        and allow you to perform custom actions on the page before the session is created.
        You can find examples in the test file: tests/test_custom_launchers.py
    """
    if context_options is None:
        context_options = {}

    self._session_recovery_enabled = enable_session_recovery
    self._proxy_provider = proxy_provider
    self._proxy_algorithm = proxy_algorithm

    # Validate arguments before any Playwright resource is started, so a bad
    # call cannot leave a running driver behind.
    if proxies is not None and proxy_provider is not None:
        raise ValueError(
            "Cannot use both 'proxies' and 'proxy_provider' parameters. "
            "Please use 'proxy_provider' (recommended) or 'proxies' (deprecated)."
        )
    if browser_context_factory is None and browser not in (
        "chromium",
        "firefox",
        "webkit",
    ):
        raise ValueError("Invalid browser argument passed")

    self.playwright = await async_playwright().start()
    if browser_context_factory is not None:
        self.browser = await browser_context_factory(self.playwright)
    else:
        if browser == "chromium" and headless and override_browser_args is None:
            override_browser_args = ["--headless=new"]
            headless = False  # managed by the arg
        browser_type = getattr(self.playwright, browser)
        self.browser = await browser_type.launch(
            headless=headless,
            args=override_browser_args,
            proxy=random_choice(proxies),
            executable_path=executable_path,
        )

    def new_session():
        # One coroutine per session; tokens, proxies and cookies are drawn
        # independently for each of them.
        return self.__create_session(
            proxy=(random_choice(proxies) if proxy_provider is None else None),
            ms_token=random_choice(ms_tokens),
            url=starting_url,
            context_options=context_options,
            sleep_after=sleep_after,
            cookies=random_choice(cookies),
            suppress_resource_load_types=suppress_resource_load_types,
            timeout=timeout,
            page_factory=page_factory,
            browser_context_factory=browser_context_factory,
        )

    # Create sessions concurrently.
    pending = [new_session() for _ in range(num_sessions)]

    if not allow_partial_sessions:
        # Strict mode: the first failure propagates to the caller.
        await asyncio.gather(*pending)
        return

    results = await asyncio.gather(*pending, return_exceptions=True)

    # Count failures and provide feedback
    failures = [r for r in results if isinstance(r, Exception)]
    failed_count = len(failures)
    # Counts every session currently in the pool.
    success_count = len(self.sessions)
    minimum_required = min_sessions if min_sessions is not None else 1

    if success_count < minimum_required:
        error_messages = [str(r) for r in failures]
        raise Exception(
            f"Failed to create minimum required sessions. "
            f"Created {success_count}/{num_sessions}, needed at least {minimum_required}.\n"
            f"Errors: {error_messages[:3]}"  # Show first 3 errors
        )
    if failed_count > 0:
        # Some sessions failed but we have enough - log warning and continue
        self.logger.warning(
            f"Created {success_count}/{num_sessions} sessions successfully. "
            f"{failed_count} session(s) failed to create."
        )
        # Log individual errors at debug level
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                self.logger.debug(f"Session {i} creation failed: {result}")
[docs]
async def close_sessions(self):
    """
    Close every session, then the browser and the Playwright driver.

    Call this when you're done with the TikTokApi object; it is invoked
    automatically when the object is used with ``async with``.
    Each close is best-effort: errors are logged at debug level and the
    remaining resources are still processed.
    """
    self.logger.debug(f"Closing {len(self.sessions)} sessions...")

    for open_session in self.sessions:
        try:
            session_page = open_session.page
            if session_page:
                await session_page.close()
        except Exception as page_error:
            self.logger.debug(f"Error closing page: {page_error}")

        try:
            session_context = open_session.context
            if session_context:
                await session_context.close()
        except Exception as context_error:
            self.logger.debug(f"Error closing context: {context_error}")

    self.sessions.clear()

    try:
        if self.browser:
            await self.browser.close()
            self.browser = None
    except Exception as browser_error:
        self.logger.debug(f"Error closing browser: {browser_error}")

    try:
        if self.playwright:
            await self.playwright.stop()
            self.playwright = None
    except Exception as driver_error:
        self.logger.debug(f"Error stopping playwright: {driver_error}")

    self._cleanup_called = True
    self.logger.debug("All sessions and browser resources closed successfully")
[docs]
def generate_js_fetch(self, method: str, url: str, headers: dict) -> str:
    """Generate a javascript fetch function for use in playwright.

    Args:
        method (str): HTTP method for the fetch call.
        url (str): URL to fetch.
        headers (dict): Headers to send; must be JSON-serialisable.

    Returns:
        str: Source of an arrow function that resolves with the response
        text and rejects with the error message.
    """
    # Every value is embedded through json.dumps, which yields a valid
    # JavaScript literal, so quotes, backslashes or newlines in the URL or
    # method cannot break out of the generated string.
    url_js = json.dumps(url)
    method_js = json.dumps(method)
    headers_js = json.dumps(headers)
    return f"""
        () => {{
            return new Promise((resolve, reject) => {{
                fetch({url_js}, {{ method: {method_js}, headers: {headers_js} }})
                    .then(response => response.text())
                    .then(data => resolve(data))
                    .catch(error => reject(error.message));
            }});
        }}
    """
def _get_session(self, **kwargs):
"""Get a random session
DEPRECATED: Use _get_valid_session_index() for better error handling
Args:
session_index (int): The index of the session you want to use, if not provided a random session will be used.
Returns:
int: The index of the session.
TikTokPlaywrightSession: The session.
"""
if len(self.sessions) == 0:
raise Exception("No sessions created, please create sessions first")
if kwargs.get("session_index") is not None:
i = kwargs["session_index"]
else:
i = random.randint(0, len(self.sessions) - 1)
return i, self.sessions[i]
[docs]
async def set_session_cookies(self, session, cookies):
    """
    Add cookies to a session's browser context.

    Args:
        session (TikTokPlaywrightSession): The session to set the cookies for.
        cookies: The cookies to set; handed unchanged to the context's
            ``add_cookies``.
    """
    browser_context = session.context
    await browser_context.add_cookies(cookies)
[docs]
async def get_session_cookies(self, session):
    """
    Read the cookies of a session's browser context.

    Args:
        session (TikTokPlaywrightSession): The session to get the cookies for.

    Returns:
        dict: Mapping of cookie name to cookie value. When a name occurs
        more than once the last occurrence wins.
    """
    jar = {}
    for entry in await session.context.cookies():
        jar[entry["name"]] = entry["value"]
    return jar
[docs]
async def run_fetch_script(self, url: str, headers: dict, **kwargs):
    """
    Run a GET ``fetch`` for ``url`` inside a session's page.

    Args:
        url (str): The url to fetch.
        headers (dict): The headers to use for the fetch.
        session_index (int, optional): Session to use.

    Returns:
        any: Whatever evaluating the generated fetch script yields.

    Raises:
        PlaywrightError: Re-raised after the session has been marked invalid.
    """
    fetch_script = self.generate_js_fetch("GET", url, headers)

    try:
        picked = await self._get_valid_session_index(**kwargs)
    except Exception:
        # Fallback to old method for backwards compatibility
        picked = self._get_session(**kwargs)
    _, session = picked

    try:
        return await session.page.evaluate(fetch_script)
    except PlaywrightError as e:
        # Session died during operation
        self.logger.error(f"Session failed during fetch: {e}")
        await self._mark_session_invalid(session)
        raise
[docs]
async def generate_x_bogus(self, url: str, **kwargs):
    """Generate the X-Bogus header for a url.

    Waits (up to five attempts, each with a random 5-20 s timeout) for
    ``window.byted_acrawler`` to exist on the session's page, navigating to
    a random TikTok page between attempts, then asks it to sign ``url``.

    Args:
        url (str): The url to sign.
        session_index (int, optional): Session to use.

    Returns:
        The value produced by ``window.byted_acrawler.frontierSign``.

    Raises:
        TimeoutError: If the signer never became available.
        PlaywrightError: Re-raised after the session has been marked invalid.
    """
    try:
        _, session = await self._get_valid_session_index(**kwargs)
    except Exception:
        # Fallback to old method for backwards compatibility
        _, session = self._get_session(**kwargs)

    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            timeout_time = random.randint(5000, 20000)
            await session.page.wait_for_function(
                "window.byted_acrawler !== undefined", timeout=timeout_time
            )
            break
        except TimeoutError as e:
            if attempt == max_attempts:
                raise TimeoutError(
                    f"Failed to load tiktok after {max_attempts} attempts, consider using a proxy"
                ) from e

            # "foryou" is listed twice, which makes it twice as likely.
            try_urls = [
                "https://www.tiktok.com/foryou",
                "https://www.tiktok.com",
                "https://www.tiktok.com/@tiktok",
                "https://www.tiktok.com/foryou",
            ]
            await session.page.goto(random.choice(try_urls))
        except PlaywrightError as e:
            # Session died
            self.logger.error(f"Session died during x-bogus generation: {e}")
            await self._mark_session_invalid(session)
            raise

    # json.dumps renders the url as a properly escaped JavaScript string, so
    # a quote or backslash in the url cannot break or alter the script.
    sign_script = (
        f"() => {{ return window.byted_acrawler.frontierSign({json.dumps(url)}) }}"
    )
    try:
        return await session.page.evaluate(sign_script)
    except PlaywrightError as e:
        # Session died during operation
        self.logger.error(f"Session died during x-bogus evaluation: {e}")
        await self._mark_session_invalid(session)
        raise
[docs]
async def sign_url(self, url: str, **kwargs):
    """Append an ``X-Bogus`` signature to ``url``.

    Args:
        url (str): The url to sign.
        session_index (int, optional): Session to use.

    Returns:
        str: ``url`` with ``X-Bogus=<signature>`` added to its query string.

    Raises:
        Exception: If no ``X-Bogus`` value could be generated.
    """
    try:
        index, _ = await self._get_valid_session_index(**kwargs)
    except Exception:
        # Fallback to old method for backwards compatibility
        index, _ = self._get_session(**kwargs)

    # TODO: Would be nice to generate msToken here

    signature = await self.generate_x_bogus(url, session_index=index)
    x_bogus = signature.get("X-Bogus")
    if x_bogus is None:
        raise Exception("Failed to generate X-Bogus")

    # Start a query string unless the url already has one.
    joiner = "&" if "?" in url else "?"
    return f"{url}{joiner}X-Bogus={x_bogus}"
[docs]
async def make_request(
    self,
    url: str,
    headers: dict = None,
    params: dict = None,
    retries: int = 3,
    exponential_backoff: bool = True,
    **kwargs,
):
    """
    Makes a request to TikTok through a session.

    Args:
        url (str): The url to make the request to.
        headers (dict): The headers to use for the request.
        params (dict): The params to use for the request.
        retries (int): The amount of times to retry the request if it fails.
        exponential_backoff (bool): Whether or not to use exponential backoff when retrying the request.
        session_index (int): The index of the session you want to use, if not provided a random session will be used.

    Returns:
        dict: The json response from TikTok.

    Raises:
        EmptyResponseException: If TikTok answers with an empty body.
        InvalidJSONException: If the last attempt still returns invalid JSON.
        Exception: If the request fails otherwise.
    """
    try:
        i, session = await self._get_valid_session_index(**kwargs)
    except Exception:
        # Fallback to old method for backwards compatibility
        i, session = self._get_session(**kwargs)

    # Merge into fresh dicts (call-specific values win). This tolerates None
    # on either side and keeps the msToken added below out of the caller's
    # and the session's own dictionaries.
    params = {**(session.params or {}), **(params or {})}
    headers = {**(session.headers or {}), **(headers or {})}

    # get msToken
    if params.get("msToken") is None:
        # try to get msToken from session
        if session.ms_token is not None:
            params["msToken"] = session.ms_token
        else:
            # we'll try to read it from cookies
            cookies = await self.get_session_cookies(session)
            ms_token = cookies.get("msToken")
            if ms_token is None:
                self.logger.warning(
                    "Failed to get msToken from cookies, trying to make the request anyway (probably will fail)"
                )
            params["msToken"] = ms_token

    encoded_params = f"{url}?{urlencode(params, safe='=', quote_via=quote)}"
    signed_url = await self.sign_url(encoded_params, session_index=i)

    for retry_count in range(1, retries + 1):
        try:
            result = await self.run_fetch_script(
                signed_url, headers=headers, session_index=i
            )

            if result is None:
                raise Exception("TikTokApi.run_fetch_script returned None")

            if result == "":
                raise EmptyResponseException(
                    result,
                    "TikTok returned an empty response. They are detecting you're a bot, try some of these: headless=False, browser='webkit', consider using a proxy",
                )

            try:
                data = json.loads(result)
                if data.get("status_code") != 0:
                    self.logger.error(f"Got an unexpected status code: {data}")
                return data
            except json.decoder.JSONDecodeError:
                if retry_count == retries:
                    self.logger.error(f"Failed to decode json response: {result}")
                    raise InvalidJSONException()

                self.logger.info(
                    f"Failed a request, retrying ({retry_count}/{retries})"
                )
                if exponential_backoff:
                    await asyncio.sleep(2**retry_count)
                else:
                    await asyncio.sleep(1)

        except PlaywrightError as e:
            # Session died during request
            self.logger.error(f"Playwright error during request: {e}")
            await self._mark_session_invalid(session)

            if retry_count < retries:
                self.logger.info(
                    f"Retrying with a new session ({retry_count}/{retries})"
                )
                # Get a new valid session for the retry
                try:
                    i, session = await self._get_valid_session_index(**kwargs)
                except Exception as session_error:
                    self.logger.error(
                        f"Failed to get valid session: {session_error}"
                    )
                    raise
            else:
                raise
[docs]
async def stop_playwright(self):
    """
    Close the browser and stop the Playwright driver.

    Both steps are best-effort; errors are logged at debug level.
    Note: It's better to use close_sessions() which performs the same
    shutdown after closing the sessions.
    """
    try:
        active_browser = self.browser
        if active_browser:
            await active_browser.close()
            self.browser = None
    except Exception as browser_error:
        self.logger.debug(f"Error closing browser: {browser_error}")

    try:
        driver = self.playwright
        if driver:
            await driver.stop()
            self.playwright = None
    except Exception as driver_error:
        self.logger.debug(f"Error stopping playwright: {driver_error}")
[docs]
async def get_session_content(self, url: str, **kwargs):
    """Return the current content of a session's page.

    Args:
        url (str): Accepted for the call signature; the method does not
            navigate and returns whatever the page currently shows.
        session_index (int, optional): Session to use.

    Raises:
        PlaywrightError: Re-raised after the session has been marked invalid.
    """
    try:
        picked = await self._get_valid_session_index(**kwargs)
    except Exception:
        # Fallback to old method
        picked = self._get_session(**kwargs)
    _, session = picked

    try:
        page_source = await session.page.content()
    except PlaywrightError as e:
        self.logger.error(f"Session died during get_session_content: {e}")
        await self._mark_session_invalid(session)
        raise
    return page_source
[docs]
def get_resource_stats(self) -> dict:
    """
    Summarise current resource usage without touching the browser.

    Validity is taken from each session's ``is_valid`` flag as it is right
    now; no session is actively probed.

    Returns:
        dict: Session counts, browser/playwright presence and the
        cleanup/recovery flags.
    """
    total = len(self.sessions)
    valid = len([s for s in self.sessions if s.is_valid])

    stats = {
        "total_sessions": total,
        "valid_sessions": valid,
        "invalid_sessions": total - valid,
        "has_browser": self.browser is not None,
        "has_playwright": self.playwright is not None,
        "cleanup_called": self._cleanup_called,
        "auto_cleanup_enabled": self._auto_cleanup_dead_sessions,
        "recovery_enabled": self._session_recovery_enabled,
    }
    return stats
[docs]
async def health_check(self) -> dict:
    """
    Perform a health check on all resources.

    Every session is actively validated first; the aggregate statistics are
    collected afterwards so that the counts and the leak warning reflect
    sessions that were only just found to be dead.

    Returns:
        dict: The fields of ``get_resource_stats()`` plus
        ``session_details`` (per-session results), ``healthy_sessions`` and,
        when dead sessions pile up with auto-cleanup disabled, ``warning``.
    """
    # Actively validate all sessions
    session_health = []
    for i, session in enumerate(self.sessions):
        is_valid = await self._is_session_valid(session)
        session_health.append(
            {
                "index": i,
                "valid": is_valid,
                "marked_valid": session.is_valid,
            }
        )

    # Taken after validation: validating may flip ``is_valid`` flags.
    health = self.get_resource_stats()
    health["session_details"] = session_health
    health["healthy_sessions"] = sum(1 for s in session_health if s["valid"])

    # Check for potential leaks
    if health["invalid_sessions"] > 0 and not self._auto_cleanup_dead_sessions:
        health["warning"] = (
            f"{health['invalid_sessions']} invalid sessions accumulating (auto-cleanup disabled)"
        )

    return health
async def __aenter__(self):
    """Enter the async context manager; returns this instance unchanged."""
    return self
async def __aexit__(self, exc_type, exc, tb):
"""Ensure cleanup happens when exiting context manager"""
await self.close_sessions()
# stop_playwright is already called by close_sessions, but call it again for safety
if not self._cleanup_called:
await self.stop_playwright()