Merging a chunked video stored in MongoDB using PySpark
1. Insert a video into MongoDB.
2. Create an Azure Data Factory (ADF) pipeline that queries MongoDB and stores the chunks in the data lake.
3. In Databricks, reassemble the video chunks into a single file.
4. Transform the video by overlaying an image onto its frames.
Steps 1 and 2 are self-explanatory, so this article zooms in on steps 3 and 4.
Step 3: Code to reassemble the video
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import base64
from pyspark.dbutils import DBUtils
import os
import tempfile
# Start (or reuse) a Spark session for this job, then wrap it in a DBUtils
# handle so the dbutils.fs calls further down are available.
spark = (
    SparkSession.builder
    .appName("RecreateMP4")
    .getOrCreate()
)
dbutils = DBUtils(spark)
# Read the JSON export of the chunk documents that the ADF pipeline copied
# from MongoDB into the lake.
json_path = "/mnt/bronze/data_5688cfd1-10da-4996-91fc-640af0b48674_df82eabc-7077-4341-824e-a2f1ce94a10a.json"
df = spark.read.json(json_path)

# The chunks must be concatenated in sequence order, but the row order of the
# export (and therefore of collect()) is not guaranteed to match it.
# NOTE(review): assumes GridFS-style chunk documents where `n` is the chunk
# sequence number -- confirm against the ADF export schema.
if "n" in df.columns:
    df = df.orderBy(col("n"))

# Each chunk's payload is a base64 string under data.$binary.
binary_data_df = df.select(col("data.$binary").alias("binary_data"))

# Bring every chunk to the driver and decode them into one byte string.
# The whole video is held in driver memory at this point.
binary_data_list = binary_data_df.collect()
decoded_data = b"".join(base64.b64decode(row["binary_data"]) for row in binary_data_list)

# Write the bytes to a driver-local temp file. delete=False keeps the file on
# disk after the `with` block closes it, so it can be moved afterwards.
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
    temp_file_path = temp_file.name
    temp_file.write(decoded_data)
# Destination of the reassembled video.
output_path = "/mnt/bronze/video.mp4"

# Remove any previous output first. dbutils.fs.rm is documented to return a
# boolean; some runtimes raise instead for a missing path, so a failure here
# is treated as "nothing to delete". A genuine problem (e.g. permissions)
# will still surface when the move below fails.
# Catch Exception, not a bare except, so Ctrl-C / SystemExit are not swallowed.
try:
    removed = dbutils.fs.rm(output_path)
except Exception:
    removed = False
print("Existing file deleted." if removed else "No existing file to delete.")

# Move the driver-local temp file to the output location. On success mv
# consumes the source, so there is nothing left to clean up. On failure,
# remove the temp file so it does not linger on the driver's local disk.
try:
    dbutils.fs.mv(f"file:{temp_file_path}", output_path)
except Exception:
    if os.path.exists(temp_file_path):
        os.remove(temp_file_path)
    raise

print("Video file created successfully!")
Step 4: Code to merge an image into the above video
# Merge an image into one frame, or a series of frames, of a video.
import numpy as np
import cv2
import os
from datetime import datetime
from PIL import Image
# Source video to edit.
input_video_path = "C:/MOngoDB/sample-5s.mp4"

# Open the video and fail fast if OpenCV cannot read it; otherwise every
# property queried below silently comes back as 0.
cap = cv2.VideoCapture(input_video_path)
if not cap.isOpened():
    raise OSError(f"Cannot open video: {input_video_path}")

# Frame count and frame rate as reported by OpenCV.
frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0:
    # Guard the division below (and the VideoWriter later, which needs a
    # positive frame rate).
    cap.release()
    raise ValueError(f"Video reports an invalid frame rate ({fps}).")

# Duration in whole seconds (rounded).
seconds = round(frames / fps)
print(f"duration in seconds: {seconds}")
print(f"Count of frames: {frames}")
print(f"fps: {fps}")
# Index of the next frame to extract; after the loop it equals the number of
# frames written.
frameno = 0

# Each run writes its frames into a fresh, timestamped directory.
# strftime already returns a str, so no extra str() conversion is needed.
str_current_datetime = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
path = 'C:/MOngoDB/frames/f' + str_current_datetime
# makedirs also creates the parent frames/ directory on a first run, where
# os.mkdir would fail.
os.makedirs(path)
output_file = 'output_video.mp4'
# Kept so any later step that still references `i` finds it defined.
i = 0
# The path is loop-invariant, so report it once rather than per frame.
print('The path is ', path)

# Dump every decoded frame as "<frame index>.jpg" until the stream ends.
while True:
    ret, frame = cap.read()
    if not ret:
        break
    name = str(frameno) + '.jpg'
    print('new frame captured latest', name)
    # imwrite signals failure by returning False rather than raising.
    if not cv2.imwrite(os.path.join(path, name), frame):
        raise OSError(f"Failed to write frame {name} to {path}")
    frameno += 1
# Re-encode the extracted frames into a new video, pasting an overlay image
# onto every frame from OVERLAY_START_FRAME onward.
OVERLAY_IMAGE_PATH = 'C:/Data_python/latbal.jpg'
OVERLAY_POSITION = (300, 300)  # top-left (x, y) of the overlay, in pixels
OVERLAY_START_FRAME = 90       # first 0-based frame index that gets the overlay

# Frames were written as 0.jpg, 1.jpg, ... . os.listdir returns names in
# arbitrary order, and a plain string sort would put "10.jpg" before
# "2.jpg", so sort numerically on the stem. Skip any other .jpg files.
image_files = sorted(
    (name for name in os.listdir(path)
     if name.endswith(".jpg") and name[:-4].isdigit()),
    key=lambda name: int(name[:-4]),
)
if not image_files:
    raise ValueError(f"No extracted frames found in {path}")

# Size the writer from the first frame; every frame shares its dimensions.
first_image = cv2.imread(os.path.join(path, image_files[0]))
height, width = first_image.shape[:2]
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
output_path = os.path.join(path, output_file)
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Load and convert the overlay once, instead of reopening it for every frame,
# and close the source file handle.
with Image.open(OVERLAY_IMAGE_PATH) as overlay_source:
    overlay_image = overlay_source.convert("RGBA")

for i, image_file in enumerate(image_files):
    image_path = os.path.join(path, image_file)
    print("image path ", image_path)
    image = cv2.imread(image_path)
    if i >= OVERLAY_START_FRAME:
        # Paste in PIL, using the overlay's alpha channel as the mask. Then
        # convert straight back to OpenCV's BGR array. The old approach saved
        # and reloaded a JPEG, which cost disk I/O and a second lossy encode.
        base_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        base_image.paste(overlay_image, OVERLAY_POSITION, overlay_image)
        image = cv2.cvtColor(np.asarray(base_image), cv2.COLOR_RGB2BGR)
    out.write(image)
# Close the input capture and the output writer, then tear down any OpenCV
# GUI windows.
for handle in (cap, out):
    handle.release()
cv2.destroyAllWindows()