Hi All, My code is setup to detect regular beeps in a real time buffered stream of samples from a SDR. For efficiency reasons I perform beep detection using fft - and if no beeps are detected no further processing is done.
This works pretty well except for one edge case - where the beep is on the edge of the fft, then it gets skipped. I think I need to overlap the fft processing. My beeps are 0.018s long with a usual sample rate of 2.048e6. So this is my current code without overlapping: def process(self, samples: SamplesT): # look for the presence of a beep within the chunk and : # (1) if beep found calculate the offset # (2) if beep not found iterate the counters and move on start_time = time.time() fft_size = self.fft_size f = np.linspace(self.sample_rate/-2, self.sample_rate/2, fft_size) num_ffts = len(samples) // fft_size # // is an integer division which rounds down fft_thresh = 0.1 beep_freqs = [] for i in range(num_ffts): fft = np.abs(np.fft.fftshift(np.fft.fft(samples[i*fft_size:(i+1)*fft_size]))) / fft_size if np.max(fft) > fft_thresh: beep_freqs.append(np.linspace(self.sample_rate/-2, self.sample_rate/2, fft_size)[np.argmax(fft)]) finish_time = time.time() # if not beeps increment and exit early if len(beep_freqs) == 0: self.stateful_index += (samples.size/100) + 14 return And this is my current attempt at overlapping: def process(self, samples: SamplesT): # look for the presence of a beep within the chunk and : # (1) if beep found calculate the offset # (2) if beep not found iterate the counters and move on fft_size = self.fft_size f = np.linspace(self.sample_rate/-2, self.sample_rate/2, fft_size) size = fft_size # 8704 # step = self.sample_rate * self.beep_duration + 1000 step = 7704 samples_to_send_to_fft = [samples[i : i + size] for i in range(0, len(samples)//fft_size, step)] num_ffts = len(samples_to_send_to_fft[0]) # // is an integer division which rounds down fft_thresh = 0.1 beep_freqs = [] for i in range(num_ffts): # fft = np.abs(np.fft.fftshift(np.fft.fft(samples[i*fft_size:(i+1)*fft_size]))) / fft_size fft = np.abs(np.fft.fftshift(np.fft.fft(samples_to_send_to_fft[0][i]))) / fft_size if np.max(fft) > fft_thresh: beep_freqs.append(np.linspace(self.sample_rate/-2, self.sample_rate/2, fft_size)[np.argmax(fft)]) # if no beeps increment and exit early if len(beep_freqs) == 0: self.stateful_index += (samples.size/100) return However it doesn't work at all. I think the way the array is constructed with this overlapping is different structure? My overlapping technique is as per here : https://stackoverflow.com/questions/38163366/split-list-into-separate-but-overlapping-chunks I am very interested if anyone can help with this please? Thank you Al