Videos can be made of sparse filters evolving over time. Below is a code snippet implementing the K-SVD algorithm. The purpose of the snippet is to visualize the state of the sparse basis functions as they are iteratively refined.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 | import matplotlib.pyplot as plt
import numpy as np
import scipy
import scipy.sparse.linalg
import sklearn.linear_model
import skvideo.datasets
import skvideo.io
from matplotlib import gridspec
from sklearn.feature_extraction import image
# Python 2/3 compatibility: if the name `xrange` is not defined, bind it to
# `range` so the rest of the script can call `xrange` either way.
try:
    xrange
except NameError:
    xrange = range

# Seed NumPy's global random generator so repeated runs draw the same values.
np.random.seed(0)
# use greedy K-SVD algorithm with OMP
def code_step(X, D, n_nonzero_coefs=5):
    """Sparse-code the samples in ``X`` against dictionary ``D`` using OMP.

    Parameters
    ----------
    X : ndarray
        Signals to encode, one per row (the calling script passes flattened
        image patches of shape ``(n_samples, n_features)``).
    D : ndarray
        Dictionary with one atom per row, ``(n_atoms, n_features)``.
    n_nonzero_coefs : int, optional
        Maximum number of atoms allowed per sample. Defaults to 5, the value
        that was previously hard-coded.

    Returns
    -------
    ndarray
        The fitted model's ``coef_`` attribute: one row of atom coefficients
        per sample when several samples are given.
    """
    # ``normalize`` is deliberately not passed: scikit-learn documents it as
    # ignored whenever ``fit_intercept=False``, and recent releases reject the
    # keyword altogether.
    model = sklearn.linear_model.OrthogonalMatchingPursuit(
        n_nonzero_coefs=n_nonzero_coefs, fit_intercept=False
    )
    # The atoms play the role of regression features and every sample is one
    # regression target, hence both matrices are transposed.
    model.fit(D.T, X.T)
    return model.coef_
def dict_step(X, C, D):
    """Run one K-SVD dictionary-update sweep over every atom.

    For each atom, the samples that use it are collected, the residual of
    those samples without that atom's contribution is formed, and the atom
    and its coefficients are replaced by the leading singular pair of that
    residual. Atoms used by at most one sample are skipped and re-drawn at
    random (unit norm) once the sweep is finished.

    Parameters
    ----------
    X : ndarray, shape (n_samples, n_features)
        Signals, one per row.
    C : ndarray, shape (n_samples, n_atoms)
        Sparse codes. Updated IN PLACE for the atoms that are refined.
    D : ndarray, shape (n_atoms, n_features)
        Dictionary, one atom per row. Updated IN PLACE.

    Returns
    -------
    ndarray
        The same ``D`` object that was passed in.
    """
    n_atoms = D.shape[0]
    unused_indices = []

    for k in range(n_atoms):
        usedidx = np.abs(C[:, k]) > 0

        # svds(k=1) needs at least a 2-column matrix, and a single sample
        # gives nothing to factorise, so such atoms are re-randomised later.
        if np.sum(usedidx) <= 1:
            print("Skipping filter #%d" % (k,))
            unused_indices.append(k)
            continue

        # Residual of the samples using atom k, with atom k left out.
        selectNotK = np.arange(n_atoms) != k
        used_coef = C[usedidx, :][:, selectNotK]
        E_kR = X[usedidx, :].T - np.dot(used_coef, D[selectNotK, :]).T

        U, S, V = scipy.sparse.linalg.svds(E_kR, k=1)

        # Singular vectors are only defined up to a joint sign flip; keep the
        # orientation that correlates positively with the previous atom so
        # filters do not flicker between iterations.
        sign = 1.0 if np.dot(D[k, :], U[:, 0]) > 0 else -1.0
        D[k, :] = sign * U[:, 0]
        C[usedidx, k] = sign * S[0] * V[0, :]

    # Re-randomise filters that were not used, normalised to unit length.
    for i in unused_indices:
        D[i, :] = np.random.normal(size=D.shape[1])
        D[i, :] /= np.sqrt(np.dot(D[i, :], D[i, :]))

    return D
def plot_weights(basis):
    """Render the first channel of every filter as a tiled grid image.

    Parameters
    ----------
    basis : ndarray, shape (n_filters, n_channels, height, width)
        Filters to draw. Only channel 0 of each filter is shown.

    Returns
    -------
    ndarray of uint8
        The rendered figure with the first three channels of the canvas
        buffer kept, shaped (canvas_height, canvas_width, 3).

    Notes
    -----
    Relies on ``fig.canvas.print_to_buffer()``, so the active matplotlib
    backend must provide it (presumably an Agg-based backend -- confirm for
    your environment).
    """
    # Unpacking four values keeps the requirement that ``basis`` is 4-D.
    n_filters, _, _, _ = basis.shape

    ncols = 10
    # Keep the 10x10 layout, adding rows only when more than 100 filters are
    # given (a fixed grid raised an index error in that case).
    nrows = max(10, -(-n_filters // ncols))

    fig = plt.figure()
    gs = gridspec.GridSpec(nrows, ncols)

    for idx in range(n_filters):
        rown, coln = divmod(idx, ncols)
        ax = fig.add_subplot(gs[rown, coln])

        patch = basis[idx, 0, :, :]
        # Symmetric colour range around zero so mid-grey always means 0.
        limit = np.max(np.abs(patch))
        ax.imshow(patch, vmin=-limit, vmax=limit, cmap='Greys_r', interpolation='none')

        ax.xaxis.set_major_locator(plt.NullLocator())
        ax.yaxis.set_major_locator(plt.NullLocator())

    gs.tight_layout(fig, pad=0, h_pad=0, w_pad=0)
    fig.canvas.draw()

    buf, sz = fig.canvas.print_to_buffer()
    # np.fromstring in binary mode is deprecated/removed; frombuffer gives a
    # read-only view, so copy to return an independent, writable array.
    data = np.frombuffer(buf, dtype=np.uint8).reshape(sz[1], sz[0], -1)[:, :, :3].copy()

    # Close this specific figure rather than whichever one is "current".
    plt.close(fig)
    return data
# Script parameters (previously repeated as literals throughout the script).
N_FILTERS = 100
PATCH_SHAPE = (7, 7)
N_PATCHES = 10000
N_ITERATIONS = 200

# Output video encoded using x264; frames are fed to FFmpeg at 10 fps ("-r").
writer = skvideo.io.FFmpegWriter(
    "sparsity.mp4",
    inputdict={
        "-r": "10",
    },
    outputdict={
        '-vcodec': 'libx264', '-b': '30000000',
    },
)

# Open the first frame of bigbuckbunny. Only one frame is needed, so ask the
# reader for a single frame instead of decoding the whole video.
filename = skvideo.datasets.bigbuckbunny()
vidframe = skvideo.io.vread(
    filename, num_frames=1, outputdict={"-pix_fmt": "gray"}
)[0, :, :, 0]

# Initialize D with random unit-norm rows (one row per filter).
D = np.random.normal(size=(N_FILTERS, PATCH_SHAPE[0] * PATCH_SHAPE[1]))
D /= np.linalg.norm(D, axis=1, keepdims=True)

# Flatten every patch of the frame into one row of X.
# np.float was removed from NumPy; np.float64 is the type it aliased.
X = image.extract_patches_2d(vidframe, PATCH_SHAPE)
X = X.reshape(X.shape[0], -1).astype(np.float64)

# Subsample at most N_PATCHES patches.
X = X[np.random.permutation(X.shape[0])[:N_PATCHES]]

# Alternate sparse coding and dictionary update, writing one video frame per
# iteration. The writer is closed even if an iteration fails so the FFmpeg
# process is not left dangling.
try:
    for i in range(N_ITERATIONS):
        print("Iteration %d / %d" % (i, N_ITERATIONS))
        C = code_step(X, D)
        D = dict_step(X, C, D)
        frame = plot_weights(D.reshape(N_FILTERS, 1, PATCH_SHAPE[0], PATCH_SHAPE[1]))
        writer.writeFrame(frame)
finally:
    writer.close()
|
The video output for 200 iterations of the K-SVD algorithm:
If you want to create a corrupted version of a video, you can use the FFmpegReader and FFmpegWriter in combination. Just make sure that you pass the video metadata along, or you may get incorrect output video (such as an incorrect framerate). Provided below is an example that corrupts a short run of frames around the middle of the source video with white noise:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | import numpy as np
import skvideo.datasets
import skvideo.io

filename = skvideo.datasets.bigbuckbunny()

vid_in = skvideo.io.FFmpegReader(filename)

# Read the source metadata so the output keeps the same frame rate.
data = skvideo.io.ffprobe(filename)['video']
rate = data['@r_frame_rate']
# np.int was removed from NumPy; the builtin int is what it aliased.
T = int(data['@nb_frames'])

vid_out = skvideo.io.FFmpegWriter(
    "corrupted_video.mp4",
    inputdict={
        '-r': rate,
    },
    outputdict={
        '-vcodec': 'libx264',
        '-pix_fmt': 'yuv420p',
        '-r': rate,
    },
)

# Copy every frame, replacing the frames from the midpoint through
# midpoint + 10 with Gaussian noise. Reader and writer are closed even if a
# frame fails to process.
try:
    for idx, frame in enumerate(vid_in.nextFrame()):
        print("Writing frame %d/%d" % (idx, T))
        if T / 2 <= idx <= T / 2 + 10:
            noise = np.random.normal(128, 128, size=frame.shape)
            # Clip before casting: converting out-of-range floats to uint8
            # wraps around (and is undefined for negative values).
            frame = np.clip(noise, 0, 255).astype(np.uint8)
        vid_out.writeFrame(frame)
finally:
    vid_out.close()
    vid_in.close()
|
Video output of the corrupted BigBuckBunny sequence: