Backport of V11 developments (from #137)

This commit is contained in:
Andrea
2018-01-13 14:02:02 +01:00
parent f50d296c54
commit 0a18da1c6c
6 changed files with 243 additions and 80 deletions

View File

@@ -1,23 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014-2015 Therp BV <http://therp.nl>.
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
"""Generic parser for MT940 files, base for customized versions per bank."""
##############################################################################
#
# Copyright (C) 2014-2015 Therp BV <http://therp.nl>.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
import re
import logging
from datetime import datetime
@@ -138,35 +122,65 @@ class MT940(object):
(line[:12], self.mt940_type)
)
def parse(self, data):
def is_mt940_statement(self, line):
"""determine if line is the start of a statement"""
if not bool(line.startswith('{4:')):
raise ValueError(
'The pre processed match %s does not seem to be a'
' valid %s MT940 format bank statement. Every statement'
' should start be a dict starting with {4:..' % line
)
def pre_process_data(self, data):
matches = []
self.is_mt940(line=data)
data = data.replace(
'-}', '}').replace('}{', '}\r\n{').replace('\r\n', '\n')
if data.startswith(':940:'):
for statement in data.replace(':940:', '').split(':20:'):
match = '{4:\n:20:' + statement + '}'
matches.append(match)
else:
tag_re = re.compile(
r'(\{4:[^{}]+\})',
re.MULTILINE)
matches = tag_re.findall(data)
return matches
def parse(self, data, header_lines=None):
"""Parse mt940 bank statement file contents."""
self.is_mt940(data)
iterator = data.replace('\r\n', '\n').split('\n').__iter__()
line = None
record_line = ''
try:
while True:
if not self.current_statement:
self.handle_header(line, iterator)
line = iterator.next()
if not self.is_tag(line) and not self.is_footer(line):
record_line = self.add_record_line(line, record_line)
continue
data = data.decode()
matches = self.pre_process_data(data)
for match in matches:
self.is_mt940_statement(line=match)
iterator = '\n'.join(
match.split('\n')[1:-1]).split('\n').__iter__()
line = None
record_line = ''
try:
while True:
if not self.current_statement:
self.handle_header(line, iterator,
header_lines=header_lines)
line = iterator.next()
if not self.is_tag(line) and not self.is_footer(line):
record_line = self.add_record_line(line, record_line)
continue
if record_line:
self.handle_record(record_line)
if self.is_footer(line):
self.handle_footer(line, iterator)
record_line = ''
continue
record_line = line
except StopIteration:
pass
if self.current_statement:
if record_line:
self.handle_record(record_line)
if self.is_footer(line):
self.handle_footer(line, iterator)
record_line = ''
continue
record_line = line
except StopIteration:
pass
if self.current_statement:
if record_line:
self.handle_record(record_line)
record_line = ''
self.statements.append(self.current_statement)
self.current_statement = None
self.statements.append(self.current_statement)
self.current_statement = None
return self.currency_code, self.account_number, self.statements
def add_record_line(self, line, record_line):
@@ -181,9 +195,11 @@ class MT940(object):
"""determine if a line has a tag"""
return line and bool(re.match(self.tag_regex, line))
def handle_header(self, dummy_line, iterator):
def handle_header(self, dummy_line, iterator, header_lines=None):
"""skip header lines, create current statement"""
for dummy_i in range(self.header_lines):
if not header_lines:
header_lines = self.header_lines
for dummy_i in range(header_lines):
iterator.next()
self.current_statement = {
'name': None,
@@ -229,10 +245,13 @@ class MT940(object):
# statement for each 20: tag encountered.
if not self.currency_code:
self.currency_code = data[7:10]
self.current_statement['balance_start'] = str2amount(
data[0],
data[10:]
)
self.current_statement['balance_start'] = str2amount(
data[0],
data[10:]
)
if not self.current_statement['date']:
self.current_statement['date'] = datetime.strptime(data[1:7],
'%y%m%d')
def handle_tag_61(self, data):
"""get transaction values"""
@@ -271,7 +290,7 @@ class MT940(object):
statement_name = self.current_statement['name'] or ''
test_empty_id = re.sub(r'[\s0]', '', statement_name)
is_account_number = statement_name.startswith(self.account_number)
if ((not test_empty_id) or is_account_number):
if not test_empty_id or is_account_number:
self.current_statement['name'] = '%s-%s' % (
self.account_number,
self.current_statement['date'].strftime('%Y-%m-%d'),