Skip to content

Commit 6aaa750

Browse files
committed
feat: add brokers option
1 parent 6cdd8dc commit 6aaa750

3 files changed

Lines changed: 52 additions & 11 deletions

File tree

extensions/eda/plugins/event_source/kafka.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,21 @@
1414
- An ansible-rulebook event source plugin for receiving events via a kafka topic.
1515
options:
1616
host:
17-
description:
1817
- The host where the kafka topic is hosted.
1918
type: str
20-
required: true
19+
required: false
2120
port:
2221
description:
2322
- The port where the kafka server is listening.
24-
type: str
25-
required: true
23+
type: false
24+
brokers:
25+
description:
26+
- A list of host[:port] strings
27+
that the consumer should contact to bootstrap initial cluster metadata.
28+
This does not have to be the full node list.
29+
It just needs to have at least one broker that will respond to a Metadata API Request
30+
type: list(str)
31+
required: false
2632
cafile:
2733
description:
2834
- The optional certificate authority file path containing certificates
@@ -126,7 +132,11 @@
126132
EXAMPLES = r"""
127133
- ansible.eda.kafka:
128134
host: "localhost"
129-
port: "9092"
135+
port: 9092
136+
brokers:
137+
- broker-1:9092
138+
- broker-2:9093
139+
- broker-3:9094
130140
check_hostname: true
131141
verify_mode: "CERT_OPTIONAL"
132142
encoding: "utf-8"
@@ -163,6 +173,7 @@ async def main( # pylint: disable=R0914
163173

164174
host = args.get("host")
165175
port = args.get("port")
176+
brokers = args.get("brokers")
166177
cafile = args.get("cafile")
167178
certfile = args.get("certfile")
168179
keyfile = args.get("keyfile")
@@ -174,6 +185,18 @@ async def main( # pylint: disable=R0914
174185
encoding = args.get("encoding", "utf-8")
175186
security_protocol = args.get("security_protocol", "PLAINTEXT")
176187

188+
if host and brokers:
189+
msg = "Only one of host and brokers parameter must be set"
190+
raise ValueError(msg)
191+
192+
if (host and not port) or (port and not host):
193+
msg = "Port and host must be set"
194+
raise ValueError(msg)
195+
196+
if not host and not brokers:
197+
msg = "host + port or brokers must be set"
198+
raise ValueError(msg)
199+
177200
if offset not in ("latest", "earliest"):
178201
msg = f"Invalid offset option: {offset}"
179202
raise ValueError(msg)
@@ -202,7 +225,7 @@ async def main( # pylint: disable=R0914
202225
ssl_context.verify_mode = verify_mode
203226

204227
kafka_consumer = AIOKafkaConsumer(
205-
bootstrap_servers=f"{host}:{port}",
228+
bootstrap_servers=brokers if brokers else f"{host}:{port}",
206229
group_id=group_id,
207230
enable_auto_commit=True,
208231
max_poll_records=1,

tests/integration/event_source_kafka/test_kafka_source.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@ def should_skip_kafka_tests() -> bool:
1717
Determine if Kafka tests should be skipped based on platform.
1818
1919
Do not run kafka tests on macos or arm64 because the testing kafka deployment does
20-
not run on arm64 and macos does not provide podman-compose cli.
20+
not run on arm64 and macos does not provide docker-compose cli.
2121
2222
Returns:
2323
bool: True if tests should be skipped, False otherwise
2424
"""
2525
system = platform.system().lower()
2626
machine = platform.machine().lower()
27-
27+
return False
2828
# Check for macos
2929
if system == "darwin":
3030
return True
@@ -106,7 +106,7 @@ def kafka_broker() -> Generator[subprocess.CompletedProcess[bytes], None, None]:
106106
print(cwd)
107107
# Keep --quiet-pull here is it does spam CI/CD console
108108
result = subprocess.run(
109-
["podman-compose", "up", "--quiet-pull", "-d"], cwd=cwd, check=True
109+
["docker-compose", "up", "--quiet-pull", "-d"], cwd=cwd, check=True
110110
)
111111

112112
# Wait for Kafka broker to be ready - focus on main port first
@@ -118,7 +118,7 @@ def kafka_broker() -> Generator[subprocess.CompletedProcess[bytes], None, None]:
118118
wait_for_kafka_ready("localhost:9095", timeout=15, check_ssl=True) # SASL_SSL
119119

120120
yield result
121-
subprocess.run(["podman-compose", "down", "-v"], cwd=cwd, check=True)
121+
subprocess.run(["docker-compose", "down", "-v"], cwd=cwd, check=True)
122122

123123

124124
@pytest.fixture(scope="session")

tests/unit/event_source/test_kafka.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import asyncio
44
import json
5+
import re
56
from typing import Any
67
from unittest.mock import MagicMock, patch
78

@@ -67,7 +68,7 @@ def test_receive_from_kafka_place_in_queue(
6768
{
6869
topic_type: topic_value,
6970
"host": "localhost",
70-
"port": "9092",
71+
"port": 9092,
7172
"group_id": "test",
7273
},
7374
)
@@ -96,3 +97,20 @@ def test_mixed_topics_and_patterns(
9697
match="Exactly one of topic, topics, or topic_pattern must be provided.",
9798
):
9899
asyncio.run(kafka_main(myqueue, topic_args))
100+
101+
@pytest.mark.parametrize(
102+
"args, error_msg",
103+
[
104+
# Only host set
105+
({"host": "localhost", "port": None, "brokers": None, "topic": "eda"}, "Port and host must be set"),
106+
# Only port set
107+
({"host": None, "port": 9092, "brokers": None, "topic": "eda"}, "Port and host must be set"),
108+
# Neither host nor brokers set
109+
({"host": None, "port": None, "brokers": None, "topic": "eda"}, "host + port or brokers must be set"),
110+
# Both host and brokers set
111+
({"host": "localhost", "port": 9092, "brokers": ["localhost:9092"], "topic": "eda"}, "Only one of host and brokers parameter must be set")
112+
]
113+
)
114+
def test_host_port_brokers_combinations(myqueue: MockQueue, args: dict[str, Any], error_msg: str) -> None:
115+
with pytest.raises(ValueError, match=re.escape(error_msg)):
116+
asyncio.run(kafka_main(myqueue, args))

0 commit comments

Comments
 (0)