Part 1: Trading System with PySpark
Reginaldo Melo
Data Engineer | Azure | Python | SQL | Databricks | Spark | pyspark | Data Analyst | Power BI | Cognos
Hi folks,
And then you see MGLU3 soaring like a rocket, up fifty percent in just two weeks, and you think:
Oh my, I missed that! Seriously? #SQN
Can you see?
Disclaimer: I'm not a securities analyst. The intention here is educational: manipulating data with pyspark.
So, let's take a look into the B3 database for answers.
Wow! It's not very easy, folks! You have to be patient here.
On the B3 site, under "Posições em Aberto de Empréstimo de Ativos" (Open Securities Lending Positions), click "Baixar Arquivo" (Download File) to get LendingOpenPositionFile_20220705_1.csv
I'll take the files from 20220705 through 20220721.
Can you see that? A csv file separated by semicolons, with a comma as the decimal separator?
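Before touching Spark, here's what that format means in plain Python. The sample line below is hypothetical, just mimicking the B3 layout:

```python
# Hypothetical line in the B3 layout: semicolon-separated, comma as decimal separator
raw_line = "2022-07-05;MGLU3;1000;2,57"

# Split on the semicolon to get the individual fields
date, ticker, qty, price = raw_line.split(";")

# Swap the decimal comma for a dot before casting to float
price_value = float(price.replace(",", "."))
print(price_value)  # 2.57
```

This comma-to-dot swap is exactly what the regexp_replace step does later, column-wide.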
Talk is cheap, let's code now!
In https://colab.research.google.com/
1) Install pyspark like this: !pip install pyspark
2) Import SparkSession: from pyspark.sql import SparkSession
3) Create the spark variable
spark = SparkSession.builder.appName("load csv").master("local[*]").getOrCreate()
4) Create a folder named lend manually and upload the csv files
5) Let's code in three steps: 1) load all the files at once; 2) transform the data; 3) show the results
Wow! This shows us something! On July 5th we had R$ 493,754,632.26 in open lending positions. On July 21st we already had R$ 641,923,213.96. That's a 30% increase; is a move this sudden sustainable?
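A quick arithmetic check on that 30% claim, using the two totals above:

```python
start = 493_754_632.26  # total open lending balance on July 5th
end = 641_923_213.96    # total open lending balance on July 21st

# Percent change between the two dates
pct_change = (end - start) / start * 100
print(round(pct_change, 1))  # 30.0
```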
In the market there are traders who bet on the fall while others bet on the rise. If the stock really goes up, it will force all the shorts and puts to close their trades.
Will we have a short squeeze soon?
In the next part we'll learn to write this dataframe to a single parquet file, among other things.
So, anybody may ask: where is the borrower rate? It's in another dataframe.
Next, in Part 2, we explore this in more detail. See you soon, folks. Enjoy the code.
!pip install pyspark
!pyspark --version
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("load csv").master("local[*]").getOrCreate()
# 1) read all the Lending csv files in folder lend
import pyspark.sql.utils
try:
    df1 = spark.read.csv("lend/*.csv", sep=";", header=True)
except pyspark.sql.utils.AnalysisException:
    print("Verify the lend folder, it could be empty")
# 2) transformation: drop, rename, format
import pyspark.sql.functions as F
df1 = df1.drop("ISIN", "Asst", "PricFctr")
df1 = (df1.withColumnRenamed("RptDt", "Date")
          .withColumnRenamed("TckrSymb", "Stock")
          .withColumnRenamed("BalQty", "Qty")
          .withColumnRenamed("TradAvrgPric", "AvgPrice")
          .withColumnRenamed("BalVal", "Volume"))
# swap the decimal comma for a dot so the values can be cast later
df1 = (df1.withColumn("AvgPrice", F.regexp_replace("AvgPrice", ",", "."))
          .withColumn("Volume", F.regexp_replace("Volume", ",", ".")))
df2 = df1.orderBy('Stock', F.desc('Date'))
# 3 show the results
df2.filter(df2.Stock == "MGLU3").show(30, False)