我正在尝试使用Spring集成实现一个方案,该方案必须处理动态生成的Redis队列。 到目前为止,我在Internet上找到的示例适用于预定义队列。

在我的情况下,一个应用程序在运行中生成了100多个Redis队列,我的代码将使用这些队列中的消息。 我设法创建了一个POC类型的项目( github链接 ),它正在工作。

我想知道是否有更好的方法可以达到相同目的。 据我所知,企业集成模式没有说明如何使用多个动态队列或消息源中的消息,除了自定义或更改现有框架源代码以外,还有没有其他可用的解决方案?

我更喜欢使用Spring Integration Java配置和基于XML的DSL。

===============>>#1 票数:1

请参见动态和运行时集成流程

为了简化开发经验,Spring Integration引入了IntegrationFlowContext来在运行时注册和管理IntegrationFlow实例,如以下示例所示:

@Autowired private AbstractServerConnectionFactory server1;  @Autowired private IntegrationFlowContext flowContext;  ...  @Test public void testTcpGateways() {     TestingUtilities.waitListening(this.server1, null);      IntegrationFlow flow = f -> f             .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())                     .serializer(TcpCodecs.crlf())                     .deserializer(TcpCodecs.lengthHeader1())                     .id("client1"))                 .remoteTimeout(m -> 5000))             .transform(Transformers.objectToString());      IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();     assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO")); } 

当我们有多个配置选项并且必须创建多个类似流程的实例时,这很有用。 为此,我们可以迭代选项并在循环内创建和注册IntegrationFlow实例。 另一个变体是当我们的数据源不基于Spring时,我们必须动态创建它。 ...

编辑

@SpringBootApplication public class So59117728Application {      public static void main(String[] args) {         SpringApplication.run(So59117728Application.class, args).close();     }      @Bean     public ApplicationRunner runner(RedisConnectionFactory cf, IntegrationFlowContext context,             RedisTemplate<String, String> template) {          return args -> {             IntegrationFlow flow = IntegrationFlows                     .from(redisEndpoint("So59117728Application", cf))                     .handle(System.out::println)                     .get();             context.registration(flow).id("myDynamicFlow").register();             template.boundListOps("So59117728Application").leftPush("foo");              Thread.sleep(10_000);             context.remove("myDynamicFlow");         };     }      private RedisQueueMessageDrivenEndpoint redisEndpoint(String queueName, RedisConnectionFactory cf) {         RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint(queueName, cf);         endpoint.setSerializer(new StringRedisSerializer());         return endpoint;     }  } 

  ask by Mu Pi translate from so

本文未有回复,本站智能推荐: