Un-pickle-able
Grumpy cat (yep, AI-generated)


The Python support in Spark is impressive, yet sooner or later it becomes clear that Spark itself is not written in Python. Sometimes this reality comes to the foreground as unpicklable stuff that refuses to be distributed across the cluster.

What's the story?

More or less, it goes something like this:

from a_library_with_binary_bindings import SpecialClass 

customization = {
   "parameter": "value"
}

client = SpecialClass(**customization)

def my_function(row):
    client.do_amazing_things_on(row)

# ... more code ...

df.writeStream.outputMode("append").foreach(my_function).start()        

Boom!

TypeError: cannot pickle 'XXXXXX' object

Why does it happen?

The fact is that "my_function" must run on the workers, so Spark needs to distribute it and will attempt to serialize it for distribution. The function is written in Python, and it is still Python that has to do the job. However, due to the binary bindings, the XXXXXX object needs some libfoo.so.1.2.3 that the pickle library can't serialize.
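
You can reproduce the same failure outside Spark with nothing but the standard library. Here is a minimal sketch, where FakeClient is a hypothetical stand-in for SpecialClass: any object holding a low-level resource (here a threading lock) fails to pickle in the same way.

import pickle
import threading

# Hypothetical stand-in for SpecialClass: it holds a low-level
# resource (a lock) that the pickle library cannot serialize.
class FakeClient:
    def __init__(self):
        self._lock = threading.Lock()

client = FakeClient()

pickle.dumps(client)  # TypeError: cannot pickle '_thread.lock' object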


Fortunately, the a_library_with_binary_bindings library is supposed to be installed on the worker nodes too, so we can change the code a little and ship the Python code that builds a ready-to-use client instance instead of the client itself. Something like this:

from a_library_with_binary_bindings import SpecialClass 

customization = {
   "parameter": "value"
}

def my_function(row):
    client = SpecialClass(**customization)
    client.do_amazing_things_on(row)

df.writeStream.outputMode("append").foreach(my_function).start()        

This works because my_function only contains the Python instructions to start a client: at the time Spark pickles the function, no client instance exists yet.

Unfortunately, you will end up starting a new client for every my_function call, that is, once per row.
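
To see the cost concretely, here is a stand-alone sketch with a hypothetical CountingClient in place of SpecialClass:

class CountingClient:
    created = 0  # class-level counter of instantiations

    def __init__(self):
        CountingClient.created += 1

    def do_amazing_things_on(self, row):
        pass

def my_function(row):
    client = CountingClient()  # a brand-new client for every row
    client.do_amazing_things_on(row)

for row in range(1000):
    my_function(row)

print(CountingClient.created)  # 1000: one client per call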

Let's try this other solution:

from a_library_with_binary_bindings import SpecialClass 

customization = {
   "parameter": "value"
}

class MyClassContainer:

    def __init__(self, customization):
        self._customization = customization
        self._client = None  # no client yet, so pickling works

    def client(self):
        if self._client is None:
            self._client = SpecialClass(**self._customization)
        return self._client

    def __call__(self, row):
        self.client().do_amazing_things_on(row)

my_function = MyClassContainer(customization)

df.writeStream.outputMode("append").foreach(my_function).start()        

Two paramount facts to consider:

  1. The client does not start until it is required (lazy initialization), and once started it is recycled: it starts just once. By the time the client is required, the code is already on the workers.
  2. Although my_function is an instance of a class, you can use it as a function, because of the __call__ method (see the sketch below).
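
Here is a minimal stand-alone sketch of the same pattern, with a hypothetical Greeter class: the instance is callable like a function, and it pickles fine as long as it only carries plain attributes.

import pickle

class Greeter:
    def __init__(self, name):
        self.name = name  # plain data: safe to pickle

    def __call__(self, row):
        return f"hello {self.name}: {row}"

greet = Greeter("world")
print(greet("row-1"))                # the instance behaves like a function
print(len(pickle.dumps(greet)) > 0)  # and it serializes without complaint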

The unpicklable side of Python

For the sake of completeness, even pure-Python stuff can refuse to be pickled:

  1. Sockets: These are used for network communications and cannot be pickled because they represent a live connection to a network resource.
  2. File handles: These are used to read and write files and cannot be pickled because they represent an open file on the system.
  3. Database connections: These are used to interact with a database and cannot be pickled because they represent a live connection to a database.

The general rule of thumb is that “logical” objects can be pickled, but “resource” objects (sockets, files, locks, connections) can’t, because it makes no sense to persist or clone them.
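
A minimal sketch of the rule of thumb:

import pickle

pickle.dumps({"logical": [1, 2, 3]})   # fine: plain data round-trips

with open("example.txt", "w") as f:
    pickle.dumps(f)  # TypeError: cannot pickle '_io.TextIOWrapper' object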

Further reading:

On foreach in PySpark:

pyspark.sql.streaming.DataStreamWriter.foreach — PySpark 3.5.0 documentation (apache.org)

pyspark.sql.DataFrame.foreach — PySpark 3.5.0 documentation (apache.org)

The pickle library:

pickle — Python object serialization — Python 3.12.1 documentation

__call__()

Python's .__call__() Method: Creating Callable Instances – Real Python


The reader understands that this article may contain errors and may not be up to date for Spark releases newer than 3.0. The reader understands that they will use it at their own risk, and there are no express or implied warranties for data or hair losses. The reader should also understand that, to achieve better performance, they should consider Scala as an alternative to Python.




