Platforma Ferryt potrafi komunikować się z zewnętrznymi systemami w celu wymiany wiadomości i komunikatów za pośrednictwem platformy Apache Kafka. Kafka staje się coraz bardziej popularną platformą wymiany komunikatów. Stosowana jest przez instytucje i korporacje, gdzie wymagana jest wysokowydajna komunikacja, strumieniowe przesyłanie danych i pipeline-y danych.

Celem artykułu jest omówienie mechanizmów integracji Apache Kafka z platformą Ferryt. Nie będziemy omawiać mechaniki działania samej Kafki. Dla czytelników, którzy są zainteresowani czym jest Kafka, polecamy ten materiał.

Apache Kafka w Ferryt

Platforma Ferryt może wystąpić zarówno jako Producer – system publikujący na Kafkę, jak i Consumer – system odczytujący zdarzenia z Kafki. Ferryt potrafi obsłużyć wiele formatów danych, w szczególności także schematy Apache Avro™.  

Trochę więcej o Apache Avro™

Apache Avro™ jest systemem serializacji danych, zoptymalizowanym dla strumieniowego przetwarzania danych. Jest to jeden z podstawowych formatów danych używanych w Kafka, który opiera się na schematach danych JSON.

Oto przykładowy plik ze schematem Avro:

{
    "name": "Registry",
    "type": "record",
    "fields": [{
            "name": "schemaName",
            "type": ["null", "string"]
        }, {
            "name": "Id",
            "type": ["null", "string"]
        }, {
            "name": "modifyTime",
            "type": ["null", {
                    "type": "long",
                    "logicalType": "timestamp-millis"
                }
            ]
        }, {
            "name": "modifyUser",
            "type": ["null", "string"]
        }, {
            "name": "data",
            "type": ["null", {
                    "name": "data",
                    "type": "record",
                    "fields": [{
                            "name": "Temat",
                            "type": ["null", "string"]
                        }, {
                            "name": "Tresc",
                            "type": ["null", "string"]
                        }
                    ]
                }
            ]
        }
    ]
}

Konfiguracja z punktu widzenia architekta rozwiązań biznesowych w Ferryt

Rejestracja schema komunikatu

Pierwszym krokiem, niezależnie od tego czy rejestrujemy Ferryt jako producera czy consumera Kafki, jest zarejestrowanie schematu komunikatu. Posiadając plik ze schematem Avro należy utworzyć na jego podstawie typ danych.

Rejestrowanie Ferryt jako producera

Jeżeli w danym rozwiązaniu biznesowym konieczne jest wysyłanie komunikatów na Kafka to należy zarejestrować Ferryt jako producera zdarzeń.

W tym celu należy zarejestrować w Ferryt serwis zewnętrzny, podając parametry Kafki:

  • Adres brokerów
  • Topic
  • Sposób uwierzytelnienia
  • Format komunikatu

Więcej na temat mechanizmów integracji w Ferryt, w tym serwisów zewnętrznych, w tym materiale.

Tak skonfigurowany serwis zewnętrzny można używać w dowolnym miejscu w procesie. Wysłanie komunikatu na Kafkę następuje w momencie wywołania w runtime silnika workflow bloczka Serwis zewnętrzny.

Rejestrowanie Ferryt jako consumera

Aby Ferryt był consumerem zdarzeń Kafka to należy zarejestrować w Ferryt zdarzenie zewnętrzne podając ponownie:

  • adres brokerów,
  • topic,
  • sposób uwierzytelnienia,
  • format komunikatu,
  • funkcję workflow, którą należy wywołać dla każdego odczytanego komunikatu.

Eventy z Kafki są odczytywane cyklicznie, co określony interwał czasowy, przez mechanizm schedulerów Ferryt. W trakcie jednego odczytu pobierana jest „paczka” nowych  komunikatów, których nie było na topicu przy poprzednim odczycie. Dla każdego takiego komunikatu uruchamiana jest funkcja workflow. W tej funkcji architekt rozwiązania biznesowego może zamodelować dowolną aktywność wymaganą do wykonania po otrzymaniu komunikatu z Kafki, np. przekazanie do wniosku danych lub wywołanie akcji na wniosku.

Implementacja Kafka w Ferryt – Confluent Kafka

Ogólny schemat komunikacji pomiędzy Ferryt a serwerem Kafki przedstawiono na poniższym schemacie:


Za komunikację z serwerem Kafka odpowiada moduł Ferryt Kafka Connector. Jego zadaniem jest ustanowienie bezpośredniego połączenia z serwerem oraz wysłanie i pobranie komunikatu.

W  Ferryt jest zaimplementowane rozwiązanie oparte na bibliotece do obsługi Confluent Kafka. Jest to rozszerzenie zbudowane przez twórców Apache Kafka®. Rozszerza ono listę zalet Kafki o funkcje klasy korporacyjnej, usuwając jednocześnie obciążenia związane z zarządzaniem lub monitorowaniem Kafki.

Do komunikacji używany jest Confluent.Kafka dla platformy.NET. Jest to klient Kafki dostępny w pakiecie nuGet.

Biblioteka ta zapewnia techniczną obsługę producera i consumera. Rozwiązanie jest kompatybilne ze wszystkimi brokerami Apache Kafka ®. W kontekście wersji .NET Framework jest kompatybilny z .NET Framework >= v4.6.2, .NET Core >= v1.0 i .NET Standard.

Tworzenie producera Kafki z wykorzystaniem komponentu Confluent.Kafka

W poniżej podanym przykładzie zaprezentowano fragment kodu obrazujący etap tworzenia [producera. Podane dane są przykładowe i mają na celu zaprezentować funkcjonalność.

1. W pierwszym kroku dodawana jest referencja do biblioteki Confluent:

using Confluent.Kafka;

2. Następnie tworzony jest słownik konfiguracyjny, w którym podawany jest adres brokera:

var config = new Dictionary<string, object>
{
{ "bootstrap.servers", "host1:9092" },
};

3. Kolejnym krokiem jest utworzenie producera poprzez odniesienie się do wcześniej utworzonego słownika konfiguracyjnego. Wiadomości będą przesyłane w formacie binarnym:

using (var producer = new Producer<Null, byte[]>(config, null, new 
Confluent.Kafka.Serialization.ByteArraySerializer()))

4. Tworzenie wiadomości asynchronicznej z przykładową wiadomością:

Task<Message<Null, byte[]>> deliveryReport = 
producer.ProduceAsync("testTopic", null, Encoding.ASCII.GetBytes("wiadomość testowa"));

5. Końcowy kod, jeżeli zamkniemy go w metodę, powinien wyglądać tak:

public void CreateProducerSendMsg()
{
var config = new Dictionary<string, object>
{
{ "bootstrap.servers", " host1:9092" },
};

using (var producer = new Producer<Null, byte[]>(config, null, new Confluent.Kafka.Serialization.ByteArraySerializer()))
{
Task<Message<Null, byte[]>> deliveryReport = producer.ProduceAsync("testTopic", null, Encoding.ASCII.GetBytes("wiadomość testowa"));
}; 
}

Tworzenie consumera Kafki z wykorzystaniem komponentu Confluent.Kafka

Spójrzmy, jak wygląda kod:

1. Dodawana jest wymagana referencja do biblioteki:

using Confluent.Kafka;

2. Tworzony jest słownik konfiguracyjny z obowiązkową konfiguracją. W przykładzie przedstawiamy tylko te opcje, które są wymagane. Ferryt konfiguruje więcej opcji.

var options = new Dictionary<string, object>
{
{ "group.id", "Ferryt"},
{ "bootstrap.servers", " host1:9092"},
};

Właściwość group.id jest obowiązkowa i określa, do której grupy konsumenckiej należy consumer.

3. Utworzenie consumera, który zawiera skonfigurowany słownik:

using (var consumer = new Consumer<Null, byte[]>(options, null, new 
Confluent.Kafka.Serialization.ByteArrayDeserializer()))

4. Subskrypcja consumera –  wymagane, aby konsument mógł dołączyć do group.id:

consumer.Subscribe("testTopic");

5. Pobranie wiadomości w formacie binarnym:

Message<Null, byte[]> msg;
while (consumer.Consume(out msg, 1000))
{
try
{
consumer.CommitAsync(msg).Wait();
return msg.Value;
}
catch (Exception ex)
{
throw;
}
}

6. Końcowy kod zamknięty w metodzie:

public byte[] CreateConsumerReciveMsg()
{
var options = new Dictionary<string, object>
{
{ "group.id", "Ferryt"},
{ "bootstrap.servers", " host1:9092"},
};

using (var consumer = new Consumer<Null, byte[]>(options, null, new Confluent.Kafka.Serialization.ByteArrayDeserializer()))
      {
consumer.Subscribe("testTopic");
Message<Null, byte[]> msg;
while (consumer.Consume(out msg, 1000))
{
try
{
consumer.CommitAsync(msg).Wait();
return msg.Value;
}
catch (Exception ex)
{
throw;
}
}
}
return null;
}

Podsumowanie

Mechanizmy integracyjne w Ferryt związane z obsługą Apache Kafka pozwalają na low-codowe przetwarzanie komunikatów z i do brokera Kafki. Specyfika Kafki umożliwia budowanie na platformie Ferryt rozwiązań biznesowych, gdzie komunikaty wysyłane są potokowo. Dzieje się tak bez obawy o niekorzystny wpływ na procesowane wniosków czy wydajność platformy.

Autorzy:

Krystyna Masińska – Developer

Dorota Wichowska – Senior Ferryt Manager, Team Leader