I'm working on my doctoral research on wavelet coding and I'm trying to
automate the experimental tests of the code I have developed. I wrote a
Python script implementing a TCP client that sends data to the TCP server
created by the Socket PDU block. However, on the client side I'm not able
to handle the data I receive back, because I don't know which format the
Socket PDU block uses when sending, and I couldn't find it in the
documentation. The data fed to the Socket PDU block is the output of a
block I created myself, which is of the complex message type. Attached are
the Python client, the flowgraph code and the client output.
Thank you,
Me. Eng. Felipe Gama
<type 'unicode'>
[u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00',
u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00',
u'\x00', u'\x00', u'?', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00',
u'?', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'?', u'\x00',
u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00',
u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00',
u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'?', u'\x00', u'\x00', u'\x00',
u'\x00', u'\x00', u'\x00', u'?', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00',
u'\x00', u'?', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00',
u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00',
u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'?', u'\x00',
u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'?', u'\x00', u'\x00', u'\x00',
u'\x00', u'\x00', u'\x00', u'?', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00',
u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00',
u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00',
u'\x00', u'?', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'?',
u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00', u'\x00',
u'\x00', u'\x00', u'\x00', u'\x00']
\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000�?\000\000\000\000\000\000�?\000\000\000\000\000\000�?\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000�?\000\000\000\000\000\000�?\000\000\000\000\000\000�?\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000�?\000\000\000\000\000\000�?\000\000\000\000\000\000�?\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000�?\000\000\000\000\000\000�?\000\000\000\000\000\000\000\000\000\000\000\000
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
##################################################
# GNU Radio Python Flow Graph
# Title: teste experimantal
# Author: Felipe Gama
# Generated: Mon Aug 10 22:50:57 2020
##################################################
if __name__ == '__main__':
    # When run as a script on Linux, call XInitThreads() from libX11 before
    # any Qt import/usage below. This is best effort: a failure only
    # produces a warning and the script keeps going.
    import ctypes
    import sys
    if sys.platform.startswith('linux'):
        try:
            x11 = ctypes.cdll.LoadLibrary('libX11.so')
            x11.XInitThreads()
        except Exception:
            # Deliberately broad (library missing, symbol missing, ...), but
            # no longer a bare ``except`` so Ctrl-C / SystemExit propagate.
            # The parenthesised form prints the same text on Python 2 and 3.
            print("Warning: failed to XInitThreads()")
from PyQt4 import Qt
from gnuradio import blocks
from gnuradio import eng_notation
from gnuradio import gr
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes
from optparse import OptionParser
import math
import sys
import wavelet
class teste_experimental(gr.top_block, Qt.QWidget):
    """Qt top block that runs socket PDUs through the wavelet coder/decoder.

    Message path built in ``__init__``:

    * ``blocks.socket_pdu`` (TCP server on 127.0.0.1:5006) ``pdus`` output
      -> ``wavelet.cod_wavelet_str`` ``in``
    * ``cod_wavelet_str`` ``out`` -> ``wavelet.deco_wavelet_str`` ``in``
    * ``deco_wavelet_str`` ``out`` -> ``blocks.message_debug`` ``print``
    * ``deco_wavelet_str`` ``out`` -> ``socket_pdu`` ``pdus`` input
      (presumably this is what gets sent back to the connected TCP client;
      confirm against the Socket PDU block documentation).
    """

    def __init__(self):
        gr.top_block.__init__(self, "teste experimantal")
        Qt.QWidget.__init__(self)
        self.setWindowTitle("teste experimantal")
        try:
            self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc'))
        except Exception:
            # The window icon is cosmetic: ignore a failed theme lookup, but
            # do not swallow KeyboardInterrupt/SystemExit as a bare
            # ``except`` would.
            pass

        # Widget hierarchy: a scroll area fills the window and hosts the
        # widget that carries the vertical and grid layouts.
        self.top_scroll_layout = Qt.QVBoxLayout()
        self.setLayout(self.top_scroll_layout)
        self.top_scroll = Qt.QScrollArea()
        self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame)
        self.top_scroll_layout.addWidget(self.top_scroll)
        self.top_scroll.setWidgetResizable(True)
        self.top_widget = Qt.QWidget()
        self.top_scroll.setWidget(self.top_widget)
        self.top_layout = Qt.QVBoxLayout(self.top_widget)
        self.top_grid_layout = Qt.QGridLayout()
        self.top_layout.addLayout(self.top_grid_layout)

        # Restore the window geometry stored by closeEvent().
        self.settings = Qt.QSettings("GNU Radio", "teste_experimental")
        self.restoreGeometry(self.settings.value("geometry").toByteArray())

        ##################################################
        # Variables
        ##################################################
        self.text_msg = "disparo"
        self.tamFonte = 10
        self.samp_rate = 1000000

        ##################################################
        # Blocks
        ##################################################
        self.wavelet_deco_wavelet_str_0_0 = wavelet.deco_wavelet_str(32, "", 0)
        self.wavelet_cod_wavelet_str_0 = wavelet.cod_wavelet_str(32)
        self.blocks_socket_pdu_0 = blocks.socket_pdu(
            "TCP_SERVER", "127.0.0.1", "5006", 10000, False)
        self.blocks_message_debug_0 = blocks.message_debug()

        ##################################################
        # Connections
        ##################################################
        self.msg_connect((self.blocks_socket_pdu_0, 'pdus'),
                         (self.wavelet_cod_wavelet_str_0, 'in'))
        self.msg_connect((self.wavelet_cod_wavelet_str_0, 'out'),
                         (self.wavelet_deco_wavelet_str_0_0, 'in'))
        self.msg_connect((self.wavelet_deco_wavelet_str_0_0, 'out'),
                         (self.blocks_message_debug_0, 'print'))
        self.msg_connect((self.wavelet_deco_wavelet_str_0_0, 'out'),
                         (self.blocks_socket_pdu_0, 'pdus'))

    def closeEvent(self, event):
        """Save the window geometry to QSettings, then accept the event."""
        self.settings = Qt.QSettings("GNU Radio", "teste_experimental")
        self.settings.setValue("geometry", self.saveGeometry())
        event.accept()

    def get_text_msg(self):
        """Return the current ``text_msg`` value."""
        return self.text_msg

    def set_text_msg(self, text_msg):
        """Store a new ``text_msg`` value."""
        self.text_msg = text_msg

    def get_tamFonte(self):
        """Return the current ``tamFonte`` value."""
        return self.tamFonte

    def set_tamFonte(self, tamFonte):
        """Store a new ``tamFonte`` value."""
        self.tamFonte = tamFonte

    def get_samp_rate(self):
        """Return the current ``samp_rate`` value."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store a new ``samp_rate`` value."""
        self.samp_rate = samp_rate
def main(top_block_cls=teste_experimental, options=None):
    """Start the Qt application and run the flow graph until the GUI quits.

    Args:
        top_block_cls: Class to instantiate as the top block; it is called
            with no arguments.
        options: Accepted but not used by this function.
    """
    from distutils.version import StrictVersion

    # Only Qt >= 4.5.0 gets the graphics system configured from the
    # 'qtgui'/'style' preference (default 'raster').
    if StrictVersion(Qt.qVersion()) >= StrictVersion("4.5.0"):
        Qt.QApplication.setGraphicsSystem(
            gr.prefs().get_string('qtgui', 'style', 'raster'))
    app = Qt.QApplication(sys.argv)

    flowgraph = top_block_cls()
    flowgraph.start()
    flowgraph.show()

    def shutdown():
        # Stop the flow graph and wait for it when Qt is about to quit.
        flowgraph.stop()
        flowgraph.wait()

    app.connect(app, Qt.SIGNAL("aboutToQuit()"), shutdown)
    app.exec_()


if __name__ == '__main__':
    main()
import socket , pickle
import time
import math
from random import randint
import numpy as np
import pmt
import base64
def fonte(tamanho):
    """Generate a random bit sequence.

    Args:
        tamanho: Number of bits to produce; converted with ``int()``.

    Returns:
        A list of ``int(tamanho)`` integers, each drawn with
        ``randint(0, 1)``. The list is empty when ``int(tamanho)`` is zero
        or negative.
    """
    return [randint(0, 1) for _ in range(int(tamanho))]
def moduladorOQPSK(data):
    """Map a bit sequence onto complex symbols scaled by 1/sqrt(2).

    Bits at even indices drive the real part and bits at odd indices the
    imaginary part. Each bit ``b`` is mapped to ``2 * (b - 0.5)`` (0 -> -1,
    1 -> +1) and the resulting symbol is divided by ``sqrt(2)``.

    Args:
        data: Sequence of bits supporting ``len()`` and slicing. Its length
            must be even (two bits per symbol).

    Returns:
        A complex NumPy array with ``len(data) // 2`` symbols.

    Raises:
        ValueError: If ``data`` has an odd number of bits. Without this
            check NumPy either fails with an unrelated broadcasting error or
            silently copies a single Q bit onto every symbol.
    """
    if len(data) % 2:
        raise ValueError(
            "moduladorOQPSK needs an even number of bits, got %d" % len(data))

    si = 2 * (np.array(data[0::2]) - 0.5)
    sq = 2 * (np.array(data[1::2]) - 0.5)

    simbolos = np.array(si, dtype=complex)
    simbolos.imag = sq
    return simbolos / math.sqrt(2)
def arrayToString(comp):
    """Serialise complex values as a comma-separated string.

    Each value is written as ``<real><sign><imag>j`` using ``str()`` on the
    real and imaginary parts, e.g. ``"1.0+2.0j,3.0-4.0j"``.

    Args:
        comp: Iterable of objects exposing ``.real`` and ``.imag``.

    Returns:
        The joined string; empty when ``comp`` is empty.
    """
    partes = []
    for c in comp:
        imag_txt = str(c.imag)
        # Decide the separator from the rendered text rather than from
        # ``c.imag < 0``: an imaginary part of -0.0 is not "< 0" but renders
        # as "-0.0", which previously produced an unparsable "+-0.0j".
        sinal = "" if imag_txt.startswith("-") else "+"
        partes.append(str(c.real) + sinal + imag_txt + "j")
    return ",".join(partes)
# Test client: once per second, send a comma-separated list of modulated
# symbols to the flow graph's TCP server and print whatever comes back.
TCP_IP = '127.0.0.1'
TCP_PORT = 5006
BUFFER_SIZE = 10000
N = 1000
teste = b"abcde"  # not used below; kept from the original script

# NOTE(review): the reply is binary, not text. Judging from the captured
# client output (4-byte-aligned "00 00 ?? 3f" groups, 160 bytes in total) it
# looks like raw little-endian complex64 samples (float32 real, float32
# imag) - confirm against the output type of the wavelet decoder block.
SAMPLE_DTYPE = np.dtype('<c8')

data = moduladorOQPSK(fonte(20))
listToStr = arrayToString(data)
# Encode once, outside the loop; the text is pure ASCII, and this yields the
# bytes object sockets require on Python 3 while still working on Python 2.
payload = listToStr.encode('ascii')

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    s.connect((TCP_IP, TCP_PORT))
    for k in range(N):
        time.sleep(1)
        # sendall() retries until the whole payload is written; send() may
        # transmit only part of it.
        s.sendall(payload)
        data = s.recv(BUFFER_SIZE)
        if not data:
            # An empty read means the server closed the connection.
            print('ERRO: connection closed by server')
            break
        # Raw bytes first, then the numeric interpretation. TCP is a byte
        # stream, so a read may end mid-sample: only whole samples are
        # decoded here.
        print(repr(data))
        usable = len(data) - len(data) % SAMPLE_DTYPE.itemsize
        print(np.frombuffer(data[:usable], dtype=SAMPLE_DTYPE))
finally:
    # Always release the socket, including on errors or Ctrl-C.
    s.close()
_______________________________________________
USRP-users mailing list
USRP-users@lists.ettus.com
http://lists.ettus.com/mailman/listinfo/usrp-users_lists.ettus.com