๊ฒฐ๋ก ๋ถํฐ ๋งํ๋ฉด stft๋ ๋ถํ๊ฐ ๋๋ฌด ์ฌํด์ ์ค์๊ฐ์ผ๋ก ์๊ฐํํ๋ ๊ฒ์ ๋ฌด๋ฆฌ๋ฌด๋ฆฌ๋ฌด๋ฆฌ์ด๋ค. ์ผ๋จ matplotlib ์์ฒด์์ ๋จ์ ์๊ฐํ๋ฅผ ํ๊ธฐ์๋ ๋ฐ์ดํฐ ์์ด ๋๋ฌด ๋ง๊ธฐ ๋๋ฌธ์ ์ผ์ข ์ ๋ นํ๊ธฐ๋ฅ์ ๋ฃ์ด์ ์ํ๋ ๋งํผ์ ๋ฐ์ดํฐ๋ฅผ fft / stft ์ฒ๋ฆฌํด์ ๋ฐ๋ก ์๊ฐํํ๊ธฐ๋ก ๊ฒฐ์ ํ์๋ค. ๊ทธ๋๋ FFT ์ ๋๋ ์ค์๊ฐ์ผ๋ก ๊ฐ๋ฅํ ๊ฒ ๊ฐ์์ gpu accelerator๋ฅผ ์ฌ์ฉํด์ ๊ตฌํํ์๋ค. ๊ทธ๋ฌ๋ CUDA ํ๊ฒฝ์ด ์๋ ๊ณณ์์๋ ์ธ ์ ์๋๋ก numpy๋ก๋ ์ฝ๋๋ฅผ ์ถ๊ฐํ๋ค.
์ด์ ๊น์ง ์ฝ๋๋ ์๊ธฐ์ ์๋ค. https://dnai-deny.tistory.com/65
- PyTorch๋ก ๊ตฌํํ ๋ฒ์
import torch
import matplotlib.pyplot as plt
import matplotlib.animation as animation
class RealTimeAnimation:
"""
Animate original voltage signal graph and fft result.
"""
def __init__(self):
# define plot
self.fig = plt.figure(figsize=(20, 10))
self.fig.suptitle('Vibration Data', fontsize=20)
plt.subplots_adjust(hspace=0.3)
self.original = self.fig.add_subplot(211)
self.original.title.set_text('Signal Graph')
self.original.set(xlabel="samples", ylabel="mV")
self.fft_graph = self.fig.add_subplot(212)
self.fft_graph.title.set_text('FFT Graph')
self.fft_graph.set(xlabel="frequency", ylabel="amplitude")
# connect animation method
self.animation = animation.FuncAnimation(self.fig, self.update, interval=50)
def update():
...
# update fft graph
Fs = 25600.0
Ts = 1/Fs
n = len(graph_data)
# pytorch fft
graph_data = torch.Tensor(graph_data)
graph_data.to("cuda:0")
fft_data = torch.fft.rfft(graph_data)
frequency = torch.arange(0.0, Fs/2.0, Fs/n)
self.fft_graph.clear()
self.fft_graph.plot(frequency[:len(fft_data)//2], np.abs(fft_data[:len(fft_data)//2]))
self.fft_graph.title.set_text('FFT Graph')
self.fft_graph.set(xlabel="frequency", ylabel="amplitude")
__init__
ํจ์์ fft graph๋ฅผ ๊ทธ๋ฆด ๋ถ๋ถ์ ์๋ก ๋ฃ์ด์ฃผ๊ณ , update ํจ์์์ fft๋ฅผ ์ฒ๋ฆฌํ๋๋ก ์ถ๊ฐํ์๋ค. FFT ์์ฒด๋ ์ ํธ๋ฐ์ดํฐ๋ง ์์ผ๋ฉด ๋๊ธฐ ๋๋ฌธ์ rfft(์์ ์ฃผํ์๋ฅผ ๋ ๋ ค์ฃผ๋ ๋ฒ์ ์ fft, ์ํ์ง ์์ผ๋ฉด fft๋ก ํ๋ฉด ๋๋ค)๋ฅผ ์ ์ฉํด graph data์ ๋ํ fft data๋ฅผ ์ป์๋ค.
๊ทธ๋ฆฌ๊ณ ํด๋นํ๋ frequency ๊ณ์ฐ์ ์ํด์ Fs(Sampling rate)์ ์ ๋ฐ์ผ๋ก ์ก๊ณ ๊ทธ๋ํ ์๊ฐํ๋ฅผ ์ํ x์ขํ์ธ frequency๋ฅผ ๊ณ์ฐํ์๋ค.
Sampling rate๋ ์ค์ ์ธก์ ํ๊ณ ์ถ์ ์ฃผํ์ ๋ฒ์์ ์ ์ด๋ ๋ ๋ฐฐ๋ฅผ ์ก์์ค์ผํ๋ค. ์ด์ ๋ํ ๋ด์ฉ์ ๋์ดํด์คํธ ์ฃผํ์์ ์๋ฆฌ์ด์ฑ์ ๋ํด ์ฐพ์๋ณด๋ฉด ๋ ์์ธํ๊ณ ์ ๋ฌธ์ ์ธ ์ด์ ๋ฅผ ์์๋ณผ ์ ์๋ค.
FFT๋ ์ข์ฐ๊ฐ ๋์นญ์ ์ผ๋ก ๋์ค๊ธฐ ๋๋ฌธ์ ํ์ชฝ(๋ฐ)๋ง ๋ณด๋ฉด ๋๋ค๋ค. ๋ฐ๋ผ์ plottingํ ๋ fft_data์ ์ ๋ฐ๋ง ์๊ฐํํ์๋ค. ์ ๋๊ฐ ํจ์๋ฅผ ์ทจํด์ฃผ๋ ์ด์ ๋, ๊ณ์ฐ ๊ฒฐ๊ณผ๊ฐ ๋ณต์์์ธ ๊ฒฝ์ฐ๋ฅผ ํผํ๊ธฐ ์ํด์์ด๋ค.
์ง์! ์คํํด๋ณด๋ฉด ์ ๋์จ๋ค. ์๋ ๋ ๋ธ๋ญ์ ๊ฐ๊ฐ numpy๋ฒ์ , tensorflow ๋ฒ์ ์ด๋ค.
from numpy.fft import rfft, rfftfreq
frequency = rfftfreq(n, Ts)[:-1]
fft_data = (rfft(graph_data)/n)[:-1] * 2
import tensorflow as tf
with tf.device("/device:GPU:0"):
fft_data = tf.signal.rfft(input_tensor=tf.cast(graph_data, tf.float32))
frequency = tf.range(0.0, tf.divide(Fs,2.0), tf.divide(Fs, tf.cast(n, tf.float32)))
2. ๋ นํ ๋ฐ ์ผ์์ ์ง ๋ฒํผ ์ถ๊ฐ
class RealTimeAnimation:
"""
Animate original voltage signal graph and fft result.
"""
def __init__(self):
...
# define buttons
self.paused = False
ax = plt.axes([0.8, 0.025, 0.1, 0.04])
ax2 = plt.axes([0.68, 0.025, 0.1, 0.04])
ax3 = plt.axes([0.56, 0.025, 0.1, 0.04])
ax4 = plt.axes([0.44, 0.025, 0.1, 0.04])
# plot control button
self.pause_button = Button(ax, "Pause/Restart")
self.pause_button.on_clicked(self.toggle_event)
# start record button
self.pos = 0
self.record_button = Button(ax2, "Start Recording")
self.record_button.on_clicked(self.start_recording)
# data save button
self.save_button = Button(ax3, "Save Data")
self.save_button.on_clicked(self.save_data)
# show entire data analysis button
self.show_button = Button(ax4, "Show Analysis")
self.show_button.on_clicked(show_result)
def update(self, i) -> None:
...
def toggle_event(self, *args, **kwargs) -> None:
"""
On click event function of pause button.
Stop / Restart plotting graphs.
"""
if self.paused:
self.animation.event_source.start()
else:
self.animation.event_source.stop()
self.paused = not self.paused
def start_recording(self, *args, **kwargs) -> None:
data_lock.acquire()
self.pos = len(entire_data) - 1
data_lock.release()
print("Recording started.")
def save_data(self, *args, **kwargs) -> None:
"""
Save entire voltage data in csv.
During save data, graph plotting will be paused.
Using lock.
"""
self.animation.event_source.stop()
print("Saving...")
data_lock.acquire()
f = open("data.csv", "w")
target = csv.writer(f)
target.writerow(entire_data[self.pos:])
data_lock.release()
print("Saved {0} Data. Restart plotting.".format(len(entire_data)))
self.animation.event_source.start()
๋ฒํผ ์ถ๊ฐ๋ ๋ง ๊ทธ๋๋ก ๋ฒํผ์ ์ถ๊ฐํ๊ณ on click ์ด๋ฒคํธ๋ฅผ ์ฐ๊ฒฐํ๋ ๋ถ๋ถ์ด๋ค. recording start ๋ฒํผ์ ๋๋ฅด๋ฉด ํ์ฌ data์ ์ธ๋ฑ์ค๋ฅผ ์ ์ฅํ๊ณ , save data ๋ฒํผ์ ๋๋ฅด๋ฉด ์ ์ฅ๋ ์ธ๋ฑ์ค๋ถํฐ ํ์ฌ ์ธ๋ฑ์ค๊น์ง๋ฅผ csv ํํ๋ก ์ ์ฅํด์ค๋ค. ๋ถํ๋ฅผ ๋ง๊ธฐ์ํด ์ ์ฅ ์ค์๋ plotting์ ์ ์ ๋ฉ์ถ๋๋ก ํ์๋ค.
pause/restart์ plotting์ ์ ๊น ๋ฉ์ถ๊ณ ์ฌ์์ํ ์ ์๋๋ก ์ค์ ํ๋ ๋ฒํผ์ด๋ค.
show result ๋ฒํผ์ ๋ค์ ๋จ๋ฝ์ ๊ตฌํํ๋ค.
3. ์ ์ฅํ ๋ฐ์ดํฐ๋ฅผ FFT / STFT ๋ถ์ํ๊ธฐ
ํ์ผ์ ์๋ก ๋ง๋ ํ ์ ์ฅ๋ ๋ฐ์ดํฐ๋ฅผ ์๋ณธ / FFT / STFT ์ฒ๋ฆฌ ๊ฒฐ๊ณผ ์ธ ๊ฐ์ง๋ก plotting ํ๋ค. ๋ชจ๋๋ก ํธ์ถํ๋๋ก ํ ๊ฒ์ด๊ธฐ ๋๋ฌธ์ ํ๋์ ํจ์๋ก ๋ฑ๋กํด๋์๋ค.
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import numpy as np
import torch
import csv
def show_result(e) -> None:
"""
Show original signal graph, fft and stft result of saved data.
Args:
e: event
"""
# read data
data_file = open("data.csv", "r")
reader = csv.reader(data_file, delimiter=',', quoting=csv.QUOTE_NONNUMERIC)
entire_data = []
for row in reader:
entire_data = row
print(entire_data[:10])
break
if len(entire_data) <= 1:
print("No data saved. Please try after save data.")
return
gs = gridspec.GridSpec(3, 1)
# define graph
fig = plt.figure(figsize=(20, 12))
fig.suptitle("Data Analysis Result", fontsize=20)
original = plt.subplot(gs[0, 0])
original.set_title("Original Signal")
original.set(xlabel="Samples", ylabel="mV")
stft = plt.subplot(gs[1, 0])
stft.set_title("Short-Term FFT")
stft.set(xlabel="Time(sec)", ylabel="Frequency")
fft_graph = plt.subplot(gs[2, 0])
fft_graph.set_title("FFT")
fft_graph.set(xlabel="Frequency", ylabel="Amplitude")
plt.subplots_adjust(hspace=0.5)
์ฌ๊ธฐ๊น์ง๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์ค๊ณ , plot๋ค์ ์ค์ ํ๋ ๋ถ๋ถ์ด๋ค.
3.1. FFT
# plot fft
Fs = 25600.0
Ts = 1/Fs
n = len(entire_data)
# pytorch fft
graph_data = torch.Tensor(entire_data)
graph_data.to("cuda:0")
fft_data = torch.fft.rfft(graph_data) / n
frequency = torch.arange(0.0, Fs/2.0, Fs/n)
fft_graph.clear()
fft_graph.plot(frequency[1:len(fft_data)//2], np.abs(fft_data[1:len(fft_data)//2]))
fft๋ ์ ์ค์๊ฐ์ผ๋ก ์ฒ๋ฆฌํ๋ ๊ฒ๊ณผ ์ฝ๋๊ฐ ๊ฐ๋ค. ์์ธํ ์ค๋ช ์ ์๋ต...
3.2. STFT
# pytorch stft
graph_data = graph_data.unsqueeze(0)
stft_ = torch.stft(graph_data, 25600)
stft_data = torch.sqrt(stft_[:,:,:,0] ** 2 + stft_[:,:,:,1]**2)
stft.pcolormesh(stft_data[0])
# scipy stft
f, t, Zxx = signal.stft(entire_data, fs=Fs, nperseg=len(entire_data)//100)
stft.pcolormesh(t, f, np.abs(Zxx), shading="gouraud"
STFT๋ PyTorch ๊ณต์๋ฌธ์๋ฅผ ๋ณด๊ณ ๋ฐ๋ผํ๋ค. PyTorch์ STFT๋ ๋ฐ์ดํฐ์ sampling rate๋ฅผ ์๊ตฌํ๋ค. ๊ฒฐ๊ณผ๋ฅผ plotํ ์ ์๋ ํํ๋ก ๋ฐ๊ฟ์ฃผ๊ณ ๋์ pcolormesh๋ก ์๊ฐํํ์๋ค.
scipy์ STFT๋ ๋ฐ์ดํฐ, sampling rate, ๊ทธ๋ฆฌ๊ณ window๋ฅผ ์๊ตฌํ๋ค. window๋ฅผ ์ ์ ํ๋ ๋ฐฉ๋ฒ์ ๋ํด์๋ ์์ง ์...๋ชจ๋ฅด๊ฒ ๋ค. ๋ฅ๋ฌ๋ ํ์ตํ ๋ hyperparameter ์กฐ์ ์ ํ๋ ๊ธฐ๋ถ์ด๋ผ๊ณ ํด์ผํ๋... window๋ฅผ ๋๊ฒ ์ก์ผ๋ฉด ์ฃผํ์ ์์ญ์ ๋ํ ํด์๋๊ฐ ์ฌ๋ผ๊ฐ๋ ๋์ ์๊ฐ์์ญ์ ๋ํ ํด์๋๊ฐ ๋จ์ด์ง๊ณ , ์ข๊ฒ ์ก์ผ๋ฉด ์๊ฐ ์์ญ์ ๋ํ ํด์๋๊ฐ ๊ฐ์ ๋๋ ๋์ ์ฃผํ์ ์์ญ์ ๋ํ ํด์๋๊ฐ ๋จ์ด์ง๋ค.
์์ฑ! ์ ์ฒด ์ฝ๋๋ ์๋ ๊นํ์ udp_server.py์ stft.py ์ฝ๋์ ์์ธํ ๋์์๋ค!
์ฌ๋ด์ผ๋ก gpu๋ฅผ ์ฌ์ฉํด์ ๋น ๋ฅด๊ฒ ์ฐ์ฐํ๊ธฐ ์ํด torch๋ tensorflow์ ์ ์ฉ์ ๊ตฌํํ์ง๋ง, ํ์ฌ๋ก์จ๋ ๋ฐ์ดํฐ๋ฅผ gpu ๋ฉ๋ชจ๋ฆฌ๋ก ๋ณต์ฌํ๋ ์๊ฐ์ด ์กด์ฌํ๊ธฐ ๋๋ฌธ์ ์ฒด๊ฐ๋๋ ์๋ ์ฐจ์ด๋ ๋ณ๋ก ์์๋ค. ๋ฐ์ดํฐ ์์ด ๋ง์ผ๋ ๋ ๊ทธ๋ฐ ๋ฏ. ๊ทธ๋ฅ numpy์ scipy๋ฅผ ์ฌ์ฉํด๋ ๋๊ฒ ๋ค.