task-web/tutorial01/onshift/views.py

156 lines
5.8 KiB
Python

import datetime

import pytz
from django.conf import settings
from django.core.cache import cache
from django.http import (
    Http404,
    HttpResponse,
    HttpResponseBadRequest,
    HttpResponseRedirect,
)
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
from django.utils import timezone
from django.utils.dateparse import parse_datetime
from django.utils.html import format_html
from django_slack import slack_message
from planday import Planday

from .models import LogEntry, CleaningTask, CleaningTaskSignoff
# Timezone used by the views below to interpret Planday shift times and to
# compute the local "now".
local_timezone = pytz.timezone('Europe/Copenhagen')
# Module-wide Planday API client, configured from the Django settings.
planday = Planday(settings.PLANDAY_APPID, settings.PLANDAY_REFRESH_TOKEN)
# NOTE(review): this runs once at import time. Presumably it contacts the
# Planday API to obtain an access token -- confirm, since a failure here
# would prevent the module from being imported at all.
planday.refresh_access_token()
def today():
    """Return the current date in ``local_timezone`` as an ISO 8601 string.

    The date is taken from the Europe/Copenhagen wall clock rather than from
    the server's clock, so it agrees with the timezone handling in
    ``shifts_now`` and ``cleaning_schedule_now``. With the server clock, the
    date sent to Planday could be off by one day around midnight whenever the
    server runs in another timezone (e.g. UTC).
    """
    return timezone.now().astimezone(local_timezone).date().isoformat()
def get_employee(employee_id):
    """Return the Planday employee for ``employee_id``, via the Django cache.

    A cached value is returned when one exists; otherwise the employee is
    fetched with ``planday.get_employee`` and cached for one hour.

    Returns:
        Whatever ``planday.get_employee`` returned (callers in this module
        index it like a dict), or ``None`` when Planday returned ``None``.
    """
    # FIXME: Error handling
    # TODO: use a dedicated employee cache instead of the default cache.
    # Until then the key is namespaced so that a bare employee id cannot
    # collide with unrelated entries stored in the default cache.
    cache_key = 'planday-employee-{0}'.format(employee_id)
    employee = cache.get(cache_key)
    if employee:
        return employee
    employee = planday.get_employee(employee_id)
    if employee is None:
        # Missing employees are not cached, so they are looked up again
        # on the next request.
        return None
    cache.set(cache_key, employee, timeout=60 * 60)
    return employee
def index(request):
    """Landing page: delegates to the view that lists today's shifts."""
    response = onshift_today(request)
    return response
def _render_shifts(request, shifts):
    """Add display fields to each shift and render the on-shift template.

    Every shift gets ``start_time`` and ``end_time`` (ISO formatted time of
    day parsed from ``startDateTime`` / ``endDateTime``). Shifts that carry
    an ``employeeId`` also get ``employee``, looked up through the cached
    ``get_employee`` helper (which may yield ``None``).
    """
    for shift in shifts:
        shift['start_time'] = parse_datetime(shift['startDateTime']).time().isoformat()
        shift['end_time'] = parse_datetime(shift['endDateTime']).time().isoformat()
        # Shifts without an 'employeeId' key are rendered without an employee.
        if 'employeeId' in shift:
            shift['employee'] = get_employee(shift['employeeId'])
    return render(request, 'mellemfolk/onshift.html', {'shifts': shifts})


def onshift_today(request):
    """Render every shift Planday returns for today's date."""
    return _render_shifts(request, planday.get_shifts(from_=today(), to=today()))


def onshift_now_view(request):
    """Render only the shifts that are in progress right now."""
    return _render_shifts(request, shifts_now())
def shifts_now():
    """Return a list of today's Planday shifts that span the current moment.

    Shift start and end timestamps are parsed and localized to
    ``local_timezone`` before being compared (inclusively) with the current
    time expressed in that same timezone.
    """
    current = timezone.now().astimezone(local_timezone)
    active = []
    for candidate in planday.get_shifts(from_=today(), to=today()):
        begins = local_timezone.localize(parse_datetime(candidate['startDateTime']))
        ends = local_timezone.localize(parse_datetime(candidate['endDateTime']))
        if begins <= current and current <= ends:
            active.append(candidate)
    return active
def employee(request, employee_id):
    """Render the contact details page for a single employee.

    Raises:
        Http404: when ``get_employee`` returns ``None`` for ``employee_id``.
            Previously this case crashed with a ``TypeError`` (HTTP 500)
            when the missing record was subscripted.
    """
    record = get_employee(employee_id)
    if record is None:
        raise Http404("No employee with id {0}".format(employee_id))
    return render(request, 'mellemfolk/employee.html', {
        'first_name': record['firstName'],
        'last_name': record['lastName'],
        'email': record['email'],
        'cell_phone': record['cellPhone'],
    })
def submit_log_entry(request):
    """Store a new log entry from POST data, announce it, and redirect.

    Reads ``log_text`` (required) and ``author`` (optional, defaults to an
    empty string) from ``request.POST``, saves a ``LogEntry`` stamped with
    the current time, sends the ``mellemfolk/log_entry.slack`` message and
    redirects to the new entry's page.

    Returns:
        ``HttpResponseBadRequest`` when ``log_text`` is missing (which
        includes non-POST requests); otherwise an ``HttpResponseRedirect``.
    """
    if 'log_text' not in request.POST:
        # A view must return a response object; a bare ``return`` here made
        # Django fail with a server error instead of reporting the bad input.
        return HttpResponseBadRequest("Missing 'log_text' field.")
    entry = LogEntry(
        author=request.POST.get('author', default=''),
        log_text=request.POST['log_text'],
        pub_date=timezone.now(),
    )
    entry.save()
    slack_message('mellemfolk/log_entry.slack', {'log_entry': entry})
    return HttpResponseRedirect(reverse('mellemfolk:log_entry', args=(entry.id,)))
def log_entry(request, log_entry_id):
    """Show one log entry; responds with 404 when the primary key is unknown."""
    entry = get_object_or_404(LogEntry, pk=log_entry_id)
    context = {'log_entry': entry}
    return render(request, 'mellemfolk/log-entry.html', context)
def log_entries(request):
    """Render the 25 newest log entries plus the names of staff on shift now.

    Employees are resolved through the cached ``get_employee`` helper, as in
    the on-shift views, instead of calling the Planday API once per shift on
    every page load. Shifts without an ``employeeId`` and employees that
    cannot be resolved are skipped.
    """
    latest_log_entries = LogEntry.objects.order_by('-pub_date')[:25]
    on_shift = (
        get_employee(shift['employeeId'])
        for shift in shifts_now()
        if 'employeeId' in shift
    )
    employees = [
        "{firstName} {lastName}".format(**person)
        for person in on_shift
        if person
    ]
    return render(request, 'mellemfolk/log-entries.html', {
        'latest_log_entries': latest_log_entries,
        'employees': employees,
    })
def cleaning_task(request, cleaning_task_id):
    """Render one cleaning task with its five latest sign-offs.

    The template also receives ``employees``: the full names of everyone on
    shift right now joined with " & ". Names are resolved through the cached
    ``get_employee`` helper; shifts without an ``employeeId`` and employees
    that cannot be resolved are skipped.

    Raises:
        Http404: when no ``CleaningTask`` has the given primary key
            (previously an unhandled ``DoesNotExist``, i.e. HTTP 500).
    """
    task = get_object_or_404(CleaningTask, pk=cleaning_task_id)
    on_shift = (
        get_employee(shift['employeeId'])
        for shift in shifts_now()
        if 'employeeId' in shift
    )
    employees = " & ".join(
        "{0} {1}".format(person['firstName'], person['lastName'])
        for person in on_shift
        if person
    )
    signoffs = CleaningTaskSignoff.objects.filter(task=task).order_by('-date')[:5]
    return render(request, 'mellemfolk/cleaning-task.html', {
        'task': task,
        'employees': employees,
        'signoffs': signoffs,
    })
def cleaning_schedule(request):
    """List cleaning tasks.

    Only active tasks are listed unless the query string contains a
    ``show-inactive`` parameter, in which case all tasks are listed.
    """
    if 'show-inactive' in request.GET:
        tasks = CleaningTask.objects.all()
    else:
        tasks = CleaningTask.objects.all().filter(active=True)
    context = {'tasks': tasks}
    return render(request, 'mellemfolk/cleaning-schedule.html', context)
def cleaning_schedule_now(request):
    """List the active cleaning tasks scheduled for the current moment.

    "Now" is evaluated in ``local_timezone``. A task matches when its related
    shift is on the current weekday and the shift slot's start/end range
    contains the current time of day (both bounds inclusive).
    """
    local_now = timezone.now().astimezone(local_timezone)
    current_time = local_now.time()
    criteria = {
        'active': True,
        'shift__day_of_the_week': local_now.weekday(),
        'shift__shift_slot__start__lte': current_time,
        'shift__shift_slot__end__gte': current_time,
    }
    tasks = CleaningTask.objects.filter(**criteria)
    return render(request, 'mellemfolk/cleaning-schedule.html', {'tasks': tasks})
def signoff_task(request, cleaning_task_id):
    """Record a sign-off for a cleaning task and redirect to the task page.

    The ``signoff`` POST field is stored as the sign-off's ``employee``.

    Returns:
        ``HttpResponseBadRequest`` when ``signoff`` is missing from the POST
        data (which includes non-POST requests); otherwise an
        ``HttpResponseRedirect`` to the task's page.

    Raises:
        Http404: when no ``CleaningTask`` has the given primary key
            (previously an unhandled ``DoesNotExist``, i.e. HTTP 500).
    """
    if 'signoff' not in request.POST:
        # A view must return a response object; a bare ``return`` here made
        # Django fail with a server error instead of reporting the bad input.
        return HttpResponseBadRequest("Missing 'signoff' field.")
    task = get_object_or_404(CleaningTask, pk=cleaning_task_id)
    CleaningTaskSignoff(task=task, employee=request.POST['signoff']).save()
    return HttpResponseRedirect(reverse('mellemfolk:cleaning_task', args=(cleaning_task_id,)))