Skip to content

Commit 55a36f3

Browse files
authored
Merge branch 'master' into generic-type
2 parents 14c789d + 37b55c3 commit 55a36f3

46 files changed

Lines changed: 3892 additions & 100 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

psalm-baseline.xml

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<files psalm-version="6.14.3@d0b040a91f280f071c1abcb1b77ce3822058725a">
2+
<files psalm-version="6.16.0@7cf3e8b988edd75e0766963b0b9e671b220f5785">
33
<file src="src/Activity/ActivityInterface.php">
44
<DeprecatedClass>
55
<code><![CDATA[NamedArgumentConstructor]]></code>
@@ -134,15 +134,9 @@
134134
<code><![CDATA[object]]></code>
135135
<code><![CDATA[object]]></code>
136136
</InvalidReturnType>
137-
<LessSpecificReturnStatement>
138-
<code><![CDATA[new self($serviceClient, $options, $converter, $interceptorProvider)]]></code>
139-
</LessSpecificReturnStatement>
140137
<MissingParamType>
141138
<code><![CDATA[$workflow]]></code>
142139
</MissingParamType>
143-
<MoreSpecificReturnType>
144-
<code><![CDATA[static]]></code>
145-
</MoreSpecificReturnType>
146140
<RedundantFunctionCall>
147141
<code><![CDATA[\sprintf]]></code>
148142
<code><![CDATA[\sprintf]]></code>
@@ -1356,6 +1350,8 @@
13561350
$converter ?? DataConverter::createDefault(),
13571351
$rpc ?? Goridge::create(),
13581352
$credentials,
1353+
$pluginRegistry,
1354+
$client,
13591355
)]]></code>
13601356
</UnsafeInstantiation>
13611357
</file>
@@ -1410,6 +1406,12 @@
14101406
<code><![CDATA[$response->toArray()['assets']]]></code>
14111407
</PossiblyUndefinedStringArrayOffset>
14121408
</file>
1409+
<file src="testing/src/Environment.php">
1410+
<PossiblyNullReference>
1411+
<code><![CDATA[stop]]></code>
1412+
<code><![CDATA[stop]]></code>
1413+
</PossiblyNullReference>
1414+
</file>
14131415
<file src="testing/src/Replay/WorkflowReplayer.php">
14141416
<UndefinedMethod>
14151417
<code><![CDATA[getWorkflowType]]></code>

src/Client/ScheduleClient.php

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232
use Temporal\DataConverter\DataConverter;
3333
use Temporal\DataConverter\DataConverterInterface;
3434
use Temporal\Internal\Mapper\ScheduleMapper;
35+
use Temporal\Internal\Interceptor\Pipeline;
36+
use Temporal\Plugin\ConnectionPluginInterface;
37+
use Temporal\Plugin\PluginRegistry;
38+
use Temporal\Plugin\ScheduleClientPluginContext;
39+
use Temporal\Plugin\ScheduleClientPluginInterface;
3540
use Temporal\Internal\Marshaller\Mapper\AttributeMapperFactory;
3641
use Temporal\Internal\Marshaller\Marshaller;
3742
use Temporal\Internal\Marshaller\MarshallerInterface;
@@ -45,14 +50,39 @@ final class ScheduleClient implements ScheduleClientInterface
4550
private DataConverterInterface $converter;
4651
private MarshallerInterface $marshaller;
4752
private ProtoToArrayConverter $protoConverter;
53+
private PluginRegistry $pluginRegistry;
4854

4955
public function __construct(
5056
ServiceClientInterface $serviceClient,
5157
?ClientOptions $options = null,
5258
?DataConverterInterface $converter = null,
59+
?PluginRegistry $pluginRegistry = null,
5360
) {
5461
$this->clientOptions = $options ?? new ClientOptions();
5562
$this->converter = $converter ?? DataConverter::createDefault();
63+
$this->pluginRegistry = $pluginRegistry ?? new PluginRegistry();
64+
65+
// Apply connection plugins (before client-level configuration)
66+
$connectionPlugins = $this->pluginRegistry->getPlugins(ConnectionPluginInterface::class);
67+
$serviceClient = Pipeline::prepare($connectionPlugins)
68+
/** @see ConnectionPluginInterface::configureServiceClient() */
69+
->with(static fn(ServiceClientInterface $serviceClient) => $serviceClient, 'configureServiceClient')($serviceClient);
70+
71+
$pluginContext = new ScheduleClientPluginContext(
72+
clientOptions: $this->clientOptions,
73+
dataConverter: $this->converter,
74+
);
75+
$schedulePlugins = $this->pluginRegistry->getPlugins(ScheduleClientPluginInterface::class);
76+
/** @see ScheduleClientPluginInterface::configureScheduleClient() */
77+
Pipeline::prepare($schedulePlugins)
78+
->with(static fn() => null, 'configureScheduleClient')($pluginContext);
79+
80+
$this->clientOptions = $pluginContext->getClientOptions();
81+
$pluginConverter = $pluginContext->getDataConverter();
82+
if ($pluginConverter !== null) {
83+
$this->converter = $pluginConverter;
84+
}
85+
5686
$this->marshaller = new Marshaller(
5787
new AttributeMapperFactory(new AttributeReader()),
5888
);
@@ -71,8 +101,9 @@ public static function create(
71101
ServiceClientInterface $serviceClient,
72102
?ClientOptions $options = null,
73103
?DataConverterInterface $converter = null,
104+
?PluginRegistry $pluginRegistry = null,
74105
): ScheduleClientInterface {
75-
return new self($serviceClient, $options, $converter);
106+
return new self($serviceClient, $options, $converter, $pluginRegistry);
76107
}
77108

78109
public function createSchedule(

src/Client/WorkflowClient.php

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@
3838
use Temporal\Interceptor\WorkflowClientCallsInterceptor;
3939
use Temporal\Internal\Client\ActivityCompletionClient;
4040
use Temporal\Internal\Client\WorkflowProxy;
41+
use Temporal\Plugin\ClientPluginContext;
42+
use Temporal\Plugin\ClientPluginInterface;
43+
use Temporal\Plugin\CompositePipelineProvider;
44+
use Temporal\Plugin\ConnectionPluginInterface;
45+
use Temporal\Plugin\PluginRegistry;
46+
use Temporal\Plugin\ScheduleClientPluginInterface;
47+
use Temporal\Plugin\WorkerPluginInterface;
4148
use Temporal\Internal\Client\WorkflowRun;
4249
use Temporal\Internal\Client\WorkflowStarter;
4350
use Temporal\Internal\Client\WorkflowStub;
@@ -63,6 +70,7 @@ class WorkflowClient implements WorkflowClientInterface
6370
private DataConverterInterface $converter;
6471
private ?WorkflowStarter $starter = null;
6572
private WorkflowReader $reader;
73+
private PluginRegistry $pluginRegistry;
6674

6775
/** @var Pipeline<WorkflowClientCallsInterceptor, mixed> */
6876
private Pipeline $interceptorPipeline;
@@ -72,11 +80,40 @@ public function __construct(
7280
?ClientOptions $options = null,
7381
?DataConverterInterface $converter = null,
7482
?PipelineProvider $interceptorProvider = null,
83+
?PluginRegistry $pluginRegistry = null,
7584
) {
76-
$this->interceptorPipeline = ($interceptorProvider ?? new SimplePipelineProvider())
77-
->getPipeline(WorkflowClientCallsInterceptor::class);
85+
$this->pluginRegistry = $pluginRegistry ?? new PluginRegistry();
7886
$this->clientOptions = $options ?? new ClientOptions();
7987
$this->converter = $converter ?? DataConverter::createDefault();
88+
89+
// Apply connection plugins (before client-level configuration)
90+
$connectionPlugins = $this->pluginRegistry->getPlugins(ConnectionPluginInterface::class);
91+
$serviceClient = Pipeline::prepare($connectionPlugins)
92+
/** @see ConnectionPluginInterface::configureServiceClient() */
93+
->with(static fn(ServiceClientInterface $serviceClient) => $serviceClient, 'configureServiceClient')($serviceClient);
94+
95+
$pluginContext = new ClientPluginContext(
96+
clientOptions: $this->clientOptions,
97+
dataConverter: $this->converter,
98+
);
99+
$clientPlugins = $this->pluginRegistry->getPlugins(ClientPluginInterface::class);
100+
Pipeline::prepare($clientPlugins)
101+
/** @see ClientPluginInterface::configureClient() */
102+
->with(static fn(ClientPluginContext $pluginContext) => $pluginContext, 'configureClient')($pluginContext);
103+
104+
$this->clientOptions = $pluginContext->getClientOptions();
105+
$pluginConverter = $pluginContext->getDataConverter();
106+
if ($pluginConverter !== null) {
107+
$this->converter = $pluginConverter;
108+
}
109+
110+
// Build interceptor pipeline: merge plugin-contributed interceptors with user-provided ones
111+
$provider = new CompositePipelineProvider(
112+
$pluginContext->getInterceptors(),
113+
$interceptorProvider ?? new SimplePipelineProvider(),
114+
);
115+
116+
$this->interceptorPipeline = $provider->getPipeline(WorkflowClientCallsInterceptor::class);
80117
$this->reader = new WorkflowReader($this->createReader());
81118

82119
// Set Temporal-Namespace metadata
@@ -88,16 +125,34 @@ public function __construct(
88125
);
89126
}
90127

91-
/**
92-
* @return static
93-
*/
94128
public static function create(
95129
ServiceClientInterface $serviceClient,
96130
?ClientOptions $options = null,
97131
?DataConverterInterface $converter = null,
98132
?PipelineProvider $interceptorProvider = null,
133+
?PluginRegistry $pluginRegistry = null,
99134
): self {
100-
return new self($serviceClient, $options, $converter, $interceptorProvider);
135+
return new self($serviceClient, $options, $converter, $interceptorProvider, $pluginRegistry);
136+
}
137+
138+
/**
139+
* Get plugins that also implement WorkerPluginInterface for propagation to workers.
140+
*
141+
* @return list<WorkerPluginInterface>
142+
*/
143+
public function getWorkerPlugins(): array
144+
{
145+
return $this->pluginRegistry->getPlugins(WorkerPluginInterface::class);
146+
}
147+
148+
/**
149+
* Get plugins that also implement ScheduleClientPluginInterface for propagation to schedule clients.
150+
*
151+
* @return list<ScheduleClientPluginInterface>
152+
*/
153+
public function getScheduleClientPlugins(): array
154+
{
155+
return $this->pluginRegistry->getPlugins(ScheduleClientPluginInterface::class);
101156
}
102157

103158
public function getServiceClient(): ServiceClientInterface

src/Interceptor/SimplePipelineProvider.php

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,28 @@ class SimplePipelineProvider implements PipelineProvider
2222
* @param array<array-key, Interceptor> $interceptors
2323
*/
2424
public function __construct(
25-
private iterable $interceptors = [],
25+
private readonly iterable $interceptors = [],
2626
) {}
2727

28+
/**
29+
* Create a new provider with additional interceptors prepended.
30+
*
31+
* @param list<Interceptor> $interceptors Interceptors to prepend before existing ones.
32+
*/
33+
public function withPrependedInterceptors(array $interceptors): self
34+
{
35+
if ($interceptors === []) {
36+
return $this;
37+
}
38+
39+
return new self(\array_merge($interceptors, [...$this->interceptors]));
40+
}
41+
2842
public function getPipeline(string $interceptorClass): Pipeline
2943
{
3044
return $this->cache[$interceptorClass] ??= Pipeline::prepare(
3145
\array_filter(
32-
$this->interceptors,
46+
[...$this->interceptors],
3347
static fn(Interceptor $i): bool => $i instanceof $interceptorClass,
3448
),
3549
);

src/Internal/Interceptor/Pipeline.php

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ private function __construct(
5353
/**
5454
* Make sure that interceptors implement the same interface.
5555
*
56-
* @template T of Interceptor
56+
* @template T of object
5757
*
5858
* @param iterable<T> $interceptors
5959
*
@@ -64,6 +64,21 @@ public static function prepare(iterable $interceptors): self
6464
return new self($interceptors);
6565
}
6666

67+
/**
68+
* Merge two pipelines into one. Interceptors from the first pipeline run before the second.
69+
*
70+
* @template T of object
71+
*
72+
* @param self<T, mixed> $first
73+
* @param self<T, mixed> $second
74+
*
75+
* @return self<T, mixed>
76+
*/
77+
public static function merge(self $first, self $second): self
78+
{
79+
return new self([...$first->interceptors, ...$second->interceptors]);
80+
}
81+
6782
/**
6883
* @param non-empty-string $method Method name of the all interceptors.
6984
*

src/Internal/Transport/Router/GetWorkerInfo.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
use Temporal\Internal\Declaration\Prototype\WorkflowPrototype;
1919
use Temporal\Internal\Marshaller\MarshallerInterface;
2020
use Temporal\Internal\Repository\RepositoryInterface;
21+
use Temporal\Plugin\PluginInterface;
22+
use Temporal\Plugin\PluginRegistry;
2123
use Temporal\Worker\ServiceCredentials;
2224
use Temporal\Worker\Transport\Command\ServerRequestInterface;
2325
use Temporal\Worker\WorkerInterface;
@@ -28,6 +30,7 @@ public function __construct(
2830
private readonly RepositoryInterface $queues,
2931
private readonly MarshallerInterface $marshaller,
3032
private readonly ServiceCredentials $credentials,
33+
private readonly PluginRegistry $pluginRegistry,
3134
) {}
3235

3336
public function handle(ServerRequestInterface $request, array $headers, Deferred $resolver): void
@@ -54,6 +57,10 @@ private function workerToArray(WorkerInterface $worker): array
5457
'Name' => $activity->getID(),
5558
];
5659

60+
$map = $this->map($this->pluginRegistry->getPlugins(PluginInterface::class), static fn(PluginInterface $plugin): array => [
61+
'Name' => $plugin->getName(),
62+
'Version' => null,
63+
]);
5764
return [
5865
'TaskQueue' => $worker->getID(),
5966
'Options' => $this->marshaller->marshal($worker->getOptions()),
@@ -62,6 +69,7 @@ private function workerToArray(WorkerInterface $worker): array
6269
// ActivityInfo[]
6370
'Activities' => $this->map($worker->getActivities(), $activityMap),
6471
'PhpSdkVersion' => SdkVersion::getSdkVersion(),
72+
'Plugins' => $map,
6573
'Flags' => (object) $this->prepareFlags(),
6674
];
6775
}

src/Internal/Transport/Server.php

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313

1414
use React\Promise\PromiseInterface;
1515
use Temporal\Internal\Exception\UndefinedRequestException;
16+
use Temporal\Internal\Interceptor\Pipeline;
1617
use Temporal\Internal\Queue\QueueInterface;
1718
use Temporal\Internal\Transport\Request\UndefinedResponse;
19+
use Temporal\Plugin\PluginRegistry;
20+
use Temporal\Plugin\WorkerPluginInterface;
1821
use Temporal\Worker\Transport\Command\Client\FailedClientResponse;
1922
use Temporal\Worker\Transport\Command\Client\SuccessClientResponse;
2023
use Temporal\Worker\Transport\Command\FailureResponseInterface;
@@ -34,14 +37,18 @@ final class Server implements ServerInterface
3437

3538
private \Closure $onMessage;
3639
private QueueInterface $queue;
40+
private Pipeline $interceptors;
3741

3842
/**
3943
* @psalm-param OnMessageHandler $onMessage
4044
*/
41-
public function __construct(QueueInterface $queue, callable $onMessage)
45+
public function __construct(QueueInterface $queue, callable $onMessage, PluginRegistry $pluginRegistry)
4246
{
4347
$this->queue = $queue;
4448

49+
$plugins = $pluginRegistry->getPlugins(WorkerPluginInterface::class);
50+
$this->interceptors = Pipeline::prepare($plugins);
51+
4552
$this->onMessage($onMessage);
4653
}
4754

@@ -56,7 +63,11 @@ public function onMessage(callable $then): void
5663
public function dispatch(ServerRequestInterface $request, array $headers): void
5764
{
5865
try {
59-
$result = ($this->onMessage)($request, $headers);
66+
$result = $this->interceptors->with(
67+
static fn(callable $handler, ServerRequestInterface $request, array $headers): PromiseInterface => $handler($request, $headers),
68+
/** @see WorkerPluginInterface::runWorker() */
69+
'runWorker',
70+
)($this->onMessage, $request, $headers);
6071
} catch (\Throwable $e) {
6172
$this->queue->push(new FailedClientResponse($request->getID(), $e));
6273

src/Plugin/AbstractPlugin.php

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/**
4+
* This file is part of Temporal package.
5+
*
6+
* For the full copyright and license information, please view the LICENSE
7+
* file that was distributed with this source code.
8+
*/
9+
10+
declare(strict_types=1);
11+
12+
namespace Temporal\Plugin;
13+
14+
/**
15+
* Abstract base class providing no-op defaults for all plugin methods.
16+
*
17+
* Plugin authors can extend this and override only what they need.
18+
*/
19+
abstract class AbstractPlugin implements TemporalPluginInterface
20+
{
21+
use ConnectionPluginTrait;
22+
use ClientPluginTrait;
23+
use ScheduleClientPluginTrait;
24+
use WorkerPluginTrait;
25+
26+
public function __construct(
27+
private readonly string $name,
28+
) {}
29+
30+
public function getName(): string
31+
{
32+
return $this->name;
33+
}
34+
}

0 commit comments

Comments
 (0)