Merging a chunked video stored in MongoDB using PySpark

Merging a chunked video stored in MongoDB using PySpark

1- Insert a video into Mongodb

2- Create an ADF pipeline to query mongodb and store the chunks into datalake

3- Navigate to databricks to reunite the video chunks

4- Transform the video by overlaying an image onto some of its frames

Steps 1 and 2 are self-explanatory, so this article zooms in on steps 3 and 4.

Step 3 : Code to reunite the video

from pyspark.sql import SparkSession

from pyspark.sql.functions import col

import base64

from pyspark.dbutils import DBUtils

import os

import tempfile

# Read the exported MongoDB chunk documents from the data lake and rebuild the
# raw video bytes in `decoded_data`.

# Initialize Spark session
spark = SparkSession.builder.appName("RecreateMP4").getOrCreate()

# Initialize DBUtils
dbutils = DBUtils(spark)

# Read the JSON file from Azure Data Lake
json_path = "/mnt/bronze/data_5688cfd1-10da-4996-91fc-640af0b48674_df82eabc-7077-4341-824e-a2f1ce94a10a.json"
df = spark.read.json(json_path)

# The chunks must be concatenated in their original order, and a DataFrame's
# row order is not something to rely on without an explicit sort.
# NOTE(review): assumes the export kept the GridFS chunk index as an integer
# column named `n` -- confirm against the ADF pipeline output. When the column
# is absent (or not a plain integer) the rows are used in the order Spark
# returns them, exactly as before.
if dict(df.dtypes).get("n") in ("int", "bigint"):
    df = df.orderBy("n")

# Extract the base64 encoded binary data
binary_data_df = df.select(col("data.$binary").alias("binary_data"))

# Collect the chunks on the driver. The whole video ends up in driver memory,
# so this only suits files that comfortably fit there.
binary_data_list = binary_data_df.collect()
if not binary_data_list:
    # Fail loudly instead of silently producing an empty video file.
    raise ValueError(f"No video chunks found in {json_path}")

# Decode each base64 chunk and stitch the pieces together.
decoded_data = b"".join(
    base64.b64decode(row["binary_data"]) for row in binary_data_list
)

# Write the reassembled bytes to a local temporary file, then move that file
# to its final location in the data lake.

# delete=False keeps the file on disk after the `with` block closes it, so it
# can still be moved below. Leaving the block also flushes the written data.
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
    temp_file_path = temp_file.name
    temp_file.write(decoded_data)

# Define the output path
output_path = "/mnt/bronze/video.mp4"

# Remove any previous output so the move below does not collide with it.
# NOTE(review): dbutils.fs.rm is documented as returning a boolean; a missing
# file may surface as False or as an exception depending on the runtime, so
# both cases are treated as "nothing to delete" -- confirm on your cluster.
try:
    removed = dbutils.fs.rm(output_path)
except Exception:
    removed = False

if removed:
    print("Existing file deleted.")
else:
    print("No existing file to delete.")

# Move the temporary file to the output location. The "file:" scheme makes
# dbutils read from the driver's local filesystem.
# NOTE(review): no separate os.remove(temp_file_path) is done here because
# dbutils.fs.mv is expected to remove the local source once the copy
# succeeds -- confirm against the Databricks documentation.
dbutils.fs.mv(f"file:{temp_file_path}", output_path)

print("Video file created successfully!")

Step 4 : Code to merge an image into the above video

# Merging an image into one frame or a series of frames of a video.

import numpy as np

import cv2

import os

from datetime import datetime

from PIL import Image

# Open the video file and report its basic properties.
video_path = "C:/MOngoDB/sample-5s.mp4"
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # VideoCapture does not raise on a missing or unreadable file; without
    # this check the failure would only show up later as fps == 0.
    raise IOError(f"Could not open video file: {video_path}")

# count the number of frames
frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
fps = cap.get(cv2.CAP_PROP_FPS)

# calculate duration of the video (guard against a container that reports
# no frame rate, which would otherwise divide by zero)
seconds = round(frames / fps) if fps > 0 else 0

print(f"duration in seconds: {seconds}")
print(f"Count of frames: {frames}")
print(f"fps: {fps}")

# Dump every frame of the opened video (`cap`) as a numbered JPEG into a
# fresh, timestamped directory.
frameno = 0

# The timestamp uses "-" instead of ":" in the time part so it is a valid
# directory name on Windows.
current_datetime = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
str_current_datetime = current_datetime  # strftime already returns a str
path = 'C:/MOngoDB/frames/f' + str_current_datetime

# makedirs also creates the parent "frames" directory when it is missing,
# where os.mkdir would fail.
os.makedirs(path, exist_ok=True)

output_file = 'output_video.mp4'
i = 0

while True:
    ret, frame = cap.read()
    if not ret:
        # No more frames (or a read error): stop extracting.
        break

    # Frames are named 0.jpg, 1.jpg, ... in the order they were read.
    name = str(frameno) + '.jpg'
    print ('new frame captured latest', name)
    print('The path is ', path)
    cv2.imwrite(os.path.join(path, name), frame)
    frameno += 1

# Rebuild a video from the extracted frames, pasting an overlay image onto
# every frame from index 90 onward.
# NOTE(review): the original indentation was lost; the nesting below assumes
# only the overlay steps sit under `if i >= 90` and that every frame is
# written to the output -- confirm this matches the intended behavior.

# os.listdir returns names in arbitrary order, and the frame names are not
# zero-padded ("10.jpg" sorts before "2.jpg" as text), so sort numerically to
# keep the frames in playback order. Only purely numeric names are frames.
image_files = sorted(
    (
        img for img in os.listdir(path)
        if img.endswith(".jpg") and img[:-len(".jpg")].isdigit()
    ),
    key=lambda img: int(img[:-len(".jpg")]),
)
if not image_files:
    cap.release()
    raise RuntimeError(f"No frames were extracted into {path}")

# The first frame determines the output resolution.
first_image_path = os.path.join(path, image_files[0])
first_image = cv2.imread(first_image_path)
height, width = first_image.shape[:2]

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
output_path = os.path.join(path, output_file)
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# The overlay is the same for every frame, so load and convert it once.
# It is converted to RGBA so it can double as its own paste mask.
overlay_image = Image.open('C:/Data_python/latbal.jpg').convert("RGBA")
position = (300, 300)

for i, image_file in enumerate(image_files):
    image_path = os.path.join(path, image_file)
    print ("image path ", image_path)

    if i >= 90:
        # Paste the overlay and save the result next to the source frame;
        # the saved copy is what gets written to the video.
        base_image = Image.open(image_path)
        base_image.paste(overlay_image, position, overlay_image)
        resultimage = str(i) + 'result.jpg'
        image_path = os.path.join(path, resultimage)
        base_image.save(image_path)

    image = cv2.imread(image_path)
    out.write(image)

cap.release()
out.release()
cv2.destroyAllWindows()

要查看或添加评论,请登录

Ravi Sankar的更多文章

  • Digital transformation

    Digital transformation

    Information technology n Software has been/ is dealing with Jargons..

社区洞察

其他会员也浏览了