内射老阿姨1区2区3区4区_久久精品人人做人人爽电影蜜月_久久国产精品亚洲77777_99精品又大又爽又粗少妇毛片

springbatch中基于RabbitMQ遠程分區(qū)Step是怎樣的

spring batch中基于RabbitMQ遠程分區(qū)Step是怎樣的,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

十年的樂昌網(wǎng)站建設經(jīng)驗,針對設計、前端、開發(fā)、售后、文案、推廣等六對一服務,響應快,48小時及時工作處理。營銷型網(wǎng)站的優(yōu)勢是能夠根據(jù)用戶設備顯示端的尺寸不同,自動調(diào)整樂昌建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設計,從而大程度地提升瀏覽體驗。成都創(chuàng)新互聯(lián)公司從事“樂昌網(wǎng)站設計”,“樂昌網(wǎng)站推廣”以來,每個客戶項目都認真落實執(zhí)行。

前言碎語

小編構建的實例可為主服務,從服務,主從混用等模式,可以大大提高spring batch在單機處理時的時效。

項目源碼:https://gitee.com/kailing/partitionjob

spring batch遠程分區(qū)Step的原理

master節(jié)點將數(shù)據(jù)根據(jù)相關邏輯(ID,hash),拆分成一段一段要處理的數(shù)據(jù)集,然后將數(shù)據(jù)集放到消息中間件中(ActiveMQ,RabbitMQ ),從節(jié)點監(jiān)聽到消息,獲取消息,讀取消息中的數(shù)據(jù)集處理并發(fā)回結(jié)果。如下圖:

spring batch中基于RabbitMQ遠程分區(qū)Step是怎樣的

下面按原理分步驟實施,完成springbatch的遠程分區(qū)實例

 第一步,首先引入相關依賴

見:https://gitee.com/kailing/partitionjob/blob/master/pom.xml

分區(qū)job主要依賴為:spring-batch-integration,提供了遠程通訊的能力

第二步,Master節(jié)點數(shù)據(jù)分發(fā)

@Profile({"master", "mixed"})
    @Bean
    public Job job(@Qualifier("masterStep") Step masterStep) {
        return jobBuilderFactory.get("endOfDayjob")
                .start(masterStep)
                .incrementer(new BatchIncrementer())
                .listener(new JobListener())
                .build();
    }

    @Bean("masterStep")
    public Step masterStep(@Qualifier("slaveStep") Step slaveStep,
                           PartitionHandler partitionHandler,
                           DataSource dataSource) {
        return stepBuilderFactory.get("masterStep")
                .partitioner(slaveStep.getName(), new ColumnRangePartitioner(dataSource))
                .step(slaveStep)
                .partitionHandler(partitionHandler)
                .build();
    }

master節(jié)點關鍵部分是,他的Step需要設置從節(jié)點Step的Name,和一個數(shù)據(jù)分區(qū)器,數(shù)據(jù)分區(qū)器需要實現(xiàn)Partitioner接口,它返回一個Map<String, ExecutionContext>的數(shù)據(jù)結(jié)構,這個結(jié)構完整的描述了每個從節(jié)點需要處理的分區(qū)片段。ExecutionContext保存了從節(jié)點要處理的數(shù)據(jù)邊界,當然,ExecutionContext里的參數(shù)是根據(jù)你的業(yè)務來的,我這里,已數(shù)據(jù)ID為邊界劃分了每個區(qū)。具體的Partitioner實現(xiàn)如下:

/**
 * Created by kl on 2018/3/1.
 * Content :根據(jù)數(shù)據(jù)ID分片
 */
public class ColumnRangePartitioner implements Partitioner {
    private JdbcOperations jdbcTemplate;
    ColumnRangePartitioner(DataSource dataSource){
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int min = jdbcTemplate.queryForObject("SELECT MIN(arcid) from  kl_article", Integer.class);
        int max = jdbcTemplate.queryForObject("SELECT MAX(arcid) from  kl_article", Integer.class);
        int targetSize = (max - min) / gridSize + 1;
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;

        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);

            if (end >= max) {
                end = max;
            }
            value.putInt("minValue", start);
            value.putInt("maxValue", end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }
}

第三步,Integration配置

spring batch Integration提供了遠程分區(qū)通訊能力,Spring Integration擁有豐富的通道適配器(例如JMS和AMQP),基于ActiveMQ,RabbitMQ等中間件都可以實現(xiàn)遠程分區(qū)處理。本文使用RabbitMQ來做為通訊的中間件。關于RabbitMQ的安裝等不在本篇范圍,下面代碼描述了如何配置MQ連接,以及spring batch分區(qū)相關隊列,消息適配器等。

/**
 * Created by kl on 2018/3/1.
 * Content :遠程分區(qū)通訊
 */
@Configuration
@ConfigurationProperties(prefix = "spring.rabbit")
public class IntegrationConfiguration {
    private String host;
    private Integer port=5672;
    private String username;
    private String password;
    private String virtualHost;
    private int connRecvThreads=5;
    private int channelCacheSize=10;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(connRecvThreads);
        executor.initialize();
        connectionFactory.setExecutor(executor);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setChannelCacheSize(channelCacheSize);
        return connectionFactory;
    }
    @Bean
    public MessagingTemplate messageTemplate() {
        MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());
        messagingTemplate.setReceiveTimeout(60000000l);
        return messagingTemplate;
    }
    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }
    @Bean
    @ServiceActivator(inputChannel = "outboundRequests")
    public AmqpOutboundEndpoint amqpOutboundEndpoint(AmqpTemplate template) {
        AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(template);
        endpoint.setExpectReply(true);
        endpoint.setOutputChannel(inboundRequests());
        endpoint.setRoutingKey("partition.requests");
        return endpoint;
    }
    @Bean
    public Queue requestQueue() {
        return new Queue("partition.requests", false);
    }

    @Bean
    @Profile({"slave","mixed"})
    public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(inboundRequests());
        adapter.afterPropertiesSet();
        return adapter;
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("partition.requests");
        container.setAutoStartup(false);
        return container;
    }

    @Bean
    public PollableChannel outboundStaging() {
        return new NullChannel();
    }

    @Bean
    public QueueChannel inboundRequests() {
        return new QueueChannel();
    }

第四步,從節(jié)點接收分區(qū)信息并處理

@Bean
    @Profile({"slave","mixed"})
    @ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
    public StepExecutionRequestHandler stepExecutionRequestHandler() {
        StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
        BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
        stepLocator.setBeanFactory(this.applicationContext);
        stepExecutionRequestHandler.setStepLocator(stepLocator);
        stepExecutionRequestHandler.setJobExplorer(this.jobExplorer);
        return stepExecutionRequestHandler;
    }
    @Bean("slaveStep")
    public Step slaveStep(MyProcessorItem processorItem,
                          JpaPagingItemReader reader) {
        CompositeItemProcessor itemProcessor = new CompositeItemProcessor();
        List<ItemProcessor> processorList = new ArrayList<>();
        processorList.add(processorItem);
        itemProcessor.setDelegates(processorList);
        return stepBuilderFactory.get("slaveStep")
                .<Article, Article>chunk(1000)//事務提交批次
                .reader(reader)
                .processor(itemProcessor)
                .writer(new PrintWriterItem())
                .build();
    }

從節(jié)點最關鍵的地方在于StepExecutionRequestHandler,他會接收MQ消息中間件中的消息,并從分區(qū)信息中獲取到需要處理的數(shù)據(jù)邊界,如下ItemReader:

@Bean(destroyMethod = "")
    @StepScope
    public JpaPagingItemReader<Article> jpaPagingItemReader(
            @Value("#{stepExecutionContext['minValue']}") Long minValue,
            @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.err.println("接收到分片參數(shù)["+minValue+"->"+maxValue+"]");
        JpaPagingItemReader<Article> reader = new JpaPagingItemReader<>();
        JpaNativeQueryProvider queryProvider = new JpaNativeQueryProvider<>();
        String sql = "select * from kl_article where  arcid >= :minValue and arcid <= :maxValue";
        queryProvider.setSqlQuery(sql);
        queryProvider.setEntityClass(Article.class);
        reader.setQueryProvider(queryProvider);
        Map queryParames= new HashMap();
        queryParames.put("minValue",minValue);
        queryParames.put("maxValue",maxValue);
        reader.setParameterValues(queryParames);
        reader.setEntityManagerFactory(entityManagerFactory);
        return  reader;
    }

中的minValuemin,maxValue,正是前文中Master節(jié)點分區(qū)中設置的值

如上,已經(jīng)完成了整個spring batch 遠程分區(qū)處理的實例,需要注意的是,一個實例,即可主可從可主從,是有spring profile來控制的,細心的人可能會發(fā)現(xiàn)@Profile({"master", "mixed"})等注解,所以如果你在測試的時候,別忘了在spring boot中配置好spring.profiles.active=slave等。

看完上述內(nèi)容,你們掌握spring batch中基于RabbitMQ遠程分區(qū)Step是怎樣的的方法了嗎?如果還想學到更多技能或想了解更多相關內(nèi)容,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!

網(wǎng)頁題目:springbatch中基于RabbitMQ遠程分區(qū)Step是怎樣的
分享鏈接:http://m.rwnh.cn/article32/gspcpc.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供微信公眾號微信小程序、軟件開發(fā)、網(wǎng)站設計電子商務、網(wǎng)站改版

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

手機網(wǎng)站建設
深州市| 集安市| 鄯善县| 潞西市| 仁怀市| 改则县| 沁阳市| 陇西县| 北京市| 塔河县| 夏邑县| 大石桥市| 巧家县| 临沧市| 梁河县| 屏南县| 轮台县| 遂平县| 合江县| 昌吉市| 邻水| 恩施市| 乌兰察布市| 友谊县| 九江市| 台州市| 景宁| 洱源县| 滁州市| 临朐县| 嘉义县| 苍南县| 宁都县| 吉林省| 闵行区| 白城市| 苏尼特左旗| 报价| 贵德县| 吕梁市| 慈利县|