Implemented Bayesian SSD decoding pipeline
Signed-off-by: Jim Martens <github@2martens.de>
This commit is contained in:
@ -190,7 +190,6 @@ def predict(generator: callable,
|
||||
_predict_loop(generator, use_dropout, steps_per_epoch,
|
||||
dropout_step=functools.partial(_predict_dropout_step,
|
||||
model=model,
|
||||
get_observations_func=_get_observations,
|
||||
batch_size=batch_size,
|
||||
forward_passes_per_image=forward_passes_per_image),
|
||||
vanilla_step=functools.partial(_predict_vanilla_step, model=model),
|
||||
@ -208,6 +207,14 @@ def predict(generator: callable,
|
||||
iou_threshold=iou_threshold,
|
||||
top_k=top_k
|
||||
),
|
||||
decode_func_dropout=functools.partial(
|
||||
_decode_predictions_dropout,
|
||||
decode_func=ssd_output_decoder.decode_detections_dropout,
|
||||
image_size=image_size,
|
||||
confidence_threshold=confidence_threshold,
|
||||
),
|
||||
apply_entropy_threshold_func=_apply_entropy_threshold,
|
||||
get_observations_func=_get_observations,
|
||||
transform_func=functools.partial(
|
||||
_transform_predictions,
|
||||
inverse_transform_func=object_detection_2d_misc_utils.apply_inverse_transforms),
|
||||
@ -292,6 +299,8 @@ def _predict_prepare_paths(output_path: str, use_dropout: bool) -> Tuple[str, st
|
||||
def _predict_loop(generator: Generator, use_dropout: bool, steps_per_epoch: int,
|
||||
dropout_step: callable, vanilla_step: callable,
|
||||
save_images: callable, decode_func: callable,
|
||||
decode_func_dropout: callable, get_observations_func: callable,
|
||||
apply_entropy_threshold_func: callable,
|
||||
transform_func: callable, save_func: callable,
|
||||
use_entropy_threshold: bool, entropy_threshold_min: float,
|
||||
entropy_threshold_max: float) -> None:
|
||||
@ -314,8 +323,14 @@ def _predict_loop(generator: Generator, use_dropout: bool, steps_per_epoch: int,
|
||||
if not saved_images_prediction:
|
||||
save_images(inputs, predictions, custom_string="after-prediction")
|
||||
saved_images_prediction = True
|
||||
if use_dropout:
|
||||
decoded_predictions = decode_func_dropout(predictions)
|
||||
observations = get_observations_func(decoded_predictions)
|
||||
for entropy_threshold in entropy_thresholds:
|
||||
decoded_predictions = decode_func(predictions, entropy_threshold=entropy_threshold)
|
||||
if use_dropout:
|
||||
decoded_predictions = apply_entropy_threshold_func(observations, entropy_threshold=entropy_threshold)
|
||||
else:
|
||||
decoded_predictions = decode_func(predictions, entropy_threshold=entropy_threshold)
|
||||
if not saved_images_decoding:
|
||||
custom_string = f"after-decoding-{entropy_threshold}" if use_entropy_threshold else "after-decoding"
|
||||
save_images(inputs, decoded_predictions, custom_string=custom_string)
|
||||
@ -335,10 +350,9 @@ def _predict_loop(generator: Generator, use_dropout: bool, steps_per_epoch: int,
|
||||
|
||||
|
||||
def _predict_dropout_step(inputs: np.ndarray, model: tf.keras.models.Model,
|
||||
get_observations_func: callable,
|
||||
batch_size: int, forward_passes_per_image: int) -> np.ndarray:
|
||||
|
||||
detections = [np.zeros((8732 * forward_passes_per_image, 73)) for _ in range(batch_size)]
|
||||
detections = np.zeros((batch_size, 8732 * forward_passes_per_image, 73))
|
||||
|
||||
for forward_pass in range(forward_passes_per_image):
|
||||
predictions = model.predict_on_batch(inputs)
|
||||
@ -346,9 +360,7 @@ def _predict_dropout_step(inputs: np.ndarray, model: tf.keras.models.Model,
|
||||
for i in range(batch_size):
|
||||
detections[i][forward_pass * 8732:forward_pass * 8732 + 8732] = predictions[i]
|
||||
|
||||
observations = np.asarray(get_observations_func(detections))
|
||||
|
||||
return observations
|
||||
return detections
|
||||
|
||||
|
||||
def _predict_vanilla_step(inputs: np.ndarray, model: tf.keras.models.Model) -> np.ndarray:
|
||||
@ -374,6 +386,37 @@ def _decode_predictions(predictions: np.ndarray,
|
||||
)
|
||||
|
||||
|
||||
def _decode_predictions_dropout(predictions: np.ndarray,
|
||||
decode_func: callable,
|
||||
image_size: int,
|
||||
# entropy_threshold: float,
|
||||
confidence_threshold: float,
|
||||
# iou_threshold: float,
|
||||
# top_k: int
|
||||
) -> List[np.ndarray]:
|
||||
return decode_func(
|
||||
y_pred=predictions,
|
||||
img_width=image_size,
|
||||
img_height=image_size,
|
||||
input_coords="corners",
|
||||
confidence_thresh=confidence_threshold
|
||||
)
|
||||
|
||||
|
||||
def _apply_entropy_threshold(observations: Sequence[np.ndarray], entropy_threshold: float) -> List[np.ndarray]:
|
||||
final_observations = []
|
||||
batch_size = len(observations)
|
||||
for i in range(batch_size):
|
||||
filtered_image_observations = observations[observations[i][-1] < entropy_threshold]
|
||||
final_image_observations = np.copy(filtered_image_observations[:, :, -8:-1])
|
||||
final_image_observations[:, :, 0] = np.argmax(filtered_image_observations[:, :, :-5], axis=-1)
|
||||
final_image_observations[:, :, 1] = np.amax(filtered_image_observations[:, :, :-5], axis=-1)
|
||||
final_image_observations[:, :, 2] = filtered_image_observations[:, :, -1]
|
||||
final_observations.append(final_image_observations)
|
||||
|
||||
return final_observations
|
||||
|
||||
|
||||
def _transform_predictions(decoded_predictions: np.ndarray, inverse_transforms: Sequence[np.ndarray],
|
||||
inverse_transform_func: callable) -> np.ndarray:
|
||||
|
||||
@ -404,16 +447,16 @@ def _predict_save_images(inputs: np.ndarray, predictions: np.ndarray,
|
||||
get_coco_cat_maps_func, custom_string)
|
||||
|
||||
|
||||
def _get_observations(detections: Sequence[np.ndarray]) -> List[List[np.ndarray]]:
|
||||
def _get_observations(detections: Sequence[np.ndarray]) -> List[np.ndarray]:
|
||||
batch_size = len(detections)
|
||||
observations = [[] for _ in range(batch_size)]
|
||||
print(f"batch size: {batch_size}")
|
||||
final_observations = []
|
||||
|
||||
# iterate over images
|
||||
for i in range(batch_size):
|
||||
detections_image = np.asarray(detections[i])
|
||||
overlaps = bounding_box_utils.iou(detections_image[:, -12:-8],
|
||||
detections_image[:, -12:-8],
|
||||
overlaps = bounding_box_utils.iou(detections_image[:, -5:-1],
|
||||
detections_image[:, -5:-1],
|
||||
mode="outer_product",
|
||||
border_pixels="include")
|
||||
image_observations = []
|
||||
@ -452,8 +495,12 @@ def _get_observations(detections: Sequence[np.ndarray]) -> List[List[np.ndarray]
|
||||
# average over class probabilities
|
||||
observation_mean = np.mean(observation_detections, axis=0)
|
||||
observations[i].append(observation_mean)
|
||||
|
||||
return observations
|
||||
|
||||
final_observations.append(np.asarray(observations[i]))
|
||||
final_observations[i][:, -1] = -np.sum(final_observations[i][:, :-5] * np.log(final_observations[i][:, :-5]),
|
||||
axis=-1)
|
||||
|
||||
return final_observations
|
||||
|
||||
|
||||
def _set_difference(first_array: np.ndarray, second_array: np.ndarray) -> np.ndarray:
|
||||
|
||||
Reference in New Issue
Block a user