🎄 Twelve Days of SMT 🎄 - Day 9: Cast
The Cast Single Message Transform lets you change the data type of fields in a Kafka message, supporting numerics, string, and boolean.
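As a rough sketch of what that looks like in a connector config (the transform alias castTypes and the field names cost, units, and in_stock are made up for illustration), the spec setting takes a comma-separated list of field:type pairs:

```json
{
  "transforms"                : "castTypes",
  "transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.castTypes.spec" : "cost:float64,units:int32,in_stock:boolean"
}
```

Only the transform-related keys are shown here; they would sit alongside the rest of the connector's configuration.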
The TimestampConverter Single Message Transform lets you work with timestamp fields in Kafka messages. You can convert a string or a Unix epoch value into a native Timestamp type (or Date or Time), and the same in reverse too.
This is really useful to make sure that data ingested into Kafka is correctly stored as a Timestamp (if it is one), and also enables you to write a Timestamp out to a sink connector in a string format that you choose.
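A minimal example, assuming a string field called txn_date (a hypothetical name, as is the alias convertTS) holding values like 2020-12-17 09:30:00 that should become a native Timestamp:

```json
{
  "transforms"                       : "convertTS",
  "transforms.convertTS.type"        : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.convertTS.field"       : "txn_date",
  "transforms.convertTS.format"      : "yyyy-MM-dd HH:mm:ss",
  "transforms.convertTS.target.type" : "Timestamp"
}
```

To go the other way and write a Timestamp out as a string, set target.type to string; the same format setting then describes the output pattern.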
Just like the RegexRouter, the TimestampRouter can be used to modify the topic name of messages as they pass through Kafka Connect. Since the topic name is usually the basis for the naming of the object to which messages are written in a sink connector, this is a great way to achieve time-based partitioning of those objects if required. For example, instead of streaming messages from Kafka to an Elasticsearch index called cars, they can be routed to monthly indices e.g. cars_2020-10, cars_2020-11, cars_2020-12, etc.
The TimestampRouter takes two arguments: the format of the final topic name to generate, and the format of the timestamp to put in the topic name (based on SimpleDateFormat).
"transforms" : "addTimestampToTopic",
"transforms.addTimestampToTopic.type" : "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.addTimestampToTopic.topic.format" : "${topic}_${timestamp}",
"transforms.addTimestampToTopic.timestamp.format": "YYYY-MM-dd"
We kicked off this series by seeing on day 1 how to use InsertField to add in the timestamp to a message passing through the Kafka Connect sink connector. Today we’ll see how to use the same Single Message Transform to add in a static field value, as well as the name of the Kafka topic, partition, and offset from which the message has been read.
"transforms" : "insertStaticField1",
"transforms.insertStaticField1.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField1.static.field": "sourceSystem",
"transforms.insertStaticField1.static.value": "NeverGonna"
If you want to mask fields of data as you ingest from a source into Kafka, or write to a sink from Kafka with Kafka Connect, the MaskField Single Message Transform is perfect for you. It retains the field whilst replacing its value.
To use the Single Message Transform you specify the field to mask, and its replacement value. To mask the contents of a field called cc_num you would use:
"transforms" : "maskCC",
"transforms.maskCC.type" : "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskCC.fields" : "cc_num",
"transforms.maskCC.replacement" : "****-****-****-****"
If you want to change the topic name to which a source connector writes, or the name of the object that’s created on a target by a sink connector, the RegexRouter is exactly what you need.
To use the Single Message Transform you specify the pattern in the topic name to match, and its replacement. To drop a prefix of test- from a topic you would use:
"transforms" : "dropTopicPrefix",
"transforms.dropTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex" : "test-(.*)",
"transforms.dropTopicPrefix.replacement" : "$1"
The Flatten Single Message Transform (SMT) is useful when you need to collapse a nested message down to a flat structure.
To use the Single Message Transform you only need to reference it; there’s no additional configuration required:
"transforms" : "flatten",
"transforms.flatten.type" : "org.apache.kafka.connect.transforms.Flatten$Value"
Setting the key of a Kafka message is important as it ensures correct logical processing when consumed across multiple partitions, as well as being a requirement when joining to messages in other topics. When using Kafka Connect the connector may already set the key, which is great. If not, you can use these two Single Message Transforms (SMT) to set it as part of the pipeline based on a field in the value part of the message.
To use the ValueToKey Single Message Transform specify the name of the field (id) that you want to copy from the value to the key:
"transforms" : "copyIdToKey",
"transforms.copyIdToKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields" : "id",
You can use the InsertField Single Message Transform (SMT) to add the message timestamp into each message that Kafka Connect sends to a sink.
To use the Single Message Transform specify the name of the field (timestamp.field) that you want to add to hold the message timestamp:
"transforms" : "insertTS",
"transforms.insertTS.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "messageTS"
Back in March 2020 the western world came to somewhat of a juddering halt, thanks to COVID-19. No-one knew then what would happen, but there was the impression that whilst the next few months were a write-off for sure, maybe things would pick up again later in the year.
It’s now early December 2020, and nothing is picking up any time soon. Summer provided a respite from the high levels of infection and mortality (in the UK at least), but then numbers spiked again in many places around the world and what was punted down the river back in March is being firmly punted yet again now.
Is a blog even a blog nowadays if it doesn’t include a "Here is my home office setup"?
Thanks to conferences all being online, and thus my talks being delivered from my study—and my habit of posting a #SpeakerSelfie each time I do a conference talk—I often get questions about my setup. Plus, I’m kinda pleased with it so I want to show it off too ;-)
Very short & sweet this post, but Google turned up nothing when I was stuck so hopefully I’ll save someone else some head scratching by sharing this.
As you may already realise, Kafka is not just a fancy message bus, or a pipe for big data. It’s an event streaming platform! If this is news to you, I’ll wait here whilst you read this or watch this…
Streaming data from Kafka to Elasticsearch is easy with Kafka Connect - you can see how in this tutorial and video.
One of the things that sometimes causes issues though is how to get location data correctly indexed into Elasticsearch as geo_point fields to enable all that lovely location analysis. Unlike data types like dates and numerics, Elasticsearch’s Dynamic Field Mapping won’t automagically pick up geo_point data, and so you have to do two things:
There was a good question on StackOverflow recently in which someone was struggling to find the appropriate ksqlDB DDL to model a source topic in which there was a variable number of fields in a STRUCT.
We saw in the first post how to hack together an ingestion pipeline for XML into Kafka using a source such as curl piped through xq to wrangle the XML and stream it into Kafka using kafkacat, optionally using ksqlDB to apply and register a schema for it.
The second one showed the use of any Kafka Connect source connector plus the kafka-connect-transform-xml Single Message Transformation. Now we’re going to take a look at a source connector from the community that can also be used to ingest XML data into Kafka.
We previously looked at the background to getting XML into Kafka, and potentially how [not] to do it. Now let’s look at the proper way to build a streaming ingestion pipeline for XML into Kafka, using Kafka Connect.
If you’re unfamiliar with Kafka Connect, check out this quick intro to Kafka Connect. Kafka Connect’s excellent pluggable architecture means that we can pair any source connector to read XML from wherever we have it (for example, a flat file, or an MQ, or anywhere else), with a Single Message Transform to transform the XML into a payload with a schema, and finally a converter to serialise the data in a form that we would like to use such as Avro or Protobuf.
What would a blog post on rmoff.net be if it didn’t include the dirty hack option? 😁
The secret to dirty hacks is that they are often rather effective and when needs must, they can suffice. If you’re prototyping and need to JFDI, a dirty hack is just fine. If you’re looking for code to run in Production, then a dirty hack probably is not fine.
XML has been around for 20+ years, and whilst other ways of serialising our data have gained popularity in more recent times (such as JSON, Avro, and Protobuf), XML is not going away soon. Part of that is down to technical reasons (clearly defined and documented schemas), and part of it is simply down to enterprise inertia - having adopted XML for systems in the last couple of decades, they’re not going to be changing now just for some short-term fad. See also COBOL.