Rework how commands are defined - toot - Unnamed repository; edit this file 'de… | |
Log | |
Files | |
Refs | |
LICENSE | |
--- | |
commit 373f26424d002b22c8fa7fe7eabc38e13275d1fc | |
parent 2a3d66bae50a7fff4d50319c192dacd5db12365e | |
Author: Ivan Habunek <[email protected]> | |
Date: Wed, 19 Apr 2017 14:47:30 +0200 | |
Rework how commands are defined | |
Diffstat: | |
README.rst | 38 +++++++++++++++++++------------ | |
tests/test_console.py | 34 ++++++++++++++++---------------- | |
toot/__init__.py | 8 +++++++- | |
toot/api.py | 2 +- | |
toot/commands.py | 307 +++++++++++++++++++++++++++++++ | |
toot/console.py | 593 ++++++++----------------------- | |
toot/output.py | 37 +++++++++++++++++++++++++++++++ | |
7 files changed, 543 insertions(+), 476 deletions(-) | |
--- | |
diff --git a/README.rst b/README.rst | |
@@ -34,20 +34,30 @@ Running ``toot`` displays a list of available commands. | |
Running ``toot <command> -h`` shows the documentation for the given command. | |
-=================== =========================================================… | |
- Command Description | |
-=================== =========================================================… | |
- ``toot login`` Log into a Mastodon instance. | |
- ``toot 2fa`` Log into a Mastodon instance using two factor authentica… | |
- ``toot logout`` Log out, deletes stored access keys. | |
- ``toot auth`` Display stored authenitication tokens. | |
- ``toot whoami`` Display logged in user details. | |
- ``toot post`` Post a status to your timeline. | |
- ``toot search`` Search for accounts or hashtags. | |
- ``toot timeline`` Display recent items in your public timeline. | |
- ``toot follow`` Follow an account. | |
- ``toot unfollow`` Unfollow an account. | |
-=================== =========================================================… | |
+.. code-block:: | |
+ | |
+ $ toot | |
+ | |
+ toot - a Mastodon CLI client | |
+ | |
+ Usage: | |
+ toot login Log into a Mastodon instance | |
+ toot login_2fa Log in using two factor authentication (experimental) | |
+ toot logout Log out, delete stored access keys | |
+ toot auth Show stored credentials | |
+ toot whoami Display logged in user details | |
+ toot post Post a status text to your timeline | |
+ toot upload Upload an image or video file | |
+ toot search Search for users or hashtags | |
+ toot follow Follow an account | |
+ toot unfollow Unfollow an account | |
+ toot timeline Show recent items in your public timeline | |
+ | |
+ To get help for each command run: | |
+ toot <command> --help | |
+ | |
+ https://github.com/ihabunek/toot | |
+ | |
Authentication | |
-------------- | |
diff --git a/tests/test_console.py b/tests/test_console.py | |
@@ -19,10 +19,10 @@ def uncolorize(text): | |
def test_print_usage(capsys): | |
console.print_usage() | |
out, err = capsys.readouterr() | |
- assert "toot - interact with Mastodon from the command line" in out | |
+ assert "toot - a Mastodon CLI client" in out | |
-def test_post_status_defaults(monkeypatch, capsys): | |
+def test_post_defaults(monkeypatch, capsys): | |
def mock_prepare(request): | |
assert request.method == 'POST' | |
assert request.url == 'https://habunek.com/api/v1/statuses' | |
@@ -41,13 +41,13 @@ def test_post_status_defaults(monkeypatch, capsys): | |
monkeypatch.setattr(requests.Request, 'prepare', mock_prepare) | |
monkeypatch.setattr(requests.Session, 'send', mock_send) | |
- console.cmd_post_status(app, user, ['Hello world']) | |
+ console.run_command(app, user, 'post', ['Hello world']) | |
out, err = capsys.readouterr() | |
assert "Toot posted" in out | |
-def test_post_status_with_options(monkeypatch, capsys): | |
+def test_post_with_options(monkeypatch, capsys): | |
def mock_prepare(request): | |
assert request.method == 'POST' | |
assert request.url == 'https://habunek.com/api/v1/statuses' | |
@@ -68,27 +68,27 @@ def test_post_status_with_options(monkeypatch, capsys): | |
args = ['"Hello world"', '--visibility', 'unlisted'] | |
- console.cmd_post_status(app, user, args) | |
+ console.run_command(app, user, 'post', args) | |
out, err = capsys.readouterr() | |
assert "Toot posted" in out | |
-def test_post_status_invalid_visibility(monkeypatch, capsys): | |
+def test_post_invalid_visibility(monkeypatch, capsys): | |
args = ['Hello world', '--visibility', 'foo'] | |
with pytest.raises(SystemExit): | |
- console.cmd_post_status(app, user, args) | |
+ console.run_command(app, user, 'post', args) | |
out, err = capsys.readouterr() | |
assert "invalid visibility value: 'foo'" in err | |
-def test_post_status_invalid_media(monkeypatch, capsys): | |
+def test_post_invalid_media(monkeypatch, capsys): | |
args = ['Hello world', '--media', 'does_not_exist.jpg'] | |
with pytest.raises(SystemExit): | |
- console.cmd_post_status(app, user, args) | |
+ console.run_command(app, user, 'post', args) | |
out, err = capsys.readouterr() | |
assert "can't open 'does_not_exist.jpg'" in err | |
@@ -112,7 +112,7 @@ def test_timeline(monkeypatch, capsys): | |
monkeypatch.setattr(requests, 'get', mock_get) | |
- console.cmd_timeline(app, user, []) | |
+ console.run_command(app, user, 'timeline', []) | |
out, err = capsys.readouterr() | |
assert "The computer can't tell you the emotional story." in out | |
@@ -138,7 +138,7 @@ def test_upload(monkeypatch, capsys): | |
monkeypatch.setattr(requests.Request, 'prepare', mock_prepare) | |
monkeypatch.setattr(requests.Session, 'send', mock_send) | |
- console.cmd_upload(app, user, [__file__]) | |
+ console.run_command(app, user, 'upload', [__file__]) | |
out, err = capsys.readouterr() | |
assert "Uploading media" in out | |
@@ -168,7 +168,7 @@ def test_search(monkeypatch, capsys): | |
monkeypatch.setattr(requests, 'get', mock_get) | |
- console.cmd_search(app, user, ['freddy']) | |
+ console.run_command(app, user, 'search', ['freddy']) | |
out, err = capsys.readouterr() | |
assert "Hashtags:\n\033[32m#foo\033[0m, \033[32m#bar\033[0m, \033[32m#baz\… | |
@@ -200,7 +200,7 @@ def test_follow(monkeypatch, capsys): | |
monkeypatch.setattr(requests.Session, 'send', mock_send) | |
monkeypatch.setattr(requests, 'get', mock_get) | |
- console.cmd_follow(app, user, ['blixa']) | |
+ console.run_command(app, user, 'follow', ['blixa']) | |
out, err = capsys.readouterr() | |
assert "You are now following blixa" in out | |
@@ -218,7 +218,7 @@ def test_follow_not_found(monkeypatch, capsys): | |
monkeypatch.setattr(requests, 'get', mock_get) | |
- console.cmd_follow(app, user, ['blixa']) | |
+ console.run_command(app, user, 'follow', ['blixa']) | |
out, err = capsys.readouterr() | |
assert "Account not found" in err | |
@@ -247,7 +247,7 @@ def test_unfollow(monkeypatch, capsys): | |
monkeypatch.setattr(requests.Session, 'send', mock_send) | |
monkeypatch.setattr(requests, 'get', mock_get) | |
- console.cmd_unfollow(app, user, ['blixa']) | |
+ console.run_command(app, user, 'unfollow', ['blixa']) | |
out, err = capsys.readouterr() | |
assert "You are no longer following blixa" in out | |
@@ -265,7 +265,7 @@ def test_unfollow_not_found(monkeypatch, capsys): | |
monkeypatch.setattr(requests, 'get', mock_get) | |
- console.cmd_unfollow(app, user, ['blixa']) | |
+ console.run_command(app, user, 'unfollow', ['blixa']) | |
out, err = capsys.readouterr() | |
assert "Account not found" in err | |
@@ -297,7 +297,7 @@ def test_whoami(monkeypatch, capsys): | |
monkeypatch.setattr(requests, 'get', mock_get) | |
- console.cmd_whoami(app, user, []) | |
+ console.run_command(app, user, 'whoami', []) | |
out, err = capsys.readouterr() | |
out = uncolorize(out) | |
diff --git a/toot/__init__.py b/toot/__init__.py | |
@@ -1,4 +1,6 @@ | |
# -*- coding: utf-8 -*- | |
+from __future__ import unicode_literals | |
+from __future__ import print_function | |
from collections import namedtuple | |
@@ -7,5 +9,9 @@ User = namedtuple('User', ['instance', 'username', 'access_toke… | |
DEFAULT_INSTANCE = 'mastodon.social' | |
-CLIENT_NAME = 'toot - Mastodon CLI Interface' | |
+CLIENT_NAME = 'toot - a Mastodon CLI client' | |
CLIENT_WEBSITE = 'https://github.com/ihabunek/toot' | |
+ | |
+ | |
+class ConsoleError(Exception): | |
+ pass | |
diff --git a/toot/api.py b/toot/api.py | |
@@ -5,7 +5,7 @@ import requests | |
from requests import Request, Session | |
-from toot import App, User, CLIENT_NAME, CLIENT_WEBSITE | |
+from toot import CLIENT_NAME, CLIENT_WEBSITE | |
SCOPES = 'read write follow' | |
diff --git a/toot/commands.py b/toot/commands.py | |
@@ -0,0 +1,307 @@ | |
+# -*- coding: utf-8 -*- | |
+from __future__ import unicode_literals | |
+from __future__ import print_function | |
+ | |
+import json | |
+import requests | |
+ | |
+from bs4 import BeautifulSoup | |
+from builtins import input | |
+from datetime import datetime | |
+from future.moves.itertools import zip_longest | |
+from getpass import getpass | |
+from itertools import chain | |
+from textwrap import TextWrapper | |
+ | |
+from toot import api, config, DEFAULT_INSTANCE, User, App, ConsoleError | |
+from toot.output import green, yellow, print_error | |
+ | |
+ | |
+def register_app(instance): | |
+ print("Registering application with %s" % green(instance)) | |
+ | |
+ try: | |
+ response = api.create_app(instance) | |
+ except: | |
+ raise ConsoleError("Registration failed. Did you enter a valid instanc… | |
+ | |
+ base_url = 'https://' + instance | |
+ | |
+ app = App(instance, base_url, response['client_id'], response['client_secr… | |
+ path = config.save_app(app) | |
+ print("Application tokens saved to: {}\n".format(green(path))) | |
+ | |
+ return app | |
+ | |
+ | |
+def create_app_interactive(): | |
+ instance = input("Choose an instance [%s]: " % green(DEFAULT_INSTANCE)) | |
+ if not instance: | |
+ instance = DEFAULT_INSTANCE | |
+ | |
+ return config.load_app(instance) or register_app(instance) | |
+ | |
+ | |
+def login_interactive(app): | |
+ print("\nLog in to " + green(app.instance)) | |
+ email = input('Email: ') | |
+ password = getpass('Password: ') | |
+ | |
+ if not email or not password: | |
+ raise ConsoleError("Email and password cannot be empty.") | |
+ | |
+ try: | |
+ print("Authenticating...") | |
+ response = api.login(app, email, password) | |
+ except api.ApiError: | |
+ raise ConsoleError("Login failed") | |
+ | |
+ user = User(app.instance, email, response['access_token']) | |
+ path = config.save_user(user) | |
+ print("Access token saved to: " + green(path)) | |
+ | |
+ return user | |
+ | |
+ | |
+def two_factor_login_interactive(app): | |
+ """Hacky implementation of two factor authentication""" | |
+ | |
+ print("Log in to " + green(app.instance)) | |
+ email = input('Email: ') | |
+ password = getpass('Password: ') | |
+ | |
+ sign_in_url = app.base_url + '/auth/sign_in' | |
+ | |
+ session = requests.Session() | |
+ | |
+ # Fetch sign in form | |
+ response = session.get(sign_in_url) | |
+ response.raise_for_status() | |
+ | |
+ soup = BeautifulSoup(response.content, "html.parser") | |
+ form = soup.find('form') | |
+ inputs = form.find_all('input') | |
+ | |
+ data = {i.attrs.get('name'): i.attrs.get('value') for i in inputs} | |
+ data['user[email]'] = email | |
+ data['user[password]'] = password | |
+ | |
+ # Submit form, get 2FA entry form | |
+ response = session.post(sign_in_url, data) | |
+ response.raise_for_status() | |
+ | |
+ soup = BeautifulSoup(response.content, "html.parser") | |
+ form = soup.find('form') | |
+ inputs = form.find_all('input') | |
+ | |
+ data = {i.attrs.get('name'): i.attrs.get('value') for i in inputs} | |
+ data['user[otp_attempt]'] = input("2FA Token: ") | |
+ | |
+ # Submit token | |
+ response = session.post(sign_in_url, data) | |
+ response.raise_for_status() | |
+ | |
+ # Extract access token from response | |
+ soup = BeautifulSoup(response.content, "html.parser") | |
+ initial_state = soup.find('script', id='initial-state') | |
+ | |
+ if not initial_state: | |
+ raise ConsoleError("Login failed: Invalid 2FA token?") | |
+ | |
+ data = json.loads(initial_state.get_text()) | |
+ access_token = data['meta']['access_token'] | |
+ | |
+ user = User(app.instance, email, access_token) | |
+ path = config.save_user(user) | |
+ print("Access token saved to: " + green(path)) | |
+ | |
+ | |
+def _print_timeline(item): | |
+ def wrap_text(text, width): | |
+ wrapper = TextWrapper(width=width, break_long_words=False, break_on_hy… | |
+ return chain(*[wrapper.wrap(l) for l in text.split("\n")]) | |
+ | |
+ def timeline_rows(item): | |
+ name = item['name'] | |
+ time = item['time'].strftime('%Y-%m-%d %H:%M%Z') | |
+ | |
+ left_column = [name, time] | |
+ if 'reblogged' in item: | |
+ left_column.append(item['reblogged']) | |
+ | |
+ text = item['text'] | |
+ | |
+ right_column = wrap_text(text, 80) | |
+ | |
+ return zip_longest(left_column, right_column, fillvalue="") | |
+ | |
+ for left, right in timeline_rows(item): | |
+ print("{:30} │ {}".format(left, right)) | |
+ | |
+ | |
+def _parse_timeline(item): | |
+ content = item['reblog']['content'] if item['reblog'] else item['content'] | |
+ reblogged = item['reblog']['account']['username'] if item['reblog'] else "" | |
+ | |
+ name = item['account']['display_name'] + " @" + item['account']['username'] | |
+ soup = BeautifulSoup(content, "html.parser") | |
+ text = soup.get_text().replace(''', "'") | |
+ time = datetime.strptime(item['created_at'], "%Y-%m-%dT%H:%M:%S.%fZ") | |
+ | |
+ return { | |
+ "name": name, | |
+ "text": text, | |
+ "time": time, | |
+ "reblogged": reblogged, | |
+ } | |
+ | |
+ | |
+def timeline(app, user, args): | |
+ items = api.timeline_home(app, user) | |
+ parsed_items = [_parse_timeline(t) for t in items] | |
+ | |
+ print("─" * 31 + "┬" + "─" * 88) | |
+ for item in parsed_items: | |
+ _print_timeline(item) | |
+ print("─" * 31 + "┼" + "─" * 88) | |
+ | |
+ | |
+def post(app, user, args): | |
+ if args.media: | |
+ media = _do_upload(app, user, args.media) | |
+ media_ids = [media['id']] | |
+ else: | |
+ media_ids = None | |
+ | |
+ response = api.post_status(app, user, args.text, media_ids=media_ids, visi… | |
+ | |
+ print("Toot posted: " + green(response.get('url'))) | |
+ | |
+ | |
+def auth(app, user, args): | |
+ if app and user: | |
+ print("You are logged in to {} as {}\n".format( | |
+ yellow(app.instance), | |
+ yellow(user.username) | |
+ )) | |
+ print("User data: " + green(config.get_user_config_path())) | |
+ print("App data: " + green(config.get_instance_config_path(app.instan… | |
+ else: | |
+ print("You are not logged in") | |
+ | |
+ | |
+def login(app, user, args): | |
+ app = create_app_interactive() | |
+ login_interactive(app) | |
+ | |
+ print() | |
+ print(green("✓ Successfully logged in.")) | |
+ | |
+ | |
+def login_2fa(app, user, args): | |
+ print() | |
+ print(yellow("Two factor authentication is experimental.")) | |
+ print(yellow("If you have problems logging in, please open an issue:")) | |
+ print(yellow("https://github.com/ihabunek/toot/issues")) | |
+ print() | |
+ | |
+ app = create_app_interactive() | |
+ two_factor_login_interactive(app) | |
+ | |
+ print() | |
+ print(green("✓ Successfully logged in.")) | |
+ | |
+ | |
+def logout(app, user, args): | |
+ config.delete_user() | |
+ | |
+ print(green("✓ You are now logged out")) | |
+ | |
+ | |
+def upload(app, user, args): | |
+ response = _do_upload(app, user, args.file) | |
+ | |
+ print("\nSuccessfully uploaded media ID {}, type '{}'".format( | |
+ yellow(response['id']), yellow(response['type']))) | |
+ print("Original URL: " + green(response['url'])) | |
+ print("Preview URL: " + green(response['preview_url'])) | |
+ print("Text URL: " + green(response['text_url'])) | |
+ | |
+ | |
+def _print_accounts(accounts): | |
+ if not accounts: | |
+ return | |
+ | |
+ print("\nAccounts:") | |
+ for account in accounts: | |
+ acct = green("@{}".format(account['acct'])) | |
+ display_name = account['display_name'] | |
+ print("* {} {}".format(acct, display_name)) | |
+ | |
+ | |
+def _print_hashtags(hashtags): | |
+ if not hashtags: | |
+ return | |
+ | |
+ print("\nHashtags:") | |
+ print(", ".join([green("#" + t) for t in hashtags])) | |
+ | |
+ | |
+def search(app, user, args): | |
+ response = api.search(app, user, args.query, args.resolve) | |
+ | |
+ _print_accounts(response['accounts']) | |
+ _print_hashtags(response['hashtags']) | |
+ | |
+ | |
+def _do_upload(app, user, file): | |
+ print("Uploading media: {}".format(green(file.name))) | |
+ return api.upload_media(app, user, file) | |
+ | |
+ | |
+def _find_account(app, user, account_name): | |
+ """For a given account name, returns the Account object or None if not fou… | |
+ response = api.search(app, user, account_name, False) | |
+ | |
+ for account in response['accounts']: | |
+ if account['acct'] == account_name or "@" + account['acct'] == account… | |
+ return account | |
+ | |
+ | |
+def follow(app, user, args): | |
+ account = _find_account(app, user, args.account) | |
+ | |
+ if not account: | |
+ print_error("Account not found") | |
+ return | |
+ | |
+ api.follow(app, user, account['id']) | |
+ | |
+ print(green("✓ You are now following %s" % args.account)) | |
+ | |
+ | |
+def unfollow(app, user, args): | |
+ account = _find_account(app, user, args.account) | |
+ | |
+ if not account: | |
+ print_error("Account not found") | |
+ return | |
+ | |
+ api.unfollow(app, user, account['id']) | |
+ | |
+ print(green("✓ You are no longer following %s" % args.account)) | |
+ | |
+ | |
+def whoami(app, user, args): | |
+ response = api.verify_credentials(app, user) | |
+ | |
+ print("{} {}".format(green("@" + response['acct']), response['display_name… | |
+ print(response['note']) | |
+ print(response['url']) | |
+ print("") | |
+ print("ID: " + green(response['id'])) | |
+ print("Since: " + green(response['created_at'][:19].replace('T', ' @ '))) | |
+ print("") | |
+ print("Followers: " + yellow(response['followers_count'])) | |
+ print("Following: " + yellow(response['following_count'])) | |
+ print("Statuses: " + yellow(response['statuses_count'])) | |
diff --git a/toot/console.py b/toot/console.py | |
@@ -2,479 +2,183 @@ | |
from __future__ import unicode_literals | |
from __future__ import print_function | |
-import json | |
-import logging | |
import os | |
-import requests | |
import sys | |
+import logging | |
from argparse import ArgumentParser, FileType | |
-from bs4 import BeautifulSoup | |
-from builtins import input | |
-from datetime import datetime | |
-from future.moves.itertools import zip_longest | |
-from getpass import getpass | |
-from itertools import chain | |
-from textwrap import TextWrapper | |
- | |
-from toot import api, config, DEFAULT_INSTANCE, User, App | |
-from toot.api import ApiError | |
- | |
- | |
-class ConsoleError(Exception): | |
- pass | |
- | |
- | |
-def red(text): | |
- return "\033[31m{}\033[0m".format(text) | |
- | |
- | |
-def green(text): | |
- return "\033[32m{}\033[0m".format(text) | |
- | |
- | |
-def yellow(text): | |
- return "\033[33m{}\033[0m".format(text) | |
- | |
- | |
-def blue(text): | |
- return "\033[34m{}\033[0m".format(text) | |
- | |
- | |
-def print_error(text): | |
- print(red(text), file=sys.stderr) | |
- | |
- | |
-def register_app(instance): | |
- print("Registering application with %s" % green(instance)) | |
- | |
- try: | |
- response = api.create_app(instance) | |
- except: | |
- raise ConsoleError("Registration failed. Did you enter a valid instanc… | |
- | |
- base_url = 'https://' + instance | |
- | |
- app = App(instance, base_url, response['client_id'], response['client_secr… | |
- path = config.save_app(app) | |
- print("Application tokens saved to: {}".format(green(path))) | |
- | |
- return app | |
- | |
- | |
-def create_app_interactive(): | |
- instance = input("Choose an instance [%s]: " % green(DEFAULT_INSTANCE)) | |
- if not instance: | |
- instance = DEFAULT_INSTANCE | |
- | |
- return config.load_app(instance) or register_app(instance) | |
- | |
- | |
-def login_interactive(app): | |
- print("\nLog in to " + green(app.instance)) | |
- email = input('Email: ') | |
- password = getpass('Password: ') | |
- | |
- if not email or not password: | |
- raise ConsoleError("Email and password cannot be empty.") | |
- | |
- try: | |
- print("Authenticating...") | |
- response = api.login(app, email, password) | |
- except ApiError: | |
- raise ConsoleError("Login failed") | |
- | |
- user = User(app.instance, email, response['access_token']) | |
- path = config.save_user(user) | |
- print("Access token saved to: " + green(path)) | |
- | |
- return user | |
- | |
- | |
-def two_factor_login_interactive(app): | |
- """Hacky implementation of two factor authentication""" | |
- | |
- print("Log in to " + green(app.instance)) | |
- email = input('Email: ') | |
- password = getpass('Password: ') | |
- | |
- sign_in_url = app.base_url + '/auth/sign_in' | |
- | |
- session = requests.Session() | |
+from collections import namedtuple | |
+from toot import config, api, commands, ConsoleError, CLIENT_NAME, CLIENT_WEBS… | |
+from toot.output import print_error | |
- # Fetch sign in form | |
- response = session.get(sign_in_url) | |
- response.raise_for_status() | |
- soup = BeautifulSoup(response.content, "html.parser") | |
- form = soup.find('form') | |
- inputs = form.find_all('input') | |
+VISIBILITY_CHOICES = ['public', 'unlisted', 'private', 'direct'] | |
- data = {i.attrs.get('name'): i.attrs.get('value') for i in inputs} | |
- data['user[email]'] = email | |
- data['user[password]'] = password | |
- # Submit form, get 2FA entry form | |
- response = session.post(sign_in_url, data) | |
- response.raise_for_status() | |
- | |
- soup = BeautifulSoup(response.content, "html.parser") | |
- form = soup.find('form') | |
- inputs = form.find_all('input') | |
- | |
- data = {i.attrs.get('name'): i.attrs.get('value') for i in inputs} | |
- data['user[otp_attempt]'] = input("2FA Token: ") | |
- | |
- # Submit token | |
- response = session.post(sign_in_url, data) | |
- response.raise_for_status() | |
- | |
- # Extract access token from response | |
- soup = BeautifulSoup(response.content, "html.parser") | |
- initial_state = soup.find('script', id='initial-state') | |
+def visibility(value): | |
+ """Validates the visibilty parameter""" | |
+ if value not in VISIBILITY_CHOICES: | |
+ raise ValueError("Invalid visibility value") | |
- if not initial_state: | |
- raise ConsoleError("Login failed: Invalid 2FA token?") | |
+ return value | |
- data = json.loads(initial_state.get_text()) | |
- access_token = data['meta']['access_token'] | |
- user = User(app.instance, email, access_token) | |
- path = config.save_user(user) | |
- print("Access token saved to: " + green(path)) | |
+Command = namedtuple("Command", ["name", "description", "require_auth", "argum… | |
+ | |
+ | |
+COMMANDS = [ | |
+ Command( | |
+ name="login", | |
+ description="Log into a Mastodon instance", | |
+ arguments=[], | |
+ require_auth=False, | |
+ ), | |
+ Command( | |
+ name="login_2fa", | |
+ description="Log in using two factor authentication (experimental)", | |
+ arguments=[], | |
+ require_auth=False, | |
+ ), | |
+ Command( | |
+ name="logout", | |
+ description="Log out, delete stored access keys", | |
+ arguments=[], | |
+ require_auth=False, | |
+ ), | |
+ Command( | |
+ name="auth", | |
+ description="Show stored credentials", | |
+ arguments=[], | |
+ require_auth=False, | |
+ ), | |
+ Command( | |
+ name="whoami", | |
+ description="Display logged in user details", | |
+ arguments=[], | |
+ require_auth=True, | |
+ ), | |
+ Command( | |
+ name="post", | |
+ description="Post a status text to your timeline", | |
+ arguments=[ | |
+ (["text"], { | |
+ "help": "The status text to post.", | |
+ }), | |
+ (["-m", "--media"], { | |
+ "type": FileType('rb'), | |
+ "help": "path to the media file to attach" | |
+ }), | |
+ (["-v", "--visibility"], { | |
+ "type": visibility, | |
+ "default": "public", | |
+ "help": 'post visibility, one of: %s' % ", ".join(VISIBILITY_C… | |
+ }) | |
+ ], | |
+ require_auth=True, | |
+ ), | |
+ Command( | |
+ name="upload", | |
+ description="Upload an image or video file", | |
+ arguments=[ | |
+ (["file"], { | |
+ "help": "Path to the file to upload", | |
+ "type": FileType('rb') | |
+ }) | |
+ ], | |
+ require_auth=True, | |
+ ), | |
+ Command( | |
+ name="search", | |
+ description="Search for users or hashtags", | |
+ arguments=[ | |
+ (["query"], { | |
+ "help": "the search query", | |
+ }), | |
+ (["-r", "--resolve"], { | |
+ "action": 'store_true', | |
+ "default": False, | |
+ "help": "Resolve non-local accounts", | |
+ }), | |
+ ], | |
+ require_auth=True, | |
+ ), | |
+ Command( | |
+ name="follow", | |
+ description="Follow an account", | |
+ arguments=[ | |
+ (["account"], { | |
+ "help": "account name, e.g. 'Gargron' or '[email protected]… | |
+ }), | |
+ ], | |
+ require_auth=True, | |
+ ), | |
+ Command( | |
+ name="unfollow", | |
+ description="Unfollow an account", | |
+ arguments=[ | |
+ (["account"], { | |
+ "help": "account name, e.g. 'Gargron' or '[email protected]… | |
+ }), | |
+ ], | |
+ require_auth=True, | |
+ ), | |
+ Command( | |
+ name="timeline", | |
+ description="Show recent items in your public timeline", | |
+ arguments=[], | |
+ require_auth=True, | |
+ ), | |
+] | |
def print_usage(): | |
- print("toot - interact with Mastodon from the command line") | |
+ print(CLIENT_NAME) | |
print("") | |
print("Usage:") | |
- print(" toot login - log into a Mastodon instance") | |
- print(" toot 2fa - log into a Mastodon instance using 2FA (experim… | |
- print(" toot logout - log out (delete stored access tokens)") | |
- print(" toot auth - display stored authentication tokens") | |
- print(" toot whoami - display logged in user details") | |
- print(" toot post - toot a new post to your timeline") | |
- print(" toot search - search for accounts or hashtags") | |
- print(" toot timeline - shows your public timeline") | |
- print(" toot follow - follow an account") | |
- print(" toot unfollow - unfollow an account") | |
+ | |
+ max_name_len = max(len(command.name) for command in COMMANDS) | |
+ | |
+ for command in COMMANDS: | |
+ print(" toot", command.name.ljust(max_name_len + 2), command.descript… | |
+ | |
print("") | |
print("To get help for each command run:") | |
print(" toot <command> --help") | |
print("") | |
- print("https://github.com/ihabunek/toot") | |
- | |
- | |
-def print_timeline(item): | |
- def wrap_text(text, width): | |
- wrapper = TextWrapper(width=width, break_long_words=False, break_on_hy… | |
- return chain(*[wrapper.wrap(l) for l in text.split("\n")]) | |
- | |
- def timeline_rows(item): | |
- name = item['name'] | |
- time = item['time'].strftime('%Y-%m-%d %H:%M%Z') | |
- | |
- left_column = [name, time] | |
- if 'reblogged' in item: | |
- left_column.append(item['reblogged']) | |
- | |
- text = item['text'] | |
- | |
- right_column = wrap_text(text, 80) | |
- | |
- return zip_longest(left_column, right_column, fillvalue="") | |
- | |
- for left, right in timeline_rows(item): | |
- print("{:30} │ {}".format(left, right)) | |
- | |
- | |
-def parse_timeline(item): | |
- content = item['reblog']['content'] if item['reblog'] else item['content'] | |
- reblogged = item['reblog']['account']['username'] if item['reblog'] else "" | |
- | |
- name = item['account']['display_name'] + " @" + item['account']['username'] | |
- soup = BeautifulSoup(content, "html.parser") | |
- text = soup.get_text().replace(''', "'") | |
- time = datetime.strptime(item['created_at'], "%Y-%m-%dT%H:%M:%S.%fZ") | |
- | |
- return { | |
- "name": name, | |
- "text": text, | |
- "time": time, | |
- "reblogged": reblogged, | |
- } | |
- | |
- | |
-def cmd_timeline(app, user, args): | |
- parser = ArgumentParser(prog="toot timeline", | |
- description="Show recent items in your public time… | |
- epilog="https://github.com/ihabunek/toot") | |
- | |
- args = parser.parse_args(args) | |
- | |
- items = api.timeline_home(app, user) | |
- parsed_items = [parse_timeline(t) for t in items] | |
- | |
- print("─" * 31 + "┬" + "─" * 88) | |
- for item in parsed_items: | |
- print_timeline(item) | |
- print("─" * 31 + "┼" + "─" * 88) | |
- | |
- | |
-def visibility(value): | |
- if value not in ['public', 'unlisted', 'private', 'direct']: | |
- raise ValueError("Invalid visibility value") | |
- | |
- return value | |
- | |
- | |
-def cmd_post_status(app, user, args): | |
- parser = ArgumentParser(prog="toot post", | |
- description="Post a status text to the timeline", | |
- epilog="https://github.com/ihabunek/toot") | |
- parser.add_argument("text", help="The status text to post.") | |
- parser.add_argument("-m", "--media", type=FileType('rb'), | |
- help="path to the media file to attach") | |
- parser.add_argument("-v", "--visibility", type=visibility, default="public… | |
- help='post visibility, either "public" (default), "dir… | |
- | |
- args = parser.parse_args(args) | |
+ print(CLIENT_WEBSITE) | |
- if args.media: | |
- media = do_upload(app, user, args.media) | |
- media_ids = [media['id']] | |
- else: | |
- media_ids = None | |
- response = api.post_status(app, user, args.text, media_ids=media_ids, visi… | |
+def get_argument_parser(name, command): | |
+ parser = ArgumentParser( | |
+ prog='toot %s' % name, | |
+ description=command.description, | |
+ epilog=CLIENT_WEBSITE) | |
- print("Toot posted: " + green(response.get('url'))) | |
+ for args, kwargs in command.arguments: | |
+ parser.add_argument(*args, **kwargs) | |
+ return parser | |
-def cmd_auth(app, user, args): | |
- parser = ArgumentParser(prog="toot auth", | |
- description="Show login details", | |
- epilog="https://github.com/ihabunek/toot") | |
- parser.parse_args(args) | |
- if app and user: | |
- print("You are logged in to {} as {}".format(green(app.instance), gree… | |
- print("User data: " + green(config.get_user_config_path())) | |
- print("App data: " + green(config.get_instance_config_path(app.instan… | |
- else: | |
- print("You are not logged in") | |
+def run_command(app, user, name, args): | |
+ command = next((c for c in COMMANDS if c.name == name), None) | |
- | |
-def cmd_login(args): | |
- parser = ArgumentParser(prog="toot login", | |
- description="Log into a Mastodon instance", | |
- epilog="https://github.com/ihabunek/toot") | |
- parser.parse_args(args) | |
- | |
- app = create_app_interactive() | |
- user = login_interactive(app) | |
- | |
- return app, user | |
- | |
- | |
-def cmd_2fa(args): | |
- parser = ArgumentParser(prog="toot 2fa", | |
- description="Log into a Mastodon instance using 2 … | |
- epilog="https://github.com/ihabunek/toot") | |
- parser.parse_args(args) | |
- | |
- print() | |
- print(yellow("Two factor authentication is experimental.")) | |
- print(yellow("If you have problems logging in, please open an issue:")) | |
- print(yellow("https://github.com/ihabunek/toot/issues")) | |
- print() | |
- | |
- app = create_app_interactive() | |
- user = two_factor_login_interactive(app) | |
- | |
- return app, user | |
- | |
- | |
-def cmd_logout(app, user, args): | |
- parser = ArgumentParser(prog="toot logout", | |
- description="Log out, delete stored access keys", | |
- epilog="https://github.com/ihabunek/toot") | |
- parser.parse_args(args) | |
- | |
- config.delete_user() | |
- | |
- print(green("✓ You are now logged out")) | |
- | |
- | |
-def cmd_upload(app, user, args): | |
- parser = ArgumentParser(prog="toot upload", | |
- description="Upload an image or video file", | |
- epilog="https://github.com/ihabunek/toot") | |
- parser.add_argument("file", help="Path to the file to upload", type=FileTy… | |
- | |
- args = parser.parse_args(args) | |
- | |
- response = do_upload(app, user, args.file) | |
- | |
- print("\nSuccessfully uploaded media ID {}, type '{}'".format( | |
- yellow(response['id']), yellow(response['type']))) | |
- print("Original URL: " + green(response['url'])) | |
- print("Preview URL: " + green(response['preview_url'])) | |
- print("Text URL: " + green(response['text_url'])) | |
- | |
- | |
-def _print_accounts(accounts): | |
- if not accounts: | |
- return | |
- | |
- print("\nAccounts:") | |
- for account in accounts: | |
- acct = green("@{}".format(account['acct'])) | |
- display_name = account['display_name'] | |
- print("* {} {}".format(acct, display_name)) | |
- | |
- | |
-def _print_hashtags(hashtags): | |
- if not hashtags: | |
- return | |
- | |
- print("\nHashtags:") | |
- print(", ".join([green("#" + t) for t in hashtags])) | |
- | |
- | |
-def cmd_search(app, user, args): | |
- parser = ArgumentParser(prog="toot search", | |
- description="Search for content", | |
- epilog="https://github.com/ihabunek/toot") | |
- | |
- parser.add_argument("query", help="The search query") | |
- parser.add_argument("-r", "--resolve", action='store_true', default=False, | |
- help="Whether to resolve non-local accounts") | |
- | |
- args = parser.parse_args(args) | |
- | |
- response = api.search(app, user, args.query, args.resolve) | |
- | |
- _print_accounts(response['accounts']) | |
- _print_hashtags(response['hashtags']) | |
- | |
- | |
-def do_upload(app, user, file): | |
- print("Uploading media: {}".format(green(file.name))) | |
- return api.upload_media(app, user, file) | |
- | |
- | |
-def _find_account(app, user, account_name): | |
- """For a given account name, returns the Account object or None if not fou… | |
- response = api.search(app, user, account_name, False) | |
- | |
- for account in response['accounts']: | |
- if account['acct'] == account_name or "@" + account['acct'] == account… | |
- return account | |
- | |
- | |
-def cmd_follow(app, user, args): | |
- parser = ArgumentParser(prog="toot follow", | |
- description="Follow an account", | |
- epilog="https://github.com/ihabunek/toot") | |
- parser.add_argument("account", help="Account name, e.g. 'Gargron' or 'poly… | |
- args = parser.parse_args(args) | |
- | |
- account = _find_account(app, user, args.account) | |
- | |
- if not account: | |
- print_error("Account not found") | |
- return | |
- | |
- api.follow(app, user, account['id']) | |
- | |
- print(green("✓ You are now following %s" % args.account)) | |
- | |
- | |
-def cmd_unfollow(app, user, args): | |
- parser = ArgumentParser(prog="toot unfollow", | |
- description="Unfollow an account", | |
- epilog="https://github.com/ihabunek/toot") | |
- parser.add_argument("account", help="Account name, e.g. 'Gargron' or 'poly… | |
- args = parser.parse_args(args) | |
- | |
- account = _find_account(app, user, args.account) | |
- | |
- if not account: | |
- print_error("Account not found") | |
+ if not command: | |
+ print_error("Unknown command '{}'\n".format(name)) | |
+ print_usage() | |
return | |
- api.unfollow(app, user, account['id']) | |
- | |
- print(green("✓ You are no longer following %s" % args.account)) | |
- | |
- | |
-def cmd_whoami(app, user, args): | |
- parser = ArgumentParser(prog="toot whoami", | |
- description="Display logged in user details", | |
- epilog="https://github.com/ihabunek/toot") | |
- parser.parse_args(args) | |
- | |
- response = api.verify_credentials(app, user) | |
- | |
- print("{} {}".format(green("@" + response['acct']), response['display_name… | |
- print(response['note']) | |
- print(response['url']) | |
- print("") | |
- print("ID: " + green(response['id'])) | |
- print("Since: " + green(response['created_at'][:19].replace('T', ' @ '))) | |
- print("") | |
- print("Followers: " + yellow(response['followers_count'])) | |
- print("Following: " + yellow(response['following_count'])) | |
- print("Statuses: " + yellow(response['statuses_count'])) | |
- | |
- | |
-def run_command(command, args): | |
- user = config.load_user() | |
- app = config.load_app(user.instance) if user else None | |
- | |
- # Commands which can run when not logged in | |
- if command == 'login': | |
- return cmd_login(args) | |
+ parser = get_argument_parser(name, command) | |
+ parsed_args = parser.parse_args(args) | |
- if command == '2fa': | |
- return cmd_2fa(args) | |
- | |
- if command == 'auth': | |
- return cmd_auth(app, user, args) | |
- | |
- # Commands which require user to be logged in | |
- if not app or not user: | |
- print_error("You are not logged in.") | |
+ if command.require_auth and (not user or not app): | |
+ print_error("This command requires that you are logged in.") | |
print_error("Please run `toot login` first.") | |
return | |
- if command == 'logout': | |
- return cmd_logout(app, user, args) | |
- | |
- if command == 'post': | |
- return cmd_post_status(app, user, args) | |
- | |
- if command == 'timeline': | |
- return cmd_timeline(app, user, args) | |
+ fn = commands.__dict__.get(name) | |
- if command == 'upload': | |
- return cmd_upload(app, user, args) | |
- | |
- if command == 'search': | |
- return cmd_search(app, user, args) | |
- | |
- if command == 'follow': | |
- return cmd_follow(app, user, args) | |
- | |
- if command == 'unfollow': | |
- return cmd_unfollow(app, user, args) | |
- | |
- if command == 'whoami': | |
- return cmd_whoami(app, user, args) | |
- | |
- print_error("Unknown command '{}'\n".format(command)) | |
- print_usage() | |
+ return fn(app, user, parsed_args) | |
def main(): | |
@@ -485,15 +189,18 @@ def main(): | |
if not sys.stdin.isatty(): | |
sys.argv.append(sys.stdin.read()) | |
- command = sys.argv[1] if len(sys.argv) > 1 else None | |
+ command_name = sys.argv[1] if len(sys.argv) > 1 else None | |
args = sys.argv[2:] | |
- if not command: | |
+ if not command_name: | |
return print_usage() | |
+ user = config.load_user() | |
+ app = config.load_app(user.instance) if user else None | |
+ | |
try: | |
- run_command(command, args) | |
+ run_command(app, user, command_name, args) | |
except ConsoleError as e: | |
print_error(str(e)) | |
- except ApiError as e: | |
+ except api.ApiError as e: | |
print_error(str(e)) | |
diff --git a/toot/output.py b/toot/output.py | |
@@ -0,0 +1,37 @@ | |
+# -*- coding: utf-8 -*- | |
+from __future__ import unicode_literals | |
+from __future__ import print_function | |
+ | |
+import sys | |
+ | |
+ | |
+def _color(text, color): | |
+ return "\033[3{}m{}\033[0m".format(color, text) | |
+ | |
+ | |
+def red(text): | |
+ return _color(text, 1) | |
+ | |
+ | |
+def green(text): | |
+ return _color(text, 2) | |
+ | |
+ | |
+def yellow(text): | |
+ return _color(text, 3) | |
+ | |
+ | |
+def blue(text): | |
+ return _color(text, 4) | |
+ | |
+ | |
+def magenta(text): | |
+ return _color(text, 5) | |
+ | |
+ | |
+def cyan(text): | |
+ return _color(text, 6) | |
+ | |
+ | |
+def print_error(text): | |
+ print(red(text), file=sys.stderr) |