import datetime
import functools
import json
import os
import uuid
import requests
from todoist import models
from todoist.managers.activity import ActivityManager
from todoist.managers.backups import BackupsManager
from todoist.managers.biz_invitations import BizInvitationsManager
from todoist.managers.business_users import BusinessUsersManager
from todoist.managers.collaborator_states import CollaboratorStatesManager
from todoist.managers.collaborators import CollaboratorsManager
from todoist.managers.completed import CompletedManager
from todoist.managers.emails import EmailsManager
from todoist.managers.filters import FiltersManager
from todoist.managers.invitations import InvitationsManager
from todoist.managers.items import ItemsManager
from todoist.managers.labels import LabelsManager
from todoist.managers.live_notifications import LiveNotificationsManager
from todoist.managers.locations import LocationsManager
from todoist.managers.notes import NotesManager, ProjectNotesManager
from todoist.managers.projects import ProjectsManager
from todoist.managers.quick import QuickManager
from todoist.managers.reminders import RemindersManager
from todoist.managers.templates import TemplatesManager
from todoist.managers.uploads import UploadsManager
from todoist.managers.user import UserManager
from todoist.managers.user_settings import UserSettingsManager
class SyncError(Exception):
    """
    Raised by ``TodoistAPI.commit`` when the server reports a status other
    than ``'ok'`` for one of the committed commands.

    The exception arguments are the command key and the status value taken
    from the ``sync_status`` mapping of the server response.
    """
class TodoistAPI(object):
    """
    Implements the API that makes it possible to interact with a Todoist user
    account and its data.
    """
    # Attributes that are copied by serialize() and restored by deserialize().
    _serialize_fields = ('token', 'api_endpoint', 'sync_token', 'state',
                         'temp_ids')

    @classmethod
    def deserialize(cls, data):
        """
        Builds a new instance (using the default constructor arguments) and
        copies onto it every serializable field that is present in ``data``.
        """
        obj = cls()
        for key in cls._serialize_fields:
            if key in data:
                setattr(obj, key, data[key])
        return obj

    def __init__(self,
                 token='',
                 api_endpoint='https://todoist.com',
                 session=None,
                 cache='~/.todoist-sync/'):
        """
        :param token: The user's API token.
        :param api_endpoint: Base URL of the server, without the API path.
        :param session: Optional session object used for the HTTP requests;
            a new ``requests.Session`` is created when omitted.
        :param cache: Directory used to persist the state between runs ("~"
            is expanded). Any falsy value disables the on-disk cache.
        """
        self.api_endpoint = api_endpoint
        self.reset_state()
        self.token = token  # User's API token
        self.temp_ids = {}  # Mapping of temporary ids to real ids
        self.queue = []  # Requests to be sent are appended here
        # Session instance for requests
        self.session = session or requests.Session()

        # managers
        self.projects = ProjectsManager(self)
        self.project_notes = ProjectNotesManager(self)
        self.items = ItemsManager(self)
        self.labels = LabelsManager(self)
        self.filters = FiltersManager(self)
        self.notes = NotesManager(self)
        self.live_notifications = LiveNotificationsManager(self)
        self.reminders = RemindersManager(self)
        self.locations = LocationsManager(self)
        self.invitations = InvitationsManager(self)
        self.biz_invitations = BizInvitationsManager(self)
        self.user = UserManager(self)
        self.user_settings = UserSettingsManager(self)
        self.collaborators = CollaboratorsManager(self)
        self.collaborator_states = CollaboratorStatesManager(self)
        self.completed = CompletedManager(self)
        self.uploads = UploadsManager(self)
        self.activity = ActivityManager(self)
        self.business_users = BusinessUsersManager(self)
        self.templates = TemplatesManager(self)
        self.backups = BackupsManager(self)
        self.quick = QuickManager(self)
        self.emails = EmailsManager(self)

        if cache:  # Read and write user state on local disk cache
            self.cache = os.path.expanduser(cache)
            self._read_cache()
        else:
            self.cache = None

    def reset_state(self):
        """
        Forgets everything known locally: the sync token goes back to ``'*'``
        and the state is replaced by a fresh, empty structure.
        """
        self.sync_token = '*'
        self.state = {  # Local copy of all of the user's objects
            'collaborator_states': [],
            'collaborators': [],
            'day_orders': {},
            'day_orders_timestamp': '',
            'filters': [],
            'items': [],
            'labels': [],
            'live_notifications': [],
            'live_notifications_last_read_id': -1,
            'locations': [],
            'notes': [],
            'project_notes': [],
            'projects': [],
            'reminders': [],
            'settings_notifications': {},
            'user': {},
            'user_settings': {},
        }

    def __getitem__(self, key):
        """
        Shortcut for ``self.state[key]``.
        """
        return self.state[key]

    def serialize(self):
        """
        Returns a dict holding the current value of every serializable field.
        """
        return {key: getattr(self, key) for key in self._serialize_fields}

    def get_api_url(self):
        """
        Returns the base URL to which the API call names are appended.
        """
        return '%s/API/v8/' % self.api_endpoint

    def _update_state(self, syncdata):
        """
        Updates the local state, with the data returned by the server after a
        sync.
        """
        # Check sync token first
        if 'sync_token' in syncdata:
            self.sync_token = syncdata['sync_token']

        # It is straightforward to update these type of data, since it is
        # enough to just see if they are present in the sync data, and then
        # either replace the local values...
        for key in ('day_orders_timestamp',
                    'live_notifications_last_read_id', 'locations'):
            if key in syncdata:
                self.state[key] = syncdata[key]
        # ...or update them.
        for key in ('day_orders', 'settings_notifications', 'user',
                    'user_settings'):
            if key in syncdata:
                self.state[key].update(syncdata[key])

        # Updating these type of data is a bit more complicated, since it is
        # necessary to find out whether an object in the sync data is new,
        # updates an existing object, or marks an object to be deleted. But
        # the same procedure takes place for each of these types of data.
        resp_models_mapping = [
            ('collaborators', models.Collaborator),
            ('collaborator_states', models.CollaboratorState),
            ('filters', models.Filter),
            ('items', models.Item),
            ('labels', models.Label),
            ('live_notifications', models.LiveNotification),
            ('notes', models.Note),
            ('project_notes', models.ProjectNote),
            ('projects', models.Project),
            ('reminders', models.Reminder),
        ]
        for datatype, model in resp_models_mapping:
            if datatype not in syncdata:
                continue

            # Process each object of this specific type in the sync data.
            for remoteobj in syncdata[datatype]:
                # Both 0 and False (the default is 0) mean that the object
                # is not marked as to be deleted; False compares equal to 0.
                is_active = remoteobj.get('is_deleted', 0) == 0

                # Find out whether the object already exists in the local
                # state.
                localobj = self._find_object(datatype, remoteobj)
                if localobj is not None:
                    # If the object is already present in the local state,
                    # then we either update it, or if marked as to be
                    # deleted, we remove it.
                    if is_active:
                        localobj.data.update(remoteobj)
                    else:
                        self.state[datatype].remove(localobj)
                elif is_active:
                    # If not, then the object is new and it should be added,
                    # unless it is marked as to be deleted (in which case
                    # it's ignored).
                    self.state[datatype].append(model(remoteobj, self))

    def _cache_path(self, extension):
        """
        Returns the path of the cache file of the current token that has the
        given extension. The path is joined properly, so the cache directory
        does not need to end with a path separator.
        """
        return os.path.join(self.cache, self.token + extension)

    def _read_cache(self):
        """
        Loads the state and the sync token stored in the cache directory, if
        the cache is enabled. The directory is created when it is missing.
        Reading is best effort: a missing or unreadable cache is ignored.
        """
        if not self.cache:
            return

        try:
            os.makedirs(self.cache)
        except OSError:
            # An already existing directory is fine, anything else is not.
            if not os.path.isdir(self.cache):
                raise

        try:
            with open(self._cache_path('.json')) as f:
                state = json.load(f)
            self._update_state(state)

            with open(self._cache_path('.sync')) as f:
                self.sync_token = f.read()
        except Exception:
            # The cache is only an optimisation, so any problem with it is
            # ignored, but KeyboardInterrupt and SystemExit still propagate.
            return

    def _write_cache(self):
        """
        Stores the state (as JSON) and the sync token in the cache directory,
        if the cache is enabled.
        """
        if not self.cache:
            return

        result = json.dumps(
            self.state, indent=2, sort_keys=True, default=state_default)
        with open(self._cache_path('.json'), 'w') as f:
            f.write(result)
        with open(self._cache_path('.sync'), 'w') as f:
            f.write(self.sync_token)

    def _find_object(self, objtype, obj):
        """
        Searches for an object in the local state, depending on the type of
        object, and then on its primary key. If the object is found it is
        returned, and if not, then None is returned.
        """
        if objtype == 'collaborators':
            return self.collaborators.get_by_id(obj['id'])
        elif objtype == 'collaborator_states':
            return self.collaborator_states.get_by_ids(obj['project_id'],
                                                       obj['user_id'])
        elif objtype == 'filters':
            return self.filters.get_by_id(obj['id'], only_local=True)
        elif objtype == 'items':
            return self.items.get_by_id(obj['id'], only_local=True)
        elif objtype == 'labels':
            return self.labels.get_by_id(obj['id'], only_local=True)
        elif objtype == 'live_notifications':
            return self.live_notifications.get_by_id(obj['id'])
        elif objtype == 'notes':
            return self.notes.get_by_id(obj['id'], only_local=True)
        elif objtype == 'project_notes':
            return self.project_notes.get_by_id(obj['id'], only_local=True)
        elif objtype == 'projects':
            return self.projects.get_by_id(obj['id'], only_local=True)
        elif objtype == 'reminders':
            return self.reminders.get_by_id(obj['id'], only_local=True)
        else:
            return None

    def _replace_temp_id(self, temp_id, new_id):
        """
        Replaces the temporary id generated locally when an object was first
        created, with a real Id supplied by the server. True is returned if
        the temporary id was found and replaced, and False otherwise.
        """
        # Go through all the objects for which we expect the temporary id to be
        # replaced by a real one.
        for datatype in [
                'filters', 'items', 'labels', 'notes', 'project_notes',
                'projects', 'reminders'
        ]:
            for obj in self.state[datatype]:
                if obj.temp_id == temp_id:
                    obj['id'] = new_id
                    return True
        return False

    def _get(self, call, url=None, **kwargs):
        """
        Sends an HTTP GET request to the specified URL, and returns the JSON
        object received (if any), or whatever answer it got otherwise.

        The API base URL is used when ``url`` is not given, and the extra
        keyword arguments are handed over to the session as they are.
        """
        if not url:
            url = self.get_api_url()

        response = self.session.get(url + call, **kwargs)

        try:
            return response.json()
        except ValueError:
            # The body is not valid JSON, so hand back the raw text.
            return response.text

    def _post(self, call, url=None, **kwargs):
        """
        Sends an HTTP POST request to the specified URL, and returns the JSON
        object received (if any), or whatever answer it got otherwise.

        The API base URL is used when ``url`` is not given, and the extra
        keyword arguments are handed over to the session as they are.
        """
        if not url:
            url = self.get_api_url()

        response = self.session.post(url + call, **kwargs)

        try:
            return response.json()
        except ValueError:
            # The body is not valid JSON, so hand back the raw text.
            return response.text

    # Sync
    def generate_uuid(self):
        """
        Generates a uuid.
        """
        return str(uuid.uuid1())

    def sync(self, commands=None):
        """
        Sends to the server the changes that were made locally, and also
        fetches the latest updated data from the server.

        The answer of the server is returned as received. The local state is
        only updated when that answer is a JSON object; the cache is written
        in either case.
        """
        post_data = {
            'token': self.token,
            'sync_token': self.sync_token,
            'day_orders_timestamp': self.state['day_orders_timestamp'],
            'include_notification_settings': 1,
            'resource_types': json_dumps(['all']),
            'commands': json_dumps(commands or []),
        }
        response = self._post('sync', data=post_data)

        # _post() falls back to the plain text of the body when it is not
        # JSON. On a string, "in" would be a substring test and indexing by
        # key would fail, so only a dict is processed.
        if isinstance(response, dict):
            if 'temp_id_mapping' in response:
                for temp_id, new_id in response['temp_id_mapping'].items():
                    self.temp_ids[temp_id] = new_id
                    self._replace_temp_id(temp_id, new_id)
            self._update_state(response)

        self._write_cache()
        return response

    def commit(self, raise_on_error=True):
        """
        Commits all requests that are queued. Note that, without calling this
        method none of the changes that are made to the objects are actually
        synchronized to the server, unless sync() is called directly.

        Nothing is sent, and None is returned, when the queue is empty.
        Otherwise the queue is emptied after the sync, and the answer of the
        server is returned. If ``raise_on_error`` is true, SyncError is raised
        for the first command whose status is not 'ok'.
        """
        if not self.queue:
            return

        ret = self.sync(commands=self.queue)
        del self.queue[:]

        # The answer is a plain string when the server did not reply with
        # JSON, in which case there are no per-command statuses to check.
        if raise_on_error and isinstance(ret, dict) and 'sync_status' in ret:
            for k, v in ret['sync_status'].items():
                if v != 'ok':
                    raise SyncError(k, v)
        return ret

    # Miscellaneous
    def query(self, queries, **kwargs):
        """
        DEPRECATED: query endpoint is deprecated for a long time and this
        method will be removed in the next major version of todoist-python
        """
        params = {'queries': json_dumps(queries), 'token': self.token}
        params.update(kwargs)
        return self._get('query', params=params)

    def add_item(self, content, **kwargs):
        """
        Adds a new task.

        The keyword arguments are sent along as extra request parameters; a
        ``labels`` argument is converted to its string representation first.
        """
        params = {'token': self.token, 'content': content}
        params.update(kwargs)
        if 'labels' in params:
            params['labels'] = str(params['labels'])
        return self._get('add_item', params=params)

    # Class
    def __repr__(self):
        """
        Shows the class name, a ``*`` if there are queued requests that were
        not committed yet, and the e-mail of the user (when it is known).
        """
        name = self.__class__.__name__
        unsaved = '*' if self.queue else ''
        email = self.user.get('email')
        email_repr = repr(email) if email else '<not synchronized>'
        return '%s%s(%s)' % (name, unsaved, email_repr)
def state_default(obj):
    """
    Fallback used when dumping the local state as JSON: an object that the
    encoder does not know is replaced by the value of its ``data`` attribute.
    """
    return obj.data
def json_default(obj):
    """
    Fallback used by json_dumps for the values the encoder does not know:
    datetimes, dates and times are turned into fixed-format strings.
    """
    # datetime has to be tested before date, since it is a subclass of it.
    if isinstance(obj, datetime.datetime):
        return obj.strftime('%Y-%m-%dT%H:%M:%S')
    if isinstance(obj, datetime.date):
        return obj.strftime('%Y-%m-%d')
    if isinstance(obj, datetime.time):
        return obj.strftime('%H:%M:%S')
    # Anything else yields None, which the encoder writes out as null.
    return None


# Compact JSON encoder (no whitespace after the separators) that also
# understands the date and time values handled by json_default.
json_dumps = functools.partial(
    json.dumps, separators=(',', ':'), default=json_default)