Oracle Cloud Infrastructure (OCI) Data Flow is a fully managed big data service that lets you run Apache Spark applications at any scale with almost no administration. Spark has become the leading big data processing framework and OCI Data Flow is the easiest way to run Spark in Oracle Cloud because there’s nothing for developers to install or manage.
OCI Data Flow now comes with an interactive OCI Data Science notebook experience. Users can seamlessly enable a notebook session and start working with their data, with Apache Spark doing the heavy lifting behind the scenes. In this post, we will see how easily an OCI Data Flow environment can be created using a Conda environment, create an OCI Data Science notebook session, and access the OCI Data Flow Spark platform through a Livy session. We will then submit fault-tolerant Spark jobs from the notebook using synchronous and asynchronous methods and retrieve the output.
The OCI Data Flow integration with OCI Data Science also introduces SparkMagic commands, with its own flavors and upgrades. SparkMagic allows interactive communication with Spark through Livy, using the `%%spark` magic directive within a JupyterLab code cell.
The purpose of this document is to walk you through the setup required to access OCI Data Flow Sessions from a Data Science notebook session. These sessions let you run interactive Spark workloads on a long-lasting Data Flow cluster through an Apache Livy integration.
Once the OCI Data Flow Spark session is created, we will walk through some sample code for performing Spark operations on OCI Object Storage and Autonomous Data Warehouse.
Features & Benefit:
ALL {resource.type='dataflowrun', resource.compartment.id='<compartment_id>'} ALL {resource.type='datasciencenotebooksession', resource.compartment.id='<compartment_id>'} Any {resource.type = 'datacatalogmetastore'}
ALLOW DYNAMIC-GROUP <df-dynamic-group> TO MANAGE objects IN TENANCY WHERE ANY {target.bucket.name='<bucket_name>', target.bucket.name='dataflow-logs, target.bucket.name='dataflow-warehouse' } ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE dataflow-family in compartment '<your-compartment-name>' ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE data-catalog-metastores IN TENANCY ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO READ buckets IN TENANCY ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO MANAGE object-family IN TENANCY WHERE ANY { target.bucket.name = '<bucket_name>', target.bucket.name = '<managed-table-location-bucket>', target.bucket.name = '<external-table-location-bucket>' } ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'} ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE objects IN TENANCY WHERE ALL {target.bucket.name='ds-conda-env'}
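These dynamic groups and policies are typically created in the OCI Console. If you prefer to script the setup, a minimal sketch with the OCI Python SDK might look like the following; the group name, description, and matching rule are placeholders to adapt, and an API key configuration in ~/.oci/config is assumed.

# Sketch: create a dynamic group for Data Flow runs with the OCI Python SDK.
# The name, description, and matching rule below are placeholders; adjust them to your tenancy.
import oci

config = oci.config.from_file()  # assumes an API key config in ~/.oci/config
identity = oci.identity.IdentityClient(config)

details = oci.identity.models.CreateDynamicGroupDetails(
    compartment_id=config["tenancy"],  # dynamic groups are created at the tenancy (root) level
    name="df-dynamic-group",
    description="Dynamic group for OCI Data Flow runs",
    matching_rule="ALL {resource.type='dataflowrun', resource.compartment.id='<compartment_id>'}",
)
identity.create_dynamic_group(details)

The policies themselves can be created the same way with identity.create_policy, or simply pasted into the Console as shown above.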
Open a new OCI Data Science notebook session. From the File menu, choose New Launcher and click Terminal.
Install and activate the pyspark32_p38_cpu_v1 conda environment from your terminal:
odsc conda install -s pyspark32_p38_cpu_v1
source activate /home/datascience/conda/pyspark32_p38_cpu_v1
Publish the conda environment to an Object Storage bucket so the Data Flow cluster can use it:
odsc conda publish -s pyspark3_2anddataflowv1_0
Note: Publishing takes some time. Once it completes, you can see the conda package uploaded to the Object Storage bucket.
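If you want to confirm the upload without leaving Python, a minimal sketch with the OCI Python SDK can list the bucket contents. This is not part of the original walkthrough; it assumes the 'ds-conda-env' bucket from the policies above and an API key configuration.

# Sketch: list objects in the conda bucket to verify the published environment.
import oci

config = oci.config.from_file()  # assumes an API key config in ~/.oci/config
object_storage = oci.object_storage.ObjectStorageClient(config)
namespace = object_storage.get_namespace().data

response = object_storage.list_objects(namespace, "ds-conda-env")
for obj in response.data.objects:
    print(obj.name)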
1. Set up authentication using ADS
import ads
ads.set_auth("resource_principal")  # Supported values: resource_principal, api_key
2. Load Extension
%load_ext dataflow.magics
3. Create an OCI Data Flow Spark session using Livy:
# Create an OCI Data Flow session using the Livy service through the OCI Data Science notebook.
import json

command = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxxxxxxxxx",
    "displayName": "Demo_DataFlow_Spark_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
    "executorShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
    "numExecutors": 1,
    "logsBucketUri": "<oci://bucket@namespace/>",
    "archiveUri": "<oci://bucket@namespace/archive.zip>",
    "configuration": {
        "spark.archives": "<oci://bucket@namespace/>#conda",
        "spark.oracle.datasource.enabled": "true",
    },
}
command = f"'{json.dumps(command)}'"
print("command", command)

# Optional dynamic allocation settings:
# "configuration": {
#     "spark.dynamicAllocation.enabled": "true",
#     "spark.dynamicAllocation.shuffleTracking.enabled": "true",
#     "spark.dynamicAllocation.minExecutors": "1",
#     "spark.dynamicAllocation.maxExecutors": "4",
#     "spark.dynamicAllocation.executorIdleTimeout": "60",
#     "spark.dynamicAllocation.schedulerBacklogTimeout": "60",
#     "spark.dataflow.dynamicAllocation.quotaPolicy": "min"
# }

%create_session -l python -c $command
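Once the session is created, a quick sanity check (a minimal example, not part of the original sample) confirms that the Livy session is connected to the Data Flow cluster:

%%spark
# Sanity check: confirm the remote Spark session is up and print its version and application ID.
print(sc.version)
print(spark.sparkContext.applicationId)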
We will now run sample Spark operations against two targets: OCI Object Storage and OCI Autonomous Data Warehouse.
%%spark
# Import required libraries.
import json
import os
import sys
import datetime
import oci
import pyspark.sql
from pyspark.sql.functions import countDistinct
from delta.tables import *
Read Object Storage file using spark.read from Livy Session:
%%spark -o df_Bronze_Insurance_Data
#Read Claim Insurance files from OCI Object Storage in Spark Dataframe.
df_Bronze_Insurance_Data = spark.read.format("csv").option("header", "true") \
.option("multiLine", "true").load("oci://test-demo@OSNamespace/insur_claim/claim.csv*")
print("df_RawZone_Data",df_Bronze_Insurance_Data)
df_Bronze_Insurance_Data.show(5)
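As a small optional extension to the sample, the same DataFrame could be written back to Object Storage in Parquet format; the target path below is a placeholder.

%%spark
# Write the claims DataFrame back to Object Storage as Parquet (placeholder path).
df_Bronze_Insurance_Data.write \
    .mode("overwrite") \
    .format("parquet") \
    .save("oci://test-demo@OSNamespace/insur_claim/claim_parquet/")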
Load data into ADW, using an OCI Vault secret for the wallet password:
Copy the code below as is.
Reference: https://github.com/oracle-samples/oracle-dataflow-samples/tree/main/python/loadadw
%%spark
def get_authenticated_client(token_path, client, file_location=None, profile_name=None):
    """
    Get an authenticated OCI client.
    Example: get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
    """
    import oci

    if not in_dataflow():
        # We are running locally, use our API Key.
        if file_location is None:
            file_location = oci.config.DEFAULT_LOCATION
        if profile_name is None:
            profile_name = oci.config.DEFAULT_PROFILE
        config = oci.config.from_file(file_location=file_location, profile_name=profile_name)
        authenticated_client = client(config)
    else:
        # We are running in Data Flow, use our Delegation Token.
        with open(token_path) as fd:
            delegation_token = fd.read()
        signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
            delegation_token=delegation_token
        )
        authenticated_client = client(config={}, signer=signer)
    return authenticated_client


def get_password_from_secrets(token_path, password_ocid):
    """
    Get a password from the OCI Secrets Service.
    """
    import base64
    import oci

    secrets_client = get_authenticated_client(token_path, oci.secrets.SecretsClient)
    response = secrets_client.get_secret_bundle(password_ocid)
    base64_secret_content = response.data.secret_bundle_content.content
    base64_secret_bytes = base64_secret_content.encode("ascii")
    base64_message_bytes = base64.b64decode(base64_secret_bytes)
    secret_content = base64_message_bytes.decode("ascii")
    return secret_content


def get_delegation_token_path(spark):
    """
    Get the delegation token path when we're running in Data Flow.
    """
    if not in_dataflow():
        return None
    token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
    token_path = spark.sparkContext.getConf().get(token_key)
    if not token_path:
        raise Exception(f"{token_key} is not set")
    return token_path


def get_temporary_directory():
    if in_dataflow():
        return "/opt/spark/work-dir/"
    else:
        import tempfile
        return tempfile.gettempdir()


def in_dataflow():
    """
    Determine if we are running in OCI Data Flow by checking the environment.
    """
    if os.environ.get("HOME") == "/home/dataflow":
        return True
    return False


def download_wallet(spark, wallet_path):
    """
    Download an ADW/ATP wallet file and prepare it for use in a Data Flow application.
    """
    import oci
    import zipfile

    # Get an object store client.
    token_path = get_delegation_token_path(spark)
    object_store_client = get_authenticated_client(
        token_path, oci.object_storage.ObjectStorageClient
    )

    # Download the wallet file.
    from urllib.parse import urlparse
    parsed = urlparse(wallet_path)
    bucket_name, namespace = parsed.netloc.split("@")
    file_name = parsed.path[1:]
    response = object_store_client.get_object(namespace, bucket_name, file_name)
    temporary_directory = get_temporary_directory()
    zip_file_path = os.path.join(temporary_directory, "wallet.zip")
    with open(zip_file_path, "wb") as fd:
        for chunk in response.data.raw.stream(1024 * 1024, decode_content=False):
            fd.write(chunk)

    # Extract everything locally.
    with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
        zip_ref.extractall(temporary_directory)

    # Distribute all wallet files.
    contents = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split()
    spark_context = spark.sparkContext
    for file in contents:
        spark_context.addFile(os.path.join(temporary_directory, file))

    return temporary_directory
%%spark
PASSWORD_SECRET_OCID = "ocid1.vaultsecret.oc1.phx.xxxxxxx"
TARGET_TABLE = "ADMIN.TB_NAME"
TNSNAME = "demolakehouseadw_medium"
USER = "admin"
WALLET_PATH = "oci://bucketname@osnamespace/Wallet_DemoLakeHouseADW.zip"

# Download and distribute our wallet file.
wallet_path = download_wallet(spark, WALLET_PATH)
adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNSNAME, wallet_path)
%%spark
# Get our password using the secret service.
print("Getting wallet password")
token_path = get_delegation_token_path(spark)
password = get_password_from_secrets(token_path, PASSWORD_SECRET_OCID)
print("Done getting wallet password")

# Save the results to the database.
print("Saving processed data to " + adw_url)
properties = {
    "driver": "oracle.jdbc.driver.OracleDriver",
    "oracle.net.tns_admin": TNSNAME,
    "password": password,
    "user": USER
}
Read the source table from ADW into a Spark DataFrame:

%%spark
SOURCE_TABLE = "ADMIN.RETAILPOS"
df_RetailPOS_15min = spark.read.jdbc(url=adw_url, table=SOURCE_TABLE, properties=properties)
%%spark
# Load into ADW.
TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
print("TARGET_TABLE : ", TARGET_TABLE)

# Write to ADW.
print("Write to ADW : ")
df_RetailPOS_15min.write.jdbc(url=adw_url, table=TARGET_TABLE, mode="Append", properties=properties)
print("Writing done to ADW : ")
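As an optional check (not part of the original sample), you can read the target table back from ADW and count the rows:

%%spark
# Optional verification: read the target table back and count rows.
df_Check = spark.read.jdbc(url=adw_url, table=TARGET_TABLE, properties=properties)
print("Rows in " + TARGET_TABLE + ": " + str(df_Check.count()))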
That's the end of today's blog. Thanks for reading!
With Data Flow, you can configure Data Science notebooks to run applications interactively against Data Flow. Watch the tutorial video on using Data Science with Data Flow Studio. Also see the Oracle Accelerated Data Science SDK documentation for more information on integrating Data Science and Data Flow. More examples are available on GitHub in the Data Flow samples and Data Science samples repositories.
To get started today, sign up for the Oracle Cloud Free Trial or sign in to your account to try OCI Data Flow. Try Data Flow’s 15-minute no-installation-required tutorial to see just how easy Spark processing can be with Oracle Cloud Infrastructure.