Producing AVRO messages with PHP for Kafka Connect
2020-09-24

Apache Kafka has become an obvious choice and industry standard for data streaming. When streaming large amounts of data, it’s often reasonable to use the AVRO format, which has at least three advantages:
- it’s one of the most size-efficient formats (compared to JSON, protobuf, or parquet); an AVRO-serialized payload can be 10 times smaller than its JSON equivalent,
- it enforces the usage of a schema,
- it works out of the box with Kafka Connect (it’s a requirement if you’d like to use the BigQuery sink connector).
Let’s see how to send data to Kafka in AVRO format from a PHP producer, so that Kafka Connect can parse it and push the data to a sink.
I assume you know the basics of AVRO and Schema Registry, but if not – let me know in the comments, I’d be happy to help with setting up Schema Registry! Footnote: the easiest way is to use Confluent’s official Docker image and deploy it on Kubernetes.
In order to use a PHP producer which serializes payloads in AVRO, we need to wrap each message in a particular “envelope”, which contains the schema ID. This way Kafka Connect will know which schema should be retrieved from Schema Registry.
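Concretely, this envelope is Confluent’s wire format: one magic byte, four bytes of schema ID, then the Avro-encoded data. Sketched as a byte layout:

// Confluent wire format, as expected by Kafka Connect’s AvroConverter:
// byte 0      magic byte, always 0
// bytes 1-4   schema ID, big-endian 32-bit integer
// bytes 5+    Avro binary encoding of the payload (no schema embedded)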
Let’s assume that our PHP Kafka producer will send a simple payload:
$data = [
    'time' => '2020-09-24 20:45:00',
    'level' => 'info',
    'channel' => 'main',
    'message' => 'Some log entry has been produced',
];
First of all, you need to create an AVRO schema for that. You can generate one using one of the many schema-generation tools available online.
Our AVRO schema will look like this:
$schema = <<<SCHEMA
{
    "name": "LogEntry",
    "type": "record",
    "namespace": "pl.sznapka",
    "fields": [
        {
            "name": "time",
            "type": "string"
        },
        {
            "name": "level",
            "type": "string"
        },
        {
            "name": "channel",
            "type": "string"
        },
        {
            "name": "message",
            "type": "string"
        }
    ]
}
SCHEMA;
The next step is to register our AVRO schema in the Schema Registry and obtain its ID. You can either call the Schema Registry API directly or use the PHP library flix-tech/confluent-schema-registry-api (which I recommend).
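In case you don’t have the client set up yet, constructing $schemaRegistry with that library looks roughly like this (a sketch – the base URI http://localhost:8081 is an assumption, adjust it to your registry):

use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;

// Blocking wrapper around the async Guzzle client, with an in-memory schema cache
$schemaRegistry = new CachedRegistry(
    new BlockingRegistry(
        new PromisingRegistry(
            new Client(['base_uri' => 'http://localhost:8081']) // assumed registry address
        )
    ),
    new AvroObjectCacheAdapter()
);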
Note – you should register your schema under the Kafka topic name with the ‘-value’ suffix, because that’s the subject name Kafka Connect will look up for message values:
$kafkaTopicName = 'logs';
$subject = $kafkaTopicName . '-value';
$schemaRegistry->register($subject, \AvroSchema::parse($schema));
Once your schema is in the Schema Registry, you need to retrieve the ID for your subject:
$schema = $schemaRegistry->latestVersion($subject);
$id = $schemaRegistry->schemaId($subject, $schema);
The last part is to create the AVRO-serialized payload with the envelope header, so that it’s readable for Kafka Connect:
$io = new \AvroStringIO();
$io->write(pack('C', 0)); // magic byte: wire format version, always 0
$io->write(pack('N', $id)); // schema ID as a big-endian 32-bit integer
$encoder = new \AvroIOBinaryEncoder($io);
$writer = new \AvroIODatumWriter($schema);
$writer->write($data, $encoder); // Avro binary encoding of the payload
$kafkaPayload = $io->string();
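If you’d like to sanity-check the envelope before producing, you can unpack the header back (just an illustration, not required in production):

// First byte should be 0 and the next four should decode back to $id
$magicByte = unpack('C', $kafkaPayload[0])[1];
$schemaId = unpack('N', substr($kafkaPayload, 1, 4))[1];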
Now we have $kafkaPayload, which we can send to Kafka (using https://github.com/arnaud-lb/php-rdkafka), and from there it can be distributed to any sink you register in Kafka Connect. In my case it’s often BigQuery and Redis.
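If you don’t have a producer instance yet, a minimal php-rdkafka setup looks like this (the broker address localhost:9092 is an assumption):

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // adjust to your brokers
$kafkaProducer = new RdKafka\Producer($conf);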
$topic = $kafkaProducer->newTopic($kafkaTopicName);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $kafkaPayload); // let librdkafka pick the partition
$kafkaProducer->flush(1000); // wait up to 1000 ms for outstanding deliveries
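Keep in mind that flush() blocks until all outstanding messages are delivered or the timeout (in milliseconds) elapses; a non-zero return value means some messages are still queued, so you may want to retry the flush before shutting the producer down.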
That’s it, happy streaming!