Hello,

I've written a simple block that takes a c32vector message as input and
outputs a message containing the average mag^2 of the samples in the vector.

I've added the QA code in Python and it seems to be working, except that the
test never finishes. I've added a few print statements to see what's up. From
what I can gather:
- The freeze doesn't use up CPU (at least nothing noticeable)
- A random number of messages is processed before the freeze occurs;
sometimes none are sent

Could someone give me a hint on that? I've added the relevant code in the
attachment.

BR,
Michal
/* -*- c++ -*- */
/* 
 * Copyright 2014 <+YOU OR YOUR COMPANY+>.
 * 
 * This is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3, or (at your option)
 * any later version.
 * 
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this software; see the file COPYING.  If not, write to
 * the Free Software Foundation, Inc., 51 Franklin Street,
 * Boston, MA 02110-1301, USA.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gnuradio/io_signature.h>
#include "burst_to_power_impl.h"


namespace gr {
  namespace gsm_measure {

    /*
     * Public factory: allocates a burst_to_power_impl and hands it to
     * gnuradio::get_initial_sptr(), returning the resulting sptr.
     */
    burst_to_power::sptr
    burst_to_power::make()
    {
      burst_to_power_impl *impl = new burst_to_power_impl();
      return gnuradio::get_initial_sptr(impl);
    }

    /*
     * Private constructor (instances are created through make()).
     *
     * The block declares empty stream input and output signatures and
     * registers two message ports: "bursts" (input, handled by to_power())
     * and "power" (output).
     */
    burst_to_power_impl::burst_to_power_impl()
      : gr::block("burst_to_power",
                  gr::io_signature::make(0, 0, 0),
                  gr::io_signature::make(0, 0, 0))
    {
      const pmt::pmt_t bursts_port = pmt::mp("bursts");
      const pmt::pmt_t power_port = pmt::mp("power");

      message_port_register_in(bursts_port);
      message_port_register_out(power_port);

      // Route every message arriving on "bursts" to to_power().
      set_msg_handler(bursts_port,
                      boost::bind(&burst_to_power_impl::to_power, this, _1));
    }

    /*
     * Virtual destructor; intentionally empty, there is nothing to release
     * explicitly here.
     */
    burst_to_power_impl::~burst_to_power_impl() {}

    /*
     * Message handler for the "bursts" port.
     *
     * pmt_vector: the received message. When it is a non-empty c32vector,
     * the mean of std::norm() (squared magnitude) over its samples is
     * published as a PMT double on the "power" port.
     *
     * A message that is not a c32vector, or a c32vector with no samples,
     * is reported on stdout and nothing is published for it.
     */
    void burst_to_power_impl::to_power(pmt::pmt_t pmt_vector){
        if (!pmt::is_c32vector(pmt_vector)){
            std::cout << "Not a c32vector!!! Check your flowgraph" << std::endl;
            return;
        }

        const std::vector<gr_complex> burst = pmt::c32vector_elements(pmt_vector);
        if (burst.empty()){
            // Averaging over zero samples would compute 0.0/0 and publish NaN.
            std::cout << "Empty c32vector received, no power published" << std::endl;
            return;
        }

        // Accumulate in double; index with the container's own unsigned
        // size type to avoid a signed/unsigned comparison.
        double power = 0;
        for (std::vector<gr_complex>::size_type i = 0; i < burst.size(); ++i){
            power += std::norm(burst[i]);
        }
        power /= static_cast<double>(burst.size());

        message_port_pub(pmt::mp("power"), pmt::from_double(power));
    }





  } /* namespace gsm_measure */
} /* namespace gr */

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 
# Copyright 2014 <+YOU OR YOUR COMPANY+>.
# 
# This is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
# 
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this software; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
# 

from gnuradio import gr, gr_unittest
from gnuradio import blocks
import gsm_measure_swig as gsm_measure
import pmt
import numpy as np

# Simple block to consume messages
class message_consumer(gr.sync_block):
    """Sink block with no stream ports that records incoming messages.

    Every message arriving on the 'in_port' message port is converted to a
    double, re-wrapped as a new PMT double and appended to ``msg_list``.
    """

    def __init__(self):
        gr.sync_block.__init__(self,
                               name = "message consumer",
                               in_sig = None,
                               out_sig = None)
        # Received values, in arrival order.
        self.msg_list = []

        in_port = pmt.intern('in_port')
        self.message_port_register_in(in_port)
        self.set_msg_handler(in_port, self.handle_msg)

    def handle_msg(self, msg):
        """Log the reception and store a fresh PMT double holding msg's value."""
        print("Message received")
        value = pmt.to_double(msg)
        self.msg_list.append(pmt.from_double(value))


# Simple block to generate messages
class message_generator(gr.sync_block):
    """Block that publishes a fixed list of messages, paced by a float stream.

    Message number ``k`` is published on the 'out_port' message port once the
    number of items seen on stream input 0 exceeds ``k * msg_interval``.

    Args:
        msg_list: sequence of PMT messages to publish, in order.
        msg_interval: number of input items between consecutive messages.
    """

    def __init__(self, msg_list, msg_interval):
        gr.sync_block.__init__(
            self,
            name = "message generator",
            in_sig = [np.float32],
            out_sig = None
        )
        self.msg_list = msg_list
        self.msg_interval = msg_interval
        # Index of the next message to publish; also read by the test to
        # find out how many messages have been sent so far.
        self.msg_ctr = 0
        self.message_port_register_out(pmt.intern('out_port'))

    def work(self, input_items, output_items):
        """Publish every message that is due, then consume all input items."""
        inLen = len(input_items[0])
        # Total number of items seen once this call's input is consumed.
        items_seen = self.nitems_read(0) + inLen
        while (self.msg_ctr < len(self.msg_list) and
               (self.msg_ctr * self.msg_interval) < items_seen):
            self.message_port_pub(pmt.intern('out_port'),
                                  self.msg_list[self.msg_ctr])
            # Parenthesised single-argument form prints identically under
            # Python 2 and Python 3.
            print("Message number " + repr(self.msg_ctr) + " sent!")
            self.msg_ctr += 1

        return inLen

class qa_burst_to_power (gr_unittest.TestCase):
    """QA for gsm_measure.burst_to_power.

    Sends c32vector messages through the block and checks that each result
    matches the mean squared magnitude computed in Python.
    """

    def setUp (self):
        self.tb = gr.top_block ()

    def tearDown (self):
        self.tb = None

    @staticmethod
    def power(burst):
        """Return the mean squared magnitude of the complex samples in burst.

        burst may be any sequence of complex numbers (tuple, list or numpy
        array); it is converted to a numpy array first.
        """
        samples = np.asarray(burst)
        return float(np.mean(np.square(np.absolute(samples))))

    def test_001_t (self):
        # Local import: the module-level imports do not provide `time`.
        import time

        bursts_re = np.random.uniform(1, 100, [1280])
        bursts_im = np.random.uniform(1, 100, [1280])
        bursts = bursts_re + 1j*bursts_im

        num_msgs = 20
        msg_interval = 1
        msg_list = []
        for i in range(num_msgs):
            msg_list.append(pmt.init_c32vector(64, bursts[64*i:64*(i+1)]))

        # Create vector source with dummy data to trigger messages
        src_data = []
        for i in range(num_msgs*msg_interval):
            src_data.append(float(i))
        src = blocks.vector_source_f(src_data, False)

        msg_gen = message_generator(msg_list, msg_interval)
        msg_pow = gsm_measure.burst_to_power()
        msg_cons = message_consumer()

        # Connect vector source to message gen
        self.tb.connect(src, msg_gen)

        # Connect message generator -> block under test -> message consumer
        self.tb.msg_connect(msg_gen, 'out_port', msg_pow, 'bursts')
        self.tb.msg_connect(msg_pow, 'power', msg_cons, 'in_port')

        # Verify that the message port query functions work
        self.assertEqual(pmt.symbol_to_string(pmt.vector_ref(
                    msg_gen.message_ports_out(), 0)), 'out_port')
        self.assertEqual(pmt.symbol_to_string(pmt.vector_ref(
                    msg_pow.message_ports_in(), 0)), 'bursts')
        self.assertEqual(pmt.symbol_to_string(pmt.vector_ref(
                    msg_pow.message_ports_out(), 0)), 'power')
        self.assertEqual(pmt.symbol_to_string(pmt.vector_ref(
                    msg_cons.message_ports_in(), 0)), 'in_port')

        # Run to verify message passing
        timeout_s = 10.0
        self.tb.start()
        try:
            # Wait until every message has been sent AND its result has
            # reached the consumer, but never longer than timeout_s so a
            # stalled flowgraph fails the test instead of hanging it.
            deadline = time.time() + timeout_s
            while (msg_gen.msg_ctr < num_msgs or
                   len(msg_cons.msg_list) < num_msgs):
                if time.time() >= deadline:
                    break
                time.sleep(0.05)
        finally:
            # Always shut the flowgraph down, even if the wait loop raised;
            # otherwise the scheduler threads are left running.
            self.tb.stop()
            self.tb.wait()

        # Verify that the message consumer got all the messages
        self.assertEqual(num_msgs, len(msg_cons.msg_list))
        for i in range(num_msgs):
            received_result = pmt.to_double(msg_cons.msg_list[i])
            calculated_result = self.power(pmt.c32vector_elements(msg_list[i]))
            # The samples travel as 32-bit complex floats, so compare with a
            # relative tolerance rather than 7 absolute decimal places.
            tolerance = 1e-5 * abs(calculated_result)
            self.assertTrue(
                abs(calculated_result - received_result) <= tolerance,
                "message %d: expected %r, got %r"
                % (i, calculated_result, received_result))


if __name__ == '__main__':
    # Executed as a script: pass the test case and the XML report file name
    # to the GNU Radio unittest runner.
    gr_unittest.run(
        qa_burst_to_power,
        "qa_burst_to_power.xml"
    )
_______________________________________________
Discuss-gnuradio mailing list
Discuss-gnuradio@gnu.org
https://lists.gnu.org/mailman/listinfo/discuss-gnuradio

Reply via email to