AI-generated Usenet client in Python. If you have any improvements or helpful ideas for it, I would like to see them.
#!/usr/bin/env python3
"""Minimal interactive Usenet (NNTP over TLS) reader and poster for a terminal.

Project page: https://github.com/alt-magick/Newsgroup-Client-/

The client lists the top-level posts of one newsgroup, pages through article
bodies, scans for replies and posts new articles or follow-ups.  Keyboard
input is read one raw key at a time through ``termios``/``tty``, so a
POSIX terminal is required.
"""

import base64
import os
import quopri
import re
import shlex
import subprocess
import sys
import tempfile
import termios
import tty
from collections import Counter
from email.message import EmailMessage

try:
    import nntplib
except ImportError:
    # nntplib was removed from the standard library in Python 3.13; main()
    # reports this instead of failing with a traceback at import time.
    nntplib = None

# ================= USER CONFIG =================
NNTP_SERVER = "usnews.blocknews.net"
NNTP_PORT = 563
USERNAME = ""
PASSWORD = ""
PAGE_LINES = 12
MAX_ARTICLES_LIST = 200
MAX_REPLY_SCAN = 300
START_GROUP = "alt.test"
SHOW_REPLY_COUNT = False
SHOW_REPLY_COUNT_MAIN = True
# ==============================================

# Subjects starting with "Re:" or "Fwd:" are treated as follow-ups.
RE_REPLY = re.compile(r"^(re|fwd):", re.IGNORECASE)

# Characters removed from text before it is printed.
# NOTE(review): the pattern in the posted script was garbled in transit and
# did not compile; this control-character class (C0 except TAB/LF, DEL, C1)
# is a reconstruction -- confirm against the upstream repository.
CLEAN_RE = re.compile(r"[\x00-\x08\x0b-\x1f\x7f-\x9f]")

# Keys a raw-mode terminal may deliver for ENTER.
ENTER_KEYS = ("\r", "\n")

# ---------- STATUS LINE ----------
STATUS_LINE = ""


def set_status(msg):
    """Store *msg* so the next show_status() call prints it."""
    global STATUS_LINE
    STATUS_LINE = msg


def show_status():
    """Print the pending status message, if any, and clear it."""
    global STATUS_LINE
    if STATUS_LINE:
        print(f" [{STATUS_LINE}]")
        STATUS_LINE = ""


# ---------- RAW KEY INPUT ----------
def get_key():
    """Read and return a single key press without waiting for ENTER.

    The terminal is switched to raw mode for the read and always restored.

    Raises:
        KeyboardInterrupt: when Ctrl-C is pressed.  Raw mode disables signal
            generation, so the key arrives as the character ``\\x03``.
    """
    fd = sys.stdin.fileno()
    saved = termios.tcgetattr(fd)
    try:
        tty.setraw(fd)
        ch = sys.stdin.read(1)
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, saved)
    if ch == "\x03":
        raise KeyboardInterrupt
    return ch


def prompt(text):
    """Write *text* without a newline and return one stripped input line."""
    sys.stdout.write(text)
    sys.stdout.flush()
    return sys.stdin.readline().strip()


# ---------- HARD-CODED PAGER ----------
def paged_print(lines):
    """Print *lines* in pages of PAGE_LINES, pausing for a key between pages.

    SPACE stops the output; any other key shows the next page.  No key is
    read after the final page.
    """
    i = 0
    total = len(lines)
    while i < total:
        end = min(i + PAGE_LINES, total)
        for line in lines[i:end]:
            print(line)
        i = end
        if i >= total:
            break
        print(" --- ENTER = next page | SPACE = skip ---")
        if get_key() == " ":
            break


# ---------- HEADER / BODY DECODING ----------
def decode_body_line(line_bytes):
    """Decode one raw line to text and strip control characters.

    The bytes are decoded as UTF-8 with undecodable bytes replaced.  No
    transfer decoding is guessed from the line's content: guessing turned
    ordinary text such as ``x=41`` or a Message-ID containing ``=`` into
    garbage.  Use decode_body() for encoded article bodies.
    """
    return CLEAN_RE.sub("", line_bytes.decode("utf-8", errors="replace"))


def decode_body(raw_lines, transfer_encoding=None):
    """Decode the raw body lines of an article into printable text lines.

    Args:
        raw_lines: body lines as bytes, without line terminators.
        transfer_encoding: value of the article's Content-Transfer-Encoding
            header, or None.  Only ``quoted-printable`` and ``base64`` are
            decoded; any other value leaves the lines as they are.

    Returns:
        A list of str lines.  If transfer decoding fails, the undecoded
        lines are returned instead.
    """
    cte = (transfer_encoding or "").strip().lower()
    if cte in ("quoted-printable", "base64"):
        # Decode the body as a whole: quoted-printable soft line breaks and
        # base64 groups may span line boundaries.
        blob = b"\n".join(raw_lines)
        try:
            if cte == "base64":
                blob = base64.b64decode(blob)
            else:
                blob = quopri.decodestring(blob)
        except ValueError:  # binascii.Error is a ValueError subclass
            pass
        else:
            raw_lines = blob.splitlines()
    return [decode_body_line(line) for line in raw_lines]


def parse_headers(raw_lines):
    """Build a dict of header values from raw header lines.

    Keys are lower-cased header names.  A line starting with a space or tab
    is a folded continuation and is appended to the preceding header (long
    References headers are commonly folded).  A repeated header keeps its
    last value.
    """
    headers = {}
    last = None
    for raw in raw_lines:
        line = decode_body_line(raw)
        if last is not None and line[:1] in (" ", "\t"):
            headers[last] = f"{headers[last]} {line.strip()}".strip()
            continue
        name, sep, value = line.partition(":")
        key = name.strip().lower()
        if sep and key:
            headers[key] = value.strip()
            last = key
    return headers


# ---------- POST BODY EDITOR ----------
def edit_body(initial=""):
    """Open $EDITOR (default ``nano``) on a temp file and return its text.

    The temporary file is seeded with *initial* and removed afterwards.

    Raises:
        OSError: if the editor cannot be started.
    """
    # EDITOR may carry arguments (e.g. "code -w"), so split it like a shell.
    editor_cmd = shlex.split(os.environ.get("EDITOR", "nano")) or ["nano"]
    fd, path = tempfile.mkstemp(suffix=".txt")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(initial)
        subprocess.call(editor_cmd + [path])
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            return f.read()
    finally:
        os.unlink(path)


# ---------- POST BODY SOURCE ----------
def get_post_body():
    """Ask where the post body comes from and return it.

    ``F`` loads a text file, ``T`` reads lines typed in the terminal until a
    line containing only a period, and any other key opens the editor.

    Returns:
        The body text, or None (with a status message set) when the body is
        empty or could not be obtained.
    """
    print(" Post body source:")
    print(" E = Edit in editor")
    print(" F = Load from external text file")
    print(" T = Type directly in terminal")
    choice = get_key().lower()

    if choice == "f":
        path = prompt(" Enter path to text file: ")
        try:
            with open(path, "r", encoding="utf-8", errors="replace") as f:
                body = f.read()
        except OSError as e:
            set_status(f"Failed to read file: {e}")
            return None
        if not body.strip():
            set_status("Post aborted (file is empty)")
            return None
        return body

    if choice == "t":
        print(" Type your post below. End with a period on a line by itself.")
        lines = []
        while True:
            try:
                line = input()
            except EOFError:
                break  # Ctrl-D ends the input like the lone period does
            if line.strip() == ".":
                break
            lines.append(line)
        # Keep the line structure the user typed.
        body = "\n".join(lines)
        if not body.strip():
            set_status("Post aborted (empty input)")
            return None
        return body

    # default: editor
    try:
        body = edit_body()
    except OSError as e:
        set_status(f"Could not run editor: {e}")
        return None
    if not body.strip():
        set_status("Post aborted (empty body)")
        return None
    return body


# ---------- POSTING ----------
def post_article(nntp, group, subject=None, references=None):
    """Prompt for author details and a body, then post to *group*.

    Args:
        nntp: connected NNTP client object.
        group: newsgroup name used for the Newsgroups header.
        subject: subject line; prompted for when empty or None.
        references: value for the References header, or None for a new
            thread.

    Returns:
        True if the article was posted, False otherwise.  The outcome is
        reported through the status line.
    """
    name = prompt("Enter your display name: ")
    email = prompt("Enter your email: ")
    if not subject:
        subject = prompt("Enter subject: ")
    body = get_post_body()
    if not body:
        return False

    try:
        # Header assignment validates its value, so build the message inside
        # the try block as well.
        msg = EmailMessage()
        msg["From"] = f"{name} <{email}>"
        msg["Newsgroups"] = group
        msg["Subject"] = subject
        if references:
            msg["References"] = references
        msg.set_content(body)
        nntp.post(msg.as_bytes())
    except Exception as e:
        set_status(f"Post failed: {e}")
        return False
    set_status("Article posted successfully")
    return True


# ---------- REPLY POSTING ----------
def post_reply(nntp, group, article_num):
    """Post a follow-up to article *article_num* of the current group.

    The subject gets a ``Re: `` prefix unless it already starts with
    ``Re:``/``Fwd:``.  References is the parent's References followed by its
    Message-ID.

    Returns:
        True if the reply was posted, False otherwise.
    """
    try:
        _, hinfo = nntp.head(str(article_num))
        headers = parse_headers(hinfo.lines)
        subject = headers.get("subject", "(no subject)")
        if not RE_REPLY.match(subject):
            subject = "Re: " + subject
        refs = [
            headers[key]
            for key in ("references", "message-id")
            if headers.get(key)
        ]
        return post_article(nntp, group, subject, " ".join(refs))
    except Exception as e:
        set_status(f"Reply failed: {e}")
        return False


# ---------- ARTICLE DISPLAY ----------
def show_article(nntp, num, group=None, allow_reply=False):
    """Fetch article *num* and page its From/Date/Subject and body.

    Args:
        nntp: connected NNTP client object.
        num: article number within the currently selected group.
        group: group name, needed only to offer a reply.
        allow_reply: when true and *group* is given, offer ``P`` to reply
            after the article has been shown.
    """
    try:
        _, hinfo = nntp.head(str(num))
        headers = parse_headers(hinfo.lines)
        _, body = nntp.body(str(num))
        lines = decode_body(
            body.lines, headers.get("content-transfer-encoding")
        )
        paged_print([
            f"From: {headers.get('from','?')}",
            f"Date: {headers.get('date','?')}",
            f"Subject: {headers.get('subject','(no subject)')}",
            "",
        ] + lines)
        if allow_reply and group:
            print(" P=reply (any other key to continue)")
            if get_key().lower() == "p":
                post_reply(nntp, group, num)
    except Exception as e:
        set_status(f"Fetch failed: {e}")


# ---------- REPLY SCANNING ----------
def scan_replies_xover(nntp, msgid, first, last):
    """Return numbers of recent articles whose References contain *msgid*.

    Only the newest MAX_REPLY_SCAN articles (not below *first*) are checked.
    An empty *msgid* yields an empty list, because the empty string would
    otherwise match every article.  A failed overview request sets a status
    message and also yields an empty list.
    """
    replies = []
    if not msgid:
        return replies
    start = max(first, last - MAX_REPLY_SCAN)
    try:
        _, overviews = nntp.over((start, last))
    except Exception as e:
        set_status(f"Reply scan failed: {e}")
        return replies
    for num, hdr in overviews:
        if msgid in hdr.get("references", ""):
            replies.append(int(num))
    return replies


# ---------- GROUP RELOAD ----------
def count_references(overviews):
    """Count, per Message-ID, how many overview entries reference it.

    *overviews* is a sequence of ``(number, headers)`` pairs.  Each entry
    counts at most once per referenced Message-ID.
    """
    counts = Counter()
    for _, hdr in overviews:
        counts.update(set(hdr.get("references", "").split()))
    return counts


def reload_group(nntp, group):
    """Select *group* and list its recent top-level posts, newest first.

    Articles whose subject starts with ``Re:``/``Fwd:`` are skipped.  At most
    the newest MAX_ARTICLES_LIST articles are examined.

    Returns:
        ``(posts, first, last)`` where *posts* is a list of dicts with keys
        ``num``, ``subject``, ``from``, ``date``, ``msgid`` and ``replies``,
        and *first*/*last* are the group's article-number bounds.  On failure
        a status message is set and ``(None, None, None)`` is returned.
    """
    try:
        _, _, first, last, _ = nntp.group(group)
        first = int(first)
        last = int(last)
        _, overviews = nntp.over(
            (max(first, last - MAX_ARTICLES_LIST), last)
        )
        # One pass over the overview replaces a nested scan per post.
        ref_counts = (
            count_references(overviews) if SHOW_REPLY_COUNT_MAIN else {}
        )
        posts = []
        for num, hdr in reversed(overviews):
            subject = hdr.get("subject", "")
            if RE_REPLY.match(subject):
                continue
            msgid = hdr.get("message-id", "")
            posts.append({
                "num": int(num),
                "subject": CLEAN_RE.sub("", subject),
                "from": CLEAN_RE.sub("", hdr.get("from", "?")),
                "date": hdr.get("date", "?"),
                "msgid": msgid,
                "replies": ref_counts.get(msgid, 0) if msgid else 0,
            })
        return posts, first, last
    except Exception as e:
        set_status(f"Reload failed: {e}")
        return None, None, None


# ---------- GROUP BROWSER ----------
def browse_group(nntp, group):
    """Run the interactive browser for *group* until the list ends or Q.

    Returns immediately if the group cannot be loaded or has no top-level
    posts.  ``Q`` exits the program through ``sys.exit(0)``.
    """
    posts, first, last = reload_group(nntp, group)
    if not posts:
        return

    index = 0
    while index < len(posts):
        p = posts[index]
        print(f" [{index+1}] #{p['num']}")
        print(f"From: {p['from']}")
        print(f"Date: {p['date']}")
        print(f"Replies: {p['replies'] if SHOW_REPLY_COUNT_MAIN else '?'}")
        print(f"Subject: {p['subject']}")
        show_status()
        print(" ENTER=read SPACE=next R=replies N=new post L=reload J=jump G=group Q=quit")

        key = get_key().lower()
        if key == "q":
            sys.exit(0)
        elif key == " ":
            index += 1
        elif key in ENTER_KEYS:
            show_article(nntp, p["num"], group, True)
        elif key == "n":
            if post_article(nntp, group):
                fresh = reload_group(nntp, group)
                # Keep the current list if the reload failed.
                if fresh[0]:
                    posts, first, last = fresh
                    index = 0
        elif key == "l":
            fresh = reload_group(nntp, group)
            if fresh[0]:
                posts, first, last = fresh
                index = 0
                set_status("Group reloaded")
        elif key == "j":
            val = prompt("Jump to post number: ")
            if val.isdigit():
                idx = int(val) - 1
                if 0 <= idx < len(posts):
                    index = idx
        elif key == "g":
            target = prompt("New group: ")
            fresh = (
                reload_group(nntp, target) if target else (None, None, None)
            )
            if fresh[0]:
                group = target
                posts, first, last = fresh
                index = 0
            elif target:
                if not STATUS_LINE:
                    set_status(f"No posts found in {target}")
                # The failed attempt may have changed the selected group on
                # the server; article numbers are per group, so go back.
                try:
                    nntp.group(group)
                except Exception as e:
                    set_status(f"Could not return to {group}: {e}")
        elif key == "r":
            replies = scan_replies_xover(nntp, p["msgid"], first, last)
            if not replies:
                # Do not overwrite a scan-failure message.
                if not STATUS_LINE:
                    set_status("No replies found")
                continue
            for i, rnum in enumerate(replies):
                if i < len(replies) - 1:
                    print(" ENTER=next reply | SPACE=skip remaining | P=reply")
                else:
                    print(" End of replies | P=reply")
                k = get_key().lower()
                if k == " ":
                    set_status("Skipped remaining replies")
                    break
                elif k == "p":
                    post_reply(nntp, group, rnum)
                elif k in ENTER_KEYS:
                    show_article(nntp, rnum, group, True)


# ---------- MAIN ----------
def main():
    """Connect to the configured server and browse START_GROUP."""
    if nntplib is None:
        sys.exit(
            "The nntplib module is not available in this Python "
            "(it was removed from the standard library in 3.13)."
        )
    print(f"Connecting to {NNTP_SERVER}:{NNTP_PORT}...")
    try:
        nntp = nntplib.NNTP_SSL(NNTP_SERVER, NNTP_PORT, USERNAME, PASSWORD)
    except (nntplib.NNTPError, OSError) as e:
        sys.exit(f"Connection failed: {e}")
    # The context manager closes the connection on every exit path,
    # including sys.exit() from the browser and Ctrl-C.
    with nntp:
        set_status("Connected")
        try:
            browse_group(nntp, START_GROUP)
        except KeyboardInterrupt:
            print()
        # Show a message left by a failed initial group load.
        show_status()


if __name__ == "__main__":
    main()

# --
https://mail.python.org/mailman3//lists/python-list.python.org
