[IMP] account_bank_statement_import_txt_xlsx: Allow mapping with reference to column numbers, and concatenation

A new field 'File does not contain header line' is added in the Statement Sheet Mappings. If you set to True,
then you can map the columns by indicating in each field of the 'Columns' section the column number in the file.

We also allow to concatenate multiple columns in the file to a single column to a single field of the statement line.
You have to indicate the names of the columns separated by comma.
This commit is contained in:
Jordi Ballester Alomar
2022-12-16 11:09:18 +01:00
parent 5aa465bc14
commit 516df6c2bf
6 changed files with 154 additions and 100 deletions

View File

@@ -17,7 +17,7 @@
"multi_step_wizard", "multi_step_wizard",
"web_widget_dropdown_dynamic", "web_widget_dropdown_dynamic",
], ],
"external_dependencies": {"python": ["xlrd"]}, "external_dependencies": {"python": ["xlrd", "chardet"]},
"data": [ "data": [
"security/ir.model.access.csv", "security/ir.model.access.csv",
"data/map_data.xml", "data/map_data.xml",

View File

@@ -28,7 +28,9 @@ class AccountBankStatementImport(models.TransientModel):
self.ensure_one() self.ensure_one()
try: try:
Parser = self.env["account.bank.statement.import.sheet.parser"] Parser = self.env["account.bank.statement.import.sheet.parser"]
return Parser.parse(data_file, self.sheet_mapping_id) return Parser.parse(
data_file, self.sheet_mapping_id, self.attachment_ids[:1].name
)
except BaseException: except BaseException:
if self.env.context.get("account_bank_statement_import_txt_xlsx_test"): if self.env.context.get("account_bank_statement_import_txt_xlsx_test"):
raise raise

View File

@@ -54,6 +54,11 @@ class AccountBankStatementImportSheetMapping(models.Model):
) )
quotechar = fields.Char(string="Text qualifier", size=1, default='"') quotechar = fields.Char(string="Text qualifier", size=1, default='"')
timestamp_format = fields.Char(string="Timestamp Format", required=True) timestamp_format = fields.Char(string="Timestamp Format", required=True)
no_header = fields.Boolean(
"File does not contain header line",
help="When this occurs please indicate the column number in the Columns section "
"instead of the column name, considering that the first column is 0",
)
timestamp_column = fields.Char(string="Timestamp column", required=True) timestamp_column = fields.Char(string="Timestamp column", required=True)
currency_column = fields.Char( currency_column = fields.Char(
string="Currency column", string="Currency column",

View File

@@ -7,8 +7,10 @@ import logging
from datetime import datetime from datetime import datetime
from decimal import Decimal from decimal import Decimal
from io import StringIO from io import StringIO
from os import path
from odoo import _, api, models from odoo import _, api, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
@@ -19,6 +21,14 @@ try:
except (ImportError, IOError) as err: # pragma: no cover except (ImportError, IOError) as err: # pragma: no cover
_logger.error(err) _logger.error(err)
try:
import chardet
except ImportError:
_logger.warning(
"chardet library not found, please install it "
"from http://pypi.python.org/pypi/chardet"
)
class AccountBankStatementImportSheetParser(models.TransientModel): class AccountBankStatementImportSheetParser(models.TransientModel):
_name = "account.bank.statement.import.sheet.parser" _name = "account.bank.statement.import.sheet.parser"
@@ -42,7 +52,7 @@ class AccountBankStatementImportSheetParser(models.TransientModel):
return list(next(csv_data)) return list(next(csv_data))
@api.model @api.model
def parse(self, data_file, mapping): def parse(self, data_file, mapping, filename):
journal = self.env["account.journal"].browse(self.env.context.get("journal_id")) journal = self.env["account.journal"].browse(self.env.context.get("journal_id"))
currency_code = (journal.currency_id or journal.company_id.currency_id).name currency_code = (journal.currency_id or journal.company_id.currency_id).name
account_number = journal.bank_account_id.acc_number account_number = journal.bank_account_id.acc_number
@@ -66,6 +76,7 @@ class AccountBankStatementImportSheetParser(models.TransientModel):
{ {
"balance_start": float(balance_start), "balance_start": float(balance_start),
"balance_end_real": float(balance_end), "balance_end_real": float(balance_end),
"name": _("%s: %s") % (journal.code, path.basename(filename),),
} }
) )
@@ -78,6 +89,43 @@ class AccountBankStatementImportSheetParser(models.TransientModel):
return currency_code, account_number, [data] return currency_code, account_number, [data]
def _get_column_indexes(self, header, column_name, mapping):
column_indexes = []
if mapping[column_name] and "," in mapping[column_name]:
# We have to concatenate the values
column_names_or_indexes = mapping[column_name].split(",")
else:
column_names_or_indexes = [mapping[column_name]]
for column_name_or_index in column_names_or_indexes:
if mapping.no_header:
column_index = (
column_name_or_index and int(column_name_or_index) or None
)
if column_index:
column_indexes.append(column_index)
else:
if column_name_or_index:
column_indexes.append(header.index(column_name_or_index))
return column_indexes
def _get_column_names(self):
return [
"timestamp_column",
"currency_column",
"amount_column",
"balance_column",
"original_currency_column",
"original_amount_column",
"debit_credit_column",
"transaction_id_column",
"description_column",
"notes_column",
"reference_column",
"partner_name_column",
"bank_name_column",
"bank_account_column",
]
def _parse_lines(self, mapping, data_file, currency_code): def _parse_lines(self, mapping, data_file, currency_code):
columns = dict() columns = dict()
try: try:
@@ -98,69 +146,40 @@ class AccountBankStatementImportSheetParser(models.TransientModel):
csv_options["delimiter"] = csv_delimiter csv_options["delimiter"] = csv_delimiter
if mapping.quotechar: if mapping.quotechar:
csv_options["quotechar"] = mapping.quotechar csv_options["quotechar"] = mapping.quotechar
csv_or_xlsx = reader( try:
StringIO(data_file.decode(mapping.file_encoding or "utf-8")), decoded_file = data_file.decode(mapping.file_encoding or "utf-8")
**csv_options except UnicodeDecodeError:
# Try auto guessing the format
detected_encoding = chardet.detect(data_file).get("encoding", False)
if not detected_encoding:
raise UserError(
_("No valid encoding was found for the attached file")
) )
decoded_file = data_file.decode(detected_encoding)
csv_or_xlsx = reader(StringIO(decoded_file), **csv_options)
if isinstance(csv_or_xlsx, tuple): if isinstance(csv_or_xlsx, tuple):
header = [str(value) for value in csv_or_xlsx[1].row_values(0)] header = [str(value) for value in csv_or_xlsx[1].row_values(0)]
else: else:
header = [value.strip() for value in next(csv_or_xlsx)] header = [value.strip() for value in next(csv_or_xlsx)]
columns["timestamp_column"] = header.index(mapping.timestamp_column) for column_name in self._get_column_names():
columns["currency_column"] = ( columns[column_name] = self._get_column_indexes(
header.index(mapping.currency_column) if mapping.currency_column else None header, column_name, mapping
)
columns["amount_column"] = header.index(mapping.amount_column)
columns["balance_column"] = (
header.index(mapping.balance_column) if mapping.balance_column else None
)
columns["original_currency_column"] = (
header.index(mapping.original_currency_column)
if mapping.original_currency_column
else None
)
columns["original_amount_column"] = (
header.index(mapping.original_amount_column)
if mapping.original_amount_column
else None
)
columns["debit_credit_column"] = (
header.index(mapping.debit_credit_column)
if mapping.debit_credit_column
else None
)
columns["transaction_id_column"] = (
header.index(mapping.transaction_id_column)
if mapping.transaction_id_column
else None
)
columns["description_column"] = (
header.index(mapping.description_column)
if mapping.description_column
else None
)
columns["notes_column"] = (
header.index(mapping.notes_column) if mapping.notes_column else None
)
columns["reference_column"] = (
header.index(mapping.reference_column) if mapping.reference_column else None
)
columns["partner_name_column"] = (
header.index(mapping.partner_name_column)
if mapping.partner_name_column
else None
)
columns["bank_name_column"] = (
header.index(mapping.bank_name_column) if mapping.bank_name_column else None
)
columns["bank_account_column"] = (
header.index(mapping.bank_account_column)
if mapping.bank_account_column
else None
) )
return self._parse_rows(mapping, currency_code, csv_or_xlsx, columns) return self._parse_rows(mapping, currency_code, csv_or_xlsx, columns)
def _get_values_from_column(self, values, columns, column_name):
indexes = columns[column_name]
content_l = []
max_index = len(values) - 1
for index in indexes:
if isinstance(index, int) and index <= max_index:
content_l.append(values[index])
else:
content_l.append(values[index])
if all(isinstance(content, str) for content in content_l):
return " ".join(content_l)
return content_l[0]
def _parse_rows(self, mapping, currency_code, csv_or_xlsx, columns): # noqa: C901 def _parse_rows(self, mapping, currency_code, csv_or_xlsx, columns): # noqa: C901
if isinstance(csv_or_xlsx, tuple): if isinstance(csv_or_xlsx, tuple):
rows = range(1, csv_or_xlsx[1].nrows) rows = range(1, csv_or_xlsx[1].nrows)
@@ -182,66 +201,70 @@ class AccountBankStatementImportSheetParser(models.TransientModel):
else: else:
values = list(row) values = list(row)
timestamp = values[columns["timestamp_column"]] timestamp = self._get_values_from_column(
values, columns, "timestamp_column"
)
currency = ( currency = (
values[columns["currency_column"]] self._get_values_from_column(values, columns, "currency_column")
if columns["currency_column"] is not None if columns["currency_column"]
else currency_code else currency_code
) )
amount = values[columns["amount_column"]] amount = self._get_values_from_column(values, columns, "amount_column")
balance = ( balance = (
values[columns["balance_column"]] self._get_values_from_column(values, columns, "balance_column")
if columns["balance_column"] is not None if columns["balance_column"]
else None else None
) )
original_currency = ( original_currency = (
values[columns["original_currency_column"]] self._get_values_from_column(
if columns["original_currency_column"] is not None values, columns, "original_currency_column"
)
if columns["original_currency_column"]
else None else None
) )
original_amount = ( original_amount = (
values[columns["original_amount_column"]] self._get_values_from_column(values, columns, "original_amount_column")
if columns["original_amount_column"] is not None if columns["original_amount_column"]
else None else None
) )
debit_credit = ( debit_credit = (
values[columns["debit_credit_column"]] self._get_values_from_column(values, columns, "debit_credit_column")
if columns["debit_credit_column"] is not None if columns["debit_credit_column"]
else None else None
) )
transaction_id = ( transaction_id = (
values[columns["transaction_id_column"]] self._get_values_from_column(values, columns, "transaction_id_column")
if columns["transaction_id_column"] is not None if columns["transaction_id_column"]
else None else None
) )
description = ( description = (
values[columns["description_column"]] self._get_values_from_column(values, columns, "description_column")
if columns["description_column"] is not None if columns["description_column"]
else None else None
) )
notes = ( notes = (
values[columns["notes_column"]] self._get_values_from_column(values, columns, "notes_column")
if columns["notes_column"] is not None if columns["notes_column"]
else None else None
) )
reference = ( reference = (
values[columns["reference_column"]] self._get_values_from_column(values, columns, "reference_column")
if columns["reference_column"] is not None if columns["reference_column"]
else None else None
) )
partner_name = ( partner_name = (
values[columns["partner_name_column"]] self._get_values_from_column(values, columns, "partner_name_column")
if columns["partner_name_column"] is not None if columns["partner_name_column"]
else None else None
) )
bank_name = ( bank_name = (
values[columns["bank_name_column"]] self._get_values_from_column(values, columns, "bank_name_column")
if columns["bank_name_column"] is not None if columns["bank_name_column"]
else None else None
) )
bank_account = ( bank_account = (
values[columns["bank_account_column"]] self._get_values_from_column(values, columns, "bank_account_column")
if columns["bank_account_column"] is not None if columns["bank_account_column"]
else None else None
) )

View File

@@ -39,6 +39,18 @@
<group> <group>
<field name="timestamp_format" /> <field name="timestamp_format" />
</group> </group>
<group>
<field name="no_header" />
<div
class="alert alert-warning"
role="alert"
attrs="{'invisible': [('no_header', '=', False)]}"
>
<span
class="fa fa-info-circle"
/> indicate the column number in the Columns section. The first column is 0.
</div>
</group>
<group <group
attrs="{'invisible': [('debit_credit_column', '=', False)]}" attrs="{'invisible': [('debit_credit_column', '=', False)]}"
> >
@@ -53,6 +65,16 @@
</group> </group>
</group> </group>
<group string="Columns"> <group string="Columns">
<group colspan="4" col="2">
<div class="alert alert-info" role="alert">
<span
class="fa fa-info-circle"
/> Add the column names or column number (when the file has no header).
You can concatenate multiple columns in the file into the same field, indicating the
column names or numbers separated by comma.
</div>
</group>
<group>
<field name="timestamp_column" /> <field name="timestamp_column" />
<field name="currency_column" /> <field name="currency_column" />
<field name="amount_column" /> <field name="amount_column" />
@@ -68,6 +90,7 @@
<field name="bank_name_column" /> <field name="bank_name_column" />
<field name="bank_account_column" /> <field name="bank_account_column" />
</group> </group>
</group>
</sheet> </sheet>
</form> </form>
</field> </field>

View File

@@ -1,4 +1,5 @@
# generated from manifests external_dependencies # generated from manifests external_dependencies
chardet
cryptography cryptography
ofxparse ofxparse
xlrd xlrd