How do you convert Data Stream into Avro Kafka Messages in Nested format using GoldenGate Stream Analytics?

November 8, 2023 | 15 minute read
Nisharahmed Soneji
Distinguished Data Management Specialist

As part of my three-part blog series, I am writing about the GoldenGate Stream Analytics custom stage functionality, how it has been used in one customer's organization with examples, and what the architecture was. In this final blog, I will show you how to write the Java program that converts the stream of data into Avro Kafka messages in nested format using the "custom stage" functionality in GoldenGate Stream Analytics.

As discussed in the previous blogs, custom stage types and functions are implemented in the Java programming language using interfaces, classes, and annotations provided in the osa.spark-cql.extensibility.api.jar library. You can find this jar file in the installation folder: <OSA_INSTALL_DIR>/<OSA_Versions>/osa-base/extensibility-api/osa.spark-cql.extensibility.api.jar

To develop a custom stage Java program, you must implement the EventProcessor interface and its processEvent() method, and apply the @OsaStage annotation to the class declaration.
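
Before looking at the full programs, here is a minimal sketch of what that contract looks like; the class name and field specs below are illustrative, and the complete SendAvroMessage implementation further down follows the same shape.

package com.oracle.osacs;
import com.oracle.cep.api.event.*;
import com.oracle.cep.api.annotations.OsaStage;
import com.oracle.cep.api.stage.EventProcessor;
import com.oracle.cep.api.stage.ProcessorContext;
import java.util.Map;

// Illustrative skeleton of an OSA custom stage; see SendAvroMessage below for the full version.
@OsaStage(name = "MyStage", description = "Example custom stage",
          inputSpec = "input, id:int, name:string",
          outputSpec = "output, id:int, name:string")
public class MyStage implements EventProcessor {
  @Override
  public void init(ProcessorContext ctx, Map<String, String> config) {
    // Read the output spec from the annotation and cache the event factory here.
  }

  @Override
  public Event processEvent(Event event) {
    // Read attributes from the incoming event, transform them, and return the outgoing event.
    return event;
  }

  @Override
  public void close() {
  }
}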

As a refresher, the stream of data is the "OSA.Customers" table data (the customer's basic information), fed by Oracle GoldenGate into OSA, and "OSA.Contacts" (the customer's contact details) is the reference data. We will build the required pipeline in OSA to generate Avro Kafka messages. A customer can have zero or more contacts. Please see the image for the structure of the stream and reference data.

The structure of Data Stream and Reference Data

The outcome the organization expects from GoldenGate Stream Analytics is that, after processing the data, whenever the CUST_STAT_CD column in a "Customers" data stream event has the value "Customer", an Avro Kafka message should be generated with the schema below. Since a customer can have multiple contact mobile numbers, the mobiles field is an array of JSON objects.

The Schema for AVRO Kafka Nested Message
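
The screenshot above shows the exact schema used in this example. As a rough sketch based only on the fields the producer code below populates (id, name, age, place, and a mobiles array of contact records), the Customers.avsc registered in the schema registry would look something like this; the exact field types and namespace are assumptions.

{
  "type": "record",
  "name": "Customer",
  "namespace": "com.basha.osa",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "name", "type": "string" },
    { "name": "age", "type": "int" },
    { "name": "place", "type": "string" },
    { "name": "mobiles", "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "Mobile",
          "fields": [
            { "name": "id", "type": "long" },
            { "name": "mobile", "type": "string" }
          ]
        }
      }
    }
  ]
}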

Creating Custom Stage Java Program:

There are many references available for writing a Kafka Avro producer in Java with a schema registry. Here is the link to the Confluent documentation on the Avro Schema Serializer and Deserializer. The OSA custom stage Java program gets the data from the previous OSA stage, calls this Kafka Avro producer Java program, and returns the data as an output that can be used by the following OSA stages or targets.

CustomKafkaAvroProducer Java Program

package com.basha.osa; 
import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.json.simple.JSONArray;
import org.json.simple.JSONObject; 
import org.json.simple.parser.JSONParser; 
import org.json.simple.parser.ParseException; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Iterator;
import java.util.Properties; 
import org.apache.commons.io.IOUtils; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path;
import com.basha.osa.Customer;

public class CustomKafkaAvroProducer {
          private static final Logger log = LoggerFactory.getLogger(CustomKafkaAvroProducer.class);
          public void sendKafkaAvro(String topic,String hdfsHost,Customer customerRecord) throws IOException, ParseException {
          System.out.println("start");
          log.info("I am a Kafka Producer");
          Configuration configuration = new Configuration();
          configuration.set("fs.defaultFS", "hdfs://"+hdfsHost+":8020");
          FileSystem fileSystem = FileSystem.get(configuration);

          Path kafkaPropertiesPath = new Path("/osa/properties/kafka.properties");
          FSDataInputStream kafkaPropertiesStream = fileSystem.open(kafkaPropertiesPath);
          Properties kafkaProperties = new Properties();
          kafkaProperties.load(kafkaPropertiesStream);
          Path schemaPath = new Path("/osa/schemas/Customers.avsc");
          FSDataInputStream schemaPathStream = fileSystem.open(schemaPath);
          String avroSchema = IOUtils.toString(schemaPathStream, "UTF-8");

          // create Producer properties
          Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrapServers"));
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          org.apache.kafka.common.serialization.StringSerializer.class);
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
          props.put("schema.registry.url", kafkaProperties.getProperty("SchemaRegistery"));
          //props.put("schema.registry.url", SchemaRegistery);
          System.out.println("Propetise set");
        
          // create the producer
          KafkaProducer<Object, Object> producer = new KafkaProducer<>(props);
          // create generic Avro record
          String key = topic+"-key";
          System.out.println(avroSchema);
          Schema.Parser parser = new Schema.Parser();
          JSONParser jsonParser = new JSONParser();
          Schema schema = parser.parse(avroSchema);
          //Schema schemaMobile = parser.parse(mobileSchema);
          Schema schemaMobile = schema.getField("mobiles").schema().getElementType();
          GenericRecord avroRecord = new GenericData.Record(schema);
          ArrayList<GenericRecord> mobile = new ArrayList<GenericRecord>();

          JSONArray jsonMobile = (JSONArray) jsonParser.parse(customerRecord.getMobile());
          Iterator i = jsonMobile.iterator();
          // take each value from the json array separately
          while (i.hasNext()) {
            GenericRecord avroMobileRecord = new GenericData.Record(schemaMobile);
            JSONObject innerObj = (JSONObject) i.next();
            System.out.println(innerObj.toString());
            System.out.println("mobile " + innerObj.get("mobile") + " id " + innerObj.get("id"));
            avroMobileRecord.put("id", innerObj.get("id"));
            avroMobileRecord.put("mobile", innerObj.get("mobile"));
            mobile.add(avroMobileRecord);
            //System.out.println(mobile.toString());
          }

          avroRecord.put("id",customerRecord.getId());
          avroRecord.put("name",customerRecord.getName());
          avroRecord.put("age",customerRecord.getAge());
          avroRecord.put("place",customerRecord.getPlace());
          avroRecord.put("mobiles",mobile);
          //avroRecord.put("mobiles",jsonMobile);
          System.out.println("avro generic record created");
          // create a producer record
          ProducerRecord<Object, Object> record = new ProducerRecord<>(topic, key, avroRecord);
          try {
              System.out.println(record.toString());
              producer.send(record);
              System.out.println("producer send");
          } catch(Exception e) {
              System.out.println("exception:"+e.getStackTrace());
              e.printStackTrace();
          }

          finally {
              // When you're finished producing records, flush the producer to ensure everything has been
              // written to Kafka, and then close the producer to free its resources.
              producer.flush();
              producer.close();
          }
     }
}
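
The producer above reads its connection details from the /osa/properties/kafka.properties file stored in HDFS. The contents of that file are not shown in this post; based on the property keys the code reads (bootstrapServers and SchemaRegistery), a minimal version would look something like the following, with placeholder host names:

bootstrapServers=kafka-broker-1:9092,kafka-broker-2:9092
SchemaRegistery=http://schema-registry-host:8081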

SendAvroMessage Java Program

package com.oracle.osacs; 
import com.oracle.cep.api.event.*; 
import com.oracle.cep.api.annotations.OsaStage; 
import com.oracle.cep.api.stage.EventProcessor; 
import com.oracle.cep.api.stage.ProcessorContext;
import java.io.IOException; 
import java.util.HashMap; 
import java.util.Map; 
import org.json.simple.parser.ParseException; 
import com.basha.osa.Customer; 
import com.basha.osa.CustomKafkaAvroProducer; 

@SuppressWarnings("serial")
@OsaStage(name = "SendAvroMessage", description = "Procedure to send AVRO messahe", inputSpec = "input, id:int,name:string, age:int, place:string, mobiles:string, avroTopic:string, hdfsHost:string",
outputSpec = "output,id:int, name:string, age:int, place:string, mobiles:string")

public class SendAvroMessage implements EventProcessor {
  EventFactory eventFactory;
  EventSpec outputSpec;
  @SuppressWarnings("deprecation")
  @Override
  public void init(ProcessorContext ctx, Map<String, String> config) {
    eventFactory = ctx.getEventFactory();
    OsaStage meta = SendAvroMessage.class.getAnnotation(OsaStage.class);
    String spec = meta.outputSpec();
    outputSpec = TupleEventSpec.fromAnnotation(spec);
  }
  @Override
  public void close() {
  }
  @Override
  public Event processEvent(Event event) {
     Attr idAttr = event.getAttr("id");
     Attr nameAttr = event.getAttr("name");
     Attr ageAttr = event.getAttr("age");
     Attr placeAttr = event.getAttr("place");
     Attr mobileAttr = event.getAttr("mobiles");
     Attr avroTopic = event.getAttr("avroTopic");
     Attr hdfsHost = event.getAttr("hdfsHost");
     Customer customer = new Customer();
     CustomKafkaAvroProducer kafkaAvroProducer = new CustomKafkaAvroProducer();
     Map<String, Object> values = new HashMap<String, Object>();

     if (!idAttr.isNull()) {
     // Collect the data from previous stage in a Customer class
     customer.setId((int) idAttr.getObjectValue());
     customer.setName((String) nameAttr.getObjectValue());
     customer.setAge((int) ageAttr.getObjectValue());
     customer.setPlace((String) placeAttr.getObjectValue());
     customer.setMobile((String) mobileAttr.getObjectValue());
     values.put("id", customer.getId());
     values.put("name", customer.getName());
     values.put("age", customer.getAge());
     values.put("place", customer.getPlace());
     values.put("mobiles", customer.getMobile());
     System.out.println(customer.getMobile());
     try {
        // Call the Kafka AVRO producer with Topic, HDFS host and Customer class 
        kafkaAvroProducer.sendKafkaAvro((String)avroTopic.getObjectValue(),(String)hdfsHost.getObjectValue(), customer);
     } catch (IOException e) {
        // Failed to read the properties/schema from HDFS or to send the record to Kafka
        e.printStackTrace();
     } catch (ParseException e) {
        // Failed to parse the mobiles JSON array string
        e.printStackTrace();
     }
   }
   Event outputEvent = eventFactory.createEvent(outputSpec, values, event.getTime());
   return outputEvent; 
  }
}
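
Both programs also rely on a simple Customer bean in the com.basha.osa package that is not listed in this post. Judging by the getters and setters used above, it is a plain POJO along these lines (a sketch only):

package com.basha.osa;

// Simple bean that carries one customer event between the OSA custom stage and the Kafka producer.
// Sketch only; the original class is not shown in this post.
public class Customer {
    private int id;
    private String name;
    private int age;
    private String place;
    private String mobile; // JSON array of contacts, serialized as a String by the Continuous Merge stage

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public String getPlace() { return place; }
    public void setPlace(String place) { this.place = place; }
    public String getMobile() { return mobile; }
    public void setMobile(String mobile) { this.mobile = mobile; }
}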

 

The pipeline in OSA looks like the following and has five stages.

OSA Pipeline with various stages

 

Stage-1, Filtering and Joining:

In this stage, we join the customer change data capture stream with the reference data and filter on the condition that the CUST_STAT_CD column equals "Customer", so that all other records are not considered in the pipeline.

Stage-2, Query Stage – Rearrange Fields:

In the next stage we will use the Continuous Merge transformation to merge the customer's multiple contact mobile numbers. To use this transformation, we first need to rearrange the fields into the proper sequence.

Stage-3, Continuous Merge – Contacts Array:

This transformation merges the multiple contacts into a JSON array. OSA has no data type for a JSON array, so the generated array is produced as a String, which we have to parse as JSON in our custom program. Except for the contact id and mobile fields, we select all the fields as key fields, and we name the merge field mobiles, per the Avro schema.
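
For illustration, after this stage a customer with two contacts carries a mobiles value such as the following String (the values are made up), which the custom stage later parses with the json-simple parser:

[{"id":101,"mobile":"9876543210"},{"id":102,"mobile":"9123456780"}]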

Stage-4, Query Stage – Set Constant:

In this stage, we set constants for the avroTopic and the hdfsHost name, and rename the fields per the Avro schema.

Stage-5, Custom Stage – Customers Kafka Message:

This is the target stage: it receives the data from the previous stage and produces the Kafka Avro message per the schema in the schema registry. See the outcome in the screenshot below.

The Kafka Avro Nested Messages, the final outcome

I worked with Hyseyin Celebi and Abdulhameed Basha on this use case; special thanks to them for their valuable support.

Nisharahmed Soneji

Distinguished Data Management Specialist

Nishar is the "Distinguished Data Management Specialist", leading the sales opportunity for Data Integration products, OCI GoldenGate & GoldenGate On-premises in the EMEA region. The GoldenGate is the best database replication technology available in the market. He has been advocating for the customers on new-age technology and best practices and helping them achieve strategic goals using Oracle solutions and Products. He has been evangelizing GoldenGate Products, Solutions, and new-age architects(Data Mesh, Data Fabric) in Conferences, enabling and coaching the Tech hub engineers on the Products' functionality & insights. The family of GoldenGate products includes OCI GoldenGate, GoldenGate for Oracle, GoldenGate for Non-Oracle, GoldenGate for Big Data, GoldenGate Stream Analytics, GoldenGate Foundation Suite(Veridata, GoldenGate OEM PlugIn), and OCI Data Integration.

Before joining the sales team, he was a Senior Principal Product Manager at Oracle. He led the OCI GoldenGate Cloud Service, GoldenGate Foundation Suite Products, and GoldenGate core products globally.

Nishar has been a key leadership team member, taking new cloud services from inception to growth within the Oracle Data Integration business. He has also spearheaded multiple Oracle internal innovations, resulting in top-line and bottom-line growth in database replication.

Nishar has over 22 years of experience in technology, specializing in enterprise data replication and integration products, with a demonstrated track record of creating and scaling world-class businesses. For the last 10+ years, he has focused on driving revenue through sales and product management, shaping business strategy, leading product execution, and leading cross-functional teams and products.

Before moving into sales and product management, he was a software development architect and led various complex technology projects for several companies, including Endress+Hauser, Cummins Inc., and GoldenGate Inc.


Previous Post

Customer's use case and architecture for the custom stage capabilities

Nisharahmed Soneji | 7 min read

Next Post


Oracle GoldenGate Microservices Secure Deployment

Madhu Kumar S | 6 min read