Files
tra_backend/hr_employee_shift/models/hr_shift_payroll.py
2020-07-21 10:09:46 +00:00

177 lines
7.8 KiB
Python

# -*- coding: utf-8 -*-
from datetime import timedelta
from odoo import models, fields, api, _, tools
from datetime import datetime, time
import datetime
import math
from pytz import utc
from odoo.tools.float_utils import float_round
from collections import namedtuple
class HrPayroll(models.Model):
_inherit = 'hr.payslip'
@api.model
def get_worked_day_lines(self, contract_ids, date_from, date_to):
"""
@param contract_ids: list of contract id
@return: returns a list of dict containing the input that should be applied for the given contract between date_from and date_to
"""
def was_on_leave_interval(employee_id, date_from, date_to):
date_from = fields.Datetime.to_string(date_from)
date_to = fields.Datetime.to_string(date_to)
return self.env['hr.leave'].search([
('state', '=', 'validate'),
('employee_id', '=', employee_id),
# ('type', '=', 'remove'),
('date_from', '<=', date_from),
('date_to', '>=', date_to)
], limit=1)
res = []
# fill only if the contract as a working schedule linked
uom_day = self.env.ref('product.product_uom_day', raise_if_not_found=False)
for contract in contract_ids:
uom_hour = self.env.ref('product.product_uom_hour', raise_if_not_found=False)
interval_data = []
holidays = self.env['hr.leave']
attendances = {
'name': _("Normal Working Days paid at 100%"),
'sequence': 1,
'code': 'WORK100',
'number_of_days': 0.0,
'number_of_hours': 0.0,
'contract_id': contract.id,
}
leaves = {}
# Gather all intervals and holidays
for days in contract.shift_schedule:
start_date = datetime.datetime.strptime(str(days.start_date), tools.DEFAULT_SERVER_DATE_FORMAT)
end_date = datetime.datetime.strptime(str(days.end_date), tools.DEFAULT_SERVER_DATE_FORMAT)
nb_of_days = (days.end_date - days.start_date).days + 1
for day in range(0, nb_of_days):
working_intervals_on_day = days.hr_shift._get_day_work_intervals(
start_date + timedelta(days=day))
for interval in working_intervals_on_day:
interval_data.append(
(interval, was_on_leave_interval(contract.employee_id.id, interval[0], interval[1])))
# Extract information from previous data. A working interval is considered:
# - as a leave if a hr.holiday completely covers the period
# - as a working period instead
for interval, holiday in interval_data:
holidays |= holiday
hours = (interval[1] - interval[0]).total_seconds() / 3600.0
if holiday:
# if he was on leave, fill the leaves dict
if holiday.holiday_status_id.name in leaves:
leaves[holiday.holiday_status_id.name]['number_of_hours'] += hours
else:
leaves[holiday.holiday_status_id.name] = {
'name': holiday.holiday_status_id.name,
'sequence': 5,
'code': holiday.holiday_status_id.name,
'number_of_days': 0.0,
'number_of_hours': hours,
'contract_id': contract.id,
}
else:
# add the input vals to tmp (increment if existing)
attendances['number_of_hours'] += hours
# Clean-up the results
leaves = [value for key, value in leaves.items()]
for data in [attendances] + leaves:
data['number_of_days'] = uom_hour._compute_quantity(data['number_of_hours'], uom_day) \
if uom_day and uom_hour \
else data['number_of_hours'] / 8.0
res.append(data)
return res
class Calendar(models.Model):
_inherit = 'resource.calendar'
_interval_obj = namedtuple('Interval', ('start_datetime', 'end_datetime', 'data'))
def string_to_datetime(self, value):
""" Convert the given string value to a datetime in UTC. """
return utc.localize(fields.Datetime.from_string(value))
def float_to_time(self, hours):
""" Convert a number of hours into a time object. """
if hours == 24.0:
return time.max
fractional, integral = math.modf(hours)
return time(int(integral), int(float_round(60 * fractional, precision_digits=0)), 0)
def _interval_new(self, start_datetime, end_datetime, kw=None):
kw = kw if kw is not None else dict()
kw.setdefault('attendances', self.env['resource.calendar.attendance'])
kw.setdefault('leaves', self.env['resource.calendar.leaves'])
return self._interval_obj(start_datetime, end_datetime, kw)
def _get_day_work_intervals(self, day_date, start_time=None, end_time=None, compute_leaves=False,
resource_id=None):
self.ensure_one()
if not start_time:
start_time = datetime.time.min
if not end_time:
end_time = datetime.time.max
working_intervals = [att_interval for att_interval in
self._iter_day_attendance_intervals(day_date, start_time, end_time)]
# filter according to leaves
if compute_leaves:
leaves = self._get_leave_intervals(
resource_id=resource_id,
start_datetime=datetime.datetime.combine(day_date, start_time),
end_datetime=datetime.datetime.combine(day_date, end_time))
working_intervals = [
sub_interval
for interval in working_intervals
for sub_interval in self._leave_intervals(interval, leaves)]
# adapt tz
return [self._interval_new(
self.string_to_datetime(interval[0]),
self.string_to_datetime(interval[1]),
interval[2]) for interval in working_intervals]
def _get_day_attendances(self, day_date, start_time, end_time):
""" Given a day date, return matching attendances. Those can be limited
by starting and ending time objects. """
self.ensure_one()
weekday = day_date.weekday()
attendances = self.env['resource.calendar.attendance']
for attendance in self.attendance_ids.filtered(
lambda att:
int(att.dayofweek) == weekday and
not (att.date_from and fields.Date.from_string(att.date_from) > day_date) and
not (att.date_to and fields.Date.from_string(att.date_to) < day_date)):
if start_time and self.float_to_time(attendance.hour_to) < start_time:
continue
if end_time and self.float_to_time(attendance.hour_from) > end_time:
continue
attendances |= attendance
return attendances
def _iter_day_attendance_intervals(self, day_date, start_time, end_time):
""" Get an iterator of all interval of current day attendances. """
for calendar_working_day in self._get_day_attendances(day_date, start_time, end_time):
from_time = self.float_to_time(calendar_working_day.hour_from)
to_time = self.float_to_time(calendar_working_day.hour_to)
dt_f = datetime.datetime.combine(day_date, max(from_time, start_time))
dt_t = datetime.datetime.combine(day_date, min(to_time, end_time))
yield self._interval_new(dt_f, dt_t, {'attendances': calendar_working_day})