initial commit
This commit is contained in:
parent
29f2a9dcec
commit
404370359b
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
.idea/
|
||||
.venv/
|
||||
**/__pycache__/
|
0
cfg/__init__.py
Normal file
0
cfg/__init__.py
Normal file
78
cfg/toml_config.py
Normal file
78
cfg/toml_config.py
Normal file
@ -0,0 +1,78 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
import tomllib
|
||||
|
||||
|
||||
class TomlConfig:
|
||||
def __init__(self, config_dir: str, config_file: str):
|
||||
self.config_path = Path(config_dir).expanduser() / config_file
|
||||
if not self.config_path.exists():
|
||||
raise FileNotFoundError(f"Config file not found: {self.config_path}")
|
||||
self.config = self._load_config()
|
||||
|
||||
def _load_config(self):
|
||||
try:
|
||||
with open(self.config_path, "rb") as f:
|
||||
return tomllib.load(f)
|
||||
except tomllib.TOMLDecodeError as e:
|
||||
raise ValueError(
|
||||
f"Failed to decode TOML file at {self.config_path}: {e}"
|
||||
)
|
||||
|
||||
def get_value(self, key, section=None):
|
||||
if section is None:
|
||||
return self.config[key]
|
||||
return self.config[section][key]
|
||||
|
||||
def set_value(self, key, value, section=None):
|
||||
if section is None:
|
||||
self.config[key] = value
|
||||
else:
|
||||
self.config[section][key] = value
|
||||
self._save_config()
|
||||
|
||||
def get_section(self, section):
|
||||
return self.config[section]
|
||||
|
||||
def get_sections(self):
|
||||
return list(self.config.keys())
|
||||
|
||||
def get_keys_in_section(self, section):
|
||||
if section not in self.config:
|
||||
return []
|
||||
else:
|
||||
keys = list(self.config[section].keys())
|
||||
return keys
|
||||
|
||||
def get_section_for_key(self, key):
|
||||
for section, keys in self.config.items():
|
||||
if key in keys:
|
||||
return section
|
||||
return None
|
||||
|
||||
def global_search(self, key_to_find):
|
||||
"""
|
||||
Recursively searches for a specific key in the TOML configuration,
|
||||
regardless of its section.
|
||||
:param key_to_find: The key to search for.
|
||||
:return: the value of the key if found, or None if the key does not exist.
|
||||
"""
|
||||
|
||||
def recursive_search(d):
|
||||
if isinstance(d, dict):
|
||||
for key, value in d.items():
|
||||
if key == key_to_find:
|
||||
return value
|
||||
if isinstance(value, dict):
|
||||
result = recursive_search(value)
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
return recursive_search(self.config)
|
||||
|
||||
def get_all(self):
|
||||
return self.config
|
||||
|
||||
def dump(self):
|
||||
print(self.config)
|
0
content/__init__.py
Normal file
0
content/__init__.py
Normal file
52
content/parser.py
Normal file
52
content/parser.py
Normal file
@ -0,0 +1,52 @@
|
||||
import tomllib # For parsing the TOML-like section (Python 3.11+)
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class PostParser:
|
||||
def __init__(self, file_path):
|
||||
"""
|
||||
Initialize the parser with the path to the input file.
|
||||
|
||||
:param file_path: Path to the file that contains the input data.
|
||||
"""
|
||||
self.file_path = file_path
|
||||
self.fixed_path = Path(self.file_path).expanduser()
|
||||
|
||||
def parse(self):
|
||||
"""
|
||||
Parses the file into two parts: a dictionary of TOML values and Markdown content.
|
||||
|
||||
:return: A tuple containing (TOML dictionary, Markdown string).
|
||||
"""
|
||||
try:
|
||||
with open(self.fixed_path, "r") as f:
|
||||
content = f.read()
|
||||
|
||||
# Extract the triple-quoted TOML section and Markdown using regex
|
||||
match = re.match(r'"""(.*?)"""\n(.*)', content, re.DOTALL)
|
||||
if match is None:
|
||||
raise ValueError("Input file does not follow the expected format.")
|
||||
|
||||
toml_content, markdown_content = match.groups()
|
||||
|
||||
# Validate and parse the TOML section
|
||||
self._validate_toml(toml_content.strip())
|
||||
toml_dict = tomllib.loads(toml_content.strip())
|
||||
|
||||
return toml_dict, markdown_content.strip()
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to parse the file '{self.file_path}'. Error: {e}")
|
||||
|
||||
def _validate_toml(self, toml_content):
|
||||
"""
|
||||
Validates the TOML section for known issues and provides descriptive error messages.
|
||||
|
||||
:param toml_content: The TOML content as a string.
|
||||
:raises ValueError: If the TOML content contains known errors.
|
||||
"""
|
||||
# Check for uppercase booleans
|
||||
if re.search(r"=\s*(True|False)", toml_content):
|
||||
raise ValueError("TOML booleans must be lowercase (true/false).")
|
||||
|
||||
# Add additional TOML validation as needed
|
6
example.toml
Normal file
6
example.toml
Normal file
@ -0,0 +1,6 @@
|
||||
["credentials"]
|
||||
userid = "persnickety"
|
||||
password = "$up3rD00p3rS3crit"
|
||||
|
||||
["content"]
|
||||
content_dir = "~/Documents/lunduke"
|
43
main.py
Normal file
43
main.py
Normal file
@ -0,0 +1,43 @@
|
||||
import asyncio
|
||||
|
||||
from cfg.toml_config import TomlConfig
|
||||
from content.parser import PostParser
|
||||
from po.lunduke_forum import LundukeForum
|
||||
from pw.utils import prepare_browser, cleanup_browser
|
||||
|
||||
|
||||
async def main():
|
||||
site_cfg = TomlConfig("~/.discourse", "lunduke.toml")
|
||||
|
||||
apw, browser, page = await prepare_browser("chromium", headless=True)
|
||||
forum = LundukeForum(page)
|
||||
await forum.login(
|
||||
site_cfg.get_value("userid", "credentials"),
|
||||
site_cfg.get_value("password","credentials")
|
||||
)
|
||||
|
||||
content_dir = site_cfg.get_value("content_dir", "content")
|
||||
parser = PostParser(content_dir+"/"+"post_three.md")
|
||||
toml_data = {}
|
||||
markdown = ""
|
||||
try:
|
||||
toml_data, markdown = parser.parse()
|
||||
except RuntimeError as e:
|
||||
print(e)
|
||||
|
||||
await forum.create_new_topic(
|
||||
toml_data["Category"],
|
||||
toml_data["Title"],
|
||||
markdown,
|
||||
draft=bool(toml_data["Draft"])
|
||||
)
|
||||
|
||||
# await forum.publish_draft_topic("Create a draft and publish it")
|
||||
|
||||
# Close the pw
|
||||
await cleanup_browser(apw, browser)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
0
po/__init__.py
Normal file
0
po/__init__.py
Normal file
119
po/lunduke_forum.py
Normal file
119
po/lunduke_forum.py
Normal file
@ -0,0 +1,119 @@
|
||||
from playwright.async_api import Page, Locator
|
||||
|
||||
|
||||
class LundukeForum:
|
||||
def __init__(self, page: Page, root_url: str = "https://forum.lunduke.com"):
|
||||
# URL Configuration
|
||||
self.page = page
|
||||
self.root_url = root_url
|
||||
self.latest_url = f"{self.root_url}/latest"
|
||||
self.hot_url = f"{self.root_url}/hot"
|
||||
self.categories_url = f"{self.root_url}/categories"
|
||||
|
||||
# Locators (encapsulated here for maintainability)
|
||||
self.login_modal = self.page.locator('.login-left-side')
|
||||
self.username_field = self.page.locator('#login-account-name')
|
||||
self.password_field = self.page.locator('#login-account-password')
|
||||
self.login_button = self.page.locator('#login-button')
|
||||
self.create_topic_button = self.page.locator('#create-topic')
|
||||
self.topic_drafts_button = self.page.locator('[data-identifier="topic-drafts-menu"]')
|
||||
self.reply_control = self.page.locator('#reply-control')
|
||||
self.category_dropdown = self.page.locator('.category-input')
|
||||
self.submit_button = self.page.locator('.save-or-cancel')
|
||||
|
||||
# Asynchronous Navigation Methods
|
||||
async def goto(self, endpoint: str):
|
||||
"""Navigate to a specific endpoint on the site."""
|
||||
target_url = f"{self.root_url}/{endpoint}"
|
||||
await self.page.goto(target_url)
|
||||
|
||||
async def goto_latest(self):
|
||||
"""Navigate to the 'Latest' discussions page."""
|
||||
await self.goto("/latest")
|
||||
|
||||
async def goto_categories(self):
|
||||
"""Navigate to the 'Categories' page."""
|
||||
await self.goto("/categories")
|
||||
|
||||
# Login Action
|
||||
async def login(self, username: str, password: str):
|
||||
"""Perform a login on the forum."""
|
||||
# Go to the login page
|
||||
await self.page.goto(f"{self.root_url}/login")
|
||||
|
||||
# Wait for login modal to appear
|
||||
await self.login_modal.wait_for(state="visible", timeout=30000)
|
||||
|
||||
# Type username and password
|
||||
await self.username_field.type(username)
|
||||
await self.password_field.type(password)
|
||||
|
||||
# Click the login button
|
||||
await self.login_button.click()
|
||||
await element_not_visible(self.login_modal)
|
||||
|
||||
# Confirm login
|
||||
await self.wait_for_dom_load()
|
||||
|
||||
# Create a New Topic
|
||||
async def create_new_topic(self, category: str, title: str, body: str, draft: bool = True):
|
||||
"""Create a new topic in the forum.
|
||||
:param category: the category to place the post under
|
||||
:param title: the title of the topic.
|
||||
:param body: the body of the topic.
|
||||
:param draft: whether to store the post as a draft. (Default: True)
|
||||
:return: None
|
||||
"""
|
||||
# Navigate to Categories
|
||||
# await self.goto_categories()
|
||||
|
||||
# Click 'Create Topic' button and await visibility of reply control
|
||||
await self.create_topic_button.click()
|
||||
await self.reply_control.wait_for(state="visible", timeout=30000)
|
||||
|
||||
# Fill out the topic fields
|
||||
title_field = self.page.locator('[placeholder="Type title, or paste a link here"]')
|
||||
body_field = self.page.locator(
|
||||
'[placeholder="Type here. Use Markdown, BBCode, or HTML to format. Drag or paste images."]')
|
||||
|
||||
await title_field.type(title)
|
||||
await body_field.type(body)
|
||||
|
||||
# Select the desired category
|
||||
await self.category_dropdown.click()
|
||||
category_locator = self.page.locator(f'[data-name="{category}"]')
|
||||
await category_locator.click()
|
||||
|
||||
if draft:
|
||||
close_button = self.page.locator('.d-button-label', has_text='Close')
|
||||
await close_button.click()
|
||||
save_draft = self.page.locator('.d-button-label', has_text='Save draft for later')
|
||||
await save_draft.click()
|
||||
await self.wait_for_dom_load()
|
||||
else:
|
||||
await self.submit_button.click()
|
||||
await self.wait_for_dom_load()
|
||||
|
||||
async def publish_draft_topic(self, title: str):
|
||||
# Click 'Submit' to create the topic
|
||||
await self.topic_drafts_button.click()
|
||||
await self._select_draft(title)
|
||||
await self.submit_button.click()
|
||||
await self.wait_for_dom_load()
|
||||
|
||||
async def _select_draft(self, title: str):
|
||||
draft_entry = self.page.locator('.d-button-label', has_text=title)
|
||||
await draft_entry.wait_for(state="visible", timeout=5000)
|
||||
await draft_entry.click()
|
||||
|
||||
async def wait_for_dom_load(self):
|
||||
await self.page.wait_for_load_state("domcontentloaded", timeout=30000)
|
||||
|
||||
# Additional Utility Methods
|
||||
async def element_visible(locator: Locator, timeout: int = 10000):
|
||||
"""Wait for a specific element to confirm the page has loaded."""
|
||||
await locator.wait_for(state="visible", timeout=timeout)
|
||||
|
||||
async def element_not_visible(locator: Locator, timeout: int = 10000):
|
||||
await locator.wait_for(state="hidden", timeout=timeout)
|
||||
|
0
pw/__init__.py
Normal file
0
pw/__init__.py
Normal file
42
pw/utils.py
Normal file
42
pw/utils.py
Normal file
@ -0,0 +1,42 @@
|
||||
from playwright.async_api import async_playwright
|
||||
|
||||
async def get_async_context(browser):
|
||||
return await browser.new_context()
|
||||
|
||||
async def get_async_page(context):
|
||||
return await context.new_page()
|
||||
|
||||
async def prepare_browser(browser_type="chromium", headless=True):
|
||||
apw = await async_playwright().start() # Start Playwright manually
|
||||
try:
|
||||
# Initialize the correct browser
|
||||
if browser_type == "firefox":
|
||||
browser = await apw.firefox.launch(headless=headless)
|
||||
elif browser_type == "chromium" or browser_type == "edge": # Edge uses Chromium
|
||||
browser = await apw.chromium.launch(headless=headless)
|
||||
elif browser_type == "webkit":
|
||||
browser = await apw.webkit.launch(headless=headless)
|
||||
else:
|
||||
raise ValueError(f"Unsupported platform: {browser_type}")
|
||||
|
||||
# Create context and page
|
||||
context = await browser.new_context()
|
||||
page = await context.new_page()
|
||||
|
||||
# Return everything (browser, context, page)
|
||||
# return browser, context, page
|
||||
return apw, browser, page
|
||||
# return browser, page
|
||||
except Exception as e:
|
||||
# Cleanup Playwright instance upon failure
|
||||
await apw.stop()
|
||||
raise e
|
||||
|
||||
|
||||
async def cleanup_browser(apw, browser):
|
||||
# A utility function to close and clean up resources
|
||||
if browser:
|
||||
await browser.close()
|
||||
if apw:
|
||||
await apw.stop()
|
||||
|
2
requirements.txt
Normal file
2
requirements.txt
Normal file
@ -0,0 +1,2 @@
|
||||
playwright~=1.50.0
|
||||
tomli~=2.2.1
|
Loading…
Reference in New Issue
Block a user