
Spark - Calling Scala code from PySpark

Latest reply: Dec 2, 2021 05:13:13

Basic method call through Py4J

PySpark relies on Py4J to execute Python code that can call objects that reside in the JVM. To do that, Py4J uses a gateway between the JVM and the Python interpreter, and PySpark sets it up for you.

Let’s see how we can make a basic method call. We first create a minimal Scala object with a single method:

package com.ippontech

object Hello {
  def hello = println("hello")
}

We need to package this class in a JAR. I’m reusing the spark-kafka-source project from the previous post, but any Maven/SBT/… project should work.

$ mvn package
...
Building jar: .../target/spark-kafka-source-0.2.0-SNAPSHOT.jar

We can now launch the PySpark console and add the JAR to the classpath:

$ pyspark --driver-class-path .../target/spark-kafka-source-0.2.0-SNAPSHOT.jar
>>>

From there, JVM objects are accessible through the _jvm field of the SparkContext object (sc._jvm):

>>> sc._jvm.com.ippontech.Hello.hello()
hello

Good, we can call Scala code from Python. That’s a good start.
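
The gateway is not limited to classes from our JAR: anything on the driver's classpath is reachable the same way. A quick throwaway check (a minimal sketch using only a standard JDK class, nothing from the project above):

>>> sc._jvm.java.lang.System.currentTimeMillis()   # static call on a JDK class; the result comes back as a plain Python int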

Notice that we can also assign an instance of a JVM object to a Python variable, then make the method call on that variable:

>>> h = sc._jvm.com.ippontech.Hello
>>> h.hello()
hello

Real-life method call

Here is the method we want to call:

object KafkaSource extends LazyLogging {

  def kafkaStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]
    (ssc: StreamingContext, brokers: String, offsetsStore: OffsetsStore, topic: String): InputDStream[(K, V)] = {
    ...

There are a few challenges here:

  • This method has generic type parameters. I’m not sure we can pass them from Python, so we will need to create a helper method that doesn’t have generic parameters.

  • The StreamingContext parameter and the InputDStream return value are objects from the Spark framework. Notice that PySpark works with Python wrappers around the Java version of Spark objects, not around the Scala version of Spark objects. We will have to wrap/unwrap objects accordingly.

  • The brokers and topic parameters are strings. Python strings and Java strings are interchangeable (see the quick check after this list).

  • The OffsetsStore parameter is an object from our code base. We will have to create it on the Python side and simply pass the reference as a parameter.
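
As a quick check of the string point above (a throwaway sketch using only a JDK class): a Python str crosses the gateway as a java.lang.String, and a returned java.lang.String comes back as a Python str.

>>> sc._jvm.java.lang.System.getProperty("java.version")   # Python str in, Python str out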

Let’s create a helper class and helper method to lift the constraints described above:

object KafkaSourcePythonHelper {

  def kafkaStream(jssc: JavaStreamingContext, brokers: String, offsetsStore: OffsetsStore,
                  topic: String): JavaDStream[(String, String)] = {
    val dstream = KafkaSource.kafkaStream[String, String, StringDecoder, StringDecoder](jssc.ssc, brokers, offsetsStore, topic)
    val jdstream = new JavaDStream(dstream)
    jdstream
  }
}

Here are the things to notice:

  • We have removed the generic parameters from the helper method’s signature and hard-coded them when calling the actual method.

  • We expect a JavaStreamingContext instance and unwrap it by calling jssc.ssc to get the StreamingContext instance.

  • We wrap the DStream into a JavaDStream that will be returned to the Python caller.

We can now compile this code. Notice that, since our code depends on external libraries, these dependencies need to be bundled into the JAR. I’m using a profile and the Maven Shade plugin to do so:

$ mvn package -Puber-jar

We can now restart PySpark and test our code. We start by creating a StreamingContext:

>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

Before calling the kafkaStream method, we need to create an instance of an OffsetsStore:

>>> zkStore = sc._jvm.com.ippontech.kafka.stores.ZooKeeperOffsetsStore("localhost:2181", "/my_topic/offsets")

Notice that the zkStore instance is just a pointer to an object that resides in the JVM:

>>> zkStore
JavaObject id=o21

Now it is time to call the kafkaStream method:

>>> jstream = sc._jvm.com.ippontech.kafka.KafkaSourcePythonHelper.kafkaStream(ssc._jssc, "localhost:9092", zkStore, "my_topic")

Notice we unwrapped the Python StreamingContext to get the JavaStreamingContext by calling ssc._jssc.
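
The same convention shows up in other PySpark wrappers; for example, the SparkContext keeps its Java counterpart in a _jsc field. These are private attributes rather than a stable API, so treat the following as an informal observation:

>>> ssc._jssc   # the Java JavaStreamingContext behind the Python StreamingContext
>>> sc._jsc     # the Java JavaSparkContext behind the Python SparkContext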

The last thing we need to do is to wrap the JavaDStream into a Python DStream. We have to provide a deserializer for the messages:

>>> from pyspark.serializers import PairDeserializer, NoOpSerializer
>>> from pyspark.streaming import DStream
>>> ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
>>> stream = DStream(jstream, ssc, ser)
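
Note that with NoOpSerializer the keys and values arrive as raw bytes on the Python side (with Python 2 they happen to print as plain strings, as in the output below). A hedged sketch of decoding them before use, assuming the producer sends UTF-8 text:

>>> decoded = stream.map(lambda kv: (kv[0], kv[1].decode("utf-8") if kv[1] is not None else None))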

We can now define our transformations and actions on the DStream and start the application:

>>> stream.pprint()
>>> ssc.start()
>>> -------------------------------------------
Time: 2016-09-01 15:21:38
-------------------------------------------

-------------------------------------------
Time: 2016-09-01 15:21:39
-------------------------------------------
...

Now, if you send some data into Kafka, you should see it in PySpark:

$ echo "foo bar" | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
-------------------------------------------
Time: 2016-09-01 15:26:18
-------------------------------------------
(None, 'foo bar')

Conclusion

We have seen it is fairly easy to call Scala code from Python. In a context where Data Scientists write Python code but Software Engineers prefer to write Java/Scala code, that’s a good thing because we can share the responsibilities within the team. That makes me happy because I’m not a big fan of shipping code written by Data Scientists to production, and that also means we will have less code duplication between languages.

In the context of Spark, the key thing to remember is to appropriately wrap/unwrap your objects. Spark has 3 versions of each class (Scala, Java, Python) and the Python classes are wrappers over the Java classes. This means (see the condensed sketch after the list):

  • When passing a Spark object as a parameter to a Scala method:

    • On the Python side, unwrap the Python object to get the Java object (e.g. unwrap a Python StreamingContext into a Java JavaStreamingContext).

    • On the Scala side, unwrap the Java object to get the Scala object (e.g. unwrap a Java JavaStreamingContext into a Scala StreamingContext).

  • When returning a Spark object from the Scala method:

    • On the Scala side, wrap the object into the corresponding Java object (e.g. wrap a Scala DStream into a Java JavaDStream).

    • On the Python side, wrap the Java object into the corresponding Python object (e.g. wrap a Java JavaDStream into a Python DStream).
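
Put differently, here is the whole round trip condensed into one sketch (reusing the names and endpoints from the walkthrough above; nothing new, just the wrap/unwrap steps side by side):

>>> from pyspark.streaming import StreamingContext, DStream
>>> from pyspark.serializers import PairDeserializer, NoOpSerializer
>>> ssc = StreamingContext(sparkContext=sc, batchDuration=1)
>>> zkStore = sc._jvm.com.ippontech.kafka.stores.ZooKeeperOffsetsStore("localhost:2181", "/my_topic/offsets")
>>> # unwrap: pass the Java JavaStreamingContext (ssc._jssc), not the Python StreamingContext
>>> jstream = sc._jvm.com.ippontech.kafka.KafkaSourcePythonHelper.kafkaStream(ssc._jssc, "localhost:9092", zkStore, "my_topic")
>>> # wrap: turn the returned JavaDStream back into a Python DStream
>>> stream = DStream(jstream, ssc, PairDeserializer(NoOpSerializer(), NoOpSerializer()))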



MahMush (Moderator, Author), Dec 2, 2021 05:13:13

Very detailed information, well written.