当前位置:首页 » PHP技术

flume ExecSource 支持获取获取指定列数据

2018-04-15 13:41 本站整理 浏览(10)

需求描述:

    flume使用 execSource 类型 实现截取数据行中指定列的数据(详见下图)

    

 

实现:

1.方案一: execSource接受的是linux命令,所以可以使用linux awk实现这个功能

   命令:tail -F /root/test.log | awk -F ',' '{print $2;fflush()}'   注意:fflush()一定要加,否则不输出

  

    完整的flume-exec.properties文件内容如下:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'


hd1.sources=s1
hd1.sources.s1.type=exec
hd1.sources.s1.shell=/bin/bash -c
hd1.sources.s1.command=tail -F /root/test.log | awk -F ',' '{print $2;fflush()}'

hd1.channels=c1
hd1.channels.c1.type=memory
hd1.channels.c1.capacity=1000
hd1.channels.c1.transcationCapacity=100

hd1.sinks=sk1
hd1.sinks.sk1.type=logger

#把source 和 sink 关联到channel上
#1个source可以对应多个channel(重点)
hd1.sources.s1.channels=c1

#一个sink只对应1个sink(重点)
hd1.sinks.sk1.channel=c1

 

2.方案二:修改源码,扩展ExecSource

(1) 具体怎么改?

看ExecSource.java 的源码,可以知道 ExecSource是通过BufferedReader,读取InputStream,然后把读取出来的每行内容包装成event,发往channel,所以我们可以在包装成event之前,把内容替换成我们需要的


 

(2)具体实现:

    修改ExecSource.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.flume.source;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.*;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * <p>
 * A {@link Source} implementation that executes a Unix process and turns each
 * line of text into an event.
 * </p>
 * <p>
 * This source runs a given Unix command on start up and expects that process to
 * continuously produce data on standard out (stderr ignored by default). Unless
 * told to restart, if the process exits for any reason, the source also exits and
 * will produce no further data. This means configurations such as <tt>cat [named pipe]</tt>
 * or <tt>tail -F [file]</tt> are going to produce the desired results where as
 * <tt>date</tt> will probably not - the former two commands produce streams of
 * data where as the latter produces a single event and exits.
 * </p>
 * <p>
 * The <tt>ExecSource</tt> is meant for situations where one must integrate with
 * existing systems without modifying code. It is a compatibility gateway built
 * to allow simple, stop-gap integration and doesn't necessarily offer all of
 * the benefits or guarantees of native integration with Flume. If one has the
 * option of using the <tt>AvroSource</tt>, for instance, that would be greatly
 * preferred to this source as it (and similarly implemented sources) can
 * maintain the transactional guarantees that exec can not.
 * </p>
 * <p>
 * <i>Why doesn't <tt>ExecSource</tt> offer transactional guarantees?</i>
 * </p>
 * <p>
 * The problem with <tt>ExecSource</tt> and other asynchronous sources is that
 * the source can not guarantee that if there is a failure to put the event into
 * the {@link Channel} the client knows about it. As a for instance, one of the
 * most commonly requested features is the <tt>tail -F [file]</tt>-like use case
 * where an application writes to a log file on disk and Flume tails the file,
 * sending each line as an event. While this is possible, there's an obvious
 * problem; what happens if the channel fills up and Flume can't send an event?
 * Flume has no way of indicating to the application writing the log file that
 * it needs to retain the log or that the event hasn't been sent, for some
 * reason. If this doesn't make sense, you need only know this: <b>Your
 * application can never guarantee data has been received when using a
 * unidirectional asynchronous interface such as ExecSource!</b> As an extension
 * of this warning - and to be completely clear - there is absolutely zero
 * guarantee of event delivery when using this source. You have been warned.
 * </p>
 * <p>
 * <b>Configuration options</b>
 * </p>
 * <table>
 * <tr>
 * <th>Parameter</th>
 * <th>Description</th>
 * <th>Unit / Type</th>
 * <th>Default</th>
 * </tr>
 * <tr>
 * <td><tt>command</tt></td>
 * <td>The command to execute</td>
 * <td>String</td>
 * <td>none (required)</td>
 * </tr>
 * <tr>
 * <td><tt>restart</tt></td>
 * <td>Whether to restart the command when it exits</td>
 * <td>Boolean</td>
 * <td>false</td>
 * </tr>
 * <tr>
 * <td><tt>restartThrottle</tt></td>
 * <td>How long in milliseconds to wait before restarting the command</td>
 * <td>Long</td>
 * <td>10000</td>
 * </tr>
 * <tr>
 * <td><tt>logStderr</tt></td>
 * <td>Whether to log or discard the standard error stream of the command</td>
 * <td>Boolean</td>
 * <td>false</td>
 * </tr>
 * <tr>
 * <td><tt>batchSize</tt></td>
 * <td>The number of events to commit to channel at a time.</td>
 * <td>integer</td>
 * <td>20</td>
 * </tr>
 * <tr>
 * <td><tt>batchTimeout</tt></td>
 * <td>Amount of time (in milliseconds) to wait, if the buffer size was not reached,
 * before data is pushed downstream.</td>
 * <td>long</td>
 * <td>3000</td>
 * </tr>
 * </table>
 * <p>
 * <b>Metrics</b>
 * </p>
 * <p>
 * TODO
 * </p>
 */
public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(ExecSource.class);

    private String shell;

    private String command;

    private SourceCounter sourceCounter;

    private ExecutorService executor;

    private Future<?> runnerFuture;

    private long restartThrottle;

    private boolean restart;

    private boolean logStderr;

    private Integer bufferCount;

    private long batchTimeout;

    private ExecRunnable runner;

    private Charset charset;

    //开关,是否做split
    private boolean customSplitSwitchOn;

    //split的分隔符
    private String customSplitDelimiter;

    //split后获取的列
    private Integer customFetchColId;

    @Override
    public void start() {
        logger.info("Exec source starting with command: {}", command);

        // Start the counter before starting any threads that may access it.
        sourceCounter.start();

        executor = Executors.newSingleThreadExecutor();

        //把自定义的三个参数,传给构造函数
        runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart,
                restartThrottle, logStderr, bufferCount, batchTimeout, charset,
                customSplitSwitchOn, customSplitDelimiter, customFetchColId);

        // Start the runner thread.
        runnerFuture = executor.submit(runner);

        // Mark the Source as RUNNING.
        super.start();

        logger.debug("Exec source started");
    }

    @Override
    public void stop() {
        logger.info("Stopping exec source with command: {}", command);
        if (runner != null) {
            runner.setRestart(false);
            runner.kill();
        }

        if (runnerFuture != null) {
            logger.debug("Stopping exec runner");
            runnerFuture.cancel(true);
            logger.debug("Exec runner stopped");
        }
        executor.shutdown();

        while (!executor.isTerminated()) {
            logger.debug("Waiting for exec executor service to stop");
            try {
                executor.awaitTermination(500, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.debug("Interrupted while waiting for exec executor service "
                        + "to stop. Just exiting.");
                Thread.currentThread().interrupt();
            }
        }

        sourceCounter.stop();
        super.stop();

        logger.debug("Exec source with command:{} stopped. Metrics:{}", command,
                sourceCounter);
    }

    @Override
    public void configure(Context context) {
        command = context.getString("command");

        Preconditions.checkState(command != null,
                "The parameter command must be specified");

        restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE,
                ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE);

        restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART,
                ExecSourceConfigurationConstants.DEFAULT_RESTART);

        logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR,
                ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR);

        bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
                ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);

        batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT,
                ExecSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT);

        charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
                ExecSourceConfigurationConstants.DEFAULT_CHARSET));

        shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null);


        // 获取split开关配置值
        customSplitSwitchOn = context
                .getBoolean(ExecSourceConfigurationConstants.CUSTOM_SPLIT_SWITCH_ON,
                        ExecSourceConfigurationConstants.DEFAULT_CUSTON_SWITCH_ON);
        //获取分隔符配置值
        customSplitDelimiter = context
                .getString(ExecSourceConfigurationConstants.CUSTOM_SPLIT_DELIMITER,
                        ExecSourceConfigurationConstants.DEFAULT_CUSTOM_SPLIT_DELIMITER);

        //获取split后的列
        customFetchColId = context.getInteger(ExecSourceConfigurationConstants.CUSTOM_FETCH_COL,
                ExecSourceConfigurationConstants.DEFAULT_CUSTOM_FETCH_COL_ID);

        if (sourceCounter == null) {
            sourceCounter = new SourceCounter(getName());
        }
    }

    private static class ExecRunnable implements Runnable {

        //构造函数加入三个参数
        public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor,
                SourceCounter sourceCounter, boolean restart, long restartThrottle,
                boolean logStderr, int bufferCount, long batchTimeout, Charset charset,
                boolean customSplitSwitchOn, String customSplitDelimiter,
                Integer customFetchColId) {
            this.command = command;
            this.channelProcessor = channelProcessor;
            this.sourceCounter = sourceCounter;
            this.restartThrottle = restartThrottle;
            this.bufferCount = bufferCount;
            this.batchTimeout = batchTimeout;
            this.restart = restart;
            this.logStderr = logStderr;
            this.charset = charset;
            this.shell = shell;

            //custom属性
            this.customSplitSwitchOn = customSplitSwitchOn;
            this.customSplitDelimiter = customSplitDelimiter;
            this.customFetchColId = customFetchColId;
        }

        private final String shell;

        private final String command;

        private final ChannelProcessor channelProcessor;

        private final SourceCounter sourceCounter;

        private volatile boolean restart;

        private final long restartThrottle;

        private final int bufferCount;

        private long batchTimeout;

        private final boolean logStderr;

        private final Charset charset;

        //split的分隔符
        private String customSplitDelimiter;

        //开关(是否允许做split)
        private boolean customSplitSwitchOn;

        //split后需要获取的列id
        private int customFetchColId;

        private Process process = null;

        private SystemClock systemClock = new SystemClock();

        private Long lastPushToChannel = systemClock.currentTimeMillis();

        ScheduledExecutorService timedFlushService;

        ScheduledFuture<?> future;

        @Override
        public void run() {
            do {
                String exitCode = "unknown";
                BufferedReader reader = null;
                String line = null;
                final List<Event> eventList = new ArrayList<Event>();

                timedFlushService = Executors.newSingleThreadScheduledExecutor(
                        new ThreadFactoryBuilder().setNameFormat(
                                "timedFlushExecService" +
                                        Thread.currentThread().getId() + "-%d").build());
                try {
                    if (shell != null) {
                        String[] commandArgs = formulateShellCommand(shell, command);
                        process = Runtime.getRuntime().exec(commandArgs);
                    } else {
                        String[] commandArgs = command.split("\\s+");
                        process = new ProcessBuilder(commandArgs).start();
                    }
                    reader = new BufferedReader(
                            new InputStreamReader(process.getInputStream(), charset));

                    // StderrLogger dies as soon as the input stream is invalid
                    StderrReader stderrReader = new StderrReader(new BufferedReader(
                            new InputStreamReader(process.getErrorStream(), charset)), logStderr);
                    stderrReader.setName("StderrReader-[" + command + "]");
                    stderrReader.setDaemon(true);
                    stderrReader.start();

                    future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
                                                                          @Override
                                                                          public void run() {
                                                                              try {
                                                                                  synchronized (eventList) {
                                                                                      if (!eventList.isEmpty() && timeout()) {
                                                                                          flushEventBatch(eventList);
                                                                                      }
                                                                                  }
                                                                              } catch (Exception e) {
                                                                                  logger.error("Exception occurred when processing event batch", e);
                                                                                  if (e instanceof InterruptedException) {
                                                                                      Thread.currentThread().interrupt();
                                                                                  }
                                                                              }
                                                                          }
                                                                      },
                            batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);

                    String splits[];
                    while ((line = reader.readLine()) != null) {
                        sourceCounter.incrementEventReceivedCount();
                        synchronized (eventList) {
                            //如果开启了split开关,那么将根据指定的分割符做split,并返回指定列的内容
                            if (customSplitSwitchOn) {
                                try {
                                    splits = line.split(customSplitDelimiter);
                                    if (splits.length > customFetchColId) {
                                        line = splits[customFetchColId];
                                    } else {
                                        logger.error("customColId is larger than " + splits.length);
                                        continue;
                                    }
                                } catch (Exception e) {
                                    logger.error("Failed while split line: ", e);
                                    continue;
                                }
                            }

                            eventList.add(EventBuilder.withBody(line.getBytes(charset)));
                            if (eventList.size() >= bufferCount || timeout()) {
                                flushEventBatch(eventList);
                            }
                        }
                    }

                    synchronized (eventList) {
                        if (!eventList.isEmpty()) {
                            flushEventBatch(eventList);
                        }
                    }
                } catch (Exception e) {
                    logger.error("Failed while running command: " + command, e);
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                } finally {
                    if (reader != null) {
                        try {
                            reader.close();
                        } catch (IOException ex) {
                            logger.error("Failed to close reader for exec source", ex);
                        }
                    }
                    exitCode = String.valueOf(kill());
                }
                if (restart) {
                    logger.info("Restarting in {}ms, exit code {}", restartThrottle,
                            exitCode);
                    try {
                        Thread.sleep(restartThrottle);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    logger.info("Command [" + command + "] exited with " + exitCode);
                }
            } while (restart);
        }

        private void flushEventBatch(List<Event> eventList) {
            channelProcessor.processEventBatch(eventList);
            sourceCounter.addToEventAcceptedCount(eventList.size());
            eventList.clear();
            lastPushToChannel = systemClock.currentTimeMillis();
        }

        private boolean timeout() {
            return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
        }

        private static String[] formulateShellCommand(String shell, String command) {
            String[] shellArgs = shell.split("\\s+");
            String[] result = new String[shellArgs.length + 1];
            System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
            result[shellArgs.length] = command;
            return result;
        }

        public int kill() {
            if (process != null) {
                synchronized (process) {
                    process.destroy();

                    try {
                        int exitValue = process.waitFor();

                        // Stop the Thread that flushes periodically
                        if (future != null) {
                            future.cancel(true);
                        }

                        if (timedFlushService != null) {
                            timedFlushService.shutdown();
                            while (!timedFlushService.isTerminated()) {
                                try {
                                    timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS);
                                } catch (InterruptedException e) {
                                    logger.debug(
                                            "Interrupted while waiting for exec executor service "
                                                    + "to stop. Just exiting.");
                                    Thread.currentThread().interrupt();
                                }
                            }
                        }
                        return exitValue;
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                }
                return Integer.MIN_VALUE;
            }
            return Integer.MIN_VALUE / 2;
        }

        public void setRestart(boolean restart) {
            this.restart = restart;
        }
    }

    private static class StderrReader extends Thread {
        private BufferedReader input;

        private boolean logStderr;

        protected StderrReader(BufferedReader input, boolean logStderr) {
            this.input = input;
            this.logStderr = logStderr;
        }

        @Override
        public void run() {
            try {
                int i = 0;
                String line = null;
                while ((line = input.readLine()) != null) {
                    if (logStderr) {
                        // There is no need to read 'line' with a charset
                        // as we do not to propagate it.
                        // It is in UTF-16 and would be printed in UTF-8 format.
                        logger.info("StderrLogger[{}] = '{}'", ++i, line);
                    }
                }
            } catch (IOException e) {
                logger.info("StderrLogger exiting", e);
            } finally {
                try {
                    if (input != null) {
                        input.close();
                    }
                } catch (IOException ex) {
                    logger.error("Failed to close stderr reader for exec source", ex);
                }
            }
        }
    }
}

 

 

   修改ExecSourceConfigurationConstants

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.flume.source;

import com.google.common.annotations.VisibleForTesting;

public class ExecSourceConfigurationConstants {

    /**
     * Should the exec'ed command restarted if it dies: : default false
     */
    public static final String CONFIG_RESTART = "restart";

    public static final boolean DEFAULT_RESTART = false;

    /**
     * Amount of time to wait before attempting a restart: : default 10000 ms
     */
    public static final String CONFIG_RESTART_THROTTLE = "restartThrottle";

    public static final long DEFAULT_RESTART_THROTTLE = 10000L;

    /**
     * Should stderr from the command be logged: default false
     */
    public static final String CONFIG_LOG_STDERR = "logStdErr";

    public static final boolean DEFAULT_LOG_STDERR = false;

    /**
     * Number of lines to read at a time
     */
    public static final String CONFIG_BATCH_SIZE = "batchSize";

    public static final int DEFAULT_BATCH_SIZE = 20;

    /**
     * Amount of time to wait, if the buffer size was not reached, before
     * to data is pushed downstream: : default 3000 ms
     */
    public static final String CONFIG_BATCH_TIME_OUT = "batchTimeout";

    public static final long DEFAULT_BATCH_TIME_OUT = 3000L;

    /**
     * Charset for reading input
     */
    public static final String CHARSET = "charset";

    public static final String DEFAULT_CHARSET = "UTF-8";

    /**
     * Optional shell/command processor used to run command
     */
    public static final String CONFIG_SHELL = "shell";

    /**
     * 自定义分隔符,默认使用逗号分割
     */
    public static final String CUSTOM_SPLIT_DELIMITER = "customSplitDelimiter";

    public static final String DEFAULT_CUSTOM_SPLIT_DELIMITER = ",";

    /**
     * split的开关,默认关闭
     */
    public static final String CUSTOM_SPLIT_SWITCH_ON = "customSwitchOn";

    public static final boolean DEFAULT_CUSTON_SWITCH_ON = false;

    /**
     * split后获取哪一列,从0开始,同数组下标
     */
    public static final String CUSTOM_FETCH_COL = "customFetchCol";

    public static final int DEFAULT_CUSTOM_FETCH_COL_ID = 0;
}

   

 

    修改flume-custom.properties

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'


hd1.sources=s1
hd1.sources.s1.type=exec
hd1.sources.s1.shell=/bin/bash -c

#方法一
#hd1.sources.s1.command=tail -F /root/test.log | awk -F ',' '{print $2;fflush()}'

#方法二,改源码
hd1.sources.s1.command=tail -F /root/test.log
hd1.sources.s1.customSwitchOn=true
hd1.sources.s1.customFetchCol=1

hd1.channels=c1
hd1.channels.c1.type=memory
hd1.channels.c1.capacity=1000
hd1.channels.c1.transcationCapacity=100

hd1.sinks=sk1
hd1.sinks.sk1.type=logger

#把source 和 sink 关联到channel上
#1个source可以对应多个channel(重点)
hd1.sources.s1.channels=c1

#一个sink只对应1个sink(重点)
hd1.sinks.sk1.channel=c1

 

    做完以上修改 ,重新打包 flume-ng-core模块,把打完的新包,替换掉服务器的apache-flume-1.8.0-bin/lib目录下的flume-ng-core-1.8.0.jar (这个包的版本号,根据你实际的版本去替换)即可,使用上面修改好的 flume-custom.properties启动flume测试即可


 

 

思考:

  • linux的一些常用命令还是要好好掌握,遇到问题慢慢分析,例如:fflush()
  • 不要局限在解决问题,要发散,多扩展思路
  • 源码其实并没那么难改,有时候限制我们的,可能是我们自己的耐心,细心等,例如:源码编译过程比较耗时,各种找不到包等,很可能让你望而却步了
  • 多看官网,多查阅,多和大牛交流