telium-test.py 7.05 KB
Newer Older
François C. committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
#! /usr/bin/python
# -*- encoding: utf-8 -*-
##############################################################################
#
#    Hardware Telium Test script
#    Copyright (C) 2014 Akretion (http://www.akretion.com)
#    @author Alexis de Lattre <alexis.delattre@akretion.com>
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################


from serial import Serial
import curses.ascii
import time
import pycountry


DEVICE = '/dev/ttyACM0'
DEVICE_RATE = 9600
PAYMENT_MODE = 'card'  # 'card' ou 'check'
CURRENCY_ISO = 'EUR'
AMOUNT = 12.42


def serial_write(serial, text):
    assert isinstance(text, str), 'text must be a string'
    serial.write(text)


def initialize_msg(serial):
    max_attempt = 3
    attempt_nr = 0
    while attempt_nr < max_attempt:
        attempt_nr += 1
        send_one_byte_signal(serial, 'ENQ')
        if get_one_byte_answer(serial, 'ACK'):
            return True
        else:
            print "Terminal : SAME PLAYER TRY AGAIN"
            send_one_byte_signal(serial, 'EOT')
            # Wait 1 sec between each attempt
            time.sleep(1)
    return False


def send_one_byte_signal(serial, signal):
    ascii_names = curses.ascii.controlnames
    assert signal in ascii_names, 'Wrong signal'
    char = ascii_names.index(signal)
    serial_write(serial, chr(char))
    print 'Signal %s sent to terminal' % signal


def get_one_byte_answer(serial, expected_signal):
    ascii_names = curses.ascii.controlnames
    one_byte_read = serial.read(1)
    expected_char = ascii_names.index(expected_signal)
    if one_byte_read == chr(expected_char):
        print "%s received from terminal" % expected_signal
        return True
    else:
        return False


def prepare_data_to_send():
    if PAYMENT_MODE == 'check':
        payment_mode = 'C'
    elif PAYMENT_MODE == 'card':
        payment_mode = '1'
    else:
        print "The payment mode '%s' is not supported" % PAYMENT_MODE
        return False
    cur_iso_letter = CURRENCY_ISO.upper()
    try:
        cur = pycountry.currencies.get(letter=cur_iso_letter)
        cur_numeric = str(cur.numeric)
    except:
        print "Currency %s is not recognized" % cur_iso_letter
        return False
    data = {
        'pos_number': str(1).zfill(2),
        'answer_flag': '0',
        'transaction_type': '0',
        'payment_mode': payment_mode,
        'currency_numeric': cur_numeric.zfill(3),
        'private': ' ' * 10,
        'delay': 'A011',
        'auto': 'B010',
        'amount_msg': ('%.0f' % (AMOUNT * 100)).zfill(8),
    }
    return data


def generate_lrc(real_msg_with_etx):
    lrc = 0
    for char in real_msg_with_etx:
        lrc ^= ord(char)
    return lrc


def send_message(serial, data):
    '''We use protocol E+'''
    ascii_names = curses.ascii.controlnames
    real_msg = (
        data['pos_number'] +
        data['amount_msg'] +
        data['answer_flag'] +
        data['payment_mode'] +
        data['transaction_type'] +
        data['currency_numeric'] +
        data['private'] +
        data['delay'] +
        data['auto'])
    print 'Real message to send = %s' % real_msg
    assert len(real_msg) == 34, 'Wrong length for protocol E+'
    real_msg_with_etx = real_msg + chr(ascii_names.index('ETX'))
    lrc = generate_lrc(real_msg_with_etx)
    message = chr(ascii_names.index('STX')) + real_msg_with_etx + chr(lrc)
    serial_write(serial, message)
    print 'Message sent to terminal'


def compare_data_vs_answer(data, answer_data):
    for field in [
            'pos_number', 'amount_msg',
            'currency_numeric', 'private']:
        if data[field] != answer_data[field]:
            print (
                "Field %s has value '%s' in data and value '%s' in answer"
                % (field, data[field], answer_data[field]))


def parse_terminal_answer(real_msg, data):
    answer_data = {
        'pos_number': real_msg[0:2],
        'transaction_result': real_msg[2],
        'amount_msg': real_msg[3:11],
        'payment_mode': real_msg[11],
        'currency_numeric': real_msg[12:15],
        'private': real_msg[15:26],
    }
    print 'answer_data = %s' % answer_data
    compare_data_vs_answer(data, answer_data)
    return answer_data


def get_answer_from_terminal(serial, data):
    ascii_names = curses.ascii.controlnames
    full_msg_size = 1+2+1+8+1+3+10+1+1
    msg = serial.read(size=full_msg_size)
    print '%d bytes read from terminal' % full_msg_size
    assert len(msg) == full_msg_size, 'Answer has a wrong size'
    if msg[0] != chr(ascii_names.index('STX')):
        print 'The first byte of the answer from terminal should be STX'
    if msg[-2] != chr(ascii_names.index('ETX')):
        print 'The byte before final of the answer from terminal should be ETX'
    lrc = msg[-1]
    computed_lrc = chr(generate_lrc(msg[1:-1]))
    if computed_lrc != lrc:
        print 'The LRC of the answer from terminal is wrong'
    real_msg = msg[1:-2]
    print 'Real answer received = %s' % real_msg
    return parse_terminal_answer(real_msg, data)


def transaction_start():
    '''This function sends the data to the serial/usb port.
    '''
    serial = False
    try:
        print(
            'Opening serial port %s for payment terminal with '
            'baudrate %d' % (DEVICE, DEVICE_RATE))
        # IMPORTANT : don't modify timeout=3 seconds
        # This parameter is very important ; the Telium spec say
        # that we have to wait to up 3 seconds to get LRC
        serial = Serial(
            DEVICE, DEVICE_RATE, timeout=3)
        print 'serial.is_open = %s' % serial.isOpen()
        if initialize_msg(serial):
            data = prepare_data_to_send()
            if not data:
                return
            send_message(serial, data)
            if get_one_byte_answer(serial, 'ACK'):
                send_one_byte_signal(serial, 'EOT')

                print "Now expecting answer from Terminal"
                if get_one_byte_answer(serial, 'ENQ'):
                    send_one_byte_signal(serial, 'ACK')
                    get_answer_from_terminal(serial, data)
                    send_one_byte_signal(serial, 'ACK')
                    if get_one_byte_answer(serial, 'EOT'):
                        print "Answer received from Terminal"

    except Exception, e:
        print 'Exception in serial connection: %s' % str(e)
    finally:
        if serial:
            print 'Closing serial port for payment terminal'
            serial.close()


if __name__ == '__main__':
    transaction_start()