diff --git a/account_banking/parsers/models.py b/account_banking/parsers/models.py index a8a0c3014..477b4536a 100644 --- a/account_banking/parsers/models.py +++ b/account_banking/parsers/models.py @@ -19,6 +19,7 @@ # ############################################################################## +import re from tools.translate import _ class mem_bank_statement(object): @@ -34,7 +35,7 @@ class mem_bank_statement(object): # Lock attributes to enable parsers to trigger non-conformity faults __slots__ = [ 'start_balance','end_balance', 'date', 'local_account', - 'local_currency', 'id', 'statements' + 'local_currency', 'id', 'transactions' ] def __init__(self, *args, **kwargs): super(mem_bank_statement, self).__init__(*args, **kwargs) @@ -353,6 +354,33 @@ class parser(object): name = "%s-%d" % (base, suffix) return name + def get_unique_account_identifier(self, cr, account): + """ + Get an identifier for a local bank account, based on the last + characters of the account number with minimum length 3. + The identifier should be unique amongst the company accounts + + Presumably, the bank account is one of the company accounts + itself but importing bank statements for non-company accounts + is not prevented anywhere else in the system so the 'account' + param being a company account is not enforced here either. + """ + def normalize(account_no): + return re.sub('\s', '', account_no) + + account = normalize(account) + cr.execute( + """SELECT acc_number FROM res_partner_bank + WHERE company_id IS NOT NULL""") + accounts = [normalize(row[0]) for row in cr.fetchall()] + tail_length = 3 + while tail_length <= len(account): + tail = account[-tail_length:] + if len([acc for acc in accounts if acc.endswith(tail)]) < 2: + return tail + tail_length += 1 + return account + def parse(self, cr, data): ''' Parse data. diff --git a/account_banking_camt/__init__.py b/account_banking_camt/__init__.py new file mode 100644 index 000000000..f8b1d5b3a --- /dev/null +++ b/account_banking_camt/__init__.py @@ -0,0 +1 @@ +import camt diff --git a/account_banking_camt/__openerp__.py b/account_banking_camt/__openerp__.py new file mode 100644 index 000000000..e7950c991 --- /dev/null +++ b/account_banking_camt/__openerp__.py @@ -0,0 +1,33 @@ +############################################################################## +# +# Copyright (C) 2013 Therp BV () +# All Rights Reserved +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +############################################################################## +{ + 'name': 'CAMT Format Bank Statements Import', + 'version': '0.1', + 'license': 'AGPL-3', + 'author': 'Therp BV', + 'website': 'https://launchpad.net/banking-addons', + 'category': 'Banking addons', + 'depends': ['account_banking'], + 'description': ''' +Module to import SEPA CAMT.053 Format bank statement files. Based +on the Banking addons framework. + ''', + 'installable': True, +} diff --git a/account_banking_camt/camt.py b/account_banking_camt/camt.py new file mode 100644 index 000000000..2ec31439f --- /dev/null +++ b/account_banking_camt/camt.py @@ -0,0 +1,278 @@ +# -*- coding: utf-8 -*- +############################################################################## +# +# Copyright (C) 2013 Therp BV () +# All Rights Reserved +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +############################################################################## + +from lxml import etree +from openerp.osv.orm import except_orm +from account_banking.parsers import models +from account_banking.parsers.convert import str2date + +bt = models.mem_bank_transaction + +class transaction(models.mem_bank_transaction): + + def __init__(self, values, *args, **kwargs): + super(transaction, self).__init__(*args, **kwargs) + for attr in values: + setattr(self, attr, values[attr]) + + def is_valid(self): + return not self.error_message + +class parser(models.parser): + code = 'CAMT' + country_code = 'NL' + name = 'Generic CAMT Format' + doc = '''\ +CAMT Format parser +''' + + def tag(self, node): + """ + Return the tag of a node, stripped from its namespace + """ + return node.tag[len(self.ns):] + + def assert_tag(self, node, expected): + """ + Get node's stripped tag and compare with expected + """ + assert self.tag(node) == expected, ( + "Expected tag '%s', got '%s' instead" % + (self.tag(node), expected)) + + def xpath(self, node, expr): + """ + Wrap namespaces argument into call to Element.xpath(): + + self.xpath(node, './ns:Acct/ns:Id') + """ + return node.xpath(expr, namespaces={'ns': self.ns[1:-1]}) + + def find(self, node, expr): + """ + Like xpath(), but return first result if any or else False + + Return None to test nodes for being truesy + """ + result = node.xpath(expr, namespaces={'ns': self.ns[1:-1]}) + if result: + return result[0] + return None + + def get_balance_type_node(self, node, balance_type): + """ + :param node: BkToCstmrStmt/Stmt/Bal node + :param balance type: one of 'OPBD', 'PRCD', 'ITBD', 'CLBD' + """ + code_expr = './ns:Bal/ns:Tp/ns:CdOrPrtry/ns:Cd[text()="%s"]/../../..' % balance_type + return self.xpath(node, code_expr) + + def parse_amount(self, node): + """ + Parse an element that contains both Amount and CreditDebitIndicator + + :return: signed amount + :returntype: float + """ + sign = -1 if node.find(self.ns + 'CdtDbtInd').text == 'DBIT' else 1 + return sign * float(node.find(self.ns + 'Amt').text) + + def get_start_balance(self, node): + """ + Find the (only) balance node with code OpeningBalance, or + the only one with code 'PreviousClosingBalance' + or the first balance node with code InterimBalance in + the case of preceeding pagination. + + :param node: BkToCstmrStmt/Stmt/Bal node + """ + nodes = ( + self.get_balance_type_node(node, 'OPBD') or + self.get_balance_type_node(node, 'PRCD') or + self.get_balance_type_node(node, 'ITBD')) + return self.parse_amount(nodes[0]) + + def get_end_balance(self, node): + """ + Find the (only) balance node with code ClosingBalance, or + the second (and last) balance node with code InterimBalance in + the case of continued pagination. + + :param node: BkToCstmrStmt/Stmt/Bal node + """ + nodes = ( + self.get_balance_type_node(node, 'CLBD') or + self.get_balance_type_node(node, 'ITBD')) + return self.parse_amount(nodes[-1]) + + def parse_Stmt(self, cr, node): + """ + Parse a single Stmt node. + + Be sure to craft a unique, but short enough statement identifier, + as it is used as the basis of the generated move lines' names + which overflow when using the full IBAN and CAMT statement id. + """ + statement = models.mem_bank_statement() + statement.local_account = ( + self.xpath(node, './ns:Acct/ns:Id/ns:IBAN')[0].text + if self.xpath(node, './ns:Acct/ns:Id/ns:IBAN') + else self.xpath(node, './ns:Acct/ns:Id/ns:Othr/ns:Id')[0].text) + + identifier = node.find(self.ns + 'Id').text + if identifier.upper().startswith('CAMT053'): + identifier = identifier[7:] + statement.id = self.get_unique_statement_id( + cr, "%s-%s" % ( + self.get_unique_account_identifier( + cr, statement.local_account), + identifier) + ) + + statement.local_currency = self.xpath(node, './ns:Acct/ns:Ccy')[0].text + statement.start_balance = self.get_start_balance(node) + statement.end_balance = self.get_end_balance(node) + number = 0 + for Ntry in self.xpath(node, './ns:Ntry'): + transaction_detail = self.parse_Ntry(Ntry) + if number == 0: + # Take the statement date from the first transaction + statement.date = str2date( + transaction_detail['execution_date'], "%Y-%m-%d") + number += 1 + transaction_detail['id'] = str(number).zfill(4) + statement.transactions.append( + transaction(transaction_detail)) + return statement + + def get_transfer_type(self, node): + """ + Map entry descriptions to transfer types. To extend with + proper mapping from BkTxCd/Domn/Cd/Fmly/Cd to transfer types + if we can get our hands on real life samples. + + For now, leave as a hook for bank specific overrides to map + properietary codes from BkTxCd/Prtry/Cd. + + :param node: Ntry node + """ + return bt.ORDER + + def parse_Ntry(self, node): + """ + :param node: Ntry node + """ + entry_details = { + 'execution_date': self.xpath(node, './ns:BookgDt/ns:Dt')[0].text, + 'effective_date': self.xpath(node, './ns:ValDt/ns:Dt')[0].text, + 'transfer_type': self.get_transfer_type(node), + 'transferred_amount': self.parse_amount(node) + } + TxDtls = self.xpath(node, './ns:NtryDtls/ns:TxDtls') + if len(TxDtls) == 1: + vals = self.parse_TxDtls(TxDtls[0], entry_details) + else: + vals = entry_details + return vals + + def get_party_values(self, TxDtls): + """ + Determine to get either the debtor or creditor party node + and extract the available data from it + """ + vals = {} + party_type = self.find( + TxDtls, '../../ns:CdtDbtInd').text == 'CRDT' and 'Dbtr' or 'Cdtr' + party_node = self.find(TxDtls, './ns:RltdPties/ns:%s' % party_type) + account_node = self.find( + TxDtls, './ns:RltdPties/ns:%sAcct/ns:Id' % party_type) + bic_node = self.find( + TxDtls, + './ns:RltdAgts/ns:%sAgt/ns:FinInstnId/ns:BIC' % party_type) + if party_node is not None: + name_node = self.find(party_node, './ns:Nm') + vals['remote_owner'] = ( + name_node.text if name_node is not None else False) + country_node = self.find(party_node, './ns:PstlAdr/ns:Ctry') + vals['remote_owner_country'] = ( + country_node.text if country_node is not None else False) + address_node = self.find(party_node, './ns:PstlAdr/ns:AdrLine') + if address_node is not None: + vals['remote_owner_address'] = [address_node.text] + if account_node is not None: + iban_node = self.find(account_node, './ns:IBAN') + if iban_node is not None: + vals['remote_account'] = iban_node.text + if bic_node is not None: + vals['remote_bank_bic'] = bic_node.text + else: + domestic_node = self.find(account_node, './ns:Othr/ns:Id') + vals['remote_account'] = ( + domestic_node.text if domestic_node is not None else False) + return vals + + def parse_TxDtls(self, TxDtls, entry_values): + """ + Parse a single TxDtls node + """ + vals = dict(entry_values) + unstructured = self.xpath(TxDtls, './ns:RmtInf/ns:Ustrd') + if unstructured: + vals['message'] = ' '.join([x.text for x in unstructured]) + structured = self.find( + TxDtls, './ns:RmtInf/ns:Strd/ns:CdtrRefInf/ns:Ref') + if structured is None or not structured.text: + structured = self.find(TxDtls, './ns:Refs/ns:EndToEndId') + if structured is not None: + vals['reference'] = structured.text + else: + if vals.get('message'): + vals['reference'] = vals['message'] + vals.update(self.get_party_values(TxDtls)) + return vals + + def check_version(self): + """ + Sanity check the document's namespace + """ + if not self.ns.startswith('{urn:iso:std:iso:20022:tech:xsd:camt.'): + raise except_orm( + "Error", + "This does not seem to be a CAMT format bank statement.") + + if not self.ns.startswith('{urn:iso:std:iso:20022:tech:xsd:camt.053.'): + raise except_orm( + "Error", + "Only CAMT.053 is supported at the moment.") + return True + + def parse(self, cr, data): + """ + Parse a CAMT053 XML file + """ + root = etree.fromstring(data) + self.ns = root.tag[:root.tag.index("}") + 1] + self.check_version() + self.assert_tag(root[0][0], 'GrpHdr') + statements = [] + for node in root[0][1:]: + statements.append(self.parse_Stmt(cr, node)) + return statements