-
Notifications
You must be signed in to change notification settings - Fork 0
/
New_DRRP_Functions.py
397 lines (314 loc) · 15.8 KB
/
New_DRRP_Functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
# Functions used to run Dual Rotating Retarder Polarimeter (DRRP) measurements and analysis
# This is the new version of DRRP_Functions.py that uses Q measurements instead of I, and some additional features
import numpy as np
from numpy.linalg import inv
from astropy.io import fits
import os
import re
from scipy.optimize import curve_fit
import csv
import random
import matplotlib.pyplot as plt
import textwrap
# Mueller matrix for a linear polarizer, with angle a between transmission axis and horizontal (radians)
def linear_polarizer(a):
M01 = np.cos(2*a)
M02 = np.sin(2*a)
M10 = np.cos(2*a)
M11 = np.cos(2*a)**2
M12 = np.cos(2*a)*np.sin(2*a)
M20 = np.sin(2*a)
M21 = np.cos(2*a)*np.sin(2*a)
M22 = np.sin(2*a)**2
return 0.5*np.array([[1, M01, M02, 0],
[M10, M11, M12, 0],
[M20, M21, M22, 0],
[0, 0, 0, 0]])
# Mueller matrix for a linear retarder (waveplate). Angle of fast axis a, retardance r in radians
def linear_retarder(a, r):
M11 = np.cos(2*a)**2 + np.cos(r)*np.sin(2*a)**2
M12 = np.cos(2*a)*np.sin(2*a)*(1-np.cos(r))
M13 = -np.sin(2*a)*np.sin(r)
M21 = M12
M22 = np.sin(2*a)**2 + np.cos(2*a)**2*np.cos(r)
M23 = np.cos(2*a)*np.sin(r)
M31 = -M13
M32 = -M23
M33 = np.cos(r)
return np.array([[1, 0, 0, 0],
[0, M11, M12, M13],
[0, M21, M22, M23],
[0, M31, M32, M33]])
# Sorting function for extracting filenames based on last number in the filename (the angle of rotation)
def extract_number(filename):
match = re.findall(r'\d+(?:\.\d+)?', filename)
if match:
return float(match[-1])
# Function to subtract dark frames from raw frames. The new reduced images are saved to a different folder
def dark_subtraction(image_file, dark_file, old_directory, new_directory):
# Open the dark image and extract pixel values
fits.open(dark_file)
dark = fits.getdata(dark_file)
dark_median = np.median(dark, axis=0)
# Search through the desired raw data folder
for filename in os.listdir(old_directory):
if filename.startswith(image_file): # Call specific files starting with the desired name
with fits.open(os.path.join(old_directory, filename)) as hdul:
img_data = hdul[0].data
img_median = np.median(img_data, axis=0)
reduced_data = img_median - dark_median
# Save the newly reduced image to a reduced data folder
new_filename = f"Reduced_{filename}"
new_filepath = os.path.join(new_directory, new_filename)
fits.writeto(new_filepath, reduced_data, overwrite=True)
# Get intensity values from each spot in the reduced images. reduced_filename should just be the start of the name (leave out the last number, the angle).
def extract_intensities(reduced_filename, reduced_folder, lcenter, rcenter, maxradius, cutoff=5000):
I_left = np.array([])
I_right = np.array([])
bad_indices = np.array([])
longtheta = np.linspace(0, np.pi, 46)
for filename in sorted(os.listdir(reduced_folder), key = extract_number):
if filename.startswith(reduced_filename):
with fits.open(os.path.join(reduced_folder, filename)) as hdul:
reduced_img_data = hdul[0].data
ys, xs, = np.indices(reduced_img_data.shape)
lradius = np.sqrt((ys-lcenter[0])**2+(xs-lcenter[1])**2)
rradius = np.sqrt((ys-rcenter[0])**2+(xs-rcenter[1])**2)
lbackground_mask = (lradius > 20) & (lradius < 26)
rbackground_mask = (rradius > 20) & (rradius < 26) # Index the background around each spot, take the median value
background_lmedian = np.median(reduced_img_data[lbackground_mask])
background_rmedian = np.median(reduced_img_data[rbackground_mask])
lflux = np.sum(reduced_img_data[lradius < maxradius] - background_lmedian) # Now take the flux with the background mask subtracted
rflux = np.sum(reduced_img_data[rradius < maxradius] - background_rmedian)
I_left = np.append(I_left, lflux)
I_right = np.append(I_right, rflux)
if lflux+rflux < cutoff:
print("Warning: low flux detected, check the image " + filename + ", index: " + str(sorted(os.listdir(reduced_folder), key = extract_number).index(filename)))
bad_indices = np.append(bad_indices, sorted(os.listdir(reduced_folder), key = extract_number).index(filename))
else:
continue
# Makes the array a list of integers that can be used to index the other array
bad_indices = bad_indices.astype(int)
# Deletes the bad indices (caused by camera glitch or some other complication) from the data
I_left = np.delete(I_left, bad_indices)
I_right = np.delete(I_right, bad_indices)
new_angles = np.delete(longtheta, bad_indices)
return I_left, I_right, new_angles, bad_indices
# Gives the condition number of eventual Mueller matrix
def condition_number(matrix):
minv = np.linalg.pinv(matrix)
# Compute maximum norm
norm = np.linalg.norm(matrix, ord=np.inf)
ninv = np.linalg.norm(minv, ord=np.inf)
return norm*ninv
# Function that makes the Mueller matrix using the calibration parameters a1, w1, w2, r2, and r2. Set these to 0 for an uncalibrated matrix
def q_calibrated_full_mueller_polarimetry(thetas, a1, w1, w2, r1, r2, I_minus, I_plus, M_in=None):
nmeas = len(thetas) # Number of measurements
Wmat1 = np.zeros([nmeas, 16])
Pmat1 = np.zeros([nmeas])
Wmat2 = np.zeros([nmeas, 16])
Pmat2 = np.zeros([nmeas])
th = thetas
unnormalized_Q = I_plus - I_minus # Difference in intensities measured by the detector. Plus should be the right spot, minus the left spot
unnormalized_I_total = I_plus + I_minus
Q = unnormalized_Q/np.max(unnormalized_I_total)
I_total = unnormalized_I_total/np.max(unnormalized_I_total)
# Both Q and I should be normalized by the total INPUT flux, but we don't know this value. The closest we can guess is the maximum of the measured intensity
# This assumes the input flux is constant over time. Could be improved with a beam splitter that lets us monitor the input flux over time
for i in range(nmeas):
# Mueller Matrix of generator (linear polarizer and a quarter wave plate)
Mg = linear_retarder(th[i]+w1, np.pi/2+r1) @ linear_polarizer(0+a1)
# Mueller Matrix of analyzer (one channel of the Wollaston prism is treated as a linear polarizer. The right spot is horizontal (0) and the left spot is vertical(pi/2))
Ma = linear_retarder(th[i]*5+w2, np.pi/2+r2)
# Data reduction matrix. Taking the 0 index ensures that intensity is the output
Wmat1[i,:] = np.kron((Ma)[0,:], Mg[:,0]) # for the top row, using intensities
Wmat2[i,:] = np.kron((Ma)[1,:], Mg[:,0]) # for the bottom 3 rows, using Q
# M_in is some example Mueller matrix. Providing this input will test theoretical Mueller matrix. Otherwise, the raw data is used
if M_in is not None:
Pmat1[i] = (Ma[0,:] @ M_in @ Mg[:,0])
Pmat2[i] = (Ma[1,:] @ M_in @ Mg[:,0])
else:
Pmat1[i] = I_total[i] #Pmat is a vector of measurements (either I or Q)
Pmat2[i] = Q[i]
# Compute Mueller matrix using Moore-Penrose pseudo invervse
M1 = np.linalg.pinv(Wmat1) @ Pmat1
M1 = np.reshape(M1, [4,4])
M2 = np.linalg.pinv(Wmat2) @ Pmat2
M2 = np.reshape(M2, [4,4])
M = np.zeros([4,4])
M[0,:] = M1[0,:]
M[1:4,:] = M2[1:4,:]
return M
# Define the identity matrix and other matrices which are useful for the Mueller calculus
M_identity = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]])
A = np.array([1, 0, 0, 0])
B = np.array([[1], [0], [0], [0]])
C = np.array([0, 1, 0, 0])
# In order, the calibration parameters are LP1 angle, QWP1 axis angle, QWP2 axis angle, QWP1 retardance, QWP2 retrdance
def q_calibration_function(t, a1, w1, w2, r1, r2):
prediction = [None]*len(t)
for i in range(len(t)):
prediction[i] = float(C @ linear_retarder(5*t[i]+w2, np.pi/2+r2) @ M_identity @ linear_retarder(t[i]+w1, np.pi/2+r1) @ linear_polarizer(a1) @ B)
return prediction
# Basically the same as above, but with an optional input matrix to simulate data
def q_output_simulation_function(t, a1, w1, w2, r1, r2, M_in=None):
if M_in is None:
M = M_identity
else:
M = M_in
prediction = [None]*len(t)
for i in range(len(t)):
prediction[i] = float(C @ linear_retarder(5*t[i]+w2, np.pi/2+r2) @ M @ linear_retarder(t[i]+w1, np.pi/2+r1) @ linear_polarizer(a1) @ B)
return prediction
# Function that is useful for generating intensity values for a given sample matrix and offset parameters
def I_output_simulation_function(t, a1, w1, w2, r1, r2, M_in=None):
if M_in is None:
M = M_identity
else:
M = M_in
prediction = [None]*len(t)
for i in range(len(t)):
prediction[i] = float(A @ linear_retarder(5*t[i]+w2, np.pi/2+r2) @ M @ linear_retarder(t[i]+w1, np.pi/2+r1) @ linear_polarizer(a1) @ B)
return prediction
# Calculate the root-mean-square error of the calibration matrix by comparing with the identity matrix
def RMS_calculator(calibration_matrix):
differences = []
for i in range(0, 4):
for j in range(0, 4):
differences.append(calibration_matrix[i, j]-M_identity[i, j])
differences_squared = [x**2 for x in differences]
RMS = np.sqrt(sum(differences_squared)/16)
return RMS
# Calculate the retardance error by standard error propogation using RMS in the matrix elements from calibration
def propagated_error(M_R, RMS):
# return RMS/np.sqrt(1-(np.trace(M_R)/2-1)**2) # These two equations are equivalent
x = np.trace(M_R)
return 2*RMS/np.sqrt(4*x-x**2) # Value in radians
# The function that gives everything you want to know at once
def q_ultimate_polarimetry(cal_angles, cal_left_intensity, cal_right_intensity, sample_angles, sample_left_intensity, sample_right_intensity):
ICal = cal_right_intensity + cal_left_intensity # Plus should be the right spot, minus is the left spot
QCal = cal_right_intensity - cal_left_intensity
initial_guess = [0, 0, 0, 0, 0]
parameter_bounds = ([-np.pi, -np.pi, -np.pi, -np.pi/2, -np.pi/2], [np.pi, np.pi, np.pi, np.pi/2, np.pi/2])
# Find parameters from calibration
normalized_QCal = QCal/(max(ICal)) # This should be normalized by the input intensity, but we don't know that so use the max of the measured intensity instead as an approximation
popt, pcov = curve_fit(q_calibration_function, cal_angles, normalized_QCal, p0=initial_guess, bounds=parameter_bounds)
print(popt, "Fit parameters for a1, w1, w2, r1, and r2. 1 for generator, 2 for analyzer")
# The calibration matrix (should be close to identity) to see how well the parameters compensate
MCal = q_calibrated_full_mueller_polarimetry(cal_angles, popt[0], popt[1], popt[2], popt[3], popt[4], cal_left_intensity, cal_right_intensity)
MCal = MCal/np.max(np.abs(MCal))
RMS_Error = RMS_calculator(MCal)
#print(MCal, " This is the calibration Mueller Matrix.")
# Use the parameters found above from curve fitting to construct the actual Mueller matrix of the sample
MSample = q_calibrated_full_mueller_polarimetry(sample_angles, popt[0], popt[1], popt[2], popt[3], popt[4], sample_left_intensity, sample_right_intensity)
MSample = MSample/np.max(np.abs(MSample))
np.set_printoptions(suppress=True) # Suppresses scientific notation, keeps decimal format
# Use the polar decomposition of the retarder matrix (see below)
r_decomposed_MSample = decompose_retarder(MSample)
retardance = np.arccos(np.trace(decompose_retarder(r_decomposed_MSample))/2 - 1)/(2*np.pi)
Retardance_Error = propagated_error(r_decomposed_MSample, RMS_Error)
return MSample, retardance, MCal, RMS_Error, Retardance_Error
# Functions from Jaren's katsu code on Polar decomposition. Inspired by Lu & Chipman 1996 https://doi.org/10.1364/JOSAA.13.001106
def broadcast_outer(a,b):
"""broadcasted outer product of two A,B,...,N vectors. Used for polarimetric data reduction
where out is a A,B,...,N,N matrix. While in principle this does not require vectors of different length, it is not tested
to produce anything other than square matrices.
Parameters
----------
a : numpy.ndarray
A,B,...,N vector 1
b : numpy.ndarray
A,B,...,N vector 2
Returns
-------
numpy.ndarray
outer product matrix
"""
return np.einsum('...i,...j->...ij',a,b)
def _empty_mueller(shape):
"""Returns an empty array to populate with Mueller matrix elements.
Parameters
----------
shape : list
shape to prepend to the mueller matrix array. shape = [32,32] returns an array of shape [32,32,2,2]
where the matrix is assumed to be in the last indices. Defaults to None, which returns a 2x2 array.
Returns
-------
numpy.ndarray
The zero array of specified shape
Notes
-----
The structure of this function was taken from prysm.x.polarization, which was written by Jaren Ashcraft
"""
if shape is None:
shape = (4, 4)
else:
shape = (*shape, 4, 4)
return np.zeros(shape)
def decompose_diattenuator(M):
"""Decompose M into a diattenuator using the Polar decomposition from Lu & Chipman 1996 https://doi.org/10.1364/JOSAA.13.001106
Parameters
----------
M : numpy.ndarray
Mueller Matrix to decompose
Returns
-------
numpy.ndarray
Diattenuator component of mueller matrix
"""
# First, determine the diattenuator
T = M[..., 0, 0]
if M.ndim > 2:
diattenuation_vector = M[..., 0, 1:] / T[..., np.newaxis]
else:
diattenuation_vector = M[..., 0, 1:] / T
D = np.sqrt(np.sum(diattenuation_vector * diattenuation_vector, axis=-1))
mD = np.sqrt(1 - D**2)
if M.ndim > 2:
diattenutation_norm = diattenuation_vector / D[..., np.newaxis]
else:
diattenutation_norm = diattenuation_vector / D
# DD = diattenutation_norm @ np.swapaxes(diattenutation_norm,-2,-1)
DD = broadcast_outer(diattenutation_norm, diattenutation_norm)
# create diattenuator
I = np.identity(3)
if M.ndim > 2:
I = np.broadcast_to(I, [*M.shape[:-2], 3, 3])
mD = mD[..., np.newaxis, np.newaxis]
inner_diattenuator = mD * I + (1 - mD) * DD # Eq. 19 Lu & Chipman
Md = _empty_mueller(M.shape[:-2])
# Eq 18 Lu & Chipman
Md[..., 0, 0] = 1.
Md[..., 0, 1:] = diattenuation_vector
Md[..., 1:, 0] = diattenuation_vector
Md[..., 1:, 1:] = inner_diattenuator
if M.ndim > 2:
Md = Md * T[..., np.newaxis, np.newaxis]
else:
Md = Md * T
Md = Md/np.max(np.abs(Md)) # remember to normalize the matrix
return Md
def decompose_retarder(M, return_all=False):
"""Decompose M into a retarder using the Polar decomposition from Lu & Chipman 1996 https://doi.org/10.1364/JOSAA.13.001106
Note: this doesn't work if the diattenuation can be described by a pure polarizer,
because the matrix is singular and therefore non-invertible
Parameters
----------
M : numpy.ndarray
Mueller Matrix to decompose
return_all : bool
Whether to return the retarder and diattenuator vs just the retarder.
Defaults to False, which returns both
Returns
-------
numpy.ndarray
Retarder component of mueller matrix
"""
Md = decompose_diattenuator(M)
# Then, derive the retarder
Mr = M @ np.linalg.inv(Md)
Mr = Mr/np.max(np.abs(Mr)) # remember to normalize the matrix
if return_all:
return Mr, Md
else:
return Mr