def parse_gophermap(response):
    """Render gophermap lines into the text field, one entry per line.

    Every non-empty line is expected to consist of a one-character item
    type immediately followed by the display text, then TAB-separated
    selector, host and port fields. Lines whose item type is not a key of
    ``type_map`` are skipped. Fields beyond the fourth (such as a trailing
    Gopher+ marker) are ignored instead of invalidating the line.

    Clickable entries get the tags ``("link", <target>)`` where the target
    is ``server/type/selector`` with ``:port`` appended for non-default
    ports. Type ``h`` entries use the selector itself as the target.

    Args:
        response: Iterable of gophermap lines (str) without line endings.
    """
    text_field.delete(1.0, tk.END)  # Clear previous text
    for line in response:
        if not line:
            continue
        item_type = line[0]
        if item_type not in type_map:
            continue

        fields = line.split('\t')
        # Comments ('i') and errors ('3') are never clickable, so only the
        # display text matters for them.
        plain_text = item_type in ('i', '3')
        if len(fields) >= 4:
            description, path, server, port = fields[:4]
        elif plain_text:
            # Comment lines are often malformed and contain only a
            # description; keep the text and ignore the missing fields.
            description = fields[0]
        else:
            print(f"Error: malformed gophermap line\n{line}")
            continue

        if plain_text:
            tags = None
        elif item_type == 'h':
            # Only pass the path, which should look like URL:http://...
            tags = ("link", path)
        else:
            try:
                port_number = int(port)
            except ValueError:
                # A broken port must not abort rendering of the whole menu.
                print(f"Error: invalid port {port!r}\n{line}")
                continue
            link_data = f"{server}/{item_type}/{path}"
            # Add non-standard port
            if port_number != 70:
                link_data += f":{port_number}"
            tags = ("link", link_data)

        text_field.insert(tk.END, f"{type_map[item_type]} ")
        # description[1:] drops the leading item type character.
        if tags is None:
            text_field.insert(tk.END, description[1:])
        else:
            text_field.insert(tk.END, description[1:], tags)
        text_field.insert(tk.END, "\n")
def bookmarks_list():
    """Display the bookmarks file as a gophermap in the text field.

    Reads the file ``bookmarks`` inside ``working_directory`` and passes
    its lines to ``parse_gophermap``.

    Raises:
        OSError: If the bookmarks file cannot be opened or read. The text
            field state is left untouched in that case.
    """
    # Read first, so a missing or unreadable file never leaves the widget
    # switched to its writable state.
    with open(os.path.join(working_directory, "bookmarks"), 'r') as file:
        lines = file.read().split("\n")

    text_field.config(state="normal")
    try:
        parse_gophermap(lines)
    finally:
        # Always lock the widget again, even if rendering failed.
        text_field.config(state="disabled")
def tcp_request(user_input, server, socket,
                port, descriptor, path, query, update_history=True, quirk=False):
    """Send one Gopher request and return the raw reply.

    The selector is built by joining ``path`` with "/". When ``query`` is
    truthy it is appended after a TAB. The request line is terminated with
    CRLF as RFC 1436 specifies, and the reply is read until the server
    closes the connection.

    Some servers insist on a selector with a leading "/", others reject
    one. As a workaround the request is repeated ONCE with a prepended
    slash if the first reply starts with the error item type "3".

    Args:
        user_input: Address as entered by the user; returned unchanged.
        server: Host name or address to connect to.
        socket: Object providing ``socket()``, ``AF_INET`` and
            ``SOCK_STREAM``; presumably the stdlib ``socket`` module
            (the parameter shadows that name inside this function).
        port: TCP port number.
        descriptor: Gopher item type of the request; returned unchanged.
        path: List of selector segments.
        query: Search string for the request, or a falsy value for none.
        update_history: Returned unchanged for the caller's history logic.
        quirk: When true, prepend "/" to the selector and do not retry.

    Returns:
        Tuple ``(response, user_input, descriptor, path, update_history)``
        where ``response`` is the reply as bytes.

    Raises:
        OSError: Connection, timeout and transfer errors are not handled
            here and propagate to the caller.
    """
    selector = '/'.join(path)
    if quirk:
        selector = f"/{selector}"
    if query:
        selector = f"{selector}\t{query}"

    # Collect the chunks and join once; repeated bytes concatenation is
    # quadratic for large downloads.
    chunks = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(10)
        s.connect((server, port))
        s.sendall(f"{selector}\r\n".encode())
        while True:
            chunk = s.recv(1024 * 4)
            if not chunk:
                break
            chunks.append(chunk)
    response = b"".join(chunks)

    # Retry with the leading slash only once: the retry runs with
    # quirk=True, which disables this branch.
    if response.startswith(b"3") and not quirk:
        return tcp_request(user_input, server, socket,
                           port, descriptor, path, query, update_history, quirk=True)
    return response, user_input, descriptor, path, update_history
def handle_response(response, descriptor, path):
    """Dispatch a raw Gopher reply according to its item type.

    * ``0`` (text file): shown in the text field.
    * ``1`` / ``7`` (directory / search result): rendered as a gophermap;
      search results are assumed to always be gophermaps.
    * ``5`` / ``9`` (binary): offered for saving via ``save_binary_file``.
    * ``g`` / ``I`` (image): opened via ``open_image_file``.

    Any other descriptor is only reported on stdout.

    Args:
        response: Reply body as bytes.
        descriptor: One-character Gopher item type of the request.
        path: List of selector segments; the last one is used as the
            suggested file name for binary and image data.
    """
    if descriptor in ('0', '1', '7'):
        # Many Gopher servers do not serve UTF-8; replace undecodable bytes
        # instead of raising UnicodeDecodeError and showing nothing.
        text = response.decode(errors="replace")
        # Strip line feeds / trailing whitespace from every line.
        lines = [line.rstrip() for line in text.split('\n')]
        if descriptor == '0':
            text_field.delete(1.0, tk.END)
            text_field.insert(tk.END, '\n'.join(lines) + "\n")
        else:
            parse_gophermap(lines)
    elif descriptor in ('5', '9', 'g', 'I'):
        # An empty path (request for the root selector) has no last
        # segment, so fall back to a generic name instead of IndexError.
        filename = path[-1] if path else "download"
        if descriptor in ('5', '9'):
            save_binary_file(response, filename)
        else:
            open_image_file(response, filename)
    else:
        print(f"unknown descriptor: {descriptor}")
# Read the address from the entry field, split it into server, port,
# descriptor and path, and define the callback that displays the response
# and maintains the history once the request thread has finished.
# Parameter update_history: default for whether the visited address is added
# to the history; inside thread_callback the name is rebound from the thread
# result.
# NOTE(review): the code that creates/starts `thread` and makes the first
# call to thread_callback is not visible in this section.
def start_request(update_history=True):
user_input = entry.get()
# remove protocol string ("gopher://" is 9 characters long)
if user_input.startswith("gopher://"):
user_input = user_input[9:]
# "URL:" addresses point outside gopherspace and go to the web browser
if user_input.startswith("URL:"):
if messagebox.askyesno("Confirm", "Do you really want to leave gopherspace and open the URL?"):
# the open_link method removes double slashes. Ugly but it is what it is.
# Take everything after "URL:" and turn the first "/" back into "//".
webbrowser.open(user_input.split(':', 1)[1].replace("/", "//", 1))
return
# extract port number if any; the port is expected to be everything after
# the first ':' (the "server/type/path:port" form built by parse_gophermap)
port = 70 # default
try:
port = int(user_input.split(':')[1])
user_input = user_input.split(':')[0]
except Exception as e:
# use default port: there is no ':' (IndexError) or the text after it is
# not a number (ValueError); user_input is left unchanged in that case,
# e.g. "host:7070/1/path" keeps ":7070" as part of the server name
pass
# extract server, descriptor and path
parts = user_input.split('/')
# handle trailing slash on root domain: drop empty or whitespace-only
# segments, which also removes leading and doubled slashes
parts = [item for item in parts if item.strip()]
# extract request data
# NOTE(review): descriptor, path and query are not assigned when only a
# server is given, and query is only assigned for descriptor '7'; an empty
# address leaves parts empty, so the unpacking below raises ValueError.
if len(parts) == 1:
server = parts[0]
else:
server, descriptor, *path = parts
# item type 7 is a search: ask the user for the query string
if descriptor == '7':
query = simpledialog.askstring("Input", "Please input a query string.")
# Polls the request thread every 200 ms; blinks the entry field while the
# thread is alive and processes the result once it has finished.
def thread_callback():
# toggle the entry background between white and lightgray
if entry.cget("bg") == 'white': entry.config(bg='lightgray')
else : entry.config(bg='white')
# rapidly change the color of the uri field to indicate thread activity
if thread and thread.is_alive():
root.after(200, lambda: thread_callback())
# handle the response from the tcp thread
else:
# same 5-tuple that tcp_request returns
response, user_input, descriptor, path, update_history = thread.get_result()
# reset color of entry field after receiving response
entry.config(bg='white')
# change state of text field so it can be written to
text_field.config(state="normal")
handle_response(response, descriptor, path)
text_field.config(state="disabled")
# update history
global history_pointer
global history
# don't update history if the address does not change
if history and history[-1] == user_input: update_history = False
# don't update history for images/binary files
# NOTE(review): descriptor '5' is also saved as a binary file by
# handle_response but is not excluded here - confirm whether intended.
if descriptor in ['g','I','9']: update_history = False
if update_history:
# cut off history chain if pointer points not to the last element
if history_pointer != len(history) - 1:
history = history[:history_pointer+1]
# store the address currently shown in the entry field and advance the
# pointer to it
history.append(entry.get())
history_pointer += 1
def save_binary_file(binary_data, filename):
    """Let the user pick a destination and write ``binary_data`` to it.

    A message box reports success or the reason for a failure. Nothing
    happens when the save dialog is cancelled.

    Args:
        binary_data: Bytes to write.
        filename: File name pre-filled in the save dialog.
    """
    destination = filedialog.asksaveasfilename(
        title="Save File",
        initialfile=filename,
        filetypes=[("All files", "*.*")],
    )
    # An empty result means the user cancelled the dialog.
    if not destination:
        return

    try:
        with open(destination, 'wb') as out:
            out.write(binary_data)
        messagebox.showinfo("Success", "File saved successfully!")
    except Exception as error:
        messagebox.showerror("Error", f"Failed to save file: {error}")
def open_image_file(binary_data, filename):
    """Write image data to a temporary file and open it with ``xdg-open``.

    The file is placed in a freshly created private temporary directory
    rather than at a predictable path directly under ``/tmp``. The name
    comes from the remote server, so an existing file or symlink at a fixed
    path could otherwise be overwritten. Failures are reported in a message
    box.

    Args:
        binary_data: Image content as bytes.
        filename: Suggested file name (last segment of the selector).
    """
    import tempfile  # local import keeps this function self-contained

    try:
        # Keep only the final path component of the server-supplied name
        # and reject names that would refer to a directory.
        safe_name = os.path.basename(filename)
        if safe_name in ("", ".", ".."):
            safe_name = "image"
        target = os.path.join(tempfile.mkdtemp(prefix="gopher-"), safe_name)
        with open(target, 'wb') as file:
            file.write(binary_data)
        subprocess.run(["xdg-open", target])
    except Exception as e:
        messagebox.showerror("Error", f"Failed to open file: {e}")
# ADDRESS ENTRY FIELD
entry = tk.Entry(root)
entry.grid(row=0, column=0, sticky="ew", ipadx=10)
# Pressing Return submits the current address through start_request
entry.bind('<Return>', lambda _event: start_request())
# Put the keyboard focus into the entry field on startup
entry.focus_set()

# SCROLLED TEXT FIELD (page content)
text_field = scrolledtext.ScrolledText(root, wrap=tk.WORD, width=35, height=10)
text_field.grid(row=1, column=0, columnspan=2, sticky="nsew")

# Text carrying the "link" tag is drawn blue and underlined and reacts to
# clicks and to the pointer entering or leaving it
text_field.tag_config("link", foreground="blue", underline=1)
for _sequence, _handler in (
    ("<Button-1>", open_link),
    ("<Enter>", enter_link),
    ("<Leave>", leave_link),
):
    text_field.tag_bind("link", _sequence, _handler)

# The text field starts out disabled; code that writes to it switches the
# state to "normal" first
text_field.config(state="disabled")