Hi,
I added a new module named "Channel_Estimation" and a block
named "ofdm_chanest_MMSE_vcvc" using gr_modtool. I am attaching the QA code
file, which is the same as the QA code file for "ofdm_chanest_vcvc". I am
attaching that too.

When I run *ctest*, the output is:

    Start 1: test_Channel_Estimation
1/2 Test #1: test_Channel_Estimation ..........   Passed    0.00 sec
    Start 2: qa_ofdm_chanest_MMSE_vcvc
2/2 Test #2: qa_ofdm_chanest_MMSE_vcvc ........***Failed    0.05 sec

50% tests passed, 1 tests failed out of 2

Total Test time (real) =   0.05 sec

The following tests FAILED:
 2 - qa_ofdm_chanest_MMSE_vcvc (Failed)
Errors while running CTest


When I run *ctest -V -R square*, the output is:

Constructing a list of tests
Done constructing a list of tests
Checking test dependency graph...
Checking test dependency graph end
No tests were found!!!


What should I do?

Thanks..
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 
# Copyright 2015 <+YOU OR YOUR COMPANY+>.
# 
# This is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
# 
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this software; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
# 
import sys
import numpy
import random
import numpy

from gnuradio import gr, gr_unittest, blocks, analog, digital
import pmt
import Channel_Estimation_swig as Channel_Estimation


def shift_tuple(vec, N):
    """ Shifts a vector by N elements. Fills up with zeros.

    A positive N shifts towards higher indices, a negative N towards lower
    indices. The result is always a tuple with as many elements as vec; if
    abs(N) exceeds the length of vec, the result is all zeros. """
    vec = tuple(vec)
    # Clamp the shift so the output can never become longer than the input
    n_shift = min(abs(N), len(vec))
    padding = (0,) * n_shift
    if N > 0:
        return padding + vec[:len(vec) - n_shift]
    return vec[n_shift:] + padding

def rand_range(min_val, max_val):
    """ Draws a uniformly distributed random number between min_val and max_val """
    span = max_val - min_val
    return random.random() * span + min_val


class qa_ofdm_chanest_MMSE_vcvc (gr_unittest.TestCase):
    """ QA tests for Channel_Estimation.ofdm_chanest_MMSE_vcvc.

    Each test feeds vectors of fft_len complex items (sync symbol(s) followed
    by a data symbol) through the estimator and checks the data and the tags
    ('ofdm_sync_carr_offset', 'ofdm_sync_chan_taps') arriving at a vector sink. """

    def setUp (self):
        """ Create a fresh top block for every test. """
        self.tb = gr.top_block ()

    def tearDown (self):
        """ Drop the reference to the top block. """
        self.tb = None

    def test_001_t (self):
        """ Add a frequency offset, check if it's correctly detected.
        Also add some random tags and see if they come out at the correct
        position. """
        fft_len = 16
        carr_offset = -2
        sync_symbol1 = (0, 0, 0, 1,  0, 1,  0, -1, 0, 1,  0, -1,  0, 1, 0, 0)
        sync_symbol2 = (0, 0, 0, 1, -1, 1, -1,  1, 0, 1, -1, -1, -1, 1, 0, 0)
        data_symbol  = (0, 0, 0, 1, -1, 1, -1,  1, 0, 1, -1, -1, -1, 1, 0, 0)
        tx_data = shift_tuple(sync_symbol1, carr_offset) + \
                  shift_tuple(sync_symbol2, carr_offset) + \
                  shift_tuple(data_symbol, carr_offset)
        tag1 = gr.tag_t()
        tag1.offset = 0
        tag1.key = pmt.string_to_symbol("test_tag_1")
        tag1.value = pmt.from_long(23)
        tag2 = gr.tag_t()
        tag2.offset = 2
        tag2.key = pmt.string_to_symbol("test_tag_2")
        tag2.value = pmt.from_long(42)
        src = blocks.vector_source_c(tx_data, False, fft_len, (tag1, tag2))
        chanest = Channel_Estimation.ofdm_chanest_MMSE_vcvc(sync_symbol1, sync_symbol2, 1)
        sink = blocks.vector_sink_c(fft_len)
        self.tb.connect(src, chanest, sink)
        self.tb.run()
        self.assertEqual(shift_tuple(sink.data(), -carr_offset), data_symbol)
        ptags = {}
        for tag in sink.tags():
            ptag = gr.tag_to_python(tag)
            if ptag.key == 'ofdm_sync_chan_taps':
                # Only the position of the taps tag is checked here, not its value
                ptags[ptag.key] = (None, ptag.offset)
            else:
                ptags[ptag.key] = (ptag.value, ptag.offset)
        # All tags, including the one set at input offset 2, are expected
        # at output offset 0
        expected_tags = {
                'ofdm_sync_carr_offset': (-2, 0),
                'ofdm_sync_chan_taps': (None, 0),
                'test_tag_1': (23, 0),
                'test_tag_2': (42, 0),
        }
        self.assertEqual(ptags, expected_tags)

    def test_002_offset_1sym (self):
        """ Add a frequency offset, check if it's correctly detected.
        Difference to previous test is, it only uses one synchronisation symbol. """
        fft_len = 16
        carr_offset = -2
        # This will not correct for +2 because it thinks carrier 14 is used
        # (because of interpolation)
        sync_symbol = (0, 0, 0, 1,  0, 1,  0, -1, 0, 1,  0, -1,  0, 1, 0, 0)
        data_symbol = (0, 0, 0, 1, -1, 1, -1,  1, 0, 1, -1, -1, -1, 1, 0, 0)
        tx_data = shift_tuple(sync_symbol, carr_offset) + \
                  shift_tuple(data_symbol, carr_offset)
        src = blocks.vector_source_c(tx_data, False, fft_len)
        # 17 is out of bounds!
        chanest = Channel_Estimation.ofdm_chanest_MMSE_vcvc(sync_symbol, (), 1, 0, 17)
        sink = blocks.vector_sink_c(fft_len)
        self.tb.connect(src, chanest, sink)
        self.tb.run()
        self.assertEqual(shift_tuple(sink.data(), -carr_offset), data_symbol)
        for tag in sink.tags():
            if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
                self.assertEqual(pmt.to_long(tag.value), carr_offset)

    def test_003_channel_no_carroffset (self):
        """ Add a channel, check if it's correctly estimated """
        fft_len = 16
        carr_offset = 0
        sync_symbol1 = (0, 0, 0, 1,  0, 1,  0, -1, 0, 1,  0, -1,  0, 1, 0, 0)
        sync_symbol2 = (0, 0, 0, 1j, -1, 1, -1j,  1j, 0, 1, -1j, -1, -1j, 1, 0, 0)
        data_symbol  = (0, 0, 0, 1, -1, 1, -1,  1, 0, 1, -1, -1, -1, 1, 0, 0)
        tx_data = sync_symbol1 + sync_symbol2 + data_symbol
        channel = (0, 0, 0, 2, -2, 2, 3j, 2, 0, 2, 2, 2, 2, 3, 0, 0)
        src = blocks.vector_source_c(tx_data, False, fft_len)
        chan = blocks.multiply_const_vcc(channel)
        chanest = Channel_Estimation.ofdm_chanest_MMSE_vcvc(sync_symbol1, sync_symbol2, 1)
        sink = blocks.vector_sink_c(fft_len)
        # The estimator's second output is compared against the channel below
        sink_chanest = blocks.vector_sink_c(fft_len)
        self.tb.connect(src, chan, chanest, sink)
        self.tb.connect((chanest, 1), sink_chanest)
        self.tb.run()
        self.assertEqual(shift_tuple(sink.data(), -carr_offset), tuple(numpy.multiply(data_symbol, channel)))
        for tag in sink.tags():
            key = pmt.symbol_to_string(tag.key)
            if key == 'ofdm_sync_carr_offset':
                self.assertEqual(pmt.to_long(tag.value), carr_offset)
            if key == 'ofdm_sync_chan_taps':
                self.assertEqual(pmt.c32vector_elements(tag.value), channel)
        self.assertEqual(sink_chanest.data(), channel)

    def test_004_channel_no_carroffset_1sym (self):
        """ Add a channel, check if it's correctly estimated.
        Only uses 1 synchronisation symbol. """
        fft_len = 16
        carr_offset = 0
        sync_symbol = (0, 0, 0, 1,  0, 1,  0, -1, 0, 1,  0, -1,  0, 1, 0, 0)
        data_symbol  = (0, 0, 0, 1, -1, 1, -1,  1, 0, 1, -1, -1, -1, 1, 0, 0)
        tx_data = sync_symbol + data_symbol
        channel = (0, 0, 0, 2, 2, 2, 2, 3, 3, 2.5, 2.5, -3, -3, 1j, 1j, 0)
        src = blocks.vector_source_c(tx_data, False, fft_len)
        chan = blocks.multiply_const_vcc(channel)
        chanest = Channel_Estimation.ofdm_chanest_MMSE_vcvc(sync_symbol, (), 1)
        sink = blocks.vector_sink_c(fft_len)
        sink_chanest = blocks.vector_sink_c(fft_len)
        self.tb.connect(src, chan, chanest, sink)
        self.tb.connect((chanest, 1), sink_chanest)
        self.tb.run()
        self.assertEqual(sink_chanest.data(), channel)
        for tag in sink.tags():
            key = pmt.symbol_to_string(tag.key)
            if key == 'ofdm_sync_carr_offset':
                self.assertEqual(pmt.to_long(tag.value), carr_offset)
            if key == 'ofdm_sync_chan_taps':
                self.assertEqual(pmt.c32vector_elements(tag.value), channel)

    def test_005_both_1sym_force (self):
        """ Add a channel, check if it's correctly estimated.
        Only one synchronisation symbol is transmitted, but a reference
        symbol is still passed to the estimator as second argument. """
        fft_len = 16
        carr_offset = 0
        sync_symbol = (0, 0, 0, 1,  0, 1,  0, -1, 0, 1,  0, -1,  0, 1, 0, 0)
        ref_symbol  = (0, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0)
        data_symbol = (0, 0, 0, 1, -1, 1, -1,  1, 0, 1, -1, -1, -1, 1, 0, 0)
        tx_data = sync_symbol + data_symbol
        channel = (0, 0, 0, 2, 2, 2, 2.5, 3, 2.5, 2, 2.5, 3, 2, 1, 1, 0)
        src = blocks.vector_source_c(tx_data, False, fft_len)
        chan = blocks.multiply_const_vcc(channel)
        chanest = Channel_Estimation.ofdm_chanest_MMSE_vcvc(sync_symbol, ref_symbol, 1)
        sink = blocks.vector_sink_c(fft_len)
        self.tb.connect(src, chan, chanest, sink)
        self.tb.run()
        for tag in sink.tags():
            key = pmt.symbol_to_string(tag.key)
            if key == 'ofdm_sync_carr_offset':
                self.assertEqual(pmt.to_long(tag.value), carr_offset)
            if key == 'ofdm_sync_chan_taps':
                self.assertEqual(pmt.c32vector_elements(tag.value), channel)

    def test_006_channel_and_carroffset (self):
        """ Add a channel and a carrier offset, check if both are correctly estimated """
        fft_len = 16
        carr_offset = 2
        # Index         0  1  2   3   4  5    6   7  8  9   10  11   12 13 14 15
        sync_symbol1 = (0, 0, 0,  1,  0, 1,  0,  -1, 0, 1,   0, -1,   0, 1, 0, 0)
        sync_symbol2 = (0, 0, 0, 1j, -1, 1, -1j, 1j, 0, 1, -1j, -1, -1j, 1, 0, 0)
        data_symbol  = (0, 0, 0,  1, -1, 1,  -1,  1, 0, 1,  -1, -1,  -1, 1, 0, 0)
        # Channel       0  1  2  3  4   5   6  7    8   9 10 11   12  13   14  15
        # Shifted      (0, 0, 0, 0, 0, 1j, -1, 1, -1j, 1j, 0, 1, -1j, -1, -1j, 1)
        chanest_exp  = (0, 0, 0, 5, 6, 7, 8, 9, 0, 11, 12, 13, 14, 15, 0, 0)
        tx_data = shift_tuple(sync_symbol1, carr_offset) + \
                  shift_tuple(sync_symbol2, carr_offset) + \
                  shift_tuple(data_symbol, carr_offset)
        # A real list, so this is a sequence on Python 2 and Python 3 alike
        channel = list(range(fft_len))
        src = blocks.vector_source_c(tx_data, False, fft_len)
        chan = blocks.multiply_const_vcc(channel)
        chanest = Channel_Estimation.ofdm_chanest_MMSE_vcvc(sync_symbol1, sync_symbol2, 1)
        sink = blocks.vector_sink_c(fft_len)
        self.tb.connect(src, chan, chanest, sink)
        self.tb.run()
        chan_est = None
        for tag in sink.tags():
            key = pmt.symbol_to_string(tag.key)
            if key == 'ofdm_sync_carr_offset':
                self.assertEqual(pmt.to_long(tag.value), carr_offset)
            if key == 'ofdm_sync_chan_taps':
                chan_est = pmt.c32vector_elements(tag.value)
        self.assertEqual(chan_est, chanest_exp)
        self.assertEqual(sink.data(), tuple(numpy.multiply(shift_tuple(data_symbol, carr_offset), channel)))

    def test_999_all_at_once(self):
        """ Random carrier offset, random channel and noise, all at once.

        Runs n_iter flow graphs with random BPSK symbols, checks the detected
        carrier offset and the channel estimate, equalizes the data symbol
        with the estimate and counts the resulting bit errors. """
        fft_len = 32
        # 6 carriers empty, 10 carriers full, 1 DC carrier, 10 carriers full, 5 carriers empty
        syncsym_mask = (0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0)
        carrier_mask = (0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0)
        max_offset = 4
        wgn_amplitude = 0.05
        min_chan_ampl = 0.1
        max_chan_ampl = 5
        n_iter = 20 # More iterations make the result more accurate
        def run_flow_graph(sync_sym1, sync_sym2, data_sym):
            """ Run one random realisation.
            Returns (carr_offset, list of equalized real-valued rx symbols). """
            top_block = gr.top_block()
            # Floor division keeps the bounds integers on Python 2 and 3;
            # the offset is always an even number
            carr_offset = random.randint(-max_offset // 2, max_offset // 2) * 2
            tx_data = shift_tuple(sync_sym1, carr_offset) + \
                      shift_tuple(sync_sym2, carr_offset) + \
                      shift_tuple(data_sym,  carr_offset)
            # Random amplitude and random phase on every carrier
            channel = [rand_range(min_chan_ampl, max_chan_ampl) * numpy.exp(1j * rand_range(0, 2 * numpy.pi)) for x in range(fft_len)]
            src = blocks.vector_source_c(tx_data, False, fft_len)
            chan = blocks.multiply_const_vcc(channel)
            noise = analog.noise_source_c(analog.GR_GAUSSIAN, wgn_amplitude)
            add = blocks.add_cc(fft_len)
            chanest = Channel_Estimation.ofdm_chanest_MMSE_vcvc(sync_sym1, sync_sym2, 1)
            sink = blocks.vector_sink_c(fft_len)
            top_block.connect(src, chan, (add, 0), chanest, sink)
            top_block.connect(noise, blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len), (add, 1))
            top_block.run()
            channel_est = None
            carr_offset_hat = 0
            rx_sym_est = [0,] * fft_len
            for tag in sink.tags():
                key = pmt.symbol_to_string(tag.key)
                if key == 'ofdm_sync_carr_offset':
                    carr_offset_hat = pmt.to_long(tag.value)
                    self.assertEqual(carr_offset, carr_offset_hat)
                if key == 'ofdm_sync_chan_taps':
                    channel_est = shift_tuple(pmt.c32vector_elements(tag.value), carr_offset)
            # Fail with a clear message instead of a TypeError further down
            self.assertIsNotNone(channel_est, "no 'ofdm_sync_chan_taps' tag received")
            shifted_carrier_mask = shift_tuple(carrier_mask, carr_offset)
            # Fetch the sink data once instead of once per carrier
            rx_data = sink.data()
            for i in range(fft_len):
                if shifted_carrier_mask[i] and channel_est[i]:
                    self.assertAlmostEqual(channel[i], channel_est[i], places=0)
                    rx_sym_est[i] = (rx_data[i] / channel_est[i]).real
            return (carr_offset, list(shift_tuple(rx_sym_est, -carr_offset_hat)))
        bit_errors = 0
        for _ in range(n_iter):
            sync_sym = [(random.randint(0, 1) * 2 - 1) * syncsym_mask[i] for i in range(fft_len)]
            ref_sym  = [(random.randint(0, 1) * 2 - 1) * carrier_mask[i] for i in range(fft_len)]
            data_sym = [(random.randint(0, 1) * 2 - 1) * carrier_mask[i] for i in range(fft_len)]
            data_sym[26] = 1
            (carr_offset, rx_sym) = run_flow_graph(sync_sym, ref_sym, data_sym)
            for i in range(fft_len):
                if carrier_mask[i] == 0:
                    continue
                # Hard decision on the equalized symbol
                decision = 1 if rx_sym[i] > 0 else -1
                if decision != data_sym[i]:
                    bit_errors += 1
        # This is much more than we could allow
        self.assertTrue(bit_errors < n_iter)



if __name__ == '__main__':
    # Run the test case when this file is executed directly; the second
    # argument is presumably the name of the XML result file -- confirm
    # against gr_unittest.run.
    gr_unittest.run(qa_ofdm_chanest_MMSE_vcvc, "qa_ofdm_chanest_MMSE_vcvc.xml")
#!/usr/bin/env python
# Copyright 2012-2014 Free Software Foundation, Inc.
# 
# This file is part of GNU Radio
# 
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
# 
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
# 

import sys
import numpy
import random
import numpy

from gnuradio import gr, gr_unittest, blocks, analog, digital
import pmt

def shift_tuple(vec, N):
    """ Shifts a vector by N elements. Fills up with zeros.

    N > 0 moves the elements towards higher indices, N < 0 towards lower
    indices. The returned tuple always has the same length as vec; shifting
    by more than that length yields only zeros. """
    items = tuple(vec)
    length = len(items)
    # Never shift further than the vector is long, otherwise the result grows
    shift = min(abs(N), length)
    zeros = (0,) * shift
    if N > 0:
        return zeros + items[:length - shift]
    return items[shift:] + zeros

def rand_range(min_val, max_val):
    """ Picks a random value, uniformly distributed between min_val and max_val """
    width = max_val - min_val
    scaled = random.random() * width
    return scaled + min_val


class qa_ofdm_chanest_vcvc (gr_unittest.TestCase):
    """ QA tests for digital.ofdm_chanest_vcvc.

    Each test feeds vectors of fft_len complex items (sync symbol(s) followed
    by a data symbol) through the estimator and checks the data and the tags
    ('ofdm_sync_carr_offset', 'ofdm_sync_chan_taps') arriving at a vector sink. """

    def setUp (self):
        """ Create a fresh top block for every test. """
        self.tb = gr.top_block ()

    def tearDown (self):
        """ Drop the reference to the top block. """
        self.tb = None

    def test_001_offset_2sym (self):
        """ Add a frequency offset, check if it's correctly detected.
        Also add some random tags and see if they come out at the correct
        position. """
        fft_len = 16
        carr_offset = -2
        sync_symbol1 = (0, 0, 0, 1,  0, 1,  0, -1, 0, 1,  0, -1,  0, 1, 0, 0)
        sync_symbol2 = (0, 0, 0, 1, -1, 1, -1,  1, 0, 1, -1, -1, -1, 1, 0, 0)
        data_symbol  = (0, 0, 0, 1, -1, 1, -1,  1, 0, 1, -1, -1, -1, 1, 0, 0)
        tx_data = shift_tuple(sync_symbol1, carr_offset) + \
                  shift_tuple(sync_symbol2, carr_offset) + \
                  shift_tuple(data_symbol, carr_offset)
        tag1 = gr.tag_t()
        tag1.offset = 0
        tag1.key = pmt.string_to_symbol("test_tag_1")
        tag1.value = pmt.from_long(23)
        tag2 = gr.tag_t()
        tag2.offset = 2
        tag2.key = pmt.string_to_symbol("test_tag_2")
        tag2.value = pmt.from_long(42)
        src = blocks.vector_source_c(tx_data, False, fft_len, (tag1, tag2))
        chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1)
        sink = blocks.vector_sink_c(fft_len)
        self.tb.connect(src, chanest, sink)
        self.tb.run()
        self.assertEqual(shift_tuple(sink.data(), -carr_offset), data_symbol)
        ptags = {}
        for tag in sink.tags():
            ptag = gr.tag_to_python(tag)
            if ptag.key == 'ofdm_sync_chan_taps':
                # Only the position of the taps tag is checked here, not its value
                ptags[ptag.key] = (None, ptag.offset)
            else:
                ptags[ptag.key] = (ptag.value, ptag.offset)
        # All tags, including the one set at input offset 2, are expected
        # at output offset 0
        expected_tags = {
                'ofdm_sync_carr_offset': (-2, 0),
                'ofdm_sync_chan_taps': (None, 0),
                'test_tag_1': (23, 0),
                'test_tag_2': (42, 0),
        }
        self.assertEqual(ptags, expected_tags)

    def test_002_offset_1sym (self):
        """ Add a frequency offset, check if it's correctly detected.
        Difference to previous test is, it only uses one synchronisation symbol. """
        fft_len = 16
        carr_offset = -2
        # This will not correct for +2 because it thinks carrier 14 is used
        # (because of interpolation)
        sync_symbol = (0, 0, 0, 1,  0, 1,  0, -1, 0, 1,  0, -1,  0, 1, 0, 0)
        data_symbol = (0, 0, 0, 1, -1, 1, -1,  1, 0, 1, -1, -1, -1, 1, 0, 0)
        tx_data = shift_tuple(sync_symbol, carr_offset) + \
                  shift_tuple(data_symbol, carr_offset)
        src = blocks.vector_source_c(tx_data, False, fft_len)
        # 17 is out of bounds!
        chanest = digital.ofdm_chanest_vcvc(sync_symbol, (), 1, 0, 17)
        sink = blocks.vector_sink_c(fft_len)
        self.tb.connect(src, chanest, sink)
        self.tb.run()
        self.assertEqual(shift_tuple(sink.data(), -carr_offset), data_symbol)
        for tag in sink.tags():
            if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
                self.assertEqual(pmt.to_long(tag.value), carr_offset)

    def test_003_channel_no_carroffset (self):
        """ Add a channel, check if it's correctly estimated """
        fft_len = 16
        carr_offset = 0
        sync_symbol1 = (0, 0, 0, 1,  0, 1,  0, -1, 0, 1,  0, -1,  0, 1, 0, 0)
        sync_symbol2 = (0, 0, 0, 1j, -1, 1, -1j,  1j, 0, 1, -1j, -1, -1j, 1, 0, 0)
        data_symbol  = (0, 0, 0, 1, -1, 1, -1,  1, 0, 1, -1, -1, -1, 1, 0, 0)
        tx_data = sync_symbol1 + sync_symbol2 + data_symbol
        channel = (0, 0, 0, 2, -2, 2, 3j, 2, 0, 2, 2, 2, 2, 3, 0, 0)
        src = blocks.vector_source_c(tx_data, False, fft_len)
        chan = blocks.multiply_const_vcc(channel)
        chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1)
        sink = blocks.vector_sink_c(fft_len)
        # The estimator's second output is compared against the channel below
        sink_chanest = blocks.vector_sink_c(fft_len)
        self.tb.connect(src, chan, chanest, sink)
        self.tb.connect((chanest, 1), sink_chanest)
        self.tb.run()
        self.assertEqual(shift_tuple(sink.data(), -carr_offset), tuple(numpy.multiply(data_symbol, channel)))
        for tag in sink.tags():
            key = pmt.symbol_to_string(tag.key)
            if key == 'ofdm_sync_carr_offset':
                self.assertEqual(pmt.to_long(tag.value), carr_offset)
            if key == 'ofdm_sync_chan_taps':
                self.assertEqual(pmt.c32vector_elements(tag.value), channel)
        self.assertEqual(sink_chanest.data(), channel)

    def test_004_channel_no_carroffset_1sym (self):
        """ Add a channel, check if it's correctly estimated.
        Only uses 1 synchronisation symbol. """
        fft_len = 16
        carr_offset = 0
        sync_symbol = (0, 0, 0, 1,  0, 1,  0, -1, 0, 1,  0, -1,  0, 1, 0, 0)
        data_symbol  = (0, 0, 0, 1, -1, 1, -1,  1, 0, 1, -1, -1, -1, 1, 0, 0)
        tx_data = sync_symbol + data_symbol
        channel = (0, 0, 0, 2, 2, 2, 2, 3, 3, 2.5, 2.5, -3, -3, 1j, 1j, 0)
        src = blocks.vector_source_c(tx_data, False, fft_len)
        chan = blocks.multiply_const_vcc(channel)
        chanest = digital.ofdm_chanest_vcvc(sync_symbol, (), 1)
        sink = blocks.vector_sink_c(fft_len)
        sink_chanest = blocks.vector_sink_c(fft_len)
        self.tb.connect(src, chan, chanest, sink)
        self.tb.connect((chanest, 1), sink_chanest)
        self.tb.run()
        self.assertEqual(sink_chanest.data(), channel)
        for tag in sink.tags():
            key = pmt.symbol_to_string(tag.key)
            if key == 'ofdm_sync_carr_offset':
                self.assertEqual(pmt.to_long(tag.value), carr_offset)
            if key == 'ofdm_sync_chan_taps':
                self.assertEqual(pmt.c32vector_elements(tag.value), channel)

    def test_005_both_1sym_force (self):
        """ Add a channel, check if it's correctly estimated.
        Only one synchronisation symbol is transmitted, but a reference
        symbol is still passed to the estimator as second argument. """
        fft_len = 16
        carr_offset = 0
        sync_symbol = (0, 0, 0, 1,  0, 1,  0, -1, 0, 1,  0, -1,  0, 1, 0, 0)
        ref_symbol  = (0, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0)
        data_symbol = (0, 0, 0, 1, -1, 1, -1,  1, 0, 1, -1, -1, -1, 1, 0, 0)
        tx_data = sync_symbol + data_symbol
        channel = (0, 0, 0, 2, 2, 2, 2.5, 3, 2.5, 2, 2.5, 3, 2, 1, 1, 0)
        src = blocks.vector_source_c(tx_data, False, fft_len)
        chan = blocks.multiply_const_vcc(channel)
        chanest = digital.ofdm_chanest_vcvc(sync_symbol, ref_symbol, 1)
        sink = blocks.vector_sink_c(fft_len)
        self.tb.connect(src, chan, chanest, sink)
        self.tb.run()
        for tag in sink.tags():
            key = pmt.symbol_to_string(tag.key)
            if key == 'ofdm_sync_carr_offset':
                self.assertEqual(pmt.to_long(tag.value), carr_offset)
            if key == 'ofdm_sync_chan_taps':
                self.assertEqual(pmt.c32vector_elements(tag.value), channel)

    def test_006_channel_and_carroffset (self):
        """ Add a channel and a carrier offset, check if both are correctly estimated """
        fft_len = 16
        carr_offset = 2
        # Index         0  1  2   3   4  5    6   7  8  9   10  11   12 13 14 15
        sync_symbol1 = (0, 0, 0,  1,  0, 1,  0,  -1, 0, 1,   0, -1,   0, 1, 0, 0)
        sync_symbol2 = (0, 0, 0, 1j, -1, 1, -1j, 1j, 0, 1, -1j, -1, -1j, 1, 0, 0)
        data_symbol  = (0, 0, 0,  1, -1, 1,  -1,  1, 0, 1,  -1, -1,  -1, 1, 0, 0)
        # Channel       0  1  2  3  4   5   6  7    8   9 10 11   12  13   14  15
        # Shifted      (0, 0, 0, 0, 0, 1j, -1, 1, -1j, 1j, 0, 1, -1j, -1, -1j, 1)
        chanest_exp  = (0, 0, 0, 5, 6, 7, 8, 9, 0, 11, 12, 13, 14, 15, 0, 0)
        tx_data = shift_tuple(sync_symbol1, carr_offset) + \
                  shift_tuple(sync_symbol2, carr_offset) + \
                  shift_tuple(data_symbol, carr_offset)
        # A real list, so this is a sequence on Python 2 and Python 3 alike
        channel = list(range(fft_len))
        src = blocks.vector_source_c(tx_data, False, fft_len)
        chan = blocks.multiply_const_vcc(channel)
        chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1)
        sink = blocks.vector_sink_c(fft_len)
        self.tb.connect(src, chan, chanest, sink)
        self.tb.run()
        chan_est = None
        for tag in sink.tags():
            key = pmt.symbol_to_string(tag.key)
            if key == 'ofdm_sync_carr_offset':
                self.assertEqual(pmt.to_long(tag.value), carr_offset)
            if key == 'ofdm_sync_chan_taps':
                chan_est = pmt.c32vector_elements(tag.value)
        self.assertEqual(chan_est, chanest_exp)
        self.assertEqual(sink.data(), tuple(numpy.multiply(shift_tuple(data_symbol, carr_offset), channel)))

    def test_999_all_at_once(self):
        """ Random carrier offset, random channel and noise, all at once.

        Runs n_iter flow graphs with random BPSK symbols, checks the detected
        carrier offset and the channel estimate, equalizes the data symbol
        with the estimate and counts the resulting bit errors. """
        fft_len = 32
        # 6 carriers empty, 10 carriers full, 1 DC carrier, 10 carriers full, 5 carriers empty
        syncsym_mask = (0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0)
        carrier_mask = (0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0)
        max_offset = 4
        wgn_amplitude = 0.05
        min_chan_ampl = 0.1
        max_chan_ampl = 5
        n_iter = 20 # More iterations make the result more accurate
        def run_flow_graph(sync_sym1, sync_sym2, data_sym):
            """ Run one random realisation.
            Returns (carr_offset, list of equalized real-valued rx symbols). """
            top_block = gr.top_block()
            # Floor division keeps the bounds integers on Python 2 and 3;
            # the offset is always an even number
            carr_offset = random.randint(-max_offset // 2, max_offset // 2) * 2
            tx_data = shift_tuple(sync_sym1, carr_offset) + \
                      shift_tuple(sync_sym2, carr_offset) + \
                      shift_tuple(data_sym,  carr_offset)
            # Random amplitude and random phase on every carrier
            channel = [rand_range(min_chan_ampl, max_chan_ampl) * numpy.exp(1j * rand_range(0, 2 * numpy.pi)) for x in range(fft_len)]
            src = blocks.vector_source_c(tx_data, False, fft_len)
            chan = blocks.multiply_const_vcc(channel)
            noise = analog.noise_source_c(analog.GR_GAUSSIAN, wgn_amplitude)
            add = blocks.add_cc(fft_len)
            chanest = digital.ofdm_chanest_vcvc(sync_sym1, sync_sym2, 1)
            sink = blocks.vector_sink_c(fft_len)
            top_block.connect(src, chan, (add, 0), chanest, sink)
            top_block.connect(noise, blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len), (add, 1))
            top_block.run()
            channel_est = None
            carr_offset_hat = 0
            rx_sym_est = [0,] * fft_len
            for tag in sink.tags():
                key = pmt.symbol_to_string(tag.key)
                if key == 'ofdm_sync_carr_offset':
                    carr_offset_hat = pmt.to_long(tag.value)
                    self.assertEqual(carr_offset, carr_offset_hat)
                if key == 'ofdm_sync_chan_taps':
                    channel_est = shift_tuple(pmt.c32vector_elements(tag.value), carr_offset)
            # Fail with a clear message instead of a TypeError further down
            self.assertIsNotNone(channel_est, "no 'ofdm_sync_chan_taps' tag received")
            shifted_carrier_mask = shift_tuple(carrier_mask, carr_offset)
            # Fetch the sink data once instead of once per carrier
            rx_data = sink.data()
            for i in range(fft_len):
                if shifted_carrier_mask[i] and channel_est[i]:
                    self.assertAlmostEqual(channel[i], channel_est[i], places=0)
                    rx_sym_est[i] = (rx_data[i] / channel_est[i]).real
            return (carr_offset, list(shift_tuple(rx_sym_est, -carr_offset_hat)))
        bit_errors = 0
        for _ in range(n_iter):
            sync_sym = [(random.randint(0, 1) * 2 - 1) * syncsym_mask[i] for i in range(fft_len)]
            ref_sym  = [(random.randint(0, 1) * 2 - 1) * carrier_mask[i] for i in range(fft_len)]
            data_sym = [(random.randint(0, 1) * 2 - 1) * carrier_mask[i] for i in range(fft_len)]
            data_sym[26] = 1
            (carr_offset, rx_sym) = run_flow_graph(sync_sym, ref_sym, data_sym)
            for i in range(fft_len):
                if carrier_mask[i] == 0:
                    continue
                # Hard decision on the equalized symbol
                decision = 1 if rx_sym[i] > 0 else -1
                if decision != data_sym[i]:
                    bit_errors += 1
        # This is much more than we could allow
        self.assertTrue(bit_errors < n_iter)


if __name__ == '__main__':
    # Run the test case when this file is executed directly; the second
    # argument is presumably the name of the XML result file -- confirm
    # against gr_unittest.run.
    gr_unittest.run(qa_ofdm_chanest_vcvc, "qa_ofdm_chanest_vcvc.xml")

_______________________________________________
Discuss-gnuradio mailing list
Discuss-gnuradio@gnu.org
https://lists.gnu.org/mailman/listinfo/discuss-gnuradio

Reply via email to