Hello, I've written a simple block that takes a c34vector message as input and outputs a message containing average mag^2 of the samples in the vector.
I've added the QA code in Python and it seems to be working except the test never finishes. I've added a few print statements to see whats up. From what I can gather: - The freeze doesn't use up CPU (at least nothing noticeable) - Random number of messages are processed before the freeze occurs, sometimes none are sent Could someone give me a hint on that? I've added the relevant code in the attachment. BR, Michal
/* -*- c++ -*- */ /* * Copyright 2014 <+YOU OR YOUR COMPANY+>. * * This is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3, or (at your option) * any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this software; see the file COPYING. If not, write to * the Free Software Foundation, Inc., 51 Franklin Street, * Boston, MA 02110-1301, USA. */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include <gnuradio/io_signature.h> #include "burst_to_power_impl.h" namespace gr { namespace gsm_measure { burst_to_power::sptr burst_to_power::make() { return gnuradio::get_initial_sptr (new burst_to_power_impl()); } /* * The private constructor */ burst_to_power_impl::burst_to_power_impl() : gr::block("burst_to_power", gr::io_signature::make(0,0,0), gr::io_signature::make(0,0,0)) { message_port_register_in(pmt::mp("bursts")); message_port_register_out(pmt::mp("power")); set_msg_handler(pmt::mp("bursts"), boost::bind(&burst_to_power_impl::to_power, this, _1)); } /* * Our virtual destructor. */ burst_to_power_impl::~burst_to_power_impl() { } void burst_to_power_impl::to_power(pmt::pmt_t pmt_vector){ std::vector <gr_complex> burst; double power = 0; if (pmt::is_c32vector(pmt_vector)){ //std::cout << "It's a c32vector, computing power " << std::endl; burst = pmt::c32vector_elements(pmt_vector); for (int i = 0; i < burst.size(); i++){ power = power + std::norm(burst.at(i)); } power = power/burst.size(); message_port_pub(pmt::mp("power"), pmt::from_double(power)); } else std::cout << "Not a c32vector!!! Check your flowgraph" << std::endl; } } /* namespace gsm_measure */ } /* namespace gr */
#!/usr/bin/env python # -*- coding: utf-8 -*- # # Copyright 2014 <+YOU OR YOUR COMPANY+>. # # This is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3, or (at your option) # any later version. # # This software is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this software; see the file COPYING. If not, write to # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. # from gnuradio import gr, gr_unittest from gnuradio import blocks import gsm_measure_swig as gsm_measure import pmt import numpy as np # Simple block to consume messages class message_consumer(gr.sync_block): def __init__(self): gr.sync_block.__init__( self, name = "message consumer", in_sig = None, out_sig = None ) self.msg_list = [] self.message_port_register_in(pmt.intern('in_port')) self.set_msg_handler(pmt.intern('in_port'), self.handle_msg) def handle_msg(self, msg): # Create a new PMT from double and put in list print("Message received") self.msg_list.append(pmt.from_double(pmt.to_double(msg))) # Simple block to generate messages class message_generator(gr.sync_block): def __init__(self, msg_list, msg_interval): gr.sync_block.__init__( self, name = "message generator", in_sig = [np.float32], out_sig = None ) self.msg_list = msg_list self.msg_interval = msg_interval self.msg_ctr = 0 self.message_port_register_out(pmt.intern('out_port')) def work(self, input_items, output_items): inLen = len(input_items[0]) while self.msg_ctr < len(self.msg_list) and \ (self.msg_ctr * self.msg_interval) < \ (self.nitems_read(0) + inLen): self.message_port_pub(pmt.intern('out_port'), self.msg_list[self.msg_ctr]) print "Message number " + repr(self.msg_ctr) + " sent!" self.msg_ctr += 1 return inLen class qa_burst_to_power (gr_unittest.TestCase): def setUp (self): self.tb = gr.top_block () def tearDown (self): self.tb = None def power(burst): power = 0 for i in range(burst.size): powers = np.square(np.absolute(x)) power = power + powers[i] return power/burst.size def test_001_t (self): bursts_re = np.random.uniform(1, 100, [1280]) bursts_im = np.random.uniform(1, 100, [1280]) bursts = bursts_re + 1j*bursts_im num_msgs = 20 msg_interval = 1 msg_list = [] for i in range(num_msgs): msg_list.append(pmt.init_c32vector(64, bursts[64*i:64*(i+1)])) # Create vector source with dummy data to trigger messages src_data = [] for i in range(num_msgs*msg_interval): src_data.append(float(i)) src = blocks.vector_source_f(src_data, False) msg_gen = message_generator(msg_list, msg_interval) msg_pow = gsm_measure.burst_to_power() msg_cons = message_consumer() # Connect vector source to message gen self.tb.connect(src, msg_gen) # Connect message generator to message consumer self.tb.msg_connect(msg_gen, 'out_port', msg_pow, 'bursts') self.tb.msg_connect(msg_pow, 'power', msg_cons, 'in_port') # Verify that the messge port query functions work self.assertEqual(pmt.symbol_to_string(pmt.vector_ref( msg_gen.message_ports_out(), 0)), 'out_port') self.assertEqual(pmt.symbol_to_string(pmt.vector_ref( msg_pow.message_ports_in(), 0)), 'bursts') self.assertEqual(pmt.symbol_to_string(pmt.vector_ref( msg_pow.message_ports_out(), 0)), 'power') self.assertEqual(pmt.symbol_to_string(pmt.vector_ref( msg_cons.message_ports_in(), 0)), 'in_port') # Run to verify message passing self.tb.start() # Wait for all messages to be sent while msg_gen.msg_ctr < num_msgs: time.sleep(0.5) self.tb.stop() self.tb.wait() # Verify that the message consumer got all the messages self.assertEqual(num_msgs, len(msg_cons.msg_list)) for i in range(num_msgs): received_result = pmt.to_double(msg_cons.msg_list[i]) calculated_result = power(pmt.c32vector_elements(msg_list[i])) self.assertAlmostEqual(calculated_result, received_result) if __name__ == '__main__': gr_unittest.run(qa_burst_to_power, "qa_burst_to_power.xml")
_______________________________________________ Discuss-gnuradio mailing list Discuss-gnuradio@gnu.org https://lists.gnu.org/mailman/listinfo/discuss-gnuradio