This guide explains how to install, configure, and run the gRPC_kafka module of Nexalis Agent.

1. Introduction

gRPC_kafka receives JSON messages from the industrial connectors and sends them to Nexalis Cloud. Key features:
  • Secure integration with Kafka using SASL_SSL or mTLS
  • Averaging of numeric datapoints over a configurable interval
  • Reliable logging, buffering, and error handling
  • Preservation of messages with on-disk buffering during outages
⚠ To configure gRPC_kafka, you need a Kafka producer topic, endpoint, and credentials. The Nexalis team provides credentials for staging and production. Read the previous sections on the technical requirements and installation of the Nexalis Agent before continuing with this guide.

2. Configuration

Configuration Files

The following two configuration files are provided as templates in the Nexalis Agent release; edit them to suit your specific needs.
  • config_gRPC_kafka.json → SASL_SSL
  • config_gRPC_kafka_mTLS.json → mTLS (rename to config_gRPC_kafka.json if used)

Example: SASL_SSL

{
  "logger": { "MB_max_file_size": "5", "max_rotated_files": "3", "level": "debug", "log_file_path": "./gRPC_kafka.log" },
  "gRPC_server": { "gRPCServerIP": "127.0.0.1", "gRPCServerPort": "50051" },
  "kafka_producer": {
    "topicName": "test-nexalis-agent",
    "brokers": "public_url_1:9194,public_url_2:9194",
    "SASL": { "ssl_ca_location": "/usr/lib/ssl/certs" },
    "buffer": { "bufferStoragePath": "./persistent_buffer", "max_MB_buffer_size": "50000", "max_kBs_unbuffering_speed": "1000" },
    "batchIntervalSeconds": "1",
    "compression_type": "zstd",
    "compression_level": "12",
    "dataHandler": { "averagePeriod": "5min", "highFrequency": true },
    "message_max_bytes": "10000000"
  }
}
SASL credentials must be set in environment variables:
export KAFKA_SASL_USERNAME=your_username
export KAFKA_SASL_PASSWORD=your_password
Note: the zstd compression type is mandatory when using the Redpanda Kafka broker (other compression types do not work with Redpanda).
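As a quick sanity check before starting the module, the template can be loaded and inspected with a few lines of Python. This is an illustrative sketch, not part of the Nexalis Agent; it only checks the section and key names shown in the example above.

```python
import os

def check_config(cfg: dict) -> list:
    """Return a list of problems found in a gRPC_kafka configuration dict."""
    problems = []
    # Top-level sections from the template above.
    for section in ("logger", "gRPC_server", "kafka_producer"):
        if section not in cfg:
            problems.append(f"missing section: {section}")
    producer = cfg.get("kafka_producer", {})
    if not producer.get("topicName"):
        problems.append("kafka_producer.topicName must not be empty")
    if not producer.get("brokers"):
        problems.append("kafka_producer.brokers must not be empty")
    # SASL credentials are read from the environment, not from the config file.
    for var in ("KAFKA_SASL_USERNAME", "KAFKA_SASL_PASSWORD"):
        if not os.environ.get(var):
            problems.append(f"environment variable {var} is not set")
    return problems

# Usage (assuming the template sits next to this script):
#   import json
#   with open("config_gRPC_kafka.json") as f:
#       print(check_config(json.load(f)))
```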

Example: mTLS

{
  "logger": { "MB_max_file_size": "5", "max_rotated_files": "3", "level": "debug", "log_file_path": "./gRPC_kafka.log" },
  "gRPC_server": { "gRPCServerIP": "127.0.0.1", "gRPCServerPort": "50051" },
  "kafka_producer": {
    "topicName": "test-nexalis-agent",
    "brokers": "public_url_1:9194,public_url_2:9194",
    "sslConfig": {
      "caLocation": "./certs/CA-chain.pem",
      "certificateLocation": "./certs/certificate.txt",
      "privateKeyLocation": "./certs/private_key.txt",
      "keyPassword": "keyPassword"
    },
    "buffer": { "bufferStoragePath": "./persistent_buffer", "max_MB_buffer_size": "50000", "max_kBs_unbuffering_speed": "1000" },
    "batchIntervalSeconds": "1",
    "compression_type": "zstd",
    "compression_level": "12",
    "dataHandler": { "averagePeriod": "5min", "highFrequency": true },
    "message_max_bytes": "10000000"
  }
}

Configuration Parameters

Below are the definitions for all available fields in config_gRPC_kafka.json.

Logger

  • MB_max_file_size (MB): Maximum size of each log file before rotation. Default: 30.
  • max_rotated_files: Maximum number of rotated log files to keep. Default: 3.
  • level: Logging verbosity. Accepted values include "debug", "info", "error".
  • log_file_path: Path to the log file written by gRPC_kafka.

gRPC Server

  • gRPCServerIP: IP address the gRPC server binds to.
  • gRPCServerPort: Port the gRPC server listens on.

Kafka Producer

  • topicName: Kafka topic provided by Nexalis (account‑scoped).
  • brokers: Comma‑separated list of broker endpoints.
  • SASL (exclusive with mTLS):
    • ssl_ca_location: Path to CA certificates. On Debian, /usr/lib/ssl/certs is commonly used.
    • Credentials via environment variables (must be exported before start):
      export KAFKA_SASL_USERNAME=your_username
      export KAFKA_SASL_PASSWORD=your_password
      
  • sslConfig (mTLS) (exclusive with SASL):
    • caLocation: Path to the CA chain file (CA-chain.pem). This file must contain two certificates: e.g., the public AWS root CA and the private Nexalis CA.
    • certificateLocation: Path to the client certificate.
    • privateKeyLocation: Path to the client private key.
    • keyPassword: Password protecting the private key (if used).
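Since an incomplete CA chain is a common source of handshake errors (see the Q&A section), a small helper can confirm that the chain file actually contains two certificates. This is an illustrative check, not part of the agent:

```python
def count_pem_certificates(path: str) -> int:
    """Count certificate blocks in a PEM bundle such as CA-chain.pem."""
    with open(path) as f:
        return f.read().count("-----BEGIN CERTIFICATE-----")

# The mTLS CA chain is expected to hold exactly two certificates:
#   assert count_pem_certificates("./certs/CA-chain.pem") == 2
```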

Buffer

  • bufferStoragePath: Directory for on‑disk buffered messages. Default: ./persistent_buffer. Files are Zstd‑compressed.
  • max_MB_buffer_size: Maximum total size (in MB) of the buffer directory.
  • max_kBs_unbuffering_speed: Cap for replay speed in kB/s. Minimum: 1. Also determines batch sizing (e.g., 1000 → batches up to ~1 MB before compression).

Batching & Compression

  • batchIntervalSeconds: Max time to accumulate messages before send (seconds). Min: 1; Default: 1.
  • compression_type: Codec used by the Kafka producer. Default: zstd. Options: none, gzip, lz4, snappy, zstd.
    • Redpanda note: zstd is mandatory when using Redpanda.
  • compression_level: Compression level for the selected codec. Ranges: gzip [0–9], lz4 [0–12], zstd [0–12], snappy = 0, -1 uses codec default. Default: 12.

Data Handling

  • dataHandler (optional) — controls forwarding behavior:
    • highFrequency:
      • true (default): forward high‑frequency/raw messages.
      • false: disable raw forwarding (aggregates only).
    • averagePeriod:
      • "None": disable averages.
      • "Xmin": compute weighted averages over X minutes, where 1 ≤ X ≤ 60 (e.g., "5min"). Default: "5min".
  • message_max_bytes: Max uncompressed size of a single message sent to the cloud. Default: 10 MB; Min: 1 MB; Max: 50 MB. Applies to non‑buffered messages only.
  • max_kBs_speed: Max regular (non‑buffered) send rate in kB/s. Default: 1 Gb/s.

3. Average Values

  • Computes averages only for numeric data types (integer, float). It does not compute averages for non-numeric types such as boolean or string.
  • Includes min (the smallest value recorded during the interval), max (the largest value recorded during the interval), and count (the total number of data points used to calculate the average) in metaData.
  • Uses weighted averages, accounting for the duration each data point is active and with continuity across intervals (the last value from the previous interval is carried over as the initial value for the next interval).
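The exact implementation is internal to gRPC_kafka, but the weighted averaging described above can be sketched as follows (timestamps in milliseconds; the carry_over argument models continuity across intervals):

```python
def weighted_average(samples, interval_start, interval_end, carry_over=None):
    """Time-weighted average over one interval (a sketch, not the actual code).

    samples: sorted list of (timestamp_ms, numeric value) inside the interval.
    carry_over: last value of the previous interval, assumed active from
    interval_start until the first sample arrives (continuity across intervals).
    Returns (average, min, max, count) or None when no value is active.
    """
    points = ([(interval_start, carry_over)] if carry_over is not None else []) + list(samples)
    if not points:
        return None
    weighted_sum = 0.0
    for i, (ts, value) in enumerate(points):
        # Each value is weighted by how long it stays active.
        next_ts = points[i + 1][0] if i + 1 < len(points) else interval_end
        weighted_sum += value * (next_ts - ts)
    values = [v for _, v in points]
    duration = interval_end - points[0][0]
    return (weighted_sum / duration, min(values), max(values), len(points))
```

For example, over a 5-minute interval starting at t=0 with carry-over 108.0 and one new sample 108.8 arriving at t=60 s, the average is 108.64 (108.0 active for one minute, 108.8 for four).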
Example of an averaged message:
{
    "siteName": "SiteA",
    "deviceID": "Device123",
    "deviceModel": "ModelX",
    "protocol": "HTTP",
    "dataPoint": "Tag1",
    "value": 108.24997,
    "tsConnector": 1732238100000,
    "triggerType": "5min-avg",
    "metaData": {
        "min": 108.0,
        "max": 108.8,
        "count": 10,
        "nx-agent-id": "1c77fade-xxxx-xxxx-xxxx-b873945210ab"
    },
    "description": "Temperature measurement", #optional, only if provided by the connector
    "unit": "°C" #optional, only if provided by the connector
}

4. On-Disk Buffering

  • Buffers messages on disk when the broker is unavailable; on-disk storage preserves messages even if a power outage occurs during the communication loss, preventing data loss
  • Compressed with Zstd
  • Adaptive replay speed (starts slow, then gradually increases the rate at which buffered messages are sent)
  • Buffered messages are marked in the metaData JSON section with "buffered": true. Messages sent in real time carry no buffered key inside the metaData section.
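A rough sketch of the replay behavior, with an assumed starting rate of 1 kB/s and doubling growth (only the cap, max_kBs_unbuffering_speed, comes from the configuration; the initial rate and growth factor are illustrative assumptions):

```python
import itertools

def replay_rates(max_kbs, initial_kbs=1, factor=2):
    """Yield successive replay rates in kB/s: start slow, then scale up to the cap."""
    rate = min(initial_kbs, max_kbs)
    while True:
        yield rate
        rate = min(rate * factor, max_kbs)

def mark_buffered(message):
    """Tag a replayed message so the cloud can tell it was buffered."""
    tagged = dict(message)
    tagged["metaData"] = {**message.get("metaData", {}), "buffered": True}
    return tagged

# With max_kBs_unbuffering_speed = 1000, the replay rate ramps up as:
print(list(itertools.islice(replay_rates(1000), 12)))
# [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1000, 1000]
```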

5. Running gRPC_kafka Manually

Normally, gRPC_kafka is started automatically by the Nexalis Agent launcher and requires no manual action. For debugging purposes only, you can run it directly if config_gRPC_kafka.json is in the same directory as the executable:
./gRPC_kafka

6. gRPC Service

Messages must be JSON arrays with the following required fields:
siteName, deviceID, deviceModel, protocol, dataPoint, value, tsConnector, triggerType.
The message from the gRPC server to the gRPC client contains the result of the forwarding operation:
  • success: Boolean indicating whether the forwarding was successful.
  • confirmation: Confirmation message if successful.
  • error_message: Error message if unsuccessful.
The maximum gRPC message size is 4 MB.
If a message is not correctly formatted, gRPC_kafka rejects it and sends an error to the gRPC client.
gRPC_kafka checks that all required keys have non-empty values, except for the "value" key, which may be empty for tag discovery purposes.
tsConnector is the number of milliseconds since 1970-01-01 UTC.
Example of a valid JSON message:
[
  {
    "siteName": "TEST_SITE1",
    "deviceID": "10",
    "deviceModel": "deviceModel1",
    "protocol": "modbus_tcp",
    "dataPoint": 1,
    "description": "Description1",
    "unit": "kWh",
    "value": 100,
    "tsSource": null,
    "qualitySource": null,
    "tsConnector": "1716346122000",
    "triggerType": "cyclic-1000",
    "metaData": {
      "modbusServerID": 1,
      "numberOfRetries": 5,
      "timeout": 500,
      "requestID": 1,
      "type": "s32",
      "bytesOrder": "ABCD",
      "cyclicPeriod": 1000,
      "registerType": "holdingregisters",
      "buffered": true
    }
  },
  {
    "siteName": "TEST_SITE1",
    "deviceID": "11",
    "deviceModel": "deviceModel2",
    "protocol": "modbus_tcp",
    "dataPoint": 2,
    "description": "Description2",
    "unit": "kWh",
    "value": 101,
    "tsSource": null,
    "qualitySource": null,
    "tsConnector": "1716346122000",
    "triggerType": "cyclic-1000",
    "metaData": {
      "modbusServerID": 1,
      "numberOfRetries": 5,
      "timeout": 500,
      "requestID": 1,
      "type": "s32",
      "bytesOrder": "ABCD",
      "cyclicPeriod": 1000,
      "registerType": "holdingregisters"
    }
  }
]
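The validation rules above can be sketched as follows (an illustrative check, not the module's actual code):

```python
REQUIRED_KEYS = ("siteName", "deviceID", "deviceModel", "protocol",
                 "dataPoint", "value", "tsConnector", "triggerType")

def validate_message(msg: dict) -> list:
    """Check one message against the rules above.

    All required keys must be present with non-empty values, except "value",
    which may be empty for tag discovery purposes.
    """
    errors = []
    for key in REQUIRED_KEYS:
        if key not in msg:
            errors.append(f"missing key: {key}")
        elif key != "value" and (msg[key] is None or msg[key] == ""):
            errors.append(f"empty value for key: {key}")
    return errors

# tsConnector is milliseconds since 1970-01-01 UTC, e.g. int(time.time() * 1000).
```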

7. gRPC_kafka – Common Errors and Solutions (Q&A)

Q1: SSL handshake failed

Error message:
SSL handshake failed: error:0A000086:SSL routines::certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (install ca-certificates package)
Meaning:
The certificates configured in the gRPC_kafka configuration file are not correct.
Possible causes:
  • Typo in the certificate file names.
  • CA-chain.pem does not contain the two required certificates (AWS root CA + Nexalis private CA).
  • Certificates are invalid and new ones must be generated.

Q2: Failed to resolve URL

Error message:
Failed to resolve '[URL]': Name or service not known
Meaning:
The broker URLs in the gRPC_kafka configuration are not properly set.

Q3: Kafka topic authorization failed

Error message:
Failed to send message to Kafka: Broker: Topic authorization failed
Meaning:
The certificates are not valid and are not authorized to send data to the topic configured in gRPC_kafka.

Q4: Connection timeout

Error message:
[error] [librdkafka-producer] ERROR: sasl_ssl://[URL].cloud.redpanda.com:9092/4: Connection setup timed out in state CONNECT (after 30052ms in state CONNECT).
Meaning:
The Kafka producer cannot establish a connection to the specified Redpanda broker, typically due to firewall restrictions.
Solution:
  • Ensure all Redpanda broker URLs are provided to the network/security team for firewall rule configuration.
  • The ~30-second timeout corresponds to socket.connection.setup.timeout.ms in librdkafka.
  • Verify connectivity to the brokers using nc (netcat) or telnet to check if required ports are accessible.
Example command:
nc -zv [URL].cloud.redpanda.com 9092
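If nc and telnet are unavailable, the same reachability check can be done with Python's standard library:

```python
import socket

def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Equivalent of `nc -zv host port`: attempt a plain TCP connection."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example (replace [URL] with your actual broker hostname):
#   print(can_connect("[URL].cloud.redpanda.com", 9092))
```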