123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545 |
- import numpy as np
- from numpy.linalg import lstsq
- from numpy.linalg import norm
- from numpy import fft
-
- import pylab
-
- from scipy.signal import correlate
-
- def autocorr(x):
-
- result = correlate(x, x, mode='full')
- return result[result.size/2:]
-
- class AdaptiveFilter:
-
- def __init__(self, mu):
- self.mu = mu
-
- def adapt_filt_Ref(self, x, R, M, mu, PCA, lambda2=0.95, H0=0):
- """ Taken from .m file
- This function is written to allow the user to filter a input signal
- with an adaptive filter that utilizes 2 reference signals instead of
- the standard method which allows for only 1 reference signal.
- Author: Rob Clemens Date: 3/16/06
- Modified and ported to Python, now takes arbitray number of reference points
- """
-
- import akvo.tressel.pca as pca
-
- if np.shape(x) != np.shape(R[0]):
- print ("Error, non aligned")
- exit(1)
-
- if PCA == "Yes":
-
- R, K, means = pca.pca( R )
-
- if all(H0) == 0:
- H = np.zeros( (len(R)*M))
-
- else:
- H = H0
- Rn = np.ones(len(R)*M) / mu
-
- r_ = np.zeros( (len(R), M) )
- e = np.zeros(len(x))
- ilambda = lambda2**-1
-
- for z in range(0, len(x)):
-
-
- for ir in range(len(R)):
- if z < M:
- r_[ir,0:z] = R[ir][0:z]
- r_[ir,z:M] = 0
- else:
-
- r_[ir,:] = R[ir][z-M:z]
-
- r_n = np.reshape(r_, -1)
-
-
- K = (Rn* r_n) / (lambda2 + np.dot(r_n*Rn, r_n))
- e[z] = x[z] - np.dot(r_n.T, H)
- H += K*e[z];
- Rn = ilambda*Rn - ilambda*np.dot(np.dot(K, r_n.T), Rn)
- return e, H
-
- def transferFunctionFFT(self, D, R, reg=1e-2):
- from akvo.tressel import pca
- """
- Computes the transfer function (H) between a Data channel and
- a number of Reference channels. The Matrices D and R are
- expected to be in the frequency domain on input.
- | R1'R1 R1'R2 R1'R3| |h1| |R1'D|
- | R2'R1 R2'R2 R2'R3| * |h2| = |R2'D|
- | R3'R1 R3'R2 R3'R3| |h3| |R3'D|
-
- Returns the corrected array
- """
-
-
-
-
-
-
-
-
-
-
-
-
-
- import scipy.linalg
- import akvo.tressel.pca as pca
-
-
- nref = len(R)
- H = np.zeros( (np.shape(D)[1], len(R)), dtype=complex )
- for iw in range(np.shape(D)[1]):
- A = np.zeros( (nref, nref), dtype=complex )
- B = np.zeros( (nref) , dtype=complex)
- for ii in range(nref):
- for jj in range(nref):
-
- A[ii,jj] = np.dot(R[ii][:,iw], R[jj][:,iw])
-
-
- B[ii] = np.dot( R[ii][:,iw], D[:,iw] )
-
-
-
-
-
-
-
-
- H[iw, :] = scipy.linalg.lstsq(A,B,cond=reg)[0]
-
-
-
-
-
- Error = np.zeros(np.shape(D), dtype=complex)
- for ir in range(nref):
- for q in range( np.shape(D)[0] ):
-
- Error[q,:] += H[:,ir]*R[ir][q,:]
- return D - Error
-
- def adapt_filt_tworefFreq(self, x, rx0, rx1, M, lambda2=0.95):
- """ Frequency domain version of above
- """
- from akvo.tressel import pca
-
- pylab.figure()
- pylab.plot(rx0)
- pylab.plot(rx1)
-
-
- transMatrix, K, means = pca.pca( np.array([rx0, rx1]))
- rx0 = transMatrix[:,0]
- rx1 = transMatrix[:,1]
-
- pylab.plot(rx0)
- pylab.plot(rx1)
- pylab.show()
- exit()
-
- if np.shape(x) != np.shape(rx0) or np.shape(x) != np.shape(rx1):
- print ("Error, non aligned")
- exit(1)
-
- wx = fft.rfft(x)
- wr0 = fft.rfft(rx0)
- wr1 = fft.rfft(rx1)
-
- H = np.zeros( (2*M), dtype=complex )
- ident_mat = np.eye((2*M))
- Rn = ident_mat / 0.1
- r_v = np.zeros( (M), dtype=complex )
- r_h = np.zeros( (M), dtype=complex )
- e = np.zeros(len(x), dtype=complex )
- ilambda = lambda2**-1
-
- for z in range(0, len(wx)):
-
- r_v = wr0[::-1][:M]
- r_h = wr1[::-1][:M]
- r_n = np.concatenate((r_v, r_h ))
- K = np.dot(Rn, r_n) / (lambda2 + np.dot(np.dot(r_n.T, Rn), r_n))
- e[z] = wx[z] - np.dot(r_n,H)
- H += K * e[z];
- Rn = ilambda*Rn - ilambda*K*r_n.T*Rn
-
- return fft.irfft(e)
-
- def iter_filt_refFreq(self, x, rx0, Ahat=.05, Bhat=.5, k=0.05):
-
- X = np.fft.rfft(x)
- X0 = np.copy(X)
- RX0 = np.fft.rfft(rx0)
-
-
- Abs2HW = []
- alphai = k * (np.abs(Ahat)**2 / np.abs(Bhat)**2)
- betai = k * (1. / (np.abs(Bhat)**2) )
- Hw = ((1.+alphai) * np.abs(X)**2 ) / (np.abs(X)**2 + betai*(np.abs(RX0)**2))
- H = np.abs(Hw)**2
- pylab.ion()
- pylab.figure()
- for i in range(10):
-
-
-
- alphai = k * (np.abs(Ahat)**2 / np.abs(Bhat)**2) * np.product(H, axis=0)
- betai = k * (1. / np.abs(Bhat)**2) * np.product(H, axis=0)
-
- Hw = ((1.+alphai) * np.abs(X)**2) / (np.abs(X)**2 + betai*np.abs(RX0)**2)
- Hw = np.nan_to_num(Hw)
- X *= Hw
- H = np.vstack( (H, np.abs(Hw)**2) )
-
- pylab.cla()
- pylab.plot(Hw)
-
-
- pylab.draw()
- raw_input("wait")
-
- pylab.cla()
- pylab.ioff()
-
- return np.fft.irfft(X)
-
- def iter_filt_refFreq(self, x, rx0, rx1, Ahat=.1, Bhat=1., k=0.001):
-
- X = np.fft.rfft(x)
- X0 = np.copy(X)
- RX0 = np.fft.rfft(rx0)
- RX1 = np.fft.rfft(rx1)
-
-
- alphai = k * (np.abs(Ahat)**2 / np.abs(Bhat)**2)
- betai = k * (1. / (np.abs(Bhat)**2) )
-
- H = np.ones(len(X))
-
-
- for i in range(1000):
-
-
-
- alphai = k * (np.abs(Ahat)**2 / np.abs(Bhat)**2) * np.product(H, axis=0)
- betai = k * (1. / np.abs(Bhat)**2) * np.product(H, axis=0)
-
- Hw = ((1.+alphai) * np.abs(X)**2) / (np.abs(X)**2 + betai*np.abs(RX0)**2)
- Hw = np.nan_to_num(Hw)
- X *= Hw
-
- H = np.vstack((H, np.abs(Hw)) )
-
-
-
-
-
-
-
-
-
-
- return np.fft.irfft(X0-X)
-
-
- def Tdomain_DFT(self, desired, input, S):
- """ Lifted from Adaptive filtering toolbox. Modefied to accept more than one input
- vector
- """
-
-
- nCoefficients = S["filterOrderNo"]/2+1
- nIterations = len(desired)
-
-
- errorVector = np.zeros(nIterations, dtype='complex')
- outputVector = np.zeros(nIterations, dtype='complex')
-
-
- coefficientVectorDFT = np.fft.rfft(S["initialCoefficients"])/np.sqrt(float(nCoefficients))
- desiredDFT = np.fft.rfft(desired)
- powerVector = S["initialPower"]*np.ones(nCoefficients)
-
-
-
- prefixedInput = np.concatenate([np.zeros(nCoefficients-1), np.array(input)])
-
-
- pylab.ion()
- pylab.figure(11)
- for it in range(nIterations):
-
- regressorDFT = np.fft.rfft(prefixedInput[it:it+nCoefficients][::-1]) /\
- np.sqrt(float(nCoefficients))
-
-
- powerVector = S["alpha"] * (regressorDFT*np.conjugate(regressorDFT)) + \
- (1.-S["alpha"])*(powerVector)
-
- pylab.cla()
-
-
-
-
- pylab.plot(powerVector.real)
- pylab.plot(powerVector.imag)
-
-
-
- pylab.draw()
-
-
- outputVector[it] = np.dot(coefficientVectorDFT.T, regressorDFT)
-
-
- errorVector[it] = desiredDFT[it] - outputVector[it]
-
-
-
-
- coefficientVectorDFT += (S["step"]*np.conjugate(errorVector[it])*regressorDFT) /\
- (S['gamma']+powerVector)
-
- return np.real(np.fft.irfft(errorVector))
-
-
- def Tdomain_DCT(self, desired, input, S):
- """ Lifted from Adaptive filtering toolbox. Modefied to accept more than one input
- vector. Uses cosine transform
- """
- from scipy.fftpack import dct
-
-
- nCoefficients = S["filterOrderNo"]+1
- nIterations = len(desired)
-
-
- errorVector = np.zeros(nIterations)
- outputVector = np.zeros(nIterations)
-
-
- coefficientVectorDCT = dct(S["initialCoefficients"])
- desiredDCT = dct(desired)
- powerVector = S["initialPower"]*np.ones(nCoefficients)
-
-
- prefixedInput = np.concatenate([np.zeros(nCoefficients-1), np.array(input)])
-
-
-
-
- for it in range(0, nIterations):
-
- regressorDCT = dct(prefixedInput[it:it+nCoefficients][::-1], type=2)
-
-
-
- powerVector = S["alpha"]*(regressorDCT) + (1.-S["alpha"])*(powerVector)
-
-
-
-
- outputVector[it] = np.dot(coefficientVectorDCT.T, regressorDCT)
-
- errorVector[it] = desiredDCT[it] - outputVector[it]
-
-
- coefficientVectorDCT += (S["step"]*errorVector[it]*regressorDCT)
-
-
-
-
- return dct(errorVector, type=3)
-
-
-
-
- def Tdomain_CORR(self, desired, input, S):
-
- from scipy.linalg import toeplitz
- from scipy.signal import correlate
-
-
- ac = np.correlate(input, input, mode='full')
- ac = ac[ac.size/2:]
- R = toeplitz(ac)
-
-
- r = np.correlate(desired, input, mode='full')
- r = r[r.size/2:]
-
-
- print ("R", np.shape(R))
- print ("r", np.shape(r))
- print ("solving")
-
- H = np.linalg.lstsq(R,r,rcond=.01)[0]
-
- print ("done solving")
- pylab.figure()
- pylab.plot(H)
- pylab.title("H")
-
-
-
- return desired - np.dot(toeplitz(H), input)
-
-
-
-
-
-
-
-
-
- def Fdomain_CORR(self, desired, input, dt, freq):
-
- from scipy.linalg import toeplitz
-
-
- Input = np.fft.rfft(input)
- Desired = np.fft.rfft(desired)
-
- T = toeplitz(Input)
-
- H = np.linalg.lstsq(T, Desired)[0]
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- pylab.figure()
- pylab.plot(H.real)
- pylab.plot(H.imag)
- pylab.plot(Input.real)
- pylab.plot(Input.imag)
- pylab.plot(Desired.real)
- pylab.plot(Desired.imag)
- pylab.legend(["hr","hi","ir","ii","dr","di"])
- pylab.title("H")
-
- return np.fft.irfft(H*Input)
-
- def Tdomain_RLS(self, desired, input, S):
- """
- A DFT is first performed on the data. Than a RLS algorithm is carried out
- for noise cancellation. Related to the RLS_Alt Algoritm 5.3 in Diniz book.
- The desired and input signals are assummed to be real time series data.
- """
-
-
- Input = np.fft.rfft(input)
- Desired = np.fft.rfft(desired)
-
-
- nCoefficients = S["filterOrderNo"]+1
- nIterations = len(Desired)
-
-
- errorVector = np.zeros(nIterations, dtype="complex")
- outputVector = np.zeros(nIterations, dtype="complex")
- errorVectorPost = np.zeros(nIterations, dtype="complex")
- outputVectorPost = np.zeros(nIterations, dtype="complex")
- coefficientVector = np.zeros( (nCoefficients, nIterations+1), dtype="complex" )
-
-
- coefficientVector[:,1] = S["initialCoefficients"]
- S_d = S["delta"]*np.eye(nCoefficients)
-
-
- prefixedInput = np.concatenate([np.zeros(nCoefficients-1, dtype="complex"),
- np.array(Input)])
- invLambda = 1./S["lambda"]
-
-
- pylab.ion()
- pylab.figure(11)
-
- for it in range(nIterations):
-
- regressor = prefixedInput[it:it+nCoefficients][::-1]
-
-
- outputVector[it] = np.dot(coefficientVector[:,it].T, regressor)
-
-
- errorVector[it] = Desired[it] - outputVector[it]
-
- psi = np.dot(S_d, regressor)
- if np.isnan(psi).any():
- print ("psi", psi)
- exit(1)
-
- pylab.cla()
-
- pylab.plot(regressor.real)
- pylab.plot(regressor.imag)
- pylab.plot(coefficientVector[:,it].real)
- pylab.plot(coefficientVector[:,it].imag)
- pylab.legend(["rr","ri", "cr", "ci"])
- pylab.draw()
- raw_input("paws")
-
- S_d = invLambda * (S_d - np.dot(psi, psi.T) /\
- S["lambda"] + np.dot(psi.T, regressor))
-
- coefficientVector[:,it+1] = coefficientVector[:,it] + \
- np.conjugate(errorVector[it])*np.dot(S_d, regressor)
-
- outputVectorPost[it] = np.dot(coefficientVector[:,it+1].T, regressor)
-
-
- errorVectorPost[it] = Desired[it] - outputVectorPost[it]
-
- errorVectorPost = np.nan_to_num(errorVectorPost)
-
- pylab.figure(11)
- print (np.shape(errorVectorPost))
- pylab.plot(errorVectorPost.real)
- pylab.plot(errorVectorPost.imag)
- pylab.show()
- print(errorVectorPost)
-
- return np.fft.irfft(errorVectorPost)
-
- if __name__ == "__main__":
-
- def noise(nu, t, phi):
- return np.sin(nu*2.*np.pi*t + phi)
-
- import matplotlib.pyplot as plt
- print("Test driver for adaptive filtering")
- Filt = AdaptiveFilter(.1)
- t = np.arange(0, .5, 1e-4)
- omega = 2000 * 2.*np.pi
- T2 = .100
- n1 = noise(60, t, .2 )
- n2 = noise(61, t, .514 )
- x = np.sin(omega*t)* np.exp(-t/T2) + 2.3*noise(60, t, .34) + 1.783*noise(31, t, 2.1)
- e = Filt.adapt_filt_tworef(x, n1, n2, 200, .98)
- plt.plot(t, x)
- plt.plot(t, n1)
- plt.plot(t, n2)
- plt.plot(t, e)
- plt.show()
|